"""FlowPilot Engine — core LLM orchestration for AI troubleshooting sessions.

Manages structured diagnostic conversations: intake analysis, step generation,
confidence tracking, and auto-documentation. All LLM responses are structured
JSON validated against known output shapes.
"""

import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Any, Optional
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.core.ai_provider import get_ai_provider
from app.core.config import settings
from app.models.ai_session import AISession
from app.models.ai_session_step import AISessionStep
from app.schemas.ai_session import (
    AISessionCreateRequest,
    AISessionCreateResponse,
    AISessionStepResponse,
    StepOptionSchema,
    StepResponseRequest,
    StepResponseResponse,
    ResolveSessionRequest,
    EscalateSessionRequest,
    SessionCloseResponse,
    SessionDocumentation,
    DocumentationStep,
)

logger = logging.getLogger(__name__)

# Maximum steps per session as a safety limit
MAX_STEPS_PER_SESSION = 30

# Token budget for every diagnostic-step generation call.
_STEP_MAX_TOKENS = 2048

STRUCTURED_OUTPUT_SCHEMA = """\
Your response MUST be a valid JSON object with one of these shapes:

1. Diagnostic question:
{"type": "question", "content": "Brief description", "reasoning": "Internal why", "context_message": "Shown to engineer", "options": [{"label": "Human text", "value": "machine_value", "followup_hint": "or null"}], "allow_free_text": true, "allow_skip": true, "confidence": 0.65}

2. Suggested action:
{"type": "action", "content": "What to do", "reasoning": "Internal why", "context_message": "Here's what to try", "action_type": "instruction | script_generation | verification | info_request", "expected_outcome": "What success looks like", "confidence": 0.78}

3. Resolution suggestion:
{"type": "resolution_suggestion", "content": "Summary of what we did", "reasoning": "Internal why", "resolution_summary": "Issue was caused by X, fixed by Y", "confidence": 0.92, "follow_up_recommendations": ["Monitor for 24 hours"]}\
"""

# NOTE: this template is passed through str.format(); the only brace fields
# are the three placeholders below, so any new literal brace must be doubled.
FLOWPILOT_SYSTEM_PROMPT = """\
You are FlowPilot, an expert MSP troubleshooting assistant embedded in ResolutionFlow.
You guide engineers through structured diagnosis of IT issues.

## YOUR ROLE
- Conduct systematic troubleshooting through targeted questions and actions
- Start broad, narrow down based on responses
- Never guess — ask clarifying questions when uncertain
- Suggest specific, actionable steps the engineer can verify
- When confidence is high, suggest resolution; when low, keep investigating

## RESPONSE FORMAT
You MUST respond with ONLY a valid JSON object. No markdown, no prose, no code fences.
Every response must have a "type" field: "question", "action", or "resolution_suggestion".

{structured_output_schema}

## RULES
- Maximum 5 options per question. Options should be the most likely scenarios.
- Always include relevant context in context_message — explain WHY you're asking
- confidence is a float 0.0-1.0 reflecting how certain you are about the diagnosis path
- When multiple symptoms point to one root cause with >90% confidence, suggest resolution
- If you detect the engineer needs a PowerShell script, suggest a script_generation action
- Never suggest restarting or rebooting as a first step — diagnose first
- Be specific: "Check Event Viewer > System > source NTFS" not "check the logs"

{team_context}

{matched_flow_context}\
"""

INTAKE_CLASSIFICATION_PROMPT = """\
You are a triage classifier for IT support issues in an MSP environment.
Analyze the following intake and respond with ONLY a JSON object:

{
  "problem_summary": "One-line summary of the issue (max 120 chars)",
  "problem_domain": "One of: active_directory, networking, m365, hardware, endpoint, virtualization, security, backup, email, printing, cloud, other",
  "key_symptoms": ["symptom1", "symptom2"],
  "urgency": "low | medium | high | critical"
}\
"""


def _confidence_to_tier(confidence: float) -> str:
    """Map a numeric confidence to its tier label.

    Returns:
        "guided" for confidence >= 0.8, "exploring" for >= 0.4, otherwise
        "discovery".
    """
    if confidence >= 0.8:
        return "guided"
    elif confidence >= 0.4:
        return "exploring"
    return "discovery"


