"""AI Chat Builder service.

Manages the conversational flow builder: system prompt construction,
message exchange with AI provider, and response parsing (extracting
tree updates, phase transitions, and metadata from structured markers).
"""
import json
import logging
import re
import uuid
from datetime import datetime, timezone, timedelta
from typing import Any, Optional

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.ai_provider import get_ai_provider
from app.core.ai_tree_validator import validate_generated_tree
from app.core.config import settings
from app.models.ai_chat_session import AIChatSession

logger = logging.getLogger(__name__)

# ── Cost estimation ──
COST_PER_INPUT_TOKEN = 1.0 / 1_000_000
COST_PER_OUTPUT_TOKEN = 5.0 / 1_000_000

# ── Max messages per session ──
MAX_MESSAGES_FREE = 10
MAX_MESSAGES_PAID = 25

# Phase names that may be stored on a session. The first four are announced by
# the model through [PHASE:...] markers; "generation" is set by
# generate_final_tree once a validated tree has been produced.
_VALID_PHASES = frozenset({"scoping", "discovery", "enrichment", "review", "generation"})

# Structured markers the model embeds in its replies (see RESPONSE_FORMAT).
_TREE_BLOCK_RE = re.compile(r"\[TREE_UPDATE\]\s*([\s\S]*?)\s*\[/TREE_UPDATE\]")
_METADATA_BLOCK_RE = re.compile(r"\[METADATA\]\s*([\s\S]*?)\s*\[/METADATA\]")
_PHASE_RE = re.compile(r"\[PHASE:(\w+)\]")
# An opening marker that never got its closing tag: everything from the marker
# to the end of the reply is machine payload, not conversation.
_DANGLING_BLOCK_RE = re.compile(r"\[(?:TREE_UPDATE|METADATA)\][\s\S]*\Z")


# ── System Prompt ──

ROLE_PERSONA = """You are a senior IT engineer embedded in ResolutionFlow, a troubleshooting platform for MSP (Managed Service Provider) engineers. You have 15+ years of hands-on experience across Windows Server, Active Directory, Entra ID/Azure AD, Microsoft 365, networking (DNS, DHCP, routing, VPN, firewalls), virtualization (Hyper-V, VMware), security, backup/DR, and cloud infrastructure.

Your job is to help engineers build troubleshooting decision trees by interviewing them about a problem space. You are NOT a generic assistant. You are a colleague who has seen these issues hundreds of times and knows the optimal diagnostic order.

CRITICAL BEHAVIORS:
- Act as a senior engineer, not a chatbot. Use your domain knowledge to SUGGEST diagnostic steps, not just record what the user says.
- When the user describes a problem area, demonstrate understanding by naming specific sub-categories, common causes, and relevant tools.
- Challenge assumptions constructively: "Before we go down that path, have you considered checking X first? In my experience, that resolves 60% of these cases."
- Capture SPECIFIC commands with exact syntax. Not "check the service" but "Get-Service ADSync | Select-Object Status, StartType".
- Include expected outcomes for every action: what does success look like?
- Surface edge cases proactively: "What about multi-forest environments?" or "Does this change if they have conditional access policies?"
- Explain WHY the diagnostic order matters: "We check connectivity before auth because a network issue masquerades as an auth failure."
- Ask ONE focused question at a time. Do not overwhelm with multiple questions.
- Use plain, collegial language. Sound like a colleague, not a form."""

SCHEMA_CONTEXT = """
TREESTRUCTURE SCHEMA — This is what you are building:

The tree is a recursive JSON structure. Each node has a "type" field:

1. decision — A diagnostic question with branching options
   Required: id (string), type ("decision"), question (string), options (array), children (array)
   Optional: help_text (string)
   Each option: { id (string), label (string), next_node_id (string — must match a child's id) }

2. action — A step the engineer performs
   Required: id (string), type ("action"), title (string), description (string)
   Optional: commands (string array — exact CLI/PowerShell syntax), expected_outcome (string), help_text (string), next_node_id (string — ID of the next node to navigate to)

3. solution — A resolution endpoint
   Required: id (string), type ("solution"), title (string), description (string)
   Optional: resolution_steps (string array)

STRUCTURAL RULES:
- Root node MUST be type "decision"
- Decision nodes contain their children in the "children" array
- Each decision option's next_node_id must reference a child node's id
- Action nodes use next_node_id to chain to the next step (NOT children)
- Solution nodes are terminal — no next_node_id or children
- All IDs must be unique strings (use descriptive slugs like "check-service-status")
"""

