- Remove XML builder, format query param, and XML tests - Simplify ExportFlowModal (no format picker) - Simplify rfflowParser (JSON-only) - Remove format field from schemas and types Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
283 lines
9.3 KiB
Python
283 lines
9.3 KiB
Python
"""Tests for flow export/import (.rfflow) endpoints."""
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
|
|
|
|
# --- Helpers ---
|
|
|
|
# Default tree payload posted by the helpers/tests in this module: a single
# decision node with two options, each pointing at one solution child, plus a
# category name and two tags.
TREE_DATA = {
    "name": "DNS Troubleshooting",
    "description": "Diagnose DNS resolution issues",
    "category": "Networking",
    "tree_structure": {
        "id": "root",
        "type": "decision",
        "question": "Is DNS resolving?",
        # Each option's next_node_id matches the id of a child below.
        "options": [
            {"id": "yes", "label": "Yes", "next_node_id": "sol1"},
            {"id": "no", "label": "No", "next_node_id": "sol2"},
        ],
        "children": [
            {
                "id": "sol1",
                "type": "solution",
                "title": "DNS OK",
                "description": "DNS is working",
                "solution": "No action needed",
            },
            {
                "id": "sol2",
                "type": "solution",
                "title": "DNS Fail",
                "description": "DNS is not resolving",
                "solution": "Check DNS server config",
            },
        ],
    },
    "tags": ["dns", "networking"],
}
|
|
|
|
|
|
async def create_tree_with_tags(
    client: AsyncClient,
    headers: dict,
    data: dict | None = None,
) -> dict:
    """Create a tree via ``POST /api/v1/trees`` and return the decoded JSON body.

    Args:
        client: HTTP client used to issue the request.
        headers: Headers sent with the request (callers here pass auth headers).
        data: Tree payload to post. ``TREE_DATA`` is used only when this is
            ``None``.

    Returns:
        The parsed JSON body of the create response.

    Raises:
        AssertionError: If the endpoint does not answer with 201.
    """
    # Test against None instead of truthiness: an explicitly supplied payload
    # (even an empty dict) must be sent as-is, not silently swapped for the
    # default.
    payload = TREE_DATA if data is None else data
    resp = await client.post("/api/v1/trees", json=payload, headers=headers)
    assert resp.status_code == 201, f"tree creation failed: {resp.status_code} {resp.text}"
    return resp.json()
|
|
|
|
|
|
# --- Export Tests ---
|
|
|
|
@pytest.mark.asyncio
async def test_export_json_format(client, auth_headers, test_tree):
    """Exporting a tree returns an .rfflow JSON attachment with the expected layout."""
    export_url = f"/api/v1/trees/{test_tree['id']}/export"
    resp = await client.get(export_url, headers=auth_headers)
    assert resp.status_code == 200

    # The response must be offered as a downloadable .rfflow file.
    disposition = resp.headers.get("content-disposition", "")
    assert "attachment" in disposition
    assert ".rfflow" in disposition

    # Envelope metadata.
    envelope = resp.json()
    assert envelope["rfflow_version"] == "1.0"
    assert envelope["source_app"] == "ResolutionFlow"
    assert envelope["exported_at"] is not None

    # Flow content mirrors the exported tree.
    flow = envelope["flow"]
    assert flow["name"] == test_tree["name"]
    assert flow["tree_structure"] is not None
    assert flow["tree_type"] == "troubleshooting"

    # Internal identifiers must not leak: "id" may be absent or null, the
    # owner/account keys must be absent altogether.
    assert flow.get("id") is None
    for leaked_key in ("author_id", "account_id"):
        assert leaked_key not in flow
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_export_with_category_and_tags(client, auth_headers):
    """Export should include category and tag data."""
    created = await create_tree_with_tags(client, auth_headers)

    export_url = f"/api/v1/trees/{created['id']}/export"
    resp = await client.get(export_url, headers=auth_headers)
    assert resp.status_code == 200

    exported_tags = resp.json()["flow"]["tags"]
    # Exactly the two tags from the default payload, in any order.
    assert len(exported_tags) == 2
    for expected_tag in ("dns", "networking"):
        assert expected_tag in exported_tags
    # NOTE(review): only tags are checked here; the exported category is not
    # asserted even though the docstring mentions it.
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_export_access_control(client, auth_headers, test_admin, admin_auth_headers, test_tree):
    """Users should only export trees they can access.

    Registers and logs in a second user, then expects a 403 when that user
    requests the export of ``test_tree``.
    """
    # NOTE(review): auth_headers, test_admin and admin_auth_headers are requested
    # but never referenced in the body; they are kept in the signature so any
    # fixture setup they perform still runs.

    # Create a second user who can't access the tree.
    user2_data = {
        "email": "other@example.com",
        "password": "OtherPass123!",
        "name": "Other User",
    }
    await client.post("/api/v1/auth/register", json=user2_data)
    login_resp = await client.post(
        "/api/v1/auth/login/json",
        json={
            "email": user2_data["email"],
            "password": user2_data["password"],
        },
    )
    login_body = login_resp.json()
    # Fail with a readable message if setup broke, instead of an opaque
    # KeyError while building the Authorization header.
    assert "access_token" in login_body, (
        f"login for second user failed: {login_resp.status_code} {login_resp.text}"
    )
    other_headers = {"Authorization": f"Bearer {login_body['access_token']}"}

    resp = await client.get(
        f"/api/v1/trees/{test_tree['id']}/export",
        headers=other_headers,
    )
    assert resp.status_code == 403
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_export_nonexistent_tree(client, auth_headers):
    """Requesting the export of an unknown tree id yields 404."""
    import uuid

    # A freshly generated UUID is used as the id of a tree that was never created.
    missing_id = uuid.uuid4()
    export_url = f"/api/v1/trees/{missing_id}/export"

    resp = await client.get(export_url, headers=auth_headers)
    assert resp.status_code == 404
