feat(escalations): Escalation Mode wedge — live arrival + magic-moment pickup #155

Merged
chihlasm merged 34 commits from feat/escalation-metric-endpoint into main 2026-04-30 21:32:16 +00:00
7 changed files with 221 additions and 20 deletions
Showing only changes of commit 029680ab2d - Show all commits

View File

@@ -452,6 +452,13 @@ async def resolve_session(
# ── Escalate ── # ── Escalate ──
#
# Thin shim over HandoffManager. The legacy `flowpilot_engine.escalate_session`
# is no longer the source of truth — every escalation now creates a
# SessionHandoff row, fans out via the SSE bus, dispatches AppNotification +
# external channels via notify(), and emails per-user. EscalateModal and the
# /handoff endpoint both funnel through here / through HandoffManager so the
# senior-pickup magic-moment screen works regardless of entry point.
@router.post("/{session_id}/escalate", response_model=SessionCloseResponse) @router.post("/{session_id}/escalate", response_model=SessionCloseResponse)
@limiter.limit("15/minute") @limiter.limit("15/minute")
@@ -463,21 +470,49 @@ async def escalate_session(
db: Annotated[AsyncSession, Depends(get_db)], db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_engineer_or_admin), _: None = Depends(require_engineer_or_admin),
): ):
"""Escalate a FlowPilot session to another engineer.""" """Escalate a FlowPilot session — unified through HandoffManager."""
from app.services.handoff_manager import HandoffManager
# Owner-only — matches the original constraint on flowpilot_engine.escalate_session.
session_result = await db.execute(
select(AISession).where(
AISession.id == session_id,
AISession.user_id == current_user.id,
)
)
session = session_result.scalar_one_or_none()
if not session:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Session not found"
)
manager = HandoffManager(db)
try: try:
result = await flowpilot_engine.escalate_session( handoff = await manager.create_handoff(
session_id=session_id, session_id=session_id,
request=data, intent="escalate",
engineer_notes=data.escalation_reason,
user_id=current_user.id, user_id=current_user.id,
db=db, priority="normal",
target_user_id=data.escalated_to_id,
) )
except ValueError as e: except ValueError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
except PermissionError as e:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) documentation, psa_result = await manager.finalize_escalation(
handoff, session, current_user.id
)
await db.commit() await db.commit()
return result
await manager.dispatch_escalation_notifications(handoff)
return SessionCloseResponse(
session_id=session.id,
status=session.status,
documentation=documentation,
**psa_result,
)
# ── Pause ── # ── Pause ──
@@ -644,7 +679,7 @@ async def get_escalation_queue(
select(AISession) select(AISession)
.where( .where(
scope_filter, scope_filter,
AISession.status == "requesting_escalation", AISession.status.in_(("requesting_escalation", "escalated")),
) )
.order_by(AISession.created_at.desc()) .order_by(AISession.created_at.desc())
) )

View File

@@ -63,10 +63,16 @@ async def create_handoff(
engineer_notes=body.engineer_notes, engineer_notes=body.engineer_notes,
user_id=current_user.id, user_id=current_user.id,
priority=body.priority, priority=body.priority,
target_user_id=body.target_user_id,
) )
except ValueError as e: except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) raise HTTPException(status_code=400, detail=str(e))
# For escalate: generate documentation + push to PSA before commit so
# the handoff and the PSA-state changes land atomically.
if handoff.intent == "escalate":
await manager.finalize_escalation(handoff, session, current_user.id)
await db.commit() await db.commit()
# Best-effort notification dispatch AFTER commit so we never email about # Best-effort notification dispatch AFTER commit so we never email about

View File

@@ -161,7 +161,7 @@ async def get_sidebar_stats(
select(func.count()).where( select(func.count()).where(
and_( and_(
esc_scope, esc_scope,
AISession.status == "requesting_escalation", AISession.status.in_(("requesting_escalation", "escalated")),
) )
) )
) )

View File

@@ -10,6 +10,11 @@ class HandoffCreateRequest(BaseModel):
intent: str = Field(..., pattern="^(park|escalate)$") intent: str = Field(..., pattern="^(park|escalate)$")
engineer_notes: str | None = None engineer_notes: str | None = None
priority: str = Field("normal", pattern="^(normal|elevated)$") priority: str = Field("normal", pattern="^(normal|elevated)$")
# Optional escalation target — if set, only this user is the named
# recipient. Notification dispatch fans out to all engineer/admin/owner
# users in the account either way; this just records the original
# engineer's preferred recipient on the session for audit/UX.
target_user_id: UUID | None = None
class HandoffResponse(BaseModel): class HandoffResponse(BaseModel):