INTERVIEW_PROTOCOL = """
INTERVIEW PHASES — Follow this progression:

PHASE 1 - SCOPING (current_phase: scoping):
Ask broad questions to understand the problem domain and scope:
- What type of issue is this flow for?
- Who is the target audience? (Tier 1 help desk, Tier 2, Tier 3?)
- What environment assumptions? (On-prem, hybrid, specific vendors?)
Demonstrate domain expertise immediately. If the user says "Azure AD Sync failures," show understanding: "Are you primarily seeing password hash sync issues, object attribute sync failures, or full directory sync errors?"
DO NOT emit [TREE_UPDATE] during scoping. You are still understanding the problem.

PHASE 2 - DISCOVERY (current_phase: discovery):
Work through the troubleshooting logic branch by branch:
- Establish the first diagnostic question (the root decision node)
- For each branch, ask what the engineer would check next
- Suggest checks the user might not have considered
- Capture specific commands, tools, and procedures
EMIT [TREE_UPDATE] ONLY when you and the user have agreed on a concrete node — a decision with clear options, or an action with a specific command. If you are asking a question, you are NOT updating the tree.

PHASE 3 - ENRICHMENT (current_phase: enrichment):
Circle back to enrich existing nodes:
- Add exact PowerShell/CLI commands with syntax
- Add help text with relevant documentation links
- Add expected outcomes for action nodes
- Suggest edge cases needing additional branches
EMIT [TREE_UPDATE] when enriching existing nodes or adding edge case branches.

PHASE 4 - REVIEW (current_phase: review):
Present a summary:
- Total node count by type
- Text outline of the flow structure
- Flag any areas of uncertainty
- Offer chance to add/remove/modify branches
EMIT [TREE_UPDATE] only if the user requests structural changes.

TRANSITION between phases by emitting [PHASE:phase_name] when the conversation naturally moves to the next stage. You decide when enough information has been gathered for each phase.
"""

RESPONSE_FORMAT = """
RESPONSE FORMAT:

Your response is natural conversational text. When the tree structure changes, include structured markers that will be parsed by the system (the user will NOT see these markers):

1. Tree update (only when structure changes — see phase rules above):
[TREE_UPDATE]
{...valid TreeStructure JSON...}
[/TREE_UPDATE]

2. Phase transition (when moving to next phase):
[PHASE:discovery]

3. Metadata capture (when you learn the flow's name, description, or tags):
[METADATA]
{"name": "...", "description": "...", "tags": ["..."]}
[/METADATA]

IMPORTANT:
- Include [TREE_UPDATE] sparingly. Only when concrete nodes are established or modified.
- The tree update should be the COMPLETE working tree, not a diff.
- Always include conversational text OUTSIDE the markers — never respond with only markers.
"""

# Final user turn appended by generate_final_tree to request the finished tree.
_GENERATION_INSTRUCTION = """Based on our entire conversation, generate the COMPLETE and FINAL TreeStructure JSON for this flow.

Requirements:
- Include ALL branches, steps, and solutions we discussed
- Use descriptive node IDs (slugs, not UUIDs)
- Root node must be type "decision"
- Every decision option must have a valid next_node_id pointing to a child
- Every action node should have commands with exact syntax where discussed
- Every action node should have expected_outcome where discussed
- Solution nodes should have resolution_steps
- Respond with ONLY the JSON — no conversational text, no markdown fences

Also provide metadata as a separate JSON object after the tree:
[METADATA]
{"name": "...", "description": "...", "tags": ["..."]}
[/METADATA]"""


