Files
resolutionflow/backend/tests/test_tree_transfer.py
chihlasm 39677a3841 feat: add flow export/import frontend + backend tests
Frontend:
- ExportFlowModal with JSON/XML format selection + download
- ImportFlowModal with drag-drop file picker + preview step
- rfflowParser for client-side JSON/XML .rfflow parsing
- Export buttons on editor toolbar and library action menus
- Import button on library page next to Create New
- Provenance display for imported flows in editor
- flowTransfer API client + types

Backend:
- Fix regex->pattern deprecation in export endpoint
- 12 integration tests covering export, import, round-trip,
  access control, tag/category creation, version validation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 00:18:10 -05:00

365 lines
12 KiB
Python

"""Tests for flow export/import (.rfflow) endpoints."""
import json
import xml.etree.ElementTree as ET
import pytest
from httpx import AsyncClient
# --- Helpers ---
# Default payload posted by ``create_tree_with_tags``: a single decision node
# whose two options each point at one of the two solution children.
TREE_DATA = {
    "name": "DNS Troubleshooting",
    "description": "Diagnose DNS resolution issues",
    "category": "Networking",
    "tree_structure": {
        "id": "root",
        "type": "decision",
        "question": "Is DNS resolving?",
        # Each option's ``next_node_id`` matches the ``id`` of a child below.
        "options": [
            {"id": "yes", "label": "Yes", "next_node_id": "sol1"},
            {"id": "no", "label": "No", "next_node_id": "sol2"},
        ],
        "children": [
            {
                "id": "sol1",
                "type": "solution",
                "title": "DNS OK",
                "description": "DNS is working",
                "solution": "No action needed",
            },
            {
                "id": "sol2",
                "type": "solution",
                "title": "DNS Fail",
                "description": "DNS is not resolving",
                "solution": "Check DNS server config",
            },
        ],
    },
    "tags": ["dns", "networking"],
}
async def create_tree_with_tags(
    client: AsyncClient,
    headers: dict,
    data: dict | None = None,
) -> dict:
    """Create a tree through the API and return the parsed JSON response body.

    Args:
        client: HTTP client used to call the API.
        headers: Request headers sent with the POST.
        data: Payload for the new tree. ``TREE_DATA`` is posted only when this
            is ``None``; an explicitly supplied payload is always sent as-is,
            even when it is empty.

    Returns:
        The decoded JSON body of the creation response.

    Raises:
        AssertionError: If the endpoint does not answer with 201.
    """
    # Explicit ``None`` check: ``data or TREE_DATA`` would silently swap an
    # intentionally empty payload for the default one.
    payload = TREE_DATA if data is None else data
    resp = await client.post("/api/v1/trees", json=payload, headers=headers)
    assert resp.status_code == 201, resp.text
    return resp.json()
# --- Export Tests ---
@pytest.mark.asyncio
async def test_export_json_format(client, auth_headers, test_tree):
    """A JSON export is a downloadable .rfflow envelope wrapping the flow."""
    export_url = f"/api/v1/trees/{test_tree['id']}/export?format=json"
    resp = await client.get(export_url, headers=auth_headers)
    assert resp.status_code == 200

    # The response must be offered as a file download named *.rfflow.
    disposition = resp.headers.get("content-disposition", "")
    assert "attachment" in disposition
    assert ".rfflow" in disposition

    # Envelope-level metadata.
    envelope = resp.json()
    assert envelope["rfflow_version"] == "1.0"
    assert envelope["source_app"] == "ResolutionFlow"
    assert envelope["format"] == "json"
    assert envelope["exported_at"] is not None

    # Flow payload.
    flow = envelope["flow"]
    assert flow["name"] == test_tree["name"]
    assert flow["tree_structure"] is not None
    assert flow["tree_type"] == "troubleshooting"

    # Internal identifiers must not leak: "id" may be absent or null, the
    # ownership keys must be absent altogether.
    assert flow.get("id") is None
    for internal_key in ("author_id", "account_id"):
        assert internal_key not in flow
