Files
resolutionflow/backend/tests/test_handoff_manager.py
Michael Chihlas f10649abc2
All checks were successful
Mirror to GitHub / mirror (push) Successful in 5s
CI / frontend (pull_request) Successful in 4m59s
CI / backend (pull_request) Successful in 10m22s
CI / e2e (pull_request) Successful in 10m46s
fix(escalations): atomic claim + self-claim rejection + queue exclusion
Codex review pass on the escalation wedge. Reworks claim_session from
read-then-write to a conditional UPDATE so two seniors racing can't both
win, blocks the original engineer from claiming their own handoff, and
filters self-escalated sessions out of the dashboard escalation queue.
Also preassigns the handoff UUID before flush so the compatibility
escalation_package payload carries it. Removes legacy frontend pickup
state (claiming, handleStartHere) that broke tsc --noEmit.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 16:21:20 -04:00

581 lines
18 KiB
Python

"""Integration tests for HandoffManager service."""
import asyncio
from unittest.mock import AsyncMock, patch
import pytest
from httpx import AsyncClient
from app.models.ai_session import AISession
from app.models.user import User
from app.services.handoff_manager import HandoffManager
@pytest.fixture(autouse=True)
def stub_ai_assessment():
    """Swap the AI summary generator for a canned result in every test.

    ``HandoffManager._generate_handoff_summary`` is patched with an
    ``AsyncMock`` for the duration of each test so this module exercises
    handoff behavior rather than external AI calls.
    """
    canned_assessment = {
        "summary_prose": "Stub escalation assessment",
        "what_we_know": [],
        "likely_cause": "Stub",
        "suggested_steps": [],
        "confidence": "medium",
    }
    fake_summary = AsyncMock(return_value=canned_assessment)
    with patch.object(HandoffManager, "_generate_handoff_summary", new=fake_summary):
        yield
