Files
resolutionflow/backend/app/services/flowpilot_engine.py
chihlasm 79358be90f fix(escalation): use User.name not display_name — attribute doesn't exist
User model has 'name', not 'display_name'. Fixed in flowpilot_engine
(escalate notify + pickup briefing) and psa_documentation_service
(engineer name in exported docs).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-21 04:09:54 +00:00

1239 lines
45 KiB
Python

"""FlowPilot Engine — core LLM orchestration for AI troubleshooting sessions.
Manages structured diagnostic conversations: intake analysis, step generation,
confidence tracking, and auto-documentation. All LLM responses are structured
JSON validated against known output shapes.
"""
import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Any, Optional
from uuid import UUID
from sqlalchemy import select, func, or_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.core.ai_provider import get_ai_provider
from app.core.config import settings
from app.services.llm_utils import parse_llm_json
from app.services.notification_service import notify
from app.models.ai_session import AISession
from app.models.ai_session_step import AISessionStep
from app.models.tree import Tree
from app.schemas.ai_session import (
AISessionCreateRequest,
AISessionCreateResponse,
AISessionStepResponse,
StepOptionSchema,
StepResponseRequest,
StepResponseResponse,
ResolveSessionRequest,
EscalateSessionRequest,
SessionCloseResponse,
SessionDocumentation,
DocumentationStep,
)
logger = logging.getLogger(__name__)
# Maximum steps per session as a safety limit
MAX_STEPS_PER_SESSION = 30
STRUCTURED_OUTPUT_SCHEMA = """\
Your response MUST be a valid JSON object with one of these shapes:
1. Diagnostic question:
{"type": "question", "content": "Brief description", "reasoning": "Internal why", "context_message": "Shown to engineer", "options": [{"label": "Human text", "value": "machine_value", "followup_hint": "or null"}], "allow_free_text": true, "allow_skip": true, "confidence": 0.65}
2. Suggested action:
{"type": "action", "content": "What to do", "reasoning": "Internal why", "context_message": "Here's what to try", "action_type": "instruction | script_generation | verification | info_request", "expected_outcome": "What success looks like", "confidence": 0.78}
3. Resolution suggestion:
{"type": "resolution_suggestion", "content": "Summary of what we did", "reasoning": "Internal why", "resolution_summary": "Issue was caused by X, fixed by Y", "confidence": 0.92, "follow_up_recommendations": ["Monitor for 24 hours"]}\
"""
FLOWPILOT_SYSTEM_PROMPT = """\
You are FlowPilot, an expert MSP troubleshooting assistant embedded in ResolutionFlow. You guide engineers through structured diagnosis of IT issues.
## YOUR ROLE
- Conduct systematic troubleshooting through targeted questions and actions
- Start broad, narrow down based on responses
- Never guess — ask clarifying questions when uncertain
- Suggest specific, actionable steps the engineer can verify
- When confidence is high, suggest resolution; when low, keep investigating
## RESPONSE FORMAT
You MUST respond with ONLY a valid JSON object. No markdown, no prose, no code fences.
Every response must have a "type" field: "question", "action", or "resolution_suggestion".
{structured_output_schema}
## RULES
- Maximum 5 options per question. Options should be the most likely scenarios.
- Always include relevant context in context_message — explain WHY you're asking
- confidence is a float 0.0-1.0 reflecting how certain you are about the diagnosis path
- When multiple symptoms point to one root cause with >90% confidence, suggest resolution
- If you detect the engineer needs a PowerShell script, suggest a script_generation action
- Never suggest restarting or rebooting as a first step — diagnose first
- Be specific: "Check Event Viewer > System > source NTFS" not "check the logs"
{team_context}
{matched_flow_context}\
"""
INTAKE_CLASSIFICATION_PROMPT = """\
You are a triage classifier for IT support issues in an MSP environment.
Analyze the following intake and respond with ONLY a JSON object:
{
"problem_summary": "One-line summary of the issue (max 120 chars)",
"problem_domain": "One of: active_directory, networking, m365, hardware, endpoint, virtualization, security, backup, email, printing, cloud, other",
"key_symptoms": ["symptom1", "symptom2"],
"urgency": "low | medium | high | critical"
}\
"""
def _confidence_to_tier(confidence: float) -> str:
"""Map numeric confidence to tier label."""
if confidence >= 0.8:
return "guided"
elif confidence >= 0.4:
return "exploring"
return "discovery"
def _parse_structured_output(raw_text: str) -> dict[str, Any]:
"""Parse and validate structured JSON from LLM response.
Uses shared parse_llm_json for fence stripping and JSON parsing,
then validates FlowPilot-specific output shape.
"""
data = parse_llm_json(raw_text)
if not isinstance(data, dict) or "type" not in data:
raise ValueError("LLM response missing required 'type' field")
valid_types = {"question", "action", "resolution_suggestion"}
if data["type"] not in valid_types:
raise ValueError(f"Unknown response type: {data['type']}")
return data
def _build_step_response(step: AISessionStep, session: AISession) -> AISessionStepResponse:
"""Convert a model step + session state into an API response."""
options = []
if step.options_presented:
options = [
StepOptionSchema(
label=opt.get("label", ""),
value=opt.get("value", ""),
followup_hint=opt.get("followup_hint"),
)
for opt in step.options_presented
]
content = step.content or {}
return AISessionStepResponse(
step_id=step.id,
step_order=step.step_order,
step_type=step.step_type,
content=content,
context_message=step.context_message,
options=options,
allow_free_text=content.get("allow_free_text", True),
allow_skip=content.get("allow_skip", True),
confidence_tier=session.confidence_tier,
confidence_score=session.confidence_score,
)
async def start_session(
request: AISessionCreateRequest,
user_id: UUID,
account_id: UUID,
team_id: Optional[UUID],
db: AsyncSession,
) -> AISessionCreateResponse:
"""Start a new FlowPilot session: classify intake, match flows, get first step."""
# 0. Process PSA ticket intake if applicable
ticket_context_block = None
ticket_data = None
psa_context_status = None
if request.intake_type == "psa_ticket" and request.psa_connection_id and request.psa_ticket_id:
ticket_context_block, ticket_data, psa_context_status = await _process_ticket_intake(
psa_connection_id=request.psa_connection_id,
psa_ticket_id=request.psa_ticket_id,
db=db,
)
# Enrich intake content with ticket context for classification
if ticket_data:
enriched_content = dict(request.intake_content)
enriched_content["ticket_data"] = {
"summary": ticket_data.get("ticket", {}).get("summary", ""),
"company": ticket_data.get("company", {}).get("name", ""),
"priority": ticket_data.get("ticket", {}).get("priority", ""),
}
request = request.model_copy(update={"intake_content": enriched_content})
# 1. Classify intake via fast LLM call
intake_text = _extract_intake_text(request.intake_content)
# Include ticket context in classification text if available
if ticket_context_block:
intake_text = f"{ticket_context_block}\n\n{intake_text}"
classification = await _classify_intake(intake_text)
# 2. Try to match existing flows
from app.services.flow_matching_engine import find_matches
matches = await find_matches(
intake_text=intake_text,
problem_domain=classification.get("problem_domain"),
account_id=account_id,
db=db,
)
top_match = matches[0] if matches else None
matched_flow_id = top_match["tree_id"] if top_match else None
match_score = top_match["score"] if top_match else None
matched_flow_name = top_match["tree_name"] if top_match else None
# 3. Build system prompt
matched_flow_context = ""
if top_match and top_match.get("score", 0) > 0.5:
matched_flow_context = (
f"## MATCHED FLOW\n"
f"A similar flow exists: \"{top_match['tree_name']}\" "
f"(match score: {top_match['score']:.0%}). "
f"Use it as a guide but adapt to the specific situation."
)
# Include ticket context in system prompt if available
ticket_prompt_section = ""
if ticket_context_block:
ticket_prompt_section = f"\n## PSA TICKET CONTEXT\n{ticket_context_block}\n"
# Include available script templates for in-session script generation
script_context = await _build_script_context(team_id, db)
if script_context:
ticket_prompt_section += f"\n{script_context}\n"
system_prompt = FLOWPILOT_SYSTEM_PROMPT.format(
structured_output_schema=STRUCTURED_OUTPUT_SCHEMA,
team_context=ticket_prompt_section,
matched_flow_context=matched_flow_context,
)
# 4. Build first user message from intake
user_message = _format_intake_message(request.intake_content, classification)
messages = [{"role": "user", "content": user_message}]
# 5. Call LLM for first diagnostic step
provider = get_ai_provider(settings.get_model_for_action("open_chat"))
raw_response, input_tokens, output_tokens = await provider.generate_json(
system_prompt=system_prompt,
messages=messages,
max_tokens=2048,
)
# Parse with retry on failure
try:
parsed = _parse_structured_output(raw_response)
except ValueError:
# Retry once with nudge
retry_messages = messages + [
{"role": "assistant", "content": raw_response},
{"role": "user", "content": "Please respond with ONLY valid JSON matching the required schema. No markdown or prose."},
]
raw_response, retry_in, retry_out = await provider.generate_json(
system_prompt=system_prompt,
messages=retry_messages,
max_tokens=2048,
)
input_tokens += retry_in
output_tokens += retry_out
parsed = _parse_structured_output(raw_response)
confidence = parsed.get("confidence", 0.0)
confidence_tier = _confidence_to_tier(confidence)
# Initial confidence from match + classification
if top_match and top_match.get("score", 0) > 0.8:
confidence_tier = "guided"
confidence = max(confidence, 0.8)
# 6. Create session
session = AISession(
id=uuid.uuid4(),
user_id=user_id,
account_id=account_id,
team_id=team_id,
intake_type=request.intake_type,
intake_content=request.intake_content,
problem_summary=classification.get("problem_summary"),
problem_domain=classification.get("problem_domain"),
status="active",
confidence_tier=confidence_tier,
confidence_score=confidence,
matched_flow_id=matched_flow_id,
match_score=match_score,
psa_ticket_id=request.psa_ticket_id,
psa_connection_id=request.psa_connection_id,
ticket_data=ticket_data,
total_input_tokens=input_tokens,
total_output_tokens=output_tokens,
step_count=1,
system_prompt_snapshot=system_prompt,
conversation_messages=[
{"role": "user", "content": user_message},
{"role": "assistant", "content": raw_response},
],
)
db.add(session)
# 7a. Update matched flow usage tracking
if matched_flow_id:
try:
flow_result = await db.get(Tree, matched_flow_id)
if flow_result:
flow_result.usage_count = (flow_result.usage_count or 0) + 1
flow_result.last_matched_at = datetime.now(timezone.utc)
except Exception as e:
logger.warning("Failed to update flow usage stats for flow %s: %s", matched_flow_id, e)
# 7. Create first step
step = _create_step_from_parsed(
session_id=session.id,
step_order=0,
parsed=parsed,
input_tokens=input_tokens,
output_tokens=output_tokens,
)
db.add(step)
await db.flush()
# Generate session embedding for similar-session matching (fire-and-forget)
try:
from app.services.session_embedding_service import generate_session_embedding
await generate_session_embedding(session.id, db)
except Exception:
logger.warning("Failed to generate session embedding on create", exc_info=True)
return AISessionCreateResponse(
session_id=session.id,
status=session.status,
confidence_tier=session.confidence_tier,
problem_summary=session.problem_summary,
problem_domain=session.problem_domain,
matched_flow_id=matched_flow_id,
matched_flow_name=matched_flow_name,
match_score=match_score,
first_step=_build_step_response(step, session),
psa_context_status=psa_context_status,
)
async def process_response(
session_id: UUID,
request: StepResponseRequest,
user_id: UUID,
db: AsyncSession,
) -> StepResponseResponse:
"""Process an engineer's response and generate the next FlowPilot step."""
session = await _load_session(session_id, user_id, db)
if session.status != "active":
raise ValueError(f"Session is {session.status}, not active")
if session.step_count >= MAX_STEPS_PER_SESSION:
raise ValueError("Maximum steps reached for this session")
# Update the current (latest) step with engineer's response
latest_step = session.steps[-1] if session.steps else None
if latest_step and latest_step.responded_at is None:
latest_step.selected_option = request.selected_option
latest_step.free_text_input = request.free_text_input
latest_step.was_free_text = bool(request.free_text_input and not request.selected_option)
latest_step.was_skipped = request.was_skipped
latest_step.action_result = request.action_result
latest_step.responded_at = datetime.now(timezone.utc)
# Build the conversation message for the engineer's response
response_text = _format_engineer_response(request)
session.conversation_messages = session.conversation_messages + [
{"role": "user", "content": response_text}
]
# Call LLM with full conversation
provider = get_ai_provider(settings.get_model_for_action("open_chat"))
raw_response, input_tokens, output_tokens = await provider.generate_json(
system_prompt=session.system_prompt_snapshot or "",
messages=session.conversation_messages,
max_tokens=2048,
)
try:
parsed = _parse_structured_output(raw_response)
except ValueError:
retry_messages = session.conversation_messages + [
{"role": "assistant", "content": raw_response},
{"role": "user", "content": "Please respond with ONLY valid JSON matching the required schema."},
]
raw_response, retry_in, retry_out = await provider.generate_json(
system_prompt=session.system_prompt_snapshot or "",
messages=retry_messages,
max_tokens=2048,
)
input_tokens += retry_in
output_tokens += retry_out
parsed = _parse_structured_output(raw_response)
# Append assistant response to conversation
session.conversation_messages = session.conversation_messages + [
{"role": "assistant", "content": raw_response}
]
# Update session confidence
confidence = parsed.get("confidence", session.confidence_score)
session.confidence_score = confidence
session.confidence_tier = _confidence_to_tier(confidence)
session.total_input_tokens += input_tokens
session.total_output_tokens += output_tokens
session.step_count += 1
# Create new step
step = _create_step_from_parsed(
session_id=session.id,
step_order=session.step_count - 1,
parsed=parsed,
input_tokens=input_tokens,
output_tokens=output_tokens,
)
db.add(step)
await db.flush()
# Check if resolution was suggested
resolution_suggested = parsed["type"] == "resolution_suggestion"
resolution_summary = parsed.get("resolution_summary") if resolution_suggested else None
return StepResponseResponse(
session_id=session.id,
status=session.status,
confidence_tier=session.confidence_tier,
confidence_score=session.confidence_score,
next_step=_build_step_response(step, session),
resolution_suggested=resolution_suggested,
resolution_summary=resolution_summary,
)
async def resolve_session(
session_id: UUID,
request: ResolveSessionRequest,
user_id: UUID,
db: AsyncSession,
) -> SessionCloseResponse:
"""Close a session as resolved and generate documentation."""
session = await _load_session(session_id, user_id, db)
if session.status not in ("active", "paused"):
raise ValueError(f"Cannot resolve session in status: {session.status}")
session.status = "resolved"
session.resolved_at = datetime.now(timezone.utc)
session.resolution_summary = request.resolution_summary
session.resolution_action = request.resolution_action
if request.session_rating is not None:
session.session_rating = request.session_rating
if request.session_feedback is not None:
session.session_feedback = request.session_feedback
documentation = _generate_documentation(session)
# Queue for Knowledge Flywheel analysis
session.analysis_status = "pending"
# Recalculate success_rate for the matched flow
if session.matched_flow_id:
try:
flow = await db.get(Tree, session.matched_flow_id)
if flow:
total_result = await db.execute(
select(func.count(AISession.id))
.where(AISession.matched_flow_id == flow.id)
)
resolved_result = await db.execute(
select(func.count(AISession.id))
.where(
AISession.matched_flow_id == flow.id,
AISession.status == "resolved",
)
)
total = total_result.scalar() or 0
resolved_count = resolved_result.scalar() or 0
flow.success_rate = round(resolved_count / total, 3) if total else None
except Exception as e:
logger.warning("Failed to recalculate success_rate for flow %s: %s", session.matched_flow_id, e)
await db.flush()
# Update session embedding with resolution data for similar-session matching
try:
from app.services.session_embedding_service import generate_session_embedding
await generate_session_embedding(session.id, db)
except Exception:
logger.warning("Failed to update session embedding on resolve", exc_info=True)
# Push documentation to PSA if ticket is linked
psa_result = await _push_to_psa(session, user_id, db)
return SessionCloseResponse(
session_id=session.id,
status=session.status,
documentation=documentation,
**psa_result,
)
async def escalate_session(
session_id: UUID,
request: EscalateSessionRequest,
user_id: UUID,
db: AsyncSession,
) -> SessionCloseResponse:
"""Escalate a session — sets status to requesting_escalation for pickup."""
session = await _load_session(session_id, user_id, db)
if session.status not in ("active", "paused"):
raise ValueError(f"Cannot escalate session in status: {session.status}")
# Block self-escalation
if request.escalated_to_id and request.escalated_to_id == user_id:
raise ValueError("Cannot escalate a session to yourself. Use pause instead.")
session.status = "requesting_escalation"
# Don't set resolved_at — session isn't done yet
session.escalation_reason = request.escalation_reason
session.escalated_to_id = request.escalated_to_id
# Build enhanced escalation package
session.escalation_package = await _build_escalation_package_enhanced(session, user_id)
documentation = _generate_documentation(session)
await db.flush()
# Notify about escalation
await notify("session.escalated", session.account_id, {
"session_id": str(session_id),
"engineer_name": session.user.name if session.user else "Unknown",
"escalation_reason": request.escalation_reason,
"problem_summary": session.problem_summary or "N/A",
"link": f"/pilot/{session_id}",
}, db, target_user_ids=[request.escalated_to_id] if request.escalated_to_id else None)
# Push documentation to PSA if ticket is linked
psa_result = await _push_to_psa(session, user_id, db)
return SessionCloseResponse(
session_id=session.id,
status=session.status,
documentation=documentation,
**psa_result,
)
async def pickup_session(
session_id: UUID,
resume_mode: str,
additional_context: Optional[str],
user_id: UUID,
team_id: Optional[UUID],
db: AsyncSession,
) -> StepResponseResponse:
"""Pick up an escalated session as a new engineer.
Generates a briefing step summarizing prior work, then either continues
the conversation or starts fresh with the new engineer's context.
"""
session = await _load_session(
session_id, user_id, db,
allow_team_access=True, team_id=team_id,
)
if session.status != "requesting_escalation":
raise ValueError(f"Session is {session.status}, not requesting_escalation")
# Can't pick up your own session
if session.user_id == user_id:
raise ValueError("Cannot pick up your own escalated session")
# Record the pickup in the escalation package
pkg = session.escalation_package or {}
pkg["picked_up_by"] = str(user_id)
pkg["picked_up_at"] = datetime.now(timezone.utc).isoformat()
session.escalation_package = pkg
# Reactivate the session
session.status = "active"
# Build a briefing message for the new engineer
original_user_name = "the previous engineer"
if session.user and session.user.name:
original_user_name = session.user.name
briefing_parts = [
f"## Escalation Briefing",
f"**Escalated by:** {original_user_name}",
f"**Reason:** {session.escalation_reason or 'Not specified'}",
"",
f"**Problem:** {session.problem_summary or 'Unknown'}",
]
steps_tried = pkg.get("steps_tried", [])
if steps_tried:
briefing_parts.append("")
briefing_parts.append("**Steps already taken:**")
for i, step in enumerate(steps_tried, 1):
desc = step.get("description", "")
resp = step.get("response", "")
briefing_parts.append(f"{i}. {desc}")
if resp:
briefing_parts.append(f"{resp}")
if hypotheses := pkg.get("remaining_hypotheses"):
briefing_parts.append("")
briefing_parts.append("**Remaining hypotheses:**")
if isinstance(hypotheses, list):
for h in hypotheses:
briefing_parts.append(f"- {h}")
else:
briefing_parts.append(str(hypotheses))
if suggestions := pkg.get("suggested_next_steps"):
briefing_parts.append("")
briefing_parts.append("**Suggested next steps:**")
if isinstance(suggestions, list):
for s in suggestions:
briefing_parts.append(f"- {s}")
else:
briefing_parts.append(str(suggestions))
briefing_text = "\n".join(briefing_parts)
# Create a briefing step (special intake_analysis type)
briefing_step = AISessionStep(
id=uuid.uuid4(),
session_id=session.id,
step_order=session.step_count,
step_type="action",
content={
"text": briefing_text,
"type": "briefing",
"allow_free_text": False,
"allow_skip": False,
},
context_message="Escalation briefing — here's what was tried before you.",
confidence_at_step=session.confidence_score,
ai_reasoning="Escalation handoff briefing for receiving engineer",
input_tokens=0,
output_tokens=0,
)
db.add(briefing_step)
session.step_count += 1
# Now generate the next step based on resume_mode
if resume_mode == "fresh" and additional_context:
# Engineer B provides their own input
user_message = f"[Picking up escalated session] {additional_context}"
else:
# Continue where A left off
user_message = (
"[Picking up escalated session] I've reviewed the briefing above. "
"Please continue the diagnosis based on everything tried so far."
)
# Append to conversation
session.conversation_messages = session.conversation_messages + [
{"role": "user", "content": user_message}
]
# Call LLM for next step
provider = get_ai_provider(settings.get_model_for_action("open_chat"))
raw_response, input_tokens, output_tokens = await provider.generate_json(
system_prompt=session.system_prompt_snapshot or "",
messages=session.conversation_messages,
max_tokens=2048,
)
try:
parsed = _parse_structured_output(raw_response)
except ValueError:
retry_messages = session.conversation_messages + [
{"role": "assistant", "content": raw_response},
{"role": "user", "content": "Please respond with ONLY valid JSON matching the required schema."},
]
raw_response, retry_in, retry_out = await provider.generate_json(
system_prompt=session.system_prompt_snapshot or "",
messages=retry_messages,
max_tokens=2048,
)
input_tokens += retry_in
output_tokens += retry_out
parsed = _parse_structured_output(raw_response)
session.conversation_messages = session.conversation_messages + [
{"role": "assistant", "content": raw_response}
]
confidence = parsed.get("confidence", session.confidence_score)
session.confidence_score = confidence
session.confidence_tier = _confidence_to_tier(confidence)
session.total_input_tokens += input_tokens
session.total_output_tokens += output_tokens
session.step_count += 1
next_step = _create_step_from_parsed(
session_id=session.id,
step_order=session.step_count - 1,
parsed=parsed,
input_tokens=input_tokens,
output_tokens=output_tokens,
)
db.add(next_step)
await db.flush()
return StepResponseResponse(
session_id=session.id,
status=session.status,
confidence_tier=session.confidence_tier,
confidence_score=session.confidence_score,
next_step=_build_step_response(next_step, session),
resolution_suggested=parsed["type"] == "resolution_suggestion",
resolution_summary=parsed.get("resolution_summary") if parsed["type"] == "resolution_suggestion" else None,
)
async def link_ticket(
session_id: UUID,
psa_ticket_id: str,
psa_connection_id: UUID,
user_id: UUID,
db: AsyncSession,
) -> None:
"""Link a PSA ticket to an in-progress session and inject context."""
session = await _load_session(session_id, user_id, db)
if session.status not in ("active", "paused"):
raise ValueError(f"Cannot link ticket to session in status: {session.status}")
# Store the ticket link
session.psa_ticket_id = psa_ticket_id
session.psa_connection_id = psa_connection_id
# Try to fetch ticket context
ticket_context_block, ticket_data, _ = await _process_ticket_intake(
psa_connection_id=psa_connection_id,
psa_ticket_id=psa_ticket_id,
db=db,
)
if ticket_data:
session.ticket_data = ticket_data
# Inject ticket context into the system prompt for subsequent steps
if ticket_context_block and session.system_prompt_snapshot:
ticket_section = f"\n\n## PSA TICKET CONTEXT (linked mid-session)\n{ticket_context_block}\n"
session.system_prompt_snapshot = session.system_prompt_snapshot + ticket_section
await db.flush()
async def pause_session(
session_id: UUID,
user_id: UUID,
db: AsyncSession,
) -> None:
"""Pause an active session for the same engineer to resume later."""
session = await _load_session(session_id, user_id, db)
if session.status != "active":
raise ValueError(f"Cannot pause session in status: {session.status}")
session.status = "paused"
await db.flush()
async def resume_session(
session_id: UUID,
user_id: UUID,
db: AsyncSession,
) -> None:
"""Resume a paused session for the same engineer."""
session = await _load_session(session_id, user_id, db)
if session.status != "paused":
raise ValueError(f"Cannot resume session in status: {session.status}")
session.status = "active"
await db.flush()
async def rate_session(
session_id: UUID,
rating: int,
feedback: Optional[str],
user_id: UUID,
db: AsyncSession,
) -> None:
"""Submit post-session rating."""
session = await _load_session(session_id, user_id, db)
session.session_rating = rating
session.session_feedback = feedback
await db.flush()
async def get_session_documentation(
session_id: UUID,
user_id: UUID,
db: AsyncSession,
) -> SessionDocumentation:
"""Get auto-generated documentation for a session."""
session = await _load_session(session_id, user_id, db)
return _generate_documentation(session)
# ── Internal helpers ──
async def _load_session(
session_id: UUID,
user_id: UUID,
db: AsyncSession,
allow_team_access: bool = False,
team_id: Optional[UUID] = None,
) -> AISession:
"""Load session with steps and user relationships, verifying ownership.
Args:
allow_team_access: If True, same-team users can access sessions in
'requesting_escalation' status (for escalation pickup).
team_id: Required when allow_team_access is True.
"""
result = await db.execute(
select(AISession)
.options(
selectinload(AISession.steps),
selectinload(AISession.user),
selectinload(AISession.escalated_to),
)
.where(AISession.id == session_id)
)
session = result.scalar_one_or_none()
if not session:
raise ValueError("Session not found")
# Owner or escalation target always has access
if session.user_id == user_id or session.escalated_to_id == user_id:
return session
# Engineer who picked up an escalated session has access
pkg = session.escalation_package or {}
if pkg.get("picked_up_by") == str(user_id):
return session
# Team-based access for escalation pickup
if allow_team_access and team_id and session.team_id == team_id:
if session.status == "requesting_escalation":
return session
raise PermissionError("Not authorized to access this session")
async def _classify_intake(intake_text: str) -> dict[str, Any]:
"""Quick LLM call to classify intake content."""
try:
provider = get_ai_provider(settings.get_model_for_action("quick_action"))
raw, _, _ = await provider.generate_json(
system_prompt=INTAKE_CLASSIFICATION_PROMPT,
messages=[{"role": "user", "content": intake_text}],
max_tokens=512,
)
return json.loads(raw.strip())
except Exception as e:
logger.warning("Intake classification failed: %s", e)
return {
"problem_summary": intake_text[:120],
"problem_domain": "other",
"key_symptoms": [],
"urgency": "medium",
}
def _extract_intake_text(intake_content: dict[str, Any]) -> str:
"""Extract searchable text from intake content."""
parts = []
if text := intake_content.get("text"):
parts.append(text)
if log := intake_content.get("log_content"):
parts.append(f"Log output:\n{log}")
if ticket := intake_content.get("ticket_data"):
if isinstance(ticket, dict):
parts.append(f"Ticket: {ticket.get('summary', '')}")
return "\n\n".join(parts) if parts else str(intake_content)
def _format_intake_message(
intake_content: dict[str, Any],
classification: dict[str, Any],
) -> str:
"""Format intake + classification into the first user message."""
parts = ["I need help troubleshooting an issue."]
if text := intake_content.get("text"):
parts.append(f"\n**Problem description:**\n{text}")
if log := intake_content.get("log_content"):
parts.append(f"\n**Log output:**\n```\n{log}\n```")
if summary := classification.get("problem_summary"):
parts.append(f"\n**Classified as:** {summary}")
if domain := classification.get("problem_domain"):
parts.append(f"**Domain:** {domain}")
symptoms = classification.get("key_symptoms", [])
if symptoms:
parts.append(f"**Key symptoms:** {', '.join(symptoms)}")
return "\n".join(parts)
def _format_engineer_response(request: StepResponseRequest) -> str:
"""Format engineer's step response into a conversation message."""
if request.was_skipped:
return "I can't check this right now / I don't know."
parts = []
if request.selected_option:
parts.append(f"Selected: {request.selected_option}")
if request.free_text_input:
parts.append(request.free_text_input)
if request.action_result:
result = request.action_result
success = "succeeded" if result.get("success") else "did not work"
parts.append(f"Action {success}.")
if details := result.get("details"):
parts.append(f"Details: {details}")
return "\n".join(parts) if parts else "No response provided."
def _create_step_from_parsed(
session_id: UUID,
step_order: int,
parsed: dict[str, Any],
input_tokens: int,
output_tokens: int,
) -> AISessionStep:
"""Create an AISessionStep from parsed LLM output."""
step_type = parsed["type"]
if step_type == "resolution_suggestion":
step_type = "action" # Store as action in DB, UI distinguishes via content
# Build content dict (everything the UI needs to render)
content = {
"text": parsed.get("content", ""),
"type": parsed["type"],
}
if parsed["type"] == "action":
content["action_type"] = parsed.get("action_type", "instruction")
content["expected_outcome"] = parsed.get("expected_outcome")
# Script generation fields (populated when FlowPilot suggests a script)
if parsed.get("template_id"):
content["template_id"] = parsed["template_id"]
if parsed.get("pre_filled_params"):
content["pre_filled_params"] = parsed["pre_filled_params"]
if parsed.get("instructions"):
content["instructions"] = parsed["instructions"]
elif parsed["type"] == "resolution_suggestion":
content["resolution_summary"] = parsed.get("resolution_summary")
content["follow_up_recommendations"] = parsed.get("follow_up_recommendations", [])
content["allow_free_text"] = False
content["allow_skip"] = False
# Extract options for question type
options = None
if parsed["type"] == "question" and "options" in parsed:
options = parsed["options"]
content["allow_free_text"] = parsed.get("allow_free_text", True)
content["allow_skip"] = parsed.get("allow_skip", True)
return AISessionStep(
id=uuid.uuid4(),
session_id=session_id,
step_order=step_order,
step_type=step_type if parsed["type"] != "resolution_suggestion" else "action",
content=content,
context_message=parsed.get("context_message"),
options_presented=options,
confidence_at_step=parsed.get("confidence", 0.0),
ai_reasoning=parsed.get("reasoning"),
input_tokens=input_tokens,
output_tokens=output_tokens,
)
def _generate_documentation(session: AISession) -> SessionDocumentation:
"""Generate structured documentation from a session's steps."""
diagnostic_steps = []
for step in session.steps:
content = step.content or {}
description = content.get("text", "")
# Determine engineer response
engineer_response = None
if step.was_skipped:
engineer_response = "Skipped"
elif step.selected_option:
# Find the label for the selected option
if step.options_presented:
for opt in step.options_presented:
if opt.get("value") == step.selected_option:
engineer_response = opt.get("label", step.selected_option)
break
else:
engineer_response = step.selected_option
else:
engineer_response = step.selected_option
elif step.free_text_input:
engineer_response = step.free_text_input
# Determine outcome
outcome = None
if step.action_result:
result = step.action_result
outcome = "Succeeded" if result.get("success") else "Did not resolve"
if details := result.get("details"):
outcome += f"{details}"
diagnostic_steps.append(DocumentationStep(
step_number=step.step_order + 1,
step_type=step.step_type,
description=description,
engineer_response=engineer_response,
outcome=outcome,
))
# Calculate duration
duration_display = None
if session.resolved_at and session.created_at:
delta = session.resolved_at - session.created_at
minutes = int(delta.total_seconds() / 60)
if minutes < 60:
duration_display = f"{minutes}m"
else:
hours = minutes // 60
remaining = minutes % 60
duration_display = f"{hours}h {remaining}m"
# Build intake summary
intake = session.intake_content or {}
intake_summary = intake.get("text", "")[:500]
if not intake_summary:
intake_summary = str(intake)[:500]
return SessionDocumentation(
problem_summary=session.problem_summary or "No summary available",
problem_domain=session.problem_domain,
intake_summary=intake_summary,
diagnostic_steps=diagnostic_steps,
resolution_summary=session.resolution_summary,
escalation_reason=session.escalation_reason,
total_steps=session.step_count,
duration_display=duration_display,
generated_at=datetime.now(timezone.utc),
)
async def _push_to_psa(
session: AISession,
user_id: UUID,
db: AsyncSession,
) -> dict[str, Any]:
"""Push documentation to PSA if session has a linked ticket.
Returns dict with psa_push_status, psa_push_error, member_mapping_warning.
"""
if not session.psa_ticket_id or not session.psa_connection_id:
return {"psa_push_status": "no_psa", "psa_push_error": None, "member_mapping_warning": None}
try:
from app.services.psa_documentation_service import push_documentation
return await push_documentation(session, user_id, db)
except Exception as e:
logger.warning("PSA documentation push failed for session %s: %s", session.id, e)
return {
"psa_push_status": "failed",
"psa_push_error": str(e)[:200],
"member_mapping_warning": None,
}
async def _process_ticket_intake(
psa_connection_id: UUID,
psa_ticket_id: str,
db: AsyncSession,
) -> tuple[Optional[str], Optional[dict[str, Any]], str]:
"""Fetch ticket context from PSA and format for AI prompt.
Returns:
(ticket_context_block, ticket_data_dict, psa_context_status)
- ticket_context_block: formatted text for system prompt, or None on failure
- ticket_data_dict: serialized TicketContext for storage, or None on failure
- psa_context_status: "loaded" or "unavailable"
"""
try:
from app.services.psa.registry import get_provider_for_connection
from app.services.psa.ticket_context import format_ticket_context_for_prompt
provider = await get_provider_for_connection(psa_connection_id, db)
ticket_context = await provider.get_ticket_context(
int(psa_ticket_id), str(psa_connection_id)
)
ticket_prompt_block = format_ticket_context_for_prompt(ticket_context)
ticket_data = ticket_context.model_dump(mode="json")
return ticket_prompt_block, ticket_data, "loaded"
except Exception as e:
logger.warning(
"Failed to fetch ticket context for ticket %s (connection %s): %s",
psa_ticket_id, psa_connection_id, e,
)
return None, None, "unavailable"
async def _build_script_context(
team_id: Optional[UUID],
db: AsyncSession,
) -> Optional[str]:
"""Build script template context for the system prompt.
Includes available script templates so FlowPilot can suggest
script_generation actions with pre-filled parameters.
"""
try:
from app.models.script_template import ScriptTemplate
result = await db.execute(
select(ScriptTemplate)
.where(
ScriptTemplate.is_active.is_(True),
or_(
ScriptTemplate.team_id.is_(None),
ScriptTemplate.team_id == team_id,
),
)
.order_by(ScriptTemplate.usage_count.desc())
.limit(20)
)
templates = result.scalars().all()
if not templates:
return None
lines = ["## AVAILABLE SCRIPTS"]
lines.append("When the engineer needs to run a script, suggest an action with action_type='script_generation'.")
lines.append("Include template_id and pre_filled_params based on the diagnostic context.\n")
for t in templates:
params = t.parameters_schema.get("parameters", [])
param_keys = ", ".join(p.get("key", "") for p in params if p.get("key"))
lines.append(f"- {t.name} (ID: {t.id}): {t.description or 'No description'}")
if param_keys:
lines.append(f" Parameters: {param_keys}")
return "\n".join(lines)
except Exception as e:
logger.warning("Failed to build script context: %s", e)
return None
async def _build_escalation_package_enhanced(
session: AISession,
user_id: UUID,
) -> dict[str, Any]:
"""Build enhanced context package with LLM-generated hypotheses."""
steps_tried = []
for step in session.steps:
content = step.content or {}
entry = {
"step_type": step.step_type,
"description": content.get("text", ""),
}
if step.selected_option:
entry["response"] = step.selected_option
elif step.free_text_input:
entry["response"] = step.free_text_input
elif step.was_skipped:
entry["response"] = "Skipped"
if step.action_result:
entry["action_result"] = step.action_result
steps_tried.append(entry)
package = {
"original_user_id": str(user_id),
"problem_summary": session.problem_summary,
"problem_domain": session.problem_domain,
"intake_content": session.intake_content,
"confidence_at_escalation": session.confidence_score,
"steps_tried": steps_tried,
"escalation_reason": session.escalation_reason,
}
# LLM call for remaining hypotheses and suggested next steps (fast model)
try:
conversation_summary = "\n".join(
f"- {s.get('description', '')}{s.get('response', 'no response')}"
for s in steps_tried
)
prompt = (
"Based on this diagnostic conversation for an IT troubleshooting session:\n\n"
f"Problem: {session.problem_summary}\n"
f"Domain: {session.problem_domain}\n\n"
f"Steps taken:\n{conversation_summary}\n\n"
f"Escalation reason: {session.escalation_reason}\n\n"
"Respond with ONLY a JSON object:\n"
'{"remaining_hypotheses": ["hypothesis1", "hypothesis2"], '
'"suggested_next_steps": ["step1", "step2"], '
'"steps_ruled_out": ["ruled_out1"]}'
)
provider = get_ai_provider(settings.get_model_for_action("quick_action"))
raw, _, _ = await provider.generate_json(
system_prompt="You are an expert IT diagnostic assistant. Analyze the escalation context and provide concise insights.",
messages=[{"role": "user", "content": prompt}],
max_tokens=1024,
)
insights = json.loads(raw.strip().strip("`").lstrip("json\n"))
package["remaining_hypotheses"] = insights.get("remaining_hypotheses", [])
package["suggested_next_steps"] = insights.get("suggested_next_steps", [])
package["steps_ruled_out"] = insights.get("steps_ruled_out", [])
except Exception as e:
logger.warning("Failed to generate escalation insights: %s", e)
# Fall back gracefully — don't block the escalation
return package