Files
resolutionflow/backend/tests/test_session_handoffs_api.py
Michael Chihlas db717b0b3f feat(escalations): magic-moment 3-option CTA + claim 500 fix
- HandoffContextScreen: 3-option layout (Continue/AI analysis/Own thing)
  with hasTaskLane, activeOptionKey, spinner/disabled states
- AssistantChatPage: wire up handleContinue, handleAIAnalysis, handleOwnThing
  handlers; chip detail expansion inline with copy-button fix; post-escalation
  redirect to dashboard on ConcludeSessionModal close
- TaskLane: fix async copy button (await + execCommand fallback + copiedKey
  visual feedback); whitespace-pre-wrap on command blocks
- Fix 500 on claim: Pydantic v2 model_validate() + model_copy(update={})
  (was passing update= kwarg directly which v2 rejects)
- HandoffResponse schema: handed_off_by_name field

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-30 00:05:02 -04:00

228 lines
7.3 KiB
Python

"""API endpoint tests for session handoffs."""
from unittest.mock import AsyncMock, patch
from uuid import UUID as PyUUID
import pytest
from httpx import AsyncClient
from sqlalchemy import select
from app.api.endpoints.session_handoffs import stream_escalations
from app.core.escalation_bus import bus as escalation_bus
from app.models.ai_session import AISession
from app.models.user import User
from app.services.handoff_manager import HandoffManager
class _ConnectedRequest:
    """Minimal request stand-in whose client never disconnects.

    Passed to ``stream_escalations`` in place of a real request object; it
    provides only the ``is_disconnected`` coroutine.
    """

    async def is_disconnected(self) -> bool:
        """Always report that the client is still connected."""
        return False
@pytest.fixture(autouse=True)
def stub_ai_assessment():
    """Swap the AI handoff-summary call for a canned result in every test.

    Endpoint tests should not wait on the external AI assessment path, so
    ``HandoffManager._generate_handoff_summary`` is patched with an
    ``AsyncMock`` for as long as the fixture is active.
    """
    canned_summary = {
        "summary_prose": "Stub escalation assessment",
        "what_we_know": [],
        "likely_cause": "Stub",
        "suggested_steps": [],
        "confidence": "medium",
    }
    summary_mock = AsyncMock(return_value=canned_summary)
    with patch.object(
        HandoffManager, "_generate_handoff_summary", new=summary_mock
    ):
        yield
