Files
resolutionflow/backend/app/services/flowpilot_engine.py
chihlasm 5494816b06 feat(ai-session): add FlowPilot AI-powered troubleshooting sessions
Implements Phase 1 of the FlowPilot-First pivot — the core AI session
experience where engineers describe a problem and FlowPilot guides them
through structured diagnosis with selectable options, free-text escape
hatches, and auto-generated documentation on resolution.

Backend: AISession + AISessionStep models, FlowPilot Engine (LLM
orchestration with structured JSON output), Flow Matching Engine v1
(semantic + keyword + recency scoring), 8 API endpoints with auth,
rate limiting, and AI quota enforcement.

Frontend: Intake screen, conversational session view with sidebar,
step cards with options/actions/resolution suggestions, resolve/escalate
modals, documentation view with rating, session history integration,
and /pilot route with sidebar navigation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 14:27:36 +00:00

738 lines
26 KiB
Python

"""FlowPilot Engine — core LLM orchestration for AI troubleshooting sessions.
Manages structured diagnostic conversations: intake analysis, step generation,
confidence tracking, and auto-documentation. All LLM responses are structured
JSON validated against known output shapes.
"""
import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Any, Optional
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.core.ai_provider import get_ai_provider
from app.core.config import settings
from app.models.ai_session import AISession
from app.models.ai_session_step import AISessionStep
from app.schemas.ai_session import (
AISessionCreateRequest,
AISessionCreateResponse,
AISessionStepResponse,
StepOptionSchema,
StepResponseRequest,
StepResponseResponse,
ResolveSessionRequest,
EscalateSessionRequest,
SessionCloseResponse,
SessionDocumentation,
DocumentationStep,
)
logger = logging.getLogger(__name__)
# Maximum steps per session as a safety limit
MAX_STEPS_PER_SESSION = 30
STRUCTURED_OUTPUT_SCHEMA = """\
Your response MUST be a valid JSON object with one of these shapes:
1. Diagnostic question:
{"type": "question", "content": "Brief description", "reasoning": "Internal why", "context_message": "Shown to engineer", "options": [{"label": "Human text", "value": "machine_value", "followup_hint": "or null"}], "allow_free_text": true, "allow_skip": true, "confidence": 0.65}
2. Suggested action:
{"type": "action", "content": "What to do", "reasoning": "Internal why", "context_message": "Here's what to try", "action_type": "instruction | script_generation | verification | info_request", "expected_outcome": "What success looks like", "confidence": 0.78}
3. Resolution suggestion:
{"type": "resolution_suggestion", "content": "Summary of what we did", "reasoning": "Internal why", "resolution_summary": "Issue was caused by X, fixed by Y", "confidence": 0.92, "follow_up_recommendations": ["Monitor for 24 hours"]}\
"""
FLOWPILOT_SYSTEM_PROMPT = """\
You are FlowPilot, an expert MSP troubleshooting assistant embedded in ResolutionFlow. You guide engineers through structured diagnosis of IT issues.
## YOUR ROLE
- Conduct systematic troubleshooting through targeted questions and actions
- Start broad, narrow down based on responses
- Never guess — ask clarifying questions when uncertain
- Suggest specific, actionable steps the engineer can verify
- When confidence is high, suggest resolution; when low, keep investigating
## RESPONSE FORMAT
You MUST respond with ONLY a valid JSON object. No markdown, no prose, no code fences.
Every response must have a "type" field: "question", "action", or "resolution_suggestion".
{structured_output_schema}
## RULES
- Maximum 5 options per question. Options should be the most likely scenarios.
- Always include relevant context in context_message — explain WHY you're asking
- confidence is a float 0.0-1.0 reflecting how certain you are about the diagnosis path
- When multiple symptoms point to one root cause with >90% confidence, suggest resolution
- If you detect the engineer needs a PowerShell script, suggest a script_generation action
- Never suggest restarting or rebooting as a first step — diagnose first
- Be specific: "Check Event Viewer > System > source NTFS" not "check the logs"
{team_context}
{matched_flow_context}\
"""
INTAKE_CLASSIFICATION_PROMPT = """\
You are a triage classifier for IT support issues in an MSP environment.
Analyze the following intake and respond with ONLY a JSON object:
{
"problem_summary": "One-line summary of the issue (max 120 chars)",
"problem_domain": "One of: active_directory, networking, m365, hardware, endpoint, virtualization, security, backup, email, printing, cloud, other",
"key_symptoms": ["symptom1", "symptom2"],
"urgency": "low | medium | high | critical"
}\
"""
def _confidence_to_tier(confidence: float) -> str:
"""Map numeric confidence to tier label."""
if confidence >= 0.8:
return "guided"
elif confidence >= 0.4:
return "exploring"
return "discovery"
def _parse_structured_output(raw_text: str) -> dict[str, Any]:
"""Parse and validate structured JSON from LLM response.
Handles common LLM quirks: markdown fences, trailing commas, etc.
"""
text = raw_text.strip()
# Strip markdown code fences if present
if text.startswith("```"):
lines = text.split("\n")
# Remove first line (```json or ```) and last line (```)
lines = [l for l in lines if not l.strip().startswith("```")]
text = "\n".join(lines).strip()
try:
data = json.loads(text)
except json.JSONDecodeError as e:
logger.warning("Failed to parse LLM JSON output: %s — raw: %.200s", e, text)
raise ValueError(f"Invalid JSON from LLM: {e}") from e
if not isinstance(data, dict) or "type" not in data:
raise ValueError("LLM response missing required 'type' field")
valid_types = {"question", "action", "resolution_suggestion"}
if data["type"] not in valid_types:
raise ValueError(f"Unknown response type: {data['type']}")
return data
def _build_step_response(step: AISessionStep, session: AISession) -> AISessionStepResponse:
"""Convert a model step + session state into an API response."""
options = []
if step.options_presented:
options = [
StepOptionSchema(
label=opt.get("label", ""),
value=opt.get("value", ""),
followup_hint=opt.get("followup_hint"),
)
for opt in step.options_presented
]
content = step.content or {}
return AISessionStepResponse(
step_id=step.id,
step_order=step.step_order,
step_type=step.step_type,
content=content,
context_message=step.context_message,
options=options,
allow_free_text=content.get("allow_free_text", True),
allow_skip=content.get("allow_skip", True),
confidence_tier=session.confidence_tier,
confidence_score=session.confidence_score,
)
async def start_session(
request: AISessionCreateRequest,
user_id: UUID,
account_id: UUID,
team_id: Optional[UUID],
db: AsyncSession,
) -> AISessionCreateResponse:
"""Start a new FlowPilot session: classify intake, match flows, get first step."""
# 1. Classify intake via fast LLM call
intake_text = _extract_intake_text(request.intake_content)
classification = await _classify_intake(intake_text)
# 2. Try to match existing flows
from app.services.flow_matching_engine import find_matches
matches = await find_matches(
intake_text=intake_text,
problem_domain=classification.get("problem_domain"),
account_id=account_id,
db=db,
)
top_match = matches[0] if matches else None
matched_flow_id = top_match["tree_id"] if top_match else None
match_score = top_match["score"] if top_match else None
matched_flow_name = top_match["tree_name"] if top_match else None
# 3. Build system prompt
matched_flow_context = ""
if top_match and top_match.get("score", 0) > 0.5:
matched_flow_context = (
f"## MATCHED FLOW\n"
f"A similar flow exists: \"{top_match['tree_name']}\" "
f"(match score: {top_match['score']:.0%}). "
f"Use it as a guide but adapt to the specific situation."
)
system_prompt = FLOWPILOT_SYSTEM_PROMPT.format(
structured_output_schema=STRUCTURED_OUTPUT_SCHEMA,
team_context="", # Phase 2: team-specific context
matched_flow_context=matched_flow_context,
)
# 4. Build first user message from intake
user_message = _format_intake_message(request.intake_content, classification)
messages = [{"role": "user", "content": user_message}]
# 5. Call LLM for first diagnostic step
provider = get_ai_provider(settings.get_model_for_action("open_chat"))
raw_response, input_tokens, output_tokens = await provider.generate_json(
system_prompt=system_prompt,
messages=messages,
max_tokens=2048,
)
# Parse with retry on failure
try:
parsed = _parse_structured_output(raw_response)
except ValueError:
# Retry once with nudge
retry_messages = messages + [
{"role": "assistant", "content": raw_response},
{"role": "user", "content": "Please respond with ONLY valid JSON matching the required schema. No markdown or prose."},
]
raw_response, retry_in, retry_out = await provider.generate_json(
system_prompt=system_prompt,
messages=retry_messages,
max_tokens=2048,
)
input_tokens += retry_in
output_tokens += retry_out
parsed = _parse_structured_output(raw_response)
confidence = parsed.get("confidence", 0.0)
confidence_tier = _confidence_to_tier(confidence)
# Initial confidence from match + classification
if top_match and top_match.get("score", 0) > 0.8:
confidence_tier = "guided"
confidence = max(confidence, 0.8)
# 6. Create session
session = AISession(
id=uuid.uuid4(),
user_id=user_id,
account_id=account_id,
team_id=team_id,
intake_type=request.intake_type,
intake_content=request.intake_content,
problem_summary=classification.get("problem_summary"),
problem_domain=classification.get("problem_domain"),
status="active",
confidence_tier=confidence_tier,
confidence_score=confidence,
matched_flow_id=matched_flow_id,
match_score=match_score,
psa_ticket_id=request.psa_ticket_id,
psa_connection_id=request.psa_connection_id,
total_input_tokens=input_tokens,
total_output_tokens=output_tokens,
step_count=1,
system_prompt_snapshot=system_prompt,
conversation_messages=[
{"role": "user", "content": user_message},
{"role": "assistant", "content": raw_response},
],
)
db.add(session)
# 7. Create first step
step = _create_step_from_parsed(
session_id=session.id,
step_order=0,
parsed=parsed,
input_tokens=input_tokens,
output_tokens=output_tokens,
)
db.add(step)
await db.flush()
return AISessionCreateResponse(
session_id=session.id,
status=session.status,
confidence_tier=session.confidence_tier,
problem_summary=session.problem_summary,
problem_domain=session.problem_domain,
matched_flow_id=matched_flow_id,
matched_flow_name=matched_flow_name,
match_score=match_score,
first_step=_build_step_response(step, session),
)
async def process_response(
session_id: UUID,
request: StepResponseRequest,
user_id: UUID,
db: AsyncSession,
) -> StepResponseResponse:
"""Process an engineer's response and generate the next FlowPilot step."""
session = await _load_session(session_id, user_id, db)
if session.status != "active":
raise ValueError(f"Session is {session.status}, not active")
if session.step_count >= MAX_STEPS_PER_SESSION:
raise ValueError("Maximum steps reached for this session")
# Update the current (latest) step with engineer's response
latest_step = session.steps[-1] if session.steps else None
if latest_step and latest_step.responded_at is None:
latest_step.selected_option = request.selected_option
latest_step.free_text_input = request.free_text_input
latest_step.was_free_text = bool(request.free_text_input and not request.selected_option)
latest_step.was_skipped = request.was_skipped
latest_step.action_result = request.action_result
latest_step.responded_at = datetime.now(timezone.utc)
# Build the conversation message for the engineer's response
response_text = _format_engineer_response(request)
session.conversation_messages = session.conversation_messages + [
{"role": "user", "content": response_text}
]
# Call LLM with full conversation
provider = get_ai_provider(settings.get_model_for_action("open_chat"))
raw_response, input_tokens, output_tokens = await provider.generate_json(
system_prompt=session.system_prompt_snapshot or "",
messages=session.conversation_messages,
max_tokens=2048,
)
try:
parsed = _parse_structured_output(raw_response)
except ValueError:
retry_messages = session.conversation_messages + [
{"role": "assistant", "content": raw_response},
{"role": "user", "content": "Please respond with ONLY valid JSON matching the required schema."},
]
raw_response, retry_in, retry_out = await provider.generate_json(
system_prompt=session.system_prompt_snapshot or "",
messages=retry_messages,
max_tokens=2048,
)
input_tokens += retry_in
output_tokens += retry_out
parsed = _parse_structured_output(raw_response)
# Append assistant response to conversation
session.conversation_messages = session.conversation_messages + [
{"role": "assistant", "content": raw_response}
]
# Update session confidence
confidence = parsed.get("confidence", session.confidence_score)
session.confidence_score = confidence
session.confidence_tier = _confidence_to_tier(confidence)
session.total_input_tokens += input_tokens
session.total_output_tokens += output_tokens
session.step_count += 1
# Create new step
step = _create_step_from_parsed(
session_id=session.id,
step_order=session.step_count - 1,
parsed=parsed,
input_tokens=input_tokens,
output_tokens=output_tokens,
)
db.add(step)
await db.flush()
# Check if resolution was suggested
resolution_suggested = parsed["type"] == "resolution_suggestion"
resolution_summary = parsed.get("resolution_summary") if resolution_suggested else None
return StepResponseResponse(
session_id=session.id,
status=session.status,
confidence_tier=session.confidence_tier,
confidence_score=session.confidence_score,
next_step=_build_step_response(step, session),
resolution_suggested=resolution_suggested,
resolution_summary=resolution_summary,
)
async def resolve_session(
session_id: UUID,
request: ResolveSessionRequest,
user_id: UUID,
db: AsyncSession,
) -> SessionCloseResponse:
"""Close a session as resolved and generate documentation."""
session = await _load_session(session_id, user_id, db)
if session.status not in ("active", "paused"):
raise ValueError(f"Cannot resolve session in status: {session.status}")
session.status = "resolved"
session.resolved_at = datetime.now(timezone.utc)
session.resolution_summary = request.resolution_summary
session.resolution_action = request.resolution_action
if request.session_rating is not None:
session.session_rating = request.session_rating
if request.session_feedback is not None:
session.session_feedback = request.session_feedback
documentation = _generate_documentation(session)
await db.flush()
return SessionCloseResponse(
session_id=session.id,
status=session.status,
documentation=documentation,
)
async def escalate_session(
session_id: UUID,
request: EscalateSessionRequest,
user_id: UUID,
db: AsyncSession,
) -> SessionCloseResponse:
"""Escalate a session to another engineer."""
session = await _load_session(session_id, user_id, db)
if session.status not in ("active", "paused"):
raise ValueError(f"Cannot escalate session in status: {session.status}")
session.status = "escalated"
session.resolved_at = datetime.now(timezone.utc)
session.escalation_reason = request.escalation_reason
session.escalated_to_id = request.escalated_to_id
# Build escalation package
session.escalation_package = _build_escalation_package(session)
documentation = _generate_documentation(session)
await db.flush()
return SessionCloseResponse(
session_id=session.id,
status=session.status,
documentation=documentation,
)
async def rate_session(
session_id: UUID,
rating: int,
feedback: Optional[str],
user_id: UUID,
db: AsyncSession,
) -> None:
"""Submit post-session rating."""
session = await _load_session(session_id, user_id, db)
session.session_rating = rating
session.session_feedback = feedback
await db.flush()
async def get_session_documentation(
session_id: UUID,
user_id: UUID,
db: AsyncSession,
) -> SessionDocumentation:
"""Get auto-generated documentation for a session."""
session = await _load_session(session_id, user_id, db)
return _generate_documentation(session)
# ── Internal helpers ──
async def _load_session(
session_id: UUID,
user_id: UUID,
db: AsyncSession,
) -> AISession:
"""Load session with steps, verifying ownership."""
result = await db.execute(
select(AISession)
.options(selectinload(AISession.steps))
.where(AISession.id == session_id)
)
session = result.scalar_one_or_none()
if not session:
raise ValueError("Session not found")
# Allow access if user is the session owner or the escalation target
if session.user_id != user_id and session.escalated_to_id != user_id:
raise PermissionError("Not authorized to access this session")
return session
async def _classify_intake(intake_text: str) -> dict[str, Any]:
"""Quick LLM call to classify intake content."""
try:
provider = get_ai_provider(settings.get_model_for_action("quick_action"))
raw, _, _ = await provider.generate_json(
system_prompt=INTAKE_CLASSIFICATION_PROMPT,
messages=[{"role": "user", "content": intake_text}],
max_tokens=512,
)
return json.loads(raw.strip())
except Exception as e:
logger.warning("Intake classification failed: %s", e)
return {
"problem_summary": intake_text[:120],
"problem_domain": "other",
"key_symptoms": [],
"urgency": "medium",
}
def _extract_intake_text(intake_content: dict[str, Any]) -> str:
"""Extract searchable text from intake content."""
parts = []
if text := intake_content.get("text"):
parts.append(text)
if log := intake_content.get("log_content"):
parts.append(f"Log output:\n{log}")
if ticket := intake_content.get("ticket_data"):
if isinstance(ticket, dict):
parts.append(f"Ticket: {ticket.get('summary', '')}")
return "\n\n".join(parts) if parts else str(intake_content)
def _format_intake_message(
intake_content: dict[str, Any],
classification: dict[str, Any],
) -> str:
"""Format intake + classification into the first user message."""
parts = ["I need help troubleshooting an issue."]
if text := intake_content.get("text"):
parts.append(f"\n**Problem description:**\n{text}")
if log := intake_content.get("log_content"):
parts.append(f"\n**Log output:**\n```\n{log}\n```")
if summary := classification.get("problem_summary"):
parts.append(f"\n**Classified as:** {summary}")
if domain := classification.get("problem_domain"):
parts.append(f"**Domain:** {domain}")
symptoms = classification.get("key_symptoms", [])
if symptoms:
parts.append(f"**Key symptoms:** {', '.join(symptoms)}")
return "\n".join(parts)
def _format_engineer_response(request: StepResponseRequest) -> str:
"""Format engineer's step response into a conversation message."""
if request.was_skipped:
return "I can't check this right now / I don't know."
parts = []
if request.selected_option:
parts.append(f"Selected: {request.selected_option}")
if request.free_text_input:
parts.append(request.free_text_input)
if request.action_result:
result = request.action_result
success = "succeeded" if result.get("success") else "did not work"
parts.append(f"Action {success}.")
if details := result.get("details"):
parts.append(f"Details: {details}")
return "\n".join(parts) if parts else "No response provided."
def _create_step_from_parsed(
session_id: UUID,
step_order: int,
parsed: dict[str, Any],
input_tokens: int,
output_tokens: int,
) -> AISessionStep:
"""Create an AISessionStep from parsed LLM output."""
step_type = parsed["type"]
if step_type == "resolution_suggestion":
step_type = "action" # Store as action in DB, UI distinguishes via content
# Build content dict (everything the UI needs to render)
content = {
"text": parsed.get("content", ""),
"type": parsed["type"],
}
if parsed["type"] == "action":
content["action_type"] = parsed.get("action_type", "instruction")
content["expected_outcome"] = parsed.get("expected_outcome")
elif parsed["type"] == "resolution_suggestion":
content["resolution_summary"] = parsed.get("resolution_summary")
content["follow_up_recommendations"] = parsed.get("follow_up_recommendations", [])
content["allow_free_text"] = False
content["allow_skip"] = False
# Extract options for question type
options = None
if parsed["type"] == "question" and "options" in parsed:
options = parsed["options"]
content["allow_free_text"] = parsed.get("allow_free_text", True)
content["allow_skip"] = parsed.get("allow_skip", True)
return AISessionStep(
id=uuid.uuid4(),
session_id=session_id,
step_order=step_order,
step_type=step_type if parsed["type"] != "resolution_suggestion" else "action",
content=content,
context_message=parsed.get("context_message"),
options_presented=options,
confidence_at_step=parsed.get("confidence", 0.0),
ai_reasoning=parsed.get("reasoning"),
input_tokens=input_tokens,
output_tokens=output_tokens,
)
def _generate_documentation(session: AISession) -> SessionDocumentation:
"""Generate structured documentation from a session's steps."""
diagnostic_steps = []
for step in session.steps:
content = step.content or {}
description = content.get("text", "")
# Determine engineer response
engineer_response = None
if step.was_skipped:
engineer_response = "Skipped"
elif step.selected_option:
# Find the label for the selected option
if step.options_presented:
for opt in step.options_presented:
if opt.get("value") == step.selected_option:
engineer_response = opt.get("label", step.selected_option)
break
else:
engineer_response = step.selected_option
else:
engineer_response = step.selected_option
elif step.free_text_input:
engineer_response = step.free_text_input
# Determine outcome
outcome = None
if step.action_result:
result = step.action_result
outcome = "Succeeded" if result.get("success") else "Did not resolve"
if details := result.get("details"):
outcome += f"{details}"
diagnostic_steps.append(DocumentationStep(
step_number=step.step_order + 1,
step_type=step.step_type,
description=description,
engineer_response=engineer_response,
outcome=outcome,
))
# Calculate duration
duration_display = None
if session.resolved_at and session.created_at:
delta = session.resolved_at - session.created_at
minutes = int(delta.total_seconds() / 60)
if minutes < 60:
duration_display = f"{minutes}m"
else:
hours = minutes // 60
remaining = minutes % 60
duration_display = f"{hours}h {remaining}m"
# Build intake summary
intake = session.intake_content or {}
intake_summary = intake.get("text", "")[:500]
if not intake_summary:
intake_summary = str(intake)[:500]
return SessionDocumentation(
problem_summary=session.problem_summary or "No summary available",
problem_domain=session.problem_domain,
intake_summary=intake_summary,
diagnostic_steps=diagnostic_steps,
resolution_summary=session.resolution_summary,
escalation_reason=session.escalation_reason,
total_steps=session.step_count,
duration_display=duration_display,
generated_at=datetime.now(timezone.utc),
)
def _build_escalation_package(session: AISession) -> dict[str, Any]:
"""Build context package for the receiving engineer."""
steps_tried = []
for step in session.steps:
content = step.content or {}
entry = {
"step_type": step.step_type,
"description": content.get("text", ""),
}
if step.selected_option:
entry["response"] = step.selected_option
elif step.free_text_input:
entry["response"] = step.free_text_input
elif step.was_skipped:
entry["response"] = "Skipped"
if step.action_result:
entry["action_result"] = step.action_result
steps_tried.append(entry)
return {
"problem_summary": session.problem_summary,
"problem_domain": session.problem_domain,
"intake_content": session.intake_content,
"confidence_at_escalation": session.confidence_score,
"steps_tried": steps_tried,
"escalation_reason": session.escalation_reason,
}