fix(escalations): atomic claim + self-claim rejection + queue exclusion
All checks were successful
Mirror to GitHub / mirror (push) Successful in 5s
CI / frontend (pull_request) Successful in 4m59s
CI / backend (pull_request) Successful in 10m22s
CI / e2e (pull_request) Successful in 10m46s

Codex review pass on the escalation wedge. Reworks claim_session from
read-then-write to a conditional UPDATE so two seniors racing can't both
win, blocks the original engineer from claiming their own handoff, and
filters self-escalated sessions out of the dashboard escalation queue.
Also preassigns the handoff UUID before flush so the compatibility
escalation_package payload carries it. Removes legacy frontend pickup
state (claiming, handleStartHere) that broke tsc --noEmit.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-30 16:21:20 -04:00
parent ab5e0deaf7
commit f10649abc2
10 changed files with 248 additions and 134 deletions

View File

@@ -99,6 +99,7 @@ async def test_create_escalate_handoff(client: AsyncClient, test_user, auth_head
assert session.status == "escalated"
assert session.escalation_package is not None
assert "branch_map" in session.escalation_package or "snapshot" in session.escalation_package
assert session.escalation_package["handoff_id"] == str(handoff.id)
@pytest.mark.asyncio
@@ -181,7 +182,7 @@ async def test_claim_session(client: AsyncClient, test_user, test_admin, auth_he
claiming_user_id=test_admin["user_data"]["id"],
)
assert claimed.claimed_by == test_admin["user_data"]["id"]
assert str(claimed.claimed_by) == test_admin["user_data"]["id"]
assert claimed.claimed_at is not None
await test_db.refresh(session)
@@ -212,6 +213,15 @@ async def test_claim_session_conflict_raises_already_claimed(
conversation_messages=[],
)
test_db.add(session)
loser = User(
email="race-loser@example.com",
password_hash="x",
name="Race Loser",
role="engineer",
account_id=test_user["user_data"]["account_id"],
account_role="engineer",
)
test_db.add(loser)
await test_db.flush()
manager = HandoffManager(test_db)
@@ -228,16 +238,16 @@ async def test_claim_session_conflict_raises_already_claimed(
claiming_user_id=test_admin["user_data"]["id"],
)
# Second claim by a different user — owner of the original session,
# standing in for "the other senior who lost the race."
# Second claim by a different user — standing in for the other senior who
# lost the race.
with pytest.raises(HandoffAlreadyClaimedError) as exc_info:
await manager.claim_session(
handoff_id=handoff.id,
claiming_user_id=test_user["user_data"]["id"],
claiming_user_id=loser.id,
)
err = exc_info.value
assert err.claimed_by_id == test_admin["user_data"]["id"]
assert str(err.claimed_by_id) == test_admin["user_data"]["id"]
assert err.claimed_by_name # populated from User.name
assert err.claimed_at is not None
@@ -278,7 +288,40 @@ async def test_claim_session_idempotent_for_same_user(
claiming_user_id=test_admin["user_data"]["id"],
)
assert first.claimed_by == second.claimed_by == test_admin["user_data"]["id"]
assert str(first.claimed_by) == str(second.claimed_by) == test_admin["user_data"]["id"]
@pytest.mark.asyncio
async def test_claim_session_rejects_self_claim(
client: AsyncClient, test_user, auth_headers, test_db
):
"""The engineer who escalated a session cannot pick up their own handoff."""
session = AISession(
user_id=test_user["user_data"]["id"],
account_id=test_user["user_data"]["account_id"],
session_type="guided",
intake_type="free_text",
intake_content={"text": "test"},
status="active",
confidence_tier="discovery",
conversation_messages=[],
)
test_db.add(session)
await test_db.flush()
manager = HandoffManager(test_db)
handoff = await manager.create_handoff(
session_id=session.id,
intent="escalate",
engineer_notes="Need help",
user_id=test_user["user_data"]["id"],
)
with pytest.raises(PermissionError):
await manager.claim_session(
handoff_id=handoff.id,
claiming_user_id=test_user["user_data"]["id"],
)
# ─── Notification dispatch ────────────────────────────────────────────────────

View File

@@ -9,6 +9,7 @@ from sqlalchemy import select
from app.api.endpoints.session_handoffs import stream_escalations
from app.core.escalation_bus import bus as escalation_bus
from app.models.ai_session import AISession
from app.models.session_handoff import SessionHandoff
from app.models.user import User
from app.services.handoff_manager import HandoffManager
@@ -196,8 +197,19 @@ async def test_claim_allowed_for_engineer_role(
client: AsyncClient, test_user, auth_headers, test_db
):
"""POST /handoffs/{id}/claim succeeds for engineer-or-admin roles."""
original_engineer = User(
email="original-engineer@example.com",
password_hash="x",
name="Original Engineer",
role="engineer",
account_id=test_user["user_data"]["account_id"],
account_role="engineer",
)
test_db.add(original_engineer)
await test_db.flush()
session = AISession(
user_id=test_user["user_data"]["id"],
user_id=original_engineer.id,
account_id=test_user["user_data"]["account_id"],
session_type="guided",
intake_type="free_text",
@@ -207,21 +219,106 @@ async def test_claim_allowed_for_engineer_role(
conversation_messages=[],
)
test_db.add(session)
await test_db.commit()
await test_db.flush()
create_resp = await client.post(
f"/api/v1/ai-sessions/{session.id}/handoff",
headers=auth_headers,
json={"intent": "escalate", "engineer_notes": "Need help"},
handoff = SessionHandoff(
session_id=session.id,
account_id=test_user["user_data"]["account_id"],
handed_off_by=original_engineer.id,
intent="escalate",
snapshot={"problem_summary": "test"},
engineer_notes="Need help",
)
assert create_resp.status_code == 201
handoff_id = create_resp.json()["id"]
test_db.add(handoff)
await test_db.commit()
# Default test_user role is "owner", which passes engineer-or-admin.
claim_resp = await client.post(
f"/api/v1/ai-sessions/{session.id}/handoffs/{handoff_id}/claim",
f"/api/v1/ai-sessions/{session.id}/handoffs/{handoff.id}/claim",
headers=auth_headers,
)
assert claim_resp.status_code == 200
assert claim_resp.json()["claimed_by"] == test_user["user_data"]["id"]
assert claim_resp.json()["claimed_at"] is not None
@pytest.mark.asyncio
async def test_claim_rejects_self_claim(
client: AsyncClient, test_user, auth_headers, test_db
):
"""POST /handoffs/{id}/claim returns 403 for the original escalator."""
session = AISession(
user_id=test_user["user_data"]["id"],
account_id=test_user["user_data"]["account_id"],
session_type="guided",
intake_type="free_text",
intake_content={"text": "test"},
status="escalated",
confidence_tier="discovery",
conversation_messages=[],
)
test_db.add(session)
await test_db.flush()
handoff = SessionHandoff(
session_id=session.id,
account_id=test_user["user_data"]["account_id"],
handed_off_by=test_user["user_data"]["id"],
intent="escalate",
snapshot={"problem_summary": "test"},
engineer_notes="Need help",
)
test_db.add(handoff)
await test_db.commit()
claim_resp = await client.post(
f"/api/v1/ai-sessions/{session.id}/handoffs/{handoff.id}/claim",
headers=auth_headers,
)
assert claim_resp.status_code == 403
assert "own handoff" in claim_resp.json()["detail"]
@pytest.mark.asyncio
async def test_escalation_queue_excludes_own_escalations(
client: AsyncClient, test_user, auth_headers, test_db
):
"""The post-escalation dashboard queue should not show your own handoff."""
own_session = AISession(
user_id=test_user["user_data"]["id"],
account_id=test_user["user_data"]["account_id"],
session_type="chat",
intake_type="free_text",
intake_content={"text": "own"},
status="escalated",
confidence_tier="discovery",
conversation_messages=[],
)
other_engineer = User(
email="other-engineer@example.com",
password_hash="x",
name="Other Engineer",
role="engineer",
account_id=test_user["user_data"]["account_id"],
account_role="engineer",
)
test_db.add_all([own_session, other_engineer])
await test_db.flush()
other_session = AISession(
user_id=other_engineer.id,
account_id=test_user["user_data"]["account_id"],
session_type="chat",
intake_type="free_text",
intake_content={"text": "other"},
status="escalated",
confidence_tier="discovery",
conversation_messages=[],
)
test_db.add(other_session)
await test_db.commit()
resp = await client.get("/api/v1/ai-sessions/escalation-queue", headers=auth_headers)
assert resp.status_code == 200
ids = {item["id"] for item in resp.json()}
assert str(own_session.id) not in ids
assert str(other_session.id) in ids