"""EscalationPackageGeneratorService — drafts the handoff package for a session. Parallel to ResolutionNoteGeneratorService but oriented around handoff to another engineer instead of closing the ticket. The output markdown follows FLOWPILOT-MIGRATION.md Section 6.3: ## Problem ## What we've confirmed ## What we've tried ## Current hypothesis ## Suggested next steps Same caching story as resolution-note previews: keyed on `(session_id, ai_sessions.state_version)` via `preview_cache`, invalidated by any fact / suggested-fix / script-generation write. Model: Sonnet (`escalation_package` action tier per Section 6.6). MCP off. """ from __future__ import annotations import logging from typing import Any from uuid import UUID from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.ai_provider import get_ai_provider from app.core.config import settings from app.models.ai_session import AISession from app.models.script_template import ScriptGeneration, ScriptTemplate from app.models.session_fact import SessionFact from app.models.session_suggested_fix import SessionSuggestedFix from app.services.preview_cache import preview_cache from app.services.script_template_engine import ScriptTemplateEngine logger = logging.getLogger(__name__) _ESCALATION_SYSTEM_PROMPT = """\ You produce structured escalation handoff packages for an MSP troubleshooting \ platform. The package is read by the next engineer picking up the ticket; it \ must give them a running start without making them re-read the chat transcript. Output exactly this markdown structure, no preamble, no closing remarks, no \ extra headings: ## Problem ## What we've confirmed ## What we've tried - applied_failed: List the fix as a tried path. Include the failure reason if \ provided. State that it did not resolve the issue. - applied_partial: Include the fix as a partially tried path. Include partial \ notes if provided. Indicate it was not fully completed or not verified. - applied_success: Note that the fix was applied and verified but escalation \ is still needed for another reason (unusual — reflect this accurately). - dismissed: Do not mention the fix as a tried path; it was only considered. - proposed (no outcome yet): Do not list it here; it goes in Current hypothesis. If nothing has been tried at all (no checks, no scripts, no applied/partial \ fix), write "No diagnostic actions run yet." and continue. ## Current hypothesis - proposed (no outcome yet): State the fix title and confidence. If confidence \ is below 60% or there is no active fix, say "No leading hypothesis yet — \ symptoms are still being narrowed." - applied_failed or dismissed: Say the proposed fix did not hold or was set \ aside. State any remaining uncertainty. - applied_partial: Note the partial application and what remains open. - applied_success: Unusual in an escalate path — state the fix resolved the \ original symptom but a new or related issue requires escalation. ## Suggested next steps 80%).> Strict rules: - Use ONLY the input I provide. Never invent command names, KB articles, or \ configuration specifics that aren't in the input. - Do not include placeholder text like "TBD" or empty bullets. - Do not include the engineer's name, the AI's name, session IDs, or the \ chat transcript verbatim. - Markdown headings exactly as shown (## level), no bolding. - The tone is a peer handing off to a peer, not a status report. """ class EscalationPackageGeneratorService: """Generates and caches the five-section Escalate handoff markdown.""" KIND = "escalation_package" def __init__(self, db: AsyncSession) -> None: self.db = db async def generate_or_get_cached( self, session_id: UUID, *, force: bool = False, ) -> dict[str, Any]: session = await self._load_session(session_id) cached = preview_cache.get(self.KIND, session.id, session.state_version) if not force else None if cached is not None: return {**cached, "from_cache": True} markdown = await self._render(session) target = self._target_ticket_ref(session) payload = { "markdown": markdown, "target_ticket_ref": target, "state_version": session.state_version, } preview_cache.set(self.KIND, session.id, session.state_version, payload) return {**payload, "from_cache": False} # ── Internals (parallel to ResolutionNoteGenerator) ─────────────────── async def _load_session(self, session_id: UUID) -> AISession: result = await self.db.execute( select(AISession).where(AISession.id == session_id) ) session = result.scalar_one_or_none() if session is None: raise ValueError(f"Session {session_id} not found") return session async def _render(self, session: AISession) -> str: facts = await self._load_facts(session.id) active_fix = await self._load_active_fix(session.id) gens = await self._load_redacted_generations(session.id) bundle = self._build_input_bundle(session, facts, active_fix, gens) model = settings.get_model_for_action("escalation_package") provider = get_ai_provider(model=model) system_blocks: list[dict[str, Any]] = [ { "type": "text", "text": _ESCALATION_SYSTEM_PROMPT, "cache_control": {"type": "ephemeral"}, # cacheable: identical across every escalation-package preview call }, ] try: text, _in, _out = await provider.generate_text( system_prompt=system_blocks, messages=[{"role": "user", "content": bundle}], max_tokens=1400, ) except Exception: logger.exception("Escalation package generation failed for session %s", session.id) raise return text.strip() async def _load_facts(self, session_id: UUID) -> list[SessionFact]: result = await self.db.execute( select(SessionFact) .where( SessionFact.session_id == session_id, SessionFact.deleted_at.is_(None), ) .order_by(SessionFact.created_at.asc()) ) return list(result.scalars().all()) async def _load_active_fix(self, session_id: UUID) -> SessionSuggestedFix | None: result = await self.db.execute( select(SessionSuggestedFix) .where( SessionSuggestedFix.session_id == session_id, SessionSuggestedFix.superseded_at.is_(None), ) .order_by(SessionSuggestedFix.created_at.desc()) ) return result.scalars().first() async def _load_redacted_generations( self, session_id: UUID ) -> list[dict[str, Any]]: result = await self.db.execute( select(ScriptGeneration) .where(ScriptGeneration.ai_session_id == session_id) .order_by(ScriptGeneration.created_at.asc()) ) gens = list(result.scalars().all()) if not gens: return [] template_ids = {g.template_id for g in gens} tpl_result = await self.db.execute( select(ScriptTemplate).where(ScriptTemplate.id.in_(template_ids)) ) templates_by_id = {t.id: t for t in tpl_result.scalars().all()} engine = ScriptTemplateEngine() out: list[dict[str, Any]] = [] for g in gens: tpl = templates_by_id.get(g.template_id) sensitive_keys: set[str] = set() schema = (tpl.parameters_schema if tpl else {}) or {} params = schema.get("parameters") if isinstance(schema, dict) else None if isinstance(params, list): for p in params: if isinstance(p, dict) and p.get("field_type") == "password": k = p.get("key") or p.get("variable_name") if isinstance(k, str): sensitive_keys.add(k) redacted_params = engine.redact_sensitive(g.parameters_used or {}, sensitive_keys) out.append({ "template_name": tpl.name if tpl else "(unknown template)", "template_slug": tpl.slug if tpl else None, "parameters_used": redacted_params, "created_at": g.created_at.isoformat(), }) return out @staticmethod def _target_ticket_ref(session: AISession) -> str | None: if not session.psa_ticket_id: return None return f"CW #{session.psa_ticket_id}" @staticmethod def _build_input_bundle( session: AISession, facts: list[SessionFact], active_fix: SessionSuggestedFix | None, generations: list[dict[str, Any]], ) -> str: lines: list[str] = [] lines.append("# Session context") lines.append(f"Title: {session.title or '(untitled)'}") if session.problem_summary: lines.append(f"Problem summary: {session.problem_summary}") if session.problem_domain: lines.append(f"Domain: {session.problem_domain}") intake_text = (session.intake_content or {}).get("text") if isinstance(session.intake_content, dict) else None if intake_text: lines.append(f"Intake message: {intake_text}") if session.psa_ticket_id: lines.append(f"Linked PSA ticket: CW #{session.psa_ticket_id}") lines.append("") lines.append("# Confirmed facts (What we know)") if not facts: lines.append("(none)") else: for f in facts: tag = f.source_type summary = f" — {f.source_summary}" if f.source_summary else "" lines.append(f"- [{tag}] {f.text}{summary}") lines.append("") lines.append("# Diagnostic checks run during the session") check_facts = [f for f in facts if f.source_type == "diagnostic_check"] if not check_facts and not generations: lines.append("(none)") else: for f in check_facts: lines.append(f"- {f.text}") for g in generations: lines.append(f"- Ran script {g['template_name']} (slug={g['template_slug']})") if g["parameters_used"]: lines.append(f" parameters: {g['parameters_used']}") lines.append("") lines.append("# Active suggested fix (current hypothesis)") if active_fix is None: lines.append("(no active suggested fix)") else: lines.append(f"Title: {active_fix.title}") lines.append(f"Confidence: {active_fix.confidence_pct}%") lines.append(f"Description: {active_fix.description}") lines.append(f"Outcome status: {active_fix.status}") if active_fix.applied_at: lines.append(f"Applied at: {active_fix.applied_at.isoformat()}") if active_fix.verified_at: lines.append(f"Verified at: {active_fix.verified_at.isoformat()}") if active_fix.partial_notes: lines.append(f"Partial notes: {active_fix.partial_notes}") if active_fix.failure_reason: lines.append(f"Failure reason: {active_fix.failure_reason}") lines.append("") lines.append( "Produce the five-section escalation handoff now. Use only the input above." ) return "\n".join(lines)