From 07d0db9579f81e9292c5e91e140543bda3beb292 Mon Sep 17 00:00:00 2001 From: Michael Chihlas Date: Mon, 27 Apr 2026 15:58:05 -0400 Subject: [PATCH] feat(handoff): email engineer-or-admin teammates on escalation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit First half of the Escalation Mode notification dual-path. WebSocket/SSE push is the second half (next commit) — email handles offline seniors, push handles online ones for the magic-moment demo. HandoffManager.dispatch_escalation_notifications: - Pulls active engineer/admin/owner-role users in the same account_id (excludes the escalator + viewers + soft-deleted) - Sends via existing EmailService.send_notification_email, concurrent via asyncio.gather; per-message failures don't block the rest - Wrapped in try/except: any exception is logged + swallowed. Handoff creation is authoritative; notification is advisory. This is the graceful-degradation regression both eng + codex reviews flagged as critical (handoff must succeed even if SMTP is down). Endpoint wiring (POST /ai-sessions/{id}/handoff): - Dispatch fires AFTER db.commit() — never email about a rolled-back handoff. Trust-erosion bug if we got that wrong. - Only fires for intent=escalate. Park is private to the escalator. Tests (4 new): - emails-engineer-recipients-in-account: viewer excluded, escalator excluded, only the engineer/admin teammates get the message - skipped-for-park-intent: park doesn't fan out - graceful-degradation-when-email-raises: RuntimeError from the email service does NOT bubble out of dispatch - endpoint-dispatches-on-escalate: end-to-end wiring through POST Per-channel delivery records (replacing the dead `notification_sent` boolean per Codex correction) is a v1.x story — for now application logs are the audit trail. See docs/plans/2026-04-27-escalation-mode-wedge-design.md. 20 tests green across handoff_manager + session_handoffs_api + flowpilot_analytics_escalations. No regressions. Co-Authored-By: Claude Opus 4.7 --- backend/app/api/endpoints/session_handoffs.py | 7 + backend/app/services/handoff_manager.py | 100 +++++++++ backend/tests/test_handoff_manager.py | 208 ++++++++++++++++++ 3 files changed, 315 insertions(+) diff --git a/backend/app/api/endpoints/session_handoffs.py b/backend/app/api/endpoints/session_handoffs.py index 2e3ec65f..5e444bd2 100644 --- a/backend/app/api/endpoints/session_handoffs.py +++ b/backend/app/api/endpoints/session_handoffs.py @@ -63,6 +63,13 @@ async def create_handoff( raise HTTPException(status_code=400, detail=str(e)) await db.commit() + + # Best-effort notification dispatch AFTER commit so we never email about + # a rolled-back handoff. Failures are swallowed inside the manager — + # handoff creation is authoritative; notifications are advisory. + if handoff.intent == "escalate": + await manager.dispatch_escalation_notifications(handoff) + return HandoffResponse.model_validate(handoff) diff --git a/backend/app/services/handoff_manager.py b/backend/app/services/handoff_manager.py index c79461ba..fedc8a74 100644 --- a/backend/app/services/handoff_manager.py +++ b/backend/app/services/handoff_manager.py @@ -4,6 +4,7 @@ Creates handoff snapshots, AI assessments (for escalations), claim workflow, and queue queries. Dual-writes to ai_sessions.escalation_package for backward compatibility with the existing escalation queue. """ +import asyncio import logging from datetime import datetime, timezone from typing import Any @@ -12,9 +13,12 @@ from uuid import UUID from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession +from app.core.config import settings +from app.core.email import EmailService from app.models.ai_session import AISession from app.models.session_branch import SessionBranch from app.models.session_handoff import SessionHandoff +from app.models.user import User logger = logging.getLogger(__name__) @@ -87,6 +91,102 @@ class HandoffManager: await self.db.flush() return handoff + async def dispatch_escalation_notifications( + self, handoff: SessionHandoff + ) -> int: + """Email engineer-or-admin users in the account about a new escalation. + + Call this AFTER `db.commit()` has succeeded — sending email for a + rolled-back handoff is the kind of trust-erosion bug that makes pilot + customers stop trusting the tool. Returns the number of recipients + successfully emailed (best-effort, not authoritative). + + Failures are logged but never raise: the wedge demo's reliability + story is "handoff creation always succeeds; notification is best-effort," + not "handoff creation depends on the email service being up." This is + the graceful-degradation regression the eng + codex reviews both + flagged as critical. + + Per-channel delivery records (Codex correction on the dead + `notification_sent` boolean) are a v1.x story — for now the + application logs are the audit trail. + """ + if handoff.intent != "escalate": + return 0 + + try: + recipients = ( + await self.db.execute( + select(User).where( + User.account_id == handoff.account_id, + User.id != handoff.handed_off_by, + User.account_role.in_(("owner", "admin", "engineer")), + User.is_active.is_(True), + User.deleted_at.is_(None), + ) + ) + ).scalars().all() + + if not recipients: + logger.info( + "No notification recipients for handoff %s in account %s", + handoff.id, + handoff.account_id, + ) + return 0 + + # Pull session for the email subject. Fall back to a generic title + # if the session is gone (e.g. cascade delete mid-dispatch). + session_result = await self.db.execute( + select(AISession).where(AISession.id == handoff.session_id) + ) + session = session_result.scalar_one_or_none() + problem = ( + session.problem_summary if session and session.problem_summary + else "an active session" + ) + + title = f"New escalation: {problem}" + notes = (handoff.engineer_notes or "").strip() + body = ( + "A teammate has escalated a session and is asking for help.\n\n" + f"Reason: {notes if notes else 'No reason provided.'}\n" + f"Priority: {handoff.priority}" + ) + link_url = ( + f"{settings.FRONTEND_URL.rstrip('/')}/escalations" + if settings.FRONTEND_URL + else None + ) + + results = await asyncio.gather( + *[ + EmailService.send_notification_email( + to_email=r.email, + title=title, + body=body, + link_url=link_url, + ) + for r in recipients + ], + return_exceptions=True, + ) + sent = sum(1 for r in results if r is True) + logger.info( + "Escalation notifications dispatched for handoff %s: %d/%d recipients", + handoff.id, + sent, + len(recipients), + ) + return sent + + except Exception: + logger.exception( + "Escalation notification dispatch failed for handoff %s", + handoff.id, + ) + return 0 + async def _generate_snapshot(self, session: AISession) -> dict[str, Any]: """Generate a snapshot of the session state at handoff time.""" snapshot: dict[str, Any] = { diff --git a/backend/tests/test_handoff_manager.py b/backend/tests/test_handoff_manager.py index 6e1e530e..fc4644be 100644 --- a/backend/tests/test_handoff_manager.py +++ b/backend/tests/test_handoff_manager.py @@ -1,8 +1,12 @@ """Integration tests for HandoffManager service.""" +from unittest.mock import AsyncMock, patch + import pytest from httpx import AsyncClient from app.models.ai_session import AISession +from app.models.user import User +from app.services.handoff_manager import HandoffManager @pytest.mark.asyncio @@ -113,3 +117,207 @@ async def test_claim_session(client: AsyncClient, test_user, test_admin, auth_he await test_db.refresh(session) assert session.status == "active" + + +# ─── Notification dispatch ──────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_dispatch_emails_engineer_recipients_in_account( + client: AsyncClient, test_user, auth_headers, test_db +): + """dispatch_escalation_notifications emails every engineer/admin in the + account except the escalator.""" + # Add a second user (engineer role) in the same account. + teammate = User( + email="teammate@example.com", + password_hash="x", + name="Teammate", + role="engineer", + account_id=test_user["user_data"]["account_id"], + account_role="engineer", + ) + test_db.add(teammate) + await test_db.flush() + + # Add a viewer-role user — must NOT receive a notification. + viewer = User( + email="viewer@example.com", + password_hash="x", + name="Viewer", + role="engineer", + account_id=test_user["user_data"]["account_id"], + account_role="viewer", + ) + test_db.add(viewer) + await test_db.flush() + + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "vpn down"}, + problem_summary="VPN won't connect after Win update", + status="active", + confidence_tier="discovery", + conversation_messages=[], + ) + test_db.add(session) + await test_db.commit() + + manager = HandoffManager(test_db) + handoff = await manager.create_handoff( + session_id=session.id, + intent="escalate", + engineer_notes="Stuck on auth handshake", + user_id=test_user["user_data"]["id"], + ) + await test_db.commit() + + with patch( + "app.services.handoff_manager.EmailService.send_notification_email", + new=AsyncMock(return_value=True), + ) as send: + sent = await manager.dispatch_escalation_notifications(handoff) + + assert sent == 1 # only the engineer-role teammate + recipients = {call.kwargs["to_email"] for call in send.call_args_list} + assert recipients == {"teammate@example.com"} + assert viewer.email not in recipients + assert test_user["email"] not in recipients # not self-notified + + title = send.call_args_list[0].kwargs["title"] + assert "VPN won't connect after Win update" in title + + +@pytest.mark.asyncio +async def test_dispatch_skipped_for_park_intent( + client: AsyncClient, test_user, auth_headers, test_db +): + """park-intent handoffs are private (waiting for client logs etc) — no + team-wide email.""" + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "x"}, + status="active", + confidence_tier="discovery", + conversation_messages=[], + ) + test_db.add(session) + await test_db.commit() + + manager = HandoffManager(test_db) + handoff = await manager.create_handoff( + session_id=session.id, + intent="park", + engineer_notes="waiting on customer", + user_id=test_user["user_data"]["id"], + ) + await test_db.commit() + + with patch( + "app.services.handoff_manager.EmailService.send_notification_email", + new=AsyncMock(return_value=True), + ) as send: + sent = await manager.dispatch_escalation_notifications(handoff) + + assert sent == 0 + assert send.call_count == 0 + + +@pytest.mark.asyncio +async def test_dispatch_graceful_degradation_when_email_raises( + client: AsyncClient, test_user, auth_headers, test_db +): + """If the email service raises (auth misconfig, network, etc.), dispatch + must NOT raise. Handoff creation has already committed; emailing is + best-effort. Codex-flagged regression.""" + teammate = User( + email="t@example.com", + password_hash="x", + name="T", + role="engineer", + account_id=test_user["user_data"]["account_id"], + account_role="engineer", + ) + test_db.add(teammate) + await test_db.flush() + + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "x"}, + status="active", + confidence_tier="discovery", + conversation_messages=[], + ) + test_db.add(session) + await test_db.commit() + + manager = HandoffManager(test_db) + handoff = await manager.create_handoff( + session_id=session.id, + intent="escalate", + engineer_notes="help", + user_id=test_user["user_data"]["id"], + ) + await test_db.commit() + + with patch( + "app.services.handoff_manager.EmailService.send_notification_email", + new=AsyncMock(side_effect=RuntimeError("SMTP down")), + ): + # Must not raise. + sent = await manager.dispatch_escalation_notifications(handoff) + assert sent == 0 + + +@pytest.mark.asyncio +async def test_create_handoff_endpoint_dispatches_on_escalate( + client: AsyncClient, test_user, auth_headers, test_db +): + """End-to-end: POST /handoff with intent=escalate triggers + dispatch_escalation_notifications after commit. Verifies the wiring in + the endpoint, not just the manager method.""" + teammate = User( + email="t2@example.com", + password_hash="x", + name="T2", + role="engineer", + account_id=test_user["user_data"]["account_id"], + account_role="engineer", + ) + test_db.add(teammate) + await test_db.commit() + + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "x"}, + status="active", + confidence_tier="discovery", + conversation_messages=[], + ) + test_db.add(session) + await test_db.commit() + + with patch( + "app.services.handoff_manager.EmailService.send_notification_email", + new=AsyncMock(return_value=True), + ) as send: + resp = await client.post( + f"/api/v1/ai-sessions/{session.id}/handoff", + headers=auth_headers, + json={"intent": "escalate", "engineer_notes": "Need help"}, + ) + assert resp.status_code == 201 + assert send.call_count == 1 + assert send.call_args.kwargs["to_email"] == "t2@example.com"