diff --git a/backend/app/api/endpoints/session_handoffs.py b/backend/app/api/endpoints/session_handoffs.py index 2e3ec65f..5e444bd2 100644 --- a/backend/app/api/endpoints/session_handoffs.py +++ b/backend/app/api/endpoints/session_handoffs.py @@ -63,6 +63,13 @@ async def create_handoff( raise HTTPException(status_code=400, detail=str(e)) await db.commit() + + # Best-effort notification dispatch AFTER commit so we never email about + # a rolled-back handoff. Failures are swallowed inside the manager — + # handoff creation is authoritative; notifications are advisory. + if handoff.intent == "escalate": + await manager.dispatch_escalation_notifications(handoff) + return HandoffResponse.model_validate(handoff) diff --git a/backend/app/services/handoff_manager.py b/backend/app/services/handoff_manager.py index c79461ba..fedc8a74 100644 --- a/backend/app/services/handoff_manager.py +++ b/backend/app/services/handoff_manager.py @@ -4,6 +4,7 @@ Creates handoff snapshots, AI assessments (for escalations), claim workflow, and queue queries. Dual-writes to ai_sessions.escalation_package for backward compatibility with the existing escalation queue. """ +import asyncio import logging from datetime import datetime, timezone from typing import Any @@ -12,9 +13,12 @@ from uuid import UUID from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession +from app.core.config import settings +from app.core.email import EmailService from app.models.ai_session import AISession from app.models.session_branch import SessionBranch from app.models.session_handoff import SessionHandoff +from app.models.user import User logger = logging.getLogger(__name__) @@ -87,6 +91,102 @@ class HandoffManager: await self.db.flush() return handoff + async def dispatch_escalation_notifications( + self, handoff: SessionHandoff + ) -> int: + """Email engineer-or-admin users in the account about a new escalation. + + Call this AFTER `db.commit()` has succeeded — sending email for a + rolled-back handoff is the kind of trust-erosion bug that makes pilot + customers stop trusting the tool. Returns the number of recipients + successfully emailed (best-effort, not authoritative). + + Failures are logged but never raise: the wedge demo's reliability + story is "handoff creation always succeeds; notification is best-effort," + not "handoff creation depends on the email service being up." This is + the graceful-degradation regression the eng + codex reviews both + flagged as critical. + + Per-channel delivery records (Codex correction on the dead + `notification_sent` boolean) are a v1.x story — for now the + application logs are the audit trail. + """ + if handoff.intent != "escalate": + return 0 + + try: + recipients = ( + await self.db.execute( + select(User).where( + User.account_id == handoff.account_id, + User.id != handoff.handed_off_by, + User.account_role.in_(("owner", "admin", "engineer")), + User.is_active.is_(True), + User.deleted_at.is_(None), + ) + ) + ).scalars().all() + + if not recipients: + logger.info( + "No notification recipients for handoff %s in account %s", + handoff.id, + handoff.account_id, + ) + return 0 + + # Pull session for the email subject. Fall back to a generic title + # if the session is gone (e.g. cascade delete mid-dispatch). + session_result = await self.db.execute( + select(AISession).where(AISession.id == handoff.session_id) + ) + session = session_result.scalar_one_or_none() + problem = ( + session.problem_summary if session and session.problem_summary + else "an active session" + ) + + title = f"New escalation: {problem}" + notes = (handoff.engineer_notes or "").strip() + body = ( + "A teammate has escalated a session and is asking for help.\n\n" + f"Reason: {notes if notes else 'No reason provided.'}\n" + f"Priority: {handoff.priority}" + ) + link_url = ( + f"{settings.FRONTEND_URL.rstrip('/')}/escalations" + if settings.FRONTEND_URL + else None + ) + + results = await asyncio.gather( + *[ + EmailService.send_notification_email( + to_email=r.email, + title=title, + body=body, + link_url=link_url, + ) + for r in recipients + ], + return_exceptions=True, + ) + sent = sum(1 for r in results if r is True) + logger.info( + "Escalation notifications dispatched for handoff %s: %d/%d recipients", + handoff.id, + sent, + len(recipients), + ) + return sent + + except Exception: + logger.exception( + "Escalation notification dispatch failed for handoff %s", + handoff.id, + ) + return 0 + async def _generate_snapshot(self, session: AISession) -> dict[str, Any]: """Generate a snapshot of the session state at handoff time.""" snapshot: dict[str, Any] = { diff --git a/backend/tests/test_handoff_manager.py b/backend/tests/test_handoff_manager.py index 6e1e530e..fc4644be 100644 --- a/backend/tests/test_handoff_manager.py +++ b/backend/tests/test_handoff_manager.py @@ -1,8 +1,12 @@ """Integration tests for HandoffManager service.""" +from unittest.mock import AsyncMock, patch + import pytest from httpx import AsyncClient from app.models.ai_session import AISession +from app.models.user import User +from app.services.handoff_manager import HandoffManager @pytest.mark.asyncio @@ -113,3 +117,207 @@ async def test_claim_session(client: AsyncClient, test_user, test_admin, auth_he await test_db.refresh(session) assert session.status == "active" + + +# ─── Notification dispatch ──────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_dispatch_emails_engineer_recipients_in_account( + client: AsyncClient, test_user, auth_headers, test_db +): + """dispatch_escalation_notifications emails every engineer/admin in the + account except the escalator.""" + # Add a second user (engineer role) in the same account. + teammate = User( + email="teammate@example.com", + password_hash="x", + name="Teammate", + role="engineer", + account_id=test_user["user_data"]["account_id"], + account_role="engineer", + ) + test_db.add(teammate) + await test_db.flush() + + # Add a viewer-role user — must NOT receive a notification. + viewer = User( + email="viewer@example.com", + password_hash="x", + name="Viewer", + role="engineer", + account_id=test_user["user_data"]["account_id"], + account_role="viewer", + ) + test_db.add(viewer) + await test_db.flush() + + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "vpn down"}, + problem_summary="VPN won't connect after Win update", + status="active", + confidence_tier="discovery", + conversation_messages=[], + ) + test_db.add(session) + await test_db.commit() + + manager = HandoffManager(test_db) + handoff = await manager.create_handoff( + session_id=session.id, + intent="escalate", + engineer_notes="Stuck on auth handshake", + user_id=test_user["user_data"]["id"], + ) + await test_db.commit() + + with patch( + "app.services.handoff_manager.EmailService.send_notification_email", + new=AsyncMock(return_value=True), + ) as send: + sent = await manager.dispatch_escalation_notifications(handoff) + + assert sent == 1 # only the engineer-role teammate + recipients = {call.kwargs["to_email"] for call in send.call_args_list} + assert recipients == {"teammate@example.com"} + assert viewer.email not in recipients + assert test_user["email"] not in recipients # not self-notified + + title = send.call_args_list[0].kwargs["title"] + assert "VPN won't connect after Win update" in title + + +@pytest.mark.asyncio +async def test_dispatch_skipped_for_park_intent( + client: AsyncClient, test_user, auth_headers, test_db +): + """park-intent handoffs are private (waiting for client logs etc) — no + team-wide email.""" + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "x"}, + status="active", + confidence_tier="discovery", + conversation_messages=[], + ) + test_db.add(session) + await test_db.commit() + + manager = HandoffManager(test_db) + handoff = await manager.create_handoff( + session_id=session.id, + intent="park", + engineer_notes="waiting on customer", + user_id=test_user["user_data"]["id"], + ) + await test_db.commit() + + with patch( + "app.services.handoff_manager.EmailService.send_notification_email", + new=AsyncMock(return_value=True), + ) as send: + sent = await manager.dispatch_escalation_notifications(handoff) + + assert sent == 0 + assert send.call_count == 0 + + +@pytest.mark.asyncio +async def test_dispatch_graceful_degradation_when_email_raises( + client: AsyncClient, test_user, auth_headers, test_db +): + """If the email service raises (auth misconfig, network, etc.), dispatch + must NOT raise. Handoff creation has already committed; emailing is + best-effort. Codex-flagged regression.""" + teammate = User( + email="t@example.com", + password_hash="x", + name="T", + role="engineer", + account_id=test_user["user_data"]["account_id"], + account_role="engineer", + ) + test_db.add(teammate) + await test_db.flush() + + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "x"}, + status="active", + confidence_tier="discovery", + conversation_messages=[], + ) + test_db.add(session) + await test_db.commit() + + manager = HandoffManager(test_db) + handoff = await manager.create_handoff( + session_id=session.id, + intent="escalate", + engineer_notes="help", + user_id=test_user["user_data"]["id"], + ) + await test_db.commit() + + with patch( + "app.services.handoff_manager.EmailService.send_notification_email", + new=AsyncMock(side_effect=RuntimeError("SMTP down")), + ): + # Must not raise. + sent = await manager.dispatch_escalation_notifications(handoff) + assert sent == 0 + + +@pytest.mark.asyncio +async def test_create_handoff_endpoint_dispatches_on_escalate( + client: AsyncClient, test_user, auth_headers, test_db +): + """End-to-end: POST /handoff with intent=escalate triggers + dispatch_escalation_notifications after commit. Verifies the wiring in + the endpoint, not just the manager method.""" + teammate = User( + email="t2@example.com", + password_hash="x", + name="T2", + role="engineer", + account_id=test_user["user_data"]["account_id"], + account_role="engineer", + ) + test_db.add(teammate) + await test_db.commit() + + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "x"}, + status="active", + confidence_tier="discovery", + conversation_messages=[], + ) + test_db.add(session) + await test_db.commit() + + with patch( + "app.services.handoff_manager.EmailService.send_notification_email", + new=AsyncMock(return_value=True), + ) as send: + resp = await client.post( + f"/api/v1/ai-sessions/{session.id}/handoff", + headers=auth_headers, + json={"intent": "escalate", "engineer_notes": "Need help"}, + ) + assert resp.status_code == 201 + assert send.call_count == 1 + assert send.call_args.kwargs["to_email"] == "t2@example.com"