View File

@@ -632,8 +632,10 @@ async def pickup_session(
allow_team_access=True, team_id=team_id, allow_team_access=True, team_id=team_id,
) )
if session.status != "requesting_escalation": if session.status not in ("requesting_escalation", "escalated"):
raise ValueError(f"Session is {session.status}, not requesting_escalation") raise ValueError(
f"Session is {session.status}, not in an escalated state"
)
# Can't pick up your own session # Can't pick up your own session
if session.user_id == user_id: if session.user_id == user_id:

View File

@@ -3,6 +3,15 @@
Creates handoff snapshots, AI assessments (for escalations), claim workflow, Creates handoff snapshots, AI assessments (for escalations), claim workflow,
and queue queries. Dual-writes to ai_sessions.escalation_package for and queue queries. Dual-writes to ai_sessions.escalation_package for
backward compatibility with the existing escalation queue. backward compatibility with the existing escalation queue.
For intent='escalate', `create_handoff` also runs the legacy enrichment
that the deprecated `/escalate` endpoint used to do directly: setting
`escalated_to_id`, building the AI-enhanced escalation_package (Sonnet),
and recording escalation_reason. `finalize_escalation` then generates the
SessionDocumentation and pushes to PSA. `dispatch_escalation_notifications`
fans out the bell-icon AppNotification + external channels (Slack/Teams)
on top of per-user emails. The `/escalate` endpoint is now a thin shim
calling these in sequence.
""" """
import asyncio import asyncio
import logging import logging
@@ -12,6 +21,7 @@ from uuid import UUID
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.core.config import settings from app.core.config import settings
from app.core.email import EmailService from app.core.email import EmailService
@@ -20,6 +30,8 @@ from app.models.ai_session import AISession
from app.models.session_branch import SessionBranch from app.models.session_branch import SessionBranch
from app.models.session_handoff import SessionHandoff from app.models.session_handoff import SessionHandoff
from app.models.user import User from app.models.user import User
from app.schemas.ai_session import SessionDocumentation
from app.services.notification_service import notify
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -37,19 +49,46 @@ class HandoffManager:
engineer_notes: str | None, engineer_notes: str | None,
user_id: UUID, user_id: UUID,
priority: str = "normal", priority: str = "normal",
target_user_id: UUID | None = None,
) -> SessionHandoff: ) -> SessionHandoff:
"""Create a handoff (park or escalate). """Create a handoff (park or escalate).
Generates snapshot, updates session status, dual-writes to Generates snapshot, updates session status, dual-writes to
escalation_package for backward compat. escalation_package for backward compat.
For intent='escalate' also: sets `session.escalation_reason` and
optionally `session.escalated_to_id`, builds the AI-enhanced
escalation package (the rich one the legacy `/escalate` path used
to produce), and merges the handoff metadata into it. Self-targeting
is rejected with ValueError, matching legacy behavior.
""" """
# Eager-load steps + user — _build_escalation_package_enhanced and
# finalize_escalation iterate over session.steps to compose the
# legacy enriched package and the SessionDocumentation, and the
# notify() dispatcher reads session.user.name. Without selectinload
# the async session raises MissingGreenlet on attribute access.
result = await self.db.execute( result = await self.db.execute(
select(AISession).where(AISession.id == session_id) select(AISession)
.options(
selectinload(AISession.steps),
selectinload(AISession.user),
)
.where(AISession.id == session_id)
) )
session = result.scalar_one_or_none() session = result.scalar_one_or_none()
if not session: if not session:
raise ValueError(f"Session {session_id} not found") raise ValueError(f"Session {session_id} not found")
if intent == "escalate":
if target_user_id and target_user_id == user_id:
raise ValueError(
"Cannot escalate a session to yourself. Use pause instead."
)
if session.status not in ("active", "paused"):
raise ValueError(
f"Cannot escalate session in status: {session.status}"
)
# Generate snapshot # Generate snapshot
snapshot = await self._generate_snapshot(session) snapshot = await self._generate_snapshot(session)
@@ -80,20 +119,134 @@ class HandoffManager:
session.status = "paused" session.status = "paused"
elif intent == "escalate": elif intent == "escalate":
session.status = "escalated" session.status = "escalated"
session.escalation_reason = engineer_notes
if target_user_id:
session.escalated_to_id = target_user_id
session.handoff_count = (session.handoff_count or 0) + 1 session.handoff_count = (session.handoff_count or 0) + 1
# Dual-write for backward compat # Dual-write to escalation_package. For escalate, build the
session.escalation_package = { # AI-enhanced package (preserves the legacy rich shape that
"snapshot": snapshot, # SessionBriefing/PSA writeback consume), then layer in the new
"intent": intent, # handoff metadata. For park, the lightweight shape is fine —
"engineer_notes": engineer_notes, # there's no legacy enhanced package for parking.
"handoff_id": str(handoff.id), if intent == "escalate":
} enhanced_pkg = await self._build_enhanced_escalation_package(
session, user_id
)
enhanced_pkg["intent"] = intent
enhanced_pkg["engineer_notes"] = engineer_notes
enhanced_pkg["handoff_id"] = str(handoff.id)
enhanced_pkg["snapshot"] = snapshot
session.escalation_package = enhanced_pkg
else:
session.escalation_package = {
"snapshot": snapshot,
"intent": intent,
"engineer_notes": engineer_notes,
"handoff_id": str(handoff.id),
}
await self.db.flush() await self.db.flush()
return handoff return handoff
async def finalize_escalation(
self,
handoff: SessionHandoff,
session: AISession,
user_id: UUID,
) -> tuple[SessionDocumentation | None, dict[str, Any]]:
"""Post-create enrichment for intent='escalate' handoffs.
Generates the SessionDocumentation + pushes documentation to PSA if
a ticket is linked. Returns (documentation, psa_result) so the
legacy `/escalate` shim can map back to SessionCloseResponse. Safe
to call only when handoff.intent == 'escalate' — for park, returns
a no-op no-PSA dict.
"""
if handoff.intent != "escalate":
return None, {
"psa_push_status": "no_psa",
"psa_push_error": None,
"member_mapping_warning": None,
}
# Lazy import to avoid circular dependency: flowpilot_engine imports
# plenty of services at module load time and we don't want
# handoff_manager pulled into that graph at import.
from app.services.flowpilot_engine import (
_generate_documentation,
_push_to_psa,
)
documentation = _generate_documentation(session)
psa_result = await _push_to_psa(session, user_id, self.db)
# Bell-icon AppNotification rows + external account-level channels
# (Slack/Teams webhooks, shared escalations inboxes). This is the
# `notify()` call the legacy /escalate path used to make directly,
# and it has to happen BEFORE the endpoint commits so the
# AppNotification rows land atomically with the handoff. Per-user
# emails come after commit in dispatch_escalation_notifications —
# those are pure IO with no persistent state.
try:
engineer_user = (
await self.db.execute(
select(User).where(User.id == user_id)
)
).scalar_one_or_none()
engineer_name = (
engineer_user.name
if engineer_user and engineer_user.name
else "Unknown"
)
target_user_ids = (
[session.escalated_to_id] if session.escalated_to_id else None
)
await notify(
"session.escalated",
handoff.account_id,
{
"session_id": str(handoff.session_id),
"engineer_name": engineer_name,
"escalation_reason": handoff.engineer_notes or "",
"problem_summary": session.problem_summary or "N/A",
},
self.db,
target_user_ids=target_user_ids,
)
except Exception:
logger.exception(
"notify() dispatch failed for handoff %s", handoff.id
)
return documentation, psa_result
async def _build_enhanced_escalation_package(
self,
session: AISession,
user_id: UUID,
) -> dict[str, Any]:
"""Lazy wrapper around the legacy enhanced-package builder.
The builder lives in flowpilot_engine; we only need it for the
escalate path. Failures are caught here so handoff creation never
depends on the optional Sonnet enrichment — return the minimal
shape on failure.
"""
try:
from app.services.flowpilot_engine import (
_build_escalation_package_enhanced,
)
return await _build_escalation_package_enhanced(session, user_id)
except Exception:
logger.exception(
"Enhanced escalation package build failed for session %s; "
"falling back to minimal package",
session.id,
)
return {}
async def dispatch_escalation_notifications( async def dispatch_escalation_notifications(
self, handoff: SessionHandoff self, handoff: SessionHandoff
) -> int: ) -> int:

View File

@@ -168,7 +168,7 @@ export function useFlowPilotSession(): UseFlowPilotSession {
setIsProcessing(true) setIsProcessing(true)
try { try {
const result = await aiSessionsApi.escalateSession(session.id, data) const result = await aiSessionsApi.escalateSession(session.id, data)
setSession(prev => prev ? { ...prev, status: 'requesting_escalation' } : null) setSession(prev => prev ? { ...prev, status: 'escalated' } : null)
setDocumentation(result.documentation) setDocumentation(result.documentation)
setPsaPushStatus(result.psa_push_status) setPsaPushStatus(result.psa_push_status)
setPsaPushError(result.psa_push_error) setPsaPushError(result.psa_push_error)