def _build_system_prompt(flow_type: str) -> str:
    """Assemble the full system prompt for the chat builder.

    Args:
        flow_type: "troubleshooting" selects the diagnostic-tree framing; any
            other value falls through to the procedural framing.

    Returns:
        The persona, flow framing, schema, interview protocol and response
        format sections joined by blank lines.
    """
    if flow_type == "troubleshooting":
        flow_context = (
            "The user wants to build a TROUBLESHOOTING flow — a diagnostic decision tree "
            "that guides engineers through symptom identification, diagnostic checks, and "
            "resolution steps."
        )
    else:
        flow_context = (
            "The user wants to build a PROCEDURAL flow — a step-by-step process guide "
            "with phases, checklists, and verification steps."
        )

    sections = [ROLE_PERSONA, flow_context, SCHEMA_CONTEXT, INTERVIEW_PROTOCOL, RESPONSE_FORMAT]
    return "\n\n".join(sections)


def _strip_markdown_fences(text: str) -> str:
    """Strip markdown code fences if the model wrapped its JSON response.

    Only a fence that wraps the *entire* (trimmed) text is removed; text with
    anything outside the fence is returned trimmed but otherwise untouched.
    """
    text = text.strip()
    match = re.match(r"^```(?:json)?\s*([\s\S]*?)```$", text)
    if match:
        return match.group(1).strip()
    return text


def _load_json_object(text: str) -> Optional[dict[str, Any]]:
    """Parse *text* (optionally fenced) as JSON; return it only if it is an object."""
    try:
        value = json.loads(_strip_markdown_fences(text))
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return None
    return value if isinstance(value, dict) else None


def _extract_json_block(
    text: str, pattern: "re.Pattern[str]", label: str
) -> tuple[Optional[dict[str, Any]], str]:
    """Pull a marker-delimited JSON object out of *text*.

    Args:
        text: Text that may contain one or more marker blocks.
        pattern: Compiled regex whose group 1 is the block payload.
        label: Human-readable block name used in log messages.

    Returns:
        ``(payload, remaining)`` where ``payload`` is the first block that
        parses to a JSON object (``None`` if none does) and ``remaining`` is
        *text* with every matching block removed, parseable or not, so marker
        payloads never reach the user.
    """
    payload: Optional[dict[str, Any]] = None
    for match in pattern.finditer(text):
        if payload is not None:
            break
        try:
            candidate = json.loads(_strip_markdown_fences(match.group(1)))
        except (json.JSONDecodeError, ValueError) as exc:
            logger.warning("Failed to parse %s JSON: %s", label, exc)
            continue
        if isinstance(candidate, dict):
            payload = candidate
        else:
            # Downstream code merges/validates these as dicts; a list or scalar
            # would crash later, so reject it here.
            logger.warning("Ignoring %s payload that is not a JSON object", label)
    return payload, pattern.sub("", text)


def _parse_ai_response(raw_response: str) -> dict[str, Any]:
    """Parse structured markers from AI response.

    Returns dict with:
    - content: str (conversational text with markers stripped)
    - tree_update: dict | None (parsed TreeStructure JSON)
    - phase: str | None (new phase name, first marker wins; not validated here)
    - metadata: dict | None (name, description, tags)
    """
    # JSON blocks are removed before looking for [PHASE:...] so that marker-like
    # text inside a payload is never mistaken for a phase transition.
    tree_update, content = _extract_json_block(raw_response, _TREE_BLOCK_RE, "tree update")
    metadata, content = _extract_json_block(content, _METADATA_BLOCK_RE, "metadata")

    phase: Optional[str] = None
    phase_match = _PHASE_RE.search(content)
    if phase_match:
        phase = phase_match.group(1)
        content = _PHASE_RE.sub("", content)

    # A reply cut off mid-block (e.g. by the token limit) leaves an opening
    # marker with no closing tag; drop the partial payload rather than show it.
    if _DANGLING_BLOCK_RE.search(content):
        logger.warning("Discarding unterminated marker block in AI response")
        content = _DANGLING_BLOCK_RE.sub("", content)

    # Clean up extra whitespace from marker removal
    content = re.sub(r"\n{3,}", "\n\n", content).strip()

    return {
        "content": content,
        "tree_update": tree_update,
        "phase": phase,
        "metadata": metadata,
    }


# ── Main Service Functions ──


