"""Integration tests for HandoffManager service.""" import asyncio from unittest.mock import AsyncMock, patch import pytest from httpx import AsyncClient from app.models.ai_session import AISession from app.models.user import User from app.services.handoff_manager import HandoffManager @pytest.fixture(autouse=True) def stub_ai_assessment(): """Keep handoff tests focused on handoff behavior, not external AI calls.""" with patch.object( HandoffManager, "_generate_handoff_summary", new=AsyncMock( return_value={ "summary_prose": "Stub escalation assessment", "what_we_know": [], "likely_cause": "Stub", "suggested_steps": [], "confidence": "medium", } ), ): yield @pytest.mark.asyncio async def test_create_park_handoff(client: AsyncClient, test_user, auth_headers, test_db): """Parking a session creates a handoff with snapshot.""" session = AISession( user_id=test_user["user_data"]["id"], account_id=test_user["user_data"]["account_id"], session_type="guided", intake_type="free_text", intake_content={"text": "test"}, status="active", confidence_tier="discovery", conversation_messages=[{"role": "user", "content": "help me"}], ) test_db.add(session) await test_db.flush() from app.services.handoff_manager import HandoffManager manager = HandoffManager(test_db) handoff = await manager.create_handoff( session_id=session.id, intent="park", engineer_notes="Waiting for client to provide logs", user_id=test_user["user_data"]["id"], ) assert handoff is not None assert handoff.intent == "park" assert handoff.engineer_notes == "Waiting for client to provide logs" assert handoff.snapshot is not None # Session should be paused await test_db.refresh(session) assert session.status == "paused" assert session.handoff_count == 1 @pytest.mark.asyncio async def test_create_escalate_handoff(client: AsyncClient, test_user, auth_headers, test_db): """Escalating creates handoff + dual-writes to escalation_package.""" session = AISession( user_id=test_user["user_data"]["id"], account_id=test_user["user_data"]["account_id"], session_type="guided", intake_type="free_text", intake_content={"text": "test"}, status="active", confidence_tier="discovery", conversation_messages=[], ) test_db.add(session) await test_db.flush() from app.services.handoff_manager import HandoffManager manager = HandoffManager(test_db) handoff = await manager.create_handoff( session_id=session.id, intent="escalate", engineer_notes="Need senior help", user_id=test_user["user_data"]["id"], ) assert handoff.intent == "escalate" # Dual-write check await test_db.refresh(session) assert session.status == "escalated" assert session.escalation_package is not None assert "branch_map" in session.escalation_package or "snapshot" in session.escalation_package assert session.escalation_package["handoff_id"] == str(handoff.id) @pytest.mark.asyncio async def test_create_escalate_handoff_does_not_wait_on_slow_ai_assessment( client: AsyncClient, test_user, auth_headers, test_db, monkeypatch ): """Escalate should commit a handoff even when optional AI assessment is slow.""" session = AISession( user_id=test_user["user_data"]["id"], account_id=test_user["user_data"]["account_id"], session_type="guided", intake_type="free_text", intake_content={"text": "test"}, status="active", confidence_tier="discovery", conversation_messages=[], ) test_db.add(session) await test_db.flush() async def slow_summary(self, session): await asyncio.sleep(0.2) return {"summary_prose": "too slow", "confidence": "medium"} monkeypatch.setattr( "app.services.handoff_manager.settings." "ESCALATION_AI_ASSESSMENT_TIMEOUT_SECONDS", 0.01, ) with patch.object( HandoffManager, "_generate_handoff_summary_inner", new=slow_summary, ): manager = HandoffManager(test_db) handoff = await manager.create_handoff( session_id=session.id, intent="escalate", engineer_notes="Need senior help", user_id=test_user["user_data"]["id"], ) assert handoff.intent == "escalate" assert handoff.ai_assessment is None assert handoff.ai_assessment_data is None await test_db.refresh(session) assert session.status == "escalated" assert session.handoff_count == 1 @pytest.mark.asyncio async def test_claim_session(client: AsyncClient, test_user, test_admin, auth_headers, test_db): """Claiming a handoff sets claimed_by and reactivates session.""" session = AISession( user_id=test_user["user_data"]["id"], account_id=test_user["user_data"]["account_id"], session_type="guided", intake_type="free_text", intake_content={"text": "test"}, status="active", confidence_tier="discovery", conversation_messages=[], ) test_db.add(session) await test_db.flush() from app.services.handoff_manager import HandoffManager manager = HandoffManager(test_db) handoff = await manager.create_handoff( session_id=session.id, intent="escalate", engineer_notes="Need help", user_id=test_user["user_data"]["id"], ) claimed = await manager.claim_session( handoff_id=handoff.id, claiming_user_id=test_admin["user_data"]["id"], ) assert str(claimed.claimed_by) == test_admin["user_data"]["id"] assert claimed.claimed_at is not None await test_db.refresh(session) assert session.status == "active" @pytest.mark.asyncio async def test_claim_session_conflict_raises_already_claimed( client: AsyncClient, test_user, test_admin, auth_headers, test_db ): """Two seniors claiming simultaneously: the second raises the typed HandoffAlreadyClaimedError carrying the winner's identity. Without this guard both calls would silently overwrite claimed_by — the locked race-condition story depends on a real conflict response.""" from app.services.handoff_manager import ( HandoffAlreadyClaimedError, HandoffManager, ) session = AISession( user_id=test_user["user_data"]["id"], account_id=test_user["user_data"]["account_id"], session_type="guided", intake_type="free_text", intake_content={"text": "test"}, status="active", confidence_tier="discovery", conversation_messages=[], ) test_db.add(session) loser = User( email="race-loser@example.com", password_hash="x", name="Race Loser", role="engineer", account_id=test_user["user_data"]["account_id"], account_role="engineer", ) test_db.add(loser) await test_db.flush() manager = HandoffManager(test_db) handoff = await manager.create_handoff( session_id=session.id, intent="escalate", engineer_notes="Need help", user_id=test_user["user_data"]["id"], ) # First claim — admin wins. await manager.claim_session( handoff_id=handoff.id, claiming_user_id=test_admin["user_data"]["id"], ) # Second claim by a different user — standing in for the other senior who # lost the race. with pytest.raises(HandoffAlreadyClaimedError) as exc_info: await manager.claim_session( handoff_id=handoff.id, claiming_user_id=loser.id, ) err = exc_info.value assert str(err.claimed_by_id) == test_admin["user_data"]["id"] assert err.claimed_by_name # populated from User.name assert err.claimed_at is not None @pytest.mark.asyncio async def test_claim_session_idempotent_for_same_user( client: AsyncClient, test_user, test_admin, auth_headers, test_db ): """A re-claim by the user who already won is a no-op, not a conflict. Defends against double-clicks / network retries on the loser-side toast.""" session = AISession( user_id=test_user["user_data"]["id"], account_id=test_user["user_data"]["account_id"], session_type="guided", intake_type="free_text", intake_content={"text": "test"}, status="active", confidence_tier="discovery", conversation_messages=[], ) test_db.add(session) await test_db.flush() manager = HandoffManager(test_db) handoff = await manager.create_handoff( session_id=session.id, intent="escalate", engineer_notes="Need help", user_id=test_user["user_data"]["id"], ) first = await manager.claim_session( handoff_id=handoff.id, claiming_user_id=test_admin["user_data"]["id"], ) second = await manager.claim_session( handoff_id=handoff.id, claiming_user_id=test_admin["user_data"]["id"], ) assert str(first.claimed_by) == str(second.claimed_by) == test_admin["user_data"]["id"] @pytest.mark.asyncio async def test_claim_session_rejects_self_claim( client: AsyncClient, test_user, auth_headers, test_db ): """The engineer who escalated a session cannot pick up their own handoff.""" session = AISession( user_id=test_user["user_data"]["id"], account_id=test_user["user_data"]["account_id"], session_type="guided", intake_type="free_text", intake_content={"text": "test"}, status="active", confidence_tier="discovery", conversation_messages=[], ) test_db.add(session) await test_db.flush() manager = HandoffManager(test_db) handoff = await manager.create_handoff( session_id=session.id, intent="escalate", engineer_notes="Need help", user_id=test_user["user_data"]["id"], ) with pytest.raises(PermissionError): await manager.claim_session( handoff_id=handoff.id, claiming_user_id=test_user["user_data"]["id"], ) # ─── Notification dispatch ──────────────────────────────────────────────────── @pytest.mark.asyncio async def test_dispatch_emails_engineer_recipients_in_account( client: AsyncClient, test_user, auth_headers, test_db ): """dispatch_escalation_notifications emails every engineer/admin in the account except the escalator.""" # Add a second user (engineer role) in the same account. teammate = User( email="teammate@example.com", password_hash="x", name="Teammate", role="engineer", account_id=test_user["user_data"]["account_id"], account_role="engineer", ) test_db.add(teammate) await test_db.flush() # Add a viewer-role user — must NOT receive a notification. viewer = User( email="viewer@example.com", password_hash="x", name="Viewer", role="engineer", account_id=test_user["user_data"]["account_id"], account_role="viewer", ) test_db.add(viewer) await test_db.flush() session = AISession( user_id=test_user["user_data"]["id"], account_id=test_user["user_data"]["account_id"], session_type="guided", intake_type="free_text", intake_content={"text": "vpn down"}, problem_summary="VPN won't connect after Win update", status="active", confidence_tier="discovery", conversation_messages=[], ) test_db.add(session) await test_db.commit() manager = HandoffManager(test_db) handoff = await manager.create_handoff( session_id=session.id, intent="escalate", engineer_notes="Stuck on auth handshake", user_id=test_user["user_data"]["id"], ) await test_db.commit() with patch( "app.services.handoff_manager.EmailService.send_notification_email", new=AsyncMock(return_value=True), ) as send: sent = await manager.dispatch_escalation_notifications(handoff) assert sent == 1 # only the engineer-role teammate recipients = {call.kwargs["to_email"] for call in send.call_args_list} assert recipients == {"teammate@example.com"} assert viewer.email not in recipients assert test_user["email"] not in recipients # not self-notified title = send.call_args_list[0].kwargs["title"] assert "VPN won't connect after Win update" in title @pytest.mark.asyncio async def test_dispatch_skipped_for_park_intent( client: AsyncClient, test_user, auth_headers, test_db ): """park-intent handoffs are private (waiting for client logs etc) — no team-wide email.""" session = AISession( user_id=test_user["user_data"]["id"], account_id=test_user["user_data"]["account_id"], session_type="guided", intake_type="free_text", intake_content={"text": "x"}, status="active", confidence_tier="discovery", conversation_messages=[], ) test_db.add(session) await test_db.commit() manager = HandoffManager(test_db) handoff = await manager.create_handoff( session_id=session.id, intent="park", engineer_notes="waiting on customer", user_id=test_user["user_data"]["id"], ) await test_db.commit() with patch( "app.services.handoff_manager.EmailService.send_notification_email", new=AsyncMock(return_value=True), ) as send: sent = await manager.dispatch_escalation_notifications(handoff) assert sent == 0 assert send.call_count == 0 @pytest.mark.asyncio async def test_dispatch_graceful_degradation_when_email_raises( client: AsyncClient, test_user, auth_headers, test_db ): """If the email service raises (auth misconfig, network, etc.), dispatch must NOT raise. Handoff creation has already committed; emailing is best-effort. Codex-flagged regression.""" teammate = User( email="t@example.com", password_hash="x", name="T", role="engineer", account_id=test_user["user_data"]["account_id"], account_role="engineer", ) test_db.add(teammate) await test_db.flush() session = AISession( user_id=test_user["user_data"]["id"], account_id=test_user["user_data"]["account_id"], session_type="guided", intake_type="free_text", intake_content={"text": "x"}, status="active", confidence_tier="discovery", conversation_messages=[], ) test_db.add(session) await test_db.commit() manager = HandoffManager(test_db) handoff = await manager.create_handoff( session_id=session.id, intent="escalate", engineer_notes="help", user_id=test_user["user_data"]["id"], ) await test_db.commit() with patch( "app.services.handoff_manager.EmailService.send_notification_email", new=AsyncMock(side_effect=RuntimeError("SMTP down")), ): # Must not raise. sent = await manager.dispatch_escalation_notifications(handoff) assert sent == 0 @pytest.mark.asyncio async def test_dispatch_publishes_to_escalation_bus( client: AsyncClient, test_user, auth_headers, test_db ): """dispatch_escalation_notifications puts an event on the in-memory bus so connected SSE subscribers see live arrivals.""" from app.core.escalation_bus import bus as escalation_bus session = AISession( user_id=test_user["user_data"]["id"], account_id=test_user["user_data"]["account_id"], session_type="guided", intake_type="free_text", intake_content={"text": "x"}, problem_summary="VPN down", status="active", confidence_tier="discovery", conversation_messages=[], ) test_db.add(session) await test_db.commit() manager = HandoffManager(test_db) handoff = await manager.create_handoff( session_id=session.id, intent="escalate", engineer_notes="please help", user_id=test_user["user_data"]["id"], ) await test_db.commit() from uuid import UUID as PyUUID account_id = PyUUID(test_user["user_data"]["account_id"]) queue = await escalation_bus.subscribe(account_id) try: with patch( "app.services.handoff_manager.EmailService.send_notification_email", new=AsyncMock(return_value=True), ): await manager.dispatch_escalation_notifications(handoff) import asyncio event = await asyncio.wait_for(queue.get(), timeout=1.0) assert event["type"] == "handoff_created" assert event["handoff_id"] == str(handoff.id) assert event["session_id"] == str(session.id) assert event["priority"] == "normal" finally: await escalation_bus.unsubscribe(account_id, queue) @pytest.mark.asyncio async def test_create_handoff_endpoint_dispatches_on_escalate( client: AsyncClient, test_user, auth_headers, test_db ): """End-to-end: POST /handoff with intent=escalate triggers dispatch_escalation_notifications after commit. Verifies the wiring in the endpoint, not just the manager method.""" teammate = User( email="t2@example.com", password_hash="x", name="T2", role="engineer", account_id=test_user["user_data"]["account_id"], account_role="engineer", ) test_db.add(teammate) await test_db.commit() session = AISession( user_id=test_user["user_data"]["id"], account_id=test_user["user_data"]["account_id"], session_type="guided", intake_type="free_text", intake_content={"text": "x"}, status="active", confidence_tier="discovery", conversation_messages=[], ) test_db.add(session) await test_db.commit() with patch( "app.services.handoff_manager.EmailService.send_notification_email", new=AsyncMock(return_value=True), ) as send: resp = await client.post( f"/api/v1/ai-sessions/{session.id}/handoff", headers=auth_headers, json={"intent": "escalate", "engineer_notes": "Need help"}, ) assert resp.status_code == 201 assert send.call_count == 1 assert send.call_args.kwargs["to_email"] == "t2@example.com"