Files
resolutionflow/backend/app/api/endpoints/tree_transfer.py
chihlasm 88c1553c5d refactor: remove XML export, JSON-only for .rfflow files
- Remove XML builder, format query param, and XML tests
- Simplify ExportFlowModal (no format picker)
- Simplify rfflowParser (JSON-only)
- Remove format field from schemas and types

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 00:56:07 -05:00

282 lines
8.5 KiB
Python

"""Flow export/import endpoints (.rfflow files)."""
import logging
import re
from datetime import datetime, timezone
from typing import Annotated, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, status
from fastapi.responses import Response
from sqlalchemy import select, or_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.api.deps import get_current_active_user, require_engineer_or_admin
from app.core.audit import log_audit
from app.core.database import get_db
from app.core.permissions import can_access_tree
from app.core.subscriptions import check_tree_limit
from app.core.tree_validation import can_publish_tree
from app.models.category import TreeCategory
from app.models.tag import TreeTag, tree_tag_assignments
from app.models.tree import Tree
from app.models.user import User
from app.schemas.tree_export import (
FlowExportCategory,
FlowExportData,
FlowExportEnvelope,
FlowImportRequest,
FlowImportResponse,
)
from app.services.rag_service import index_tree as rag_index_tree
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
# Router for the flow export/import endpoints; routes registered on it are
# prefixed with ``/trees`` and tagged ``tree-transfer``.
router = APIRouter(prefix="/trees", tags=["tree-transfer"])
def _slugify(name: str) -> str:
    """Create a filename-safe, ASCII-only slug from a name.

    The slug is embedded in a ``Content-Disposition`` header by the export
    endpoint, so it must survive HTTP header encoding: accented letters are
    folded to their ASCII base form and any other non-ASCII character is
    dropped. Runs of whitespace/hyphens collapse to a single hyphen, and
    leading/trailing hyphens are removed.

    Args:
        name: Arbitrary display name (may contain punctuation or Unicode).

    Returns:
        A lowercase slug made of ASCII word characters and hyphens. Falls back
        to ``"flow"`` when nothing usable remains (e.g. ``"!!!"``), so the
        resulting filename never has an empty stem.
    """
    # Local import: only this helper needs it.
    import unicodedata

    # NFKD splits "é" into "e" + combining accent; the ASCII encode then drops
    # the accent (and every other character that has no ASCII form).
    ascii_name = (
        unicodedata.normalize("NFKD", name)
        .encode("ascii", "ignore")
        .decode("ascii")
    )
    slug = re.sub(r"[^\w\s-]", "", ascii_name.lower().strip())
    slug = re.sub(r"[-\s]+", "-", slug).strip("-")
    return slug or "flow"
