"""Handoff endpoints — unified park/escalate. POST /ai-sessions/{id}/handoff — Create handoff GET /ai-sessions/{id}/handoffs — Handoff history POST /ai-sessions/{id}/handoffs/{hid}/claim — Claim session GET /ai-sessions/queue — Team queue GET /ai-sessions/escalations/stream — SSE: live escalation arrivals """ import asyncio import json import logging from typing import Annotated, AsyncGenerator from uuid import UUID from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, status from fastapi.responses import StreamingResponse from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.api.deps import get_current_active_user, get_db, require_engineer_or_admin from app.core.escalation_bus import bus as escalation_bus from app.models.user import User from app.models.ai_session import AISession from app.models.session_handoff import SessionHandoff from app.services.handoff_manager import HandoffAlreadyClaimedError, HandoffManager from app.schemas.session_handoff import ( HandoffCreateRequest, HandoffResponse, ) logger = logging.getLogger(__name__) # Queue endpoint needs its own router (no session_id prefix) queue_router = APIRouter(prefix="/ai-sessions", tags=["session-handoffs"]) # Session-scoped endpoints router = APIRouter(prefix="/ai-sessions/{session_id}", tags=["session-handoffs"]) @router.post("/handoff", response_model=HandoffResponse, status_code=status.HTTP_201_CREATED) async def create_handoff( session_id: UUID, body: HandoffCreateRequest, background_tasks: BackgroundTasks, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], ) -> HandoffResponse: """Create a handoff (park or escalate).""" result = await db.execute( select(AISession).where( AISession.id == session_id, AISession.user_id == current_user.id, ) ) session = result.scalar_one_or_none() if not session: raise HTTPException(status_code=404, detail="Session not found") manager = 
HandoffManager(db) try: handoff = await manager.create_handoff( session_id=session_id, intent=body.intent, engineer_notes=body.engineer_notes, user_id=current_user.id, priority=body.priority, target_user_id=body.target_user_id, ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) # For escalate: generate documentation + push to PSA before commit so # the handoff and the PSA-state changes land atomically. if handoff.intent == "escalate": await manager.finalize_escalation(handoff, session, current_user.id) await db.commit() # Best-effort notification dispatch AFTER commit so we never email about # a rolled-back handoff. Failures are swallowed inside the manager — # handoff creation is authoritative; notifications are advisory. if handoff.intent == "escalate": from app.services.handoff_manager import enrich_escalation_async await manager.dispatch_escalation_notifications(handoff) # AI enrichment (Sonnet assessment + enhanced escalation_package) # runs in the background after the response is sent so the # escalating engineer doesn't wait on 15-25s of model latency. 
background_tasks.add_task( enrich_escalation_async, handoff.id, current_user.id ) return HandoffResponse.model_validate(handoff).model_copy( update={"handed_off_by_name": current_user.name} ) @router.get("/handoffs", response_model=list[HandoffResponse]) async def list_handoffs( session_id: UUID, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], ) -> list[HandoffResponse]: """Get handoff history for a session.""" result = await db.execute( select(SessionHandoff) .where(SessionHandoff.session_id == session_id) .order_by(SessionHandoff.created_at.desc()) ) handoffs = result.scalars().all() return [HandoffResponse.model_validate(h) for h in handoffs] @router.post("/handoffs/{handoff_id}/claim", response_model=HandoffResponse) async def claim_handoff( session_id: UUID, handoff_id: UUID, current_user: Annotated[User, Depends(require_engineer_or_admin)], db: Annotated[AsyncSession, Depends(get_db)], ) -> HandoffResponse: """Claim a handed-off session. Role-gated to engineer/admin/owner — viewers cannot claim. The race-condition story (two seniors clicking Pick Up simultaneously) depends on auth gating for audit integrity. Codex review flagged this as wedge-relevant; locked in-scope for Escalation Mode v1. """ manager = HandoffManager(db) try: handoff = await manager.claim_session( handoff_id=handoff_id, claiming_user_id=current_user.id, ) except HandoffAlreadyClaimedError as e: # Loser of the race — the API surfaces structured detail so the # client can render "Already claimed by {name} {time_ago}" without # a follow-up fetch. 
raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail={ "error": "already_claimed", "claimed_by_id": str(e.claimed_by_id), "claimed_by_name": e.claimed_by_name, "claimed_at": e.claimed_at.isoformat(), }, ) except PermissionError as e: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) await db.commit() handed_off_by_name = ( handoff.handed_off_by_user.name if handoff.handed_off_by_user else None ) return HandoffResponse.model_validate(handoff).model_copy( update={"handed_off_by_name": handed_off_by_name} ) @queue_router.get("/queue") async def get_queue( current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], ) -> list[dict]: """Get team queue of parked + escalated sessions.""" manager = HandoffManager(db) return await manager.get_queue( team_id=current_user.team_id, account_id=current_user.account_id, ) # ─── Live escalation arrivals (SSE) ────────────────────────────────────────── # # Streams `handoff_created` events to subscribers in the same account_id as # the new handoff. Connected EscalationQueue instances prepend the new card # with the locked 200ms slide-in. Account-scoped: cross-tenant leakage is # prevented at the bus.publish boundary (only handoff.account_id subscribers # are notified) and re-enforced here by binding the subscription to # current_user.account_id. # # Heartbeat: a `: keepalive\n\n` SSE comment every 25s keeps the connection # alive through Railway / nginx default 60s idle timeouts. Reconnect policy # is on the client (browser EventSource auto-reconnects; our fetch-based # reader retries with backoff). 
# Keepalive cadence in seconds. Not referenced by the code visible in this
# part of the file; the wait timeout below is what actually paces keepalives.
_HEARTBEAT_INTERVAL_S = 25
# Longest wait (seconds) for a bus event before a keepalive comment is sent.
# Same value as the heartbeat interval, so an idle stream emits one keepalive
# per interval.
_QUEUE_GET_TIMEOUT_S = 25


