feat(escalations): subscribe EscalationQueue to live SSE arrivals

Adds the frontend live-arrival slice on top of the test-stabilized SSE
backend. Senior techs now see a junior's escalation slide into the
queue without refresh.

- streamEscalations(handlers, signal) in aiSessions.ts: fetch-based
  ReadableStream parser (native EventSource cannot send auth headers).
  Handles SSE frames, partial frames across chunks, : keepalive
  heartbeats. Dispatches ready and handoff_created.
- HandoffCreatedEvent + EscalationStreamHandlers types mirror the bus
  payload published by HandoffManager.dispatch_escalation_notifications.
- EscalationQueue.tsx: AbortController-managed subscription with
  exponential-backoff reconnect (1s → 30s cap, attempt counter resets
  on ready). On handoff_created, refetch and diff against previous IDs
  via sessionsRef; new arrivals prepended (newest-first) above
  established cards (oldest-first preserved). Slide-in tag held for
  800ms so the locked 200ms animation completes. Tab-title flash
  prefixes (N) while document.hidden, restores on focus / unmount.
  prefers-reduced-motion swaps slide-in for fade-in. ARIA region +
  aria-live=polite + aria-label on heading. Pick Up bumped to py-2.5
  to clear the 44px touch floor.

Verified end-to-end against the running dev stack: subscriber received
the ready frame on connect; after posting a handoff via the API, the
subscriber received the handoff_created frame with the expected
payload — wire format matches the parser. Backend regression: focused
subset still 32 passed in 18.91s. Frontend tsc -b clean.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-04-27 20:57:15 -04:00
parent 02d5c6c08c
commit b8627f4180
3 changed files with 303 additions and 56 deletions

View File

