feat: flow export/import + procedural Flow Assist (#96)

* feat: add flow export/import backend (migration, endpoints, schemas)

Add .rfflow file export/import support:
- Migration 050: import_metadata JSONB column on trees
- GET /trees/{id}/export?format=json|xml endpoint
- POST /trees/import endpoint (creates draft, resolves categories/tags)
- FlowExportEnvelope, FlowImportRequest/Response schemas
- import_metadata field on TreeResponse

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add flow export/import frontend + backend tests

Frontend:
- ExportFlowModal with JSON/XML format selection + download
- ImportFlowModal with drag-drop file picker + preview step
- rfflowParser for client-side JSON/XML .rfflow parsing
- Export buttons on editor toolbar and library action menus
- Import button on library page next to Create New
- Provenance display for imported flows in editor
- flowTransfer API client + types

Backend:
- Fix regex->pattern deprecation in export endpoint
- 12 integration tests covering export, import, round-trip,
  access control, tag/category creation, version validation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* refactor: remove XML export, JSON-only for .rfflow files

- Remove XML builder, format query param, and XML tests
- Simplify ExportFlowModal (no format picker)
- Simplify rfflowParser (JSON-only)
- Remove format field from schemas and types

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: match Flow Assist chat input to AI Assistant styling + strengthen one-question prompt

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add procedural flow support to AI chat builder (Flow Assist)

- Add procedural-specific system prompts (schema, interview protocol, response format)
- Dispatch prompts by flow_type: procedural/maintenance use flat steps schema, troubleshooting uses decision tree schema
- Parse [STEPS_UPDATE] and [INTAKE_FORM] markers in AI responses
- Add validate_generated_procedural_steps() validator
- Handle intake form extraction in AI chat import endpoint
- Add StaticStepsPreview component for procedural flow preview
- Update store and page to render correct preview by flow type

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add flow type selection to Flow Assist entry points

- CreateFlowDropdown now shows "Build with Flow Assist" under each flow type
- Library page "Flow Assist" button respects current type filter
- Clean up unused AIFlowBuilderModal references

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: update CLAUDE.md with AI chat builder and intake form learnings

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: refine assistant chat prompt for concise answers and focused questions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: switch AI provider to Claude Sonnet 4.6 + add shift+enter hint to chat inputs

- Default AI_PROVIDER changed from gemini to anthropic
- AI_MODEL and AI_MODEL_ANTHROPIC updated to claude-sonnet-4-6
- Added "Shift + Enter for a new line" hint below all chat textareas

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: update CLAUDE.md with AI provider and chat input learnings

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: add editor-embedded Flow Assist design document

Design for replacing the standalone /ai/chat page with context-aware
AI side panels embedded in each editor (Troubleshooting + Procedural).
Covers ghost node suggestion system, output-based thresholds,
config-driven model routing, knowledge integration, and per-flow
chat persistence.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: add editor-embedded Flow Assist implementation plan

25-task plan across 9 phases covering backend foundation, frontend
infrastructure, tree/procedural editor integration, AI-assisted create,
old code removal, action-type dispatch, suggestion audit trail, and
build verification.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: use actual root node ID in orphan validation check

AI-generated trees use descriptive IDs (e.g., "verify-account-exists")
instead of "root", causing the root node to be falsely flagged as orphaned.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add config-driven AI model tier routing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: extend AI chat session with tree_id and archived_at

Add tree_id FK (CASCADE) for editor-embedded sessions and archived_at
timestamp column to ai_chat_sessions table.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add AI suggestion audit trail table

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add action_type and focal_node_id to AI chat message API

- Add VALID_ACTION_TYPES literal and action_type/focal_node_id fields to
  AIChatMessageRequest schema
- Add tree_id field to AIChatStartRequest schema for editor-embedded sessions
- Update send_message() signature with action_type and focal_node_id params
- Update start_chat_session() signature with tree_id param
- Pass new params through endpoints to service functions
- All new params have defaults so existing behavior is unchanged

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: route AI model selection through action-type config

Update get_ai_provider() to accept an optional model override parameter
(applied only to AnthropicProvider; Gemini always uses its own model).
Thread action_type-based model resolution through send_message() and
generate_final_tree() in the AI chat service.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add TypeScript types for editor-embedded AI

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add shared ContextMenu component

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add useEditorAI hook and editorAI API client

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add EditorAIPanel component with Chat and Suggestions tabs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: integrate AI panel, context menu, and ghost nodes in tree editor

- Add AI Assist panel toggle button to tree editor toolbar
- Wire EditorAIPanel alongside TreeEditorLayout with single-panel rule
- Thread onNodeContextMenu through TreeEditorLayout → FlowCanvas → FlowCanvasNode
- Add right-click context menu with Generate Branch, Explain Node, Delete actions
- Add ghost node detection (_suggestion flag) with dashed border + opacity styling
- Add Accept/Dismiss overlay buttons on ghost nodes for future suggestion handling

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: integrate AI panel, context menu, and ghost steps in procedural editor

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add AI prompt dialog and wire into CreateFlowDropdown

Replace navigation to /ai/chat with an inline AIPromptDialog modal
that collects a single prompt, generates a flow via the editor AI API,
imports it, and navigates to the editor with the AI panel open.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: add glassmorphism to AI prompt dialog + maintenance Flow Assist button

- Use .glass-card-static on AIPromptDialog card for consistent design system
- Add "Build with Flow Assist" button to maintenance section in CreateFlowDropdown

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* refactor: remove standalone Flow Assist page and old AI chat components

Remove the old /ai/chat page, AI wizard modal, and all associated
components/stores/types now replaced by the editor-embedded AI panel.

Deleted:
- AIChatBuilderPage, ai-chat/ components, aiChatStore, aiChat API, ai-chat types
- AIFlowBuilderModal, ai-builder/ components, aiFlowBuilderStore

Cleaned up:
- Router (removed /ai/chat route)
- Sidebar (removed Flow Assist nav item)
- MyTreesPage (removed AI builder modal and button)
- TreeLibraryPage (removed Flow Assist button)
- API and type barrel exports

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add delta response parsing and action-type prompt dispatch

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add AI suggestion audit trail endpoints

Create/list/resolve endpoints for tracking AI-applied changes to flows.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add APScheduler task to auto-archive stale AI chat sessions

Archives AI chat sessions with no activity for 30 days, runs daily at 3 AM.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: update project status for editor-embedded Flow Assist

- Add Editor-Embedded Flow Assist to CURRENT-STATE.md in-progress items
- Update CLAUDE.md: fix stale lessons (#41, #46), add new patterns (#47 editor AI architecture, #48 orphan validation)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: use correct model alias in AI_MODEL_TIERS standard tier

The dated model ID `claude-sonnet-4-6-20250514` was causing 502 errors.
Use the alias `claude-sonnet-4-6` which matches AI_MODEL_ANTHROPIC.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: send live flow context to AI Assist for full editor awareness

The AI panel now sends the current tree structure (troubleshooting) or
steps + intake form (procedural/maintenance) with each message. This
gives the AI full visibility into node details, questions, descriptions,
options, and intake form fields — not just the node ID.

- Backend: add flow_context param to schema, endpoint, and service
- Frontend: add getFlowContext callback to useEditorAI hook
- TreeEditorPage: passes treeStructure as flow context
- ProceduralEditorPage: passes steps + intakeForm as flow context

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: include flow name and description in AI Assist context

Both editors now send name and description alongside the flow structure,
so the AI can reference what the flow is about when responding.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: increase AI timeout to 120s and limit retries to 1

The 45s timeout was too short for generation tasks with full flow
context in the system prompt. The Anthropic SDK's default 2 retries
caused requests to hang for ~136s before failing. Now: 120s timeout
with max 1 retry = faster failure if it does timeout.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: wire AI-generated flow structures into editor stores

The useEditorAI hook was ignoring result.working_tree from AI responses,
so generated steps/trees never appeared in the editor. Now:
- useEditorAI calls onFlowUpdate when working_tree is present in response
- ProceduralEditorPage handles steps + intake form updates via replaceSteps
- TreeEditorPage handles tree structure updates via replaceTreeStructure
- Both stores have new bulk-replace methods for AI integration

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: add lessons learned for full-stack integration, Anthropic retries, model tiers

#49 Always verify frontend consumes backend response fields
#50 Anthropic SDK max_retries=1 to avoid 3× timeout
#51 AI model tier routing via settings.get_model_for_action()

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: move AI Assist panel to full-height side layout in both editors

The AI panel was nested inside the content area, only spanning the
step list / canvas section. Now it sits at the outermost flex level,
spanning the full page height alongside all content (toolbar,
collapsible sections, steps/canvas). This prevents the panel from
overlapping content and lets the editor area properly shrink.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: AI Assist panel as fixed right drawer (matching Copilot/Scratchpad)

Convert EditorAIPanel from in-flow flex child to fixed right-side drawer
overlay, same pattern as CopilotPanel and ScratchpadSidebar. The panel
is fixed at right:0 spanning full viewport height, and editor pages add
pr-[380px] padding when open so content shifts left without overlap.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: AI Assist panel sits below topbar with slide-in animation

- Panel now uses top:56px to sit below the app shell topbar instead of
  covering it (matches the main-content grid cell area)
- Added slideInRight CSS animation for smooth drawer entrance
- Editor pages use dynamic paddingRight via PANEL_WIDTH constant
- ChatTab upgraded: markdown rendering, CopilotPanel-style message
  bubbles, auto-focus input, Shift+Enter hint
- All borders use --glass-border for consistent glassmorphism

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: AI Assist panel as in-flow flex sibling (not fixed/overlay)

Replace fixed positioning with in-flow flex layout. The outermost div
is now a horizontal flex row: content column (flex-1 min-w-0) + panel
(w-[380px] shrink-0). When the panel opens, the content column
automatically shrinks — no padding hacks or z-index stacking needed.
This guarantees the content shifts left and stays fully visible.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: AI Copilot panel as in-flow flex sibling in session navigation pages

Changed CopilotPanel from fixed overlay to flex layout sibling so it
pushes main content instead of covering it during active sessions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: remove duplicate CLAUDE.md lessons #47-48

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit was merged in pull request #96.
This commit is contained in:
chihlasm
2026-03-07 15:51:37 -05:00
committed by GitHub
parent 0fb1ef33a0
commit 0dc6123c0c
90 changed files with 7637 additions and 2472 deletions

View File

@@ -16,6 +16,7 @@ from app.models.copilot_conversation import CopilotConversation
from app.models.assistant_chat import AssistantChat
from app.models.survey_response import SurveyResponse
from app.models.survey_invite import SurveyInvite
from app.models.ai_suggestion import AISuggestion # noqa: F401
from app.core.config import settings
# this is the Alembic Config object

View File

@@ -0,0 +1,23 @@
"""Add import_metadata JSONB column to trees table.
Revision ID: 050
Revises: 049
Create Date: 2026-03-05
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB
# revision identifiers
revision = '050'
down_revision = '049'
branch_labels = None
depends_on = None
def upgrade() -> None:
op.add_column('trees', sa.Column('import_metadata', JSONB, nullable=True))
def downgrade() -> None:
op.drop_column('trees', 'import_metadata')

View File

@@ -0,0 +1,39 @@
"""extend ai chat session with tree_id and archived_at
Revision ID: 051
Revises: 050
"""
from alembic import op
import sqlalchemy as sa
revision = "051"
down_revision = "050"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.add_column(
"ai_chat_sessions",
sa.Column("tree_id", sa.UUID(), nullable=True),
)
op.add_column(
"ai_chat_sessions",
sa.Column("archived_at", sa.DateTime(timezone=True), nullable=True),
)
op.create_index("ix_ai_chat_sessions_tree_id", "ai_chat_sessions", ["tree_id"])
op.create_foreign_key(
"fk_ai_chat_sessions_tree_id",
"ai_chat_sessions",
"trees",
["tree_id"],
["id"],
ondelete="CASCADE",
)
def downgrade() -> None:
op.drop_constraint("fk_ai_chat_sessions_tree_id", "ai_chat_sessions", type_="foreignkey")
op.drop_index("ix_ai_chat_sessions_tree_id", table_name="ai_chat_sessions")
op.drop_column("ai_chat_sessions", "archived_at")
op.drop_column("ai_chat_sessions", "tree_id")

View File

@@ -0,0 +1,37 @@
"""add ai suggestion table
Revision ID: 052
Revises: 051
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID, JSONB
revision = "052"
down_revision = "051"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.create_table(
"ai_suggestions",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column("tree_id", UUID(as_uuid=True), sa.ForeignKey("trees.id", ondelete="CASCADE"), nullable=False),
sa.Column("user_id", UUID(as_uuid=True), sa.ForeignKey("users.id", ondelete="CASCADE"), nullable=False),
sa.Column("session_id", UUID(as_uuid=True), sa.ForeignKey("ai_chat_sessions.id", ondelete="SET NULL"), nullable=True),
sa.Column("action_type", sa.String(50), nullable=False),
sa.Column("target_node_id", sa.String(255), nullable=True),
sa.Column("changes_json", JSONB, nullable=False, server_default="{}"),
sa.Column("status", sa.String(20), nullable=False, server_default="pending"),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
sa.Column("resolved_at", sa.DateTime(timezone=True), nullable=True),
)
op.create_index("ix_ai_suggestions_tree_id", "ai_suggestions", ["tree_id"])
op.create_index("ix_ai_suggestions_user_id", "ai_suggestions", ["user_id"])
def downgrade() -> None:
op.drop_index("ix_ai_suggestions_user_id", table_name="ai_suggestions")
op.drop_index("ix_ai_suggestions_tree_id", table_name="ai_suggestions")
op.drop_table("ai_suggestions")

View File

@@ -95,6 +95,7 @@ async def create_session(
user_id=current_user.id,
account_id=current_user.account_id,
db=db,
tree_id=data.tree_id,
)
except Exception as e:
logger.exception("AI chat session start failed: %s", e)
@@ -168,7 +169,10 @@ async def post_message(
try:
ai_content, tree_update, new_phase, metadata = await send_message(
session, data.content, db
session, data.content, db,
action_type=data.action_type or "open_chat",
focal_node_id=data.focal_node_id,
flow_context=data.flow_context,
)
except Exception as e:
logger.exception("AI chat message failed: %s", e)
@@ -390,11 +394,18 @@ async def import_tree(
# Always create a new Tree record (no duplicate check — user may
# want multiple copies or re-import after edits)
metadata = session.tree_metadata or {}
# Extract intake form from metadata if present (procedural flows)
intake_form = None
if isinstance(metadata.get("intake_form"), list):
intake_form = metadata.pop("intake_form")
tree = Tree(
name=data.name or metadata.get("name", "AI-Generated Flow"),
description=data.description or metadata.get("description", ""),
tree_type=session.flow_type,
tree_structure=session.working_tree,
intake_form=intake_form,
author_id=current_user.id,
account_id=current_user.account_id,
category_id=data.category_id,

View File

@@ -0,0 +1,79 @@
"""AI Suggestion audit trail endpoints."""
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import datetime, timezone
from app.api.deps import get_current_active_user, get_db
from app.models.user import User
from app.models.ai_suggestion import AISuggestion
from app.schemas.ai_suggestion import (
AISuggestionCreate,
AISuggestionResponse,
AISuggestionResolve,
)
router = APIRouter(prefix="/ai/suggestions", tags=["ai-suggestions"])
@router.get("/tree/{tree_id}", response_model=list[AISuggestionResponse])
async def list_suggestions(
tree_id: UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_active_user),
):
"""List all suggestions for a flow, filtered to current user."""
result = await db.execute(
select(AISuggestion)
.where(AISuggestion.tree_id == tree_id, AISuggestion.user_id == current_user.id)
.order_by(AISuggestion.created_at.desc())
)
return result.scalars().all()
@router.post("", response_model=AISuggestionResponse, status_code=201)
async def create_suggestion(
data: AISuggestionCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_active_user),
):
"""Record a new AI suggestion."""
suggestion = AISuggestion(
tree_id=data.tree_id,
user_id=current_user.id,
session_id=data.session_id,
action_type=data.action_type,
target_node_id=data.target_node_id,
changes_json=data.changes_json,
)
db.add(suggestion)
await db.commit()
await db.refresh(suggestion)
return suggestion
@router.patch("/{suggestion_id}", response_model=AISuggestionResponse)
async def resolve_suggestion(
suggestion_id: UUID,
data: AISuggestionResolve,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_active_user),
):
"""Accept or dismiss a suggestion."""
result = await db.execute(
select(AISuggestion).where(
AISuggestion.id == suggestion_id,
AISuggestion.user_id == current_user.id,
)
)
suggestion = result.scalar_one_or_none()
if not suggestion:
raise HTTPException(status_code=404, detail="Suggestion not found")
suggestion.status = data.status
suggestion.resolved_at = datetime.now(timezone.utc)
await db.commit()
await db.refresh(suggestion)
return suggestion

View File

@@ -0,0 +1,281 @@
"""Flow export/import endpoints (.rfflow files)."""
import logging
import re
from datetime import datetime, timezone
from typing import Annotated, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, status
from fastapi.responses import Response
from sqlalchemy import select, or_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.api.deps import get_current_active_user, require_engineer_or_admin
from app.core.audit import log_audit
from app.core.database import get_db
from app.core.permissions import can_access_tree
from app.core.subscriptions import check_tree_limit
from app.core.tree_validation import can_publish_tree
from app.models.category import TreeCategory
from app.models.tag import TreeTag, tree_tag_assignments
from app.models.tree import Tree
from app.models.user import User
from app.schemas.tree_export import (
FlowExportCategory,
FlowExportData,
FlowExportEnvelope,
FlowImportRequest,
FlowImportResponse,
)
from app.services.rag_service import index_tree as rag_index_tree
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/trees", tags=["tree-transfer"])
def _slugify(name: str) -> str:
"""Create a filename-safe slug from a name."""
slug = re.sub(r'[^\w\s-]', '', name.lower().strip())
return re.sub(r'[-\s]+', '-', slug)
# --- Export ---
@router.get("/{tree_id}/export")
async def export_tree(
tree_id: UUID,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
):
"""Export a tree as a downloadable .rfflow JSON file."""
# Load tree with relationships + author name
result = await db.execute(
select(Tree)
.options(
selectinload(Tree.category_rel),
selectinload(Tree.tags),
selectinload(Tree.author),
)
.where(Tree.id == tree_id)
)
tree = result.scalar_one_or_none()
if not tree:
raise HTTPException(status_code=404, detail="Tree not found")
if not tree.is_active or not can_access_tree(current_user, tree):
raise HTTPException(status_code=403, detail="You don't have access to this tree")
# Build export category
export_category = None
if tree.category_rel:
export_category = FlowExportCategory(
name=tree.category_rel.name,
slug=tree.category_rel.slug,
)
# Build export data
author_name = None
if tree.author:
author_name = tree.author.name or tree.author.email
flow_data = FlowExportData(
name=tree.name,
description=tree.description,
tree_type=tree.tree_type,
version=tree.version,
author_name=author_name,
category=export_category,
tags=tree.tag_names,
tree_structure=tree.tree_structure,
intake_form=tree.intake_form,
)
envelope = FlowExportEnvelope(
rfflow_version="1.0",
exported_at=datetime.now(timezone.utc),
source_app="ResolutionFlow",
flow=flow_data,
)
slug = _slugify(tree.name)
# Audit log
await log_audit(db, current_user.id, "tree.export", "tree", tree.id)
await db.commit()
content = envelope.model_dump_json(indent=2)
return Response(
content=content,
media_type="application/json",
headers={"Content-Disposition": f'attachment; filename="{slug}.rfflow"'},
)
# --- Import ---
@router.post("/import", response_model=FlowImportResponse, status_code=status.HTTP_201_CREATED)
async def import_tree(
data: FlowImportRequest,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(require_engineer_or_admin)],
name_override: Optional[str] = Query(None, max_length=255),
):
"""Import a flow from a parsed .rfflow file. Creates as draft."""
# Validate version
if data.rfflow_version != "1.0":
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Unsupported rfflow version: {data.rfflow_version}. Only '1.0' is supported.",
)
flow = data.flow
# Check subscription tree limit
if current_user.account_id:
can_create, limit, count = await check_tree_limit(current_user.account_id, db)
if not can_create:
raise HTTPException(
status_code=status.HTTP_402_PAYMENT_REQUIRED,
detail=f"Tree limit reached ({count}/{limit}). Upgrade your plan to create more trees.",
)
# --- Category resolution ---
category_id = None
category_created = False
if flow.category:
# Try to match by slug within user's account
cat_result = await db.execute(
select(TreeCategory).where(
TreeCategory.slug == flow.category.slug,
or_(
TreeCategory.account_id.is_(None),
TreeCategory.account_id == current_user.account_id,
),
)
)
category = cat_result.scalar_one_or_none()
if category:
category_id = category.id
else:
# Create new category
new_cat = TreeCategory(
name=flow.category.name,
slug=flow.category.slug,
account_id=current_user.account_id,
)
db.add(new_cat)
await db.flush()
category_id = new_cat.id
category_created = True
# --- Tag resolution ---
tags_created: list[str] = []
tags_to_add: list[TreeTag] = []
tree_account_id = current_user.account_id
for tag_name in flow.tags:
slug = TreeTag.slugify(tag_name)
tag_result = await db.execute(
select(TreeTag).where(
TreeTag.slug == slug,
or_(
TreeTag.account_id.is_(None),
TreeTag.account_id == tree_account_id,
),
)
)
tag = tag_result.scalar_one_or_none()
if not tag:
tag = TreeTag(
name=tag_name,
slug=slug,
account_id=tree_account_id,
created_by=current_user.id,
)
db.add(tag)
await db.flush()
tags_created.append(tag_name)
tags_to_add.append(tag)
tag.usage_count += 1
# --- Validation warnings (non-blocking since status=draft) ---
warnings: list[str] = []
intake_form_dicts = flow.intake_form
can_pub, validation_errors = can_publish_tree(
flow.tree_structure,
flow.name,
flow.description,
tree_type=flow.tree_type,
intake_form=intake_form_dicts,
)
if not can_pub:
for err in validation_errors:
msg = err.get("message", str(err)) if isinstance(err, dict) else str(err)
warnings.append(msg)
# --- Create tree ---
tree_name = name_override or flow.name
import_metadata = {
"original_author_name": flow.author_name,
"exported_at": data.exported_at.isoformat(),
"imported_at": datetime.now(timezone.utc).isoformat(),
"source_app": data.source_app,
}
new_tree = Tree(
name=tree_name,
description=flow.description,
tree_type=flow.tree_type,
tree_structure=flow.tree_structure,
intake_form=intake_form_dicts,
category_id=category_id,
author_id=current_user.id,
account_id=current_user.account_id,
status="draft",
version=1,
import_metadata=import_metadata,
)
db.add(new_tree)
await db.flush()
# Tag junction table inserts
for tag in tags_to_add:
await db.execute(
tree_tag_assignments.insert().values(
tree_id=new_tree.id,
tag_id=tag.id,
assigned_by=current_user.id,
)
)
# Audit log
await log_audit(db, current_user.id, "tree.import", "tree", new_tree.id, {
"source_app": data.source_app,
"original_author": flow.author_name,
})
await db.commit()
# RAG index (best-effort)
try:
await rag_index_tree(new_tree.id, db)
await db.commit()
except Exception:
logger.warning("RAG indexing failed for imported tree %s", new_tree.id)
return FlowImportResponse(
tree_id=str(new_tree.id),
name=tree_name,
tree_type=flow.tree_type,
status="draft",
category_created=category_created,
tags_created=tags_created,
validation_warnings=warnings,
)