@pytest.mark.asyncio
async def test_export_xml_format(client, auth_headers, test_tree):
    """An XML export parses cleanly and carries the expected flow fields."""
    export_url = f"/api/v1/trees/{test_tree['id']}/export?format=xml"
    resp = await client.get(export_url, headers=auth_headers)
    assert resp.status_code == 200
    assert ".rfflow" in resp.headers.get("content-disposition", "")

    # The document root identifies the format and its version.
    document = ET.fromstring(resp.text)
    assert document.tag == "rfflow"
    assert document.get("version") == "1.0"

    flow_node = document.find("flow")
    assert flow_node is not None
    name_node = flow_node.find("name")
    assert name_node.text == test_tree["name"]

    # The tree structure is embedded as a JSON string inside the XML element.
    structure_node = flow_node.find("tree_structure")
    structure = json.loads(structure_node.text)
    assert structure["id"] == "root"
@pytest.mark.asyncio
async def test_export_with_category_and_tags(client, auth_headers):
    """Exporting a tree created with a category and tags keeps both tags.

    The tree is created from ``TREE_DATA`` (which carries a category and two
    tags); only the exported tags are asserted here.
    """
    created = await create_tree_with_tags(client, auth_headers)
    export_url = f"/api/v1/trees/{created['id']}/export?format=json"
    resp = await client.get(export_url, headers=auth_headers)
    assert resp.status_code == 200

    exported_tags = resp.json()["flow"]["tags"]
    # Exactly the two tags from the creation payload, in any order.
    assert sorted(exported_tags) == ["dns", "networking"]
@pytest.mark.asyncio
async def test_export_access_control(client, auth_headers, test_admin, admin_auth_headers, test_tree):
    """Users should only export trees they can access.

    A second user is registered and logged in, then tries to export
    ``test_tree``; the request must be rejected with 403.

    ``test_admin`` and ``admin_auth_headers`` are not referenced in the body;
    they stay in the signature so the fixture setup is unchanged.
    """
    # Create a second user who can't access the tree.
    user2_data = {
        "email": "other@example.com",
        "password": "OtherPass123!",
        "name": "Other User",
    }
    await client.post("/api/v1/auth/register", json=user2_data)
    login_resp = await client.post(
        "/api/v1/auth/login/json",
        json={
            "email": user2_data["email"],
            "password": user2_data["password"],
        },
    )
    # Fail with a readable message if login did not produce a token, instead
    # of an opaque KeyError when building the Authorization header below.
    login_body = login_resp.json()
    assert "access_token" in login_body, (
        f"login for second user failed ({login_resp.status_code}): {login_body}"
    )
    other_headers = {"Authorization": f"Bearer {login_body['access_token']}"}

    # The other user is in a different account, so can't see the tree.
    resp = await client.get(
        f"/api/v1/trees/{test_tree['id']}/export?format=json",
        headers=other_headers,
    )
    assert resp.status_code == 403
@pytest.mark.asyncio
async def test_export_nonexistent_tree(client, auth_headers):
    """Exporting an ID that matches no tree yields 404."""
    import uuid

    # A freshly generated random UUID stands in for a tree that does not exist.
    missing_id = uuid.uuid4()
    export_url = f"/api/v1/trees/{missing_id}/export?format=json"
    resp = await client.get(export_url, headers=auth_headers)
    assert resp.status_code == 404
# --- Import Tests ---
@pytest.mark.asyncio
async def test_import_happy_path(client, auth_headers, test_tree):
    """Importing an exported flow creates a draft tree with import metadata.

    Steps: export ``test_tree`` as JSON, post the envelope to the import
    endpoint, then fetch the created tree and check its status and
    ``import_metadata``.
    """
    # First export; assert it succeeded so a broken export is reported here
    # rather than as a confusing import failure further down.
    export_resp = await client.get(
        f"/api/v1/trees/{test_tree['id']}/export?format=json",
        headers=auth_headers,
    )
    assert export_resp.status_code == 200, export_resp.text
    rfflow_data = export_resp.json()

    # Import the exported envelope unchanged.
    import_resp = await client.post(
        "/api/v1/trees/import",
        json=rfflow_data,
        headers=auth_headers,
    )
    assert import_resp.status_code == 201
    result = import_resp.json()
    assert result["status"] == "draft"
    assert result["name"] == test_tree["name"]
    assert result["tree_id"] is not None

    # Verify the created tree.
    tree_resp = await client.get(
        f"/api/v1/trees/{result['tree_id']}",
        headers=auth_headers,
    )
    assert tree_resp.status_code == 200
    tree = tree_resp.json()
    assert tree["status"] == "draft"
    assert tree["import_metadata"] is not None
    assert tree["import_metadata"]["source_app"] == "ResolutionFlow"