async def start_chat_session(
    flow_type: str,
    user_id: uuid.UUID,
    account_id: uuid.UUID,
    db: AsyncSession,
) -> tuple[AIChatSession, str]:
    """Create a chat session and return the AI's opening greeting.

    The session is added to *db* and flushed (not committed) before the
    provider is called; committing is left to the caller.

    Returns (session, greeting_text).
    """
    ttl = timedelta(hours=settings.AI_CONVERSATION_TTL_HOURS)
    session = AIChatSession(
        user_id=user_id,
        account_id=account_id,
        flow_type=flow_type,
        expires_at=datetime.now(timezone.utc) + ttl,
    )
    db.add(session)
    await db.flush()

    # Build system prompt and get opening message
    system_prompt = _build_system_prompt(flow_type)
    primer = f"I want to build a {flow_type} flow. Help me get started."

    provider = get_ai_provider()
    response_text, input_tokens, output_tokens = await provider.generate_json(
        system_prompt=system_prompt,
        messages=[{"role": "user", "content": primer}],
        max_tokens=1500,
    )

    # Parse response (greeting shouldn't have tree updates, but handle gracefully)
    parsed = _parse_ai_response(response_text)
    greeting = parsed["content"]

    # Store conversation history. The synthetic primer is flagged "hidden";
    # it is still replayed to the provider on later turns.
    now_iso = datetime.now(timezone.utc).isoformat()
    session.conversation_history = [
        {"role": "user", "content": primer, "timestamp": now_iso, "hidden": True},
        {"role": "assistant", "content": greeting, "timestamp": now_iso},
    ]
    session.provider_used = settings.AI_PROVIDER
    session.message_count = 1
    session.total_input_tokens = input_tokens
    session.total_output_tokens = output_tokens

    if parsed["metadata"]:
        session.tree_metadata = parsed["metadata"]

    return session, greeting


