WIP: SSE pub/sub for live escalation arrivals (paused for Codex review)
First half of the WebSocket/SSE push slice. Paused mid-flight to hand
the branch to Codex for outside-voice review before stacking more
commits on top. See .ai/HANDOFF.md for the full pause context + what
to look at.
What's here:
- backend/app/core/escalation_bus.py — module-level singleton in-memory
pub/sub keyed by account_id. asyncio.Queue per subscriber with
64-event maxsize and drop-on-full semantics. Designed to be swappable
for Redis pub/sub when Railway scales past single-replica.
- backend/app/api/endpoints/session_handoffs.py — GET
/api/v1/ai-sessions/escalations/stream SSE endpoint. Auth via
require_engineer_or_admin. 25s heartbeat. Account-scoped subscribe
bound to current_user.account_id.
- backend/app/services/handoff_manager.py — dispatch_escalation_notifications
now publishes a `handoff_created` event to the bus BEFORE the email
fan-out, in a try/except so a bus failure can't block email delivery.
- backend/tests/test_escalation_bus.py — 7 unit tests, all green
standalone (0.14s). Cross-tenant isolation, drop-on-full, no-subscribers.
- backend/tests/test_handoff_manager.py — +1 dispatcher integration test
(publishes to bus, payload shape).
- backend/tests/test_session_handoffs_api.py — +2 endpoint tests (viewer
blocked, ready event handshake).
[gstack-context]
Decisions:
- SSE over WebSocket (one-way, browser EventSource semantics, fewer
moving parts behind Railway proxy)
- In-memory bus over Redis for v1 pilot (3 MSPs, single replica)
- Drop-on-full subscriber queue rather than back-pressure publishers
- Bus publish ahead of email send, both wrapped in try/except so
neither can break handoff creation
- Frontend will be a fetch-based ReadableStream reader matching the
existing streamDocumentation pattern, not native EventSource
(custom-header auth)
Remaining (post-Codex):
- Frontend SSE subscription in EscalationQueue.tsx (slide-in,
reconnect, tab-title flash, prefers-reduced-motion)
- Magic-moment handoff-context screen
- Re-run the full backend test suite to verify the SSE +
dispatcher integration tests (bus units already green standalone)
Tried:
- Running the full test suite repeatedly without xdist; the per-test
DROP SCHEMA + recreate fixture made wall-clock prohibitive when
multiple stale runs collided on the same Postgres test schema.
Resolution: -n auto next time.
[/gstack-context]
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -1,19 +1,24 @@
|
||||
"""Handoff endpoints — unified park/escalate.
|
||||
|
||||
POST /ai-sessions/{id}/handoff — Create handoff
|
||||
POST /ai-sessions/{id}/handoff — Create handoff
|
||||
GET /ai-sessions/{id}/handoffs — Handoff history
|
||||
POST /ai-sessions/{id}/handoffs/{hid}/claim — Claim session
|
||||
GET /ai-sessions/queue — Team queue
|
||||
GET /ai-sessions/queue — Team queue
|
||||
GET /ai-sessions/escalations/stream — SSE: live escalation arrivals
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from typing import Annotated
|
||||
from typing import Annotated, AsyncGenerator
|
||||
from uuid import UUID
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from fastapi import APIRouter, Depends, HTTPException, Request, status
|
||||
from fastapi.responses import StreamingResponse
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.api.deps import get_current_active_user, get_db, require_engineer_or_admin
|
||||
from app.core.escalation_bus import bus as escalation_bus
|
||||
from app.models.user import User
|
||||
from app.models.ai_session import AISession
|
||||
from app.models.session_handoff import SessionHandoff
|
||||
@@ -127,3 +132,80 @@ async def get_queue(
|
||||
team_id=current_user.team_id,
|
||||
account_id=current_user.account_id,
|
||||
)
|
||||
|
||||
|
||||
# ─── Live escalation arrivals (SSE) ──────────────────────────────────────────
|
||||
#
|
||||
# Streams `handoff_created` events to subscribers in the same account_id as
|
||||
# the new handoff. Connected EscalationQueue instances prepend the new card
|
||||
# with the locked 200ms slide-in. Account-scoped: cross-tenant leakage is
|
||||
# prevented at the bus.publish boundary (only handoff.account_id subscribers
|
||||
# are notified) and re-enforced here by binding the subscription to
|
||||
# current_user.account_id.
|
||||
#
|
||||
# Heartbeat: a `: keepalive\n\n` SSE comment every 25s keeps the connection
|
||||
# alive through Railway / nginx default 60s idle timeouts. Reconnect policy
|
||||
# is on the client (browser EventSource auto-reconnects; our fetch-based
|
||||
# reader retries with backoff).
|
||||
|
||||
|
||||
_HEARTBEAT_INTERVAL_S = 25
|
||||
_QUEUE_GET_TIMEOUT_S = 25 # < heartbeat so heartbeat fires reliably
|
||||
|
||||
|
||||
@queue_router.get("/escalations/stream")
|
||||
async def stream_escalations(
|
||||
request: Request,
|
||||
current_user: Annotated[User, Depends(require_engineer_or_admin)],
|
||||
):
|
||||
"""SSE stream of new escalation arrivals for the current user's account.
|
||||
|
||||
Role-gated to engineer/admin/owner so viewers can't subscribe (matches
|
||||
the queue + claim role surface). One open connection per browser tab is
|
||||
expected; the bus handles fan-out.
|
||||
"""
|
||||
if not current_user.account_id:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN, detail="No account"
|
||||
)
|
||||
|
||||
account_id = current_user.account_id
|
||||
|
||||
async def event_generator() -> AsyncGenerator[str, None]:
|
||||
queue = await escalation_bus.subscribe(account_id)
|
||||
try:
|
||||
# Initial hello so the client knows the stream is live.
|
||||
yield (
|
||||
"event: ready\n"
|
||||
f"data: {json.dumps({'account_id': str(account_id)})}\n\n"
|
||||
)
|
||||
|
||||
while True:
|
||||
if await request.is_disconnected():
|
||||
break
|
||||
try:
|
||||
event = await asyncio.wait_for(
|
||||
queue.get(), timeout=_QUEUE_GET_TIMEOUT_S
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
# Heartbeat keeps the connection alive through proxies.
|
||||
yield ": keepalive\n\n"
|
||||
continue
|
||||
|
||||
event_type = event.get("type", "message")
|
||||
yield (
|
||||
f"event: {event_type}\n"
|
||||
f"data: {json.dumps(event)}\n\n"
|
||||
)
|
||||
finally:
|
||||
await escalation_bus.unsubscribe(account_id, queue)
|
||||
|
||||
return StreamingResponse(
|
||||
event_generator(),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive",
|
||||
"X-Accel-Buffering": "no",
|
||||
},
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user