Codex review pass on the escalation wedge. Reworks claim_session from read-then-write to a conditional UPDATE so two seniors racing can't both win, blocks the original engineer from claiming their own handoff, and filters self-escalated sessions out of the dashboard escalation queue. Also preassigns the handoff UUID before flush so the compatibility escalation_package payload carries it. Removes legacy frontend pickup state (claiming, handleStartHere) that broke tsc --noEmit. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
581 lines
18 KiB
Python
581 lines
18 KiB
Python
"""Integration tests for HandoffManager service."""
|
|
import asyncio
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
|
|
from app.models.ai_session import AISession
|
|
from app.models.user import User
|
|
from app.services.handoff_manager import HandoffManager
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def stub_ai_assessment():
|
|
"""Keep handoff tests focused on handoff behavior, not external AI calls."""
|
|
with patch.object(
|
|
HandoffManager,
|
|
"_generate_handoff_summary",
|
|
new=AsyncMock(
|
|
return_value={
|
|
"summary_prose": "Stub escalation assessment",
|
|
"what_we_know": [],
|
|
"likely_cause": "Stub",
|
|
"suggested_steps": [],
|
|
"confidence": "medium",
|
|
}
|
|
),
|
|
):
|
|
yield
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_park_handoff(client: AsyncClient, test_user, auth_headers, test_db):
|
|
"""Parking a session creates a handoff with snapshot."""
|
|
session = AISession(
|
|
user_id=test_user["user_data"]["id"],
|
|
account_id=test_user["user_data"]["account_id"],
|
|
session_type="guided",
|
|
intake_type="free_text",
|
|
intake_content={"text": "test"},
|
|
status="active",
|
|
confidence_tier="discovery",
|
|
conversation_messages=[{"role": "user", "content": "help me"}],
|
|
)
|
|
test_db.add(session)
|
|
await test_db.flush()
|
|
|
|
from app.services.handoff_manager import HandoffManager
|
|
manager = HandoffManager(test_db)
|
|
|
|
handoff = await manager.create_handoff(
|
|
session_id=session.id,
|
|
intent="park",
|
|
engineer_notes="Waiting for client to provide logs",
|
|
user_id=test_user["user_data"]["id"],
|
|
)
|
|
|
|
assert handoff is not None
|
|
assert handoff.intent == "park"
|
|
assert handoff.engineer_notes == "Waiting for client to provide logs"
|
|
assert handoff.snapshot is not None
|
|
|
|
# Session should be paused
|
|
await test_db.refresh(session)
|
|
assert session.status == "paused"
|
|
assert session.handoff_count == 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_escalate_handoff(client: AsyncClient, test_user, auth_headers, test_db):
|
|
"""Escalating creates handoff + dual-writes to escalation_package."""
|
|
session = AISession(
|
|
user_id=test_user["user_data"]["id"],
|
|
account_id=test_user["user_data"]["account_id"],
|
|
session_type="guided",
|
|
intake_type="free_text",
|
|
intake_content={"text": "test"},
|
|
status="active",
|
|
confidence_tier="discovery",
|
|
conversation_messages=[],
|
|
)
|
|
test_db.add(session)
|
|
await test_db.flush()
|
|
|
|
from app.services.handoff_manager import HandoffManager
|
|
manager = HandoffManager(test_db)
|
|
|
|
handoff = await manager.create_handoff(
|
|
session_id=session.id,
|
|
intent="escalate",
|
|
engineer_notes="Need senior help",
|
|
user_id=test_user["user_data"]["id"],
|
|
)
|
|
|
|
assert handoff.intent == "escalate"
|
|
|
|
# Dual-write check
|
|
await test_db.refresh(session)
|
|
assert session.status == "escalated"
|
|
assert session.escalation_package is not None
|
|
assert "branch_map" in session.escalation_package or "snapshot" in session.escalation_package
|
|
assert session.escalation_package["handoff_id"] == str(handoff.id)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_escalate_handoff_does_not_wait_on_slow_ai_assessment(
|
|
client: AsyncClient, test_user, auth_headers, test_db, monkeypatch
|
|
):
|
|
"""Escalate should commit a handoff even when optional AI assessment is slow."""
|
|
session = AISession(
|
|
user_id=test_user["user_data"]["id"],
|
|
account_id=test_user["user_data"]["account_id"],
|
|
session_type="guided",
|
|
intake_type="free_text",
|
|
intake_content={"text": "test"},
|
|
status="active",
|
|
confidence_tier="discovery",
|
|
conversation_messages=[],
|
|
)
|
|
test_db.add(session)
|
|
await test_db.flush()
|
|
|
|
async def slow_summary(self, session):
|
|
await asyncio.sleep(0.2)
|
|
return {"summary_prose": "too slow", "confidence": "medium"}
|
|
|
|
monkeypatch.setattr(
|
|
"app.services.handoff_manager.settings."
|
|
"ESCALATION_AI_ASSESSMENT_TIMEOUT_SECONDS",
|
|
0.01,
|
|
)
|
|
with patch.object(
|
|
HandoffManager,
|
|
"_generate_handoff_summary_inner",
|
|
new=slow_summary,
|
|
):
|
|
manager = HandoffManager(test_db)
|
|
handoff = await manager.create_handoff(
|
|
session_id=session.id,
|
|
intent="escalate",
|
|
engineer_notes="Need senior help",
|
|
user_id=test_user["user_data"]["id"],
|
|
)
|
|
|
|
assert handoff.intent == "escalate"
|
|
assert handoff.ai_assessment is None
|
|
assert handoff.ai_assessment_data is None
|
|
|
|
await test_db.refresh(session)
|
|
assert session.status == "escalated"
|
|
assert session.handoff_count == 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_claim_session(client: AsyncClient, test_user, test_admin, auth_headers, test_db):
|
|
"""Claiming a handoff sets claimed_by and reactivates session."""
|
|
session = AISession(
|
|
user_id=test_user["user_data"]["id"],
|
|
account_id=test_user["user_data"]["account_id"],
|
|
session_type="guided",
|
|
intake_type="free_text",
|
|
intake_content={"text": "test"},
|
|
status="active",
|
|
confidence_tier="discovery",
|
|
conversation_messages=[],
|
|
)
|
|
test_db.add(session)
|
|
await test_db.flush()
|
|
|
|
from app.services.handoff_manager import HandoffManager
|
|
manager = HandoffManager(test_db)
|
|
|
|
handoff = await manager.create_handoff(
|
|
session_id=session.id,
|
|
intent="escalate",
|
|
engineer_notes="Need help",
|
|
user_id=test_user["user_data"]["id"],
|
|
)
|
|
|
|
claimed = await manager.claim_session(
|
|
handoff_id=handoff.id,
|
|
claiming_user_id=test_admin["user_data"]["id"],
|
|
)
|
|
|
|
assert str(claimed.claimed_by) == test_admin["user_data"]["id"]
|
|
assert claimed.claimed_at is not None
|
|
|
|
await test_db.refresh(session)
|
|
assert session.status == "active"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_claim_session_conflict_raises_already_claimed(
|
|
client: AsyncClient, test_user, test_admin, auth_headers, test_db
|
|
):
|
|
"""Two seniors claiming simultaneously: the second raises the typed
|
|
HandoffAlreadyClaimedError carrying the winner's identity. Without this
|
|
guard both calls would silently overwrite claimed_by — the locked
|
|
race-condition story depends on a real conflict response."""
|
|
from app.services.handoff_manager import (
|
|
HandoffAlreadyClaimedError,
|
|
HandoffManager,
|
|
)
|
|
|
|
session = AISession(
|
|
user_id=test_user["user_data"]["id"],
|
|
account_id=test_user["user_data"]["account_id"],
|
|
session_type="guided",
|
|
intake_type="free_text",
|
|
intake_content={"text": "test"},
|
|
status="active",
|
|
confidence_tier="discovery",
|
|
conversation_messages=[],
|
|
)
|
|
test_db.add(session)
|
|
loser = User(
|
|
email="race-loser@example.com",
|
|
password_hash="x",
|
|
name="Race Loser",
|
|
role="engineer",
|
|
account_id=test_user["user_data"]["account_id"],
|
|
account_role="engineer",
|
|
)
|
|
test_db.add(loser)
|
|
await test_db.flush()
|
|
|
|
manager = HandoffManager(test_db)
|
|
handoff = await manager.create_handoff(
|
|
session_id=session.id,
|
|
intent="escalate",
|
|
engineer_notes="Need help",
|
|
user_id=test_user["user_data"]["id"],
|
|
)
|
|
|
|
# First claim — admin wins.
|
|
await manager.claim_session(
|
|
handoff_id=handoff.id,
|
|
claiming_user_id=test_admin["user_data"]["id"],
|
|
)
|
|
|
|
# Second claim by a different user — standing in for the other senior who
|
|
# lost the race.
|
|
with pytest.raises(HandoffAlreadyClaimedError) as exc_info:
|
|
await manager.claim_session(
|
|
handoff_id=handoff.id,
|
|
claiming_user_id=loser.id,
|
|
)
|
|
|
|
err = exc_info.value
|
|
assert str(err.claimed_by_id) == test_admin["user_data"]["id"]
|
|
assert err.claimed_by_name # populated from User.name
|
|
assert err.claimed_at is not None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_claim_session_idempotent_for_same_user(
|
|
client: AsyncClient, test_user, test_admin, auth_headers, test_db
|
|
):
|
|
"""A re-claim by the user who already won is a no-op, not a conflict.
|
|
Defends against double-clicks / network retries on the loser-side toast."""
|
|
session = AISession(
|
|
user_id=test_user["user_data"]["id"],
|
|
account_id=test_user["user_data"]["account_id"],
|
|
session_type="guided",
|
|
intake_type="free_text",
|
|
intake_content={"text": "test"},
|
|
status="active",
|
|
confidence_tier="discovery",
|
|
conversation_messages=[],
|
|
)
|
|
test_db.add(session)
|
|
await test_db.flush()
|
|
|
|
manager = HandoffManager(test_db)
|
|
handoff = await manager.create_handoff(
|
|
session_id=session.id,
|
|
intent="escalate",
|
|
engineer_notes="Need help",
|
|
user_id=test_user["user_data"]["id"],
|
|
)
|
|
|
|
first = await manager.claim_session(
|
|
handoff_id=handoff.id,
|
|
claiming_user_id=test_admin["user_data"]["id"],
|
|
)
|
|
second = await manager.claim_session(
|
|
handoff_id=handoff.id,
|
|
claiming_user_id=test_admin["user_data"]["id"],
|
|
)
|
|
|
|
assert str(first.claimed_by) == str(second.claimed_by) == test_admin["user_data"]["id"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_claim_session_rejects_self_claim(
|
|
client: AsyncClient, test_user, auth_headers, test_db
|
|
):
|
|
"""The engineer who escalated a session cannot pick up their own handoff."""
|
|
session = AISession(
|
|
user_id=test_user["user_data"]["id"],
|
|
account_id=test_user["user_data"]["account_id"],
|
|
session_type="guided",
|
|
intake_type="free_text",
|
|
intake_content={"text": "test"},
|
|
status="active",
|
|
confidence_tier="discovery",
|
|
conversation_messages=[],
|
|
)
|
|
test_db.add(session)
|
|
await test_db.flush()
|
|
|
|
manager = HandoffManager(test_db)
|
|
handoff = await manager.create_handoff(
|
|
session_id=session.id,
|
|
intent="escalate",
|
|
engineer_notes="Need help",
|
|
user_id=test_user["user_data"]["id"],
|
|
)
|
|
|
|
with pytest.raises(PermissionError):
|
|
await manager.claim_session(
|
|
handoff_id=handoff.id,
|
|
claiming_user_id=test_user["user_data"]["id"],
|
|
)
|
|
|
|
|
|
# ─── Notification dispatch ────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_dispatch_emails_engineer_recipients_in_account(
|
|
client: AsyncClient, test_user, auth_headers, test_db
|
|
):
|
|
"""dispatch_escalation_notifications emails every engineer/admin in the
|
|
account except the escalator."""
|
|
# Add a second user (engineer role) in the same account.
|
|
teammate = User(
|
|
email="teammate@example.com",
|
|
password_hash="x",
|
|
name="Teammate",
|
|
role="engineer",
|
|
account_id=test_user["user_data"]["account_id"],
|
|
account_role="engineer",
|
|
)
|
|
test_db.add(teammate)
|
|
await test_db.flush()
|
|
|
|
# Add a viewer-role user — must NOT receive a notification.
|
|
viewer = User(
|
|
email="viewer@example.com",
|
|
password_hash="x",
|
|
name="Viewer",
|
|
role="engineer",
|
|
account_id=test_user["user_data"]["account_id"],
|
|
account_role="viewer",
|
|
)
|
|
test_db.add(viewer)
|
|
await test_db.flush()
|
|
|
|
session = AISession(
|
|
user_id=test_user["user_data"]["id"],
|
|
account_id=test_user["user_data"]["account_id"],
|
|
session_type="guided",
|
|
intake_type="free_text",
|
|
intake_content={"text": "vpn down"},
|
|
problem_summary="VPN won't connect after Win update",
|
|
status="active",
|
|
confidence_tier="discovery",
|
|
conversation_messages=[],
|
|
)
|
|
test_db.add(session)
|
|
await test_db.commit()
|
|
|
|
manager = HandoffManager(test_db)
|
|
handoff = await manager.create_handoff(
|
|
session_id=session.id,
|
|
intent="escalate",
|
|
engineer_notes="Stuck on auth handshake",
|
|
user_id=test_user["user_data"]["id"],
|
|
)
|
|
await test_db.commit()
|
|
|
|
with patch(
|
|
"app.services.handoff_manager.EmailService.send_notification_email",
|
|
new=AsyncMock(return_value=True),
|
|
) as send:
|
|
sent = await manager.dispatch_escalation_notifications(handoff)
|
|
|
|
assert sent == 1 # only the engineer-role teammate
|
|
recipients = {call.kwargs["to_email"] for call in send.call_args_list}
|
|
assert recipients == {"teammate@example.com"}
|
|
assert viewer.email not in recipients
|
|
assert test_user["email"] not in recipients # not self-notified
|
|
|
|
title = send.call_args_list[0].kwargs["title"]
|
|
assert "VPN won't connect after Win update" in title
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_dispatch_skipped_for_park_intent(
|
|
client: AsyncClient, test_user, auth_headers, test_db
|
|
):
|
|
"""park-intent handoffs are private (waiting for client logs etc) — no
|
|
team-wide email."""
|
|
session = AISession(
|
|
user_id=test_user["user_data"]["id"],
|
|
account_id=test_user["user_data"]["account_id"],
|
|
session_type="guided",
|
|
intake_type="free_text",
|
|
intake_content={"text": "x"},
|
|
status="active",
|
|
confidence_tier="discovery",
|
|
conversation_messages=[],
|
|
)
|
|
test_db.add(session)
|
|
await test_db.commit()
|
|
|
|
manager = HandoffManager(test_db)
|
|
handoff = await manager.create_handoff(
|
|
session_id=session.id,
|
|
intent="park",
|
|
engineer_notes="waiting on customer",
|
|
user_id=test_user["user_data"]["id"],
|
|
)
|
|
await test_db.commit()
|
|
|
|
with patch(
|
|
"app.services.handoff_manager.EmailService.send_notification_email",
|
|
new=AsyncMock(return_value=True),
|
|
) as send:
|
|
sent = await manager.dispatch_escalation_notifications(handoff)
|
|
|
|
assert sent == 0
|
|
assert send.call_count == 0
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_dispatch_graceful_degradation_when_email_raises(
|
|
client: AsyncClient, test_user, auth_headers, test_db
|
|
):
|
|
"""If the email service raises (auth misconfig, network, etc.), dispatch
|
|
must NOT raise. Handoff creation has already committed; emailing is
|
|
best-effort. Codex-flagged regression."""
|
|
teammate = User(
|
|
email="t@example.com",
|
|
password_hash="x",
|
|
name="T",
|
|
role="engineer",
|
|
account_id=test_user["user_data"]["account_id"],
|
|
account_role="engineer",
|
|
)
|
|
test_db.add(teammate)
|
|
await test_db.flush()
|
|
|
|
session = AISession(
|
|
user_id=test_user["user_data"]["id"],
|
|
account_id=test_user["user_data"]["account_id"],
|
|
session_type="guided",
|
|
intake_type="free_text",
|
|
intake_content={"text": "x"},
|
|
status="active",
|
|
confidence_tier="discovery",
|
|
conversation_messages=[],
|
|
)
|
|
test_db.add(session)
|
|
await test_db.commit()
|
|
|
|
manager = HandoffManager(test_db)
|
|
handoff = await manager.create_handoff(
|
|
session_id=session.id,
|
|
intent="escalate",
|
|
engineer_notes="help",
|
|
user_id=test_user["user_data"]["id"],
|
|
)
|
|
await test_db.commit()
|
|
|
|
with patch(
|
|
"app.services.handoff_manager.EmailService.send_notification_email",
|
|
new=AsyncMock(side_effect=RuntimeError("SMTP down")),
|
|
):
|
|
# Must not raise.
|
|
sent = await manager.dispatch_escalation_notifications(handoff)
|
|
assert sent == 0
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_dispatch_publishes_to_escalation_bus(
|
|
client: AsyncClient, test_user, auth_headers, test_db
|
|
):
|
|
"""dispatch_escalation_notifications puts an event on the in-memory bus
|
|
so connected SSE subscribers see live arrivals."""
|
|
from app.core.escalation_bus import bus as escalation_bus
|
|
|
|
session = AISession(
|
|
user_id=test_user["user_data"]["id"],
|
|
account_id=test_user["user_data"]["account_id"],
|
|
session_type="guided",
|
|
intake_type="free_text",
|
|
intake_content={"text": "x"},
|
|
problem_summary="VPN down",
|
|
status="active",
|
|
confidence_tier="discovery",
|
|
conversation_messages=[],
|
|
)
|
|
test_db.add(session)
|
|
await test_db.commit()
|
|
|
|
manager = HandoffManager(test_db)
|
|
handoff = await manager.create_handoff(
|
|
session_id=session.id,
|
|
intent="escalate",
|
|
engineer_notes="please help",
|
|
user_id=test_user["user_data"]["id"],
|
|
)
|
|
await test_db.commit()
|
|
|
|
from uuid import UUID as PyUUID
|
|
account_id = PyUUID(test_user["user_data"]["account_id"])
|
|
|
|
queue = await escalation_bus.subscribe(account_id)
|
|
try:
|
|
with patch(
|
|
"app.services.handoff_manager.EmailService.send_notification_email",
|
|
new=AsyncMock(return_value=True),
|
|
):
|
|
await manager.dispatch_escalation_notifications(handoff)
|
|
|
|
import asyncio
|
|
event = await asyncio.wait_for(queue.get(), timeout=1.0)
|
|
assert event["type"] == "handoff_created"
|
|
assert event["handoff_id"] == str(handoff.id)
|
|
assert event["session_id"] == str(session.id)
|
|
assert event["priority"] == "normal"
|
|
finally:
|
|
await escalation_bus.unsubscribe(account_id, queue)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_handoff_endpoint_dispatches_on_escalate(
|
|
client: AsyncClient, test_user, auth_headers, test_db
|
|
):
|
|
"""End-to-end: POST /handoff with intent=escalate triggers
|
|
dispatch_escalation_notifications after commit. Verifies the wiring in
|
|
the endpoint, not just the manager method."""
|
|
teammate = User(
|
|
email="t2@example.com",
|
|
password_hash="x",
|
|
name="T2",
|
|
role="engineer",
|
|
account_id=test_user["user_data"]["account_id"],
|
|
account_role="engineer",
|
|
)
|
|
test_db.add(teammate)
|
|
await test_db.commit()
|
|
|
|
session = AISession(
|
|
user_id=test_user["user_data"]["id"],
|
|
account_id=test_user["user_data"]["account_id"],
|
|
session_type="guided",
|
|
intake_type="free_text",
|
|
intake_content={"text": "x"},
|
|
status="active",
|
|
confidence_tier="discovery",
|
|
conversation_messages=[],
|
|
)
|
|
test_db.add(session)
|
|
await test_db.commit()
|
|
|
|
with patch(
|
|
"app.services.handoff_manager.EmailService.send_notification_email",
|
|
new=AsyncMock(return_value=True),
|
|
) as send:
|
|
resp = await client.post(
|
|
f"/api/v1/ai-sessions/{session.id}/handoff",
|
|
headers=auth_headers,
|
|
json={"intent": "escalate", "engineer_notes": "Need help"},
|
|
)
|
|
assert resp.status_code == 201
|
|
assert send.call_count == 1
|
|
assert send.call_args.kwargs["to_email"] == "t2@example.com"
|