Files
resolutionflow/backend/app/api/endpoints/tree_transfer.py
chihlasm ee9895de5d feat: add flow export/import backend (migration, endpoints, schemas)
Add .rfflow file export/import support:
- Migration 050: import_metadata JSONB column on trees
- GET /trees/{id}/export?format=json|xml endpoint
- POST /trees/import endpoint (creates draft, resolves categories/tags)
- FlowExportEnvelope, FlowImportRequest/Response schemas
- import_metadata field on TreeResponse

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 00:03:59 -05:00

334 lines
10 KiB
Python

"""Flow export/import endpoints (.rfflow files)."""
import json
import logging
import re
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from typing import Annotated, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, status
from fastapi.responses import Response
from sqlalchemy import select, or_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.api.deps import get_current_active_user, require_engineer_or_admin
from app.core.audit import log_audit
from app.core.database import get_db
from app.core.permissions import can_access_tree
from app.core.subscriptions import check_tree_limit
from app.core.tree_validation import can_publish_tree
from app.models.category import TreeCategory
from app.models.tag import TreeTag, tree_tag_assignments
from app.models.tree import Tree
from app.models.user import User
from app.schemas.tree_export import (
FlowExportCategory,
FlowExportData,
FlowExportEnvelope,
FlowImportRequest,
FlowImportResponse,
)
from app.services.rag_service import index_tree as rag_index_tree
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router for the flow export/import endpoints; every route below is
# registered under the "/trees" prefix and tagged "tree-transfer".
router = APIRouter(prefix="/trees", tags=["tree-transfer"])
def _slugify(name: str) -> str:
    """Create an ASCII, filename-safe slug from a name.

    The result is used as the download filename in a ``Content-Disposition``
    header, so it is restricted to ASCII letters, digits, underscores and
    single hyphens.

    Args:
        name: Arbitrary display name (may contain Unicode, punctuation,
            or be empty).

    Returns:
        A lowercase slug with no leading/trailing hyphens. Falls back to
        ``"flow"`` when nothing usable remains, so the filename is never
        just an extension.
    """
    # Function-scope import: only this helper needs it.
    import unicodedata

    # NFKD splits accented letters into base letter + combining mark; the
    # ASCII encode then drops the marks (and any other non-ASCII character).
    ascii_name = (
        unicodedata.normalize("NFKD", name)
        .encode("ascii", "ignore")
        .decode("ascii")
    )
    # Drop everything except word characters, whitespace and hyphens.
    cleaned = re.sub(r"[^\w\s-]", "", ascii_name.lower().strip())
    # Collapse runs of whitespace/hyphens into one hyphen, then trim the ends.
    slug = re.sub(r"[-\s]+", "-", cleaned).strip("-")
    return slug or "flow"
def _build_xml(envelope: FlowExportEnvelope) -> str:
    """Build the XML representation of an .rfflow export.

    Layout: a root ``<rfflow version="...">`` element holding
    ``exported_at``, ``source_app``, ``format`` and a ``<flow>`` element
    with the flow's scalar fields, optional ``<category>``, ``<tags>``,
    and the tree structure / intake form embedded as JSON text.

    Args:
        envelope: The export envelope to serialize.

    Returns:
        The XML document as a string, starting with an XML declaration
        that always names ``utf-8`` as the encoding.
    """
    root = ET.Element("rfflow")
    root.set("version", envelope.rfflow_version)
    ET.SubElement(root, "exported_at").text = envelope.exported_at.isoformat()
    ET.SubElement(root, "source_app").text = envelope.source_app
    ET.SubElement(root, "format").text = "xml"

    flow = envelope.flow
    flow_el = ET.SubElement(root, "flow")
    ET.SubElement(flow_el, "name").text = flow.name
    ET.SubElement(flow_el, "description").text = flow.description or ""
    ET.SubElement(flow_el, "tree_type").text = flow.tree_type
    ET.SubElement(flow_el, "version").text = str(flow.version)
    ET.SubElement(flow_el, "author_name").text = flow.author_name or ""

    if flow.category:
        cat_el = ET.SubElement(flow_el, "category")
        ET.SubElement(cat_el, "name").text = flow.category.name
        ET.SubElement(cat_el, "slug").text = flow.category.slug

    # <tags> is always emitted, even when the flow has no tags.
    tags_el = ET.SubElement(flow_el, "tags")
    for tag in flow.tags:
        ET.SubElement(tags_el, "tag").text = tag

    # The nested structures are stored as JSON strings in element text;
    # ElementTree escapes markup characters when it serializes text.
    ET.SubElement(flow_el, "tree_structure").text = json.dumps(flow.tree_structure)
    if flow.intake_form:
        ET.SubElement(flow_el, "intake_form").text = json.dumps(flow.intake_form)

    ET.indent(root)
    # Serialize as UTF-8 bytes and decode, rather than encoding="unicode":
    # with "unicode" plus xml_declaration=True the declaration carries the
    # process locale's preferred encoding, which varies by environment and
    # may not match the bytes actually sent to the client.
    return ET.tostring(root, encoding="utf-8", xml_declaration=True).decode("utf-8")
