Files
resolutionflow/backend/app/api/endpoints/session_handoffs.py
Michael Chihlas e8ba74ed6d
All checks were successful
Mirror to GitHub / mirror (push) Successful in 6m5s
CI / frontend (pull_request) Successful in 11m59s
CI / e2e (pull_request) Successful in 10m7s
CI / backend (pull_request) Successful in 16m22s
feat(escalations): distinguishable notifications, async AI, richer sidebar
Three improvements driven by live wedge testing.

1) Notification title now includes a problem snippet and PSA ticket
   suffix when present:
     "Escalation from Jane · #12345: Outlook is failing to sync email…"
   Replaces the prior "Session escalated by Jane" copy that made every
   escalation from the same junior look identical in the bell panel.
   Snippet is trimmed to 70 chars with ellipsis. handoff_manager now
   passes psa_ticket_id through in the notify() payload so this works
   for both /escalate and /handoff entry points.

2) AI enrichment (assessment + enhanced escalation_package) moved to
   a FastAPI BackgroundTask. The escalating engineer no longer waits
   on 15-25s of Sonnet latency — handoff creation returns as soon as
   snapshot, status flip, dual-write, documentation, PSA push, and
   notify() are committed. enrich_escalation_async opens its own DB
   session, runs both AI calls, updates handoff.ai_assessment +
   session.escalation_package, commits, and publishes a new
   `handoff_assessment_ready` event on the escalation bus. Frontend
   doesn't yet listen for that event — the magic-moment screen still
   shows a placeholder ("AI assessment is still generating. Reopen
   this view in a few seconds…") which is honest about the state.
   Live polling / auto-refresh on the bus event is the natural next
   step.

3) ChatSidebar entries now surface the problem summary as a secondary
   line and tag PSA-linked sessions with a monospace #ticket badge plus
   an "Escalated" pill on in-transit sessions. ChatListItem grew
   problem_summary, psa_ticket_id, and status fields; loadChats
   populates them from listSessions. The user couldn't tell their own
   sessions apart in the sidebar because they all rendered as "New
   Chat" with no distinguishing detail — this fixes that for any
   session, escalated or not.

Test plan
- Backend full suite: 1103 passed in 255.85s with -n auto.
- Frontend tsc -b clean.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-28 00:34:32 -04:00

230 lines
8.3 KiB
Python

"""Handoff endpoints — unified park/escalate.
POST /ai-sessions/{id}/handoff — Create handoff
GET /ai-sessions/{id}/handoffs — Handoff history
POST /ai-sessions/{id}/handoffs/{hid}/claim — Claim session
GET /ai-sessions/queue — Team queue
GET /ai-sessions/escalations/stream — SSE: live escalation arrivals
"""
import asyncio
import json
import logging
from typing import Annotated, AsyncGenerator
from uuid import UUID
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, status
from fastapi.responses import StreamingResponse
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_current_active_user, get_db, require_engineer_or_admin
from app.core.escalation_bus import bus as escalation_bus
from app.models.user import User
from app.models.ai_session import AISession
from app.models.session_handoff import SessionHandoff
from app.services.handoff_manager import HandoffManager
from app.schemas.session_handoff import (
HandoffCreateRequest,
HandoffResponse,
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Account-wide endpoints (team queue, escalation SSE stream) are registered on
# their own router because their paths carry no {session_id} segment.
queue_router = APIRouter(prefix="/ai-sessions", tags=["session-handoffs"])
# Session-scoped endpoints: every route registered here is mounted under
# /ai-sessions/{session_id}.
router = APIRouter(prefix="/ai-sessions/{session_id}", tags=["session-handoffs"])
@router.post(
    "/handoff",
    response_model=HandoffResponse,
    status_code=status.HTTP_201_CREATED,
)
async def create_handoff(
    session_id: UUID,
    body: HandoffCreateRequest,
    background_tasks: BackgroundTasks,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> HandoffResponse:
    """Park or escalate one of the caller's own AI sessions.

    The session is looked up by id AND owner, so a session belonging to
    another user is reported as missing rather than forbidden.

    Flow:
      1. ``HandoffManager.create_handoff`` builds the handoff.
      2. For ``escalate`` only, ``finalize_escalation`` runs before the commit.
      3. The transaction is committed.
      4. For ``escalate`` only, notifications are dispatched and AI enrichment
         is queued as a background task.

    Raises:
        HTTPException: 404 if the caller owns no session with this id;
            400 if the manager rejects the request with ``ValueError``.
    """
    owned_session_query = select(AISession).where(
        AISession.id == session_id,
        AISession.user_id == current_user.id,
    )
    session = (await db.execute(owned_session_query)).scalar_one_or_none()
    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    manager = HandoffManager(db)
    try:
        handoff = await manager.create_handoff(
            session_id=session_id,
            intent=body.intent,
            engineer_notes=body.engineer_notes,
            user_id=current_user.id,
            priority=body.priority,
            target_user_id=body.target_user_id,
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))

    escalating = handoff.intent == "escalate"

    # Escalations generate documentation and push to the PSA *before* the
    # commit, so the handoff and the PSA-state changes land atomically.
    if escalating:
        await manager.finalize_escalation(handoff, session, current_user.id)

    await db.commit()

    if escalating:
        # Imported here rather than at module level, as in the original.
        from app.services.handoff_manager import enrich_escalation_async

        # Notifications go out only AFTER the commit so nobody is told about a
        # handoff that was rolled back. They are advisory: the manager
        # swallows failures, and the committed handoff stays authoritative.
        await manager.dispatch_escalation_notifications(handoff)

        # AI enrichment (Sonnet assessment + enhanced escalation_package) is
        # deferred until after the response is sent, so the escalating
        # engineer doesn't wait on 15-25s of model latency.
        background_tasks.add_task(
            enrich_escalation_async, handoff.id, current_user.id
        )

    return HandoffResponse.model_validate(handoff)
@router.get("/handoffs", response_model=list[HandoffResponse])
async def list_handoffs(
    session_id: UUID,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> list[HandoffResponse]:
    """Return every handoff recorded for a session, newest first.

    An unknown ``session_id`` yields an empty list, not a 404.
    """
    # NOTE(review): ``current_user`` only proves the caller is authenticated.
    # Unlike create_handoff, this query is not restricted to the caller's own
    # sessions (or account), so any active user who knows a session id can
    # read its handoff history. Confirm whether owner/account scoping is
    # intended here.
    history_query = (
        select(SessionHandoff)
        .where(SessionHandoff.session_id == session_id)
        .order_by(SessionHandoff.created_at.desc())
    )
    rows = (await db.execute(history_query)).scalars().all()
    return [HandoffResponse.model_validate(row) for row in rows]
@router.post("/handoffs/{handoff_id}/claim", response_model=HandoffResponse)
async def claim_handoff(
    session_id: UUID,
    handoff_id: UUID,
    current_user: Annotated[User, Depends(require_engineer_or_admin)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> HandoffResponse:
    """Claim a handed-off session.

    Role-gated to engineer/admin/owner — viewers cannot claim. The
    race-condition story (two seniors clicking Pick Up simultaneously) depends
    on auth gating for audit integrity. Codex review flagged this as
    wedge-relevant; locked in-scope for Escalation Mode v1.

    Raises:
        HTTPException: 404 if ``handoff_id`` does not belong to ``session_id``,
            or if the manager rejects the claim with ``ValueError``.
    """
    # The route names both a session and a handoff; previously ``session_id``
    # was ignored, so a handoff could be claimed through any session's URL.
    # Only the id column is selected, so no SessionHandoff entity is loaded
    # into the session's identity map ahead of the manager's own lookup.
    membership = await db.execute(
        select(SessionHandoff.id).where(
            SessionHandoff.id == handoff_id,
            SessionHandoff.session_id == session_id,
        )
    )
    if membership.scalar_one_or_none() is None:
        raise HTTPException(status_code=404, detail="Handoff not found")

    manager = HandoffManager(db)
    try:
        handoff = await manager.claim_session(
            handoff_id=handoff_id,
            claiming_user_id=current_user.id,
        )
    except ValueError as exc:
        # Chain the cause so the manager's error stays visible in tracebacks.
        raise HTTPException(status_code=404, detail=str(exc)) from exc

    await db.commit()
    return HandoffResponse.model_validate(handoff)
@queue_router.get("/queue")
async def get_queue(
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> list[dict]:
    """Return the team queue of parked + escalated sessions.

    The queue is built by ``HandoffManager.get_queue`` from the caller's
    ``team_id`` and ``account_id``; its result is returned unchanged.
    """
    # NOTE(review): this endpoint only requires an active user, whereas
    # claim_handoff and stream_escalations require engineer/admin. Confirm
    # that viewers are meant to be able to read the queue.
    queue_entries = await HandoffManager(db).get_queue(
        team_id=current_user.team_id,
        account_id=current_user.account_id,
    )
    return queue_entries
# ─── Live escalation arrivals (SSE) ──────────────────────────────────────────
#
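# Streams escalation-bus events — `handoff_created`, plus any other event type
# the bus delivers, each forwarded under its own `type` — to subscribers in the
# same account_id as the handoff. On `handoff_created`, connected
# EscalationQueue instances prepend the new card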
# with the locked 200ms slide-in. Account-scoped: cross-tenant leakage is
# prevented at the bus.publish boundary (only handoff.account_id subscribers
# are notified) and re-enforced here by binding the subscription to
# current_user.account_id.
#
# Heartbeat: a `: keepalive\n\n` SSE comment every 25s keeps the connection
# alive through Railway / nginx default 60s idle timeouts. Reconnect policy
# is on the client (browser EventSource auto-reconnects; our fetch-based
# reader retries with backoff).
# Seconds between `: keepalive` SSE comments on an otherwise idle stream.
_HEARTBEAT_INTERVAL_S = 25
# The stream loop emits a heartbeat each time its queue wait times out, so this
# timeout *is* the heartbeat cadence. It is derived from the interval instead
# of repeating the literal, so the two values cannot drift apart. (The old
# comment claimed "< heartbeat" while both values were 25.)
_QUEUE_GET_TIMEOUT_S = _HEARTBEAT_INTERVAL_S
@queue_router.get("/escalations/stream")
async def stream_escalations(
    request: Request,
    current_user: Annotated[
        User,
        Depends(require_engineer_or_admin, scope="function"),
    ],
):
    """Server-sent-event stream of escalation-bus events for the caller's account.

    Gated by ``require_engineer_or_admin`` (the same dependency claim_handoff
    uses), so viewers can't subscribe. One open connection per browser tab is
    expected; the bus handles fan-out.

    Frames emitted, in order:
      * one ``ready`` event carrying the account id, so the client knows the
        stream is live;
      * every bus event, under its own ``type`` (``message`` when absent);
      * a ``: keepalive`` comment whenever the queue wait times out.

    Raises:
        HTTPException: 403 when the caller has no ``account_id``.
    """
    # NOTE(review): scope="function" presumably lets any yield-based cleanup
    # in the dependency finish before streaming starts, instead of being held
    # for the life of the connection — confirm against the FastAPI version in
    # use.
    account_id = current_user.account_id
    if not account_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="No account"
        )

    def sse_frame(event_name, payload) -> str:
        # One SSE frame: event name line, JSON data line, blank terminator.
        return f"event: {event_name}\ndata: {json.dumps(payload)}\n\n"

    async def event_generator() -> AsyncGenerator[str, None]:
        subscription = await escalation_bus.subscribe(account_id)
        try:
            # Initial hello so the client knows the stream is live.
            yield sse_frame("ready", {"account_id": str(account_id)})
            while not await request.is_disconnected():
                try:
                    event = await asyncio.wait_for(
                        subscription.get(), timeout=_QUEUE_GET_TIMEOUT_S
                    )
                except asyncio.TimeoutError:
                    # Nothing arrived in time: send a heartbeat comment to
                    # keep the connection alive through proxies.
                    yield ": keepalive\n\n"
                else:
                    yield sse_frame(event.get("type", "message"), event)
        finally:
            # Always release the bus subscription, however the stream ends.
            await escalation_bus.unsubscribe(account_id, subscription)

    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers=sse_headers,
    )