async def send_message(
    session: AIChatSession,
    user_message: str,
    db: AsyncSession,
) -> tuple[str, Optional[dict], Optional[str], Optional[dict]]:
    """Send a user message and get AI response.

    Mutates *session* in place (history, counters, working tree, phase,
    metadata, ``updated_at``); nothing is written if the provider call raises.
    *db* is accepted for interface symmetry and is not used here.

    Returns (ai_content, working_tree_update, new_phase, metadata_update).
    ``working_tree_update`` is ``None`` when the model sent no tree or the tree
    failed validation; ``new_phase`` is ``None`` unless a recognised phase was
    actually applied to the session.
    """
    system_prompt = _build_system_prompt(session.flow_type)

    # Work on a copy and assign it back afterwards.
    # NOTE(review): presumably needed so the ORM detects the JSON column
    # change — confirm against the AIChatSession column definition.
    history = list(session.conversation_history or [])
    history.append(
        {
            "role": "user",
            "content": user_message,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    )

    # Convert to provider format (just role + content)
    provider_messages = [{"role": msg["role"], "content": msg["content"]} for msg in history]

    provider = get_ai_provider()
    response_text, input_tokens, output_tokens = await provider.generate_json(
        system_prompt=system_prompt,
        messages=provider_messages,
        max_tokens=2000,
    )

    parsed = _parse_ai_response(response_text)

    # Validate tree update if present
    tree_update = parsed["tree_update"]
    if tree_update:
        errors = validate_generated_tree(tree_update)
        if errors:
            logger.warning("AI tree update failed validation: %s", errors)
            tree_update = None  # Silently discard invalid updates

    # Stamp the reply when it actually arrived, not when the request started.
    replied_at = datetime.now(timezone.utc)
    history.append(
        {"role": "assistant", "content": parsed["content"], "timestamp": replied_at.isoformat()}
    )

    # Update session state
    session.conversation_history = history
    session.message_count = (session.message_count or 0) + 1
    session.total_input_tokens = (session.total_input_tokens or 0) + input_tokens
    session.total_output_tokens = (session.total_output_tokens or 0) + output_tokens

    if tree_update:
        session.working_tree = tree_update

    # Only report a phase change that was really applied, mirroring how an
    # invalid tree update is reported as None.
    applied_phase: Optional[str] = None
    if parsed["phase"] in _VALID_PHASES:
        applied_phase = parsed["phase"]
        session.current_phase = applied_phase
    elif parsed["phase"]:
        logger.warning("Ignoring unknown phase from AI response: %s", parsed["phase"])

    metadata_update = parsed["metadata"]
    if metadata_update:
        # tree_metadata may still be unset if no metadata was captured so far.
        session.tree_metadata = {**(session.tree_metadata or {}), **metadata_update}

    session.updated_at = replied_at

    return parsed["content"], tree_update, applied_phase, metadata_update


async def generate_final_tree(
    session: AIChatSession,
    db: AsyncSession,
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Generate the final validated TreeStructure from the conversation.

    Replays the stored conversation plus a generation instruction. If the reply
    is not a JSON object, or the tree fails validation, the problem is fed back
    to the model and one retry is made. Token counters on *session* are updated
    for every attempt, including failed ones. *db* is not used here.

    Returns (tree_structure, metadata).
    Raises ValueError if generation fails after retry.
    """
    system_prompt = _build_system_prompt(session.flow_type)

    # Build generation prompt from full conversation
    provider_messages = [
        {"role": msg["role"], "content": msg["content"]}
        for msg in (session.conversation_history or [])
    ]
    provider_messages.append({"role": "user", "content": _GENERATION_INSTRUCTION})

    provider = get_ai_provider()

    max_attempts = 2  # One try + one retry
    for attempt in range(max_attempts):
        is_last_attempt = attempt == max_attempts - 1

        response_text, input_tokens, output_tokens = await provider.generate_json(
            system_prompt=system_prompt,
            messages=provider_messages,
            max_tokens=8000,
        )

        session.total_input_tokens = (session.total_input_tokens or 0) + input_tokens
        session.total_output_tokens = (session.total_output_tokens or 0) + output_tokens

        # Extract metadata first; fall back to whatever the chat has captured.
        parsed = _parse_ai_response(response_text)
        metadata = parsed["metadata"] or dict(session.tree_metadata or {})

        # Parse tree JSON — could be in tree_update marker or raw
        tree = parsed["tree_update"] or _load_json_object(parsed["content"])

        if not tree:
            if is_last_attempt:
                raise ValueError("AI failed to produce valid JSON after retry")
            provider_messages.append({"role": "assistant", "content": response_text})
            provider_messages.append({
                "role": "user",
                "content": "That response was not valid JSON. Please respond with ONLY the TreeStructure JSON object, starting with { and ending with }. No markdown fences, no explanatory text.",
            })
            continue

        errors = validate_generated_tree(tree)
        if errors:
            if is_last_attempt:
                raise ValueError(f"Generated tree failed validation: {'; '.join(errors)}")
            provider_messages.append({"role": "assistant", "content": response_text})
            correction = (
                f"The tree has validation errors: {'; '.join(errors)}. "
                "Please fix these issues and respond with the corrected JSON only."
            )
            provider_messages.append({"role": "user", "content": correction})
            continue

        # Success
        session.working_tree = tree
        session.tree_metadata = metadata
        session.current_phase = "generation"
        session.updated_at = datetime.now(timezone.utc)

        return tree, metadata

    # Defensive: every path through the loop above returns or raises.
    raise ValueError("AI failed to generate a valid tree")


async def get_chat_session(
    session_id: uuid.UUID,
    user_id: uuid.UUID,
    db: AsyncSession,
) -> AIChatSession:
    """Get a chat session, validating ownership and expiry.

    Raises:
        HTTPException: 404 if no session has this id, 403 if it belongs to a
            different user, 410 if it has expired. An expired session is marked
            "abandoned" and flushed before the 410 is raised.
    """
    from fastapi import HTTPException

    result = await db.execute(
        select(AIChatSession).where(AIChatSession.id == session_id)
    )
    session = result.scalar_one_or_none()

    if not session:
        raise HTTPException(status_code=404, detail="Chat session not found")

    if session.user_id != user_id:
        raise HTTPException(status_code=403, detail="Access denied")

    # Some database backends hand back naive datetimes even for timezone-aware
    # columns; comparing naive with aware raises TypeError. Sessions are created
    # with UTC expiry, so a naive value is interpreted as UTC.
    expires_at = session.expires_at
    if expires_at.tzinfo is None:
        expires_at = expires_at.replace(tzinfo=timezone.utc)

    if expires_at < datetime.now(timezone.utc):
        session.status = "abandoned"
        # NOTE(review): only flushed here — whether this survives the raised
        # exception depends on how the caller's transaction is handled.
        await db.flush()
        raise HTTPException(status_code=410, detail="Chat session has expired")

    return session