# --- Export ---
@router.get("/{tree_id}/export")
async def export_tree(
    tree_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)],
    # `pattern` is the current name of the deprecated `regex` argument.
    format: str = Query("json", pattern="^(json|xml)$"),
):
    """Export a tree as a downloadable .rfflow file (JSON or XML).

    Args:
        tree_id: ID of the tree to export.
        db: Async database session.
        current_user: The authenticated, active user requesting the export.
        format: Output format, either ``"json"`` (default) or ``"xml"``.

    Returns:
        A ``Response`` whose body is the serialized export envelope, sent as
        an attachment named ``<slug>.rfflow``.

    Raises:
        HTTPException: 404 if no tree has this ID; 403 if the tree is
            inactive or the user may not access it.
    """
    # Load the tree together with the relationships the export reads.
    result = await db.execute(
        select(Tree)
        .options(
            selectinload(Tree.category_rel),
            selectinload(Tree.tags),
            selectinload(Tree.author),
        )
        .where(Tree.id == tree_id)
    )
    tree = result.scalar_one_or_none()
    if not tree:
        raise HTTPException(status_code=404, detail="Tree not found")
    if not tree.is_active or not can_access_tree(current_user, tree):
        raise HTTPException(status_code=403, detail="You don't have access to this tree")

    # Category is optional on a tree.
    export_category = None
    if tree.category_rel:
        export_category = FlowExportCategory(
            name=tree.category_rel.name,
            slug=tree.category_rel.slug,
        )

    # Prefer the author's display name, falling back to their email.
    author_name = None
    if tree.author:
        author_name = tree.author.name or tree.author.email

    envelope = FlowExportEnvelope(
        rfflow_version="1.0",
        exported_at=datetime.now(timezone.utc),
        source_app="ResolutionFlow",
        format=format,
        flow=FlowExportData(
            name=tree.name,
            description=tree.description,
            tree_type=tree.tree_type,
            version=tree.version,
            author_name=author_name,
            category=export_category,
            tags=tree.tag_names,
            tree_structure=tree.tree_structure,
            intake_form=tree.intake_form,
        ),
    )

    # Serialize BEFORE writing the audit entry, so a serialization failure
    # does not leave an audit record for an export that was never delivered.
    if format == "xml":
        content = _build_xml(envelope)
        media_type = "application/xml"
    else:
        content = envelope.model_dump_json(indent=2)
        media_type = "application/json"
    slug = _slugify(tree.name)

    # Audit log
    await log_audit(db, current_user.id, "tree.export", "tree", tree.id, {"format": format})
    await db.commit()

    # Both formats use the same .rfflow extension; only the media type differs.
    return Response(
        content=content,
        media_type=media_type,
        headers={"Content-Disposition": f'attachment; filename="{slug}.rfflow"'},
    )
# --- Import ---
async def _resolve_import_category(db: AsyncSession, category, account_id):
    """Find or create the category referenced by an imported flow.

    Looks up a category by slug among global categories (no account) and
    the given account's categories; creates an account-scoped category when
    none matches.

    Returns:
        ``(category_id, created)``; ``(None, False)`` when the flow has no
        category.
    """
    if not category:
        return None, False

    result = await db.execute(
        select(TreeCategory)
        .where(
            TreeCategory.slug == category.slug,
            or_(
                TreeCategory.account_id.is_(None),
                TreeCategory.account_id == account_id,
            ),
        )
        # The filter can match both a global and an account row for the same
        # slug (NOTE(review): confirm against the unique constraints), so
        # take the first row instead of requiring at most one, and sort
        # account-scoped rows ahead of global ones.
        .order_by(TreeCategory.account_id.is_(None))
    )
    existing = result.scalars().first()
    if existing:
        return existing.id, False

    new_cat = TreeCategory(
        name=category.name,
        slug=category.slug,
        account_id=account_id,
    )
    db.add(new_cat)
    # Flush so the new row's id is available.
    await db.flush()
    return new_cat.id, True


