Files
resolutionflow/backend/app/api/endpoints/session_handoffs.py
Michael Chihlas 029680ab2d
All checks were successful
Mirror to GitHub / mirror (push) Successful in 4s
CI / frontend (pull_request) Successful in 5m8s
CI / backend (pull_request) Successful in 10m13s
CI / e2e (pull_request) Successful in 10m47s
feat(escalations): unify /escalate through HandoffManager
Replaces the legacy flowpilot_engine.escalate_session orchestration with
a single canonical path through HandoffManager. Every escalation now
creates a SessionHandoff row, fans out via the SSE bus, persists
AppNotification rows for the bell icon, dispatches to external channels
(Slack/Teams) via notify(), and emails per-user — regardless of whether
the call entered through /escalate (legacy URL) or /handoff (new URL).
The senior-pickup magic-moment screen now works end-to-end from the
EscalateModal bell-icon path the user just tested.

Backend
- HandoffCreateRequest gains optional target_user_id (the equivalent of
  the legacy escalated_to_id field). Self-targeting rejected.
- HandoffManager.create_handoff handles intent='escalate' end-to-end:
  sets escalation_reason + escalated_to_id, builds the legacy enhanced
  AI escalation_package (Sonnet, lazy-imported from flowpilot_engine,
  graceful fallback on failure), and merges handoff metadata into it.
  Eager-loads session.steps and session.user via selectinload — required
  by both the enhanced-package builder and notify() to avoid
  MissingGreenlet on async lazy access.
- HandoffManager.finalize_escalation generates SessionDocumentation,
  pushes documentation to PSA, and runs notify() — pre-commit so the
  AppNotification rows persist atomically with the handoff.
- HandoffManager.dispatch_escalation_notifications keeps only the
  fire-and-forget IO (bus publish, per-user emails) — runs post-commit.
  Pulls engineer name via a separate User query rather than relying on
  session.user lazy access.
- /handoff endpoint passes target_user_id through and calls
  finalize_escalation pre-commit.
- /escalate endpoint is now a thin shim: owner-only session lookup,
  HandoffManager.create_handoff(intent='escalate'), finalize_escalation,
  commit, dispatch_escalation_notifications, return SessionCloseResponse
  built from documentation + psa_result. flowpilot_engine.escalate_session
  is no longer called by any endpoint.
- pickup_session accepts both 'requesting_escalation' (legacy in-flight
  sessions) and 'escalated' (new canonical) so the migration is seamless
  for sessions already in the queue.
- Escalation queue list and sidebar count now match either status.

Frontend
- useFlowPilotSession optimistic update flips status to 'escalated'
  instead of 'requesting_escalation' so the page state matches the
  unified backend response.

Verified end-to-end live: a fresh /escalate call from the junior produces
status='escalated', a SessionHandoff row, a SessionDocumentation, PSA
push attempted (no_psa for this test session), AND a bell-icon
AppNotification for the team admin with link
/pilot/{session_id}?pickup=true. Backend test suite: 1103 passed.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-27 22:27:26 -04:00

221 lines
7.8 KiB
Python

