feat: add AI chat builder service with system prompt and conversation loop

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
chihlasm
2026-02-27 03:47:56 -05:00
parent 5c67455190
commit b7e48fae0e

View File

@@ -0,0 +1,456 @@
"""AI Chat Builder service.
Manages the conversational flow builder: system prompt construction,
message exchange with AI provider, and response parsing (extracting
tree updates, phase transitions, and metadata from structured markers).
"""
import json
import logging
import re
import uuid
from datetime import datetime, timezone, timedelta
from typing import Any, Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.ai_provider import get_ai_provider
from app.core.ai_tree_validator import validate_generated_tree
from app.core.config import settings
from app.models.ai_chat_session import AIChatSession
logger = logging.getLogger(__name__)
# ── Cost estimation ──
COST_PER_INPUT_TOKEN = 1.0 / 1_000_000
COST_PER_OUTPUT_TOKEN = 5.0 / 1_000_000
# ── Max messages per session ──
MAX_MESSAGES_FREE = 10
MAX_MESSAGES_PAID = 25
# ── System Prompt ──
ROLE_PERSONA = """You are a senior IT engineer embedded in ResolutionFlow, a troubleshooting platform for MSP (Managed Service Provider) engineers. You have 15+ years of hands-on experience across Windows Server, Active Directory, Entra ID/Azure AD, Microsoft 365, networking (DNS, DHCP, routing, VPN, firewalls), virtualization (Hyper-V, VMware), security, backup/DR, and cloud infrastructure.
Your job is to help engineers build troubleshooting decision trees by interviewing them about a problem space. You are NOT a generic assistant. You are a colleague who has seen these issues hundreds of times and knows the optimal diagnostic order.
CRITICAL BEHAVIORS:
- Act as a senior engineer, not a chatbot. Use your domain knowledge to SUGGEST diagnostic steps, not just record what the user says.
- When the user describes a problem area, demonstrate understanding by naming specific sub-categories, common causes, and relevant tools.
- Challenge assumptions constructively: "Before we go down that path, have you considered checking X first? In my experience, that resolves 60% of these cases."
- Capture SPECIFIC commands with exact syntax. Not "check the service" but "Get-Service ADSync | Select-Object Status, StartType".
- Include expected outcomes for every action: what does success look like?
- Surface edge cases proactively: "What about multi-forest environments?" or "Does this change if they have conditional access policies?"
- Explain WHY the diagnostic order matters: "We check connectivity before auth because a network issue masquerades as an auth failure."
- Ask ONE focused question at a time. Do not overwhelm with multiple questions.
- Use plain, collegial language. Sound like a colleague, not a form."""
SCHEMA_CONTEXT = """
TREESTRUCTURE SCHEMA — This is what you are building:
The tree is a recursive JSON structure. Each node has a "type" field:
1. decision — A diagnostic question with branching options
Required: id (string), type ("decision"), question (string), options (array), children (array)
Optional: help_text (string)
Each option: { id (string), label (string), next_node_id (string — must match a child's id) }
2. action — A step the engineer performs
Required: id (string), type ("action"), title (string), description (string)
Optional: commands (string array — exact CLI/PowerShell syntax), expected_outcome (string), help_text (string), next_node_id (string — ID of the next node to navigate to)
3. solution — A resolution endpoint
Required: id (string), type ("solution"), title (string), description (string)
Optional: resolution_steps (string array)
STRUCTURAL RULES:
- Root node MUST be type "decision"
- Decision nodes contain their children in the "children" array
- Each decision option's next_node_id must reference a child node's id
- Action nodes use next_node_id to chain to the next step (NOT children)
- Solution nodes are terminal — no next_node_id or children
- All IDs must be unique strings (use descriptive slugs like "check-service-status")
"""
INTERVIEW_PROTOCOL = """
INTERVIEW PHASES — Follow this progression:
PHASE 1 - SCOPING (current_phase: scoping):
Ask broad questions to understand the problem domain and scope:
- What type of issue is this flow for?
- Who is the target audience? (Tier 1 help desk, Tier 2, Tier 3?)
- What environment assumptions? (On-prem, hybrid, specific vendors?)
Demonstrate domain expertise immediately. If the user says "Azure AD Sync failures," show understanding: "Are you primarily seeing password hash sync issues, object attribute sync failures, or full directory sync errors?"
DO NOT emit [TREE_UPDATE] during scoping. You are still understanding the problem.
PHASE 2 - DISCOVERY (current_phase: discovery):
Work through the troubleshooting logic branch by branch:
- Establish the first diagnostic question (the root decision node)
- For each branch, ask what the engineer would check next
- Suggest checks the user might not have considered
- Capture specific commands, tools, and procedures
EMIT [TREE_UPDATE] ONLY when you and the user have agreed on a concrete node — a decision with clear options, or an action with a specific command. If you are asking a question, you are NOT updating the tree.
PHASE 3 - ENRICHMENT (current_phase: enrichment):
Circle back to enrich existing nodes:
- Add exact PowerShell/CLI commands with syntax
- Add help text with relevant documentation links
- Add expected outcomes for action nodes
- Suggest edge cases needing additional branches
EMIT [TREE_UPDATE] when enriching existing nodes or adding edge case branches.
PHASE 4 - REVIEW (current_phase: review):
Present a summary:
- Total node count by type
- Text outline of the flow structure
- Flag any areas of uncertainty
- Offer chance to add/remove/modify branches
EMIT [TREE_UPDATE] only if the user requests structural changes.
TRANSITION between phases by emitting [PHASE:phase_name] when the conversation naturally moves to the next stage. You decide when enough information has been gathered for each phase.
"""
RESPONSE_FORMAT = """
RESPONSE FORMAT:
Your response is natural conversational text. When the tree structure changes, include structured markers that will be parsed by the system (the user will NOT see these markers):
1. Tree update (only when structure changes — see phase rules above):
[TREE_UPDATE]
{...valid TreeStructure JSON...}
[/TREE_UPDATE]
2. Phase transition (when moving to next phase):
[PHASE:discovery]
3. Metadata capture (when you learn the flow's name, description, or tags):
[METADATA]
{"name": "...", "description": "...", "tags": ["..."]}
[/METADATA]
IMPORTANT:
- Include [TREE_UPDATE] sparingly. Only when concrete nodes are established or modified.
- The tree update should be the COMPLETE working tree, not a diff.
- Always include conversational text OUTSIDE the markers — never respond with only markers.
"""
def _build_system_prompt(flow_type: str) -> str:
"""Assemble the full system prompt for the chat builder."""
flow_context = (
"The user wants to build a TROUBLESHOOTING flow — a diagnostic decision tree "
"that guides engineers through symptom identification, diagnostic checks, and "
"resolution steps."
if flow_type == "troubleshooting"
else "The user wants to build a PROCEDURAL flow — a step-by-step process guide "
"with phases, checklists, and verification steps."
)
return f"{ROLE_PERSONA}\n\n{flow_context}\n\n{SCHEMA_CONTEXT}\n\n{INTERVIEW_PROTOCOL}\n\n{RESPONSE_FORMAT}"
def _strip_markdown_fences(text: str) -> str:
"""Strip markdown code fences if the model wrapped its JSON response."""
text = text.strip()
match = re.match(r"^```(?:json)?\s*([\s\S]*?)```$", text)
if match:
return match.group(1).strip()
return text
def _parse_ai_response(raw_response: str) -> dict[str, Any]:
"""Parse structured markers from AI response.
Returns dict with:
- content: str (conversational text with markers stripped)
- tree_update: dict | None (parsed TreeStructure JSON)
- phase: str | None (new phase name)
- metadata: dict | None (name, description, tags)
"""
result: dict[str, Any] = {
"content": raw_response,
"tree_update": None,
"phase": None,
"metadata": None,
}
# Extract [TREE_UPDATE]...[/TREE_UPDATE]
tree_match = re.search(
r"\[TREE_UPDATE\]\s*([\s\S]*?)\s*\[/TREE_UPDATE\]", raw_response
)
if tree_match:
try:
raw_json = _strip_markdown_fences(tree_match.group(1))
result["tree_update"] = json.loads(raw_json)
except (json.JSONDecodeError, ValueError) as e:
logger.warning("Failed to parse tree update JSON: %s", e)
result["content"] = raw_response[: tree_match.start()] + raw_response[tree_match.end() :]
# Extract [PHASE:name]
phase_match = re.search(r"\[PHASE:(\w+)\]", result["content"])
if phase_match:
result["phase"] = phase_match.group(1)
result["content"] = result["content"][: phase_match.start()] + result["content"][phase_match.end() :]
# Extract [METADATA]...[/METADATA]
meta_match = re.search(
r"\[METADATA\]\s*([\s\S]*?)\s*\[/METADATA\]", result["content"]
)
if meta_match:
try:
raw_json = _strip_markdown_fences(meta_match.group(1))
result["metadata"] = json.loads(raw_json)
except (json.JSONDecodeError, ValueError) as e:
logger.warning("Failed to parse metadata JSON: %s", e)
result["content"] = result["content"][: meta_match.start()] + result["content"][meta_match.end() :]
# Clean up extra whitespace from marker removal
result["content"] = re.sub(r"\n{3,}", "\n\n", result["content"]).strip()
return result
# ── Main Service Functions ──
async def start_chat_session(
flow_type: str,
user_id: uuid.UUID,
account_id: uuid.UUID,
db: AsyncSession,
) -> tuple[AIChatSession, str]:
"""Create a chat session and return the AI's opening greeting.
Returns (session, greeting_text).
"""
session = AIChatSession(
user_id=user_id,
account_id=account_id,
flow_type=flow_type,
expires_at=datetime.now(timezone.utc) + timedelta(hours=settings.AI_CONVERSATION_TTL_HOURS),
)
db.add(session)
await db.flush()
# Build system prompt and get opening message
system_prompt = _build_system_prompt(flow_type)
primer = f"I want to build a {flow_type} flow. Help me get started."
provider = get_ai_provider()
provider_name = settings.AI_PROVIDER
messages = [{"role": "user", "content": primer}]
response_text, input_tokens, output_tokens = await provider.generate_json(
system_prompt=system_prompt,
messages=messages,
max_tokens=1500,
)
# Parse response (greeting shouldn't have tree updates, but handle gracefully)
parsed = _parse_ai_response(response_text)
# Store conversation history
now_iso = datetime.now(timezone.utc).isoformat()
session.conversation_history = [
{"role": "user", "content": primer, "timestamp": now_iso, "hidden": True},
{"role": "assistant", "content": parsed["content"], "timestamp": now_iso},
]
session.provider_used = provider_name
session.message_count = 1
session.total_input_tokens = input_tokens
session.total_output_tokens = output_tokens
if parsed["metadata"]:
session.tree_metadata = parsed["metadata"]
return session, parsed["content"]
async def send_message(
session: AIChatSession,
user_message: str,
db: AsyncSession,
) -> tuple[str, Optional[dict], Optional[str], Optional[dict]]:
"""Send a user message and get AI response.
Returns (ai_content, working_tree_update, new_phase, metadata_update).
"""
system_prompt = _build_system_prompt(session.flow_type)
# Build messages array from conversation history
now_iso = datetime.now(timezone.utc).isoformat()
history = list(session.conversation_history)
history.append({"role": "user", "content": user_message, "timestamp": now_iso})
# Convert to provider format (just role + content)
provider_messages = [
{"role": msg["role"], "content": msg["content"]}
for msg in history
]
provider = get_ai_provider()
response_text, input_tokens, output_tokens = await provider.generate_json(
system_prompt=system_prompt,
messages=provider_messages,
max_tokens=2000,
)
parsed = _parse_ai_response(response_text)
# Validate tree update if present
tree_update = parsed["tree_update"]
if tree_update:
errors = validate_generated_tree(tree_update)
if errors:
logger.warning("AI tree update failed validation: %s", errors)
tree_update = None # Silently discard invalid updates
# Update session state
history.append({"role": "assistant", "content": parsed["content"], "timestamp": now_iso})
session.conversation_history = history
session.message_count = session.message_count + 1
session.total_input_tokens = session.total_input_tokens + input_tokens
session.total_output_tokens = session.total_output_tokens + output_tokens
if tree_update:
session.working_tree = tree_update
if parsed["phase"]:
valid_phases = {"scoping", "discovery", "enrichment", "review", "generation"}
if parsed["phase"] in valid_phases:
session.current_phase = parsed["phase"]
if parsed["metadata"]:
merged = dict(session.tree_metadata)
merged.update(parsed["metadata"])
session.tree_metadata = merged
session.updated_at = datetime.now(timezone.utc)
return parsed["content"], tree_update, parsed["phase"], parsed["metadata"]
async def generate_final_tree(
session: AIChatSession,
db: AsyncSession,
) -> tuple[dict[str, Any], dict[str, Any]]:
"""Generate the final validated TreeStructure from the conversation.
Returns (tree_structure, metadata).
Raises ValueError if generation fails after retry.
"""
system_prompt = _build_system_prompt(session.flow_type)
# Build generation prompt from full conversation
provider_messages = [
{"role": msg["role"], "content": msg["content"]}
for msg in session.conversation_history
]
generation_instruction = """Based on our entire conversation, generate the COMPLETE and FINAL TreeStructure JSON for this flow.
Requirements:
- Include ALL branches, steps, and solutions we discussed
- Use descriptive node IDs (slugs, not UUIDs)
- Root node must be type "decision"
- Every decision option must have a valid next_node_id pointing to a child
- Every action node should have commands with exact syntax where discussed
- Every action node should have expected_outcome where discussed
- Solution nodes should have resolution_steps
- Respond with ONLY the JSON — no conversational text, no markdown fences
Also provide metadata as a separate JSON object after the tree:
[METADATA]
{"name": "...", "description": "...", "tags": ["..."]}
[/METADATA]"""
provider_messages.append({"role": "user", "content": generation_instruction})
provider = get_ai_provider()
for attempt in range(2): # One try + one retry
response_text, input_tokens, output_tokens = await provider.generate_json(
system_prompt=system_prompt,
messages=provider_messages,
max_tokens=8000,
)
session.total_input_tokens = session.total_input_tokens + input_tokens
session.total_output_tokens = session.total_output_tokens + output_tokens
# Extract metadata first
parsed = _parse_ai_response(response_text)
metadata = parsed["metadata"] or dict(session.tree_metadata)
# Parse tree JSON — could be in tree_update marker or raw
tree = parsed["tree_update"]
if not tree:
try:
raw = _strip_markdown_fences(parsed["content"])
tree = json.loads(raw)
except (json.JSONDecodeError, ValueError):
pass
if not tree:
if attempt == 0:
provider_messages.append({"role": "assistant", "content": response_text})
provider_messages.append({
"role": "user",
"content": "That response was not valid JSON. Please respond with ONLY the TreeStructure JSON object, starting with { and ending with }. No markdown fences, no explanatory text.",
})
continue
raise ValueError("AI failed to produce valid JSON after retry")
errors = validate_generated_tree(tree)
if errors:
if attempt == 0:
provider_messages.append({"role": "assistant", "content": response_text})
correction = (
f"The tree has validation errors: {'; '.join(errors)}. "
"Please fix these issues and respond with the corrected JSON only."
)
provider_messages.append({"role": "user", "content": correction})
continue
raise ValueError(f"Generated tree failed validation: {'; '.join(errors)}")
# Success
session.working_tree = tree
session.tree_metadata = metadata
session.current_phase = "generation"
session.updated_at = datetime.now(timezone.utc)
return tree, metadata
raise ValueError("AI failed to generate a valid tree")
async def get_chat_session(
session_id: uuid.UUID,
user_id: uuid.UUID,
db: AsyncSession,
) -> AIChatSession:
"""Get a chat session, validating ownership and expiry.
Raises HTTPException on not found, forbidden, or expired.
"""
from fastapi import HTTPException, status
result = await db.execute(
select(AIChatSession).where(AIChatSession.id == session_id)
)
session = result.scalar_one_or_none()
if not session:
raise HTTPException(status_code=404, detail="Chat session not found")
if session.user_id != user_id:
raise HTTPException(status_code=403, detail="Access denied")
if session.expires_at < datetime.now(timezone.utc):
session.status = "abandoned"
await db.flush()
raise HTTPException(status_code=410, detail="Chat session has expired")
return session