feat(escalations): unify /escalate through HandoffManager
Replaces the legacy flowpilot_engine.escalate_session orchestration with
a single canonical path through HandoffManager. Every escalation now
creates a SessionHandoff row, fans out via the SSE bus, persists
AppNotification rows for the bell icon, dispatches to external channels
(Slack/Teams) via notify(), and emails per-user — regardless of whether
the call entered through /escalate (legacy URL) or /handoff (new URL).
The senior-pickup magic-moment screen now works end-to-end from the
EscalateModal bell-icon path the user just tested.
Backend
- HandoffCreateRequest gains optional target_user_id (the equivalent of
the legacy escalated_to_id field). Self-targeting rejected.
- HandoffManager.create_handoff handles intent='escalate' end-to-end:
sets escalation_reason + escalated_to_id, builds the legacy enhanced
AI escalation_package (Sonnet, lazy-imported from flowpilot_engine,
graceful fallback on failure), and merges handoff metadata into it.
Eager-loads session.steps and session.user via selectinload — required
by both the enhanced-package builder and notify() to avoid
MissingGreenlet on async lazy access.
- HandoffManager.finalize_escalation generates SessionDocumentation,
pushes documentation to PSA, and runs notify() — pre-commit so the
AppNotification rows persist atomically with the handoff.
- HandoffManager.dispatch_escalation_notifications keeps only the
fire-and-forget IO (bus publish, per-user emails) — runs post-commit.
Pulls engineer name via a separate User query rather than relying on
session.user lazy access.
- /handoff endpoint passes target_user_id through and calls
finalize_escalation pre-commit.
- /escalate endpoint is now a thin shim: owner-only session lookup,
HandoffManager.create_handoff(intent='escalate'), finalize_escalation,
commit, dispatch_escalation_notifications, return SessionCloseResponse
built from documentation + psa_result. flowpilot_engine.escalate_session
is no longer called by any endpoint.
- pickup_session accepts both 'requesting_escalation' (legacy in-flight
sessions) and 'escalated' (new canonical) so the migration is seamless
for sessions already in the queue.
- Escalation queue list and sidebar count now match either status.
Frontend
- useFlowPilotSession optimistic update flips status to 'escalated'
instead of 'requesting_escalation' so the page state matches the
unified backend response.
Verified end-to-end live: a fresh /escalate call from the junior produces
status='escalated', a SessionHandoff row, a SessionDocumentation, PSA
push attempted (no_psa for this test session), AND a bell-icon
AppNotification for the team admin with link
/pilot/{session_id}?pickup=true. Backend test suite: 1103 passed.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -452,6 +452,13 @@ async def resolve_session(
|
||||
|
||||
|
||||
# ── Escalate ──
|
||||
#
|
||||
# Thin shim over HandoffManager. The legacy `flowpilot_engine.escalate_session`
|
||||
# is no longer the source of truth — every escalation now creates a
|
||||
# SessionHandoff row, fans out via the SSE bus, dispatches AppNotification +
|
||||
# external channels via notify(), and emails per-user. EscalateModal and the
|
||||
# /handoff endpoint both funnel through here / through HandoffManager so the
|
||||
# senior-pickup magic-moment screen works regardless of entry point.
|
||||
|
||||
@router.post("/{session_id}/escalate", response_model=SessionCloseResponse)
|
||||
@limiter.limit("15/minute")
|
||||
@@ -463,21 +470,49 @@ async def escalate_session(
|
||||
db: Annotated[AsyncSession, Depends(get_db)],
|
||||
_: None = Depends(require_engineer_or_admin),
|
||||
):
|
||||
"""Escalate a FlowPilot session to another engineer."""
|
||||
"""Escalate a FlowPilot session — unified through HandoffManager."""
|
||||
from app.services.handoff_manager import HandoffManager
|
||||
|
||||
# Owner-only — matches the original constraint on flowpilot_engine.escalate_session.
|
||||
session_result = await db.execute(
|
||||
select(AISession).where(
|
||||
AISession.id == session_id,
|
||||
AISession.user_id == current_user.id,
|
||||
)
|
||||
)
|
||||
session = session_result.scalar_one_or_none()
|
||||
if not session:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND, detail="Session not found"
|
||||
)
|
||||
|
||||
manager = HandoffManager(db)
|
||||
try:
|
||||
result = await flowpilot_engine.escalate_session(
|
||||
handoff = await manager.create_handoff(
|
||||
session_id=session_id,
|
||||
request=data,
|
||||
intent="escalate",
|
||||
engineer_notes=data.escalation_reason,
|
||||
user_id=current_user.id,
|
||||
db=db,
|
||||
priority="normal",
|
||||
target_user_id=data.escalated_to_id,
|
||||
)
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
|
||||
except PermissionError as e:
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))
|
||||
|
||||
documentation, psa_result = await manager.finalize_escalation(
|
||||
handoff, session, current_user.id
|
||||
)
|
||||
|
||||
await db.commit()
|
||||
return result
|
||||
|
||||
await manager.dispatch_escalation_notifications(handoff)
|
||||
|
||||
return SessionCloseResponse(
|
||||
session_id=session.id,
|
||||
status=session.status,
|
||||
documentation=documentation,
|
||||
**psa_result,
|
||||
)
|
||||
|
||||
|
||||
# ── Pause ──
|
||||
@@ -644,7 +679,7 @@ async def get_escalation_queue(
|
||||
select(AISession)
|
||||
.where(
|
||||
scope_filter,
|
||||
AISession.status == "requesting_escalation",
|
||||
AISession.status.in_(("requesting_escalation", "escalated")),
|
||||
)
|
||||
.order_by(AISession.created_at.desc())
|
||||
)
|
||||
|
||||
@@ -63,10 +63,16 @@ async def create_handoff(
|
||||
engineer_notes=body.engineer_notes,
|
||||
user_id=current_user.id,
|
||||
priority=body.priority,
|
||||
target_user_id=body.target_user_id,
|
||||
)
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
|
||||
# For escalate: generate documentation + push to PSA before commit so
|
||||
# the handoff and the PSA-state changes land atomically.
|
||||
if handoff.intent == "escalate":
|
||||
await manager.finalize_escalation(handoff, session, current_user.id)
|
||||
|
||||
await db.commit()
|
||||
|
||||
# Best-effort notification dispatch AFTER commit so we never email about
|
||||
|
||||
@@ -161,7 +161,7 @@ async def get_sidebar_stats(
|
||||
select(func.count()).where(
|
||||
and_(
|
||||
esc_scope,
|
||||
AISession.status == "requesting_escalation",
|
||||
AISession.status.in_(("requesting_escalation", "escalated")),
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
@@ -10,6 +10,11 @@ class HandoffCreateRequest(BaseModel):
|
||||
intent: str = Field(..., pattern="^(park|escalate)$")
|
||||
engineer_notes: str | None = None
|
||||
priority: str = Field("normal", pattern="^(normal|elevated)$")
|
||||
# Optional escalation target — if set, only this user is the named
|
||||
# recipient. Notification dispatch fans out to all engineer/admin/owner
|
||||
# users in the account either way; this just records the original
|
||||
# engineer's preferred recipient on the session for audit/UX.
|
||||
target_user_id: UUID | None = None
|
||||
|
||||
|
||||
class HandoffResponse(BaseModel):
|
||||
|
||||
@@ -632,8 +632,10 @@ async def pickup_session(
|
||||
allow_team_access=True, team_id=team_id,
|
||||
)
|
||||
|
||||
if session.status != "requesting_escalation":
|
||||
raise ValueError(f"Session is {session.status}, not requesting_escalation")
|
||||
if session.status not in ("requesting_escalation", "escalated"):
|
||||
raise ValueError(
|
||||
f"Session is {session.status}, not in an escalated state"
|
||||
)
|
||||
|
||||
# Can't pick up your own session
|
||||
if session.user_id == user_id:
|
||||
|
||||
@@ -3,6 +3,15 @@
|
||||
Creates handoff snapshots, AI assessments (for escalations), claim workflow,
|
||||
and queue queries. Dual-writes to ai_sessions.escalation_package for
|
||||
backward compatibility with the existing escalation queue.
|
||||
|
||||
For intent='escalate', `create_handoff` also runs the legacy enrichment
|
||||
that the deprecated `/escalate` endpoint used to do directly: setting
|
||||
`escalated_to_id`, building the AI-enhanced escalation_package (Sonnet),
|
||||
and recording escalation_reason. `finalize_escalation` then generates the
|
||||
SessionDocumentation and pushes to PSA. `dispatch_escalation_notifications`
|
||||
fans out the bell-icon AppNotification + external channels (Slack/Teams)
|
||||
on top of per-user emails. The `/escalate` endpoint is now a thin shim
|
||||
calling these in sequence.
|
||||
"""
|
||||
import asyncio
|
||||
import logging
|
||||
@@ -12,6 +21,7 @@ from uuid import UUID
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.email import EmailService
|
||||
@@ -20,6 +30,8 @@ from app.models.ai_session import AISession
|
||||
from app.models.session_branch import SessionBranch
|
||||
from app.models.session_handoff import SessionHandoff
|
||||
from app.models.user import User
|
||||
from app.schemas.ai_session import SessionDocumentation
|
||||
from app.services.notification_service import notify
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -37,19 +49,46 @@ class HandoffManager:
|
||||
engineer_notes: str | None,
|
||||
user_id: UUID,
|
||||
priority: str = "normal",
|
||||
target_user_id: UUID | None = None,
|
||||
) -> SessionHandoff:
|
||||
"""Create a handoff (park or escalate).
|
||||
|
||||
Generates snapshot, updates session status, dual-writes to
|
||||
escalation_package for backward compat.
|
||||
|
||||
For intent='escalate' also: sets `session.escalation_reason` and
|
||||
optionally `session.escalated_to_id`, builds the AI-enhanced
|
||||
escalation package (the rich one the legacy `/escalate` path used
|
||||
to produce), and merges the handoff metadata into it. Self-targeting
|
||||
is rejected with ValueError, matching legacy behavior.
|
||||
"""
|
||||
# Eager-load steps + user — _build_escalation_package_enhanced and
|
||||
# finalize_escalation iterate over session.steps to compose the
|
||||
# legacy enriched package and the SessionDocumentation, and the
|
||||
# notify() dispatcher reads session.user.name. Without selectinload
|
||||
# the async session raises MissingGreenlet on attribute access.
|
||||
result = await self.db.execute(
|
||||
select(AISession).where(AISession.id == session_id)
|
||||
select(AISession)
|
||||
.options(
|
||||
selectinload(AISession.steps),
|
||||
selectinload(AISession.user),
|
||||
)
|
||||
.where(AISession.id == session_id)
|
||||
)
|
||||
session = result.scalar_one_or_none()
|
||||
if not session:
|
||||
raise ValueError(f"Session {session_id} not found")
|
||||
|
||||
if intent == "escalate":
|
||||
if target_user_id and target_user_id == user_id:
|
||||
raise ValueError(
|
||||
"Cannot escalate a session to yourself. Use pause instead."
|
||||
)
|
||||
if session.status not in ("active", "paused"):
|
||||
raise ValueError(
|
||||
f"Cannot escalate session in status: {session.status}"
|
||||
)
|
||||
|
||||
# Generate snapshot
|
||||
snapshot = await self._generate_snapshot(session)
|
||||
|
||||
@@ -80,20 +119,134 @@ class HandoffManager:
|
||||
session.status = "paused"
|
||||
elif intent == "escalate":
|
||||
session.status = "escalated"
|
||||
session.escalation_reason = engineer_notes
|
||||
if target_user_id:
|
||||
session.escalated_to_id = target_user_id
|
||||
|
||||
session.handoff_count = (session.handoff_count or 0) + 1
|
||||
|
||||
# Dual-write for backward compat
|
||||
session.escalation_package = {
|
||||
"snapshot": snapshot,
|
||||
"intent": intent,
|
||||
"engineer_notes": engineer_notes,
|
||||
"handoff_id": str(handoff.id),
|
||||
}
|
||||
# Dual-write to escalation_package. For escalate, build the
|
||||
# AI-enhanced package (preserves the legacy rich shape that
|
||||
# SessionBriefing/PSA writeback consume), then layer in the new
|
||||
# handoff metadata. For park, the lightweight shape is fine —
|
||||
# there's no legacy enhanced package for parking.
|
||||
if intent == "escalate":
|
||||
enhanced_pkg = await self._build_enhanced_escalation_package(
|
||||
session, user_id
|
||||
)
|
||||
enhanced_pkg["intent"] = intent
|
||||
enhanced_pkg["engineer_notes"] = engineer_notes
|
||||
enhanced_pkg["handoff_id"] = str(handoff.id)
|
||||
enhanced_pkg["snapshot"] = snapshot
|
||||
session.escalation_package = enhanced_pkg
|
||||
else:
|
||||
session.escalation_package = {
|
||||
"snapshot": snapshot,
|
||||
"intent": intent,
|
||||
"engineer_notes": engineer_notes,
|
||||
"handoff_id": str(handoff.id),
|
||||
}
|
||||
|
||||
await self.db.flush()
|
||||
return handoff
|
||||
|
||||
async def finalize_escalation(
|
||||
self,
|
||||
handoff: SessionHandoff,
|
||||
session: AISession,
|
||||
user_id: UUID,
|
||||
) -> tuple[SessionDocumentation | None, dict[str, Any]]:
|
||||
"""Post-create enrichment for intent='escalate' handoffs.
|
||||
|
||||
Generates the SessionDocumentation + pushes documentation to PSA if
|
||||
a ticket is linked. Returns (documentation, psa_result) so the
|
||||
legacy `/escalate` shim can map back to SessionCloseResponse. Safe
|
||||
to call only when handoff.intent == 'escalate' — for park, returns
|
||||
a no-op no-PSA dict.
|
||||
"""
|
||||
if handoff.intent != "escalate":
|
||||
return None, {
|
||||
"psa_push_status": "no_psa",
|
||||
"psa_push_error": None,
|
||||
"member_mapping_warning": None,
|
||||
}
|
||||
|
||||
# Lazy import to avoid circular dependency: flowpilot_engine imports
|
||||
# plenty of services at module load time and we don't want
|
||||
# handoff_manager pulled into that graph at import.
|
||||
from app.services.flowpilot_engine import (
|
||||
_generate_documentation,
|
||||
_push_to_psa,
|
||||
)
|
||||
|
||||
documentation = _generate_documentation(session)
|
||||
psa_result = await _push_to_psa(session, user_id, self.db)
|
||||
|
||||
# Bell-icon AppNotification rows + external account-level channels
|
||||
# (Slack/Teams webhooks, shared escalations inboxes). This is the
|
||||
# `notify()` call the legacy /escalate path used to make directly,
|
||||
# and it has to happen BEFORE the endpoint commits so the
|
||||
# AppNotification rows land atomically with the handoff. Per-user
|
||||
# emails come after commit in dispatch_escalation_notifications —
|
||||
# those are pure IO with no persistent state.
|
||||
try:
|
||||
engineer_user = (
|
||||
await self.db.execute(
|
||||
select(User).where(User.id == user_id)
|
||||
)
|
||||
).scalar_one_or_none()
|
||||
engineer_name = (
|
||||
engineer_user.name
|
||||
if engineer_user and engineer_user.name
|
||||
else "Unknown"
|
||||
)
|
||||
target_user_ids = (
|
||||
[session.escalated_to_id] if session.escalated_to_id else None
|
||||
)
|
||||
await notify(
|
||||
"session.escalated",
|
||||
handoff.account_id,
|
||||
{
|
||||
"session_id": str(handoff.session_id),
|
||||
"engineer_name": engineer_name,
|
||||
"escalation_reason": handoff.engineer_notes or "",
|
||||
"problem_summary": session.problem_summary or "N/A",
|
||||
},
|
||||
self.db,
|
||||
target_user_ids=target_user_ids,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"notify() dispatch failed for handoff %s", handoff.id
|
||||
)
|
||||
|
||||
return documentation, psa_result
|
||||
|
||||
async def _build_enhanced_escalation_package(
|
||||
self,
|
||||
session: AISession,
|
||||
user_id: UUID,
|
||||
) -> dict[str, Any]:
|
||||
"""Lazy wrapper around the legacy enhanced-package builder.
|
||||
|
||||
The builder lives in flowpilot_engine; we only need it for the
|
||||
escalate path. Failures are caught here so handoff creation never
|
||||
depends on the optional Sonnet enrichment — return the minimal
|
||||
shape on failure.
|
||||
"""
|
||||
try:
|
||||
from app.services.flowpilot_engine import (
|
||||
_build_escalation_package_enhanced,
|
||||
)
|
||||
return await _build_escalation_package_enhanced(session, user_id)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Enhanced escalation package build failed for session %s; "
|
||||
"falling back to minimal package",
|
||||
session.id,
|
||||
)
|
||||
return {}
|
||||
|
||||
async def dispatch_escalation_notifications(
|
||||
self, handoff: SessionHandoff
|
||||
) -> int:
|
||||
|
||||
@@ -168,7 +168,7 @@ export function useFlowPilotSession(): UseFlowPilotSession {
|
||||
setIsProcessing(true)
|
||||
try {
|
||||
const result = await aiSessionsApi.escalateSession(session.id, data)
|
||||
setSession(prev => prev ? { ...prev, status: 'requesting_escalation' } : null)
|
||||
setSession(prev => prev ? { ...prev, status: 'escalated' } : null)
|
||||
setDocumentation(result.documentation)
|
||||
setPsaPushStatus(result.psa_push_status)
|
||||
setPsaPushError(result.psa_push_error)
|
||||
|
||||
Reference in New Issue
Block a user