From b8627f41803273a00b9771634f704718a4a6a8d7 Mon Sep 17 00:00:00 2001 From: Michael Chihlas Date: Mon, 27 Apr 2026 20:57:15 -0400 Subject: [PATCH] feat(escalations): subscribe EscalationQueue to live SSE arrivals MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the frontend live-arrival slice on top of the test-stabilized SSE backend. Senior techs now see a junior's escalation slide into the queue without refresh. - streamEscalations(handlers, signal) in aiSessions.ts: fetch-based ReadableStream parser (native EventSource cannot send auth headers). Handles SSE frames, partial frames across chunks, : keepalive heartbeats. Dispatches ready and handoff_created. - HandoffCreatedEvent + EscalationStreamHandlers types mirror the bus payload published by HandoffManager.dispatch_escalation_notifications. - EscalationQueue.tsx: AbortController-managed subscription with exponential-backoff reconnect (1s → 30s cap, attempt counter resets on ready). On handoff_created, refetch and diff against previous IDs via sessionsRef; new arrivals prepended (newest-first) above established cards (oldest-first preserved). Slide-in tag held for 800ms so the locked 200ms animation completes. Tab-title flash prefixes (N) while document.hidden, restores on focus / unmount. prefers-reduced-motion swaps slide-in for fade-in. ARIA region + aria-live=polite + aria-label on heading. Pick Up bumped to py-2.5 to clear the 44px touch floor. Verified end-to-end against the running dev stack: subscriber received the ready frame on connect; after posting a handoff via the API, the subscriber received the handoff_created frame with the expected payload — wire format matches the parser. Backend regression: focused subset still 32 passed in 18.91s. Frontend tsc -b clean. Co-Authored-By: Claude Opus 4.7 --- frontend/src/api/aiSessions.ts | 69 +++++ .../components/flowpilot/EscalationQueue.tsx | 270 ++++++++++++++---- frontend/src/types/ai-session.ts | 20 ++ 3 files changed, 303 insertions(+), 56 deletions(-) diff --git a/frontend/src/api/aiSessions.ts b/frontend/src/api/aiSessions.ts index 59d82e24..90531a8d 100644 --- a/frontend/src/api/aiSessions.ts +++ b/frontend/src/api/aiSessions.ts @@ -18,6 +18,8 @@ import type { ChatSessionCreateResponse, ChatMessageRequest, ChatMessageResponse, + HandoffCreatedEvent, + EscalationStreamHandlers, } from '@/types/ai-session' export const aiSessionsApi = { @@ -220,6 +222,73 @@ export const aiSessionsApi = { return response.data }, + // Native EventSource cannot send Authorization headers, so we use fetch + + // ReadableStream and parse SSE frames manually (same pattern as + // `streamDocumentation`). The returned promise resolves on clean stream + // close (server hangs up) and rejects on network/HTTP error so the caller + // can decide whether to reconnect with backoff. + async streamEscalations( + handlers: EscalationStreamHandlers, + signal: AbortSignal, + ): Promise { + const token = localStorage.getItem('access_token') + const baseUrl = import.meta.env.VITE_API_URL || '' + + const response = await fetch( + `${baseUrl}/api/v1/ai-sessions/escalations/stream`, + { + headers: { Authorization: `Bearer ${token}` }, + signal, + }, + ) + + if (!response.ok) { + throw new Error(`Escalation stream failed: HTTP ${response.status}`) + } + + const reader = response.body?.getReader() + if (!reader) { + throw new Error('Escalation stream: no response body') + } + + const decoder = new TextDecoder() + let buffer = '' + + while (true) { + const { done, value } = await reader.read() + if (done) return + + buffer += decoder.decode(value, { stream: true }) + + // SSE frames are separated by blank lines. Hold the trailing partial + // frame in the buffer until the next chunk completes it. + const frames = buffer.split('\n\n') + buffer = frames.pop() ?? '' + + for (const frame of frames) { + if (!frame) continue + let eventType = 'message' + let data = '' + for (const line of frame.split('\n')) { + if (line.startsWith(':')) continue // comment / keepalive + if (line.startsWith('event: ')) eventType = line.slice(7).trim() + else if (line.startsWith('data: ')) data += line.slice(6) + } + if (!data) continue + try { + const parsed = JSON.parse(data) as Record + if (eventType === 'handoff_created' && parsed.type === 'handoff_created') { + handlers.onHandoffCreated?.(parsed as unknown as HandoffCreatedEvent) + } else if (eventType === 'ready') { + handlers.onReady?.() + } + } catch { + // skip malformed frame + } + } + } + }, + async search(q: string, limit: number = 5): Promise { const response = await apiClient.get('/ai-sessions/search', { params: { q, limit }, diff --git a/frontend/src/components/flowpilot/EscalationQueue.tsx b/frontend/src/components/flowpilot/EscalationQueue.tsx index 20e865f1..dbce00aa 100644 --- a/frontend/src/components/flowpilot/EscalationQueue.tsx +++ b/frontend/src/components/flowpilot/EscalationQueue.tsx @@ -1,15 +1,31 @@ -import { useState, useEffect } from 'react' +import { useCallback, useEffect, useMemo, useRef, useState } from 'react' import { useNavigate } from 'react-router-dom' import { AlertTriangle, Clock, Hash, Ticket, Loader2, RefreshCw } from 'lucide-react' import { aiSessionsApi } from '@/api' import type { AISessionSummary } from '@/types/ai-session' import { timeAgo } from '@/lib/timeAgo' +import { cn } from '@/lib/utils' interface EscalationQueueProps { onPickup?: (sessionId: string) => void onCountChange?: (count: number) => void } +// Static list sort: oldest-first. Longest waiting = most urgent. +const sortOldestFirst = (a: AISessionSummary, b: AISessionSummary) => + new Date(a.created_at).getTime() - new Date(b.created_at).getTime() + +// Live-arrival bucket sort: newest-first so the most recent escalation is at +// the very top of the list. +const sortNewestFirst = (a: AISessionSummary, b: AISessionSummary) => + new Date(b.created_at).getTime() - new Date(a.created_at).getTime() + +// How long a freshly-arrived card keeps the slide-in animation class. The +// keyframe itself runs 200ms; this just keeps the class on the DOM long +// enough for the animation to finish before React removes it on the next +// state transition. +const NEW_CARD_HIGHLIGHT_MS = 800 + function waitTimeColor(createdAt: string): string { const hours = (Date.now() - new Date(createdAt).getTime()) / 3_600_000 if (hours >= 4) return '#f87171' // danger @@ -22,29 +38,156 @@ export function EscalationQueue({ onPickup, onCountChange }: EscalationQueueProp const [sessions, setSessions] = useState([]) const [isLoading, setIsLoading] = useState(true) const [error, setError] = useState(null) + // Session IDs that arrived via SSE and should still play the slide-in. + const [newIds, setNewIds] = useState>(new Set()) + // Track count of unseen arrivals while the tab is backgrounded. + const [unseenCount, setUnseenCount] = useState(0) - const loadQueue = async () => { + // Ref mirrors the latest sessions so the SSE handler can diff without + // re-binding on every state change. + const sessionsRef = useRef([]) + useEffect(() => { + sessionsRef.current = sessions + }, [sessions]) + + const prefersReducedMotion = useMemo(() => { + if (typeof window === 'undefined' || !window.matchMedia) return false + return window.matchMedia('(prefers-reduced-motion: reduce)').matches + }, []) + + // ── Tab title flash ── + // Capture the original title once at mount. While unseen > 0, prefix it. + const originalTitleRef = useRef('') + useEffect(() => { + originalTitleRef.current = document.title + return () => { + // Restore on unmount so a leftover "(N) ..." prefix doesn't bleed + // into the next page. + document.title = originalTitleRef.current + } + }, []) + + useEffect(() => { + const base = originalTitleRef.current || document.title + document.title = unseenCount > 0 ? `(${unseenCount}) ${base}` : base + }, [unseenCount]) + + useEffect(() => { + const clearUnseen = () => { + if (!document.hidden) setUnseenCount(0) + } + const onFocus = () => setUnseenCount(0) + document.addEventListener('visibilitychange', clearUnseen) + window.addEventListener('focus', onFocus) + return () => { + document.removeEventListener('visibilitychange', clearUnseen) + window.removeEventListener('focus', onFocus) + } + }, []) + + const loadQueue = useCallback(async () => { setIsLoading(true) setError(null) try { const data = await aiSessionsApi.getEscalationQueue() - // Sort oldest-first — longest waiting = most urgent - const sorted = [...data].sort( - (a, b) => new Date(a.created_at).getTime() - new Date(b.created_at).getTime() - ) + const sorted = [...data].sort(sortOldestFirst) setSessions(sorted) + setNewIds(new Set()) onCountChange?.(sorted.length) } catch { setError('Failed to load escalation queue') } finally { setIsLoading(false) } - } + }, [onCountChange]) useEffect(() => { loadQueue() - // eslint-disable-next-line react-hooks/exhaustive-deps -- load once on mount - }, []) + }, [loadQueue]) + + // ── SSE subscription ── + // Refetch the queue on each `handoff_created` event (the event payload is + // intentionally thin — it's a trigger, not the full card data). Diff + // against the previous list to identify newly-arrived sessions; prepend + // them at the top with the slide-in animation, then keep the rest of the + // queue in oldest-first order below. + const handleHandoffCreated = useCallback(async () => { + let fresh: AISessionSummary[] + try { + fresh = await aiSessionsApi.getEscalationQueue() + } catch { + return + } + + const prevIds = new Set(sessionsRef.current.map((s) => s.id)) + const arrived = fresh.filter((s) => !prevIds.has(s.id)).sort(sortNewestFirst) + const established = fresh.filter((s) => prevIds.has(s.id)).sort(sortOldestFirst) + const next = [...arrived, ...established] + setSessions(next) + onCountChange?.(next.length) + + if (arrived.length === 0) return + + const arrivedIds = arrived.map((s) => s.id) + setNewIds((prev) => { + const merged = new Set(prev) + arrivedIds.forEach((id) => merged.add(id)) + return merged + }) + if (document.hidden) { + setUnseenCount((c) => c + arrived.length) + } + window.setTimeout(() => { + setNewIds((prev) => { + const remaining = new Set(prev) + arrivedIds.forEach((id) => remaining.delete(id)) + return remaining + }) + }, NEW_CARD_HIGHLIGHT_MS) + }, [onCountChange]) + + useEffect(() => { + const abort = new AbortController() + let reconnectTimer: number | null = null + let attempt = 0 + let cancelled = false + + const connect = async () => { + if (cancelled) return + try { + await aiSessionsApi.streamEscalations( + { + onReady: () => { + attempt = 0 + }, + onHandoffCreated: () => { + void handleHandoffCreated() + }, + }, + abort.signal, + ) + // Stream ended cleanly (server hung up). Reconnect quickly. + if (!cancelled) { + reconnectTimer = window.setTimeout(connect, 1000) + } + } catch (err) { + if (cancelled || abort.signal.aborted) return + if (err instanceof DOMException && err.name === 'AbortError') return + // Exponential backoff: 1s, 2s, 4s, 8s, 16s, capped at 30s. + const delay = Math.min(30_000, 1000 * 2 ** attempt) + attempt += 1 + reconnectTimer = window.setTimeout(connect, delay) + } + } + + void connect() + + return () => { + cancelled = true + abort.abort() + if (reconnectTimer !== null) window.clearTimeout(reconnectTimer) + } + }, [handleHandoffCreated]) const handlePickup = (sessionId: string) => { if (onPickup) { @@ -95,7 +238,10 @@ export function EscalationQueue({ onPickup, onCountChange }: EscalationQueueProp return (
-

