"""Tests for flow export/import (.rfflow) endpoints.""" import pytest from httpx import AsyncClient # --- Helpers --- TREE_DATA = { "name": "DNS Troubleshooting", "description": "Diagnose DNS resolution issues", "category": "Networking", "tree_structure": { "id": "root", "type": "decision", "question": "Is DNS resolving?", "options": [ {"id": "yes", "label": "Yes", "next_node_id": "sol1"}, {"id": "no", "label": "No", "next_node_id": "sol2"}, ], "children": [ {"id": "sol1", "type": "solution", "title": "DNS OK", "description": "DNS is working", "solution": "No action needed"}, {"id": "sol2", "type": "solution", "title": "DNS Fail", "description": "DNS is not resolving", "solution": "Check DNS server config"}, ], }, "tags": ["dns", "networking"], } async def create_tree_with_tags(client: AsyncClient, headers: dict, data: dict | None = None) -> dict: """Create a tree and return the response.""" resp = await client.post("/api/v1/trees", json=data or TREE_DATA, headers=headers) assert resp.status_code == 201 return resp.json() # --- Export Tests --- @pytest.mark.asyncio async def test_export_json_format(client, auth_headers, test_tree): """Export should return valid .rfflow JSON with correct structure.""" resp = await client.get( f"/api/v1/trees/{test_tree['id']}/export", headers=auth_headers, ) assert resp.status_code == 200 assert "attachment" in resp.headers.get("content-disposition", "") assert ".rfflow" in resp.headers.get("content-disposition", "") data = resp.json() assert data["rfflow_version"] == "1.0" assert data["source_app"] == "ResolutionFlow" assert data["exported_at"] is not None flow = data["flow"] assert flow["name"] == test_tree["name"] assert flow["tree_structure"] is not None assert flow["tree_type"] == "troubleshooting" # No IDs leaked assert "id" not in flow or flow.get("id") is None assert "author_id" not in flow assert "account_id" not in flow @pytest.mark.asyncio async def test_export_with_category_and_tags(client, auth_headers): """Export should include category and tag data.""" tree = await create_tree_with_tags(client, auth_headers) resp = await client.get( f"/api/v1/trees/{tree['id']}/export", headers=auth_headers, ) assert resp.status_code == 200 flow = resp.json()["flow"] assert len(flow["tags"]) == 2 assert "dns" in flow["tags"] assert "networking" in flow["tags"] @pytest.mark.asyncio async def test_export_access_control(client, auth_headers, test_admin, admin_auth_headers, test_tree): """Users should only export trees they can access.""" # Create a second user who can't access the tree user2_data = { "email": "other@example.com", "password": "OtherPass123!", "name": "Other User", } await client.post("/api/v1/auth/register", json=user2_data) login_resp = await client.post("/api/v1/auth/login/json", json={ "email": user2_data["email"], "password": user2_data["password"], }) other_headers = {"Authorization": f"Bearer {login_resp.json()['access_token']}"} resp = await client.get( f"/api/v1/trees/{test_tree['id']}/export", headers=other_headers, ) assert resp.status_code == 403 @pytest.mark.asyncio async def test_export_nonexistent_tree(client, auth_headers): """Export of non-existent tree returns 404.""" import uuid resp = await client.get( f"/api/v1/trees/{uuid.uuid4()}/export", headers=auth_headers, ) assert resp.status_code == 404 # --- Import Tests --- @pytest.mark.asyncio async def test_import_happy_path(client, auth_headers, test_tree): """Import should create a draft tree owned by the importing user.""" # First export export_resp = await client.get( f"/api/v1/trees/{test_tree['id']}/export", headers=auth_headers, ) rfflow_data = export_resp.json() # Import import_resp = await client.post( "/api/v1/trees/import", json=rfflow_data, headers=auth_headers, ) assert import_resp.status_code == 201 result = import_resp.json() assert result["status"] == "draft" assert result["name"] == test_tree["name"] assert result["tree_id"] is not None # Verify the created tree tree_resp = await client.get( f"/api/v1/trees/{result['tree_id']}", headers=auth_headers, ) assert tree_resp.status_code == 200 tree = tree_resp.json() assert tree["status"] == "draft" assert tree["import_metadata"] is not None assert tree["import_metadata"]["source_app"] == "ResolutionFlow" @pytest.mark.asyncio async def test_import_with_name_override(client, auth_headers, test_tree): """Import with name_override should use the override name.""" export_resp = await client.get( f"/api/v1/trees/{test_tree['id']}/export", headers=auth_headers, ) rfflow_data = export_resp.json() import_resp = await client.post( "/api/v1/trees/import?name_override=Custom%20Name", json=rfflow_data, headers=auth_headers, ) assert import_resp.status_code == 201 assert import_resp.json()["name"] == "Custom Name" @pytest.mark.asyncio async def test_import_with_new_tags(client, auth_headers): """Import with new tags should create them automatically.""" rfflow = { "rfflow_version": "1.0", "exported_at": "2026-03-05T14:30:00+00:00", "source_app": "ResolutionFlow", "flow": { "name": "Test Import Tags", "description": "Testing tag creation", "tree_type": "troubleshooting", "version": 1, "author_name": "Test Author", "category": None, "tags": ["brand-new-tag", "another-tag"], "tree_structure": { "id": "root", "type": "decision", "question": "Q?", "options": [{"id": "a", "label": "A", "next_node_id": "s1"}], "children": [{"id": "s1", "type": "solution", "title": "S", "description": "D", "solution": "S"}], }, "intake_form": None, }, } resp = await client.post("/api/v1/trees/import", json=rfflow, headers=auth_headers) assert resp.status_code == 201 result = resp.json() assert "brand-new-tag" in result["tags_created"] assert "another-tag" in result["tags_created"] @pytest.mark.asyncio async def test_import_with_category_creation(client, auth_headers): """Import with a new category should create it.""" rfflow = { "rfflow_version": "1.0", "exported_at": "2026-03-05T14:30:00+00:00", "source_app": "ResolutionFlow", "flow": { "name": "Import Category Test", "description": None, "tree_type": "troubleshooting", "version": 1, "author_name": None, "category": {"name": "New Category", "slug": "new-category"}, "tags": [], "tree_structure": { "id": "root", "type": "decision", "question": "Q?", "options": [{"id": "a", "label": "A", "next_node_id": "s1"}], "children": [{"id": "s1", "type": "solution", "title": "S", "description": "D", "solution": "S"}], }, "intake_form": None, }, } resp = await client.post("/api/v1/trees/import", json=rfflow, headers=auth_headers) assert resp.status_code == 201 assert resp.json()["category_created"] is True @pytest.mark.asyncio async def test_import_invalid_version(client, auth_headers): """Import with unsupported rfflow version should return 422.""" rfflow = { "rfflow_version": "99.0", "exported_at": "2026-03-05T14:30:00+00:00", "source_app": "ResolutionFlow", "flow": { "name": "Bad Version", "tree_type": "troubleshooting", "version": 1, "tags": [], "tree_structure": {"id": "root", "type": "decision", "question": "Q?", "options": [], "children": []}, }, } resp = await client.post("/api/v1/trees/import", json=rfflow, headers=auth_headers) assert resp.status_code == 422 @pytest.mark.asyncio async def test_import_round_trip(client, auth_headers): """Export then import should produce a tree with matching data.""" original = await create_tree_with_tags(client, auth_headers) # Export export_resp = await client.get( f"/api/v1/trees/{original['id']}/export", headers=auth_headers, ) rfflow = export_resp.json() # Import import_resp = await client.post( "/api/v1/trees/import", json=rfflow, headers=auth_headers, ) assert import_resp.status_code == 201 result = import_resp.json() # Verify imported tree matches original structure tree_resp = await client.get( f"/api/v1/trees/{result['tree_id']}", headers=auth_headers, ) imported_tree = tree_resp.json() assert imported_tree["name"] == original["name"] assert imported_tree["tree_structure"]["id"] == original["tree_structure"]["id"] assert imported_tree["tree_type"] == original["tree_type"] assert imported_tree["status"] == "draft" # Always draft on import