def _strip_code_fences(raw_text: str) -> str:
    """Return *raw_text* trimmed, with markdown code-fence lines removed.

    Fence removal only happens when the trimmed text starts with a fence;
    in that case every line that begins with three backticks is dropped.
    """
    text = raw_text.strip()
    if not text.startswith("```"):
        return text
    kept = [line for line in text.split("\n") if not line.strip().startswith("```")]
    return "\n".join(kept).strip()


def _normalize_confidence(data: dict[str, Any]) -> None:
    """Coerce ``data["confidence"]`` to a float in [0.0, 1.0], in place.

    An unusable value (null, bool, non-numeric string, NaN) is removed so that
    callers using ``data.get("confidence", default)`` fall back to their own
    default instead of passing a non-number to ``_confidence_to_tier``.
    """
    if "confidence" not in data:
        return
    value = data["confidence"]
    try:
        # bool is an int subclass; True/False is not a meaningful confidence.
        if isinstance(value, bool):
            raise TypeError("boolean confidence")
        score = float(value)
    except (TypeError, ValueError, OverflowError):
        score = None
    # score != score is true only for NaN.
    if score is None or score != score:
        logger.warning("Ignoring unusable confidence from LLM: %r", value)
        del data["confidence"]
        return
    data["confidence"] = min(1.0, max(0.0, score))


def _parse_structured_output(raw_text: str) -> dict[str, Any]:
    """Parse and validate structured JSON from an LLM response.

    Markdown code fences are stripped before parsing; no other repair of
    malformed JSON is attempted. A present "confidence" value is normalized
    to a float in [0.0, 1.0], or dropped when it is not a usable number.

    Args:
        raw_text: The raw text returned by the LLM.

    Returns:
        The decoded JSON object, guaranteed to contain a known "type".

    Raises:
        ValueError: If the text is not valid JSON, is not an object with a
            "type" field, or the type is not one of the known shapes.
    """
    text = _strip_code_fences(raw_text)

    try:
        data = json.loads(text)
    except json.JSONDecodeError as e:
        logger.warning("Failed to parse LLM JSON output: %s — raw: %.200s", e, text)
        raise ValueError(f"Invalid JSON from LLM: {e}") from e

    if not isinstance(data, dict) or "type" not in data:
        raise ValueError("LLM response missing required 'type' field")

    valid_types = {"question", "action", "resolution_suggestion"}
    if data["type"] not in valid_types:
        raise ValueError(f"Unknown response type: {data['type']}")

    _normalize_confidence(data)
    return data


def _build_step_response(step: AISessionStep, session: AISession) -> AISessionStepResponse:
    """Convert a model step + session state into an API response.

    Options come from ``step.options_presented``; entries that are not dicts
    are skipped because that field holds unvalidated LLM output. The free-text
    and skip flags default to True when the step content does not carry them.
    """
    options = []
    if step.options_presented:
        options = [
            StepOptionSchema(
                label=opt.get("label", ""),
                value=opt.get("value", ""),
                followup_hint=opt.get("followup_hint"),
            )
            for opt in step.options_presented
            if isinstance(opt, dict)
        ]

    content = step.content or {}

    return AISessionStepResponse(
        step_id=step.id,
        step_order=step.step_order,
        step_type=step.step_type,
        content=content,
        context_message=step.context_message,
        options=options,
        allow_free_text=content.get("allow_free_text", True),
        allow_skip=content.get("allow_skip", True),
        confidence_tier=session.confidence_tier,
        confidence_score=session.confidence_score,
    )


async def _generate_step(
    provider: Any,
    system_prompt: str,
    messages: list[dict[str, Any]],
    retry_nudge: str,
) -> tuple[dict[str, Any], str, int, int]:
    """Ask the LLM for the next step, retrying once if the output is unparseable.

    On a parse failure the bad reply and *retry_nudge* are appended to a copy
    of *messages* (the caller's list is not modified) and the call is repeated.

    Returns:
        ``(parsed, raw_response, input_tokens, output_tokens)`` where
        ``raw_response`` is the reply that was finally parsed and the token
        counts include the retry, if one happened.

    Raises:
        ValueError: If the retry response cannot be parsed either.
    """
    raw_response, input_tokens, output_tokens = await provider.generate_json(
        system_prompt=system_prompt,
        messages=messages,
        max_tokens=_STEP_MAX_TOKENS,
    )

    try:
        parsed = _parse_structured_output(raw_response)
    except ValueError:
        # Retry once with nudge
        retry_messages = messages + [
            {"role": "assistant", "content": raw_response},
            {"role": "user", "content": retry_nudge},
        ]
        raw_response, retry_in, retry_out = await provider.generate_json(
            system_prompt=system_prompt,
            messages=retry_messages,
            max_tokens=_STEP_MAX_TOKENS,
        )
        input_tokens += retry_in
        output_tokens += retry_out
        parsed = _parse_structured_output(raw_response)

    return parsed, raw_response, input_tokens, output_tokens