@queue_router.get("/escalations/stream")
async def stream_escalations(
    request: Request,
    current_user: Annotated[
        User,
        Depends(require_engineer_or_admin, scope="function"),
    ],
):
    """SSE stream of new escalation arrivals for the current user's account.

    Role-gated to engineer/admin/owner so viewers can't subscribe (matches
    the queue + claim role surface). One open connection per browser tab is
    expected; the bus handles fan-out.

    Args:
        request: Incoming request, polled for client disconnects.
        current_user: Caller, resolved through ``require_engineer_or_admin``.

    Returns:
        A ``text/event-stream`` response that sends a ``ready`` event, then
        one event per bus message, with ``: keepalive`` comments while idle.

    Raises:
        HTTPException: 403 when the caller has no ``account_id``.
    """
    # Read once and capture in a local: the generator below outlives this
    # function call and must not depend on ``current_user`` afterwards.
    account_id = current_user.account_id
    if not account_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="No account"
        )

    async def event_generator() -> AsyncGenerator[str, None]:
        """Yield SSE frames for this account until the client goes away."""
        queue = await escalation_bus.subscribe(account_id)
        try:
            # Initial hello so the client knows the stream is live.
            hello = json.dumps({"account_id": str(account_id)})
            yield f"event: ready\ndata: {hello}\n\n"

            while not await request.is_disconnected():
                try:
                    event = await asyncio.wait_for(
                        queue.get(), timeout=_QUEUE_GET_TIMEOUT_S
                    )
                except asyncio.TimeoutError:
                    # Nothing arrived in time: send an SSE comment so
                    # intermediaries don't treat the connection as idle.
                    yield ": keepalive\n\n"
                    continue

                event_type = event.get("type", "message")
                payload = json.dumps(event)
                yield f"event: {event_type}\ndata: {payload}\n\n"
        finally:
            # Always release the subscription, including on cancellation.
            await escalation_bus.unsubscribe(account_id, queue)

    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers=sse_headers,
    )