Codex review pass on the escalation wedge. Reworks claim_session from read-then-write to a conditional UPDATE so two seniors racing can't both win, blocks the original engineer from claiming their own handoff, and filters self-escalated sessions out of the dashboard escalation queue. Also preassigns the handoff UUID before flush so the compatibility escalation_package payload carries it. Removes legacy frontend pickup state (claiming, handleStartHere) that broke tsc --noEmit. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
325 lines
10 KiB
Python
325 lines
10 KiB
Python
"""API endpoint tests for session handoffs."""
|
|
from unittest.mock import AsyncMock, patch
|
|
from uuid import UUID as PyUUID
|
|
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
from sqlalchemy import select
|
|
|
|
from app.api.endpoints.session_handoffs import stream_escalations
|
|
from app.core.escalation_bus import bus as escalation_bus
|
|
from app.models.ai_session import AISession
|
|
from app.models.session_handoff import SessionHandoff
|
|
from app.models.user import User
|
|
from app.services.handoff_manager import HandoffManager
|
|
|
|
|
|
class _ConnectedRequest:
|
|
async def is_disconnected(self) -> bool:
|
|
return False
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def stub_ai_assessment():
|
|
"""Endpoint tests should not wait on the external AI assessment path."""
|
|
with patch.object(
|
|
HandoffManager,
|
|
"_generate_handoff_summary",
|
|
new=AsyncMock(
|
|
return_value={
|
|
"summary_prose": "Stub escalation assessment",
|
|
"what_we_know": [],
|
|
"likely_cause": "Stub",
|
|
"suggested_steps": [],
|
|
"confidence": "medium",
|
|
}
|
|
),
|
|
):
|
|
yield
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_park_handoff_api(client: AsyncClient, test_user, auth_headers, test_db):
|
|
"""POST /ai-sessions/{id}/handoff with intent=park."""
|
|
session = AISession(
|
|
user_id=test_user["user_data"]["id"],
|
|
account_id=test_user["user_data"]["account_id"],
|
|
session_type="guided",
|
|
intake_type="free_text",
|
|
intake_content={"text": "test"},
|
|
status="active",
|
|
confidence_tier="discovery",
|
|
conversation_messages=[],
|
|
)
|
|
test_db.add(session)
|
|
await test_db.commit()
|
|
|
|
resp = await client.post(
|
|
f"/api/v1/ai-sessions/{session.id}/handoff",
|
|
headers=auth_headers,
|
|
json={"intent": "park", "engineer_notes": "Waiting for logs"},
|
|
)
|
|
assert resp.status_code == 201
|
|
data = resp.json()
|
|
assert data["intent"] == "park"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_queue(client: AsyncClient, test_user, auth_headers, test_db):
|
|
"""GET /ai-sessions/queue returns unclaimed handoffs."""
|
|
session = AISession(
|
|
user_id=test_user["user_data"]["id"],
|
|
account_id=test_user["user_data"]["account_id"],
|
|
session_type="guided",
|
|
intake_type="free_text",
|
|
intake_content={"text": "test"},
|
|
status="active",
|
|
confidence_tier="discovery",
|
|
conversation_messages=[],
|
|
)
|
|
test_db.add(session)
|
|
await test_db.commit()
|
|
|
|
# Create a handoff
|
|
await client.post(
|
|
f"/api/v1/ai-sessions/{session.id}/handoff",
|
|
headers=auth_headers,
|
|
json={"intent": "escalate", "engineer_notes": "Need help"},
|
|
)
|
|
|
|
resp = await client.get("/api/v1/ai-sessions/queue", headers=auth_headers)
|
|
assert resp.status_code == 200
|
|
data = resp.json()
|
|
assert len(data) >= 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_claim_blocked_for_viewer_role(
|
|
client: AsyncClient, test_user, auth_headers, test_db
|
|
):
|
|
"""POST /handoffs/{id}/claim must 403 for viewer-role users.
|
|
|
|
Codex review flagged the missing role gate as wedge-relevant: the
|
|
race-condition story (two seniors clicking Pick Up simultaneously)
|
|
requires auth gating for audit integrity. Viewers must not be able
|
|
to claim escalations.
|
|
"""
|
|
# Create a session + handoff as the engineer-role test_user (default = owner).
|
|
session = AISession(
|
|
user_id=test_user["user_data"]["id"],
|
|
account_id=test_user["user_data"]["account_id"],
|
|
session_type="guided",
|
|
intake_type="free_text",
|
|
intake_content={"text": "test"},
|
|
status="active",
|
|
confidence_tier="discovery",
|
|
conversation_messages=[],
|
|
)
|
|
test_db.add(session)
|
|
await test_db.commit()
|
|
|
|
create_resp = await client.post(
|
|
f"/api/v1/ai-sessions/{session.id}/handoff",
|
|
headers=auth_headers,
|
|
json={"intent": "escalate", "engineer_notes": "Need help"},
|
|
)
|
|
assert create_resp.status_code == 201
|
|
handoff_id = create_resp.json()["id"]
|
|
|
|
# Downgrade the user to viewer.
|
|
user_id = PyUUID(test_user["user_data"]["id"])
|
|
user = (
|
|
await test_db.execute(select(User).where(User.id == user_id))
|
|
).scalar_one()
|
|
user.account_role = "viewer"
|
|
await test_db.commit()
|
|
|
|
claim_resp = await client.post(
|
|
f"/api/v1/ai-sessions/{session.id}/handoffs/{handoff_id}/claim",
|
|
headers=auth_headers,
|
|
)
|
|
assert claim_resp.status_code == 403
|
|
assert "engineer" in claim_resp.json()["detail"].lower()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_escalations_stream_blocked_for_viewer(
|
|
client: AsyncClient, test_user, auth_headers, test_db
|
|
):
|
|
"""SSE stream is role-gated to engineer-or-admin (matches queue/claim)."""
|
|
user_id = PyUUID(test_user["user_data"]["id"])
|
|
user = (
|
|
await test_db.execute(select(User).where(User.id == user_id))
|
|
).scalar_one()
|
|
user.account_role = "viewer"
|
|
await test_db.commit()
|
|
|
|
resp = await client.get(
|
|
"/api/v1/ai-sessions/escalations/stream", headers=auth_headers
|
|
)
|
|
assert resp.status_code == 403
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_escalations_stream_returns_sse_content_type(
|
|
client: AsyncClient, test_user, auth_headers, test_db
|
|
):
|
|
"""Engineer/owner can open the SSE stream and gets text/event-stream
|
|
plus an initial `ready` event. Read just enough bytes to confirm the
|
|
handshake — the full pub/sub flow is covered by the bus + dispatcher tests
|
|
separately.
|
|
|
|
Do not use `client.stream()` here: HTTPX's ASGITransport buffers the whole
|
|
response body before returning, which hangs forever for an infinite SSE
|
|
stream.
|
|
"""
|
|
user_id = PyUUID(test_user["user_data"]["id"])
|
|
user = (
|
|
await test_db.execute(select(User).where(User.id == user_id))
|
|
).scalar_one()
|
|
|
|
resp = await stream_escalations(_ConnectedRequest(), current_user=user)
|
|
assert resp.media_type == "text/event-stream"
|
|
|
|
body_iterator = resp.body_iterator
|
|
try:
|
|
first = await anext(body_iterator)
|
|
finally:
|
|
await body_iterator.aclose()
|
|
|
|
assert "event: ready" in first
|
|
assert '"account_id"' in first
|
|
assert escalation_bus.subscriber_count(user.account_id) == 0
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_claim_allowed_for_engineer_role(
|
|
client: AsyncClient, test_user, auth_headers, test_db
|
|
):
|
|
"""POST /handoffs/{id}/claim succeeds for engineer-or-admin roles."""
|
|
original_engineer = User(
|
|
email="original-engineer@example.com",
|
|
password_hash="x",
|
|
name="Original Engineer",
|
|
role="engineer",
|
|
account_id=test_user["user_data"]["account_id"],
|
|
account_role="engineer",
|
|
)
|
|
test_db.add(original_engineer)
|
|
await test_db.flush()
|
|
|
|
session = AISession(
|
|
user_id=original_engineer.id,
|
|
account_id=test_user["user_data"]["account_id"],
|
|
session_type="guided",
|
|
intake_type="free_text",
|
|
intake_content={"text": "test"},
|
|
status="active",
|
|
confidence_tier="discovery",
|
|
conversation_messages=[],
|
|
)
|
|
test_db.add(session)
|
|
await test_db.flush()
|
|
|
|
handoff = SessionHandoff(
|
|
session_id=session.id,
|
|
account_id=test_user["user_data"]["account_id"],
|
|
handed_off_by=original_engineer.id,
|
|
intent="escalate",
|
|
snapshot={"problem_summary": "test"},
|
|
engineer_notes="Need help",
|
|
)
|
|
test_db.add(handoff)
|
|
await test_db.commit()
|
|
|
|
# Default test_user role is "owner", which passes engineer-or-admin.
|
|
claim_resp = await client.post(
|
|
f"/api/v1/ai-sessions/{session.id}/handoffs/{handoff.id}/claim",
|
|
headers=auth_headers,
|
|
)
|
|
assert claim_resp.status_code == 200
|
|
assert claim_resp.json()["claimed_by"] == test_user["user_data"]["id"]
|
|
assert claim_resp.json()["claimed_at"] is not None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_claim_rejects_self_claim(
|
|
client: AsyncClient, test_user, auth_headers, test_db
|
|
):
|
|
"""POST /handoffs/{id}/claim returns 403 for the original escalator."""
|
|
session = AISession(
|
|
user_id=test_user["user_data"]["id"],
|
|
account_id=test_user["user_data"]["account_id"],
|
|
session_type="guided",
|
|
intake_type="free_text",
|
|
intake_content={"text": "test"},
|
|
status="escalated",
|
|
confidence_tier="discovery",
|
|
conversation_messages=[],
|
|
)
|
|
test_db.add(session)
|
|
await test_db.flush()
|
|
|
|
handoff = SessionHandoff(
|
|
session_id=session.id,
|
|
account_id=test_user["user_data"]["account_id"],
|
|
handed_off_by=test_user["user_data"]["id"],
|
|
intent="escalate",
|
|
snapshot={"problem_summary": "test"},
|
|
engineer_notes="Need help",
|
|
)
|
|
test_db.add(handoff)
|
|
await test_db.commit()
|
|
|
|
claim_resp = await client.post(
|
|
f"/api/v1/ai-sessions/{session.id}/handoffs/{handoff.id}/claim",
|
|
headers=auth_headers,
|
|
)
|
|
assert claim_resp.status_code == 403
|
|
assert "own handoff" in claim_resp.json()["detail"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_escalation_queue_excludes_own_escalations(
|
|
client: AsyncClient, test_user, auth_headers, test_db
|
|
):
|
|
"""The post-escalation dashboard queue should not show your own handoff."""
|
|
own_session = AISession(
|
|
user_id=test_user["user_data"]["id"],
|
|
account_id=test_user["user_data"]["account_id"],
|
|
session_type="chat",
|
|
intake_type="free_text",
|
|
intake_content={"text": "own"},
|
|
status="escalated",
|
|
confidence_tier="discovery",
|
|
conversation_messages=[],
|
|
)
|
|
other_engineer = User(
|
|
email="other-engineer@example.com",
|
|
password_hash="x",
|
|
name="Other Engineer",
|
|
role="engineer",
|
|
account_id=test_user["user_data"]["account_id"],
|
|
account_role="engineer",
|
|
)
|
|
test_db.add_all([own_session, other_engineer])
|
|
await test_db.flush()
|
|
other_session = AISession(
|
|
user_id=other_engineer.id,
|
|
account_id=test_user["user_data"]["account_id"],
|
|
session_type="chat",
|
|
intake_type="free_text",
|
|
intake_content={"text": "other"},
|
|
status="escalated",
|
|
confidence_tier="discovery",
|
|
conversation_messages=[],
|
|
)
|
|
test_db.add(other_session)
|
|
await test_db.commit()
|
|
|
|
resp = await client.get("/api/v1/ai-sessions/escalation-queue", headers=auth_headers)
|
|
assert resp.status_code == 200
|
|
ids = {item["id"] for item in resp.json()}
|
|
assert str(own_session.id) not in ids
|
|
assert str(other_session.id) in ids
|