async def start_session(
    request: AISessionCreateRequest,
    user_id: UUID,
    account_id: UUID,
    team_id: Optional[UUID],
    db: AsyncSession,
) -> AISessionCreateResponse:
    """Start a new FlowPilot session: classify intake, match flows, get first step.

    The new session and its first step are added to *db* and flushed; this
    function does not commit.

    Raises:
        ValueError: If the LLM fails to return parseable structured output
            even after one retry.
    """
    # 1. Classify intake via fast LLM call
    intake_text = _extract_intake_text(request.intake_content)
    classification = await _classify_intake(intake_text)

    # 2. Try to match existing flows
    # NOTE(review): imported at call time — presumably to avoid a circular
    # import with the flow matching service; confirm before hoisting.
    from app.services.flow_matching_engine import find_matches

    matches = await find_matches(
        intake_text=intake_text,
        problem_domain=classification.get("problem_domain"),
        account_id=account_id,
        db=db,
    )

    top_match = matches[0] if matches else None
    matched_flow_id = top_match["tree_id"] if top_match else None
    match_score = top_match["score"] if top_match else None
    matched_flow_name = top_match["tree_name"] if top_match else None

    # 3. Build system prompt
    matched_flow_context = ""
    if top_match and top_match.get("score", 0) > 0.5:
        matched_flow_context = (
            f"## MATCHED FLOW\n"
            f"A similar flow exists: \"{top_match['tree_name']}\" "
            f"(match score: {top_match['score']:.0%}). "
            f"Use it as a guide but adapt to the specific situation."
        )

    system_prompt = FLOWPILOT_SYSTEM_PROMPT.format(
        structured_output_schema=STRUCTURED_OUTPUT_SCHEMA,
        team_context="",  # Phase 2: team-specific context
        matched_flow_context=matched_flow_context,
    )

    # 4. Build first user message from intake
    user_message = _format_intake_message(request.intake_content, classification)
    messages = [{"role": "user", "content": user_message}]

    # 5. Call LLM for first diagnostic step (one retry on unparseable output)
    provider = get_ai_provider(settings.get_model_for_action("open_chat"))
    parsed, raw_response, input_tokens, output_tokens = await _generate_step(
        provider,
        system_prompt,
        messages,
        "Please respond with ONLY valid JSON matching the required schema. No markdown or prose.",
    )

    confidence = parsed.get("confidence", 0.0)
    confidence_tier = _confidence_to_tier(confidence)

    # Initial confidence from match + classification: a strong flow match
    # lifts the session straight into the "guided" tier.
    if top_match and top_match.get("score", 0) > 0.8:
        confidence_tier = "guided"
        confidence = max(confidence, 0.8)

    # 6. Create session
    session = AISession(
        id=uuid.uuid4(),
        user_id=user_id,
        account_id=account_id,
        team_id=team_id,
        intake_type=request.intake_type,
        intake_content=request.intake_content,
        problem_summary=classification.get("problem_summary"),
        problem_domain=classification.get("problem_domain"),
        status="active",
        confidence_tier=confidence_tier,
        confidence_score=confidence,
        matched_flow_id=matched_flow_id,
        match_score=match_score,
        psa_ticket_id=request.psa_ticket_id,
        psa_connection_id=request.psa_connection_id,
        total_input_tokens=input_tokens,
        total_output_tokens=output_tokens,
        step_count=1,
        system_prompt_snapshot=system_prompt,
        # Only the reply that parsed is kept; a failed first attempt and the
        # retry nudge are not part of the stored conversation.
        conversation_messages=[
            {"role": "user", "content": user_message},
            {"role": "assistant", "content": raw_response},
        ],
    )
    db.add(session)

    # 7. Create first step
    step = _create_step_from_parsed(
        session_id=session.id,
        step_order=0,
        parsed=parsed,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
    )
    db.add(step)
    await db.flush()

    return AISessionCreateResponse(
        session_id=session.id,
        status=session.status,
        confidence_tier=session.confidence_tier,
        problem_summary=session.problem_summary,
        problem_domain=session.problem_domain,
        matched_flow_id=matched_flow_id,
        matched_flow_name=matched_flow_name,
        match_score=match_score,
        first_step=_build_step_response(step, session),
    )