View File

@@ -116,7 +116,8 @@ def build_full_tree_response(tree: Tree, parent_tree: Tree = None) -> TreeRespon
version=tree.version,
usage_count=tree.usage_count,
created_at=tree.created_at,
updated_at=tree.updated_at
updated_at=tree.updated_at,
import_metadata=tree.import_metadata
)

View File

@@ -12,6 +12,8 @@ from app.api.endpoints import copilot
from app.api.endpoints import assistant_chat
from app.api.endpoints import survey
from app.api.endpoints import admin_survey
from app.api.endpoints import tree_transfer
from app.api.endpoints import ai_suggestions
api_router = APIRouter()
@@ -48,3 +50,5 @@ api_router.include_router(copilot.router)
api_router.include_router(assistant_chat.router)
api_router.include_router(survey.router)
api_router.include_router(admin_survey.router)
api_router.include_router(tree_transfer.router)
api_router.include_router(ai_suggestions.router)

View File

@@ -15,7 +15,7 @@ from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.ai_provider import get_ai_provider
from app.core.ai_tree_validator import validate_generated_tree
from app.core.ai_tree_validator import validate_generated_tree, validate_generated_procedural_steps
from app.core.config import settings
from app.models.ai_chat_session import AIChatSession
@@ -44,7 +44,7 @@ CRITICAL BEHAVIORS:
- Include expected outcomes for every action: what does success look like?
- Surface edge cases proactively: "What about multi-forest environments?" or "Does this change if they have conditional access policies?"
- Explain WHY the diagnostic order matters: "We check connectivity before auth because a network issue masquerades as an auth failure."
- Ask ONE focused question at a time. Do not overwhelm with multiple questions.
- Ask ONE focused question at a time. NEVER ask multiple questions in a single response — no numbered lists of questions, no "also, what about X?", no follow-up questions tacked on. One question, then wait for the answer.
- Use plain, collegial language. Sound like a colleague, not a form."""
SCHEMA_CONTEXT = """
@@ -140,18 +140,139 @@ IMPORTANT:
"""
PROCEDURAL_SCHEMA_CONTEXT = """
PROCEDURAL STEP SCHEMA — This is what you are building:
The flow is an ordered array of steps in a JSON object: {"steps": [...]}
Each step has a "type" field:
1. procedure_step — A concrete step the engineer performs
Required: id (string), type ("procedure_step"), title (string), description (string)
Optional:
- content_type ("action"|"informational"|"verification"|"warning") — default "action"
- estimated_minutes (number)
- commands (array of objects: {code: string, label?: string, language?: string}) — exact CLI/PowerShell syntax
- expected_outcome (string) — what success looks like
- verification_prompt (string) — question to confirm completion
- verification_type ("checkbox"|"text_input") — how the engineer confirms
- warning_text (string) — caution or prerequisite info
- notes_enabled (boolean) — allow engineer to capture notes on this step
- reference_url (string) — link to documentation
2. section_header — Groups steps into logical phases
Required: id (string), type ("section_header"), title (string)
Section headers apply to all subsequent steps until the next section_header.
3. procedure_end — Terminal marker (always the last step)
Required: id (string), type ("procedure_end"), title (string)
STRUCTURAL RULES:
- Steps are executed in array order (flat list, no branching)
- All IDs must be unique descriptive slugs (e.g., "check-dns-resolution", not UUIDs)
- The last step MUST be type "procedure_end"
- Use section_headers to organize steps into logical phases
- Commands are arrays of objects: [{"code": "Get-Service ADSync", "label": "Check sync service", "language": "powershell"}]
- Descriptions support [VAR:variable_name] interpolation for intake form variables (e.g., "Connect to [VAR:server_name] via RDP")
VARIABLE INTERPOLATION:
When the procedure needs per-execution input (server name, IP address, client name, etc.), use [VAR:variable_name] syntax in descriptions and commands. These map to intake form fields that the engineer fills in before starting.
"""
PROCEDURAL_INTERVIEW_PROTOCOL = """
INTERVIEW PHASES — Follow this progression:
PHASE 1 - SCOPING (current_phase: scoping):
Understand the process being documented:
- What process or procedure is this flow for?
- Who will execute it? (Tier 1 help desk, Tier 2, senior engineers?)
- What environment context? (Specific vendor, on-prem vs cloud, tools available?)
- Will this need per-execution input? (server name, client info, IP addresses → intake form fields)
Demonstrate domain expertise: if the user says "Exchange Online mailbox migration," show understanding: "Are we covering full tenant-to-tenant migration, on-prem to Exchange Online cutover, or individual mailbox moves with hybrid?"
DO NOT emit [STEPS_UPDATE] during scoping. You are still understanding the process.
PHASE 2 - DISCOVERY (current_phase: discovery):
Build the procedure step by step IN ORDER:
- Start with prerequisites and initial verification
- Walk through each step sequentially — ask what happens first, then next, then next
- Suggest section headers to organize logical phases (e.g., "Pre-Flight Checks", "Migration", "Verification")
- Capture specific commands, tools, and expected outcomes for each step
- Identify where [VAR:variable_name] placeholders are needed
EMIT [STEPS_UPDATE] when you and the user have agreed on concrete steps. Build progressively — emit partial step lists as you go.
PHASE 3 - ENRICHMENT (current_phase: enrichment):
Circle back to enrich existing steps:
- Add exact PowerShell/CLI commands with full syntax
- Add verification prompts for critical steps
- Add warning_text for steps with risk (data loss, downtime, etc.)
- Add estimated_minutes for time-critical procedures
- Add expected_outcome for action steps
- Suggest reference_url links to documentation
- Identify missing edge cases or safety checks
EMIT [STEPS_UPDATE] when enriching steps with additional detail.
PHASE 4 - REVIEW (current_phase: review):
Present a summary:
- Total step count by content_type
- Outline of sections and steps
- List of intake form variables ([VAR:...]) used
- Flag any steps missing commands or verification
- Offer chance to reorder, add, or remove steps
EMIT [STEPS_UPDATE] only if the user requests changes.
TRANSITION between phases by emitting [PHASE:phase_name] when the conversation naturally moves to the next stage. You decide when enough information has been gathered for each phase.
"""
PROCEDURAL_RESPONSE_FORMAT = """
RESPONSE FORMAT:
Your response is natural conversational text. When the step structure changes, include structured markers that will be parsed by the system (the user will NOT see these markers):
1. Steps update (only when structure changes — see phase rules above):
[STEPS_UPDATE]
{"steps": [...valid steps array...]}
[/STEPS_UPDATE]
2. Phase transition (when moving to next phase):
[PHASE:discovery]
3. Metadata capture (when you learn the flow's name, description, or tags):
[METADATA]
{"name": "...", "description": "...", "tags": ["..."]}
[/METADATA]
4. Intake form suggestion (when intake form fields are identified):
[INTAKE_FORM]
[{"variable_name": "server_name", "label": "Server Name", "field_type": "text", "required": true, "placeholder": "e.g., DC01", "group_name": "Server Details", "display_order": 1}]
[/INTAKE_FORM]
IMPORTANT:
- Include [STEPS_UPDATE] sparingly. Only when concrete steps are established or modified.
- The steps update should be the COMPLETE working step list, not a diff.
- Always include conversational text OUTSIDE the markers — never respond with only markers.
- The procedure_end step is always included as the last step.
"""
def _build_system_prompt(flow_type: str) -> str:
"""Assemble the full system prompt for the chat builder."""
flow_context = (
"The user wants to build a TROUBLESHOOTING flow — a diagnostic decision tree "
"that guides engineers through symptom identification, diagnostic checks, and "
"resolution steps."
if flow_type == "troubleshooting"
else "The user wants to build a PROCEDURAL flow — a step-by-step process guide "
"with phases, checklists, and verification steps."
)
return f"{ROLE_PERSONA}\n\n{flow_context}\n\n{SCHEMA_CONTEXT}\n\n{INTERVIEW_PROTOCOL}\n\n{RESPONSE_FORMAT}"
if flow_type in ("procedural", "maintenance"):
flow_context = (
"The user wants to build a PROCEDURAL flow — a step-by-step process guide "
"with ordered phases, verification checkpoints, and optional intake form variables. "
"This is NOT a branching decision tree — it is a flat, sequential procedure."
)
return (
f"{ROLE_PERSONA}\n\n{flow_context}\n\n"
f"{PROCEDURAL_SCHEMA_CONTEXT}\n\n{PROCEDURAL_INTERVIEW_PROTOCOL}\n\n{PROCEDURAL_RESPONSE_FORMAT}"
)
else:
flow_context = (
"The user wants to build a TROUBLESHOOTING flow — a diagnostic decision tree "
"that guides engineers through symptom identification, diagnostic checks, and "
"resolution steps."
)
return f"{ROLE_PERSONA}\n\n{flow_context}\n\n{SCHEMA_CONTEXT}\n\n{INTERVIEW_PROTOCOL}\n\n{RESPONSE_FORMAT}"
def _strip_markdown_fences(text: str) -> str:
@@ -163,6 +284,92 @@ def _strip_markdown_fences(text: str) -> str:
return text
def _parse_delta(response: str) -> dict | None:
"""Extract [DELTA]...[/DELTA] JSON from AI response."""
match = re.search(r'\[DELTA\](.*?)\[/DELTA\]', response, re.DOTALL)
if not match:
return None
raw = _strip_markdown_fences(match.group(1).strip())
try:
return json.loads(raw)
except json.JSONDecodeError:
return None
def _find_node_by_id(tree: dict, node_id: str) -> dict | None:
"""Find a node by ID in a tree structure (recursive)."""
if tree.get("id") == node_id:
return tree
for child in tree.get("children", []):
found = _find_node_by_id(child, node_id)
if found:
return found
for step in tree.get("steps", []):
if step.get("id") == node_id:
return step
return None
def _build_action_prompt(
action_type: str,
focal_node_id: str | None,
tree_structure: dict,
flow_type: str,
) -> str:
"""Build action-specific system prompt supplement."""
tree_json = json.dumps(tree_structure, indent=2)
focal_context = ""
if focal_node_id:
focal_node = _find_node_by_id(tree_structure, focal_node_id)
if focal_node:
focal_context = f"\n\nFOCAL NODE (the node being acted on):\n{json.dumps(focal_node, indent=2)}"
prompts = {
"generate_branch": (
f"Generate a complete branch of child nodes for the focal node. "
f"Return the new nodes wrapped in [DELTA]...[/DELTA] markers as JSON with "
f"action='add', target_node_id='{focal_node_id}', and nodes array."
f"{focal_context}"
),
"modify_node": (
f"Modify the focal node based on the user's instruction. "
f"Return the updated node in [DELTA]...[/DELTA] markers with action='modify'."
f"{focal_context}"
),
"add_steps": (
f"Generate new procedural steps to insert after the focal step. "
f"Return them in [DELTA]...[/DELTA] markers with action='add'."
f"{focal_context}"
),
"quick_action": (
f"Respond to the user's quick action request about the focal node. "
f"If the action modifies the node, return changes in [DELTA]...[/DELTA] markers. "
f"If it's informational (e.g. explain), just respond in text."
f"{focal_context}"
),
"open_chat": (
"Have a helpful conversation about the flow. If the user asks for changes, "
"return them in [DELTA]...[/DELTA] markers. Otherwise respond in text."
),
"generate_full": (
"Generate a complete flow structure based on the user's description."
),
"variable_inference": (
"Analyze the procedural steps for implicit variables. Look for references to "
"specific servers, clients, credentials, or other values that should be captured "
"in an intake form. Return suggestions as JSON."
),
}
action_prompt = prompts.get(action_type, prompts["open_chat"])
return (
f"CURRENT FLOW STRUCTURE ({flow_type}):\n{tree_json}\n\n"
f"ACTION: {action_type}\n{action_prompt}"
)
def _parse_ai_response(raw_response: str) -> dict[str, Any]:
"""Parse structured markers from AI response.
@@ -177,6 +384,7 @@ def _parse_ai_response(raw_response: str) -> dict[str, Any]:
"tree_update": None,
"phase": None,
"metadata": None,
"intake_form": None,
}
# Extract [TREE_UPDATE]...[/TREE_UPDATE]
@@ -198,6 +406,40 @@ def _parse_ai_response(raw_response: str) -> dict[str, Any]:
logger.warning("Truncated [TREE_UPDATE] block detected (no closing tag) — stripping from display")
result["content"] = raw_response[: truncated_match.start()]
# Extract [STEPS_UPDATE]...[/STEPS_UPDATE] (procedural flows)
steps_match = re.search(
r"\[STEPS_UPDATE\]\s*([\s\S]*?)\s*\[/STEPS_UPDATE\]", result["content"]
)
if steps_match:
try:
raw_json = _strip_markdown_fences(steps_match.group(1))
result["tree_update"] = json.loads(raw_json)
except (json.JSONDecodeError, ValueError) as e:
logger.warning("Failed to parse steps update JSON: %s", e)
result["content"] = result["content"][: steps_match.start()] + result["content"][steps_match.end() :]
else:
truncated_steps = re.search(r"\[STEPS_UPDATE\][\s\S]*$", result["content"])
if truncated_steps:
logger.warning("Truncated [STEPS_UPDATE] block detected (no closing tag) — stripping from display")
result["content"] = result["content"][: truncated_steps.start()]
# Extract [INTAKE_FORM]...[/INTAKE_FORM] (procedural flows)
intake_match = re.search(
r"\[INTAKE_FORM\]\s*([\s\S]*?)\s*\[/INTAKE_FORM\]", result["content"]
)
if intake_match:
try:
raw_json = _strip_markdown_fences(intake_match.group(1))
result["intake_form"] = json.loads(raw_json)
except (json.JSONDecodeError, ValueError) as e:
logger.warning("Failed to parse intake form JSON: %s", e)
result["content"] = result["content"][: intake_match.start()] + result["content"][intake_match.end() :]
else:
truncated_intake = re.search(r"\[INTAKE_FORM\][\s\S]*$", result["content"])
if truncated_intake:
logger.warning("Truncated [INTAKE_FORM] block detected — stripping from display")
result["content"] = result["content"][: truncated_intake.start()]
# Extract [PHASE:name]
phase_match = re.search(r"\[PHASE:(\w+)\]", result["content"])
if phase_match:
@@ -235,6 +477,7 @@ async def start_chat_session(
user_id: uuid.UUID,
account_id: uuid.UUID,
db: AsyncSession,
tree_id: str | None = None,
) -> tuple[AIChatSession, str]:
"""Create a chat session and return the AI's opening greeting.
@@ -244,6 +487,7 @@ async def start_chat_session(
user_id=user_id,
account_id=account_id,
flow_type=flow_type,
tree_id=uuid.UUID(tree_id) if tree_id else None,
expires_at=datetime.now(timezone.utc) + timedelta(hours=settings.AI_CONVERSATION_TTL_HOURS),
)
db.add(session)
@@ -287,13 +531,35 @@ async def send_message(
session: AIChatSession,
user_message: str,
db: AsyncSession,
action_type: str = "open_chat",
focal_node_id: str | None = None,
flow_context: dict | None = None,
) -> tuple[str, Optional[dict], Optional[str], Optional[dict]]:
"""Send a user message and get AI response.
Args:
flow_context: Live flow structure from the editor. Contains the current
tree_structure (troubleshooting) or steps + intake_form (procedural).
This gives the AI full awareness of the flow being edited.
Returns (ai_content, working_tree_update, new_phase, metadata_update).
"""
system_prompt = _build_system_prompt(session.flow_type)
# Inject live flow context so the AI can see current editor state
if flow_context:
context_json = json.dumps(flow_context, indent=2)
system_prompt += (
f"\n\nCURRENT FLOW STATE (live from editor):\n{context_json}"
)
if focal_node_id:
focal_node = _find_node_by_id(flow_context, focal_node_id)
if focal_node:
system_prompt += (
f"\n\nFOCAL NODE/STEP (the item being acted on):\n"
f"{json.dumps(focal_node, indent=2)}"
)
# Build messages array from conversation history
now_iso = datetime.now(timezone.utc).isoformat()
history = list(session.conversation_history)
@@ -305,7 +571,9 @@ async def send_message(
for msg in history
]
provider = get_ai_provider()
# Resolve model for this action type
action_model = settings.get_model_for_action(action_type)
provider = get_ai_provider(model=action_model)
response_text, input_tokens, output_tokens = await provider.generate_text(
system_prompt=system_prompt,
messages=provider_messages,
@@ -318,12 +586,19 @@ async def send_message(
# only require valid root structure, not min node counts)
tree_update = parsed["tree_update"]
if tree_update:
if not isinstance(tree_update, dict) or tree_update.get("type") != "decision":
logger.warning("AI tree update rejected: root must be a decision node")
tree_update = None
elif not tree_update.get("id"):
logger.warning("AI tree update rejected: root node missing id")
tree_update = None
if session.flow_type in ("procedural", "maintenance"):
# Procedural: must be a dict with a "steps" list
if not isinstance(tree_update, dict) or not isinstance(tree_update.get("steps"), list):
logger.warning("AI steps update rejected: must be a dict with a 'steps' list")
tree_update = None
else:
# Troubleshooting: root must be a decision node
if not isinstance(tree_update, dict) or tree_update.get("type") != "decision":
logger.warning("AI tree update rejected: root must be a decision node")
tree_update = None
elif not tree_update.get("id"):
logger.warning("AI tree update rejected: root node missing id")
tree_update = None
# Update session state
history.append({"role": "assistant", "content": parsed["content"], "timestamp": now_iso})
@@ -345,6 +620,11 @@ async def send_message(
merged.update(parsed["metadata"])
session.tree_metadata = merged
if parsed.get("intake_form"):
merged = dict(session.tree_metadata)
merged["intake_form"] = parsed["intake_form"]
session.tree_metadata = merged
session.updated_at = datetime.now(timezone.utc)
return parsed["content"], tree_update, parsed["phase"], parsed["metadata"]
@@ -367,7 +647,33 @@ async def generate_final_tree(
for msg in session.conversation_history
]
generation_instruction = """Based on our entire conversation, generate the COMPLETE and FINAL TreeStructure JSON for this flow.
if session.flow_type in ("procedural", "maintenance"):
generation_instruction = """Based on our entire conversation, generate the COMPLETE and FINAL procedural steps JSON for this flow.
Requirements:
- Output format: {"steps": [...]} — a JSON object with a "steps" array
- Include ALL steps, section headers, and details we discussed
- Use descriptive step IDs (slugs, not UUIDs)
- Steps are in execution order (flat list, no branching)
- Use section_header steps to organize into logical phases
- Every procedure_step should have commands with exact syntax where discussed
- Every procedure_step should have expected_outcome and verification_prompt where discussed
- Include content_type, estimated_minutes, warning_text, and reference_url where discussed
- Use [VAR:variable_name] syntax in descriptions/commands for intake form variables
- The LAST step MUST be type "procedure_end"
- Respond with ONLY the JSON — no conversational text, no markdown fences
Also provide metadata as a separate JSON object after the steps:
[METADATA]
{"name": "...", "description": "...", "tags": ["..."]}
[/METADATA]
If we discussed intake form fields, also include:
[INTAKE_FORM]
[{"variable_name": "server_name", "label": "Server Name", "field_type": "text", "required": true, "placeholder": "e.g., DC01", "group_name": "Server Details", "display_order": 1}]
[/INTAKE_FORM]"""
else:
generation_instruction = """Based on our entire conversation, generate the COMPLETE and FINAL TreeStructure JSON for this flow.
Requirements:
- Include ALL branches, steps, and solutions we discussed
@@ -386,7 +692,7 @@ Also provide metadata as a separate JSON object after the tree:
provider_messages.append({"role": "user", "content": generation_instruction})
provider = get_ai_provider()
provider = get_ai_provider(model=settings.get_model_for_action("generate_full"))
for attempt in range(2): # One try + one retry
response_text, input_tokens, output_tokens = await provider.generate_text(
@@ -421,21 +727,30 @@ Also provide metadata as a separate JSON object after the tree:
continue
raise ValueError("AI failed to produce valid JSON after retry")
errors = validate_generated_tree(tree)
if errors:
if session.flow_type in ("procedural", "maintenance"):
val_errors = validate_generated_procedural_steps(tree)
else:
val_errors = validate_generated_tree(tree)
if val_errors:
if attempt == 0:
provider_messages.append({"role": "assistant", "content": response_text})
correction = (
f"The tree has validation errors: {'; '.join(errors)}. "
f"The generated structure has validation errors: {'; '.join(val_errors)}. "
"Please fix these issues and respond with the corrected JSON only."
)
provider_messages.append({"role": "user", "content": correction})
continue
raise ValueError(f"Generated tree failed validation: {'; '.join(errors)}")
raise ValueError(f"Generated structure failed validation: {'; '.join(val_errors)}")
# Success
session.working_tree = tree
session.tree_metadata = metadata
if parsed.get("intake_form"):
merged = dict(session.tree_metadata)
merged["intake_form"] = parsed["intake_form"]
session.tree_metadata = merged
metadata = session.tree_metadata
session.current_phase = "generation"
session.updated_at = datetime.now(timezone.utc)

View File

@@ -184,6 +184,7 @@ class AnthropicProvider(AIProvider):
client = anthropic.AsyncAnthropic(
api_key=self._api_key,
timeout=self._timeout,
max_retries=1,
)
response = await client.messages.create(
@@ -209,9 +210,13 @@ class AnthropicProvider(AIProvider):
return await self.generate_json(system_prompt, messages, max_tokens)
def get_ai_provider() -> AIProvider:
def get_ai_provider(model: str | None = None) -> AIProvider:
"""Factory that returns the configured AI provider.
Args:
model: Optional model override (Anthropic model ID). Only applied to
AnthropicProvider; Gemini always uses settings.AI_MODEL_GEMINI.
Selection logic:
1. If AI_PROVIDER == "gemini" and GOOGLE_AI_API_KEY is set -> GeminiProvider
2. If AI_PROVIDER == "anthropic" and ANTHROPIC_API_KEY is set -> AnthropicProvider
@@ -230,7 +235,7 @@ def get_ai_provider() -> AIProvider:
if settings.ANTHROPIC_API_KEY:
return AnthropicProvider(
api_key=settings.ANTHROPIC_API_KEY,
model=settings.AI_MODEL_ANTHROPIC,
model=model or settings.AI_MODEL_ANTHROPIC,
timeout=settings.AI_REQUEST_TIMEOUT_SECONDS,
)
@@ -238,7 +243,7 @@ def get_ai_provider() -> AIProvider:
if settings.ANTHROPIC_API_KEY:
return AnthropicProvider(
api_key=settings.ANTHROPIC_API_KEY,
model=settings.AI_MODEL_ANTHROPIC,
model=model or settings.AI_MODEL_ANTHROPIC,
timeout=settings.AI_REQUEST_TIMEOUT_SECONDS,
)
# Fallback to Gemini