|
|
|
|
|
|
# --- Import Tests ---
|
|
|
|
@pytest.mark.asyncio
async def test_import_happy_path(client, auth_headers, test_tree):
    """Import should create a draft tree that the importing user can read back.

    Exports ``test_tree``, posts the exported document to the import endpoint,
    then fetches the created tree and checks its status and import metadata.
    """
    # First export.
    export_resp = await client.get(
        f"/api/v1/trees/{test_tree['id']}/export",
        headers=auth_headers,
    )
    # Guard the precondition: without this, a failed export would feed an error
    # body into the import call and show up as a misleading import failure.
    assert export_resp.status_code == 200, export_resp.text
    rfflow_data = export_resp.json()

    # Import.
    import_resp = await client.post(
        "/api/v1/trees/import",
        json=rfflow_data,
        headers=auth_headers,
    )
    assert import_resp.status_code == 201
    result = import_resp.json()
    assert result["status"] == "draft"
    assert result["name"] == test_tree["name"]
    assert result["tree_id"] is not None

    # Verify the created tree.
    tree_resp = await client.get(
        f"/api/v1/trees/{result['tree_id']}",
        headers=auth_headers,
    )
    assert tree_resp.status_code == 200
    tree = tree_resp.json()
    assert tree["status"] == "draft"
    assert tree["import_metadata"] is not None
    assert tree["import_metadata"]["source_app"] == "ResolutionFlow"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_import_with_name_override(client, auth_headers, test_tree):
    """Import with name_override should use the override name.

    Exports ``test_tree`` and re-imports it with ``name_override`` set in the
    query string, then checks the name reported by the import response.
    """
    export_resp = await client.get(
        f"/api/v1/trees/{test_tree['id']}/export",
        headers=auth_headers,
    )
    # Guard the precondition so a broken export is reported as such rather
    # than as an import failure.
    assert export_resp.status_code == 200, export_resp.text
    rfflow_data = export_resp.json()

    import_resp = await client.post(
        "/api/v1/trees/import?name_override=Custom%20Name",
        json=rfflow_data,
        headers=auth_headers,
    )
    assert import_resp.status_code == 201
    assert import_resp.json()["name"] == "Custom Name"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_import_with_new_tags(client, auth_headers):
    """Importing a flow that names tags reports those tags in ``tags_created``."""
    # Minimal tree: one decision node with a single option and solution child.
    tree_structure = {
        "id": "root",
        "type": "decision",
        "question": "Q?",
        "options": [{"id": "a", "label": "A", "next_node_id": "s1"}],
        "children": [
            {"id": "s1", "type": "solution", "title": "S", "description": "D", "solution": "S"},
        ],
    }
    flow = {
        "name": "Test Import Tags",
        "description": "Testing tag creation",
        "tree_type": "troubleshooting",
        "version": 1,
        "author_name": "Test Author",
        "category": None,
        "tags": ["brand-new-tag", "another-tag"],
        "tree_structure": tree_structure,
        "intake_form": None,
    }
    rfflow = {
        "rfflow_version": "1.0",
        "exported_at": "2026-03-05T14:30:00+00:00",
        "source_app": "ResolutionFlow",
        "flow": flow,
    }

    resp = await client.post("/api/v1/trees/import", json=rfflow, headers=auth_headers)
    assert resp.status_code == 201

    tags_created = resp.json()["tags_created"]
    for tag_name in ("brand-new-tag", "another-tag"):
        assert tag_name in tags_created
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_import_with_category_creation(client, auth_headers):
    """Importing a flow that carries a category reports ``category_created`` as true."""
    # Minimal tree: one decision node with a single option and solution child.
    tree_structure = {
        "id": "root",
        "type": "decision",
        "question": "Q?",
        "options": [{"id": "a", "label": "A", "next_node_id": "s1"}],
        "children": [
            {"id": "s1", "type": "solution", "title": "S", "description": "D", "solution": "S"},
        ],
    }
    flow = {
        "name": "Import Category Test",
        "description": None,
        "tree_type": "troubleshooting",
        "version": 1,
        "author_name": None,
        # The category is given as a name/slug pair.
        "category": {"name": "New Category", "slug": "new-category"},
        "tags": [],
        "tree_structure": tree_structure,
        "intake_form": None,
    }
    rfflow = {
        "rfflow_version": "1.0",
        "exported_at": "2026-03-05T14:30:00+00:00",
        "source_app": "ResolutionFlow",
        "flow": flow,
    }

    resp = await client.post("/api/v1/trees/import", json=rfflow, headers=auth_headers)
    assert resp.status_code == 201

    import_result = resp.json()
    assert import_result["category_created"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_import_invalid_version(client, auth_headers):
    """Import with unsupported rfflow version should return 422.

    Apart from ``rfflow_version`` the payload has the same shape as the
    payloads the other import tests in this module expect to be accepted
    (201), so the version is the only deliberately invalid part of the
    document.
    """
    rfflow = {
        "rfflow_version": "99.0",  # the only deliberately invalid field
        "exported_at": "2026-03-05T14:30:00+00:00",
        "source_app": "ResolutionFlow",
        "flow": {
            "name": "Bad Version",
            "description": None,
            "tree_type": "troubleshooting",
            "version": 1,
            "author_name": None,
            "category": None,
            "tags": [],
            # Same tree layout as the other import tests, rather than a
            # decision node with no options/children, to avoid tripping
            # unrelated validation.
            "tree_structure": {
                "id": "root",
                "type": "decision",
                "question": "Q?",
                "options": [{"id": "a", "label": "A", "next_node_id": "s1"}],
                "children": [
                    {"id": "s1", "type": "solution", "title": "S", "description": "D", "solution": "S"},
                ],
            },
            "intake_form": None,
        },
    }

    resp = await client.post("/api/v1/trees/import", json=rfflow, headers=auth_headers)
    assert resp.status_code == 422
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_import_round_trip(client, auth_headers):
    """Export then import should produce a tree with matching data.

    Creates a tree from the module's default payload, exports it, imports the
    exported document, and compares the imported tree's name, root node id and
    tree type with the original. The imported tree is expected to be a draft.
    """
    original = await create_tree_with_tags(client, auth_headers)

    # Export.
    export_resp = await client.get(
        f"/api/v1/trees/{original['id']}/export",
        headers=auth_headers,
    )
    # Guard the precondition so a broken export is not misreported as an
    # import failure.
    assert export_resp.status_code == 200, export_resp.text
    rfflow = export_resp.json()

    # Import.
    import_resp = await client.post(
        "/api/v1/trees/import",
        json=rfflow,
        headers=auth_headers,
    )
    assert import_resp.status_code == 201
    result = import_resp.json()

    # Verify imported tree matches original structure.
    tree_resp = await client.get(
        f"/api/v1/trees/{result['tree_id']}",
        headers=auth_headers,
    )
    # Check the read-back succeeded before indexing into its body; otherwise
    # an error response would surface as a KeyError below.
    assert tree_resp.status_code == 200, tree_resp.text
    imported_tree = tree_resp.json()
    assert imported_tree["name"] == original["name"]
    # NOTE(review): only the root node id is compared, not the full structure
    # or the tags.
    assert imported_tree["tree_structure"]["id"] == original["tree_structure"]["id"]
    assert imported_tree["tree_type"] == original["tree_type"]
    assert imported_tree["status"] == "draft"  # Always draft on import
|