async def _resolve_import_tags(db: AsyncSession, tag_names, account_id, user_id):
    """Find or create the tags referenced by an imported flow.

    Each distinct slug is resolved once, so names that normalize to the same
    slug cannot double-count usage or yield duplicate assignments.

    Returns:
        ``(tags, created_names)``: the tag rows to attach to the tree, and
        the names of the tags that had to be created.
    """
    tags: list[TreeTag] = []
    created_names: list[str] = []
    seen_slugs = set()

    for tag_name in tag_names:
        slug = TreeTag.slugify(tag_name)
        if slug in seen_slugs:
            continue
        seen_slugs.add(slug)

        result = await db.execute(
            select(TreeTag)
            .where(
                TreeTag.slug == slug,
                or_(
                    TreeTag.account_id.is_(None),
                    TreeTag.account_id == account_id,
                ),
            )
            # Same reasoning as for categories: tolerate a global and an
            # account row sharing a slug, preferring the account row.
            .order_by(TreeTag.account_id.is_(None))
        )
        tag = result.scalars().first()
        if not tag:
            tag = TreeTag(
                name=tag_name,
                slug=slug,
                account_id=account_id,
                created_by=user_id,
            )
            db.add(tag)
            await db.flush()
            created_names.append(tag_name)
        tags.append(tag)
        tag.usage_count += 1

    return tags, created_names


@router.post("/import", response_model=FlowImportResponse, status_code=status.HTTP_201_CREATED)
async def import_tree(
    data: FlowImportRequest,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_engineer_or_admin)],
    name_override: Optional[str] = Query(None, max_length=255),
):
    """Import a flow from a parsed .rfflow file. Creates as draft.

    Args:
        data: Parsed .rfflow envelope.
        db: Async database session.
        current_user: The importing user (engineer or admin).
        name_override: Optional name to use instead of the name in the file.

    Returns:
        A ``FlowImportResponse`` with the new tree's id, whether a category
        was created, which tags were created, and any validation warnings.

    Raises:
        HTTPException: 422 if the file's rfflow version is not "1.0";
            402 if the user's account has reached its tree limit.
    """
    # Validate version
    if data.rfflow_version != "1.0":
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Unsupported rfflow version: {data.rfflow_version}. Only '1.0' is supported.",
        )
    flow = data.flow

    # Check subscription tree limit (only for users that belong to an account)
    if current_user.account_id:
        can_create, limit, count = await check_tree_limit(current_user.account_id, db)
        if not can_create:
            raise HTTPException(
                status_code=status.HTTP_402_PAYMENT_REQUIRED,
                detail=f"Tree limit reached ({count}/{limit}). Upgrade your plan to create more trees.",
            )

    # --- Category / tag resolution ---
    category_id, category_created = await _resolve_import_category(
        db, flow.category, current_user.account_id
    )
    tags_to_add, tags_created = await _resolve_import_tags(
        db, flow.tags, current_user.account_id, current_user.id
    )

    # The name that will actually be stored; an empty override falls back
    # to the name from the file.
    tree_name = name_override or flow.name

    # --- Validation warnings (non-blocking since status=draft) ---
    # Validate the name that is stored, not the one in the file.
    warnings: list[str] = []
    can_pub, validation_errors = can_publish_tree(
        flow.tree_structure,
        tree_name,
        flow.description,
        tree_type=flow.tree_type,
        intake_form=flow.intake_form,
    )
    if not can_pub:
        for err in validation_errors:
            msg = err.get("message", str(err)) if isinstance(err, dict) else str(err)
            warnings.append(msg)

    # --- Create tree ---
    import_metadata = {
        "original_author_name": flow.author_name,
        "exported_at": data.exported_at.isoformat(),
        "imported_at": datetime.now(timezone.utc).isoformat(),
        "source_app": data.source_app,
    }
    new_tree = Tree(
        name=tree_name,
        description=flow.description,
        tree_type=flow.tree_type,
        tree_structure=flow.tree_structure,
        intake_form=flow.intake_form,
        category_id=category_id,
        author_id=current_user.id,
        account_id=current_user.account_id,
        status="draft",
        version=1,
        import_metadata=import_metadata,
    )
    db.add(new_tree)
    await db.flush()
    # Capture the id now: after commit() the instance may be expired, and
    # reading an expired attribute would need another database round trip.
    new_tree_id = new_tree.id

    # Tag junction table inserts
    for tag in tags_to_add:
        await db.execute(
            tree_tag_assignments.insert().values(
                tree_id=new_tree_id,
                tag_id=tag.id,
                assigned_by=current_user.id,
            )
        )

    # Audit log
    await log_audit(db, current_user.id, "tree.import", "tree", new_tree_id, {
        "source_app": data.source_app,
        "original_author": flow.author_name,
    })
    await db.commit()

    # RAG index (best-effort): the import is already committed, so a failure
    # here is logged with its traceback and does not fail the request.
    try:
        await rag_index_tree(new_tree_id, db)
        await db.commit()
    except Exception:
        logger.warning(
            "RAG indexing failed for imported tree %s", new_tree_id, exc_info=True
        )

    return FlowImportResponse(
        tree_id=str(new_tree_id),
        name=tree_name,
        tree_type=flow.tree_type,
        status="draft",
        category_created=category_created,
        tags_created=tags_created,
        validation_warnings=warnings,
    )