Files
resolutionflow/backend/tests/test_session_handoffs_api.py
Michael Chihlas f10649abc2
All checks were successful
Mirror to GitHub / mirror (push) Successful in 5s
CI / frontend (pull_request) Successful in 4m59s
CI / backend (pull_request) Successful in 10m22s
CI / e2e (pull_request) Successful in 10m46s
fix(escalations): atomic claim + self-claim rejection + queue exclusion
Codex review pass on the escalation wedge. Reworks claim_session from
read-then-write to a conditional UPDATE so two seniors racing can't both
win, blocks the original engineer from claiming their own handoff, and
filters self-escalated sessions out of the dashboard escalation queue.
Also preassigns the handoff UUID before flush so the compatibility
escalation_package payload carries it. Removes legacy frontend pickup
state (claiming, handleStartHere) that broke tsc --noEmit.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 16:21:20 -04:00

325 lines
10 KiB
Python

"""API endpoint tests for session handoffs."""
from unittest.mock import AsyncMock, patch
from uuid import UUID as PyUUID
import pytest
from httpx import AsyncClient
from sqlalchemy import select
from app.api.endpoints.session_handoffs import stream_escalations
from app.core.escalation_bus import bus as escalation_bus
from app.models.ai_session import AISession
from app.models.session_handoff import SessionHandoff
from app.models.user import User
from app.services.handoff_manager import HandoffManager
class _ConnectedRequest:
async def is_disconnected(self) -> bool:
return False
@pytest.fixture(autouse=True)
def stub_ai_assessment():
"""Endpoint tests should not wait on the external AI assessment path."""
with patch.object(
HandoffManager,
"_generate_handoff_summary",
new=AsyncMock(
return_value={
"summary_prose": "Stub escalation assessment",
"what_we_know": [],
"likely_cause": "Stub",
"suggested_steps": [],
"confidence": "medium",
}
),
):
yield
@pytest.mark.asyncio
async def test_create_park_handoff_api(client: AsyncClient, test_user, auth_headers, test_db):
"""POST /ai-sessions/{id}/handoff with intent=park."""
session = AISession(
user_id=test_user["user_data"]["id"],
account_id=test_user["user_data"]["account_id"],
session_type="guided",
intake_type="free_text",
intake_content={"text": "test"},
status="active",
confidence_tier="discovery",
conversation_messages=[],
)
test_db.add(session)
await test_db.commit()
resp = await client.post(
f"/api/v1/ai-sessions/{session.id}/handoff",
headers=auth_headers,
json={"intent": "park", "engineer_notes": "Waiting for logs"},
)
assert resp.status_code == 201
data = resp.json()
assert data["intent"] == "park"
@pytest.mark.asyncio
async def test_get_queue(client: AsyncClient, test_user, auth_headers, test_db):
"""GET /ai-sessions/queue returns unclaimed handoffs."""
session = AISession(
user_id=test_user["user_data"]["id"],
account_id=test_user["user_data"]["account_id"],
session_type="guided",
intake_type="free_text",
intake_content={"text": "test"},
status="active",
confidence_tier="discovery",
conversation_messages=[],
)
test_db.add(session)
await test_db.commit()
# Create a handoff
await client.post(
f"/api/v1/ai-sessions/{session.id}/handoff",
headers=auth_headers,
json={"intent": "escalate", "engineer_notes": "Need help"},
)
resp = await client.get("/api/v1/ai-sessions/queue", headers=auth_headers)
assert resp.status_code == 200
data = resp.json()
assert len(data) >= 1
@pytest.mark.asyncio
async def test_claim_blocked_for_viewer_role(
client: AsyncClient, test_user, auth_headers, test_db
):
"""POST /handoffs/{id}/claim must 403 for viewer-role users.
Codex review flagged the missing role gate as wedge-relevant: the
race-condition story (two seniors clicking Pick Up simultaneously)
requires auth gating for audit integrity. Viewers must not be able
to claim escalations.
"""
# Create a session + handoff as the engineer-role test_user (default = owner).
session = AISession(
user_id=test_user["user_data"]["id"],
account_id=test_user["user_data"]["account_id"],
session_type="guided",
intake_type="free_text",
intake_content={"text": "test"},
status="active",
confidence_tier="discovery",
conversation_messages=[],
)
test_db.add(session)
await test_db.commit()
create_resp = await client.post(
f"/api/v1/ai-sessions/{session.id}/handoff",
headers=auth_headers,
json={"intent": "escalate", "engineer_notes": "Need help"},
)
assert create_resp.status_code == 201
handoff_id = create_resp.json()["id"]
# Downgrade the user to viewer.
user_id = PyUUID(test_user["user_data"]["id"])
user = (
await test_db.execute(select(User).where(User.id == user_id))
).scalar_one()
user.account_role = "viewer"
await test_db.commit()
claim_resp = await client.post(
f"/api/v1/ai-sessions/{session.id}/handoffs/{handoff_id}/claim",
headers=auth_headers,
)
assert claim_resp.status_code == 403
assert "engineer" in claim_resp.json()["detail"].lower()
@pytest.mark.asyncio
async def test_escalations_stream_blocked_for_viewer(
client: AsyncClient, test_user, auth_headers, test_db
):
"""SSE stream is role-gated to engineer-or-admin (matches queue/claim)."""
user_id = PyUUID(test_user["user_data"]["id"])
user = (
await test_db.execute(select(User).where(User.id == user_id))
).scalar_one()
user.account_role = "viewer"
await test_db.commit()
resp = await client.get(
"/api/v1/ai-sessions/escalations/stream", headers=auth_headers
)
assert resp.status_code == 403
@pytest.mark.asyncio
async def test_escalations_stream_returns_sse_content_type(
client: AsyncClient, test_user, auth_headers, test_db
):
"""Engineer/owner can open the SSE stream and gets text/event-stream
plus an initial `ready` event. Read just enough bytes to confirm the
handshake — the full pub/sub flow is covered by the bus + dispatcher tests
separately.
Do not use `client.stream()` here: HTTPX's ASGITransport buffers the whole
response body before returning, which hangs forever for an infinite SSE
stream.
"""
user_id = PyUUID(test_user["user_data"]["id"])
user = (
await test_db.execute(select(User).where(User.id == user_id))
).scalar_one()
resp = await stream_escalations(_ConnectedRequest(), current_user=user)
assert resp.media_type == "text/event-stream"
body_iterator = resp.body_iterator
try:
first = await anext(body_iterator)
finally:
await body_iterator.aclose()
assert "event: ready" in first
assert '"account_id"' in first
assert escalation_bus.subscriber_count(user.account_id) == 0
@pytest.mark.asyncio
async def test_claim_allowed_for_engineer_role(
client: AsyncClient, test_user, auth_headers, test_db
):
"""POST /handoffs/{id}/claim succeeds for engineer-or-admin roles."""
original_engineer = User(
email="original-engineer@example.com",
password_hash="x",
name="Original Engineer",
role="engineer",
account_id=test_user["user_data"]["account_id"],
account_role="engineer",
)
test_db.add(original_engineer)
await test_db.flush()
session = AISession(
user_id=original_engineer.id,
account_id=test_user["user_data"]["account_id"],
session_type="guided",
intake_type="free_text",
intake_content={"text": "test"},
status="active",
confidence_tier="discovery",
conversation_messages=[],
)
test_db.add(session)
await test_db.flush()
handoff = SessionHandoff(
session_id=session.id,
account_id=test_user["user_data"]["account_id"],
handed_off_by=original_engineer.id,
intent="escalate",
snapshot={"problem_summary": "test"},
engineer_notes="Need help",
)
test_db.add(handoff)
await test_db.commit()
# Default test_user role is "owner", which passes engineer-or-admin.
claim_resp = await client.post(
f"/api/v1/ai-sessions/{session.id}/handoffs/{handoff.id}/claim",
headers=auth_headers,
)
assert claim_resp.status_code == 200
assert claim_resp.json()["claimed_by"] == test_user["user_data"]["id"]
assert claim_resp.json()["claimed_at"] is not None
@pytest.mark.asyncio
async def test_claim_rejects_self_claim(
client: AsyncClient, test_user, auth_headers, test_db
):
"""POST /handoffs/{id}/claim returns 403 for the original escalator."""
session = AISession(
user_id=test_user["user_data"]["id"],
account_id=test_user["user_data"]["account_id"],
session_type="guided",
intake_type="free_text",
intake_content={"text": "test"},
status="escalated",
confidence_tier="discovery",
conversation_messages=[],
)
test_db.add(session)
await test_db.flush()
handoff = SessionHandoff(
session_id=session.id,
account_id=test_user["user_data"]["account_id"],
handed_off_by=test_user["user_data"]["id"],
intent="escalate",
snapshot={"problem_summary": "test"},
engineer_notes="Need help",
)
test_db.add(handoff)
await test_db.commit()
claim_resp = await client.post(
f"/api/v1/ai-sessions/{session.id}/handoffs/{handoff.id}/claim",
headers=auth_headers,
)
assert claim_resp.status_code == 403
assert "own handoff" in claim_resp.json()["detail"]
@pytest.mark.asyncio
async def test_escalation_queue_excludes_own_escalations(
client: AsyncClient, test_user, auth_headers, test_db
):
"""The post-escalation dashboard queue should not show your own handoff."""
own_session = AISession(
user_id=test_user["user_data"]["id"],
account_id=test_user["user_data"]["account_id"],
session_type="chat",
intake_type="free_text",
intake_content={"text": "own"},
status="escalated",
confidence_tier="discovery",
conversation_messages=[],
)
other_engineer = User(
email="other-engineer@example.com",
password_hash="x",
name="Other Engineer",
role="engineer",
account_id=test_user["user_data"]["account_id"],
account_role="engineer",
)
test_db.add_all([own_session, other_engineer])
await test_db.flush()
other_session = AISession(
user_id=other_engineer.id,
account_id=test_user["user_data"]["account_id"],
session_type="chat",
intake_type="free_text",
intake_content={"text": "other"},
status="escalated",
confidence_tier="discovery",
conversation_messages=[],
)
test_db.add(other_session)
await test_db.commit()
resp = await client.get("/api/v1/ai-sessions/escalation-queue", headers=auth_headers)
assert resp.status_code == 200
ids = {item["id"] for item in resp.json()}
assert str(own_session.id) not in ids
assert str(other_session.id) in ids