View File

@@ -230,3 +230,96 @@ def count_tree_stats(tree: dict[str, Any]) -> dict[str, int]:
_count(tree, 1)
return stats
# --- Procedural flow validation ---
VALID_PROCEDURAL_STEP_TYPES = {"procedure_step", "procedure_end", "section_header"}
VALID_CONTENT_TYPES = {"action", "informational", "verification", "warning"}
def validate_generated_procedural_steps(tree: dict[str, Any]) -> list[str]:
"""Validate an AI-generated procedural step array.
Expects a dict with a 'steps' key containing a list of step objects.
Returns a list of error strings. Empty list means valid.
"""
errors: list[str] = []
if not isinstance(tree, dict):
return ["Procedural flow must be a JSON object"]
steps = tree.get("steps")
if not isinstance(steps, list) or len(steps) == 0:
return ["Procedural flow must have a non-empty 'steps' array"]
if len(steps) > 100:
errors.append(
f"Procedural flow has {len(steps)} steps. Maximum 100 allowed."
)
all_ids: set[str] = set()
procedure_step_count = 0
procedure_end_count = 0
for i, step in enumerate(steps):
if not isinstance(step, dict):
errors.append(f"Step at index {i} is not an object")
continue
# Check required fields
step_id = step.get("id")
step_type = step.get("type")
step_title = step.get("title")
if not step_id or not isinstance(step_id, str):
errors.append(f"Step at index {i} missing or invalid 'id' (must be a string)")
elif step_id in all_ids:
errors.append(f"Duplicate step ID: '{step_id}'")
else:
all_ids.add(step_id)
if not step_type or step_type not in VALID_PROCEDURAL_STEP_TYPES:
errors.append(
f"Step '{step_id or f'index {i}'}' has invalid type '{step_type}'. "
f"Must be one of: {', '.join(sorted(VALID_PROCEDURAL_STEP_TYPES))}"
)
else:
if step_type == "procedure_step":
procedure_step_count += 1
elif step_type == "procedure_end":
procedure_end_count += 1
if not step_title or not isinstance(step_title, str):
errors.append(f"Step '{step_id or f'index {i}'}' missing or invalid 'title' (must be a string)")
# Validate content_type if present
content_type = step.get("content_type")
if content_type is not None and content_type not in VALID_CONTENT_TYPES:
errors.append(
f"Step '{step_id or f'index {i}'}' has invalid content_type '{content_type}'. "
f"Must be one of: {', '.join(sorted(VALID_CONTENT_TYPES))}"
)
# Must have exactly one procedure_end as the last step
if procedure_end_count == 0:
errors.append("Procedural flow must have exactly one 'procedure_end' step")
elif procedure_end_count > 1:
errors.append(
f"Procedural flow has {procedure_end_count} 'procedure_end' steps. "
"Must have exactly one."
)
else:
# Exactly one — check it's the last step
last_step = steps[-1]
if isinstance(last_step, dict) and last_step.get("type") != "procedure_end":
errors.append("The 'procedure_end' step must be the last step in the array")
# Need at least 2 procedure_step items
if procedure_step_count < 2:
errors.append(
f"Procedural flow has only {procedure_step_count} 'procedure_step' items. "
"Need at least 2 for a useful procedure."
)
return errors

