Files
resolutionflow/backend/app/core/ai_chat_service.py
chihlasm 0dc6123c0c feat: flow export/import + procedural Flow Assist (#96)
* feat: add flow export/import backend (migration, endpoints, schemas)

Add .rfflow file export/import support:
- Migration 050: import_metadata JSONB column on trees
- GET /trees/{id}/export?format=json|xml endpoint
- POST /trees/import endpoint (creates draft, resolves categories/tags)
- FlowExportEnvelope, FlowImportRequest/Response schemas
- import_metadata field on TreeResponse

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add flow export/import frontend + backend tests

Frontend:
- ExportFlowModal with JSON/XML format selection + download
- ImportFlowModal with drag-drop file picker + preview step
- rfflowParser for client-side JSON/XML .rfflow parsing
- Export buttons on editor toolbar and library action menus
- Import button on library page next to Create New
- Provenance display for imported flows in editor
- flowTransfer API client + types

Backend:
- Fix regex->pattern deprecation in export endpoint
- 12 integration tests covering export, import, round-trip,
  access control, tag/category creation, version validation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* refactor: remove XML export, JSON-only for .rfflow files

- Remove XML builder, format query param, and XML tests
- Simplify ExportFlowModal (no format picker)
- Simplify rfflowParser (JSON-only)
- Remove format field from schemas and types

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: match Flow Assist chat input to AI Assistant styling + strengthen one-question prompt

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add procedural flow support to AI chat builder (Flow Assist)

- Add procedural-specific system prompts (schema, interview protocol, response format)
- Dispatch prompts by flow_type: procedural/maintenance use flat steps schema, troubleshooting uses decision tree schema
- Parse [STEPS_UPDATE] and [INTAKE_FORM] markers in AI responses
- Add validate_generated_procedural_steps() validator
- Handle intake form extraction in AI chat import endpoint
- Add StaticStepsPreview component for procedural flow preview
- Update store and page to render correct preview by flow type

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add flow type selection to Flow Assist entry points

- CreateFlowDropdown now shows "Build with Flow Assist" under each flow type
- Library page "Flow Assist" button respects current type filter
- Clean up unused AIFlowBuilderModal references

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: update CLAUDE.md with AI chat builder and intake form learnings

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: refine assistant chat prompt for concise answers and focused questions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: switch AI provider to Claude Sonnet 4.6 + add shift+enter hint to chat inputs

- Default AI_PROVIDER changed from gemini to anthropic
- AI_MODEL and AI_MODEL_ANTHROPIC updated to claude-sonnet-4-6
- Added "Shift + Enter for a new line" hint below all chat textareas

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: update CLAUDE.md with AI provider and chat input learnings

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: add editor-embedded Flow Assist design document

Design for replacing the standalone /ai/chat page with context-aware
AI side panels embedded in each editor (Troubleshooting + Procedural).
Covers ghost node suggestion system, output-based thresholds,
config-driven model routing, knowledge integration, and per-flow
chat persistence.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: add editor-embedded Flow Assist implementation plan

25-task plan across 9 phases covering backend foundation, frontend
infrastructure, tree/procedural editor integration, AI-assisted create,
old code removal, action-type dispatch, suggestion audit trail, and
build verification.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: use actual root node ID in orphan validation check

AI-generated trees use descriptive IDs (e.g., "verify-account-exists")
instead of "root", causing the root node to be falsely flagged as orphaned.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add config-driven AI model tier routing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: extend AI chat session with tree_id and archived_at

Add tree_id FK (CASCADE) for editor-embedded sessions and archived_at
timestamp column to ai_chat_sessions table.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add AI suggestion audit trail table

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add action_type and focal_node_id to AI chat message API

- Add VALID_ACTION_TYPES literal and action_type/focal_node_id fields to
  AIChatMessageRequest schema
- Add tree_id field to AIChatStartRequest schema for editor-embedded sessions
- Update send_message() signature with action_type and focal_node_id params
- Update start_chat_session() signature with tree_id param
- Pass new params through endpoints to service functions
- All new params have defaults so existing behavior is unchanged

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: route AI model selection through action-type config

Update get_ai_provider() to accept an optional model override parameter
(applied only to AnthropicProvider; Gemini always uses its own model).
Thread action_type-based model resolution through send_message() and
generate_final_tree() in the AI chat service.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add TypeScript types for editor-embedded AI

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add shared ContextMenu component

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add useEditorAI hook and editorAI API client

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add EditorAIPanel component with Chat and Suggestions tabs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: integrate AI panel, context menu, and ghost nodes in tree editor

- Add AI Assist panel toggle button to tree editor toolbar
- Wire EditorAIPanel alongside TreeEditorLayout with single-panel rule
- Thread onNodeContextMenu through TreeEditorLayout → FlowCanvas → FlowCanvasNode
- Add right-click context menu with Generate Branch, Explain Node, Delete actions
- Add ghost node detection (_suggestion flag) with dashed border + opacity styling
- Add Accept/Dismiss overlay buttons on ghost nodes for future suggestion handling

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: integrate AI panel, context menu, and ghost steps in procedural editor

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add AI prompt dialog and wire into CreateFlowDropdown

Replace navigation to /ai/chat with an inline AIPromptDialog modal
that collects a single prompt, generates a flow via the editor AI API,
imports it, and navigates to the editor with the AI panel open.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: add glassmorphism to AI prompt dialog + maintenance Flow Assist button

- Use .glass-card-static on AIPromptDialog card for consistent design system
- Add "Build with Flow Assist" button to maintenance section in CreateFlowDropdown

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* refactor: remove standalone Flow Assist page and old AI chat components

Remove the old /ai/chat page, AI wizard modal, and all associated
components/stores/types now replaced by the editor-embedded AI panel.

Deleted:
- AIChatBuilderPage, ai-chat/ components, aiChatStore, aiChat API, ai-chat types
- AIFlowBuilderModal, ai-builder/ components, aiFlowBuilderStore

Cleaned up:
- Router (removed /ai/chat route)
- Sidebar (removed Flow Assist nav item)
- MyTreesPage (removed AI builder modal and button)
- TreeLibraryPage (removed Flow Assist button)
- API and type barrel exports

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add delta response parsing and action-type prompt dispatch

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add AI suggestion audit trail endpoints

Create/list/resolve endpoints for tracking AI-applied changes to flows.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add APScheduler task to auto-archive stale AI chat sessions

Archives AI chat sessions with no activity for 30 days, runs daily at 3 AM.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: update project status for editor-embedded Flow Assist

- Add Editor-Embedded Flow Assist to CURRENT-STATE.md in-progress items
- Update CLAUDE.md: fix stale lessons (#41, #46), add new patterns (#47 editor AI architecture, #48 orphan validation)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: use correct model alias in AI_MODEL_TIERS standard tier

The dated model ID `claude-sonnet-4-6-20250514` was causing 502 errors.
Use the alias `claude-sonnet-4-6` which matches AI_MODEL_ANTHROPIC.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: send live flow context to AI Assist for full editor awareness

The AI panel now sends the current tree structure (troubleshooting) or
steps + intake form (procedural/maintenance) with each message. This
gives the AI full visibility into node details, questions, descriptions,
options, and intake form fields — not just the node ID.

- Backend: add flow_context param to schema, endpoint, and service
- Frontend: add getFlowContext callback to useEditorAI hook
- TreeEditorPage: passes treeStructure as flow context
- ProceduralEditorPage: passes steps + intakeForm as flow context

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: include flow name and description in AI Assist context

Both editors now send name and description alongside the flow structure,
so the AI can reference what the flow is about when responding.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: increase AI timeout to 120s and limit retries to 1

The 45s timeout was too short for generation tasks with full flow
context in the system prompt. The Anthropic SDK's default 2 retries
caused requests to hang for ~136s before failing. Now: 120s timeout
with max 1 retry = faster failure if it does timeout.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: wire AI-generated flow structures into editor stores

The useEditorAI hook was ignoring result.working_tree from AI responses,
so generated steps/trees never appeared in the editor. Now:
- useEditorAI calls onFlowUpdate when working_tree is present in response
- ProceduralEditorPage handles steps + intake form updates via replaceSteps
- TreeEditorPage handles tree structure updates via replaceTreeStructure
- Both stores have new bulk-replace methods for AI integration

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: add lessons learned for full-stack integration, Anthropic retries, model tiers

#49 Always verify frontend consumes backend response fields
#50 Anthropic SDK max_retries=1 to avoid 3× timeout
#51 AI model tier routing via settings.get_model_for_action()

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: move AI Assist panel to full-height side layout in both editors

The AI panel was nested inside the content area, only spanning the
step list / canvas section. Now it sits at the outermost flex level,
spanning the full page height alongside all content (toolbar,
collapsible sections, steps/canvas). This prevents the panel from
overlapping content and lets the editor area properly shrink.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: AI Assist panel as fixed right drawer (matching Copilot/Scratchpad)

Convert EditorAIPanel from in-flow flex child to fixed right-side drawer
overlay, same pattern as CopilotPanel and ScratchpadSidebar. The panel
is fixed at right:0 spanning full viewport height, and editor pages add
pr-[380px] padding when open so content shifts left without overlap.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: AI Assist panel sits below topbar with slide-in animation

- Panel now uses top:56px to sit below the app shell topbar instead of
  covering it (matches the main-content grid cell area)
- Added slideInRight CSS animation for smooth drawer entrance
- Editor pages use dynamic paddingRight via PANEL_WIDTH constant
- ChatTab upgraded: markdown rendering, CopilotPanel-style message
  bubbles, auto-focus input, Shift+Enter hint
- All borders use --glass-border for consistent glassmorphism

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: AI Assist panel as in-flow flex sibling (not fixed/overlay)

Replace fixed positioning with in-flow flex layout. The outermost div
is now a horizontal flex row: content column (flex-1 min-w-0) + panel
(w-[380px] shrink-0). When the panel opens, the content column
automatically shrinks — no padding hacks or z-index stacking needed.
This guarantees the content shifts left and stays fully visible.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: AI Copilot panel as in-flow flex sibling in session navigation pages

Changed CopilotPanel from fixed overlay to flex layout sibling so it
pushes main content instead of covering it during active sessions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: remove duplicate CLAUDE.md lessons #47-48

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-07 15:51:37 -05:00

790 lines
34 KiB
Python

"""AI Chat Builder service.
Manages the conversational flow builder: system prompt construction,
message exchange with AI provider, and response parsing (extracting
tree updates, phase transitions, and metadata from structured markers).
"""
import json
import logging
import re
import uuid
from datetime import datetime, timezone, timedelta
from typing import Any, Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.ai_provider import get_ai_provider
from app.core.ai_tree_validator import validate_generated_tree, validate_generated_procedural_steps
from app.core.config import settings
from app.models.ai_chat_session import AIChatSession
logger = logging.getLogger(__name__)
# ── Cost estimation ──
COST_PER_INPUT_TOKEN = 1.0 / 1_000_000
COST_PER_OUTPUT_TOKEN = 5.0 / 1_000_000
# ── Max messages per session ──
MAX_MESSAGES_FREE = 10
MAX_MESSAGES_PAID = 25
# ── System Prompt ──
ROLE_PERSONA = """You are a senior IT engineer embedded in ResolutionFlow, a troubleshooting platform for MSP (Managed Service Provider) engineers. You have 15+ years of hands-on experience across Windows Server, Active Directory, Entra ID/Azure AD, Microsoft 365, networking (DNS, DHCP, routing, VPN, firewalls), virtualization (Hyper-V, VMware), security, backup/DR, and cloud infrastructure.
Your job is to help engineers build troubleshooting decision trees by interviewing them about a problem space. You are NOT a generic assistant. You are a colleague who has seen these issues hundreds of times and knows the optimal diagnostic order.
CRITICAL BEHAVIORS:
- Act as a senior engineer, not a chatbot. Use your domain knowledge to SUGGEST diagnostic steps, not just record what the user says.
- When the user describes a problem area, demonstrate understanding by naming specific sub-categories, common causes, and relevant tools.
- Challenge assumptions constructively: "Before we go down that path, have you considered checking X first? In my experience, that resolves 60% of these cases."
- Capture SPECIFIC commands with exact syntax. Not "check the service" but "Get-Service ADSync | Select-Object Status, StartType".
- Include expected outcomes for every action: what does success look like?
- Surface edge cases proactively: "What about multi-forest environments?" or "Does this change if they have conditional access policies?"
- Explain WHY the diagnostic order matters: "We check connectivity before auth because a network issue masquerades as an auth failure."
- Ask ONE focused question at a time. NEVER ask multiple questions in a single response — no numbered lists of questions, no "also, what about X?", no follow-up questions tacked on. One question, then wait for the answer.
- Use plain, collegial language. Sound like a colleague, not a form."""
SCHEMA_CONTEXT = """
TREESTRUCTURE SCHEMA — This is what you are building:
The tree is a recursive JSON structure. Each node has a "type" field:
1. decision — A diagnostic question with branching options
Required: id (string), type ("decision"), question (string), options (array), children (array)
Optional: help_text (string)
Each option: { id (string), label (string), next_node_id (string — must match a child's id) }
2. action — A step the engineer performs
Required: id (string), type ("action"), title (string), description (string)
Optional: commands (string array — exact CLI/PowerShell syntax), expected_outcome (string), help_text (string), next_node_id (string — ID of the next node to navigate to)
3. solution — A resolution endpoint
Required: id (string), type ("solution"), title (string), description (string)
Optional: resolution_steps (string array)
STRUCTURAL RULES:
- Root node MUST be type "decision"
- Decision nodes contain their children in the "children" array
- Each decision option's next_node_id typically references a child node's id, BUT can also reference ANY other node in the tree for loop-back / re-verification patterns
- Action nodes use next_node_id to chain to the next step — this can point to any node in the tree, including ancestors, for loop-backs (e.g., "remediate → re-verify from earlier checkpoint")
- Solution nodes are terminal — no next_node_id or children
- All IDs must be unique strings (use descriptive slugs like "check-service-status")
CROSS-REFERENCE / LOOP-BACK PATTERN:
When a troubleshooting path needs to loop back (e.g., after remediation, re-verify from an earlier checkpoint), set next_node_id to the target node's ID. Example: an action node "restart-ssh-service" can set next_node_id to "verify-ssh-connection" (an ancestor decision node) to create a re-verification loop.
"""
INTERVIEW_PROTOCOL = """
INTERVIEW PHASES — Follow this progression:
PHASE 1 - SCOPING (current_phase: scoping):
Ask broad questions to understand the problem domain and scope:
- What type of issue is this flow for?
- Who is the target audience? (Tier 1 help desk, Tier 2, Tier 3?)
- What environment assumptions? (On-prem, hybrid, specific vendors?)
Demonstrate domain expertise immediately. If the user says "Azure AD Sync failures," show understanding: "Are you primarily seeing password hash sync issues, object attribute sync failures, or full directory sync errors?"
DO NOT emit [TREE_UPDATE] during scoping. You are still understanding the problem.
PHASE 2 - DISCOVERY (current_phase: discovery):
Work through the troubleshooting logic branch by branch:
- Establish the first diagnostic question (the root decision node)
- For each branch, ask what the engineer would check next
- Suggest checks the user might not have considered
- Capture specific commands, tools, and procedures
EMIT [TREE_UPDATE] ONLY when you and the user have agreed on a concrete node — a decision with clear options, or an action with a specific command. If you are asking a question, you are NOT updating the tree.
PHASE 3 - ENRICHMENT (current_phase: enrichment):
Circle back to enrich existing nodes:
- Add exact PowerShell/CLI commands with syntax
- Add help text with relevant documentation links
- Add expected outcomes for action nodes
- Suggest edge cases needing additional branches
EMIT [TREE_UPDATE] when enriching existing nodes or adding edge case branches.
PHASE 4 - REVIEW (current_phase: review):
Present a summary:
- Total node count by type
- Text outline of the flow structure
- Flag any areas of uncertainty
- Offer chance to add/remove/modify branches
EMIT [TREE_UPDATE] only if the user requests structural changes.
TRANSITION between phases by emitting [PHASE:phase_name] when the conversation naturally moves to the next stage. You decide when enough information has been gathered for each phase.
"""
RESPONSE_FORMAT = """
RESPONSE FORMAT:
Your response is natural conversational text. When the tree structure changes, include structured markers that will be parsed by the system (the user will NOT see these markers):
1. Tree update (only when structure changes — see phase rules above):
[TREE_UPDATE]
{...valid TreeStructure JSON...}
[/TREE_UPDATE]
2. Phase transition (when moving to next phase):
[PHASE:discovery]
3. Metadata capture (when you learn the flow's name, description, or tags):
[METADATA]
{"name": "...", "description": "...", "tags": ["..."]}
[/METADATA]
IMPORTANT:
- Include [TREE_UPDATE] sparingly. Only when concrete nodes are established or modified.
- The tree update should be the COMPLETE working tree, not a diff.
- Always include conversational text OUTSIDE the markers — never respond with only markers.
"""
PROCEDURAL_SCHEMA_CONTEXT = """
PROCEDURAL STEP SCHEMA — This is what you are building:
The flow is an ordered array of steps in a JSON object: {"steps": [...]}
Each step has a "type" field:
1. procedure_step — A concrete step the engineer performs
Required: id (string), type ("procedure_step"), title (string), description (string)
Optional:
- content_type ("action"|"informational"|"verification"|"warning") — default "action"
- estimated_minutes (number)
- commands (array of objects: {code: string, label?: string, language?: string}) — exact CLI/PowerShell syntax
- expected_outcome (string) — what success looks like
- verification_prompt (string) — question to confirm completion
- verification_type ("checkbox"|"text_input") — how the engineer confirms
- warning_text (string) — caution or prerequisite info
- notes_enabled (boolean) — allow engineer to capture notes on this step
- reference_url (string) — link to documentation
2. section_header — Groups steps into logical phases
Required: id (string), type ("section_header"), title (string)
Section headers apply to all subsequent steps until the next section_header.
3. procedure_end — Terminal marker (always the last step)
Required: id (string), type ("procedure_end"), title (string)
STRUCTURAL RULES:
- Steps are executed in array order (flat list, no branching)
- All IDs must be unique descriptive slugs (e.g., "check-dns-resolution", not UUIDs)
- The last step MUST be type "procedure_end"
- Use section_headers to organize steps into logical phases
- Commands are arrays of objects: [{"code": "Get-Service ADSync", "label": "Check sync service", "language": "powershell"}]
- Descriptions support [VAR:variable_name] interpolation for intake form variables (e.g., "Connect to [VAR:server_name] via RDP")
VARIABLE INTERPOLATION:
When the procedure needs per-execution input (server name, IP address, client name, etc.), use [VAR:variable_name] syntax in descriptions and commands. These map to intake form fields that the engineer fills in before starting.
"""
PROCEDURAL_INTERVIEW_PROTOCOL = """
INTERVIEW PHASES — Follow this progression:
PHASE 1 - SCOPING (current_phase: scoping):
Understand the process being documented:
- What process or procedure is this flow for?
- Who will execute it? (Tier 1 help desk, Tier 2, senior engineers?)
- What environment context? (Specific vendor, on-prem vs cloud, tools available?)
- Will this need per-execution input? (server name, client info, IP addresses → intake form fields)
Demonstrate domain expertise: if the user says "Exchange Online mailbox migration," show understanding: "Are we covering full tenant-to-tenant migration, on-prem to Exchange Online cutover, or individual mailbox moves with hybrid?"
DO NOT emit [STEPS_UPDATE] during scoping. You are still understanding the process.
PHASE 2 - DISCOVERY (current_phase: discovery):
Build the procedure step by step IN ORDER:
- Start with prerequisites and initial verification
- Walk through each step sequentially — ask what happens first, then next, then next
- Suggest section headers to organize logical phases (e.g., "Pre-Flight Checks", "Migration", "Verification")
- Capture specific commands, tools, and expected outcomes for each step
- Identify where [VAR:variable_name] placeholders are needed
EMIT [STEPS_UPDATE] when you and the user have agreed on concrete steps. Build progressively — emit partial step lists as you go.
PHASE 3 - ENRICHMENT (current_phase: enrichment):
Circle back to enrich existing steps:
- Add exact PowerShell/CLI commands with full syntax
- Add verification prompts for critical steps
- Add warning_text for steps with risk (data loss, downtime, etc.)
- Add estimated_minutes for time-critical procedures
- Add expected_outcome for action steps
- Suggest reference_url links to documentation
- Identify missing edge cases or safety checks
EMIT [STEPS_UPDATE] when enriching steps with additional detail.
PHASE 4 - REVIEW (current_phase: review):
Present a summary:
- Total step count by content_type
- Outline of sections and steps
- List of intake form variables ([VAR:...]) used
- Flag any steps missing commands or verification
- Offer chance to reorder, add, or remove steps
EMIT [STEPS_UPDATE] only if the user requests changes.
TRANSITION between phases by emitting [PHASE:phase_name] when the conversation naturally moves to the next stage. You decide when enough information has been gathered for each phase.
"""
PROCEDURAL_RESPONSE_FORMAT = """
RESPONSE FORMAT:
Your response is natural conversational text. When the step structure changes, include structured markers that will be parsed by the system (the user will NOT see these markers):
1. Steps update (only when structure changes — see phase rules above):
[STEPS_UPDATE]
{"steps": [...valid steps array...]}
[/STEPS_UPDATE]
2. Phase transition (when moving to next phase):
[PHASE:discovery]
3. Metadata capture (when you learn the flow's name, description, or tags):
[METADATA]
{"name": "...", "description": "...", "tags": ["..."]}
[/METADATA]
4. Intake form suggestion (when intake form fields are identified):
[INTAKE_FORM]
[{"variable_name": "server_name", "label": "Server Name", "field_type": "text", "required": true, "placeholder": "e.g., DC01", "group_name": "Server Details", "display_order": 1}]
[/INTAKE_FORM]
IMPORTANT:
- Include [STEPS_UPDATE] sparingly. Only when concrete steps are established or modified.
- The steps update should be the COMPLETE working step list, not a diff.
- Always include conversational text OUTSIDE the markers — never respond with only markers.
- The procedure_end step is always included as the last step.
"""
def _build_system_prompt(flow_type: str) -> str:
"""Assemble the full system prompt for the chat builder."""
if flow_type in ("procedural", "maintenance"):
flow_context = (
"The user wants to build a PROCEDURAL flow — a step-by-step process guide "
"with ordered phases, verification checkpoints, and optional intake form variables. "
"This is NOT a branching decision tree — it is a flat, sequential procedure."
)
return (
f"{ROLE_PERSONA}\n\n{flow_context}\n\n"
f"{PROCEDURAL_SCHEMA_CONTEXT}\n\n{PROCEDURAL_INTERVIEW_PROTOCOL}\n\n{PROCEDURAL_RESPONSE_FORMAT}"
)
else:
flow_context = (
"The user wants to build a TROUBLESHOOTING flow — a diagnostic decision tree "
"that guides engineers through symptom identification, diagnostic checks, and "
"resolution steps."
)
return f"{ROLE_PERSONA}\n\n{flow_context}\n\n{SCHEMA_CONTEXT}\n\n{INTERVIEW_PROTOCOL}\n\n{RESPONSE_FORMAT}"
def _strip_markdown_fences(text: str) -> str:
"""Strip markdown code fences if the model wrapped its JSON response."""
text = text.strip()
match = re.match(r"^```(?:json)?\s*([\s\S]*?)```$", text)
if match:
return match.group(1).strip()
return text
def _parse_delta(response: str) -> dict | None:
"""Extract [DELTA]...[/DELTA] JSON from AI response."""
match = re.search(r'\[DELTA\](.*?)\[/DELTA\]', response, re.DOTALL)
if not match:
return None
raw = _strip_markdown_fences(match.group(1).strip())
try:
return json.loads(raw)
except json.JSONDecodeError:
return None
def _find_node_by_id(tree: dict, node_id: str) -> dict | None:
"""Find a node by ID in a tree structure (recursive)."""
if tree.get("id") == node_id:
return tree
for child in tree.get("children", []):
found = _find_node_by_id(child, node_id)
if found:
return found
for step in tree.get("steps", []):
if step.get("id") == node_id:
return step
return None
def _build_action_prompt(
action_type: str,
focal_node_id: str | None,
tree_structure: dict,
flow_type: str,
) -> str:
"""Build action-specific system prompt supplement."""
tree_json = json.dumps(tree_structure, indent=2)
focal_context = ""
if focal_node_id:
focal_node = _find_node_by_id(tree_structure, focal_node_id)
if focal_node:
focal_context = f"\n\nFOCAL NODE (the node being acted on):\n{json.dumps(focal_node, indent=2)}"
prompts = {
"generate_branch": (
f"Generate a complete branch of child nodes for the focal node. "
f"Return the new nodes wrapped in [DELTA]...[/DELTA] markers as JSON with "
f"action='add', target_node_id='{focal_node_id}', and nodes array."
f"{focal_context}"
),
"modify_node": (
f"Modify the focal node based on the user's instruction. "
f"Return the updated node in [DELTA]...[/DELTA] markers with action='modify'."
f"{focal_context}"
),
"add_steps": (
f"Generate new procedural steps to insert after the focal step. "
f"Return them in [DELTA]...[/DELTA] markers with action='add'."
f"{focal_context}"
),
"quick_action": (
f"Respond to the user's quick action request about the focal node. "
f"If the action modifies the node, return changes in [DELTA]...[/DELTA] markers. "
f"If it's informational (e.g. explain), just respond in text."
f"{focal_context}"
),
"open_chat": (
"Have a helpful conversation about the flow. If the user asks for changes, "
"return them in [DELTA]...[/DELTA] markers. Otherwise respond in text."
),
"generate_full": (
"Generate a complete flow structure based on the user's description."
),
"variable_inference": (
"Analyze the procedural steps for implicit variables. Look for references to "
"specific servers, clients, credentials, or other values that should be captured "
"in an intake form. Return suggestions as JSON."
),
}
action_prompt = prompts.get(action_type, prompts["open_chat"])
return (
f"CURRENT FLOW STRUCTURE ({flow_type}):\n{tree_json}\n\n"
f"ACTION: {action_type}\n{action_prompt}"
)
def _parse_ai_response(raw_response: str) -> dict[str, Any]:
"""Parse structured markers from AI response.
Returns dict with:
- content: str (conversational text with markers stripped)
- tree_update: dict | None (parsed TreeStructure JSON)
- phase: str | None (new phase name)
- metadata: dict | None (name, description, tags)
"""
result: dict[str, Any] = {
"content": raw_response,
"tree_update": None,
"phase": None,
"metadata": None,
"intake_form": None,
}
# Extract [TREE_UPDATE]...[/TREE_UPDATE]
tree_match = re.search(
r"\[TREE_UPDATE\]\s*([\s\S]*?)\s*\[/TREE_UPDATE\]", raw_response
)
if tree_match:
try:
raw_json = _strip_markdown_fences(tree_match.group(1))
result["tree_update"] = json.loads(raw_json)
except (json.JSONDecodeError, ValueError) as e:
logger.warning("Failed to parse tree update JSON: %s", e)
result["content"] = raw_response[: tree_match.start()] + raw_response[tree_match.end() :]
else:
# Handle truncated response — opening tag exists but no closing tag
# (happens when max_tokens cuts off the JSON block)
truncated_match = re.search(r"\[TREE_UPDATE\][\s\S]*$", raw_response)
if truncated_match:
logger.warning("Truncated [TREE_UPDATE] block detected (no closing tag) — stripping from display")
result["content"] = raw_response[: truncated_match.start()]
# Extract [STEPS_UPDATE]...[/STEPS_UPDATE] (procedural flows)
steps_match = re.search(
r"\[STEPS_UPDATE\]\s*([\s\S]*?)\s*\[/STEPS_UPDATE\]", result["content"]
)
if steps_match:
try:
raw_json = _strip_markdown_fences(steps_match.group(1))
result["tree_update"] = json.loads(raw_json)
except (json.JSONDecodeError, ValueError) as e:
logger.warning("Failed to parse steps update JSON: %s", e)
result["content"] = result["content"][: steps_match.start()] + result["content"][steps_match.end() :]
else:
truncated_steps = re.search(r"\[STEPS_UPDATE\][\s\S]*$", result["content"])
if truncated_steps:
logger.warning("Truncated [STEPS_UPDATE] block detected (no closing tag) — stripping from display")
result["content"] = result["content"][: truncated_steps.start()]
# Extract [INTAKE_FORM]...[/INTAKE_FORM] (procedural flows)
intake_match = re.search(
r"\[INTAKE_FORM\]\s*([\s\S]*?)\s*\[/INTAKE_FORM\]", result["content"]
)
if intake_match:
try:
raw_json = _strip_markdown_fences(intake_match.group(1))
result["intake_form"] = json.loads(raw_json)
except (json.JSONDecodeError, ValueError) as e:
logger.warning("Failed to parse intake form JSON: %s", e)
result["content"] = result["content"][: intake_match.start()] + result["content"][intake_match.end() :]
else:
truncated_intake = re.search(r"\[INTAKE_FORM\][\s\S]*$", result["content"])
if truncated_intake:
logger.warning("Truncated [INTAKE_FORM] block detected — stripping from display")
result["content"] = result["content"][: truncated_intake.start()]
# Extract [PHASE:name]
phase_match = re.search(r"\[PHASE:(\w+)\]", result["content"])
if phase_match:
result["phase"] = phase_match.group(1)
result["content"] = result["content"][: phase_match.start()] + result["content"][phase_match.end() :]
# Extract [METADATA]...[/METADATA]
meta_match = re.search(
r"\[METADATA\]\s*([\s\S]*?)\s*\[/METADATA\]", result["content"]
)
if meta_match:
try:
raw_json = _strip_markdown_fences(meta_match.group(1))
result["metadata"] = json.loads(raw_json)
except (json.JSONDecodeError, ValueError) as e:
logger.warning("Failed to parse metadata JSON: %s", e)
result["content"] = result["content"][: meta_match.start()] + result["content"][meta_match.end() :]
else:
truncated_meta = re.search(r"\[METADATA\][\s\S]*$", result["content"])
if truncated_meta:
logger.warning("Truncated [METADATA] block detected — stripping from display")
result["content"] = result["content"][: truncated_meta.start()]
# Clean up extra whitespace from marker removal
result["content"] = re.sub(r"\n{3,}", "\n\n", result["content"]).strip()
return result
# ── Main Service Functions ──
async def start_chat_session(
flow_type: str,
user_id: uuid.UUID,
account_id: uuid.UUID,
db: AsyncSession,
tree_id: str | None = None,
) -> tuple[AIChatSession, str]:
"""Create a chat session and return the AI's opening greeting.
Returns (session, greeting_text).
"""
session = AIChatSession(
user_id=user_id,
account_id=account_id,
flow_type=flow_type,
tree_id=uuid.UUID(tree_id) if tree_id else None,
expires_at=datetime.now(timezone.utc) + timedelta(hours=settings.AI_CONVERSATION_TTL_HOURS),
)
db.add(session)
await db.flush()
# Build system prompt and get opening message
system_prompt = _build_system_prompt(flow_type)
primer = f"I want to build a {flow_type} flow. Help me get started."
provider = get_ai_provider()
provider_name = settings.AI_PROVIDER
messages = [{"role": "user", "content": primer}]
response_text, input_tokens, output_tokens = await provider.generate_text(
system_prompt=system_prompt,
messages=messages,
max_tokens=1500,
)
# Parse response (greeting shouldn't have tree updates, but handle gracefully)
parsed = _parse_ai_response(response_text)
# Store conversation history
now_iso = datetime.now(timezone.utc).isoformat()
session.conversation_history = [
{"role": "user", "content": primer, "timestamp": now_iso, "hidden": True},
{"role": "assistant", "content": parsed["content"], "timestamp": now_iso},
]
session.provider_used = provider_name
session.message_count = 1
session.total_input_tokens = input_tokens
session.total_output_tokens = output_tokens
if parsed["metadata"]:
session.tree_metadata = parsed["metadata"]
return session, parsed["content"]
async def send_message(
session: AIChatSession,
user_message: str,
db: AsyncSession,
action_type: str = "open_chat",
focal_node_id: str | None = None,
flow_context: dict | None = None,
) -> tuple[str, Optional[dict], Optional[str], Optional[dict]]:
"""Send a user message and get AI response.
Args:
flow_context: Live flow structure from the editor. Contains the current
tree_structure (troubleshooting) or steps + intake_form (procedural).
This gives the AI full awareness of the flow being edited.
Returns (ai_content, working_tree_update, new_phase, metadata_update).
"""
system_prompt = _build_system_prompt(session.flow_type)
# Inject live flow context so the AI can see current editor state
if flow_context:
context_json = json.dumps(flow_context, indent=2)
system_prompt += (
f"\n\nCURRENT FLOW STATE (live from editor):\n{context_json}"
)
if focal_node_id:
focal_node = _find_node_by_id(flow_context, focal_node_id)
if focal_node:
system_prompt += (
f"\n\nFOCAL NODE/STEP (the item being acted on):\n"
f"{json.dumps(focal_node, indent=2)}"
)
# Build messages array from conversation history
now_iso = datetime.now(timezone.utc).isoformat()
history = list(session.conversation_history)
history.append({"role": "user", "content": user_message, "timestamp": now_iso})
# Convert to provider format (just role + content)
provider_messages = [
{"role": msg["role"], "content": msg["content"]}
for msg in history
]
# Resolve model for this action type
action_model = settings.get_model_for_action(action_type)
provider = get_ai_provider(model=action_model)
response_text, input_tokens, output_tokens = await provider.generate_text(
system_prompt=system_prompt,
messages=provider_messages,
max_tokens=8000,
)
parsed = _parse_ai_response(response_text)
# Validate tree update if present (lightweight check for progressive builds —
# only require valid root structure, not min node counts)
tree_update = parsed["tree_update"]
if tree_update:
if session.flow_type in ("procedural", "maintenance"):
# Procedural: must be a dict with a "steps" list
if not isinstance(tree_update, dict) or not isinstance(tree_update.get("steps"), list):
logger.warning("AI steps update rejected: must be a dict with a 'steps' list")
tree_update = None
else:
# Troubleshooting: root must be a decision node
if not isinstance(tree_update, dict) or tree_update.get("type") != "decision":
logger.warning("AI tree update rejected: root must be a decision node")
tree_update = None
elif not tree_update.get("id"):
logger.warning("AI tree update rejected: root node missing id")
tree_update = None
# Update session state
history.append({"role": "assistant", "content": parsed["content"], "timestamp": now_iso})
session.conversation_history = history
session.message_count = session.message_count + 1
session.total_input_tokens = session.total_input_tokens + input_tokens
session.total_output_tokens = session.total_output_tokens + output_tokens
if tree_update:
session.working_tree = tree_update
if parsed["phase"]:
valid_phases = {"scoping", "discovery", "enrichment", "review", "generation"}
if parsed["phase"] in valid_phases:
session.current_phase = parsed["phase"]
if parsed["metadata"]:
merged = dict(session.tree_metadata)
merged.update(parsed["metadata"])
session.tree_metadata = merged
if parsed.get("intake_form"):
merged = dict(session.tree_metadata)
merged["intake_form"] = parsed["intake_form"]
session.tree_metadata = merged
session.updated_at = datetime.now(timezone.utc)
return parsed["content"], tree_update, parsed["phase"], parsed["metadata"]
async def generate_final_tree(
session: AIChatSession,
db: AsyncSession,
) -> tuple[dict[str, Any], dict[str, Any]]:
"""Generate the final validated TreeStructure from the conversation.
Returns (tree_structure, metadata).
Raises ValueError if generation fails after retry.
"""
system_prompt = _build_system_prompt(session.flow_type)
# Build generation prompt from full conversation
provider_messages = [
{"role": msg["role"], "content": msg["content"]}
for msg in session.conversation_history
]
if session.flow_type in ("procedural", "maintenance"):
generation_instruction = """Based on our entire conversation, generate the COMPLETE and FINAL procedural steps JSON for this flow.
Requirements:
- Output format: {"steps": [...]} — a JSON object with a "steps" array
- Include ALL steps, section headers, and details we discussed
- Use descriptive step IDs (slugs, not UUIDs)
- Steps are in execution order (flat list, no branching)
- Use section_header steps to organize into logical phases
- Every procedure_step should have commands with exact syntax where discussed
- Every procedure_step should have expected_outcome and verification_prompt where discussed
- Include content_type, estimated_minutes, warning_text, and reference_url where discussed
- Use [VAR:variable_name] syntax in descriptions/commands for intake form variables
- The LAST step MUST be type "procedure_end"
- Respond with ONLY the JSON — no conversational text, no markdown fences
Also provide metadata as a separate JSON object after the steps:
[METADATA]
{"name": "...", "description": "...", "tags": ["..."]}
[/METADATA]
If we discussed intake form fields, also include:
[INTAKE_FORM]
[{"variable_name": "server_name", "label": "Server Name", "field_type": "text", "required": true, "placeholder": "e.g., DC01", "group_name": "Server Details", "display_order": 1}]
[/INTAKE_FORM]"""
else:
generation_instruction = """Based on our entire conversation, generate the COMPLETE and FINAL TreeStructure JSON for this flow.
Requirements:
- Include ALL branches, steps, and solutions we discussed
- Use descriptive node IDs (slugs, not UUIDs)
- Root node must be type "decision"
- Every decision option must have a valid next_node_id pointing to a child
- Every action node should have commands with exact syntax where discussed
- Every action node should have expected_outcome where discussed
- Solution nodes should have resolution_steps
- Respond with ONLY the JSON — no conversational text, no markdown fences
Also provide metadata as a separate JSON object after the tree:
[METADATA]
{"name": "...", "description": "...", "tags": ["..."]}
[/METADATA]"""
provider_messages.append({"role": "user", "content": generation_instruction})
provider = get_ai_provider(model=settings.get_model_for_action("generate_full"))
for attempt in range(2): # One try + one retry
response_text, input_tokens, output_tokens = await provider.generate_text(
system_prompt=system_prompt,
messages=provider_messages,
max_tokens=8000,
)
session.total_input_tokens = session.total_input_tokens + input_tokens
session.total_output_tokens = session.total_output_tokens + output_tokens
# Extract metadata first
parsed = _parse_ai_response(response_text)
metadata = parsed["metadata"] or dict(session.tree_metadata)
# Parse tree JSON — could be in tree_update marker or raw
tree = parsed["tree_update"]
if not tree:
try:
raw = _strip_markdown_fences(parsed["content"])
tree = json.loads(raw)
except (json.JSONDecodeError, ValueError):
pass
if not tree:
if attempt == 0:
provider_messages.append({"role": "assistant", "content": response_text})
provider_messages.append({
"role": "user",
"content": "That response was not valid JSON. Please respond with ONLY the TreeStructure JSON object, starting with { and ending with }. No markdown fences, no explanatory text.",
})
continue
raise ValueError("AI failed to produce valid JSON after retry")
if session.flow_type in ("procedural", "maintenance"):
val_errors = validate_generated_procedural_steps(tree)
else:
val_errors = validate_generated_tree(tree)
if val_errors:
if attempt == 0:
provider_messages.append({"role": "assistant", "content": response_text})
correction = (
f"The generated structure has validation errors: {'; '.join(val_errors)}. "
"Please fix these issues and respond with the corrected JSON only."
)
provider_messages.append({"role": "user", "content": correction})
continue
raise ValueError(f"Generated structure failed validation: {'; '.join(val_errors)}")
# Success
session.working_tree = tree
session.tree_metadata = metadata
if parsed.get("intake_form"):
merged = dict(session.tree_metadata)
merged["intake_form"] = parsed["intake_form"]
session.tree_metadata = merged
metadata = session.tree_metadata
session.current_phase = "generation"
session.updated_at = datetime.now(timezone.utc)
return tree, metadata
raise ValueError("AI failed to generate a valid tree")
async def get_chat_session(
session_id: uuid.UUID,
user_id: uuid.UUID,
db: AsyncSession,
) -> AIChatSession:
"""Get a chat session, validating ownership and expiry.
Raises HTTPException on not found, forbidden, or expired.
"""
from fastapi import HTTPException, status
result = await db.execute(
select(AIChatSession).where(AIChatSession.id == session_id)
)
session = result.scalar_one_or_none()
if not session:
raise HTTPException(status_code=404, detail="Chat session not found")
if session.user_id != user_id:
raise HTTPException(status_code=403, detail="Access denied")
if session.expires_at < datetime.now(timezone.utc):
session.status = "abandoned"
await db.flush()
raise HTTPException(status_code=410, detail="Chat session has expired")
return session