First half of the Escalation Mode notification dual-path. WebSocket/SSE
push is the second half (next commit) — email handles offline seniors,
push handles online ones for the magic-moment demo.
HandoffManager.dispatch_escalation_notifications:
- Pulls active engineer/admin/owner-role users in the same account_id
(excludes the escalator + viewers + soft-deleted)
- Sends via existing EmailService.send_notification_email, concurrent
via asyncio.gather; per-message failures don't block the rest
- Wrapped in try/except: any exception is logged + swallowed. Handoff
creation is authoritative; notification is advisory. This is the
graceful-degradation regression both eng + codex reviews flagged as
critical (handoff must succeed even if SMTP is down).
Endpoint wiring (POST /ai-sessions/{id}/handoff):
- Dispatch fires AFTER db.commit() — never email about a rolled-back
handoff. Trust-erosion bug if we got that wrong.
- Only fires for intent=escalate. Park is private to the escalator.
Tests (4 new):
- emails-engineer-recipients-in-account: viewer excluded, escalator
excluded, only the engineer/admin teammates get the message
- skipped-for-park-intent: park doesn't fan out
- graceful-degradation-when-email-raises: RuntimeError from the email
service does NOT bubble out of dispatch
- endpoint-dispatches-on-escalate: end-to-end wiring through POST
Per-channel delivery records (replacing the dead `notification_sent`
boolean per Codex correction) are a v1.x story — for now application
logs are the audit trail. See
docs/plans/2026-04-27-escalation-mode-wedge-design.md.
20 tests green across handoff_manager + session_handoffs_api +
flowpilot_analytics_escalations. No regressions.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
391 lines
14 KiB
Python
391 lines
14 KiB
Python
"""Handoff management — unified park/escalate with dual-write backward compat.

Creates handoff snapshots, AI assessments (for escalations), claim workflow,
and queue queries. Dual-writes to ai_sessions.escalation_package for
backward compatibility with the existing escalation queue.
"""
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
from uuid import UUID
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.config import settings
|
|
from app.core.email import EmailService
|
|
from app.models.ai_session import AISession
|
|
from app.models.session_branch import SessionBranch
|
|
from app.models.session_handoff import SessionHandoff
|
|
from app.models.user import User
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class HandoffManager:
    """Unified park/escalate handoff management.

    Wraps an ``AsyncSession`` and covers the handoff lifecycle: creating a
    handoff, emailing teammates about escalations, claiming, briefing,
    pushing notes to the PSA, and listing the unclaimed queue. Methods that
    modify rows only ``flush()``; no method here commits.
    """

    def __init__(self, db: AsyncSession):
        """Bind the manager to the database session used for every query."""
        self.db = db

    async def create_handoff(
        self,
        session_id: UUID,
        intent: str,
        engineer_notes: str | None,
        user_id: UUID,
        priority: str = "normal",
    ) -> SessionHandoff:
        """Create a handoff (park or escalate).

        Generates a snapshot, updates the session status, and dual-writes to
        ``escalation_package`` for backward compat. Changes are flushed, not
        committed.

        Args:
            session_id: ID of the AI session being handed off.
            intent: ``"park"`` or ``"escalate"``. Any other value still
                creates the handoff row but leaves the session status as is.
            engineer_notes: Optional free-text notes from the escalator.
            user_id: ID of the user performing the handoff.
            priority: Priority stored on the handoff row.

        Returns:
            The flushed ``SessionHandoff``.

        Raises:
            ValueError: If no session exists with ``session_id``.
        """
        result = await self.db.execute(
            select(AISession).where(AISession.id == session_id)
        )
        session = result.scalar_one_or_none()
        if not session:
            raise ValueError(f"Session {session_id} not found")

        # Snapshot first so it records the status from before the handoff.
        snapshot = await self._generate_snapshot(session)

        # Only escalations get an AI assessment.
        ai_assessment = None
        ai_assessment_data = None
        if intent == "escalate":
            ai_assessment, ai_assessment_data = await self._generate_ai_assessment(
                session
            )

        handoff = SessionHandoff(
            session_id=session_id,
            account_id=session.account_id,
            handed_off_by=user_id,
            intent=intent,
            source_branch_id=session.active_branch_id,
            snapshot=snapshot,
            ai_assessment=ai_assessment,
            ai_assessment_data=ai_assessment_data,
            engineer_notes=engineer_notes,
            priority=priority,
        )
        self.db.add(handoff)

        # Update session status
        if intent == "park":
            session.status = "paused"
        elif intent == "escalate":
            session.status = "escalated"

        session.handoff_count = (session.handoff_count or 0) + 1

        # Flush before the dual-write: when the primary key comes from a
        # column default it is only populated at INSERT time, so reading
        # handoff.id earlier would store the string "None" in the package.
        await self.db.flush()

        # Dual-write for backward compat
        session.escalation_package = {
            "snapshot": snapshot,
            "intent": intent,
            "engineer_notes": engineer_notes,
            "handoff_id": str(handoff.id),
        }

        await self.db.flush()
        return handoff

    async def dispatch_escalation_notifications(
        self, handoff: SessionHandoff
    ) -> int:
        """Email engineer-or-admin users in the account about a new escalation.

        Call this AFTER `db.commit()` has succeeded — sending email for a
        rolled-back handoff is the kind of trust-erosion bug that makes pilot
        customers stop trusting the tool.

        Recipients are active, non-deleted users in the handoff's account
        with role owner/admin/engineer, excluding the user who handed off.
        Emails are sent concurrently; one failing send does not stop the
        others.

        Failures are logged but never raise: handoff creation is
        authoritative, notification is best-effort. Per-channel delivery
        records are a v1.x story — for now the application logs are the
        audit trail.

        Args:
            handoff: The committed handoff. Non-escalate intents are a no-op.

        Returns:
            Number of recipients whose send returned ``True`` (best-effort,
            not authoritative); 0 on any dispatch-level failure.
        """
        if handoff.intent != "escalate":
            return 0

        try:
            recipients = (
                await self.db.execute(
                    select(User).where(
                        User.account_id == handoff.account_id,
                        User.id != handoff.handed_off_by,
                        User.account_role.in_(("owner", "admin", "engineer")),
                        User.is_active.is_(True),
                        User.deleted_at.is_(None),
                    )
                )
            ).scalars().all()

            if not recipients:
                logger.info(
                    "No notification recipients for handoff %s in account %s",
                    handoff.id,
                    handoff.account_id,
                )
                return 0

            # Pull session for the email subject. Fall back to a generic title
            # if the session is gone (e.g. cascade delete mid-dispatch).
            session_result = await self.db.execute(
                select(AISession).where(AISession.id == handoff.session_id)
            )
            session = session_result.scalar_one_or_none()
            problem = (
                session.problem_summary if session and session.problem_summary
                else "an active session"
            )

            title = f"New escalation: {problem}"
            notes = (handoff.engineer_notes or "").strip()
            body = (
                "A teammate has escalated a session and is asking for help.\n\n"
                f"Reason: {notes if notes else 'No reason provided.'}\n"
                f"Priority: {handoff.priority}"
            )
            link_url = (
                f"{settings.FRONTEND_URL.rstrip('/')}/escalations"
                if settings.FRONTEND_URL
                else None
            )

            # return_exceptions=True keeps one failed send from cancelling
            # the rest; the exceptions come back as result values instead.
            results = await asyncio.gather(
                *[
                    EmailService.send_notification_email(
                        to_email=recipient.email,
                        title=title,
                        body=body,
                        link_url=link_url,
                    )
                    for recipient in recipients
                ],
                return_exceptions=True,
            )

            sent = 0
            for recipient, outcome in zip(recipients, results):
                if outcome is True:
                    sent += 1
                elif isinstance(outcome, BaseException):
                    # Logs are the only audit trail for now, so record each
                    # per-recipient failure instead of dropping it silently.
                    logger.warning(
                        "Escalation email to user %s failed for handoff %s",
                        recipient.id,
                        handoff.id,
                        exc_info=outcome,
                    )

            logger.info(
                "Escalation notifications dispatched for handoff %s: %d/%d recipients",
                handoff.id,
                sent,
                len(recipients),
            )
            return sent

        except Exception:
            # Deliberate best-effort boundary: notification problems must
            # never surface to the handoff endpoint.
            logger.exception(
                "Escalation notification dispatch failed for handoff %s",
                handoff.id,
            )
            return 0

    async def _generate_snapshot(self, session: AISession) -> dict[str, Any]:
        """Generate a snapshot of the session state at handoff time.

        Always includes the problem summary/domain, status, step count and
        confidence tier. When the session is branching, also includes
        ``branch_map`` (branches ordered by ``branch_order``) and
        ``active_branch_id``.
        """
        snapshot: dict[str, Any] = {
            "problem_summary": session.problem_summary,
            "problem_domain": session.problem_domain,
            "status": session.status,
            "step_count": session.step_count,
            "confidence_tier": session.confidence_tier,
        }

        # Branch details are only added if branching is active.
        if not session.is_branching:
            return snapshot

        branches_result = await self.db.execute(
            select(SessionBranch)
            .where(SessionBranch.session_id == session.id)
            .order_by(SessionBranch.branch_order)
        )

        # IDs are stringified before being stored in the snapshot dict.
        snapshot["branch_map"] = [
            {
                "id": str(branch.id),
                "label": branch.label,
                "status": branch.status,
                "status_reason": branch.status_reason,
                "parent_branch_id": (
                    str(branch.parent_branch_id)
                    if branch.parent_branch_id
                    else None
                ),
            }
            for branch in branches_result.scalars().all()
        ]
        snapshot["active_branch_id"] = (
            str(session.active_branch_id) if session.active_branch_id else None
        )

        return snapshot

    async def claim_session(
        self,
        handoff_id: UUID,
        claiming_user_id: UUID,
    ) -> SessionHandoff:
        """Claim a handed-off session.

        Stamps the handoff with the claimer and a UTC claim time, sets the
        session back to ``"active"``, and dual-writes the claimer to
        ``escalated_to_id``. Changes are flushed, not committed.

        Raises:
            ValueError: If no handoff exists with ``handoff_id``.
        """
        result = await self.db.execute(
            select(SessionHandoff).where(SessionHandoff.id == handoff_id)
        )
        handoff = result.scalar_one_or_none()
        if not handoff:
            raise ValueError(f"Handoff {handoff_id} not found")

        # NOTE(review): an already-claimed handoff is silently re-assigned
        # here — confirm the caller guards against double claims.
        handoff.claimed_by = claiming_user_id
        handoff.claimed_at = datetime.now(timezone.utc)

        # Reactivate session; scalar_one() raises if the session row is gone.
        session_result = await self.db.execute(
            select(AISession).where(AISession.id == handoff.session_id)
        )
        session = session_result.scalar_one()
        session.status = "active"

        # Dual-write
        session.escalated_to_id = claiming_user_id

        await self.db.flush()
        return handoff

    async def _generate_ai_assessment(
        self, session: AISession
    ) -> tuple[str | None, dict[str, Any] | None]:
        """Generate AI diagnostic assessment for escalation handoffs.

        Builds a prompt from the problem summary/domain and the last 10
        conversation messages (each truncated to 200 characters).

        Returns:
            ``(assessment_text, assessment_data)``, or ``(None, None)`` if
            anything fails — the failure is logged, never raised.
            ``assessment_data`` is currently a fixed placeholder dict; it is
            not parsed from the model output.
        """
        try:
            # Imported lazily, as in the original code.
            from app.services.assistant_chat_service import _call_ai

            context = f"Problem: {session.problem_summary or 'Unknown'}\nDomain: {session.problem_domain or 'Unknown'}"
            msgs = session.conversation_messages or []
            # Include last 10 messages for context. Content is coerced to a
            # string so a message whose content is None cannot raise and
            # discard the whole assessment.
            recent = "\n".join(
                f"[{m.get('role', '?')}]: {str(m.get('content') or '')[:200]}"
                for m in msgs[-10:]
            )

            assessment_text, _, _ = await _call_ai(
                system_base="You are a diagnostic assessment generator for MSP escalations.",
                rag_context="",
                history=[],
                new_message=(
                    f"Generate a brief diagnostic assessment for this escalation.\n"
                    f"{context}\n\nRecent conversation:\n{recent}\n\n"
                    f"Return: 1) Most likely cause, 2) Suggested next steps, 3) Confidence (low/medium/high)"
                ),
                max_tokens=500,
            )

            assessment_data = {
                "likely_cause": "See assessment text",
                "suggested_steps": [],
                "confidence": "medium",
            }

            return assessment_text, assessment_data
        except Exception:
            # Best-effort: an escalation must still go through without an
            # assessment.
            logger.exception("Failed to generate AI assessment")
            return None, None

    async def generate_briefing(
        self, handoff_id: UUID, claiming_user_id: UUID
    ) -> str:
        """Generate a natural-language briefing for the engineer claiming the session.

        Args:
            handoff_id: Handoff to brief on.
            claiming_user_id: Currently unused by this method.

        Returns:
            The briefing text produced by the AI call.

        Raises:
            ValueError: If no handoff exists with ``handoff_id``. Errors from
                the AI call are not caught here.
        """
        result = await self.db.execute(
            select(SessionHandoff).where(SessionHandoff.id == handoff_id)
        )
        handoff = result.scalar_one_or_none()
        if not handoff:
            raise ValueError(f"Handoff {handoff_id} not found")

        session_result = await self.db.execute(
            select(AISession).where(AISession.id == handoff.session_id)
        )
        session = session_result.scalar_one()

        from app.services.assistant_chat_service import _call_ai

        # Cap the stringified snapshot at 2000 characters for the prompt.
        snapshot_text = str(handoff.snapshot)[:2000]
        briefing, _, _ = await _call_ai(
            system_base="You are a handoff briefing generator for MSP teams.",
            rag_context="",
            history=[],
            new_message=(
                f"Generate a concise briefing for an engineer picking up this session.\n"
                f"Problem: {session.problem_summary}\n"
                f"Intent: {handoff.intent}\n"
                f"Engineer notes: {handoff.engineer_notes or 'None'}\n"
                f"Snapshot: {snapshot_text}\n"
                f"AI Assessment: {handoff.ai_assessment or 'None'}"
            ),
            max_tokens=500,
        )
        return briefing

    async def push_to_psa(self, handoff_id: UUID) -> SessionHandoff:
        """Push handoff notes to PSA via existing psa_documentation_service.

        Pushes the AI assessment (or the stringified snapshot when there is
        none) only if the session has both a PSA ticket and a PSA connection.
        Push failures are logged and swallowed; the handoff is returned
        either way, with ``psa_note_pushed`` set only on success.

        Raises:
            ValueError: If no handoff exists with ``handoff_id``.
        """
        result = await self.db.execute(
            select(SessionHandoff).where(SessionHandoff.id == handoff_id)
        )
        handoff = result.scalar_one_or_none()
        if not handoff:
            raise ValueError(f"Handoff {handoff_id} not found")

        try:
            from app.services.psa_documentation_service import push_session_notes

            session_result = await self.db.execute(
                select(AISession).where(AISession.id == handoff.session_id)
            )
            session = session_result.scalar_one()
            if session.psa_ticket_id and session.psa_connection_id:
                note_id = await push_session_notes(
                    session=session,
                    notes_content=handoff.ai_assessment or str(handoff.snapshot),
                    db=self.db,
                )
                handoff.psa_note_pushed = True
                handoff.psa_note_id = note_id
        except Exception:
            # Lazy %-formatting, consistent with the other logger calls.
            logger.exception("Failed to push handoff %s to PSA", handoff_id)

        await self.db.flush()
        return handoff

    async def get_queue(
        self,
        team_id: UUID | None = None,
        account_id: UUID | None = None,
    ) -> list[dict[str, Any]]:
        """Get team queue of parked + escalated sessions.

        Returns unclaimed handoffs joined to their sessions, newest first.

        Args:
            team_id: When given, restrict to sessions of this team.
            account_id: Used only when ``team_id`` is not given; restricts
                to sessions of this account.

        Returns:
            One dict per unclaimed handoff with handoff and session fields.
        """
        query = (
            select(SessionHandoff, AISession)
            .join(AISession, SessionHandoff.session_id == AISession.id)
            .where(SessionHandoff.claimed_by.is_(None))
            .order_by(SessionHandoff.created_at.desc())
        )

        # NOTE(review): team_id takes precedence and account_id is ignored
        # when both are passed; with neither, the query is unscoped. Confirm
        # callers always supply a tenant-safe filter.
        if team_id:
            query = query.where(AISession.team_id == team_id)
        elif account_id:
            query = query.where(AISession.account_id == account_id)

        result = await self.db.execute(query)

        return [
            {
                "handoff_id": handoff.id,
                "session_id": session.id,
                "intent": handoff.intent,
                "problem_summary": session.problem_summary,
                "problem_domain": session.problem_domain,
                "priority": handoff.priority,
                "engineer_notes": handoff.engineer_notes,
                "created_at": handoff.created_at,
                "claimed_by": handoff.claimed_by,
                "claimed_at": handoff.claimed_at,
            }
            for handoff, session in result.all()
        ]
|