@pytest.mark.asyncio
async def test_create_park_handoff_api(
    client: AsyncClient, test_user, auth_headers, test_db
):
    """POST /ai-sessions/{id}/handoff with intent=park.

    Expects a 201 response whose body echoes the ``park`` intent.
    """
    owner = test_user["user_data"]
    ai_session = AISession(
        user_id=owner["id"],
        account_id=owner["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(ai_session)
    await test_db.commit()

    payload = {"intent": "park", "engineer_notes": "Waiting for logs"}
    response = await client.post(
        f"/api/v1/ai-sessions/{ai_session.id}/handoff",
        headers=auth_headers,
        json=payload,
    )

    assert response.status_code == 201
    body = response.json()
    assert body["intent"] == "park"
@pytest.mark.asyncio
async def test_get_queue(client: AsyncClient, test_user, auth_headers, test_db):
    """GET /ai-sessions/queue returns unclaimed handoffs.

    Seeds one session, escalates it through the handoff endpoint, and then
    checks that the queue endpoint answers 200 with at least one entry.
    """
    session = AISession(
        user_id=test_user["user_data"]["id"],
        account_id=test_user["user_data"]["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(session)
    await test_db.commit()

    # Create a handoff. Verify the setup call itself succeeded so that a
    # broken create endpoint fails here instead of surfacing later as a
    # misleading queue assertion (the claim tests in this module check the
    # same 201 for the same request).
    create_resp = await client.post(
        f"/api/v1/ai-sessions/{session.id}/handoff",
        headers=auth_headers,
        json={"intent": "escalate", "engineer_notes": "Need help"},
    )
    assert create_resp.status_code == 201

    resp = await client.get("/api/v1/ai-sessions/queue", headers=auth_headers)
    assert resp.status_code == 200
    data = resp.json()
    assert len(data) >= 1
@pytest.mark.asyncio
async def test_claim_blocked_for_viewer_role(
    client: AsyncClient, test_user, auth_headers, test_db
):
    """Viewer-role users must get a 403 from POST /handoffs/{id}/claim.

    A Codex review flagged the missing role gate as wedge-relevant: the
    race-condition story (two seniors clicking Pick Up simultaneously)
    depends on auth gating for audit integrity, so viewers must never be
    able to claim an escalation.
    """
    account_user = test_user["user_data"]

    # Build the session and its escalation handoff while the test user still
    # has its default role (owner).
    ai_session = AISession(
        user_id=account_user["id"],
        account_id=account_user["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(ai_session)
    await test_db.commit()

    creation = await client.post(
        f"/api/v1/ai-sessions/{ai_session.id}/handoff",
        headers=auth_headers,
        json={"intent": "escalate", "engineer_notes": "Need help"},
    )
    assert creation.status_code == 201
    handoff_id = creation.json()["id"]

    # Demote that same user to viewer before attempting the claim.
    lookup = select(User).where(User.id == PyUUID(account_user["id"]))
    db_user = (await test_db.execute(lookup)).scalar_one()
    db_user.account_role = "viewer"
    await test_db.commit()

    claim = await client.post(
        f"/api/v1/ai-sessions/{ai_session.id}/handoffs/{handoff_id}/claim",
        headers=auth_headers,
    )
    assert claim.status_code == 403
    rejection_detail = claim.json()["detail"]
    assert "engineer" in rejection_detail.lower()
@pytest.mark.asyncio
async def test_escalations_stream_blocked_for_viewer(
    client: AsyncClient, test_user, auth_headers, test_db
):
    """SSE stream is role-gated to engineer-or-admin (matches queue/claim).

    The test user is demoted to viewer and must then receive a 403 when
    opening the escalations stream.
    """
    lookup = select(User).where(User.id == PyUUID(test_user["user_data"]["id"]))
    db_user = (await test_db.execute(lookup)).scalar_one()
    db_user.account_role = "viewer"
    await test_db.commit()

    stream_url = "/api/v1/ai-sessions/escalations/stream"
    response = await client.get(stream_url, headers=auth_headers)
    assert response.status_code == 403
@pytest.mark.asyncio
async def test_escalations_stream_returns_sse_content_type(
    client: AsyncClient, test_user, auth_headers, test_db
):
    """An engineer/owner opening the SSE stream gets text/event-stream and an
    initial `ready` event.

    Only the first chunk is read, which is enough to confirm the handshake;
    the full pub/sub flow is covered separately by the bus + dispatcher tests.

    The endpoint function is called directly rather than through
    `client.stream()`: HTTPX's ASGITransport buffers the whole response body
    before returning, which hangs forever for an infinite SSE stream.
    """
    lookup = select(User).where(User.id == PyUUID(test_user["user_data"]["id"]))
    db_user = (await test_db.execute(lookup)).scalar_one()

    response = await stream_escalations(_ConnectedRequest(), current_user=db_user)
    assert response.media_type == "text/event-stream"

    chunks = response.body_iterator
    try:
        first_chunk = await anext(chunks)
    finally:
        # Close the iterator whether or not the first read succeeded.
        await chunks.aclose()

    assert "event: ready" in first_chunk
    assert '"account_id"' in first_chunk
    # After the stream is closed the bus should hold no subscriber for this account.
    assert escalation_bus.subscriber_count(db_user.account_id) == 0
@pytest.mark.asyncio
async def test_claim_allowed_for_engineer_role(
    client: AsyncClient, test_user, auth_headers, test_db
):
    """POST /handoffs/{id}/claim succeeds for engineer-or-admin roles.

    Expects a 200 whose body names the test user as the claimer and carries
    a non-null claim timestamp.
    """
    account_user = test_user["user_data"]
    ai_session = AISession(
        user_id=account_user["id"],
        account_id=account_user["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(ai_session)
    await test_db.commit()

    creation = await client.post(
        f"/api/v1/ai-sessions/{ai_session.id}/handoff",
        headers=auth_headers,
        json={"intent": "escalate", "engineer_notes": "Need help"},
    )
    assert creation.status_code == 201
    handoff_id = creation.json()["id"]

    # Default test_user role is "owner", which passes engineer-or-admin.
    claim = await client.post(
        f"/api/v1/ai-sessions/{ai_session.id}/handoffs/{handoff_id}/claim",
        headers=auth_headers,
    )
    assert claim.status_code == 200
    claimed = claim.json()
    assert claimed["claimed_by"] == account_user["id"]
    assert claimed["claimed_at"] is not None