feat(handoff): email engineer-or-admin teammates on escalation

First half of the Escalation Mode notification dual-path. WebSocket/SSE
push is the second half (next commit) — email handles offline seniors,
push handles online ones for the magic-moment demo.

HandoffManager.dispatch_escalation_notifications:
- Pulls active engineer/admin/owner-role users in the same account_id
  (excludes the escalator + viewers + soft-deleted)
- Sends via existing EmailService.send_notification_email, concurrent
  via asyncio.gather; per-message failures don't block the rest
- Wrapped in try/except: any exception is logged + swallowed. Handoff
  creation is authoritative; notification is advisory. This is the
  graceful-degradation regression both eng + codex reviews flagged as
  critical (handoff must succeed even if SMTP is down).

Endpoint wiring (POST /ai-sessions/{id}/handoff):
- Dispatch fires AFTER db.commit() — never email about a rolled-back
  handoff. Trust-erosion bug if we got that wrong.
- Only fires for intent=escalate. Park is private to the escalator.

Tests (4 new):
- emails-engineer-recipients-in-account: viewer excluded, escalator
  excluded, only the engineer/admin teammates get the message
- skipped-for-park-intent: park doesn't fan out
- graceful-degradation-when-email-raises: RuntimeError from the email
  service does NOT bubble out of dispatch
- endpoint-dispatches-on-escalate: end-to-end wiring through POST

Per-channel delivery records (replacing the dead `notification_sent`
boolean per Codex correction) is a v1.x story — for now application
logs are the audit trail. See
docs/plans/2026-04-27-escalation-mode-wedge-design.md.

20 tests green across handoff_manager + session_handoffs_api +
flowpilot_analytics_escalations. No regressions.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-04-27 15:58:05 -04:00
parent 7a5b853b3b
commit 07d0db9579
3 changed files with 315 additions and 0 deletions

View File