View File

@@ -74,15 +74,36 @@ class Settings(BaseSettings):
# AI Flow Builder
ANTHROPIC_API_KEY: Optional[str] = None
AI_MODEL: str = "claude-haiku-4-5-20251001"
AI_MODEL: str = "claude-sonnet-4-6"
AI_CONVERSATION_TTL_HOURS: int = 24
AI_MAX_CALLS_PER_FLOW: int = 10
AI_REQUEST_TIMEOUT_SECONDS: int = 45
AI_REQUEST_TIMEOUT_SECONDS: int = 120
# AI Provider selection
AI_PROVIDER: str = "gemini" # "gemini" or "anthropic"
AI_PROVIDER: str = "anthropic" # "gemini" or "anthropic"
GOOGLE_AI_API_KEY: Optional[str] = None
AI_MODEL_GEMINI: str = "gemini-2.5-flash"
AI_MODEL_ANTHROPIC: str = "claude-haiku-4-5-20251001"
AI_MODEL_ANTHROPIC: str = "claude-sonnet-4-6"
# Model tier routing — maps action types to model tiers
AI_MODEL_TIERS: dict[str, str] = {
"fast": "claude-haiku-4-5-20251001",
"standard": "claude-sonnet-4-6",
}
ACTION_MODEL_MAP: dict[str, str] = {
"generate_full": "standard",
"generate_branch": "standard",
"modify_node": "fast",
"add_steps": "standard",
"quick_action": "fast",
"open_chat": "standard",
"variable_inference": "fast",
}
def get_model_for_action(self, action_type: str) -> str:
"""Resolve an action type to a concrete model name via tier routing."""
tier = self.ACTION_MODEL_MAP.get(action_type, "standard")
return self.AI_MODEL_TIERS.get(tier, self.AI_MODEL_TIERS["standard"])
# MCP (Model Context Protocol) integrations
ENABLE_MCP_MICROSOFT_LEARN: bool = True