@pytest.mark.asyncio
async def test_import_with_name_override(client, auth_headers, test_tree):
    """Import with ``name_override`` should report the override as the name."""
    export_resp = await client.get(
        f"/api/v1/trees/{test_tree['id']}/export?format=json",
        headers=auth_headers,
    )
    # Guard the setup step: the export body is the import payload below.
    assert export_resp.status_code == 200, export_resp.text
    rfflow_data = export_resp.json()

    # The override is passed as a URL-encoded query parameter.
    import_resp = await client.post(
        "/api/v1/trees/import?name_override=Custom%20Name",
        json=rfflow_data,
        headers=auth_headers,
    )
    assert import_resp.status_code == 201
    assert import_resp.json()["name"] == "Custom Name"
@pytest.mark.asyncio
async def test_import_with_new_tags(client, auth_headers):
    """Importing a flow with two new tags reports both in ``tags_created``."""
    new_tags = ["brand-new-tag", "another-tag"]

    # Minimal decision tree: one question, one option, one solution.
    tree_structure = {
        "id": "root",
        "type": "decision",
        "question": "Q?",
        "options": [{"id": "a", "label": "A", "next_node_id": "s1"}],
        "children": [
            {"id": "s1", "type": "solution", "title": "S", "description": "D", "solution": "S"},
        ],
    }
    flow = {
        "name": "Test Import Tags",
        "description": "Testing tag creation",
        "tree_type": "troubleshooting",
        "version": 1,
        "author_name": "Test Author",
        "category": None,
        "tags": new_tags,
        "tree_structure": tree_structure,
        "intake_form": None,
    }
    envelope = {
        "rfflow_version": "1.0",
        "exported_at": "2026-03-05T14:30:00+00:00",
        "source_app": "ResolutionFlow",
        "format": "json",
        "flow": flow,
    }

    resp = await client.post("/api/v1/trees/import", json=envelope, headers=auth_headers)
    assert resp.status_code == 201

    tags_created = resp.json()["tags_created"]
    for tag in new_tags:
        assert tag in tags_created
@pytest.mark.asyncio
async def test_import_with_category_creation(client, auth_headers):
    """Importing a flow with a name/slug category reports it as created."""
    # Minimal decision tree: one question, one option, one solution.
    tree_structure = {
        "id": "root",
        "type": "decision",
        "question": "Q?",
        "options": [{"id": "a", "label": "A", "next_node_id": "s1"}],
        "children": [
            {"id": "s1", "type": "solution", "title": "S", "description": "D", "solution": "S"},
        ],
    }
    flow = {
        "name": "Import Category Test",
        "description": None,
        "tree_type": "troubleshooting",
        "version": 1,
        "author_name": None,
        "category": {"name": "New Category", "slug": "new-category"},
        "tags": [],
        "tree_structure": tree_structure,
        "intake_form": None,
    }
    envelope = {
        "rfflow_version": "1.0",
        "exported_at": "2026-03-05T14:30:00+00:00",
        "source_app": "ResolutionFlow",
        "format": "json",
        "flow": flow,
    }

    resp = await client.post("/api/v1/trees/import", json=envelope, headers=auth_headers)
    assert resp.status_code == 201

    outcome = resp.json()
    assert outcome["category_created"] is True