async def process_response(
    session_id: UUID,
    request: StepResponseRequest,
    user_id: UUID,
    db: AsyncSession,
) -> StepResponseResponse:
    """Process an engineer's response and generate the next FlowPilot step.

    Raises:
        ValueError: If the session is missing, not active, has reached
            MAX_STEPS_PER_SESSION, or the LLM output cannot be parsed after
            one retry.
        PermissionError: If *user_id* may not access the session.
    """
    session = await _load_session(session_id, user_id, db)

    if session.status != "active":
        raise ValueError(f"Session is {session.status}, not active")

    if session.step_count >= MAX_STEPS_PER_SESSION:
        raise ValueError("Maximum steps reached for this session")

    # Update the current (latest) step with engineer's response
    latest_step = session.steps[-1] if session.steps else None
    if latest_step and latest_step.responded_at is None:
        latest_step.selected_option = request.selected_option
        latest_step.free_text_input = request.free_text_input
        latest_step.was_free_text = bool(request.free_text_input and not request.selected_option)
        latest_step.was_skipped = request.was_skipped
        latest_step.action_result = request.action_result
        latest_step.responded_at = datetime.now(timezone.utc)

    # Build the conversation message for the engineer's response.
    # The list is rebuilt and reassigned rather than appended to in place —
    # presumably so the ORM registers the change; confirm against the column type.
    response_text = _format_engineer_response(request)
    session.conversation_messages = session.conversation_messages + [
        {"role": "user", "content": response_text}
    ]

    # Call LLM with full conversation (one retry on unparseable output)
    system_prompt = session.system_prompt_snapshot or ""
    provider = get_ai_provider(settings.get_model_for_action("open_chat"))
    parsed, raw_response, input_tokens, output_tokens = await _generate_step(
        provider,
        system_prompt,
        session.conversation_messages,
        "Please respond with ONLY valid JSON matching the required schema.",
    )

    # Append assistant response to conversation
    session.conversation_messages = session.conversation_messages + [
        {"role": "assistant", "content": raw_response}
    ]

    # Update session confidence; keep the previous score when the LLM gave none
    confidence = parsed.get("confidence", session.confidence_score)
    session.confidence_score = confidence
    session.confidence_tier = _confidence_to_tier(confidence)
    session.total_input_tokens += input_tokens
    session.total_output_tokens += output_tokens
    session.step_count += 1

    # Create new step (step_order is zero-based, step_count is one-based)
    step = _create_step_from_parsed(
        session_id=session.id,
        step_order=session.step_count - 1,
        parsed=parsed,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
    )
    db.add(step)
    await db.flush()

    # Check if resolution was suggested
    resolution_suggested = parsed["type"] == "resolution_suggestion"
    resolution_summary = parsed.get("resolution_summary") if resolution_suggested else None

    return StepResponseResponse(
        session_id=session.id,
        status=session.status,
        confidence_tier=session.confidence_tier,
        confidence_score=session.confidence_score,
        next_step=_build_step_response(step, session),
        resolution_suggested=resolution_suggested,
        resolution_summary=resolution_summary,
    )


async def resolve_session(
    session_id: UUID,
    request: ResolveSessionRequest,
    user_id: UUID,
    db: AsyncSession,
) -> SessionCloseResponse:
    """Close a session as resolved and generate documentation.

    Raises:
        ValueError: If the session is missing or is neither active nor paused.
        PermissionError: If *user_id* may not access the session.
    """
    session = await _load_session(session_id, user_id, db)

    if session.status not in ("active", "paused"):
        raise ValueError(f"Cannot resolve session in status: {session.status}")

    session.status = "resolved"
    session.resolved_at = datetime.now(timezone.utc)
    session.resolution_summary = request.resolution_summary
    session.resolution_action = request.resolution_action

    # Rating and feedback are optional; leave stored values untouched if absent.
    if request.session_rating is not None:
        session.session_rating = request.session_rating
    if request.session_feedback is not None:
        session.session_feedback = request.session_feedback

    documentation = _generate_documentation(session)
    await db.flush()

    return SessionCloseResponse(
        session_id=session.id,
        status=session.status,
        documentation=documentation,
    )