View File

@@ -22,6 +22,27 @@ setup_logging()
logger = logging.getLogger(__name__)
async def archive_stale_ai_sessions():
"""Archive AI chat sessions with no activity for 30 days."""
from app.models.ai_chat_session import AIChatSession
from sqlalchemy import update
from datetime import datetime, timezone, timedelta
cutoff = datetime.now(timezone.utc) - timedelta(days=30)
async with async_session_maker() as db:
result = await db.execute(
update(AIChatSession)
.where(
AIChatSession.updated_at < cutoff,
AIChatSession.archived_at.is_(None),
AIChatSession.status != "abandoned",
)
.values(archived_at=datetime.now(timezone.utc))
)
await db.commit()
logger.info(f"[archive] Archived {result.rowcount} stale AI chat sessions")
def _configure_seed_module(mod: object, api_url: str, email: str, password: str) -> None:
"""Set globals on a seed script module."""
mod.API_BASE_URL = api_url # type: ignore[attr-defined]
@@ -132,6 +153,15 @@ async def lifespan(app: FastAPI):
replace_existing=True,
)
# Auto-archive stale AI chat sessions (daily at 3 AM)
scheduler.add_job(
archive_stale_ai_sessions,
"cron",
hour=3,
id="archive_stale_ai_sessions",
replace_existing=True,
)
# Auto-seed trees in background on PR environments
seed_task = None
if settings.SEED_ON_DEPLOY:

View File

@@ -86,3 +86,14 @@ class AIChatSession(Base):
default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc),
)
# Editor-embedded session: links to a specific tree/flow
tree_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True),
ForeignKey("trees.id", ondelete="CASCADE"),
nullable=True,
index=True,
)
archived_at: Mapped[Optional[datetime]] = mapped_column(
DateTime(timezone=True),
nullable=True,
)

View File

@@ -0,0 +1,55 @@
"""AI Suggestion model for tracking AI-applied changes to flows."""
import uuid
from datetime import datetime, timezone
from typing import Optional
from sqlalchemy import DateTime, ForeignKey, String
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import Mapped, mapped_column
from app.core.database import Base
class AISuggestion(Base):
__tablename__ = "ai_suggestions"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
tree_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("trees.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
user_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("users.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
session_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True),
ForeignKey("ai_chat_sessions.id", ondelete="SET NULL"),
nullable=True,
)
action_type: Mapped[str] = mapped_column(
String(50), nullable=False
)
target_node_id: Mapped[Optional[str]] = mapped_column(
String(255), nullable=True
)
changes_json: Mapped[dict] = mapped_column(
JSONB, nullable=False, default=dict
)
status: Mapped[str] = mapped_column(
String(20), nullable=False, default="pending"
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc),
nullable=False,
)
resolved_at: Mapped[Optional[datetime]] = mapped_column(
DateTime(timezone=True), nullable=True
)

View File

@@ -154,6 +154,13 @@ class Tree(Base):
comment="Fork depth: 0 = original, 1 = direct fork, 2 = fork of fork, etc."
)
# Import provenance
import_metadata: Mapped[Optional[dict[str, Any]]] = mapped_column(
JSONB,
nullable=True,
comment="Provenance metadata from .rfflow file import"
)
# Relationships
author: Mapped[Optional["User"]] = relationship("User", foreign_keys=[author_id], back_populates="trees")
team: Mapped[Optional["Team"]] = relationship("Team", back_populates="trees")

View File