@pytest.mark.asyncio
async def test_import_invalid_version(client, auth_headers):
    """An envelope declaring rfflow version 99.0 is rejected with 422."""
    # Only the fields below are supplied for the flow; the envelope's
    # version string is the value under test.
    flow = {
        "name": "Bad Version",
        "tree_type": "troubleshooting",
        "version": 1,
        "tags": [],
        "tree_structure": {
            "id": "root",
            "type": "decision",
            "question": "Q?",
            "options": [],
            "children": [],
        },
    }
    envelope = {
        "rfflow_version": "99.0",
        "exported_at": "2026-03-05T14:30:00+00:00",
        "source_app": "ResolutionFlow",
        "format": "json",
        "flow": flow,
    }

    resp = await client.post("/api/v1/trees/import", json=envelope, headers=auth_headers)
    assert resp.status_code == 422
@pytest.mark.asyncio
async def test_import_round_trip(client, auth_headers):
    """Export then import should produce a tree matching the original.

    Compared fields: name, the root node ID of ``tree_structure``, and
    ``tree_type``. The imported tree must be a draft.
    """
    original = await create_tree_with_tags(client, auth_headers)

    # Export; assert success so a broken export is not misreported as an
    # import failure.
    export_resp = await client.get(
        f"/api/v1/trees/{original['id']}/export?format=json",
        headers=auth_headers,
    )
    assert export_resp.status_code == 200, export_resp.text
    rfflow = export_resp.json()

    # Import the exported envelope unchanged.
    import_resp = await client.post(
        "/api/v1/trees/import",
        json=rfflow,
        headers=auth_headers,
    )
    assert import_resp.status_code == 201
    result = import_resp.json()

    # Verify imported tree matches original structure.
    tree_resp = await client.get(
        f"/api/v1/trees/{result['tree_id']}",
        headers=auth_headers,
    )
    # Without this check an error body would surface as a KeyError below.
    assert tree_resp.status_code == 200, tree_resp.text
    imported_tree = tree_resp.json()
    assert imported_tree["name"] == original["name"]
    assert imported_tree["tree_structure"]["id"] == original["tree_structure"]["id"]
    assert imported_tree["tree_type"] == original["tree_type"]
    assert imported_tree["status"] == "draft"  # Always draft on import
@pytest.mark.asyncio
async def test_import_xml_round_trip(client, auth_headers):
    """Export as XML, rebuild the import payload from it, and import it.

    The XML is parsed here (simulating the frontend parser) into the JSON
    envelope that the import endpoint accepts. ``intake_form`` is not read
    from the XML and is always sent as ``None``.
    """
    original = await create_tree_with_tags(client, auth_headers)

    # Export as XML.
    export_resp = await client.get(
        f"/api/v1/trees/{original['id']}/export?format=xml",
        headers=auth_headers,
    )
    assert export_resp.status_code == 200

    # Parse XML to build import request (simulating frontend parser).
    root = ET.fromstring(export_resp.text)
    flow_el = root.find("flow")
    assert flow_el is not None

    # A path query yields an empty list when the <tags> container is absent,
    # instead of raising AttributeError on a missing element.
    tags = [tag_el.text for tag_el in flow_el.findall("tags/tag")]
    ts = json.loads(flow_el.findtext("tree_structure"))

    # Category is optional: only forwarded when both name and slug exist.
    category = None
    cat_el = flow_el.find("category")
    if cat_el is not None:
        cat_name = cat_el.find("name")
        cat_slug = cat_el.find("slug")
        if cat_name is not None and cat_slug is not None:
            category = {"name": cat_name.text, "slug": cat_slug.text}

    rfflow = {
        "rfflow_version": root.get("version"),
        "exported_at": root.findtext("exported_at"),
        "source_app": root.findtext("source_app"),
        "format": "xml",
        "flow": {
            "name": flow_el.findtext("name"),
            # findtext() returns None for a missing element and "" for an
            # empty one; both collapse to None, matching the category guard.
            "description": flow_el.findtext("description") or None,
            "tree_type": flow_el.findtext("tree_type"),
            "version": int(flow_el.findtext("version")),
            "author_name": flow_el.findtext("author_name") or None,
            "category": category,
            "tags": tags,
            "tree_structure": ts,
            "intake_form": None,
        },
    }

    import_resp = await client.post("/api/v1/trees/import", json=rfflow, headers=auth_headers)
    assert import_resp.status_code == 201
    result = import_resp.json()
    assert result["name"] == original["name"]