feat: add session-to-flow AI generation service (Task 20)

Converts completed troubleshooting sessions into reusable procedural flows
with fallback branches. Includes PSA ticket context integration and
AI-generated step validation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
chihlasm
2026-03-16 01:16:59 -04:00
parent ffc2fcda6d
commit 831ef07ceb

View File

@@ -0,0 +1,254 @@
"""Session-to-Flow AI generation service.
Converts a completed troubleshooting session into a reusable procedural
flow with fallback branches, powered by AI.
"""
import json
import logging
import re
import uuid
from typing import Any, Optional
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.ai_provider import get_ai_provider
from app.core.config import settings
from app.core.ai_tree_validator import validate_generated_procedural_steps
from app.models.session import Session
from app.models.tree import Tree
logger = logging.getLogger(__name__)
# AI system prompt for session-to-flow conversion
SESSION_TO_FLOW_SYSTEM_PROMPT = """You are an expert MSP engineer and IT process documentation specialist.
Your task is to convert a completed IT troubleshooting session into a reusable procedural flow with optional fallback branches.
You will receive:
- The session outcome and engineer notes
- An ordered list of decisions the engineer made (questions/answers, actions, notes, command output)
- The original troubleshooting tree structure (for context on alternative paths)
- Optional PSA ticket context
Generate a procedural flow that can be replicated for similar issues in the future. Each step should:
1. Be concrete and actionable — include exact commands, paths, or config values
2. Have a clear verification criterion
3. Include 1-3 fallback_steps per step (alternatives to try if the primary action fails)
4. End with a procedure_end step summarizing the resolution
Return ONLY a valid JSON object with this exact structure:
{
"name": "Short descriptive title (5-10 words)",
"description": "2-3 sentence description of what this flow resolves",
"tags": ["tag1", "tag2"],
"steps": [
{
"id": "step-1",
"type": "procedure_step",
"title": "Step title",
"description": "Detailed instructions with exact commands/paths",
"content_type": "text",
"fallback_steps": [
{
"id": "step-1-fb-1",
"type": "procedure_step",
"title": "Alternative: ...",
"description": "Alternative approach if primary step fails",
"content_type": "text"
}
]
},
{
"id": "step-end",
"type": "procedure_end",
"title": "Resolution Complete",
"description": "Summary of what was resolved and any follow-up actions"
}
]
}
Rules:
- Use unique string IDs for all steps (e.g. "step-1", "step-2", "step-1-fb-1")
- Include 3-10 procedure_step entries before the procedure_end
- Each step should be 1 concrete action, not a vague suggestion
- Fallback steps use the same schema as procedure_steps but represent alternative approaches
- Tags should be 2-5 relevant keywords (technology, vendor, symptom)
- Do NOT wrap JSON in markdown code fences
- Return only valid JSON, nothing else
"""
def _strip_markdown_fences(text: str) -> str:
"""Strip markdown code fences if the model wrapped its JSON response."""
text = text.strip()
match = re.match(r"^```(?:json)?\s*([\s\S]*?)```$", text)
if match:
return match.group(1).strip()
return text
def _build_session_context(session: Session, tree: Optional[Tree]) -> str:
"""Build a context string from session data for the AI prompt."""
parts: list[str] = []
# Flow info
tree_name = session.tree_snapshot.get("name", "Unknown Flow") if session.tree_snapshot else "Unknown Flow"
parts.append(f"Flow: {tree_name}")
parts.append(f"Outcome: {session.outcome or 'Unknown'}")
if session.outcome_notes:
parts.append(f"Outcome Notes: {session.outcome_notes}")
# Session decisions (the troubleshooting path taken)
if session.decisions:
parts.append("\n--- TROUBLESHOOTING PATH ---")
for i, decision in enumerate(session.decisions):
step_parts: list[str] = [f"\nStep {i + 1}:"]
if decision.get("question"):
step_parts.append(f" Question: {decision['question']}")
if decision.get("answer"):
step_parts.append(f" Answer: {decision['answer']}")
if decision.get("action_performed"):
step_parts.append(f" Action: {decision['action_performed']}")
if decision.get("notes"):
step_parts.append(f" Notes: {decision['notes']}")
if decision.get("command_output"):
# Truncate long command output
output = decision["command_output"]
if len(output) > 500:
output = output[:500] + "... [truncated]"
step_parts.append(f" Command Output: {output}")
parts.append("\n".join(step_parts))
# Scratchpad
if session.scratchpad and session.scratchpad.strip():
parts.append(f"\n--- ENGINEER SCRATCHPAD ---\n{session.scratchpad[:1000]}")
# Original tree structure (for branch context, truncated)
if tree and tree.tree_structure:
tree_json = json.dumps(tree.tree_structure, indent=None)
if len(tree_json) > 3000:
tree_json = tree_json[:3000] + "... [truncated]"
parts.append(f"\n--- ORIGINAL TREE STRUCTURE (for alternative paths) ---\n{tree_json}")
elif session.tree_snapshot:
snapshot_json = json.dumps(session.tree_snapshot, indent=None)
if len(snapshot_json) > 3000:
snapshot_json = snapshot_json[:3000] + "... [truncated]"
parts.append(f"\n--- TREE SNAPSHOT (for alternative paths) ---\n{snapshot_json}")
return "\n".join(parts)
async def generate_flow_from_session(
session_id: str,
user_id: UUID,
account_id: UUID,
db: AsyncSession,
) -> dict[str, Any]:
"""Generate a procedural flow from a completed session.
Returns a dict with keys: name, description, tree_type, tags, tree_structure.
Raises ValueError on validation failures, Exception on AI/DB errors.
"""
# Load the session
session_uuid = UUID(session_id) if isinstance(session_id, str) else session_id
result = await db.execute(
select(Session).where(
Session.id == session_uuid,
Session.user_id == user_id,
)
)
session = result.scalar_one_or_none()
if not session:
raise ValueError(f"Session '{session_id}' not found or access denied")
# Load the original tree for branch context
tree: Optional[Tree] = None
if session.tree_id:
tree_result = await db.execute(
select(Tree).where(Tree.id == session.tree_id)
)
tree = tree_result.scalar_one_or_none()
# Build session context
session_context = _build_session_context(session, tree)
# Optionally fetch PSA ticket context
psa_context = ""
if session.psa_ticket_id and session.psa_connection_id:
try:
from app.services.psa.registry import get_provider_for_account
from app.services.psa.ticket_context import format_ticket_context_for_prompt
psa_provider = await get_provider_for_account(account_id, db)
connection_id = str(session.psa_connection_id)
ticket_ctx = await psa_provider.get_ticket_context(
ticket_id=int(session.psa_ticket_id),
connection_id=connection_id,
)
psa_context = "\n\n--- PSA TICKET CONTEXT ---\n" + format_ticket_context_for_prompt(ticket_ctx)
except Exception as psa_err:
logger.warning(
"Failed to fetch PSA ticket context for session-to-flow (session=%s, ticket=%s): %s",
session_id,
session.psa_ticket_id,
psa_err,
)
# Build user message
user_message = (
"Please convert the following completed troubleshooting session into a reusable procedural flow:\n\n"
f"{session_context}"
f"{psa_context}"
)
# Call AI
model = settings.get_model_for_action("generate_steps")
provider = get_ai_provider(model=model)
raw_text, input_tokens, output_tokens = await provider.generate_json(
system_prompt=SESSION_TO_FLOW_SYSTEM_PROMPT,
messages=[{"role": "user", "content": user_message}],
max_tokens=4096,
)
logger.info(
"session_to_flow AI response (tokens in=%d out=%d, session=%s)",
input_tokens,
output_tokens,
session_id,
)
# Strip markdown fences and parse JSON
raw_text = _strip_markdown_fences(raw_text)
try:
generated = json.loads(raw_text)
except json.JSONDecodeError as e:
raise ValueError(f"AI returned invalid JSON: {e}") from e
# Validate the generated steps
val_errors = validate_generated_procedural_steps(generated)
if val_errors:
raise ValueError(f"Generated flow failed validation: {'; '.join(val_errors)}")
# Ensure procedure_end exists; add if missing
steps = generated.get("steps", [])
has_end = any(s.get("type") == "procedure_end" for s in steps)
if not has_end:
steps.append({
"id": f"step-end-{uuid.uuid4().hex[:8]}",
"type": "procedure_end",
"title": "Procedure Complete",
"description": "All steps completed successfully.",
})
generated["steps"] = steps
return {
"name": generated.get("name", "AI-Generated Flow"),
"description": generated.get("description", ""),
"tree_type": "procedural",
"tags": generated.get("tags", []),
"tree_structure": {"steps": steps},
}