@pytest.mark.asyncio
async def test_create_park_handoff(client: AsyncClient, test_user, auth_headers, test_db):
    """Parking a session creates a handoff with a snapshot and pauses the session."""
    owner = test_user["user_data"]
    session = AISession(
        user_id=owner["id"],
        account_id=owner["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[{"role": "user", "content": "help me"}],
    )
    test_db.add(session)
    await test_db.flush()

    # HandoffManager comes from the module-level import; no local re-import needed.
    manager = HandoffManager(test_db)
    handoff = await manager.create_handoff(
        session_id=session.id,
        intent="park",
        engineer_notes="Waiting for client to provide logs",
        user_id=owner["id"],
    )

    assert handoff is not None
    assert handoff.intent == "park"
    assert handoff.engineer_notes == "Waiting for client to provide logs"
    assert handoff.snapshot is not None

    # The parked session must be paused and record exactly one handoff.
    await test_db.refresh(session)
    assert session.status == "paused"
    assert session.handoff_count == 1
@pytest.mark.asyncio
async def test_create_escalate_handoff(client: AsyncClient, test_user, auth_headers, test_db):
    """Escalating creates a handoff and dual-writes to ``escalation_package``."""
    owner = test_user["user_data"]
    session = AISession(
        user_id=owner["id"],
        account_id=owner["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(session)
    await test_db.flush()

    # HandoffManager comes from the module-level import; no local re-import needed.
    manager = HandoffManager(test_db)
    handoff = await manager.create_handoff(
        session_id=session.id,
        intent="escalate",
        engineer_notes="Need senior help",
        user_id=owner["id"],
    )
    assert handoff.intent == "escalate"

    # Dual-write check: the session row carries the compatibility payload,
    # including the id of the handoff that was just created.
    await test_db.refresh(session)
    assert session.status == "escalated"
    assert session.escalation_package is not None
    assert "branch_map" in session.escalation_package or "snapshot" in session.escalation_package
    assert session.escalation_package["handoff_id"] == str(handoff.id)
@pytest.mark.asyncio
async def test_create_escalate_handoff_does_not_wait_on_slow_ai_assessment(
    client: AsyncClient, test_user, auth_headers, test_db, monkeypatch
):
    """Escalation still yields a handoff when the optional AI assessment is slow.

    The inner summary generator is replaced by one that sleeps 0.2s while the
    assessment timeout setting is lowered to 0.01s. The handoff must come
    back with no AI assessment attached and the session must be escalated.
    """
    owner = test_user["user_data"]
    ai_session = AISession(
        user_id=owner["id"],
        account_id=owner["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(ai_session)
    await test_db.flush()

    async def slow_summary(self, session):
        # Sleeps well past the 0.01s timeout configured below.
        await asyncio.sleep(0.2)
        return {"summary_prose": "too slow", "confidence": "medium"}

    timeout_setting = (
        "app.services.handoff_manager.settings."
        "ESCALATION_AI_ASSESSMENT_TIMEOUT_SECONDS"
    )
    monkeypatch.setattr(timeout_setting, 0.01)

    with patch.object(HandoffManager, "_generate_handoff_summary_inner", new=slow_summary):
        handoffs = HandoffManager(test_db)
        handoff = await handoffs.create_handoff(
            session_id=ai_session.id,
            intent="escalate",
            engineer_notes="Need senior help",
            user_id=owner["id"],
        )

    assert handoff.intent == "escalate"
    assert handoff.ai_assessment is None
    assert handoff.ai_assessment_data is None

    await test_db.refresh(ai_session)
    assert ai_session.status == "escalated"
    assert ai_session.handoff_count == 1
@pytest.mark.asyncio
async def test_claim_session(client: AsyncClient, test_user, test_admin, auth_headers, test_db):
    """Claiming a handoff records the claimer and reactivates the session."""
    owner = test_user["user_data"]
    admin_id = test_admin["user_data"]["id"]
    session = AISession(
        user_id=owner["id"],
        account_id=owner["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(session)
    await test_db.flush()

    # HandoffManager comes from the module-level import; no local re-import needed.
    manager = HandoffManager(test_db)
    handoff = await manager.create_handoff(
        session_id=session.id,
        intent="escalate",
        engineer_notes="Need help",
        user_id=owner["id"],
    )

    claimed = await manager.claim_session(
        handoff_id=handoff.id,
        claiming_user_id=admin_id,
    )
    assert str(claimed.claimed_by) == admin_id
    assert claimed.claimed_at is not None

    # A successful claim puts the escalated session back into "active".
    await test_db.refresh(session)
    assert session.status == "active"
@pytest.mark.asyncio
async def test_claim_session_conflict_raises_already_claimed(
    client: AsyncClient, test_user, test_admin, auth_headers, test_db
):
    """A second claim by a different user raises ``HandoffAlreadyClaimedError``.

    The admin claims first; a second engineer then attempts the same claim,
    standing in for the senior who lost the race. The raised error must
    carry the winner's id, a non-empty name and the claim timestamp, so the
    caller gets a real conflict response instead of a silent overwrite of
    ``claimed_by``.
    """
    # Only the error type needs a local import; HandoffManager is already
    # imported at module scope.
    from app.services.handoff_manager import HandoffAlreadyClaimedError

    owner = test_user["user_data"]
    admin_id = test_admin["user_data"]["id"]

    session = AISession(
        user_id=owner["id"],
        account_id=owner["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(session)
    loser = User(
        email="race-loser@example.com",
        password_hash="x",
        name="Race Loser",
        role="engineer",
        account_id=owner["account_id"],
        account_role="engineer",
    )
    test_db.add(loser)
    await test_db.flush()

    manager = HandoffManager(test_db)
    handoff = await manager.create_handoff(
        session_id=session.id,
        intent="escalate",
        engineer_notes="Need help",
        user_id=owner["id"],
    )

    # First claim: the admin wins.
    await manager.claim_session(
        handoff_id=handoff.id,
        claiming_user_id=admin_id,
    )

    # Second claim by a different user: must be rejected with the typed error.
    with pytest.raises(HandoffAlreadyClaimedError) as exc_info:
        await manager.claim_session(
            handoff_id=handoff.id,
            claiming_user_id=loser.id,
        )

    err = exc_info.value
    assert str(err.claimed_by_id) == admin_id
    assert err.claimed_by_name  # populated from User.name
    assert err.claimed_at is not None
@pytest.mark.asyncio
async def test_claim_session_idempotent_for_same_user(
    client: AsyncClient, test_user, test_admin, auth_headers, test_db
):
    """Re-claiming by the user who already holds the claim is not a conflict.

    Covers double-clicks and network retries: both calls must succeed and
    report the same ``claimed_by``.
    """
    owner = test_user["user_data"]
    admin_id = test_admin["user_data"]["id"]

    ai_session = AISession(
        user_id=owner["id"],
        account_id=owner["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(ai_session)
    await test_db.flush()

    handoffs = HandoffManager(test_db)
    handoff = await handoffs.create_handoff(
        session_id=ai_session.id,
        intent="escalate",
        engineer_notes="Need help",
        user_id=owner["id"],
    )

    # Same claimer, same handoff, twice in a row.
    claim_kwargs = {"handoff_id": handoff.id, "claiming_user_id": admin_id}
    first = await handoffs.claim_session(**claim_kwargs)
    second = await handoffs.claim_session(**claim_kwargs)

    assert str(first.claimed_by) == str(second.claimed_by) == admin_id
@pytest.mark.asyncio
async def test_claim_session_rejects_self_claim(
    client: AsyncClient, test_user, auth_headers, test_db
):
    """The engineer who escalated cannot claim their own handoff.

    The claim attempt by the escalating user must raise ``PermissionError``.
    """
    escalator = test_user["user_data"]
    ai_session = AISession(
        user_id=escalator["id"],
        account_id=escalator["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(ai_session)
    await test_db.flush()

    handoffs = HandoffManager(test_db)
    handoff = await handoffs.create_handoff(
        session_id=ai_session.id,
        intent="escalate",
        engineer_notes="Need help",
        user_id=escalator["id"],
    )

    with pytest.raises(PermissionError):
        await handoffs.claim_session(
            handoff_id=handoff.id,
            claiming_user_id=escalator["id"],
        )
# ─── Notification dispatch ────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_dispatch_emails_engineer_recipients_in_account(
    client: AsyncClient, test_user, auth_headers, test_db
):
    """Escalation emails go to the engineer teammate only.

    The account holds the escalator, a teammate with ``account_role`` of
    "engineer" and a user with ``account_role`` of "viewer". Exactly one
    email is expected, addressed to the teammate, with the session's problem
    summary in its title.
    """
    escalator = test_user["user_data"]
    account_id = escalator["account_id"]

    # A second engineer in the same account: the expected recipient.
    teammate = User(
        email="teammate@example.com",
        password_hash="x",
        name="Teammate",
        role="engineer",
        account_id=account_id,
        account_role="engineer",
    )
    test_db.add(teammate)
    await test_db.flush()

    # A viewer in the same account: must NOT receive a notification.
    viewer = User(
        email="viewer@example.com",
        password_hash="x",
        name="Viewer",
        role="engineer",
        account_id=account_id,
        account_role="viewer",
    )
    test_db.add(viewer)
    await test_db.flush()

    ai_session = AISession(
        user_id=escalator["id"],
        account_id=account_id,
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "vpn down"},
        problem_summary="VPN won't connect after Win update",
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(ai_session)
    await test_db.commit()

    handoffs = HandoffManager(test_db)
    handoff = await handoffs.create_handoff(
        session_id=ai_session.id,
        intent="escalate",
        engineer_notes="Stuck on auth handshake",
        user_id=escalator["id"],
    )
    await test_db.commit()

    email_target = "app.services.handoff_manager.EmailService.send_notification_email"
    with patch(email_target, new=AsyncMock(return_value=True)) as send:
        sent = await handoffs.dispatch_escalation_notifications(handoff)

    assert sent == 1  # only the engineer-role teammate
    recipients = {call.kwargs["to_email"] for call in send.call_args_list}
    assert recipients == {"teammate@example.com"}
    assert viewer.email not in recipients
    assert test_user["email"] not in recipients  # not self-notified

    title = send.call_args_list[0].kwargs["title"]
    assert "VPN won't connect after Win update" in title
@pytest.mark.asyncio
async def test_dispatch_skipped_for_park_intent(
    client: AsyncClient, test_user, auth_headers, test_db
):
    """Park-intent handoffs send no team-wide email.

    Dispatching a "park" handoff must report zero sends and never call the
    email service.
    """
    owner = test_user["user_data"]
    ai_session = AISession(
        user_id=owner["id"],
        account_id=owner["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "x"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(ai_session)
    await test_db.commit()

    handoffs = HandoffManager(test_db)
    handoff = await handoffs.create_handoff(
        session_id=ai_session.id,
        intent="park",
        engineer_notes="waiting on customer",
        user_id=owner["id"],
    )
    await test_db.commit()

    email_target = "app.services.handoff_manager.EmailService.send_notification_email"
    with patch(email_target, new=AsyncMock(return_value=True)) as send:
        sent = await handoffs.dispatch_escalation_notifications(handoff)

    assert sent == 0
    assert send.call_count == 0
@pytest.mark.asyncio
async def test_dispatch_graceful_degradation_when_email_raises(
    client: AsyncClient, test_user, auth_headers, test_db
):
    """Dispatch swallows email-service failures and reports zero sends.

    Emailing is best-effort once the handoff has been committed: with the
    email service raising ``RuntimeError``, dispatch must return 0 rather
    than propagate the error.
    """
    owner = test_user["user_data"]

    # One eligible recipient, so the email service is actually invoked.
    teammate = User(
        email="t@example.com",
        password_hash="x",
        name="T",
        role="engineer",
        account_id=owner["account_id"],
        account_role="engineer",
    )
    test_db.add(teammate)
    await test_db.flush()

    ai_session = AISession(
        user_id=owner["id"],
        account_id=owner["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "x"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(ai_session)
    await test_db.commit()

    handoffs = HandoffManager(test_db)
    handoff = await handoffs.create_handoff(
        session_id=ai_session.id,
        intent="escalate",
        engineer_notes="help",
        user_id=owner["id"],
    )
    await test_db.commit()

    failing_send = AsyncMock(side_effect=RuntimeError("SMTP down"))
    email_target = "app.services.handoff_manager.EmailService.send_notification_email"
    with patch(email_target, new=failing_send):
        # Must not raise.
        sent = await handoffs.dispatch_escalation_notifications(handoff)

    assert sent == 0
@pytest.mark.asyncio
async def test_dispatch_publishes_to_escalation_bus(
    client: AsyncClient, test_user, auth_headers, test_db
):
    """Dispatch publishes a ``handoff_created`` event on the in-memory bus.

    A queue is subscribed for the account before dispatch runs; the event
    read from it must identify the handoff and session and carry "normal"
    priority. The subscription is always released in ``finally``.
    """
    # Function-local imports are grouped up front. ``asyncio`` is NOT
    # re-imported here: the module already imports it, and a mid-function
    # import would make it a function-local name.
    from uuid import UUID as PyUUID

    from app.core.escalation_bus import bus as escalation_bus

    owner = test_user["user_data"]
    session = AISession(
        user_id=owner["id"],
        account_id=owner["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "x"},
        problem_summary="VPN down",
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(session)
    await test_db.commit()

    manager = HandoffManager(test_db)
    handoff = await manager.create_handoff(
        session_id=session.id,
        intent="escalate",
        engineer_notes="please help",
        user_id=owner["id"],
    )
    await test_db.commit()

    # The fixture exposes the account id as a string; the bus is given a UUID.
    account_id = PyUUID(owner["account_id"])
    queue = await escalation_bus.subscribe(account_id)
    try:
        with patch(
            "app.services.handoff_manager.EmailService.send_notification_email",
            new=AsyncMock(return_value=True),
        ):
            await manager.dispatch_escalation_notifications(handoff)

        # Bounded wait so a missing publish fails fast instead of hanging.
        event = await asyncio.wait_for(queue.get(), timeout=1.0)
        assert event["type"] == "handoff_created"
        assert event["handoff_id"] == str(handoff.id)
        assert event["session_id"] == str(session.id)
        assert event["priority"] == "normal"
    finally:
        await escalation_bus.unsubscribe(account_id, queue)
@pytest.mark.asyncio
async def test_create_handoff_endpoint_dispatches_on_escalate(
    client: AsyncClient, test_user, auth_headers, test_db
):
    """POSTing an escalate handoff through the API sends the notification email.

    End-to-end check of the endpoint wiring rather than the manager method:
    the request must return 201 and the email service must be called exactly
    once, addressed to the teammate.
    """
    owner = test_user["user_data"]

    teammate = User(
        email="t2@example.com",
        password_hash="x",
        name="T2",
        role="engineer",
        account_id=owner["account_id"],
        account_role="engineer",
    )
    test_db.add(teammate)
    await test_db.commit()

    ai_session = AISession(
        user_id=owner["id"],
        account_id=owner["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "x"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(ai_session)
    await test_db.commit()

    email_target = "app.services.handoff_manager.EmailService.send_notification_email"
    payload = {"intent": "escalate", "engineer_notes": "Need help"}
    with patch(email_target, new=AsyncMock(return_value=True)) as send:
        resp = await client.post(
            f"/api/v1/ai-sessions/{ai_session.id}/handoff",
            headers=auth_headers,
            json=payload,
        )

    assert resp.status_code == 201
    assert send.call_count == 1
    assert send.call_args.kwargs["to_email"] == "t2@example.com"