"""Handoff endpoints — unified park/escalate.
POST /ai-sessions/{id}/handoff — Create handoff
GET /ai-sessions/{id}/handoffs — Handoff history
POST /ai-sessions/{id}/handoffs/{hid}/claim — Claim session
GET /ai-sessions/queue — Team queue
GET /ai-sessions/escalations/stream — SSE: live escalation arrivals
"""
import asyncio
import json
import logging
from typing import Annotated, AsyncGenerator
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.responses import StreamingResponse
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_current_active_user, get_db, require_engineer_or_admin
from app.core.escalation_bus import bus as escalation_bus
from app.models.user import User
from app.models.ai_session import AISession
from app.models.session_handoff import SessionHandoff
from app.services.handoff_manager import HandoffManager
from app.schemas.session_handoff import (
HandoffCreateRequest,
HandoffResponse,
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Routes with no {session_id} path segment (the team queue and the escalation
# SSE stream) are registered on this router.
queue_router = APIRouter(prefix="/ai-sessions", tags=["session-handoffs"])
# Session-scoped routes: the prefix makes `session_id` a path parameter of
# every endpoint registered on this router.
router = APIRouter(prefix="/ai-sessions/{session_id}", tags=["session-handoffs"])
@router.post("/handoff", response_model=HandoffResponse, status_code=status.HTTP_201_CREATED)
async def create_handoff(
    session_id: UUID,
    body: HandoffCreateRequest,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> HandoffResponse:
    """Create a handoff (park or escalate) on a session owned by the caller.

    Args:
        session_id: Path parameter identifying the AI session.
        body: Handoff request; ``intent``, ``engineer_notes``, ``priority``
            and ``target_user_id`` are forwarded to the manager.
        current_user: Authenticated active user; must own the session.
        db: Request-scoped async database session.

    Returns:
        The newly created handoff, serialized as ``HandoffResponse``.

    Raises:
        HTTPException: 404 when no session with this id belongs to the
            caller; 400 when ``HandoffManager.create_handoff`` raises
            ``ValueError``.
    """
    # Owner-only lookup: a session that exists but belongs to someone else is
    # reported as 404, the same as a missing one.
    result = await db.execute(
        select(AISession).where(
            AISession.id == session_id,
            AISession.user_id == current_user.id,
        )
    )
    session = result.scalar_one_or_none()
    if session is None:
        raise HTTPException(status_code=404, detail="Session not found")

    manager = HandoffManager(db)
    try:
        handoff = await manager.create_handoff(
            session_id=session_id,
            intent=body.intent,
            engineer_notes=body.engineer_notes,
            user_id=current_user.id,
            priority=body.priority,
            target_user_id=body.target_user_id,
        )
    except ValueError as e:
        # Chain the cause so the original validation error stays in tracebacks.
        raise HTTPException(status_code=400, detail=str(e)) from e

    # Decide once, before the commit, so both escalation steps agree and the
    # attribute is not re-read from the instance after the transaction ends.
    is_escalation = handoff.intent == "escalate"

    # For escalate: generate documentation + push to PSA before commit so
    # the handoff and the PSA-state changes land atomically.
    if is_escalation:
        await manager.finalize_escalation(handoff, session, current_user.id)

    await db.commit()

    # Best-effort notification dispatch AFTER commit so we never email about
    # a rolled-back handoff. Failures are swallowed inside the manager —
    # handoff creation is authoritative; notifications are advisory.
    if is_escalation:
        await manager.dispatch_escalation_notifications(handoff)

    return HandoffResponse.model_validate(handoff)
@router.get("/handoffs", response_model=list[HandoffResponse])
async def list_handoffs(
    session_id: UUID,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> list[HandoffResponse]:
    """Return every handoff recorded for a session, newest first.

    Args:
        session_id: Path parameter identifying the AI session.
        current_user: Authenticated active user (used for authentication only).
        db: Request-scoped async database session.

    Returns:
        The session's handoffs ordered by ``created_at`` descending; an empty
        list when there are none.
    """
    # NOTE(review): the query filters on session_id only — there is no
    # ownership or account check here, unlike the create endpoint. Confirm
    # whether any authenticated user is meant to read any session's history.
    history_query = (
        select(SessionHandoff)
        .where(SessionHandoff.session_id == session_id)
        .order_by(SessionHandoff.created_at.desc())
    )
    rows = (await db.execute(history_query)).scalars().all()
    return list(map(HandoffResponse.model_validate, rows))
@router.post("/handoffs/{handoff_id}/claim", response_model=HandoffResponse)
async def claim_handoff(
    session_id: UUID,
    handoff_id: UUID,
    current_user: Annotated[User, Depends(require_engineer_or_admin)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> HandoffResponse:
    """Claim a handed-off session.

    Role-gated to engineer/admin/owner — viewers cannot claim. The race-condition
    story (two seniors clicking Pick Up simultaneously) depends on auth gating
    for audit integrity. Codex review flagged this as wedge-relevant; locked
    in-scope for Escalation Mode v1.

    Args:
        session_id: Path parameter identifying the AI session.
        handoff_id: Path parameter identifying the handoff to claim.
        current_user: Caller, as resolved by ``require_engineer_or_admin``.
        db: Request-scoped async database session.

    Returns:
        The claimed handoff, serialized as ``HandoffResponse``.

    Raises:
        HTTPException: 404 when the handoff does not belong to ``session_id``
            or when ``HandoffManager.claim_session`` raises ``ValueError``.
    """
    # The URL names both a session and a handoff. Verify they belong together
    # before mutating anything, so a handoff cannot be claimed through another
    # session's path.
    match = await db.execute(
        select(SessionHandoff.id).where(
            SessionHandoff.id == handoff_id,
            SessionHandoff.session_id == session_id,
        )
    )
    if match.scalar_one_or_none() is None:
        raise HTTPException(status_code=404, detail="Handoff not found")

    manager = HandoffManager(db)
    try:
        handoff = await manager.claim_session(
            handoff_id=handoff_id,
            claiming_user_id=current_user.id,
        )
    except ValueError as e:
        # NOTE(review): every ValueError becomes 404. If claim_session also
        # raises it for an already-claimed handoff, 409 may fit better — confirm.
        raise HTTPException(status_code=404, detail=str(e)) from e

    await db.commit()
    return HandoffResponse.model_validate(handoff)
@queue_router.get("/queue")
async def get_queue(
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> list[dict]:
    """Return the team queue of parked + escalated sessions.

    Delegates to ``HandoffManager.get_queue``, scoped by the caller's
    ``team_id`` and ``account_id``.

    Args:
        current_user: Authenticated active user whose team/account scope the queue.
        db: Request-scoped async database session.

    Returns:
        Whatever ``HandoffManager.get_queue`` produces for that scope.
    """
    scope = {
        "team_id": current_user.team_id,
        "account_id": current_user.account_id,
    }
    return await HandoffManager(db).get_queue(**scope)
# ─── Live escalation arrivals (SSE) ──────────────────────────────────────────
#
# Streams `handoff_created` events to subscribers in the same account_id as
# the new handoff. Connected EscalationQueue instances prepend the new card
# with the locked 200ms slide-in. Account-scoped: cross-tenant leakage is
# prevented at the bus.publish boundary (only handoff.account_id subscribers
# are notified) and re-enforced here by binding the subscription to
# current_user.account_id.
#
# Heartbeat: a `: keepalive\n\n` SSE comment every 25s keeps the connection
# alive through Railway / nginx default 60s idle timeouts. Reconnect policy
# is on the client (browser EventSource auto-reconnects; our fetch-based
# reader retries with backoff).
# Seconds between `: keepalive` SSE comments on an otherwise idle stream.
_HEARTBEAT_INTERVAL_S = 25
# The stream loop emits a keepalive each time its queue read times out, so the
# read timeout *is* the heartbeat period. Derive it from the interval rather
# than repeating the literal, so the two values cannot drift apart.
_QUEUE_GET_TIMEOUT_S = _HEARTBEAT_INTERVAL_S
@queue_router.get("/escalations/stream")
async def stream_escalations(
    request: Request,
    current_user: Annotated[
        User,
        Depends(require_engineer_or_admin, scope="function"),
    ],
):
    """Stream escalation-bus events for the caller's account as Server-Sent Events.

    Access goes through ``require_engineer_or_admin``. The response starts
    with a ``ready`` event carrying the account id, then relays each event
    taken from the bus subscription; whenever a queue read times out, a
    ``: keepalive`` comment is sent instead.

    Args:
        request: Incoming request, polled to detect client disconnects.
        current_user: Caller; their ``account_id`` selects the bus subscription.

    Returns:
        A ``text/event-stream`` ``StreamingResponse``.

    Raises:
        HTTPException: 403 when the caller has no ``account_id``.
    """
    # NOTE(review): scope="function" presumably lets the auth dependency finish
    # before this long-lived response starts streaming — confirm against the
    # FastAPI version in use.
    account_id = current_user.account_id
    if not account_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="No account"
        )

    async def event_generator() -> AsyncGenerator[str, None]:
        queue = await escalation_bus.subscribe(account_id)
        try:
            # Opening frame tells the client the stream is live.
            yield (
                "event: ready\n"
                f"data: {json.dumps({'account_id': str(account_id)})}\n\n"
            )
            # Disconnects are checked once per iteration, before each read.
            while not await request.is_disconnected():
                try:
                    event = await asyncio.wait_for(
                        queue.get(), timeout=_QUEUE_GET_TIMEOUT_S
                    )
                except asyncio.TimeoutError:
                    # Nothing arrived in time: send an SSE comment so proxies
                    # keep the idle connection open.
                    yield ": keepalive\n\n"
                else:
                    # Events without a "type" key go out as "message".
                    event_type = event.get("type", "message")
                    yield (
                        f"event: {event_type}\n"
                        f"data: {json.dumps(event)}\n\n"
                    )
        finally:
            # Always release the subscription, however the generator exits.
            await escalation_bus.unsubscribe(account_id, queue)

    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers=sse_headers,
    )