+

Awaiting pickup ({sessions.length})

- {sessions.map((session) => ( -
-
-

- {session.problem_summary || 'Untitled session'} -

- {session.escalation_reason && ( -

- Reason: {session.escalation_reason} -

- )} -
- -
- {session.problem_domain && ( - - {session.problem_domain} - - )} - - - {session.step_count} steps - - + {sessions.map((session) => { + const isNew = newIds.has(session.id) + return ( +
- - {timeAgo(session.created_at)} - - {session.psa_ticket_id && ( - - - #{session.psa_ticket_id} - - )} -
+
+

+ {session.problem_summary || 'Untitled session'} +

+ {session.escalation_reason && ( +

+ Reason: {session.escalation_reason} +

+ )} +
-
- -
-
- ))} +
+ {session.problem_domain && ( + + {session.problem_domain} + + )} + + + {session.step_count} steps + + + + {timeAgo(session.created_at)} + + {session.psa_ticket_id && ( + + + #{session.psa_ticket_id} + + )} +
+ +
+ +
+
+ ) + })} +
) } diff --git a/frontend/src/types/ai-session.ts b/frontend/src/types/ai-session.ts index c8f90886..281ef543 100644 --- a/frontend/src/types/ai-session.ts +++ b/frontend/src/types/ai-session.ts @@ -258,3 +258,23 @@ export interface SimilarSession { created_at: string | null similarity: number } + +// ── Escalation SSE bus ── +// +// Mirrors the `event_generator` payload in +// backend/app/api/endpoints/session_handoffs.py — keep this in sync with the +// dict published by `HandoffManager.dispatch_escalation_notifications`. + +export interface HandoffCreatedEvent { + type: 'handoff_created' + handoff_id: string + session_id: string + priority: string + engineer_notes: string + created_at: string | null +} + +export interface EscalationStreamHandlers { + onReady?: () => void + onHandoffCreated?: (event: HandoffCreatedEvent) => void +}