async def escalate_session(
    session_id: UUID,
    request: EscalateSessionRequest,
    user_id: UUID,
    db: AsyncSession,
) -> SessionCloseResponse:
    """Escalate a session to another engineer.

    Raises:
        ValueError: If the session is missing or is neither active nor paused.
        PermissionError: If *user_id* may not access the session.
    """
    session = await _load_session(session_id, user_id, db)

    if session.status not in ("active", "paused"):
        raise ValueError(f"Cannot escalate session in status: {session.status}")

    session.status = "escalated"
    # resolved_at doubles as the close timestamp for escalated sessions.
    session.resolved_at = datetime.now(timezone.utc)
    session.escalation_reason = request.escalation_reason
    session.escalated_to_id = request.escalated_to_id

    # Build escalation package
    session.escalation_package = _build_escalation_package(session)

    documentation = _generate_documentation(session)
    await db.flush()

    return SessionCloseResponse(
        session_id=session.id,
        status=session.status,
        documentation=documentation,
    )


async def rate_session(
    session_id: UUID,
    rating: int,
    feedback: Optional[str],
    user_id: UUID,
    db: AsyncSession,
) -> None:
    """Submit post-session rating.

    Both the rating and the feedback are overwritten, including with None
    feedback. The rating value is stored as given; no range check is done here.

    Raises:
        ValueError: If the session does not exist.
        PermissionError: If *user_id* may not access the session.
    """
    session = await _load_session(session_id, user_id, db)
    session.session_rating = rating
    session.session_feedback = feedback
    await db.flush()


async def get_session_documentation(
    session_id: UUID,
    user_id: UUID,
    db: AsyncSession,
) -> SessionDocumentation:
    """Get auto-generated documentation for a session.

    Raises:
        ValueError: If the session does not exist.
        PermissionError: If *user_id* may not access the session.
    """
    session = await _load_session(session_id, user_id, db)
    return _generate_documentation(session)


# ── Internal helpers ──


async def _load_session(
    session_id: UUID,
    user_id: UUID,
    db: AsyncSession,
) -> AISession:
    """Load session with steps, verifying ownership.

    Raises:
        ValueError: If no session has the given id.
        PermissionError: If the user is neither the session owner nor the
            escalation target.
    """
    result = await db.execute(
        select(AISession)
        .options(selectinload(AISession.steps))
        .where(AISession.id == session_id)
    )
    session = result.scalar_one_or_none()
    if not session:
        raise ValueError("Session not found")
    # Allow access if user is the session owner or the escalation target
    if session.user_id != user_id and session.escalated_to_id != user_id:
        raise PermissionError("Not authorized to access this session")
    return session


async def _classify_intake(intake_text: str) -> dict[str, Any]:
    """Quick LLM call to classify intake content.

    Classification is best-effort: any failure (provider error, invalid JSON,
    or a reply that is not a JSON object) is logged and replaced by a generic
    fallback so that session start-up never fails on triage.
    """
    fallback: dict[str, Any] = {
        "problem_summary": intake_text[:120],
        "problem_domain": "other",
        "key_symptoms": [],
        "urgency": "medium",
    }
    try:
        provider = get_ai_provider(settings.get_model_for_action("quick_action"))
        raw, _, _ = await provider.generate_json(
            system_prompt=INTAKE_CLASSIFICATION_PROMPT,
            messages=[{"role": "user", "content": intake_text}],
            max_tokens=512,
        )
        classification = json.loads(_strip_code_fences(raw))
    except Exception as e:  # deliberate catch-all: triage must never block a session
        logger.warning("Intake classification failed: %s", e)
        return fallback

    # Callers use .get() on the result, so anything but an object is unusable.
    if not isinstance(classification, dict):
        logger.warning(
            "Intake classification failed: %s",
            f"expected a JSON object, got {type(classification).__name__}",
        )
        return fallback
    return classification


def _extract_intake_text(intake_content: dict[str, Any]) -> str:
    """Extract searchable text from intake content.

    Reads the "text", "log_content" and "ticket_data" keys; when none of them
    yields anything, the string form of the whole dict is returned.
    """
    parts = []
    if text := intake_content.get("text"):
        parts.append(text)
    if log := intake_content.get("log_content"):
        parts.append(f"Log output:\n{log}")
    if ticket := intake_content.get("ticket_data"):
        if isinstance(ticket, dict):
            parts.append(f"Ticket: {ticket.get('summary', '')}")
    return "\n\n".join(parts) if parts else str(intake_content)