@@ -18,6 +18,8 @@ import type {
ChatSessionCreateResponse,
ChatMessageRequest,
ChatMessageResponse,
HandoffCreatedEvent,
EscalationStreamHandlers,
} from '@/types/ai-session'
export const aiSessionsApi = {
@@ -220,6 +222,73 @@ export const aiSessionsApi = {
return response.data
},
// Native EventSource cannot send Authorization headers, so we use fetch +
// ReadableStream and parse SSE frames manually (same pattern as
// `streamDocumentation`). The returned promise resolves on clean stream
// close (server hangs up) and rejects on network/HTTP error so the caller
// can decide whether to reconnect with backoff.
async streamEscalations(
handlers: EscalationStreamHandlers,
signal: AbortSignal,
): Promise<void> {
const token = localStorage.getItem('access_token')
const baseUrl = import.meta.env.VITE_API_URL || ''
const response = await fetch(
`${baseUrl}/api/v1/ai-sessions/escalations/stream`,
{
headers: { Authorization: `Bearer ${token}` },
signal,
},
)
if (!response.ok) {
throw new Error(`Escalation stream failed: HTTP ${response.status}`)
}
const reader = response.body?.getReader()
if (!reader) {
throw new Error('Escalation stream: no response body')
}
const decoder = new TextDecoder()
let buffer = ''
while (true) {
const { done, value } = await reader.read()
if (done) return
buffer += decoder.decode(value, { stream: true })
// SSE frames are separated by blank lines. Hold the trailing partial
// frame in the buffer until the next chunk completes it.
const frames = buffer.split('\n\n')
buffer = frames.pop() ?? ''
for (const frame of frames) {
if (!frame) continue
let eventType = 'message'
let data = ''
for (const line of frame.split('\n')) {
if (line.startsWith(':')) continue // comment / keepalive
if (line.startsWith('event: ')) eventType = line.slice(7).trim()
else if (line.startsWith('data: ')) data += line.slice(6)
}
if (!data) continue
try {
const parsed = JSON.parse(data) as Record<string, unknown>
if (eventType === 'handoff_created' && parsed.type === 'handoff_created') {
handlers.onHandoffCreated?.(parsed as unknown as HandoffCreatedEvent)
} else if (eventType === 'ready') {
handlers.onReady?.()
}
} catch {
// skip malformed frame
}
}
}
},
async search(q: string, limit: number = 5): Promise<AISessionSearchResult[]> {
const response = await apiClient.get<AISessionSearchResult[]>('/ai-sessions/search', {
params: { q, limit },

View File

@@ -1,15 +1,31 @@
import { useState, useEffect } from 'react'
import { useCallback, useEffect, useMemo, useRef, useState } from 'react'
import { useNavigate } from 'react-router-dom'
import { AlertTriangle, Clock, Hash, Ticket, Loader2, RefreshCw } from 'lucide-react'
import { aiSessionsApi } from '@/api'
import type { AISessionSummary } from '@/types/ai-session'
import { timeAgo } from '@/lib/timeAgo'
import { cn } from '@/lib/utils'
interface EscalationQueueProps {
onPickup?: (sessionId: string) => void
onCountChange?: (count: number) => void
}
// Static list sort: oldest-first. Longest waiting = most urgent.
const sortOldestFirst = (a: AISessionSummary, b: AISessionSummary) =>
new Date(a.created_at).getTime() - new Date(b.created_at).getTime()
// Live-arrival bucket sort: newest-first so the most recent escalation is at
// the very top of the list.
const sortNewestFirst = (a: AISessionSummary, b: AISessionSummary) =>
new Date(b.created_at).getTime() - new Date(a.created_at).getTime()
// How long a freshly-arrived card keeps the slide-in animation class. The
// keyframe itself runs 200ms; this just keeps the class on the DOM long
// enough for the animation to finish before React removes it on the next
// state transition.
const NEW_CARD_HIGHLIGHT_MS = 800
function waitTimeColor(createdAt: string): string {
const hours = (Date.now() - new Date(createdAt).getTime()) / 3_600_000
if (hours >= 4) return '#f87171' // danger
@@ -22,29 +38,156 @@ export function EscalationQueue({ onPickup, onCountChange }: EscalationQueueProp
const [sessions, setSessions] = useState<AISessionSummary[]>([])
const [isLoading, setIsLoading] = useState(true)
const [error, setError] = useState<string | null>(null)
// Session IDs that arrived via SSE and should still play the slide-in.
const [newIds, setNewIds] = useState<Set<string>>(new Set())
// Track count of unseen arrivals while the tab is backgrounded.
const [unseenCount, setUnseenCount] = useState(0)
const loadQueue = async () => {
// Ref mirrors the latest sessions so the SSE handler can diff without
// re-binding on every state change.
const sessionsRef = useRef<AISessionSummary[]>([])
useEffect(() => {
sessionsRef.current = sessions
}, [sessions])
const prefersReducedMotion = useMemo(() => {
if (typeof window === 'undefined' || !window.matchMedia) return false
return window.matchMedia('(prefers-reduced-motion: reduce)').matches
}, [])
// ── Tab title flash ──
// Capture the original title once at mount. While unseen > 0, prefix it.
const originalTitleRef = useRef<string>('')
useEffect(() => {
originalTitleRef.current = document.title
return () => {
// Restore on unmount so a leftover "(N) ..." prefix doesn't bleed
// into the next page.
document.title = originalTitleRef.current
}
}, [])
useEffect(() => {
const base = originalTitleRef.current || document.title
document.title = unseenCount > 0 ? `(${unseenCount}) ${base}` : base
}, [unseenCount])
useEffect(() => {
const clearUnseen = () => {
if (!document.hidden) setUnseenCount(0)
}
const onFocus = () => setUnseenCount(0)
document.addEventListener('visibilitychange', clearUnseen)
window.addEventListener('focus', onFocus)
return () => {
document.removeEventListener('visibilitychange', clearUnseen)
window.removeEventListener('focus', onFocus)
}
}, [])
const loadQueue = useCallback(async () => {
setIsLoading(true)
setError(null)
try {
const data = await aiSessionsApi.getEscalationQueue()
// Sort oldest-first — longest waiting = most urgent
const sorted = [...data].sort(
(a, b) => new Date(a.created_at).getTime() - new Date(b.created_at).getTime()
)
const sorted = [...data].sort(sortOldestFirst)
setSessions(sorted)
setNewIds(new Set())
onCountChange?.(sorted.length)
} catch {
setError('Failed to load escalation queue')
} finally {
setIsLoading(false)
}
}
}, [onCountChange])
useEffect(() => {
loadQueue()
// eslint-disable-next-line react-hooks/exhaustive-deps -- load once on mount
}, [])
}, [loadQueue])
// ── SSE subscription ──
// Refetch the queue on each `handoff_created` event (the event payload is
// intentionally thin — it's a trigger, not the full card data). Diff
// against the previous list to identify newly-arrived sessions; prepend
// them at the top with the slide-in animation, then keep the rest of the
// queue in oldest-first order below.
const handleHandoffCreated = useCallback(async () => {
let fresh: AISessionSummary[]
try {
fresh = await aiSessionsApi.getEscalationQueue()
} catch {
return
}
const prevIds = new Set(sessionsRef.current.map((s) => s.id))
const arrived = fresh.filter((s) => !prevIds.has(s.id)).sort(sortNewestFirst)
const established = fresh.filter((s) => prevIds.has(s.id)).sort(sortOldestFirst)
const next = [...arrived, ...established]
setSessions(next)
onCountChange?.(next.length)
if (arrived.length === 0) return
const arrivedIds = arrived.map((s) => s.id)
setNewIds((prev) => {
const merged = new Set(prev)
arrivedIds.forEach((id) => merged.add(id))
return merged
})
if (document.hidden) {
setUnseenCount((c) => c + arrived.length)
}
window.setTimeout(() => {
setNewIds((prev) => {
const remaining = new Set(prev)
arrivedIds.forEach((id) => remaining.delete(id))
return remaining
})
}, NEW_CARD_HIGHLIGHT_MS)
}, [onCountChange])
useEffect(() => {
const abort = new AbortController()
let reconnectTimer: number | null = null
let attempt = 0
let cancelled = false
const connect = async () => {
if (cancelled) return
try {
await aiSessionsApi.streamEscalations(
{
onReady: () => {
attempt = 0
},
onHandoffCreated: () => {
void handleHandoffCreated()
},
},
abort.signal,
)
// Stream ended cleanly (server hung up). Reconnect quickly.
if (!cancelled) {
reconnectTimer = window.setTimeout(connect, 1000)
}
} catch (err) {
if (cancelled || abort.signal.aborted) return
if (err instanceof DOMException && err.name === 'AbortError') return
// Exponential backoff: 1s, 2s, 4s, 8s, 16s, capped at 30s.
const delay = Math.min(30_000, 1000 * 2 ** attempt)
attempt += 1
reconnectTimer = window.setTimeout(connect, delay)
}
}
void connect()
return () => {
cancelled = true
abort.abort()
if (reconnectTimer !== null) window.clearTimeout(reconnectTimer)
}
}, [handleHandoffCreated])
const handlePickup = (sessionId: string) => {
if (onPickup) {
@@ -95,7 +238,10 @@ export function EscalationQueue({ onPickup, onCountChange }: EscalationQueueProp
return (
<div className="space-y-3">
<div className="flex items-center justify-between px-1">
<h3 className="font-sans text-[0.625rem] uppercase tracking-wider text-muted-foreground">
<h3
className="font-sans text-[0.625rem] uppercase tracking-wider text-muted-foreground"
aria-label={`${sessions.length} escalations awaiting pickup`}
>
Awaiting pickup ({sessions.length})
</h3>
<button
@@ -107,54 +253,66 @@ export function EscalationQueue({ onPickup, onCountChange }: EscalationQueueProp
</button>
</div>
{sessions.map((session) => (
<div key={session.id} className="card-flat p-3 sm:p-4 space-y-3">
<div>
<p className="text-sm font-semibold text-foreground">
{session.problem_summary || 'Untitled session'}
</p>
{session.escalation_reason && (
<p className="mt-1 text-xs text-warning line-clamp-2">
Reason: {session.escalation_reason}
</p>
)}
</div>
<div className="flex flex-wrap items-center gap-x-3 gap-y-1 text-xs text-muted-foreground">
{session.problem_domain && (
<span className="font-sans rounded-md bg-accent-dim px-1.5 py-0.5 text-[0.5625rem] uppercase tracking-wider text-accent-text">
{session.problem_domain}
</span>
)}
<span className="flex items-center gap-1">
<Hash size={10} />
{session.step_count} steps
</span>
<span
className="flex items-center gap-1 font-medium"
style={{ color: waitTimeColor(session.created_at) }}
<div role="region" aria-live="polite" className="space-y-3">
{sessions.map((session) => {
const isNew = newIds.has(session.id)
return (
<div
key={session.id}
className={cn(
'card-flat p-3 sm:p-4 space-y-3',
isNew && !prefersReducedMotion && 'animate-slide-in-bottom',
isNew && prefersReducedMotion && 'animate-fade-in',
)}
>
<Clock size={10} />
{timeAgo(session.created_at)}
</span>
{session.psa_ticket_id && (
<span className="flex items-center gap-1 text-accent-text">
<Ticket size={10} />
#{session.psa_ticket_id}
</span>
)}
</div>
<div>
<p className="text-sm font-semibold text-foreground">
{session.problem_summary || 'Untitled session'}
</p>
{session.escalation_reason && (
<p className="mt-1 text-xs text-warning line-clamp-2">
Reason: {session.escalation_reason}
</p>
)}
</div>
<div className="flex justify-end">
<button
onClick={() => handlePickup(session.id)}
className="rounded-lg bg-primary text-white px-4 py-2 text-sm font-semibold hover:brightness-110 active:scale-[0.98] transition-all"
>
Pick Up
</button>
</div>
</div>
))}
<div className="flex flex-wrap items-center gap-x-3 gap-y-1 text-xs text-muted-foreground">
{session.problem_domain && (
<span className="font-sans rounded-md bg-accent-dim px-1.5 py-0.5 text-[0.5625rem] uppercase tracking-wider text-accent-text">
{session.problem_domain}
</span>
)}
<span className="flex items-center gap-1">
<Hash size={10} />
{session.step_count} steps
</span>
<span
className="flex items-center gap-1 font-medium"
style={{ color: waitTimeColor(session.created_at) }}
>
<Clock size={10} />
{timeAgo(session.created_at)}
</span>
{session.psa_ticket_id && (
<span className="flex items-center gap-1 text-accent-text">
<Ticket size={10} />
#{session.psa_ticket_id}
</span>
)}
</div>
<div className="flex justify-end">
<button
onClick={() => handlePickup(session.id)}
className="rounded-lg bg-primary text-white px-4 py-2.5 text-sm font-semibold hover:brightness-110 active:scale-[0.98] transition-all"
>
Pick Up
</button>
</div>
</div>
)
})}
</div>
</div>
)
}

View File

@@ -258,3 +258,23 @@ export interface SimilarSession {
created_at: string | null
similarity: number
}
// ── Escalation SSE bus ──
//
// Mirrors the `event_generator` payload in
// backend/app/api/endpoints/session_handoffs.py — keep this in sync with the
// dict published by `HandoffManager.dispatch_escalation_notifications`.
export interface HandoffCreatedEvent {
type: 'handoff_created'
handoff_id: string
session_id: string
priority: string
engineer_notes: string
created_at: string | null
}
export interface EscalationStreamHandlers {
onReady?: () => void
onHandoffCreated?: (event: HandoffCreatedEvent) => void
}