refactor: remove XML export, JSON-only for .rfflow files

- Remove XML builder, format query param, and XML tests
- Simplify ExportFlowModal (no format picker)
- Simplify rfflowParser (JSON-only)
- Remove format field from schemas and types

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
chihlasm
2026-03-06 00:56:07 -05:00
parent 39677a3841
commit 88c1553c5d
8 changed files with 22 additions and 293 deletions

View File

@@ -1,8 +1,6 @@
"""Flow export/import endpoints (.rfflow files)."""
import json
import logging
import re
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from typing import Annotated, Optional
from uuid import UUID
@@ -43,45 +41,6 @@ def _slugify(name: str) -> str:
return re.sub(r'[-\s]+', '-', slug)
def _build_xml(envelope: FlowExportEnvelope) -> str:
"""Build XML representation of an .rfflow export."""
root = ET.Element("rfflow")
root.set("version", envelope.rfflow_version)
ET.SubElement(root, "exported_at").text = envelope.exported_at.isoformat()
ET.SubElement(root, "source_app").text = envelope.source_app
ET.SubElement(root, "format").text = "xml"
flow_el = ET.SubElement(root, "flow")
flow = envelope.flow
ET.SubElement(flow_el, "name").text = flow.name
ET.SubElement(flow_el, "description").text = flow.description or ""
ET.SubElement(flow_el, "tree_type").text = flow.tree_type
ET.SubElement(flow_el, "version").text = str(flow.version)
ET.SubElement(flow_el, "author_name").text = flow.author_name or ""
if flow.category:
cat_el = ET.SubElement(flow_el, "category")
ET.SubElement(cat_el, "name").text = flow.category.name
ET.SubElement(cat_el, "slug").text = flow.category.slug
tags_el = ET.SubElement(flow_el, "tags")
for tag in flow.tags:
ET.SubElement(tags_el, "tag").text = tag
# Store tree_structure as JSON string in CDATA-safe text
ts_el = ET.SubElement(flow_el, "tree_structure")
ts_el.text = json.dumps(flow.tree_structure)
if flow.intake_form:
if_el = ET.SubElement(flow_el, "intake_form")
if_el.text = json.dumps(flow.intake_form)
ET.indent(root)
return ET.tostring(root, encoding="unicode", xml_declaration=True)
# --- Export ---
@router.get("/{tree_id}/export")
@@ -89,9 +48,8 @@ async def export_tree(
tree_id: UUID,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
format: str = Query("json", pattern="^(json|xml)$"),
):
"""Export a tree as a downloadable .rfflow file (JSON or XML)."""
"""Export a tree as a downloadable .rfflow JSON file."""
# Load tree with relationships + author name
result = await db.execute(
select(Tree)
@@ -139,25 +97,15 @@ async def export_tree(
rfflow_version="1.0",
exported_at=datetime.now(timezone.utc),
source_app="ResolutionFlow",
format=format,
flow=flow_data,
)
slug = _slugify(tree.name)
# Audit log
await log_audit(db, current_user.id, "tree.export", "tree", tree.id, {"format": format})
await log_audit(db, current_user.id, "tree.export", "tree", tree.id)
await db.commit()
if format == "xml":
content = _build_xml(envelope)
return Response(
content=content,
media_type="application/xml",
headers={"Content-Disposition": f'attachment; filename="{slug}.rfflow"'},
)
# JSON
content = envelope.model_dump_json(indent=2)
return Response(
content=content,

View File

@@ -1,6 +1,6 @@
"""Schemas for .rfflow file export and import."""
from datetime import datetime
from typing import Optional, Any, Literal
from typing import Optional, Any
from pydantic import BaseModel, Field
from app.schemas.tree import TreeType
@@ -30,7 +30,6 @@ class FlowExportEnvelope(BaseModel):
rfflow_version: str = "1.0"
exported_at: datetime
source_app: str = "ResolutionFlow"
format: Literal["json", "xml"] = "json"
flow: FlowExportData
@@ -39,7 +38,6 @@ class FlowImportRequest(BaseModel):
rfflow_version: str = Field(..., description="Must be '1.0'")
exported_at: datetime
source_app: str = "ResolutionFlow"
format: Literal["json", "xml"] = "json"
flow: FlowExportData

View File

@@ -1,6 +1,4 @@
"""Tests for flow export/import (.rfflow) endpoints."""
import json
import xml.etree.ElementTree as ET
import pytest
from httpx import AsyncClient
@@ -41,7 +39,7 @@ async def create_tree_with_tags(client: AsyncClient, headers: dict, data: dict |
async def test_export_json_format(client, auth_headers, test_tree):
"""Export should return valid .rfflow JSON with correct structure."""
resp = await client.get(
f"/api/v1/trees/{test_tree['id']}/export?format=json",
f"/api/v1/trees/{test_tree['id']}/export",
headers=auth_headers,
)
assert resp.status_code == 200
@@ -51,7 +49,6 @@ async def test_export_json_format(client, auth_headers, test_tree):
data = resp.json()
assert data["rfflow_version"] == "1.0"
assert data["source_app"] == "ResolutionFlow"
assert data["format"] == "json"
assert data["exported_at"] is not None
flow = data["flow"]
@@ -65,37 +62,12 @@ async def test_export_json_format(client, auth_headers, test_tree):
assert "account_id" not in flow
@pytest.mark.asyncio
async def test_export_xml_format(client, auth_headers, test_tree):
"""Export as XML should be parseable and contain all required fields."""
resp = await client.get(
f"/api/v1/trees/{test_tree['id']}/export?format=xml",
headers=auth_headers,
)
assert resp.status_code == 200
assert ".rfflow" in resp.headers.get("content-disposition", "")
# Parse XML
root = ET.fromstring(resp.text)
assert root.tag == "rfflow"
assert root.get("version") == "1.0"
flow_el = root.find("flow")
assert flow_el is not None
assert flow_el.find("name").text == test_tree["name"]
# tree_structure should be valid JSON
ts_text = flow_el.find("tree_structure").text
ts = json.loads(ts_text)
assert ts["id"] == "root"
@pytest.mark.asyncio
async def test_export_with_category_and_tags(client, auth_headers):
"""Export should include category and tag data."""
tree = await create_tree_with_tags(client, auth_headers)
resp = await client.get(
f"/api/v1/trees/{tree['id']}/export?format=json",
f"/api/v1/trees/{tree['id']}/export",
headers=auth_headers,
)
assert resp.status_code == 200
@@ -121,9 +93,8 @@ async def test_export_access_control(client, auth_headers, test_admin, admin_aut
})
other_headers = {"Authorization": f"Bearer {login_resp.json()['access_token']}"}
# The other user is in a different account, so can't see the tree
resp = await client.get(
f"/api/v1/trees/{test_tree['id']}/export?format=json",
f"/api/v1/trees/{test_tree['id']}/export",
headers=other_headers,
)
assert resp.status_code == 403
@@ -134,7 +105,7 @@ async def test_export_nonexistent_tree(client, auth_headers):
"""Export of non-existent tree returns 404."""
import uuid
resp = await client.get(
f"/api/v1/trees/{uuid.uuid4()}/export?format=json",
f"/api/v1/trees/{uuid.uuid4()}/export",
headers=auth_headers,
)
assert resp.status_code == 404
@@ -147,7 +118,7 @@ async def test_import_happy_path(client, auth_headers, test_tree):
"""Import should create a draft tree owned by the importing user."""
# First export
export_resp = await client.get(
f"/api/v1/trees/{test_tree['id']}/export?format=json",
f"/api/v1/trees/{test_tree['id']}/export",
headers=auth_headers,
)
rfflow_data = export_resp.json()
@@ -180,7 +151,7 @@ async def test_import_happy_path(client, auth_headers, test_tree):
async def test_import_with_name_override(client, auth_headers, test_tree):
"""Import with name_override should use the override name."""
export_resp = await client.get(
f"/api/v1/trees/{test_tree['id']}/export?format=json",
f"/api/v1/trees/{test_tree['id']}/export",
headers=auth_headers,
)
rfflow_data = export_resp.json()
@@ -201,7 +172,6 @@ async def test_import_with_new_tags(client, auth_headers):
"rfflow_version": "1.0",
"exported_at": "2026-03-05T14:30:00+00:00",
"source_app": "ResolutionFlow",
"format": "json",
"flow": {
"name": "Test Import Tags",
"description": "Testing tag creation",
@@ -235,7 +205,6 @@ async def test_import_with_category_creation(client, auth_headers):
"rfflow_version": "1.0",
"exported_at": "2026-03-05T14:30:00+00:00",
"source_app": "ResolutionFlow",
"format": "json",
"flow": {
"name": "Import Category Test",
"description": None,
@@ -267,7 +236,6 @@ async def test_import_invalid_version(client, auth_headers):
"rfflow_version": "99.0",
"exported_at": "2026-03-05T14:30:00+00:00",
"source_app": "ResolutionFlow",
"format": "json",
"flow": {
"name": "Bad Version",
"tree_type": "troubleshooting",
@@ -288,7 +256,7 @@ async def test_import_round_trip(client, auth_headers):
# Export
export_resp = await client.get(
f"/api/v1/trees/{original['id']}/export?format=json",
f"/api/v1/trees/{original['id']}/export",
headers=auth_headers,
)
rfflow = export_resp.json()
@@ -312,53 +280,3 @@ async def test_import_round_trip(client, auth_headers):
assert imported_tree["tree_structure"]["id"] == original["tree_structure"]["id"]
assert imported_tree["tree_type"] == original["tree_type"]
assert imported_tree["status"] == "draft" # Always draft on import
@pytest.mark.asyncio
async def test_import_xml_round_trip(client, auth_headers):
"""Export as XML then re-import the parsed structure should work."""
original = await create_tree_with_tags(client, auth_headers)
# Export as XML
export_resp = await client.get(
f"/api/v1/trees/{original['id']}/export?format=xml",
headers=auth_headers,
)
assert export_resp.status_code == 200
# Parse XML to build import request (simulating frontend parser)
root = ET.fromstring(export_resp.text)
flow_el = root.find("flow")
tags = [t.text for t in flow_el.find("tags").findall("tag")]
ts = json.loads(flow_el.find("tree_structure").text)
cat_el = flow_el.find("category")
category = None
if cat_el is not None:
cat_name = cat_el.find("name")
cat_slug = cat_el.find("slug")
if cat_name is not None and cat_slug is not None:
category = {"name": cat_name.text, "slug": cat_slug.text}
rfflow = {
"rfflow_version": root.get("version"),
"exported_at": root.find("exported_at").text,
"source_app": root.find("source_app").text,
"format": "xml",
"flow": {
"name": flow_el.find("name").text,
"description": flow_el.find("description").text or None,
"tree_type": flow_el.find("tree_type").text,
"version": int(flow_el.find("version").text),
"author_name": flow_el.find("author_name").text or None,
"category": category,
"tags": tags,
"tree_structure": ts,
"intake_form": None,
},
}
import_resp = await client.post("/api/v1/trees/import", json=rfflow, headers=auth_headers)
assert import_resp.status_code == 201
result = import_resp.json()
assert result["name"] == original["name"]