def _format_intake_message(
    intake_content: dict[str, Any],
    classification: dict[str, Any],
) -> str:
    """Format intake + classification into the first user message.

    "key_symptoms" comes from LLM output, so it is tolerated as a list of
    arbitrary values or as a single bare value.
    """
    parts = ["I need help troubleshooting an issue."]

    if text := intake_content.get("text"):
        parts.append(f"\n**Problem description:**\n{text}")

    if log := intake_content.get("log_content"):
        parts.append(f"\n**Log output:**\n```\n{log}\n```")

    if summary := classification.get("problem_summary"):
        parts.append(f"\n**Classified as:** {summary}")

    if domain := classification.get("problem_domain"):
        parts.append(f"**Domain:** {domain}")

    symptoms = classification.get("key_symptoms") or []
    if not isinstance(symptoms, (list, tuple)):
        # A bare string would otherwise be joined character by character.
        symptoms = [symptoms]
    if symptoms:
        parts.append(f"**Key symptoms:** {', '.join(str(s) for s in symptoms)}")

    return "\n".join(parts)


def _format_engineer_response(request: StepResponseRequest) -> str:
    """Format engineer's step response into a conversation message.

    A skipped step short-circuits to a fixed sentence; otherwise the selected
    option, free text and action result are joined one per line.
    """
    if request.was_skipped:
        return "I can't check this right now / I don't know."

    parts = []
    if request.selected_option:
        parts.append(f"Selected: {request.selected_option}")
    if request.free_text_input:
        parts.append(request.free_text_input)
    if request.action_result:
        result = request.action_result
        success = "succeeded" if result.get("success") else "did not work"
        parts.append(f"Action {success}.")
        if details := result.get("details"):
            parts.append(f"Details: {details}")

    return "\n".join(parts) if parts else "No response provided."


def _create_step_from_parsed(
    session_id: UUID,
    step_order: int,
    parsed: dict[str, Any],
    input_tokens: int,
    output_tokens: int,
) -> AISessionStep:
    """Create an AISessionStep from parsed LLM output.

    Args:
        session_id: Id of the session the step belongs to.
        step_order: Zero-based position of the step within the session.
        parsed: Validated LLM output; must contain a "type" key.
        input_tokens: Input tokens spent generating this step.
        output_tokens: Output tokens spent generating this step.

    Returns:
        A new, unsaved step. Resolution suggestions are stored with
        ``step_type="action"``; the original type is kept in ``content["type"]``.
    """
    response_type = parsed["type"]
    # Store as action in DB, UI distinguishes via content
    step_type = "action" if response_type == "resolution_suggestion" else response_type

    # Build content dict (everything the UI needs to render)
    content: dict[str, Any] = {
        "text": parsed.get("content", ""),
        "type": response_type,
    }
    options = None

    if response_type == "action":
        content["action_type"] = parsed.get("action_type", "instruction")
        content["expected_outcome"] = parsed.get("expected_outcome")
    elif response_type == "resolution_suggestion":
        content["resolution_summary"] = parsed.get("resolution_summary")
        content["follow_up_recommendations"] = parsed.get("follow_up_recommendations", [])
        content["allow_free_text"] = False
        content["allow_skip"] = False
    elif response_type == "question" and "options" in parsed:
        # Options are unvalidated LLM output: keep only well-formed entries so
        # later readers can safely call .get() on each one.
        raw_options = parsed["options"]
        if isinstance(raw_options, list):
            options = [opt for opt in raw_options if isinstance(opt, dict)]
        content["allow_free_text"] = parsed.get("allow_free_text", True)
        content["allow_skip"] = parsed.get("allow_skip", True)

    # A null or non-numeric confidence is recorded as 0.0 (bool is excluded
    # because it is an int subclass).
    confidence = parsed.get("confidence", 0.0)
    if isinstance(confidence, bool) or not isinstance(confidence, (int, float)):
        confidence = 0.0

    return AISessionStep(
        id=uuid.uuid4(),
        session_id=session_id,
        step_order=step_order,
        step_type=step_type,
        content=content,
        context_message=parsed.get("context_message"),
        options_presented=options,
        confidence_at_step=confidence,
        ai_reasoning=parsed.get("reasoning"),
        input_tokens=input_tokens,
        output_tokens=output_tokens,
    )


