diff --git a/backend/app/api/endpoints/ai_sessions.py b/backend/app/api/endpoints/ai_sessions.py index 4b484e43..31a1ec5e 100644 --- a/backend/app/api/endpoints/ai_sessions.py +++ b/backend/app/api/endpoints/ai_sessions.py @@ -452,6 +452,13 @@ async def resolve_session( # ── Escalate ── +# +# Thin shim over HandoffManager. The legacy `flowpilot_engine.escalate_session` +# is no longer the source of truth — every escalation now creates a +# SessionHandoff row, fans out via the SSE bus, dispatches AppNotification + +# external channels via notify(), and emails per-user. EscalateModal and the +# /handoff endpoint both funnel through here / through HandoffManager so the +# senior-pickup magic-moment screen works regardless of entry point. @router.post("/{session_id}/escalate", response_model=SessionCloseResponse) @limiter.limit("15/minute") @@ -463,21 +470,49 @@ async def escalate_session( db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_engineer_or_admin), ): - """Escalate a FlowPilot session to another engineer.""" + """Escalate a FlowPilot session — unified through HandoffManager.""" + from app.services.handoff_manager import HandoffManager + + # Owner-only — matches the original constraint on flowpilot_engine.escalate_session. + session_result = await db.execute( + select(AISession).where( + AISession.id == session_id, + AISession.user_id == current_user.id, + ) + ) + session = session_result.scalar_one_or_none() + if not session: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, detail="Session not found" + ) + + manager = HandoffManager(db) try: - result = await flowpilot_engine.escalate_session( + handoff = await manager.create_handoff( session_id=session_id, - request=data, + intent="escalate", + engineer_notes=data.escalation_reason, user_id=current_user.id, - db=db, + priority="normal", + target_user_id=data.escalated_to_id, ) except ValueError as e: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) - except PermissionError as e: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) + + documentation, psa_result = await manager.finalize_escalation( + handoff, session, current_user.id + ) await db.commit() - return result + + await manager.dispatch_escalation_notifications(handoff) + + return SessionCloseResponse( + session_id=session.id, + status=session.status, + documentation=documentation, + **psa_result, + ) # ── Pause ── @@ -644,7 +679,7 @@ async def get_escalation_queue( select(AISession) .where( scope_filter, - AISession.status == "requesting_escalation", + AISession.status.in_(("requesting_escalation", "escalated")), ) .order_by(AISession.created_at.desc()) ) diff --git a/backend/app/api/endpoints/session_handoffs.py b/backend/app/api/endpoints/session_handoffs.py index ce74e008..5c70c1e2 100644 --- a/backend/app/api/endpoints/session_handoffs.py +++ b/backend/app/api/endpoints/session_handoffs.py @@ -63,10 +63,16 @@ async def create_handoff( engineer_notes=body.engineer_notes, user_id=current_user.id, priority=body.priority, + target_user_id=body.target_user_id, ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) + # For escalate: generate documentation + push to PSA before commit so + # the handoff and the PSA-state changes land atomically. + if handoff.intent == "escalate": + await manager.finalize_escalation(handoff, session, current_user.id) + await db.commit() # Best-effort notification dispatch AFTER commit so we never email about diff --git a/backend/app/api/endpoints/sidebar.py b/backend/app/api/endpoints/sidebar.py index 40baadc7..ff8a3b4c 100644 --- a/backend/app/api/endpoints/sidebar.py +++ b/backend/app/api/endpoints/sidebar.py @@ -161,7 +161,7 @@ async def get_sidebar_stats( select(func.count()).where( and_( esc_scope, - AISession.status == "requesting_escalation", + AISession.status.in_(("requesting_escalation", "escalated")), ) ) ) diff --git a/backend/app/schemas/session_handoff.py b/backend/app/schemas/session_handoff.py index da1a52cb..a67419c7 100644 --- a/backend/app/schemas/session_handoff.py +++ b/backend/app/schemas/session_handoff.py @@ -10,6 +10,11 @@ class HandoffCreateRequest(BaseModel): intent: str = Field(..., pattern="^(park|escalate)$") engineer_notes: str | None = None priority: str = Field("normal", pattern="^(normal|elevated)$") + # Optional escalation target — if set, only this user is the named + # recipient. Notification dispatch fans out to all engineer/admin/owner + # users in the account either way; this just records the original + # engineer's preferred recipient on the session for audit/UX. + target_user_id: UUID | None = None class HandoffResponse(BaseModel): diff --git a/backend/app/services/flowpilot_engine.py b/backend/app/services/flowpilot_engine.py index 1b280c29..f3021b53 100644 --- a/backend/app/services/flowpilot_engine.py +++ b/backend/app/services/flowpilot_engine.py @@ -632,8 +632,10 @@ async def pickup_session( allow_team_access=True, team_id=team_id, ) - if session.status != "requesting_escalation": - raise ValueError(f"Session is {session.status}, not requesting_escalation") + if session.status not in ("requesting_escalation", "escalated"): + raise ValueError( + f"Session is {session.status}, not in an escalated state" + ) # Can't pick up your own session if session.user_id == user_id: diff --git a/backend/app/services/handoff_manager.py b/backend/app/services/handoff_manager.py index 270882db..3684ce20 100644 --- a/backend/app/services/handoff_manager.py +++ b/backend/app/services/handoff_manager.py @@ -3,6 +3,15 @@ Creates handoff snapshots, AI assessments (for escalations), claim workflow, and queue queries. Dual-writes to ai_sessions.escalation_package for backward compatibility with the existing escalation queue. + +For intent='escalate', `create_handoff` also runs the legacy enrichment +that the deprecated `/escalate` endpoint used to do directly: setting +`escalated_to_id`, building the AI-enhanced escalation_package (Sonnet), +and recording escalation_reason. `finalize_escalation` then generates the +SessionDocumentation and pushes to PSA. `dispatch_escalation_notifications` +fans out the bell-icon AppNotification + external channels (Slack/Teams) +on top of per-user emails. The `/escalate` endpoint is now a thin shim +calling these in sequence. """ import asyncio import logging @@ -12,6 +21,7 @@ from uuid import UUID from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import selectinload from app.core.config import settings from app.core.email import EmailService @@ -20,6 +30,8 @@ from app.models.ai_session import AISession from app.models.session_branch import SessionBranch from app.models.session_handoff import SessionHandoff from app.models.user import User +from app.schemas.ai_session import SessionDocumentation +from app.services.notification_service import notify logger = logging.getLogger(__name__) @@ -37,19 +49,46 @@ class HandoffManager: engineer_notes: str | None, user_id: UUID, priority: str = "normal", + target_user_id: UUID | None = None, ) -> SessionHandoff: """Create a handoff (park or escalate). Generates snapshot, updates session status, dual-writes to escalation_package for backward compat. + + For intent='escalate' also: sets `session.escalation_reason` and + optionally `session.escalated_to_id`, builds the AI-enhanced + escalation package (the rich one the legacy `/escalate` path used + to produce), and merges the handoff metadata into it. Self-targeting + is rejected with ValueError, matching legacy behavior. """ + # Eager-load steps + user — _build_escalation_package_enhanced and + # finalize_escalation iterate over session.steps to compose the + # legacy enriched package and the SessionDocumentation, and the + # notify() dispatcher reads session.user.name. Without selectinload + # the async session raises MissingGreenlet on attribute access. result = await self.db.execute( - select(AISession).where(AISession.id == session_id) + select(AISession) + .options( + selectinload(AISession.steps), + selectinload(AISession.user), + ) + .where(AISession.id == session_id) ) session = result.scalar_one_or_none() if not session: raise ValueError(f"Session {session_id} not found") + if intent == "escalate": + if target_user_id and target_user_id == user_id: + raise ValueError( + "Cannot escalate a session to yourself. Use pause instead." + ) + if session.status not in ("active", "paused"): + raise ValueError( + f"Cannot escalate session in status: {session.status}" + ) + # Generate snapshot snapshot = await self._generate_snapshot(session) @@ -80,20 +119,134 @@ class HandoffManager: session.status = "paused" elif intent == "escalate": session.status = "escalated" + session.escalation_reason = engineer_notes + if target_user_id: + session.escalated_to_id = target_user_id session.handoff_count = (session.handoff_count or 0) + 1 - # Dual-write for backward compat - session.escalation_package = { - "snapshot": snapshot, - "intent": intent, - "engineer_notes": engineer_notes, - "handoff_id": str(handoff.id), - } + # Dual-write to escalation_package. For escalate, build the + # AI-enhanced package (preserves the legacy rich shape that + # SessionBriefing/PSA writeback consume), then layer in the new + # handoff metadata. For park, the lightweight shape is fine — + # there's no legacy enhanced package for parking. + if intent == "escalate": + enhanced_pkg = await self._build_enhanced_escalation_package( + session, user_id + ) + enhanced_pkg["intent"] = intent + enhanced_pkg["engineer_notes"] = engineer_notes + enhanced_pkg["handoff_id"] = str(handoff.id) + enhanced_pkg["snapshot"] = snapshot + session.escalation_package = enhanced_pkg + else: + session.escalation_package = { + "snapshot": snapshot, + "intent": intent, + "engineer_notes": engineer_notes, + "handoff_id": str(handoff.id), + } await self.db.flush() return handoff + async def finalize_escalation( + self, + handoff: SessionHandoff, + session: AISession, + user_id: UUID, + ) -> tuple[SessionDocumentation | None, dict[str, Any]]: + """Post-create enrichment for intent='escalate' handoffs. + + Generates the SessionDocumentation + pushes documentation to PSA if + a ticket is linked. Returns (documentation, psa_result) so the + legacy `/escalate` shim can map back to SessionCloseResponse. Safe + to call only when handoff.intent == 'escalate' — for park, returns + a no-op no-PSA dict. + """ + if handoff.intent != "escalate": + return None, { + "psa_push_status": "no_psa", + "psa_push_error": None, + "member_mapping_warning": None, + } + + # Lazy import to avoid circular dependency: flowpilot_engine imports + # plenty of services at module load time and we don't want + # handoff_manager pulled into that graph at import. + from app.services.flowpilot_engine import ( + _generate_documentation, + _push_to_psa, + ) + + documentation = _generate_documentation(session) + psa_result = await _push_to_psa(session, user_id, self.db) + + # Bell-icon AppNotification rows + external account-level channels + # (Slack/Teams webhooks, shared escalations inboxes). This is the + # `notify()` call the legacy /escalate path used to make directly, + # and it has to happen BEFORE the endpoint commits so the + # AppNotification rows land atomically with the handoff. Per-user + # emails come after commit in dispatch_escalation_notifications — + # those are pure IO with no persistent state. + try: + engineer_user = ( + await self.db.execute( + select(User).where(User.id == user_id) + ) + ).scalar_one_or_none() + engineer_name = ( + engineer_user.name + if engineer_user and engineer_user.name + else "Unknown" + ) + target_user_ids = ( + [session.escalated_to_id] if session.escalated_to_id else None + ) + await notify( + "session.escalated", + handoff.account_id, + { + "session_id": str(handoff.session_id), + "engineer_name": engineer_name, + "escalation_reason": handoff.engineer_notes or "", + "problem_summary": session.problem_summary or "N/A", + }, + self.db, + target_user_ids=target_user_ids, + ) + except Exception: + logger.exception( + "notify() dispatch failed for handoff %s", handoff.id + ) + + return documentation, psa_result + + async def _build_enhanced_escalation_package( + self, + session: AISession, + user_id: UUID, + ) -> dict[str, Any]: + """Lazy wrapper around the legacy enhanced-package builder. + + The builder lives in flowpilot_engine; we only need it for the + escalate path. Failures are caught here so handoff creation never + depends on the optional Sonnet enrichment — return the minimal + shape on failure. + """ + try: + from app.services.flowpilot_engine import ( + _build_escalation_package_enhanced, + ) + return await _build_escalation_package_enhanced(session, user_id) + except Exception: + logger.exception( + "Enhanced escalation package build failed for session %s; " + "falling back to minimal package", + session.id, + ) + return {} + async def dispatch_escalation_notifications( self, handoff: SessionHandoff ) -> int: diff --git a/frontend/src/hooks/useFlowPilotSession.ts b/frontend/src/hooks/useFlowPilotSession.ts index 3ef4b68b..df867651 100644 --- a/frontend/src/hooks/useFlowPilotSession.ts +++ b/frontend/src/hooks/useFlowPilotSession.ts @@ -168,7 +168,7 @@ export function useFlowPilotSession(): UseFlowPilotSession { setIsProcessing(true) try { const result = await aiSessionsApi.escalateSession(session.id, data) - setSession(prev => prev ? { ...prev, status: 'requesting_escalation' } : null) + setSession(prev => prev ? { ...prev, status: 'escalated' } : null) setDocumentation(result.documentation) setPsaPushStatus(result.psa_push_status) setPsaPushError(result.psa_push_error)