@@ -14,12 +14,39 @@ class AIChatStartRequest(BaseModel):
flow_type: Literal["troubleshooting", "procedural"] = Field(
..., description="Type of flow to build"
)
tree_id: Optional[str] = Field(
default=None,
description="ID of existing tree for editor-embedded sessions",
)
VALID_ACTION_TYPES = Literal[
"generate_full",
"generate_branch",
"modify_node",
"add_steps",
"quick_action",
"open_chat",
"variable_inference",
]
class AIChatMessageRequest(BaseModel):
"""Send a user message in a chat session."""
content: str = Field(..., min_length=1, max_length=5000)
action_type: Optional[VALID_ACTION_TYPES] = Field(
default="open_chat",
description="Type of AI action to perform",
)
focal_node_id: Optional[str] = Field(
default=None,
description="ID of the node/step being acted on",
)
flow_context: Optional[dict[str, Any]] = Field(
default=None,
description="Live flow structure from the editor (tree structure, steps, intake form)",
)
class AIChatImportRequest(BaseModel):

View File

@@ -0,0 +1,33 @@
"""Schemas for AI suggestion audit trail."""
from datetime import datetime
from typing import Optional
from uuid import UUID
from pydantic import BaseModel, Field
class AISuggestionCreate(BaseModel):
tree_id: UUID
session_id: Optional[UUID] = None
action_type: str
target_node_id: Optional[str] = None
changes_json: dict = Field(default_factory=dict)
class AISuggestionResponse(BaseModel):
id: UUID
tree_id: UUID
user_id: UUID
session_id: Optional[UUID]
action_type: str
target_node_id: Optional[str]
changes_json: dict
status: str
created_at: datetime
resolved_at: Optional[datetime]
model_config = {"from_attributes": True}
class AISuggestionResolve(BaseModel):
status: str = Field(..., pattern="^(accepted|dismissed)$")

View File

@@ -161,6 +161,7 @@ class TreeResponse(TreeBase):
created_at: datetime
updated_at: datetime
usage_count: int
import_metadata: Optional[dict[str, Any]] = None
class Config:
from_attributes = True

View File

@@ -0,0 +1,52 @@
"""Schemas for .rfflow file export and import."""
from datetime import datetime
from typing import Optional, Any
from pydantic import BaseModel, Field
from app.schemas.tree import TreeType
class FlowExportCategory(BaseModel):
"""Category info embedded in export file."""
name: str
slug: str
class FlowExportData(BaseModel):
"""The flow payload inside an .rfflow file."""
name: str
description: Optional[str] = None
tree_type: TreeType
version: int = 1
author_name: Optional[str] = None
category: Optional[FlowExportCategory] = None
tags: list[str] = []
tree_structure: dict[str, Any]
intake_form: Optional[list[dict[str, Any]]] = None
class FlowExportEnvelope(BaseModel):
"""Top-level .rfflow file structure."""
rfflow_version: str = "1.0"
exported_at: datetime
source_app: str = "ResolutionFlow"
flow: FlowExportData
class FlowImportRequest(BaseModel):
"""What the frontend sends after parsing a .rfflow file."""
rfflow_version: str = Field(..., description="Must be '1.0'")
exported_at: datetime
source_app: str = "ResolutionFlow"
flow: FlowExportData
class FlowImportResponse(BaseModel):
"""Response after importing a flow."""
tree_id: str
name: str
tree_type: str
status: str = "draft"
category_created: bool = False
tags_created: list[str] = []
validation_warnings: list[str] = []

View File

@@ -40,8 +40,8 @@ deep expertise across the MSP technology stack:
- Security: MFA, Conditional Access, EDR, backup/DR
## How to Answer
- **Be direct and actionable.** Engineers are mid-ticket — give them the answer, \
not a lecture. Lead with the fix, then explain why.
- **Be direct and actionable.** Engineers are mid-ticket — lead with the fix or next \
diagnostic step, then explain why in one sentence if helpful. Skip background unless asked.
- **Include specifics.** Exact commands, registry paths, config values, port numbers. \
Vague advice wastes time.
- **Warn before you wreck.** If a step could cause downtime, data loss, or a lockout, \
@@ -51,6 +51,17 @@ bold for key terms. Engineers scan, they don't read essays.
- **Say when you're unsure.** If you don't know the exact answer, say so. Suggest \
where to verify (vendor docs, a specific KB article) rather than guessing.
## How to Ask Questions
- **Default to a single focused question.** Ask what you need to know right now to make progress.
- **Use contextual bullets sparingly.** If the question could be ambiguous (e.g., "what error?" \
when there are multiple common patterns), add 2-3 sub-bullets to help the engineer recognize \
what you're asking for — but keep it short.
- **Multiple questions only when blocking.** If you genuinely cannot proceed without knowing \
two things (e.g., both the error message AND which users are affected), preface it clearly: \
"Before continuing troubleshooting, I need to know: 1) [question], 2) [question]." Use this rarely.
- **Avoid interrogation mode.** Don't fire off 5 questions in a row. Get one answer, make \
progress, then ask the next question if needed.
## Using the Team's Flow Library
Your team has built troubleshooting flows in ResolutionFlow. When relevant flows \
appear in the context below, reference them by name so the engineer can launch them \

View File

@@ -0,0 +1,116 @@
"""Tests for AI delta response parsing and action-type prompt dispatch."""
from app.core.ai_chat_service import _parse_delta, _build_action_prompt, _find_node_by_id
def test_parse_delta_from_response():
"""Service extracts [DELTA] markers from AI responses."""
response = '''Here's a new branch for that node.
[DELTA]
{"action": "add", "target_node_id": "check-dns", "nodes": [{"id": "verify-dns-server", "type": "decision", "question": "Is the DNS server responding?"}], "explanation": "Added DNS verification branch"}
[/DELTA]
Let me know if you'd like to adjust this.'''
parsed = _parse_delta(response)
assert parsed is not None
assert parsed["action"] == "add"
assert parsed["target_node_id"] == "check-dns"
assert len(parsed["nodes"]) == 1
def test_parse_delta_none_when_absent():
"""Returns None when no delta marker present."""
response = "Sure, I can explain that node. It checks connectivity."
parsed = _parse_delta(response)
assert parsed is None
def test_parse_delta_with_markdown_fences():
"""Handles delta JSON wrapped in markdown code fences."""
response = '''[DELTA]
```json
{"action": "modify", "target_node_id": "node-1", "nodes": [{"id": "node-1", "type": "action", "title": "Updated"}], "explanation": "Modified title"}
```
[/DELTA]'''
parsed = _parse_delta(response)
assert parsed is not None
assert parsed["action"] == "modify"
def test_parse_delta_invalid_json():
"""Returns None for invalid JSON inside delta markers."""
response = "[DELTA]not valid json[/DELTA]"
parsed = _parse_delta(response)
assert parsed is None
def test_build_action_prompt_generate_branch():
"""Generate branch action includes focal node context."""
tree = {
"id": "root",
"type": "decision",
"question": "Is the server up?",
"children": [],
"options": [],
}
prompt = _build_action_prompt(
action_type="generate_branch",
focal_node_id="root",
tree_structure=tree,
flow_type="troubleshooting",
)
assert "root" in prompt
assert "generate" in prompt.lower() or "branch" in prompt.lower()
def test_build_action_prompt_open_chat():
"""Open chat action is general conversation."""
prompt = _build_action_prompt(
action_type="open_chat",
focal_node_id=None,
tree_structure={"id": "root", "type": "decision"},
flow_type="troubleshooting",
)
assert isinstance(prompt, str)
assert len(prompt) > 0
def test_find_node_by_id_root():
"""Finds root node."""
tree = {"id": "root", "type": "decision", "children": []}
assert _find_node_by_id(tree, "root") is not None
def test_find_node_by_id_nested():
"""Finds nested child node."""
tree = {
"id": "root",
"type": "decision",
"children": [
{"id": "child-1", "type": "action", "children": []},
{"id": "child-2", "type": "solution", "children": []},
],
}
found = _find_node_by_id(tree, "child-2")
assert found is not None
assert found["id"] == "child-2"
def test_find_node_by_id_not_found():
"""Returns None for non-existent node."""
tree = {"id": "root", "type": "decision", "children": []}
assert _find_node_by_id(tree, "nonexistent") is None
def test_find_node_by_id_in_steps():
"""Finds node in procedural steps array."""
tree = {
"steps": [
{"id": "step-1", "type": "procedure_step"},
{"id": "step-2", "type": "procedure_step"},
]
}
found = _find_node_by_id(tree, "step-2")
assert found is not None
assert found["id"] == "step-2"

View File

@@ -0,0 +1,41 @@
"""Tests for AI suggestion endpoints."""
import pytest
@pytest.mark.asyncio
async def test_create_and_list_suggestions(client, auth_headers, test_tree):
"""Can create and list suggestions for a tree."""
tree_id = test_tree["id"]
# Create suggestion
resp = await client.post(
"/api/v1/ai/suggestions",
json={
"tree_id": tree_id,
"action_type": "generate_branch",
"target_node_id": "some-node",
"changes_json": {"before": {}, "after": {"id": "new-node"}},
},
headers=auth_headers,
)
assert resp.status_code == 201
suggestion_id = resp.json()["id"]
assert resp.json()["status"] == "pending"
# List suggestions
resp = await client.get(
f"/api/v1/ai/suggestions/tree/{tree_id}",
headers=auth_headers,
)
assert resp.status_code == 200
assert len(resp.json()) >= 1
# Resolve suggestion
resp = await client.patch(
f"/api/v1/ai/suggestions/{suggestion_id}",
json={"status": "accepted"},
headers=auth_headers,
)
assert resp.status_code == 200
assert resp.json()["status"] == "accepted"
assert resp.json()["resolved_at"] is not None

View File