def _engineer_response_label(step: AISessionStep) -> Optional[str]:
    """Return the human-readable answer the engineer gave for *step*, if any."""
    if step.was_skipped:
        return "Skipped"
    if step.selected_option:
        # Prefer the label of the matching presented option; fall back to the
        # raw value when no option matches.
        for opt in step.options_presented or []:
            if isinstance(opt, dict) and opt.get("value") == step.selected_option:
                return opt.get("label", step.selected_option)
        return step.selected_option
    if step.free_text_input:
        return step.free_text_input
    return None


def _action_outcome(step: AISessionStep) -> Optional[str]:
    """Summarize the recorded action result of *step*, or None when absent."""
    result = step.action_result
    if not result:
        return None
    outcome = "Succeeded" if result.get("success") else "Did not resolve"
    if details := result.get("details"):
        outcome += f" — {details}"
    return outcome


def _format_duration(started: Optional[datetime], ended: Optional[datetime]) -> Optional[str]:
    """Render the time between two timestamps as "Nm" or "Hh Mm".

    Returns None when either timestamp is missing. A negative span is shown
    as "0m".
    """
    if not (started and ended):
        return None
    # Subtracting a naive datetime from an aware one raises TypeError.
    # NOTE(review): a naive value is assumed to be UTC, matching the
    # timestamps this module writes — confirm against the column definition.
    if started.tzinfo is None and ended.tzinfo is not None:
        started = started.replace(tzinfo=timezone.utc)
    elif ended.tzinfo is None and started.tzinfo is not None:
        ended = ended.replace(tzinfo=timezone.utc)

    minutes = max(0, int((ended - started).total_seconds() / 60))
    if minutes < 60:
        return f"{minutes}m"
    hours, remaining = divmod(minutes, 60)
    return f"{hours}h {remaining}m"


def _generate_documentation(session: AISession) -> SessionDocumentation:
    """Generate structured documentation from a session's steps.

    Builds one DocumentationStep per session step (numbered from 1), the
    session duration from ``created_at`` to ``resolved_at``, and an intake
    summary capped at 500 characters.
    """
    diagnostic_steps = [
        DocumentationStep(
            step_number=step.step_order + 1,
            step_type=step.step_type,
            description=(step.content or {}).get("text", ""),
            engineer_response=_engineer_response_label(step),
            outcome=_action_outcome(step),
        )
        for step in session.steps
    ]

    # Build intake summary; "text" may be absent, null or not a string, in
    # which case the string form of the whole intake is used instead.
    intake = session.intake_content or {}
    intake_text = intake.get("text")
    intake_summary = intake_text[:500] if isinstance(intake_text, str) else ""
    if not intake_summary:
        intake_summary = str(intake)[:500]

    return SessionDocumentation(
        problem_summary=session.problem_summary or "No summary available",
        problem_domain=session.problem_domain,
        intake_summary=intake_summary,
        diagnostic_steps=diagnostic_steps,
        resolution_summary=session.resolution_summary,
        escalation_reason=session.escalation_reason,
        total_steps=session.step_count,
        duration_display=_format_duration(session.created_at, session.resolved_at),
        generated_at=datetime.now(timezone.utc),
    )


def _build_escalation_package(session: AISession) -> dict[str, Any]:
    """Build context package for the receiving engineer.

    Returns:
        A dict with the problem summary and domain, the raw intake content,
        the confidence score at escalation time, the escalation reason, and
        one "steps_tried" entry per session step.
    """
    steps_tried = []
    for step in session.steps:
        content = step.content or {}
        entry: dict[str, Any] = {
            "step_type": step.step_type,
            "description": content.get("text", ""),
        }
        # At most one response is recorded, in this order of preference.
        if step.selected_option:
            entry["response"] = step.selected_option
        elif step.free_text_input:
            entry["response"] = step.free_text_input
        elif step.was_skipped:
            entry["response"] = "Skipped"
        if step.action_result:
            entry["action_result"] = step.action_result
        steps_tried.append(entry)

    return {
        "problem_summary": session.problem_summary,
        "problem_domain": session.problem_domain,
        "intake_content": session.intake_content,
        "confidence_at_escalation": session.confidence_score,
        "steps_tried": steps_tried,
        "escalation_reason": session.escalation_reason,
    }