feat(escalations): unify /escalate through HandoffManager
Replaces the legacy flowpilot_engine.escalate_session orchestration with
a single canonical path through HandoffManager. Every escalation now
creates a SessionHandoff row, fans out via the SSE bus, persists
AppNotification rows for the bell icon, dispatches to external channels
(Slack/Teams) via notify(), and emails per-user — regardless of whether
the call entered through /escalate (legacy URL) or /handoff (new URL).
The senior-pickup magic-moment screen now works end-to-end from the
EscalateModal bell-icon path the user just tested.
Backend
- HandoffCreateRequest gains optional target_user_id (the equivalent of
the legacy escalated_to_id field). Self-targeting rejected.
- HandoffManager.create_handoff handles intent='escalate' end-to-end:
sets escalation_reason + escalated_to_id, builds the legacy enhanced
AI escalation_package (Sonnet, lazy-imported from flowpilot_engine,
graceful fallback on failure), and merges handoff metadata into it.
Eager-loads session.steps and session.user via selectinload — required
by both the enhanced-package builder and notify() to avoid
MissingGreenlet on async lazy access.
- HandoffManager.finalize_escalation generates SessionDocumentation,
pushes documentation to PSA, and runs notify() — pre-commit so the
AppNotification rows persist atomically with the handoff.
- HandoffManager.dispatch_escalation_notifications keeps only the
fire-and-forget IO (bus publish, per-user emails) — runs post-commit.
Pulls engineer name via a separate User query rather than relying on
session.user lazy access.
- /handoff endpoint passes target_user_id through and calls
finalize_escalation pre-commit.
- /escalate endpoint is now a thin shim: owner-only session lookup,
HandoffManager.create_handoff(intent='escalate'), finalize_escalation,
commit, dispatch_escalation_notifications, return SessionCloseResponse
built from documentation + psa_result. flowpilot_engine.escalate_session
is no longer called by any endpoint.
- pickup_session accepts both 'requesting_escalation' (legacy in-flight
sessions) and 'escalated' (new canonical) so the migration is seamless
for sessions already in the queue.
- Escalation queue list and sidebar count now match either status.
Frontend
- useFlowPilotSession optimistic update flips status to 'escalated'
instead of 'requesting_escalation' so the page state matches the
unified backend response.
Verified end-to-end live: a fresh /escalate call from the junior produces
status='escalated', a SessionHandoff row, a SessionDocumentation, PSA
push attempted (no_psa for this test session), AND a bell-icon
AppNotification for the team admin with link
/pilot/{session_id}?pickup=true. Backend test suite: 1103 passed.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -452,6 +452,13 @@ async def resolve_session(
|
|||||||
|
|
||||||
|
|
||||||
# ── Escalate ──
|
# ── Escalate ──
|
||||||
|
#
|
||||||
|
# Thin shim over HandoffManager. The legacy `flowpilot_engine.escalate_session`
|
||||||
|
# is no longer the source of truth — every escalation now creates a
|
||||||
|
# SessionHandoff row, fans out via the SSE bus, dispatches AppNotification +
|
||||||
|
# external channels via notify(), and emails per-user. EscalateModal and the
|
||||||
|
# /handoff endpoint both funnel through here / through HandoffManager so the
|
||||||
|
# senior-pickup magic-moment screen works regardless of entry point.
|
||||||
|
|
||||||
@router.post("/{session_id}/escalate", response_model=SessionCloseResponse)
|
@router.post("/{session_id}/escalate", response_model=SessionCloseResponse)
|
||||||
@limiter.limit("15/minute")
|
@limiter.limit("15/minute")
|
||||||
@@ -463,21 +470,49 @@ async def escalate_session(
|
|||||||
db: Annotated[AsyncSession, Depends(get_db)],
|
db: Annotated[AsyncSession, Depends(get_db)],
|
||||||
_: None = Depends(require_engineer_or_admin),
|
_: None = Depends(require_engineer_or_admin),
|
||||||
):
|
):
|
||||||
"""Escalate a FlowPilot session to another engineer."""
|
"""Escalate a FlowPilot session — unified through HandoffManager."""
|
||||||
|
from app.services.handoff_manager import HandoffManager
|
||||||
|
|
||||||
|
# Owner-only — matches the original constraint on flowpilot_engine.escalate_session.
|
||||||
|
session_result = await db.execute(
|
||||||
|
select(AISession).where(
|
||||||
|
AISession.id == session_id,
|
||||||
|
AISession.user_id == current_user.id,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
session = session_result.scalar_one_or_none()
|
||||||
|
if not session:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_404_NOT_FOUND, detail="Session not found"
|
||||||
|
)
|
||||||
|
|
||||||
|
manager = HandoffManager(db)
|
||||||
try:
|
try:
|
||||||
result = await flowpilot_engine.escalate_session(
|
handoff = await manager.create_handoff(
|
||||||
session_id=session_id,
|
session_id=session_id,
|
||||||
request=data,
|
intent="escalate",
|
||||||
|
engineer_notes=data.escalation_reason,
|
||||||
user_id=current_user.id,
|
user_id=current_user.id,
|
||||||
db=db,
|
priority="normal",
|
||||||
|
target_user_id=data.escalated_to_id,
|
||||||
)
|
)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
|
||||||
except PermissionError as e:
|
|
||||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e))
|
documentation, psa_result = await manager.finalize_escalation(
|
||||||
|
handoff, session, current_user.id
|
||||||
|
)
|
||||||
|
|
||||||
await db.commit()
|
await db.commit()
|
||||||
return result
|
|
||||||
|
await manager.dispatch_escalation_notifications(handoff)
|
||||||
|
|
||||||
|
return SessionCloseResponse(
|
||||||
|
session_id=session.id,
|
||||||
|
status=session.status,
|
||||||
|
documentation=documentation,
|
||||||
|
**psa_result,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# ── Pause ──
|
# ── Pause ──
|
||||||
@@ -644,7 +679,7 @@ async def get_escalation_queue(
|
|||||||
select(AISession)
|
select(AISession)
|
||||||
.where(
|
.where(
|
||||||
scope_filter,
|
scope_filter,
|
||||||
AISession.status == "requesting_escalation",
|
AISession.status.in_(("requesting_escalation", "escalated")),
|
||||||
)
|
)
|
||||||
.order_by(AISession.created_at.desc())
|
.order_by(AISession.created_at.desc())
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -63,10 +63,16 @@ async def create_handoff(
|
|||||||
engineer_notes=body.engineer_notes,
|
engineer_notes=body.engineer_notes,
|
||||||
user_id=current_user.id,
|
user_id=current_user.id,
|
||||||
priority=body.priority,
|
priority=body.priority,
|
||||||
|
target_user_id=body.target_user_id,
|
||||||
)
|
)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
raise HTTPException(status_code=400, detail=str(e))
|
raise HTTPException(status_code=400, detail=str(e))
|
||||||
|
|
||||||
|
# For escalate: generate documentation + push to PSA before commit so
|
||||||
|
# the handoff and the PSA-state changes land atomically.
|
||||||
|
if handoff.intent == "escalate":
|
||||||
|
await manager.finalize_escalation(handoff, session, current_user.id)
|
||||||
|
|
||||||
await db.commit()
|
await db.commit()
|
||||||
|
|
||||||
# Best-effort notification dispatch AFTER commit so we never email about
|
# Best-effort notification dispatch AFTER commit so we never email about
|
||||||
|
|||||||
@@ -161,7 +161,7 @@ async def get_sidebar_stats(
|
|||||||
select(func.count()).where(
|
select(func.count()).where(
|
||||||
and_(
|
and_(
|
||||||
esc_scope,
|
esc_scope,
|
||||||
AISession.status == "requesting_escalation",
|
AISession.status.in_(("requesting_escalation", "escalated")),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -10,6 +10,11 @@ class HandoffCreateRequest(BaseModel):
|
|||||||
intent: str = Field(..., pattern="^(park|escalate)$")
|
intent: str = Field(..., pattern="^(park|escalate)$")
|
||||||
engineer_notes: str | None = None
|
engineer_notes: str | None = None
|
||||||
priority: str = Field("normal", pattern="^(normal|elevated)$")
|
priority: str = Field("normal", pattern="^(normal|elevated)$")
|
||||||
|
# Optional escalation target — if set, only this user is the named
|
||||||
|
# recipient. Notification dispatch fans out to all engineer/admin/owner
|
||||||
|
# users in the account either way; this just records the original
|
||||||
|
# engineer's preferred recipient on the session for audit/UX.
|
||||||
|
target_user_id: UUID | None = None
|
||||||
|
|
||||||
|
|
||||||
class HandoffResponse(BaseModel):
|
class HandoffResponse(BaseModel):
|
||||||
|
|||||||
@@ -632,8 +632,10 @@ async def pickup_session(
|
|||||||
allow_team_access=True, team_id=team_id,
|
allow_team_access=True, team_id=team_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
if session.status != "requesting_escalation":
|
if session.status not in ("requesting_escalation", "escalated"):
|
||||||
raise ValueError(f"Session is {session.status}, not requesting_escalation")
|
raise ValueError(
|
||||||
|
f"Session is {session.status}, not in an escalated state"
|
||||||
|
)
|
||||||
|
|
||||||
# Can't pick up your own session
|
# Can't pick up your own session
|
||||||
if session.user_id == user_id:
|
if session.user_id == user_id:
|
||||||
|
|||||||
@@ -3,6 +3,15 @@
|
|||||||
Creates handoff snapshots, AI assessments (for escalations), claim workflow,
|
Creates handoff snapshots, AI assessments (for escalations), claim workflow,
|
||||||
and queue queries. Dual-writes to ai_sessions.escalation_package for
|
and queue queries. Dual-writes to ai_sessions.escalation_package for
|
||||||
backward compatibility with the existing escalation queue.
|
backward compatibility with the existing escalation queue.
|
||||||
|
|
||||||
|
For intent='escalate', `create_handoff` also runs the legacy enrichment
|
||||||
|
that the deprecated `/escalate` endpoint used to do directly: setting
|
||||||
|
`escalated_to_id`, building the AI-enhanced escalation_package (Sonnet),
|
||||||
|
and recording escalation_reason. `finalize_escalation` then generates the
|
||||||
|
SessionDocumentation and pushes to PSA. `dispatch_escalation_notifications`
|
||||||
|
fans out the bell-icon AppNotification + external channels (Slack/Teams)
|
||||||
|
on top of per-user emails. The `/escalate` endpoint is now a thin shim
|
||||||
|
calling these in sequence.
|
||||||
"""
|
"""
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
@@ -12,6 +21,7 @@ from uuid import UUID
|
|||||||
|
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
from sqlalchemy.orm import selectinload
|
||||||
|
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
from app.core.email import EmailService
|
from app.core.email import EmailService
|
||||||
@@ -20,6 +30,8 @@ from app.models.ai_session import AISession
|
|||||||
from app.models.session_branch import SessionBranch
|
from app.models.session_branch import SessionBranch
|
||||||
from app.models.session_handoff import SessionHandoff
|
from app.models.session_handoff import SessionHandoff
|
||||||
from app.models.user import User
|
from app.models.user import User
|
||||||
|
from app.schemas.ai_session import SessionDocumentation
|
||||||
|
from app.services.notification_service import notify
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -37,19 +49,46 @@ class HandoffManager:
|
|||||||
engineer_notes: str | None,
|
engineer_notes: str | None,
|
||||||
user_id: UUID,
|
user_id: UUID,
|
||||||
priority: str = "normal",
|
priority: str = "normal",
|
||||||
|
target_user_id: UUID | None = None,
|
||||||
) -> SessionHandoff:
|
) -> SessionHandoff:
|
||||||
"""Create a handoff (park or escalate).
|
"""Create a handoff (park or escalate).
|
||||||
|
|
||||||
Generates snapshot, updates session status, dual-writes to
|
Generates snapshot, updates session status, dual-writes to
|
||||||
escalation_package for backward compat.
|
escalation_package for backward compat.
|
||||||
|
|
||||||
|
For intent='escalate' also: sets `session.escalation_reason` and
|
||||||
|
optionally `session.escalated_to_id`, builds the AI-enhanced
|
||||||
|
escalation package (the rich one the legacy `/escalate` path used
|
||||||
|
to produce), and merges the handoff metadata into it. Self-targeting
|
||||||
|
is rejected with ValueError, matching legacy behavior.
|
||||||
"""
|
"""
|
||||||
|
# Eager-load steps + user — _build_escalation_package_enhanced and
|
||||||
|
# finalize_escalation iterate over session.steps to compose the
|
||||||
|
# legacy enriched package and the SessionDocumentation, and the
|
||||||
|
# notify() dispatcher reads session.user.name. Without selectinload
|
||||||
|
# the async session raises MissingGreenlet on attribute access.
|
||||||
result = await self.db.execute(
|
result = await self.db.execute(
|
||||||
select(AISession).where(AISession.id == session_id)
|
select(AISession)
|
||||||
|
.options(
|
||||||
|
selectinload(AISession.steps),
|
||||||
|
selectinload(AISession.user),
|
||||||
|
)
|
||||||
|
.where(AISession.id == session_id)
|
||||||
)
|
)
|
||||||
session = result.scalar_one_or_none()
|
session = result.scalar_one_or_none()
|
||||||
if not session:
|
if not session:
|
||||||
raise ValueError(f"Session {session_id} not found")
|
raise ValueError(f"Session {session_id} not found")
|
||||||
|
|
||||||
|
if intent == "escalate":
|
||||||
|
if target_user_id and target_user_id == user_id:
|
||||||
|
raise ValueError(
|
||||||
|
"Cannot escalate a session to yourself. Use pause instead."
|
||||||
|
)
|
||||||
|
if session.status not in ("active", "paused"):
|
||||||
|
raise ValueError(
|
||||||
|
f"Cannot escalate session in status: {session.status}"
|
||||||
|
)
|
||||||
|
|
||||||
# Generate snapshot
|
# Generate snapshot
|
||||||
snapshot = await self._generate_snapshot(session)
|
snapshot = await self._generate_snapshot(session)
|
||||||
|
|
||||||
@@ -80,20 +119,134 @@ class HandoffManager:
|
|||||||
session.status = "paused"
|
session.status = "paused"
|
||||||
elif intent == "escalate":
|
elif intent == "escalate":
|
||||||
session.status = "escalated"
|
session.status = "escalated"
|
||||||
|
session.escalation_reason = engineer_notes
|
||||||
|
if target_user_id:
|
||||||
|
session.escalated_to_id = target_user_id
|
||||||
|
|
||||||
session.handoff_count = (session.handoff_count or 0) + 1
|
session.handoff_count = (session.handoff_count or 0) + 1
|
||||||
|
|
||||||
# Dual-write for backward compat
|
# Dual-write to escalation_package. For escalate, build the
|
||||||
session.escalation_package = {
|
# AI-enhanced package (preserves the legacy rich shape that
|
||||||
"snapshot": snapshot,
|
# SessionBriefing/PSA writeback consume), then layer in the new
|
||||||
"intent": intent,
|
# handoff metadata. For park, the lightweight shape is fine —
|
||||||
"engineer_notes": engineer_notes,
|
# there's no legacy enhanced package for parking.
|
||||||
"handoff_id": str(handoff.id),
|
if intent == "escalate":
|
||||||
}
|
enhanced_pkg = await self._build_enhanced_escalation_package(
|
||||||
|
session, user_id
|
||||||
|
)
|
||||||
|
enhanced_pkg["intent"] = intent
|
||||||
|
enhanced_pkg["engineer_notes"] = engineer_notes
|
||||||
|
enhanced_pkg["handoff_id"] = str(handoff.id)
|
||||||
|
enhanced_pkg["snapshot"] = snapshot
|
||||||
|
session.escalation_package = enhanced_pkg
|
||||||
|
else:
|
||||||
|
session.escalation_package = {
|
||||||
|
"snapshot": snapshot,
|
||||||
|
"intent": intent,
|
||||||
|
"engineer_notes": engineer_notes,
|
||||||
|
"handoff_id": str(handoff.id),
|
||||||
|
}
|
||||||
|
|
||||||
await self.db.flush()
|
await self.db.flush()
|
||||||
return handoff
|
return handoff
|
||||||
|
|
||||||
|
async def finalize_escalation(
|
||||||
|
self,
|
||||||
|
handoff: SessionHandoff,
|
||||||
|
session: AISession,
|
||||||
|
user_id: UUID,
|
||||||
|
) -> tuple[SessionDocumentation | None, dict[str, Any]]:
|
||||||
|
"""Post-create enrichment for intent='escalate' handoffs.
|
||||||
|
|
||||||
|
Generates the SessionDocumentation + pushes documentation to PSA if
|
||||||
|
a ticket is linked. Returns (documentation, psa_result) so the
|
||||||
|
legacy `/escalate` shim can map back to SessionCloseResponse. Safe
|
||||||
|
to call only when handoff.intent == 'escalate' — for park, returns
|
||||||
|
a no-op no-PSA dict.
|
||||||
|
"""
|
||||||
|
if handoff.intent != "escalate":
|
||||||
|
return None, {
|
||||||
|
"psa_push_status": "no_psa",
|
||||||
|
"psa_push_error": None,
|
||||||
|
"member_mapping_warning": None,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Lazy import to avoid circular dependency: flowpilot_engine imports
|
||||||
|
# plenty of services at module load time and we don't want
|
||||||
|
# handoff_manager pulled into that graph at import.
|
||||||
|
from app.services.flowpilot_engine import (
|
||||||
|
_generate_documentation,
|
||||||
|
_push_to_psa,
|
||||||
|
)
|
||||||
|
|
||||||
|
documentation = _generate_documentation(session)
|
||||||
|
psa_result = await _push_to_psa(session, user_id, self.db)
|
||||||
|
|
||||||
|
# Bell-icon AppNotification rows + external account-level channels
|
||||||
|
# (Slack/Teams webhooks, shared escalations inboxes). This is the
|
||||||
|
# `notify()` call the legacy /escalate path used to make directly,
|
||||||
|
# and it has to happen BEFORE the endpoint commits so the
|
||||||
|
# AppNotification rows land atomically with the handoff. Per-user
|
||||||
|
# emails come after commit in dispatch_escalation_notifications —
|
||||||
|
# those are pure IO with no persistent state.
|
||||||
|
try:
|
||||||
|
engineer_user = (
|
||||||
|
await self.db.execute(
|
||||||
|
select(User).where(User.id == user_id)
|
||||||
|
)
|
||||||
|
).scalar_one_or_none()
|
||||||
|
engineer_name = (
|
||||||
|
engineer_user.name
|
||||||
|
if engineer_user and engineer_user.name
|
||||||
|
else "Unknown"
|
||||||
|
)
|
||||||
|
target_user_ids = (
|
||||||
|
[session.escalated_to_id] if session.escalated_to_id else None
|
||||||
|
)
|
||||||
|
await notify(
|
||||||
|
"session.escalated",
|
||||||
|
handoff.account_id,
|
||||||
|
{
|
||||||
|
"session_id": str(handoff.session_id),
|
||||||
|
"engineer_name": engineer_name,
|
||||||
|
"escalation_reason": handoff.engineer_notes or "",
|
||||||
|
"problem_summary": session.problem_summary or "N/A",
|
||||||
|
},
|
||||||
|
self.db,
|
||||||
|
target_user_ids=target_user_ids,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
logger.exception(
|
||||||
|
"notify() dispatch failed for handoff %s", handoff.id
|
||||||
|
)
|
||||||
|
|
||||||
|
return documentation, psa_result
|
||||||
|
|
||||||
|
async def _build_enhanced_escalation_package(
|
||||||
|
self,
|
||||||
|
session: AISession,
|
||||||
|
user_id: UUID,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Lazy wrapper around the legacy enhanced-package builder.
|
||||||
|
|
||||||
|
The builder lives in flowpilot_engine; we only need it for the
|
||||||
|
escalate path. Failures are caught here so handoff creation never
|
||||||
|
depends on the optional Sonnet enrichment — return the minimal
|
||||||
|
shape on failure.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
from app.services.flowpilot_engine import (
|
||||||
|
_build_escalation_package_enhanced,
|
||||||
|
)
|
||||||
|
return await _build_escalation_package_enhanced(session, user_id)
|
||||||
|
except Exception:
|
||||||
|
logger.exception(
|
||||||
|
"Enhanced escalation package build failed for session %s; "
|
||||||
|
"falling back to minimal package",
|
||||||
|
session.id,
|
||||||
|
)
|
||||||
|
return {}
|
||||||
|
|
||||||
async def dispatch_escalation_notifications(
|
async def dispatch_escalation_notifications(
|
||||||
self, handoff: SessionHandoff
|
self, handoff: SessionHandoff
|
||||||
) -> int:
|
) -> int:
|
||||||
|
|||||||
@@ -168,7 +168,7 @@ export function useFlowPilotSession(): UseFlowPilotSession {
|
|||||||
setIsProcessing(true)
|
setIsProcessing(true)
|
||||||
try {
|
try {
|
||||||
const result = await aiSessionsApi.escalateSession(session.id, data)
|
const result = await aiSessionsApi.escalateSession(session.id, data)
|
||||||
setSession(prev => prev ? { ...prev, status: 'requesting_escalation' } : null)
|
setSession(prev => prev ? { ...prev, status: 'escalated' } : null)
|
||||||
setDocumentation(result.documentation)
|
setDocumentation(result.documentation)
|
||||||
setPsaPushStatus(result.psa_push_status)
|
setPsaPushStatus(result.psa_push_status)
|
||||||
setPsaPushError(result.psa_push_error)
|
setPsaPushError(result.psa_push_error)
|
||||||
|
|||||||
Reference in New Issue
Block a user