# Listing metadata (from the file viewer): 229 lines, 7.3 KiB, Python.
"""API endpoint tests for session handoffs."""
|
|
from unittest.mock import AsyncMock, patch
|
|
from uuid import UUID as PyUUID
|
|
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
from sqlalchemy import select
|
|
|
|
from app.api.endpoints.session_handoffs import stream_escalations
|
|
from app.core.escalation_bus import bus as escalation_bus
|
|
from app.models.ai_session import AISession
|
|
from app.models.user import User
|
|
from app.services.handoff_manager import HandoffManager
|
|
|
|
|
|
class _ConnectedRequest:
    """Minimal request stand-in for calling the SSE endpoint function directly.

    It exposes only ``is_disconnected``, which always reports that the
    client is still connected.
    """

    async def is_disconnected(self) -> bool:
        """Return ``False``: this fake client never disconnects."""
        return False
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def stub_ai_assessment():
    """Swap the AI assessment step for a canned result in every test.

    Endpoint tests should not block on the external AI assessment path, so
    ``HandoffManager._generate_ai_assessment`` is patched with an
    ``AsyncMock`` for the duration of each test (the fixture is autouse).
    """
    # Canned structured part of the assessment returned by the mock.
    canned_details = {
        "likely_cause": "Stub",
        "suggested_steps": [],
        "confidence": "medium",
    }
    fake_assessment = AsyncMock(
        return_value=("Stub escalation assessment", canned_details),
    )

    with patch.object(
        HandoffManager, "_generate_ai_assessment", new=fake_assessment
    ):
        yield
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_create_park_handoff_api(client: AsyncClient, test_user, auth_headers, test_db):
    """POST /ai-sessions/{id}/handoff with intent=park.

    Persists an active session for the test user, posts a ``park`` handoff
    against it, and expects a 201 whose body echoes the intent.
    """
    owner = test_user["user_data"]
    session = AISession(
        user_id=owner["id"],
        account_id=owner["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(session)
    await test_db.commit()

    handoff_url = f"/api/v1/ai-sessions/{session.id}/handoff"
    payload = {"intent": "park", "engineer_notes": "Waiting for logs"}
    resp = await client.post(handoff_url, headers=auth_headers, json=payload)

    assert resp.status_code == 201
    body = resp.json()
    assert body["intent"] == "park"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_queue(client: AsyncClient, test_user, auth_headers, test_db):
    """GET /ai-sessions/queue returns unclaimed handoffs.

    Creates a session, escalates it through the handoff endpoint, then reads
    the queue and expects at least one entry.
    """
    session = AISession(
        user_id=test_user["user_data"]["id"],
        account_id=test_user["user_data"]["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(session)
    await test_db.commit()

    # Create a handoff. Check the setup call succeeded so a broken create
    # endpoint fails here, not as a confusing queue-length failure below
    # (same check the claim tests in this module perform).
    create_resp = await client.post(
        f"/api/v1/ai-sessions/{session.id}/handoff",
        headers=auth_headers,
        json={"intent": "escalate", "engineer_notes": "Need help"},
    )
    assert create_resp.status_code == 201

    resp = await client.get("/api/v1/ai-sessions/queue", headers=auth_headers)
    assert resp.status_code == 200
    data = resp.json()
    # NOTE(review): once the queue item schema is confirmed, this could assert
    # that the handoff created above is among the returned entries.
    assert len(data) >= 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_claim_blocked_for_viewer_role(
    client: AsyncClient, test_user, auth_headers, test_db
):
    """POST /handoffs/{id}/claim must return 403 for viewer-role users.

    A Codex review flagged the missing role gate as wedge-relevant: the
    race-condition story (two seniors clicking Pick Up at the same time)
    depends on auth gating for audit integrity, so viewers must not be able
    to claim escalations.
    """
    owner = test_user["user_data"]

    # Set up a session and an escalate handoff while the user still has the
    # default role (owner), which is allowed to create them.
    session = AISession(
        user_id=owner["id"],
        account_id=owner["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(session)
    await test_db.commit()

    handoff_url = f"/api/v1/ai-sessions/{session.id}/handoff"
    create_resp = await client.post(
        handoff_url,
        headers=auth_headers,
        json={"intent": "escalate", "engineer_notes": "Need help"},
    )
    assert create_resp.status_code == 201
    handoff_id = create_resp.json()["id"]

    # Downgrade the same user to viewer before attempting the claim.
    lookup = select(User).where(User.id == PyUUID(owner["id"]))
    result = await test_db.execute(lookup)
    account_user = result.scalar_one()
    account_user.account_role = "viewer"
    await test_db.commit()

    claim_url = f"/api/v1/ai-sessions/{session.id}/handoffs/{handoff_id}/claim"
    claim_resp = await client.post(claim_url, headers=auth_headers)

    assert claim_resp.status_code == 403
    detail = claim_resp.json()["detail"]
    assert "engineer" in detail.lower()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_escalations_stream_blocked_for_viewer(
    client: AsyncClient, test_user, auth_headers, test_db
):
    """The SSE stream is role-gated to engineer-or-admin, like queue/claim.

    Downgrades the test user to ``viewer`` and expects the stream endpoint
    to answer 403.
    """
    lookup = select(User).where(User.id == PyUUID(test_user["user_data"]["id"]))
    result = await test_db.execute(lookup)
    account_user = result.scalar_one()
    account_user.account_role = "viewer"
    await test_db.commit()

    stream_url = "/api/v1/ai-sessions/escalations/stream"
    resp = await client.get(stream_url, headers=auth_headers)

    assert resp.status_code == 403
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_escalations_stream_returns_sse_content_type(
    client: AsyncClient, test_user, auth_headers, test_db
):
    """An engineer/owner can open the SSE stream and receive the handshake.

    The response must be ``text/event-stream`` and start with a ``ready``
    event. Only the first chunk is read; the full pub/sub flow is covered
    separately by the bus and dispatcher tests.

    Do not use ``client.stream()`` here: HTTPX's ASGITransport buffers the
    whole response body before returning, which hangs forever on an
    infinite SSE stream. The endpoint function is called directly instead.
    """
    lookup = select(User).where(User.id == PyUUID(test_user["user_data"]["id"]))
    current = (await test_db.execute(lookup)).scalar_one()

    response = await stream_escalations(_ConnectedRequest(), current_user=current)
    assert response.media_type == "text/event-stream"

    events = response.body_iterator
    try:
        handshake = await anext(events)
    finally:
        # Always close the stream, even if reading the first chunk fails;
        # the last assertion checks that no bus subscriber is left behind.
        await events.aclose()

    assert "event: ready" in handshake
    assert '"account_id"' in handshake
    assert escalation_bus.subscriber_count(current.account_id) == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_claim_allowed_for_engineer_role(
    client: AsyncClient, test_user, auth_headers, test_db
):
    """POST /handoffs/{id}/claim succeeds for engineer-or-admin roles.

    Creates an escalate handoff and claims it as the same user, expecting a
    200 whose body records who claimed it and when.
    """
    owner = test_user["user_data"]
    session = AISession(
        user_id=owner["id"],
        account_id=owner["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(session)
    await test_db.commit()

    handoff_url = f"/api/v1/ai-sessions/{session.id}/handoff"
    create_resp = await client.post(
        handoff_url,
        headers=auth_headers,
        json={"intent": "escalate", "engineer_notes": "Need help"},
    )
    assert create_resp.status_code == 201
    handoff_id = create_resp.json()["id"]

    # The default test_user role is "owner", which passes engineer-or-admin.
    claim_url = f"/api/v1/ai-sessions/{session.id}/handoffs/{handoff_id}/claim"
    claim_resp = await client.post(claim_url, headers=auth_headers)

    assert claim_resp.status_code == 200
    claimed = claim_resp.json()
    assert claimed["claimed_by"] == owner["id"]
    assert claimed["claimed_at"] is not None
|