@@ -0,0 +1,24 @@
"""Tests for AI model tier configuration."""
from app.core.config import settings
def test_ai_model_tiers_exist():
assert "fast" in settings.AI_MODEL_TIERS
assert "standard" in settings.AI_MODEL_TIERS
def test_action_model_map_covers_all_actions():
valid_tiers = set(settings.AI_MODEL_TIERS.keys())
for action, tier in settings.ACTION_MODEL_MAP.items():
assert tier in valid_tiers, f"Action '{action}' maps to unknown tier '{tier}'"
def test_get_model_for_action():
model = settings.get_model_for_action("generate_full")
assert isinstance(model, str)
assert len(model) > 0
def test_get_model_for_action_unknown_falls_back():
model = settings.get_model_for_action("nonexistent_action")
assert model == settings.AI_MODEL_TIERS["standard"]

View File

@@ -0,0 +1,282 @@
"""Tests for flow export/import (.rfflow) endpoints."""
import pytest
from httpx import AsyncClient
# --- Helpers ---
TREE_DATA = {
"name": "DNS Troubleshooting",
"description": "Diagnose DNS resolution issues",
"category": "Networking",
"tree_structure": {
"id": "root",
"type": "decision",
"question": "Is DNS resolving?",
"options": [
{"id": "yes", "label": "Yes", "next_node_id": "sol1"},
{"id": "no", "label": "No", "next_node_id": "sol2"},
],
"children": [
{"id": "sol1", "type": "solution", "title": "DNS OK", "description": "DNS is working", "solution": "No action needed"},
{"id": "sol2", "type": "solution", "title": "DNS Fail", "description": "DNS is not resolving", "solution": "Check DNS server config"},
],
},
"tags": ["dns", "networking"],
}
async def create_tree_with_tags(client: AsyncClient, headers: dict, data: dict | None = None) -> dict:
"""Create a tree and return the response."""
resp = await client.post("/api/v1/trees", json=data or TREE_DATA, headers=headers)
assert resp.status_code == 201
return resp.json()
# --- Export Tests ---
@pytest.mark.asyncio
async def test_export_json_format(client, auth_headers, test_tree):
"""Export should return valid .rfflow JSON with correct structure."""
resp = await client.get(
f"/api/v1/trees/{test_tree['id']}/export",
headers=auth_headers,
)
assert resp.status_code == 200
assert "attachment" in resp.headers.get("content-disposition", "")
assert ".rfflow" in resp.headers.get("content-disposition", "")
data = resp.json()
assert data["rfflow_version"] == "1.0"
assert data["source_app"] == "ResolutionFlow"
assert data["exported_at"] is not None
flow = data["flow"]
assert flow["name"] == test_tree["name"]
assert flow["tree_structure"] is not None
assert flow["tree_type"] == "troubleshooting"
# No IDs leaked
assert "id" not in flow or flow.get("id") is None
assert "author_id" not in flow
assert "account_id" not in flow
@pytest.mark.asyncio
async def test_export_with_category_and_tags(client, auth_headers):
"""Export should include category and tag data."""
tree = await create_tree_with_tags(client, auth_headers)
resp = await client.get(
f"/api/v1/trees/{tree['id']}/export",
headers=auth_headers,
)
assert resp.status_code == 200
flow = resp.json()["flow"]
assert len(flow["tags"]) == 2
assert "dns" in flow["tags"]
assert "networking" in flow["tags"]
@pytest.mark.asyncio
async def test_export_access_control(client, auth_headers, test_admin, admin_auth_headers, test_tree):
"""Users should only export trees they can access."""
# Create a second user who can't access the tree
user2_data = {
"email": "other@example.com",
"password": "OtherPass123!",
"name": "Other User",
}
await client.post("/api/v1/auth/register", json=user2_data)
login_resp = await client.post("/api/v1/auth/login/json", json={
"email": user2_data["email"],
"password": user2_data["password"],
})
other_headers = {"Authorization": f"Bearer {login_resp.json()['access_token']}"}
resp = await client.get(
f"/api/v1/trees/{test_tree['id']}/export",
headers=other_headers,
)
assert resp.status_code == 403
@pytest.mark.asyncio
async def test_export_nonexistent_tree(client, auth_headers):
"""Export of non-existent tree returns 404."""
import uuid
resp = await client.get(
f"/api/v1/trees/{uuid.uuid4()}/export",
headers=auth_headers,
)
assert resp.status_code == 404
# --- Import Tests ---
@pytest.mark.asyncio
async def test_import_happy_path(client, auth_headers, test_tree):
"""Import should create a draft tree owned by the importing user."""
# First export
export_resp = await client.get(
f"/api/v1/trees/{test_tree['id']}/export",
headers=auth_headers,
)
rfflow_data = export_resp.json()
# Import
import_resp = await client.post(
"/api/v1/trees/import",
json=rfflow_data,
headers=auth_headers,
)
assert import_resp.status_code == 201
result = import_resp.json()
assert result["status"] == "draft"
assert result["name"] == test_tree["name"]
assert result["tree_id"] is not None
# Verify the created tree
tree_resp = await client.get(
f"/api/v1/trees/{result['tree_id']}",
headers=auth_headers,
)
assert tree_resp.status_code == 200
tree = tree_resp.json()
assert tree["status"] == "draft"
assert tree["import_metadata"] is not None
assert tree["import_metadata"]["source_app"] == "ResolutionFlow"
@pytest.mark.asyncio
async def test_import_with_name_override(client, auth_headers, test_tree):
"""Import with name_override should use the override name."""
export_resp = await client.get(
f"/api/v1/trees/{test_tree['id']}/export",
headers=auth_headers,
)
rfflow_data = export_resp.json()
import_resp = await client.post(
"/api/v1/trees/import?name_override=Custom%20Name",
json=rfflow_data,
headers=auth_headers,
)
assert import_resp.status_code == 201
assert import_resp.json()["name"] == "Custom Name"
@pytest.mark.asyncio
async def test_import_with_new_tags(client, auth_headers):
"""Import with new tags should create them automatically."""
rfflow = {
"rfflow_version": "1.0",
"exported_at": "2026-03-05T14:30:00+00:00",
"source_app": "ResolutionFlow",
"flow": {
"name": "Test Import Tags",
"description": "Testing tag creation",
"tree_type": "troubleshooting",
"version": 1,
"author_name": "Test Author",
"category": None,
"tags": ["brand-new-tag", "another-tag"],
"tree_structure": {
"id": "root",
"type": "decision",
"question": "Q?",
"options": [{"id": "a", "label": "A", "next_node_id": "s1"}],
"children": [{"id": "s1", "type": "solution", "title": "S", "description": "D", "solution": "S"}],
},
"intake_form": None,
},
}
resp = await client.post("/api/v1/trees/import", json=rfflow, headers=auth_headers)
assert resp.status_code == 201
result = resp.json()
assert "brand-new-tag" in result["tags_created"]
assert "another-tag" in result["tags_created"]
@pytest.mark.asyncio
async def test_import_with_category_creation(client, auth_headers):
"""Import with a new category should create it."""
rfflow = {
"rfflow_version": "1.0",
"exported_at": "2026-03-05T14:30:00+00:00",
"source_app": "ResolutionFlow",
"flow": {
"name": "Import Category Test",
"description": None,
"tree_type": "troubleshooting",
"version": 1,
"author_name": None,
"category": {"name": "New Category", "slug": "new-category"},
"tags": [],
"tree_structure": {
"id": "root",
"type": "decision",
"question": "Q?",
"options": [{"id": "a", "label": "A", "next_node_id": "s1"}],
"children": [{"id": "s1", "type": "solution", "title": "S", "description": "D", "solution": "S"}],
},
"intake_form": None,
},
}
resp = await client.post("/api/v1/trees/import", json=rfflow, headers=auth_headers)
assert resp.status_code == 201
assert resp.json()["category_created"] is True
@pytest.mark.asyncio
async def test_import_invalid_version(client, auth_headers):
"""Import with unsupported rfflow version should return 422."""
rfflow = {
"rfflow_version": "99.0",
"exported_at": "2026-03-05T14:30:00+00:00",
"source_app": "ResolutionFlow",
"flow": {
"name": "Bad Version",
"tree_type": "troubleshooting",
"version": 1,
"tags": [],
"tree_structure": {"id": "root", "type": "decision", "question": "Q?", "options": [], "children": []},
},
}
resp = await client.post("/api/v1/trees/import", json=rfflow, headers=auth_headers)
assert resp.status_code == 422
@pytest.mark.asyncio
async def test_import_round_trip(client, auth_headers):
"""Export then import should produce a tree with matching data."""
original = await create_tree_with_tags(client, auth_headers)
# Export
export_resp = await client.get(
f"/api/v1/trees/{original['id']}/export",
headers=auth_headers,
)
rfflow = export_resp.json()
# Import
import_resp = await client.post(
"/api/v1/trees/import",
json=rfflow,
headers=auth_headers,
)
assert import_resp.status_code == 201
result = import_resp.json()
# Verify imported tree matches original structure
tree_resp = await client.get(
f"/api/v1/trees/{result['tree_id']}",
headers=auth_headers,
)
imported_tree = tree_resp.json()
assert imported_tree["name"] == original["name"]
assert imported_tree["tree_structure"]["id"] == original["tree_structure"]["id"]
assert imported_tree["tree_type"] == original["tree_type"]
assert imported_tree["status"] == "draft" # Always draft on import