From 831ef07ceb564d49421cba449b4cef3eeb47cc77 Mon Sep 17 00:00:00 2001 From: chihlasm Date: Mon, 16 Mar 2026 01:16:59 -0400 Subject: [PATCH] feat: add session-to-flow AI generation service (Task 20) Converts completed troubleshooting sessions into reusable procedural flows with fallback branches. Includes PSA ticket context integration and AI-generated step validation. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../app/services/session_to_flow_service.py | 254 ++++++++++++++++++ 1 file changed, 254 insertions(+) create mode 100644 backend/app/services/session_to_flow_service.py diff --git a/backend/app/services/session_to_flow_service.py b/backend/app/services/session_to_flow_service.py new file mode 100644 index 00000000..4911e9dc --- /dev/null +++ b/backend/app/services/session_to_flow_service.py @@ -0,0 +1,254 @@ +"""Session-to-Flow AI generation service. + +Converts a completed troubleshooting session into a reusable procedural +flow with fallback branches, powered by AI. +""" +import json +import logging +import re +import uuid +from typing import Any, Optional +from uuid import UUID + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.ai_provider import get_ai_provider +from app.core.config import settings +from app.core.ai_tree_validator import validate_generated_procedural_steps +from app.models.session import Session +from app.models.tree import Tree + +logger = logging.getLogger(__name__) + +# AI system prompt for session-to-flow conversion +SESSION_TO_FLOW_SYSTEM_PROMPT = """You are an expert MSP engineer and IT process documentation specialist. + +Your task is to convert a completed IT troubleshooting session into a reusable procedural flow with optional fallback branches. + +You will receive: +- The session outcome and engineer notes +- An ordered list of decisions the engineer made (questions/answers, actions, notes, command output) +- The original troubleshooting tree structure (for context on alternative paths) +- Optional PSA ticket context + +Generate a procedural flow that can be replicated for similar issues in the future. Each step should: +1. Be concrete and actionable — include exact commands, paths, or config values +2. Have a clear verification criterion +3. Include 1-3 fallback_steps per step (alternatives to try if the primary action fails) +4. End with a procedure_end step summarizing the resolution + +Return ONLY a valid JSON object with this exact structure: +{ + "name": "Short descriptive title (5-10 words)", + "description": "2-3 sentence description of what this flow resolves", + "tags": ["tag1", "tag2"], + "steps": [ + { + "id": "step-1", + "type": "procedure_step", + "title": "Step title", + "description": "Detailed instructions with exact commands/paths", + "content_type": "text", + "fallback_steps": [ + { + "id": "step-1-fb-1", + "type": "procedure_step", + "title": "Alternative: ...", + "description": "Alternative approach if primary step fails", + "content_type": "text" + } + ] + }, + { + "id": "step-end", + "type": "procedure_end", + "title": "Resolution Complete", + "description": "Summary of what was resolved and any follow-up actions" + } + ] +} + +Rules: +- Use unique string IDs for all steps (e.g. "step-1", "step-2", "step-1-fb-1") +- Include 3-10 procedure_step entries before the procedure_end +- Each step should be 1 concrete action, not a vague suggestion +- Fallback steps use the same schema as procedure_steps but represent alternative approaches +- Tags should be 2-5 relevant keywords (technology, vendor, symptom) +- Do NOT wrap JSON in markdown code fences +- Return only valid JSON, nothing else +""" + + +def _strip_markdown_fences(text: str) -> str: + """Strip markdown code fences if the model wrapped its JSON response.""" + text = text.strip() + match = re.match(r"^```(?:json)?\s*([\s\S]*?)```$", text) + if match: + return match.group(1).strip() + return text + + +def _build_session_context(session: Session, tree: Optional[Tree]) -> str: + """Build a context string from session data for the AI prompt.""" + parts: list[str] = [] + + # Flow info + tree_name = session.tree_snapshot.get("name", "Unknown Flow") if session.tree_snapshot else "Unknown Flow" + parts.append(f"Flow: {tree_name}") + parts.append(f"Outcome: {session.outcome or 'Unknown'}") + + if session.outcome_notes: + parts.append(f"Outcome Notes: {session.outcome_notes}") + + # Session decisions (the troubleshooting path taken) + if session.decisions: + parts.append("\n--- TROUBLESHOOTING PATH ---") + for i, decision in enumerate(session.decisions): + step_parts: list[str] = [f"\nStep {i + 1}:"] + if decision.get("question"): + step_parts.append(f" Question: {decision['question']}") + if decision.get("answer"): + step_parts.append(f" Answer: {decision['answer']}") + if decision.get("action_performed"): + step_parts.append(f" Action: {decision['action_performed']}") + if decision.get("notes"): + step_parts.append(f" Notes: {decision['notes']}") + if decision.get("command_output"): + # Truncate long command output + output = decision["command_output"] + if len(output) > 500: + output = output[:500] + "... [truncated]" + step_parts.append(f" Command Output: {output}") + parts.append("\n".join(step_parts)) + + # Scratchpad + if session.scratchpad and session.scratchpad.strip(): + parts.append(f"\n--- ENGINEER SCRATCHPAD ---\n{session.scratchpad[:1000]}") + + # Original tree structure (for branch context, truncated) + if tree and tree.tree_structure: + tree_json = json.dumps(tree.tree_structure, indent=None) + if len(tree_json) > 3000: + tree_json = tree_json[:3000] + "... [truncated]" + parts.append(f"\n--- ORIGINAL TREE STRUCTURE (for alternative paths) ---\n{tree_json}") + elif session.tree_snapshot: + snapshot_json = json.dumps(session.tree_snapshot, indent=None) + if len(snapshot_json) > 3000: + snapshot_json = snapshot_json[:3000] + "... [truncated]" + parts.append(f"\n--- TREE SNAPSHOT (for alternative paths) ---\n{snapshot_json}") + + return "\n".join(parts) + + +async def generate_flow_from_session( + session_id: str, + user_id: UUID, + account_id: UUID, + db: AsyncSession, +) -> dict[str, Any]: + """Generate a procedural flow from a completed session. + + Returns a dict with keys: name, description, tree_type, tags, tree_structure. + Raises ValueError on validation failures, Exception on AI/DB errors. + """ + # Load the session + session_uuid = UUID(session_id) if isinstance(session_id, str) else session_id + result = await db.execute( + select(Session).where( + Session.id == session_uuid, + Session.user_id == user_id, + ) + ) + session = result.scalar_one_or_none() + if not session: + raise ValueError(f"Session '{session_id}' not found or access denied") + + # Load the original tree for branch context + tree: Optional[Tree] = None + if session.tree_id: + tree_result = await db.execute( + select(Tree).where(Tree.id == session.tree_id) + ) + tree = tree_result.scalar_one_or_none() + + # Build session context + session_context = _build_session_context(session, tree) + + # Optionally fetch PSA ticket context + psa_context = "" + if session.psa_ticket_id and session.psa_connection_id: + try: + from app.services.psa.registry import get_provider_for_account + from app.services.psa.ticket_context import format_ticket_context_for_prompt + + psa_provider = await get_provider_for_account(account_id, db) + connection_id = str(session.psa_connection_id) + ticket_ctx = await psa_provider.get_ticket_context( + ticket_id=int(session.psa_ticket_id), + connection_id=connection_id, + ) + psa_context = "\n\n--- PSA TICKET CONTEXT ---\n" + format_ticket_context_for_prompt(ticket_ctx) + except Exception as psa_err: + logger.warning( + "Failed to fetch PSA ticket context for session-to-flow (session=%s, ticket=%s): %s", + session_id, + session.psa_ticket_id, + psa_err, + ) + + # Build user message + user_message = ( + "Please convert the following completed troubleshooting session into a reusable procedural flow:\n\n" + f"{session_context}" + f"{psa_context}" + ) + + # Call AI + model = settings.get_model_for_action("generate_steps") + provider = get_ai_provider(model=model) + + raw_text, input_tokens, output_tokens = await provider.generate_json( + system_prompt=SESSION_TO_FLOW_SYSTEM_PROMPT, + messages=[{"role": "user", "content": user_message}], + max_tokens=4096, + ) + + logger.info( + "session_to_flow AI response (tokens in=%d out=%d, session=%s)", + input_tokens, + output_tokens, + session_id, + ) + + # Strip markdown fences and parse JSON + raw_text = _strip_markdown_fences(raw_text) + try: + generated = json.loads(raw_text) + except json.JSONDecodeError as e: + raise ValueError(f"AI returned invalid JSON: {e}") from e + + # Validate the generated steps + val_errors = validate_generated_procedural_steps(generated) + if val_errors: + raise ValueError(f"Generated flow failed validation: {'; '.join(val_errors)}") + + # Ensure procedure_end exists; add if missing + steps = generated.get("steps", []) + has_end = any(s.get("type") == "procedure_end" for s in steps) + if not has_end: + steps.append({ + "id": f"step-end-{uuid.uuid4().hex[:8]}", + "type": "procedure_end", + "title": "Procedure Complete", + "description": "All steps completed successfully.", + }) + generated["steps"] = steps + + return { + "name": generated.get("name", "AI-Generated Flow"), + "description": generated.get("description", ""), + "tree_type": "procedural", + "tags": generated.get("tags", []), + "tree_structure": {"steps": steps}, + }