Files
resolutionflow/backend/tests/test_tree_transfer.py
chihlasm 88c1553c5d refactor: remove XML export, JSON-only for .rfflow files
- Remove XML builder, format query param, and XML tests
- Simplify ExportFlowModal (no format picker)
- Simplify rfflowParser (JSON-only)
- Remove format field from schemas and types

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 00:56:07 -05:00

283 lines
9.3 KiB
Python

"""Tests for flow export/import (.rfflow) endpoints."""
import pytest
from httpx import AsyncClient
# --- Helpers ---
TREE_DATA = {
"name": "DNS Troubleshooting",
"description": "Diagnose DNS resolution issues",
"category": "Networking",
"tree_structure": {
"id": "root",
"type": "decision",
"question": "Is DNS resolving?",
"options": [
{"id": "yes", "label": "Yes", "next_node_id": "sol1"},
{"id": "no", "label": "No", "next_node_id": "sol2"},
],
"children": [
{"id": "sol1", "type": "solution", "title": "DNS OK", "description": "DNS is working", "solution": "No action needed"},
{"id": "sol2", "type": "solution", "title": "DNS Fail", "description": "DNS is not resolving", "solution": "Check DNS server config"},
],
},
"tags": ["dns", "networking"],
}
async def create_tree_with_tags(client: AsyncClient, headers: dict, data: dict | None = None) -> dict:
"""Create a tree and return the response."""
resp = await client.post("/api/v1/trees", json=data or TREE_DATA, headers=headers)
assert resp.status_code == 201
return resp.json()
# --- Export Tests ---
@pytest.mark.asyncio
async def test_export_json_format(client, auth_headers, test_tree):
"""Export should return valid .rfflow JSON with correct structure."""
resp = await client.get(
f"/api/v1/trees/{test_tree['id']}/export",
headers=auth_headers,
)
assert resp.status_code == 200
assert "attachment" in resp.headers.get("content-disposition", "")
assert ".rfflow" in resp.headers.get("content-disposition", "")
data = resp.json()
assert data["rfflow_version"] == "1.0"
assert data["source_app"] == "ResolutionFlow"
assert data["exported_at"] is not None
flow = data["flow"]
assert flow["name"] == test_tree["name"]
assert flow["tree_structure"] is not None
assert flow["tree_type"] == "troubleshooting"
# No IDs leaked
assert "id" not in flow or flow.get("id") is None
assert "author_id" not in flow
assert "account_id" not in flow
@pytest.mark.asyncio
async def test_export_with_category_and_tags(client, auth_headers):
"""Export should include category and tag data."""
tree = await create_tree_with_tags(client, auth_headers)
resp = await client.get(
f"/api/v1/trees/{tree['id']}/export",
headers=auth_headers,
)
assert resp.status_code == 200
flow = resp.json()["flow"]
assert len(flow["tags"]) == 2
assert "dns" in flow["tags"]
assert "networking" in flow["tags"]
@pytest.mark.asyncio
async def test_export_access_control(client, auth_headers, test_admin, admin_auth_headers, test_tree):
"""Users should only export trees they can access."""
# Create a second user who can't access the tree
user2_data = {
"email": "other@example.com",
"password": "OtherPass123!",
"name": "Other User",
}
await client.post("/api/v1/auth/register", json=user2_data)
login_resp = await client.post("/api/v1/auth/login/json", json={
"email": user2_data["email"],
"password": user2_data["password"],
})
other_headers = {"Authorization": f"Bearer {login_resp.json()['access_token']}"}
resp = await client.get(
f"/api/v1/trees/{test_tree['id']}/export",
headers=other_headers,
)
assert resp.status_code == 403
@pytest.mark.asyncio
async def test_export_nonexistent_tree(client, auth_headers):
"""Export of non-existent tree returns 404."""
import uuid
resp = await client.get(
f"/api/v1/trees/{uuid.uuid4()}/export",
headers=auth_headers,
)
assert resp.status_code == 404
# --- Import Tests ---
@pytest.mark.asyncio
async def test_import_happy_path(client, auth_headers, test_tree):
"""Import should create a draft tree owned by the importing user."""
# First export
export_resp = await client.get(
f"/api/v1/trees/{test_tree['id']}/export",
headers=auth_headers,
)
rfflow_data = export_resp.json()
# Import
import_resp = await client.post(
"/api/v1/trees/import",
json=rfflow_data,
headers=auth_headers,
)
assert import_resp.status_code == 201
result = import_resp.json()
assert result["status"] == "draft"
assert result["name"] == test_tree["name"]
assert result["tree_id"] is not None
# Verify the created tree
tree_resp = await client.get(
f"/api/v1/trees/{result['tree_id']}",
headers=auth_headers,
)
assert tree_resp.status_code == 200
tree = tree_resp.json()
assert tree["status"] == "draft"
assert tree["import_metadata"] is not None
assert tree["import_metadata"]["source_app"] == "ResolutionFlow"
@pytest.mark.asyncio
async def test_import_with_name_override(client, auth_headers, test_tree):
"""Import with name_override should use the override name."""
export_resp = await client.get(
f"/api/v1/trees/{test_tree['id']}/export",
headers=auth_headers,
)
rfflow_data = export_resp.json()
import_resp = await client.post(
"/api/v1/trees/import?name_override=Custom%20Name",
json=rfflow_data,
headers=auth_headers,
)
assert import_resp.status_code == 201
assert import_resp.json()["name"] == "Custom Name"
@pytest.mark.asyncio
async def test_import_with_new_tags(client, auth_headers):
"""Import with new tags should create them automatically."""
rfflow = {
"rfflow_version": "1.0",
"exported_at": "2026-03-05T14:30:00+00:00",
"source_app": "ResolutionFlow",
"flow": {
"name": "Test Import Tags",
"description": "Testing tag creation",
"tree_type": "troubleshooting",
"version": 1,
"author_name": "Test Author",
"category": None,
"tags": ["brand-new-tag", "another-tag"],
"tree_structure": {
"id": "root",
"type": "decision",
"question": "Q?",
"options": [{"id": "a", "label": "A", "next_node_id": "s1"}],
"children": [{"id": "s1", "type": "solution", "title": "S", "description": "D", "solution": "S"}],
},
"intake_form": None,
},
}
resp = await client.post("/api/v1/trees/import", json=rfflow, headers=auth_headers)
assert resp.status_code == 201
result = resp.json()
assert "brand-new-tag" in result["tags_created"]
assert "another-tag" in result["tags_created"]
@pytest.mark.asyncio
async def test_import_with_category_creation(client, auth_headers):
"""Import with a new category should create it."""
rfflow = {
"rfflow_version": "1.0",
"exported_at": "2026-03-05T14:30:00+00:00",
"source_app": "ResolutionFlow",
"flow": {
"name": "Import Category Test",
"description": None,
"tree_type": "troubleshooting",
"version": 1,
"author_name": None,
"category": {"name": "New Category", "slug": "new-category"},
"tags": [],
"tree_structure": {
"id": "root",
"type": "decision",
"question": "Q?",
"options": [{"id": "a", "label": "A", "next_node_id": "s1"}],
"children": [{"id": "s1", "type": "solution", "title": "S", "description": "D", "solution": "S"}],
},
"intake_form": None,
},
}
resp = await client.post("/api/v1/trees/import", json=rfflow, headers=auth_headers)
assert resp.status_code == 201
assert resp.json()["category_created"] is True
@pytest.mark.asyncio
async def test_import_invalid_version(client, auth_headers):
"""Import with unsupported rfflow version should return 422."""
rfflow = {
"rfflow_version": "99.0",
"exported_at": "2026-03-05T14:30:00+00:00",
"source_app": "ResolutionFlow",
"flow": {
"name": "Bad Version",
"tree_type": "troubleshooting",
"version": 1,
"tags": [],
"tree_structure": {"id": "root", "type": "decision", "question": "Q?", "options": [], "children": []},
},
}
resp = await client.post("/api/v1/trees/import", json=rfflow, headers=auth_headers)
assert resp.status_code == 422
@pytest.mark.asyncio
async def test_import_round_trip(client, auth_headers):
"""Export then import should produce a tree with matching data."""
original = await create_tree_with_tags(client, auth_headers)
# Export
export_resp = await client.get(
f"/api/v1/trees/{original['id']}/export",
headers=auth_headers,
)
rfflow = export_resp.json()
# Import
import_resp = await client.post(
"/api/v1/trees/import",
json=rfflow,
headers=auth_headers,
)
assert import_resp.status_code == 201
result = import_resp.json()
# Verify imported tree matches original structure
tree_resp = await client.get(
f"/api/v1/trees/{result['tree_id']}",
headers=auth_headers,
)
imported_tree = tree_resp.json()
assert imported_tree["name"] == original["name"]
assert imported_tree["tree_structure"]["id"] == original["tree_structure"]["id"]
assert imported_tree["tree_type"] == original["tree_type"]
assert imported_tree["status"] == "draft" # Always draft on import