"""Flow export/import endpoints (.rfflow files).""" import logging import re from datetime import datetime, timezone from typing import Annotated, Optional from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query, status from fastapi.responses import Response from sqlalchemy import select, or_ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.api.deps import get_current_active_user, require_engineer_or_admin from app.core.audit import log_audit from app.core.database import get_db from app.core.permissions import can_access_tree from app.core.subscriptions import check_tree_limit from app.core.tree_validation import can_publish_tree from app.models.category import TreeCategory from app.models.tag import TreeTag, tree_tag_assignments from app.models.tree import Tree from app.models.user import User from app.schemas.tree_export import ( FlowExportCategory, FlowExportData, FlowExportEnvelope, FlowImportRequest, FlowImportResponse, ) from app.services.rag_service import index_tree as rag_index_tree logger = logging.getLogger(__name__) router = APIRouter(prefix="/trees", tags=["tree-transfer"]) def _slugify(name: str) -> str: """Create a filename-safe slug from a name.""" slug = re.sub(r'[^\w\s-]', '', name.lower().strip()) return re.sub(r'[-\s]+', '-', slug) # --- Export --- @router.get("/{tree_id}/export") async def export_tree( tree_id: UUID, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_user)], ): """Export a tree as a downloadable .rfflow JSON file.""" # Load tree with relationships + author name result = await db.execute( select(Tree) .options( selectinload(Tree.category_rel), selectinload(Tree.tags), selectinload(Tree.author), ) .where(Tree.id == tree_id) ) tree = result.scalar_one_or_none() if not tree: raise HTTPException(status_code=404, detail="Tree not found") if not tree.is_active or not can_access_tree(current_user, tree): raise HTTPException(status_code=403, detail="You don't have access to this tree") # Build export category export_category = None if tree.category_rel: export_category = FlowExportCategory( name=tree.category_rel.name, slug=tree.category_rel.slug, ) # Build export data author_name = None if tree.author: author_name = tree.author.name or tree.author.email flow_data = FlowExportData( name=tree.name, description=tree.description, tree_type=tree.tree_type, version=tree.version, author_name=author_name, category=export_category, tags=tree.tag_names, tree_structure=tree.tree_structure, intake_form=tree.intake_form, ) envelope = FlowExportEnvelope( rfflow_version="1.0", exported_at=datetime.now(timezone.utc), source_app="ResolutionFlow", flow=flow_data, ) slug = _slugify(tree.name) # Audit log await log_audit(db, current_user.id, "tree.export", "tree", tree.id) await db.commit() content = envelope.model_dump_json(indent=2) return Response( content=content, media_type="application/json", headers={"Content-Disposition": f'attachment; filename="{slug}.rfflow"'}, ) # --- Import --- @router.post("/import", response_model=FlowImportResponse, status_code=status.HTTP_201_CREATED) async def import_tree( data: FlowImportRequest, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(require_engineer_or_admin)], name_override: Optional[str] = Query(None, max_length=255), ): """Import a flow from a parsed .rfflow file. Creates as draft.""" # Validate version if data.rfflow_version != "1.0": raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Unsupported rfflow version: {data.rfflow_version}. Only '1.0' is supported.", ) flow = data.flow # Check subscription tree limit if current_user.account_id: can_create, limit, count = await check_tree_limit(current_user.account_id, db) if not can_create: raise HTTPException( status_code=status.HTTP_402_PAYMENT_REQUIRED, detail=f"Tree limit reached ({count}/{limit}). Upgrade your plan to create more trees.", ) # --- Category resolution --- category_id = None category_created = False if flow.category: # Try to match by slug within user's account cat_result = await db.execute( select(TreeCategory).where( TreeCategory.slug == flow.category.slug, or_( TreeCategory.account_id.is_(None), TreeCategory.account_id == current_user.account_id, ), ) ) category = cat_result.scalar_one_or_none() if category: category_id = category.id else: # Create new category new_cat = TreeCategory( name=flow.category.name, slug=flow.category.slug, account_id=current_user.account_id, ) db.add(new_cat) await db.flush() category_id = new_cat.id category_created = True # --- Tag resolution --- tags_created: list[str] = [] tags_to_add: list[TreeTag] = [] tree_account_id = current_user.account_id for tag_name in flow.tags: slug = TreeTag.slugify(tag_name) tag_result = await db.execute( select(TreeTag).where( TreeTag.slug == slug, or_( TreeTag.account_id.is_(None), TreeTag.account_id == tree_account_id, ), ) ) tag = tag_result.scalar_one_or_none() if not tag: tag = TreeTag( name=tag_name, slug=slug, account_id=tree_account_id, created_by=current_user.id, ) db.add(tag) await db.flush() tags_created.append(tag_name) tags_to_add.append(tag) tag.usage_count += 1 # --- Validation warnings (non-blocking since status=draft) --- warnings: list[str] = [] intake_form_dicts = flow.intake_form can_pub, validation_errors = can_publish_tree( flow.tree_structure, flow.name, flow.description, tree_type=flow.tree_type, intake_form=intake_form_dicts, ) if not can_pub: for err in validation_errors: msg = err.get("message", str(err)) if isinstance(err, dict) else str(err) warnings.append(msg) # --- Create tree --- tree_name = name_override or flow.name import_metadata = { "original_author_name": flow.author_name, "exported_at": data.exported_at.isoformat(), "imported_at": datetime.now(timezone.utc).isoformat(), "source_app": data.source_app, } new_tree = Tree( name=tree_name, description=flow.description, tree_type=flow.tree_type, tree_structure=flow.tree_structure, intake_form=intake_form_dicts, category_id=category_id, author_id=current_user.id, account_id=current_user.account_id, status="draft", version=1, import_metadata=import_metadata, ) db.add(new_tree) await db.flush() # Tag junction table inserts for tag in tags_to_add: await db.execute( tree_tag_assignments.insert().values( tree_id=new_tree.id, tag_id=tag.id, assigned_by=current_user.id, ) ) # Audit log await log_audit(db, current_user.id, "tree.import", "tree", new_tree.id, { "source_app": data.source_app, "original_author": flow.author_name, }) await db.commit() # RAG index (best-effort) try: await rag_index_tree(new_tree.id, db) await db.commit() except Exception: logger.warning("RAG indexing failed for imported tree %s", new_tree.id) return FlowImportResponse( tree_id=str(new_tree.id), name=tree_name, tree_type=flow.tree_type, status="draft", category_created=category_created, tags_created=tags_created, validation_warnings=warnings, )