# --- Export ---
@router.get("/{tree_id}/export")
async def export_tree(
    tree_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Serve a single tree as an ``.rfflow`` JSON attachment.

    Responds with 404 when no tree has ``tree_id`` and with 403 when the tree
    is inactive or ``can_access_tree`` rejects the caller. A ``tree.export``
    audit entry is written and committed before the response is built.
    """
    # Eager-load everything the export reads: category, tags and author.
    query = (
        select(Tree)
        .options(
            selectinload(Tree.category_rel),
            selectinload(Tree.tags),
            selectinload(Tree.author),
        )
        .where(Tree.id == tree_id)
    )
    tree = (await db.execute(query)).scalar_one_or_none()

    if not tree:
        raise HTTPException(status_code=404, detail="Tree not found")
    if not (tree.is_active and can_access_tree(current_user, tree)):
        raise HTTPException(status_code=403, detail="You don't have access to this tree")

    # Category is optional on a tree.
    category_rel = tree.category_rel
    export_category = (
        FlowExportCategory(name=category_rel.name, slug=category_rel.slug)
        if category_rel
        else None
    )

    # Prefer the author's display name, falling back to their email.
    author = tree.author
    author_name = (author.name or author.email) if author else None

    payload = FlowExportData(
        name=tree.name,
        description=tree.description,
        tree_type=tree.tree_type,
        version=tree.version,
        author_name=author_name,
        category=export_category,
        tags=tree.tag_names,
        tree_structure=tree.tree_structure,
        intake_form=tree.intake_form,
    )
    envelope = FlowExportEnvelope(
        rfflow_version="1.0",
        exported_at=datetime.now(timezone.utc),
        source_app="ResolutionFlow",
        flow=payload,
    )

    # Derive the filename before committing, while the tree's attributes are
    # still loaded on the instance.
    filename = f"{_slugify(tree.name)}.rfflow"

    # Record the export in the audit trail.
    await log_audit(db, current_user.id, "tree.export", "tree", tree.id)
    await db.commit()

    return Response(
        content=envelope.model_dump_json(indent=2),
        media_type="application/json",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
# --- Import ---
def _prefer_account_scoped(rows):
    """Pick the best match among rows that share a slug.

    The lookups below match both global rows (``account_id`` is NULL) and rows
    owned by the importing user's account, so more than one row can come back.
    An account-owned row wins over a global one.

    Returns:
        The chosen row, or ``None`` when ``rows`` is empty.
    """
    for row in rows:
        if row.account_id is not None:
            return row
    return rows[0] if rows else None


async def _resolve_category(
    db: AsyncSession, current_user: User, category
) -> tuple[TreeCategory, bool]:
    """Find the category matching the imported slug, creating it when absent.

    Args:
        db: Session used for the lookup and, if needed, the insert.
        current_user: Importing user; new categories are created in their account.
        category: Imported category payload exposing ``name`` and ``slug``.

    Returns:
        ``(category, created)`` where ``created`` is True only when a new row
        was added and flushed.
    """
    result = await db.execute(
        select(TreeCategory).where(
            TreeCategory.slug == category.slug,
            or_(
                TreeCategory.account_id.is_(None),
                TreeCategory.account_id == current_user.account_id,
            ),
        )
    )
    # .all() instead of scalar_one_or_none(): a global and an account-owned
    # category may share a slug, which would otherwise raise MultipleResultsFound.
    existing = _prefer_account_scoped(result.scalars().all())
    if existing is not None:
        return existing, False

    new_cat = TreeCategory(
        name=category.name,
        slug=category.slug,
        account_id=current_user.account_id,
    )
    db.add(new_cat)
    await db.flush()  # assigns new_cat.id
    return new_cat, True


async def _resolve_tags(
    db: AsyncSession, current_user: User, tag_names
) -> tuple[list[TreeTag], list[str]]:
    """Map imported tag names to ``TreeTag`` rows, creating missing ones.

    Names that produce the same slug are treated as one tag, so each tag is
    returned (and its ``usage_count`` bumped) at most once per import.

    Args:
        db: Session used for lookups and inserts.
        current_user: Importing user; new tags are created in their account.
        tag_names: Iterable of tag display names from the import payload.

    Returns:
        ``(tags, created_names)``: the distinct tags to attach to the new tree,
        and the names of the tags that had to be created.
    """
    account_id = current_user.account_id
    resolved: dict[str, TreeTag] = {}
    created_names: list[str] = []

    for tag_name in tag_names or ():
        slug = TreeTag.slugify(tag_name)
        if slug in resolved:
            # Duplicate in the payload: attaching it twice would insert the
            # same (tree_id, tag_id) pair twice and double-count usage.
            continue

        result = await db.execute(
            select(TreeTag).where(
                TreeTag.slug == slug,
                or_(
                    TreeTag.account_id.is_(None),
                    TreeTag.account_id == account_id,
                ),
            )
        )
        tag = _prefer_account_scoped(result.scalars().all())
        if tag is None:
            tag = TreeTag(
                name=tag_name,
                slug=slug,
                account_id=account_id,
                created_by=current_user.id,
            )
            db.add(tag)
            await db.flush()  # assigns tag.id
            created_names.append(tag_name)

        tag.usage_count = (tag.usage_count or 0) + 1
        resolved[slug] = tag

    return list(resolved.values()), created_names


@router.post("/import", response_model=FlowImportResponse, status_code=status.HTTP_201_CREATED)
async def import_tree(
    data: FlowImportRequest,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_engineer_or_admin)],
    name_override: Optional[str] = Query(None, max_length=255),
):
    """Import a flow from a parsed .rfflow file. Creates as draft.

    Args:
        data: Parsed ``.rfflow`` envelope.
        db: Request-scoped database session.
        current_user: Authenticated user, as resolved by ``require_engineer_or_admin``.
        name_override: Optional name to store instead of the name in the file.
            Blank or whitespace-only values are ignored.

    Returns:
        ``FlowImportResponse`` describing the new draft tree, whether a
        category was created, which tags were created, and any publish
        validation warnings (non-blocking, since the tree is a draft).

    Raises:
        HTTPException: 422 if ``rfflow_version`` is not ``"1.0"``; 402 if the
            account's tree limit has been reached.
    """
    # Validate version
    if data.rfflow_version != "1.0":
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Unsupported rfflow version: {data.rfflow_version}. Only '1.0' is supported.",
        )
    flow = data.flow

    # Check subscription tree limit
    if current_user.account_id:
        can_create, limit, count = await check_tree_limit(current_user.account_id, db)
        if not can_create:
            raise HTTPException(
                status_code=status.HTTP_402_PAYMENT_REQUIRED,
                detail=f"Tree limit reached ({count}/{limit}). Upgrade your plan to create more trees.",
            )

    # --- Category resolution ---
    category_id = None
    category_created = False
    if flow.category:
        category, category_created = await _resolve_category(db, current_user, flow.category)
        category_id = category.id

    # --- Tag resolution ---
    tags_to_add, tags_created = await _resolve_tags(db, current_user, flow.tags)

    # The stored name: a non-blank override wins over the name in the file.
    override = name_override.strip() if name_override else ""
    tree_name = override or flow.name

    # --- Validation warnings (non-blocking since status=draft) ---
    # Validate the name that will actually be stored, not the one in the file.
    intake_form_dicts = flow.intake_form
    can_pub, validation_errors = can_publish_tree(
        flow.tree_structure,
        tree_name,
        flow.description,
        tree_type=flow.tree_type,
        intake_form=intake_form_dicts,
    )
    warnings: list[str] = []
    if not can_pub:
        warnings = [
            err.get("message", str(err)) if isinstance(err, dict) else str(err)
            for err in validation_errors
        ]

    # --- Create tree ---
    import_metadata = {
        "original_author_name": flow.author_name,
        "exported_at": data.exported_at.isoformat(),
        "imported_at": datetime.now(timezone.utc).isoformat(),
        "source_app": data.source_app,
    }
    new_tree = Tree(
        name=tree_name,
        description=flow.description,
        tree_type=flow.tree_type,
        tree_structure=flow.tree_structure,
        intake_form=intake_form_dicts,
        category_id=category_id,
        author_id=current_user.id,
        account_id=current_user.account_id,
        status="draft",
        version=1,
        import_metadata=import_metadata,
    )
    db.add(new_tree)
    await db.flush()
    # Capture the id now: after commit() the instance may be expired, and
    # reading an expired attribute needs a refresh query.
    new_tree_id = new_tree.id

    # Tag junction table inserts
    for tag in tags_to_add:
        await db.execute(
            tree_tag_assignments.insert().values(
                tree_id=new_tree_id,
                tag_id=tag.id,
                assigned_by=current_user.id,
            )
        )

    # Audit log
    await log_audit(db, current_user.id, "tree.import", "tree", new_tree_id, {
        "source_app": data.source_app,
        "original_author": flow.author_name,
    })
    await db.commit()

    # RAG index (best-effort): the tree is already committed, so an indexing
    # failure must not fail the import -- but keep the traceback in the log.
    try:
        await rag_index_tree(new_tree_id, db)
        await db.commit()
    except Exception:
        logger.warning(
            "RAG indexing failed for imported tree %s", new_tree_id, exc_info=True
        )

    return FlowImportResponse(
        tree_id=str(new_tree_id),
        name=tree_name,
        tree_type=flow.tree_type,
        status="draft",
        category_created=category_created,
        tags_created=tags_created,
        validation_warnings=warnings,
    )