@@ -63,6 +63,13 @@ async def create_handoff(
raise HTTPException(status_code=400, detail=str(e))
await db.commit()
# Best-effort notification dispatch AFTER commit so we never email about
# a rolled-back handoff. Failures are swallowed inside the manager —
# handoff creation is authoritative; notifications are advisory.
if handoff.intent == "escalate":
await manager.dispatch_escalation_notifications(handoff)
return HandoffResponse.model_validate(handoff)

View File

@@ -4,6 +4,7 @@ Creates handoff snapshots, AI assessments (for escalations), claim workflow,
and queue queries. Dual-writes to ai_sessions.escalation_package for
backward compatibility with the existing escalation queue.
"""
import asyncio
import logging
from datetime import datetime, timezone
from typing import Any
@@ -12,9 +13,12 @@ from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.email import EmailService
from app.models.ai_session import AISession
from app.models.session_branch import SessionBranch
from app.models.session_handoff import SessionHandoff
from app.models.user import User
logger = logging.getLogger(__name__)
@@ -87,6 +91,102 @@ class HandoffManager:
await self.db.flush()
return handoff
async def dispatch_escalation_notifications(
self, handoff: SessionHandoff
) -> int:
"""Email engineer-or-admin users in the account about a new escalation.
Call this AFTER `db.commit()` has succeeded — sending email for a
rolled-back handoff is the kind of trust-erosion bug that makes pilot
customers stop trusting the tool. Returns the number of recipients
successfully emailed (best-effort, not authoritative).
Failures are logged but never raise: the wedge demo's reliability
story is "handoff creation always succeeds; notification is best-effort,"
not "handoff creation depends on the email service being up." This is
the graceful-degradation regression the eng + codex reviews both
flagged as critical.
Per-channel delivery records (Codex correction on the dead
`notification_sent` boolean) are a v1.x story — for now the
application logs are the audit trail.
"""
if handoff.intent != "escalate":
return 0
try:
recipients = (
await self.db.execute(
select(User).where(
User.account_id == handoff.account_id,
User.id != handoff.handed_off_by,
User.account_role.in_(("owner", "admin", "engineer")),
User.is_active.is_(True),
User.deleted_at.is_(None),
)
)
).scalars().all()
if not recipients:
logger.info(
"No notification recipients for handoff %s in account %s",
handoff.id,
handoff.account_id,
)
return 0
# Pull session for the email subject. Fall back to a generic title
# if the session is gone (e.g. cascade delete mid-dispatch).
session_result = await self.db.execute(
select(AISession).where(AISession.id == handoff.session_id)
)
session = session_result.scalar_one_or_none()
problem = (
session.problem_summary if session and session.problem_summary
else "an active session"
)
title = f"New escalation: {problem}"
notes = (handoff.engineer_notes or "").strip()
body = (
"A teammate has escalated a session and is asking for help.\n\n"
f"Reason: {notes if notes else 'No reason provided.'}\n"
f"Priority: {handoff.priority}"
)
link_url = (
f"{settings.FRONTEND_URL.rstrip('/')}/escalations"
if settings.FRONTEND_URL
else None
)
results = await asyncio.gather(
*[
EmailService.send_notification_email(
to_email=r.email,
title=title,
body=body,
link_url=link_url,
)
for r in recipients
],
return_exceptions=True,
)
sent = sum(1 for r in results if r is True)
logger.info(
"Escalation notifications dispatched for handoff %s: %d/%d recipients",
handoff.id,
sent,
len(recipients),
)
return sent
except Exception:
logger.exception(
"Escalation notification dispatch failed for handoff %s",
handoff.id,
)
return 0
async def _generate_snapshot(self, session: AISession) -> dict[str, Any]:
"""Generate a snapshot of the session state at handoff time."""
snapshot: dict[str, Any] = {

View File

@@ -1,8 +1,12 @@
"""Integration tests for HandoffManager service."""
from unittest.mock import AsyncMock, patch
import pytest
from httpx import AsyncClient
from app.models.ai_session import AISession
from app.models.user import User
from app.services.handoff_manager import HandoffManager
@pytest.mark.asyncio
@@ -113,3 +117,207 @@ async def test_claim_session(client: AsyncClient, test_user, test_admin, auth_he
await test_db.refresh(session)
assert session.status == "active"
# ─── Notification dispatch ────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_dispatch_emails_engineer_recipients_in_account(
client: AsyncClient, test_user, auth_headers, test_db
):
"""dispatch_escalation_notifications emails every engineer/admin in the
account except the escalator."""
# Add a second user (engineer role) in the same account.
teammate = User(
email="teammate@example.com",
password_hash="x",
name="Teammate",
role="engineer",
account_id=test_user["user_data"]["account_id"],
account_role="engineer",
)
test_db.add(teammate)
await test_db.flush()
# Add a viewer-role user — must NOT receive a notification.
viewer = User(
email="viewer@example.com",
password_hash="x",
name="Viewer",
role="engineer",
account_id=test_user["user_data"]["account_id"],
account_role="viewer",
)
test_db.add(viewer)
await test_db.flush()
session = AISession(
user_id=test_user["user_data"]["id"],
account_id=test_user["user_data"]["account_id"],
session_type="guided",
intake_type="free_text",
intake_content={"text": "vpn down"},
problem_summary="VPN won't connect after Win update",
status="active",
confidence_tier="discovery",
conversation_messages=[],
)
test_db.add(session)
await test_db.commit()
manager = HandoffManager(test_db)
handoff = await manager.create_handoff(
session_id=session.id,
intent="escalate",
engineer_notes="Stuck on auth handshake",
user_id=test_user["user_data"]["id"],
)
await test_db.commit()
with patch(
"app.services.handoff_manager.EmailService.send_notification_email",
new=AsyncMock(return_value=True),
) as send:
sent = await manager.dispatch_escalation_notifications(handoff)
assert sent == 1 # only the engineer-role teammate
recipients = {call.kwargs["to_email"] for call in send.call_args_list}
assert recipients == {"teammate@example.com"}
assert viewer.email not in recipients
assert test_user["email"] not in recipients # not self-notified
title = send.call_args_list[0].kwargs["title"]
assert "VPN won't connect after Win update" in title
@pytest.mark.asyncio
async def test_dispatch_skipped_for_park_intent(
client: AsyncClient, test_user, auth_headers, test_db
):
"""park-intent handoffs are private (waiting for client logs etc) — no
team-wide email."""
session = AISession(
user_id=test_user["user_data"]["id"],
account_id=test_user["user_data"]["account_id"],
session_type="guided",
intake_type="free_text",
intake_content={"text": "x"},
status="active",
confidence_tier="discovery",
conversation_messages=[],
)
test_db.add(session)
await test_db.commit()
manager = HandoffManager(test_db)
handoff = await manager.create_handoff(
session_id=session.id,
intent="park",
engineer_notes="waiting on customer",
user_id=test_user["user_data"]["id"],
)
await test_db.commit()
with patch(
"app.services.handoff_manager.EmailService.send_notification_email",
new=AsyncMock(return_value=True),
) as send:
sent = await manager.dispatch_escalation_notifications(handoff)
assert sent == 0
assert send.call_count == 0
@pytest.mark.asyncio
async def test_dispatch_graceful_degradation_when_email_raises(
client: AsyncClient, test_user, auth_headers, test_db
):
"""If the email service raises (auth misconfig, network, etc.), dispatch
must NOT raise. Handoff creation has already committed; emailing is
best-effort. Codex-flagged regression."""
teammate = User(
email="t@example.com",
password_hash="x",
name="T",
role="engineer",
account_id=test_user["user_data"]["account_id"],
account_role="engineer",
)
test_db.add(teammate)
await test_db.flush()
session = AISession(
user_id=test_user["user_data"]["id"],
account_id=test_user["user_data"]["account_id"],
session_type="guided",
intake_type="free_text",
intake_content={"text": "x"},
status="active",
confidence_tier="discovery",
conversation_messages=[],
)
test_db.add(session)
await test_db.commit()
manager = HandoffManager(test_db)
handoff = await manager.create_handoff(
session_id=session.id,
intent="escalate",
engineer_notes="help",
user_id=test_user["user_data"]["id"],
)
await test_db.commit()
with patch(
"app.services.handoff_manager.EmailService.send_notification_email",
new=AsyncMock(side_effect=RuntimeError("SMTP down")),
):
# Must not raise.
sent = await manager.dispatch_escalation_notifications(handoff)
assert sent == 0
@pytest.mark.asyncio
async def test_create_handoff_endpoint_dispatches_on_escalate(
client: AsyncClient, test_user, auth_headers, test_db
):
"""End-to-end: POST /handoff with intent=escalate triggers
dispatch_escalation_notifications after commit. Verifies the wiring in
the endpoint, not just the manager method."""
teammate = User(
email="t2@example.com",
password_hash="x",
name="T2",
role="engineer",
account_id=test_user["user_data"]["account_id"],
account_role="engineer",
)
test_db.add(teammate)
await test_db.commit()
session = AISession(
user_id=test_user["user_data"]["id"],
account_id=test_user["user_data"]["account_id"],
session_type="guided",
intake_type="free_text",
intake_content={"text": "x"},
status="active",
confidence_tier="discovery",
conversation_messages=[],
)
test_db.add(session)
await test_db.commit()
with patch(
"app.services.handoff_manager.EmailService.send_notification_email",
new=AsyncMock(return_value=True),
) as send:
resp = await client.post(
f"/api/v1/ai-sessions/{session.id}/handoff",
headers=auth_headers,
json={"intent": "escalate", "engineer_notes": "Need help"},
)
assert resp.status_code == 201
assert send.call_count == 1
assert send.call_args.kwargs["to_email"] == "t2@example.com"