feat: add handoff API endpoints with queue and integration tests

Four endpoints: create handoff (park/escalate), list handoff history,
claim session, and team queue. Two routers: session-scoped router with
prefix /ai-sessions/{session_id} and queue_router with prefix /ai-sessions.
queue_router registered before ai_sessions.router to avoid /{session_id}
path conflict on GET /ai-sessions/queue.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
chihlasm
2026-03-24 08:43:32 +00:00
parent f84b868d13
commit 9d2a8332aa
3 changed files with 179 additions and 0 deletions

View File

@@ -0,0 +1,116 @@
"""Handoff endpoints — unified park/escalate.
POST /ai-sessions/{id}/handoff — Create handoff
GET /ai-sessions/{id}/handoffs — Handoff history
POST /ai-sessions/{id}/handoffs/{hid}/claim — Claim session
GET /ai-sessions/queue — Team queue
"""
import logging
from typing import Annotated
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_current_active_user, get_db
from app.models.user import User
from app.models.ai_session import AISession
from app.models.session_handoff import SessionHandoff
from app.services.handoff_manager import HandoffManager
from app.schemas.session_handoff import (
HandoffCreateRequest,
HandoffResponse,
)
logger = logging.getLogger(__name__)
# Queue endpoint needs its own router (no session_id prefix)
queue_router = APIRouter(prefix="/ai-sessions", tags=["session-handoffs"])
# Session-scoped endpoints
router = APIRouter(prefix="/ai-sessions/{session_id}", tags=["session-handoffs"])
@router.post("/handoff", response_model=HandoffResponse, status_code=status.HTTP_201_CREATED)
async def create_handoff(
session_id: UUID,
body: HandoffCreateRequest,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
) -> HandoffResponse:
"""Create a handoff (park or escalate)."""
result = await db.execute(
select(AISession).where(
AISession.id == session_id,
AISession.user_id == current_user.id,
)
)
session = result.scalar_one_or_none()
if not session:
raise HTTPException(status_code=404, detail="Session not found")
manager = HandoffManager(db)
try:
handoff = await manager.create_handoff(
session_id=session_id,
intent=body.intent,
engineer_notes=body.engineer_notes,
user_id=current_user.id,
priority=body.priority,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
await db.commit()
return HandoffResponse.model_validate(handoff)
@router.get("/handoffs", response_model=list[HandoffResponse])
async def list_handoffs(
session_id: UUID,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
) -> list[HandoffResponse]:
"""Get handoff history for a session."""
result = await db.execute(
select(SessionHandoff)
.where(SessionHandoff.session_id == session_id)
.order_by(SessionHandoff.created_at.desc())
)
handoffs = result.scalars().all()
return [HandoffResponse.model_validate(h) for h in handoffs]
@router.post("/handoffs/{handoff_id}/claim", response_model=HandoffResponse)
async def claim_handoff(
session_id: UUID,
handoff_id: UUID,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
) -> HandoffResponse:
"""Claim a handed-off session."""
manager = HandoffManager(db)
try:
handoff = await manager.claim_session(
handoff_id=handoff_id,
claiming_user_id=current_user.id,
)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
await db.commit()
return HandoffResponse.model_validate(handoff)
@queue_router.get("/queue")
async def get_queue(
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
) -> list[dict]:
"""Get team queue of parked + escalated sessions."""
manager = HandoffManager(db)
return await manager.get_queue(
team_id=current_user.team_id,
account_id=current_user.account_id,
)

View File

@@ -31,6 +31,7 @@ from app.api.endpoints import uploads
from app.api.endpoints import script_builder
from app.api.endpoints import beta_feedback
from app.api.endpoints import session_branches
from app.api.endpoints import session_handoffs
api_router = APIRouter()
@@ -77,6 +78,7 @@ api_router.include_router(integrations.router)
api_router.include_router(onboarding.router)
api_router.include_router(branding.router)
api_router.include_router(supporting_data.router)
api_router.include_router(session_handoffs.queue_router) # Must be before ai_sessions to avoid /{session_id} conflict
api_router.include_router(ai_sessions.router)
api_router.include_router(flow_proposals.router)
api_router.include_router(flowpilot_analytics.router)
@@ -87,3 +89,4 @@ api_router.include_router(uploads.router)
api_router.include_router(script_builder.router)
api_router.include_router(beta_feedback.router)
api_router.include_router(session_branches.router)
api_router.include_router(session_handoffs.router)

View File

@@ -0,0 +1,60 @@
"""API endpoint tests for session handoffs."""
import pytest
from httpx import AsyncClient
from app.models.ai_session import AISession
@pytest.mark.asyncio
async def test_create_park_handoff_api(client: AsyncClient, test_user, auth_headers, test_db):
"""POST /ai-sessions/{id}/handoff with intent=park."""
session = AISession(
user_id=test_user["user_data"]["id"],
account_id=test_user["user_data"]["account_id"],
session_type="guided",
intake_type="free_text",
intake_content={"text": "test"},
status="active",
confidence_tier="discovery",
conversation_messages=[],
)
test_db.add(session)
await test_db.commit()
resp = await client.post(
f"/api/v1/ai-sessions/{session.id}/handoff",
headers=auth_headers,
json={"intent": "park", "engineer_notes": "Waiting for logs"},
)
assert resp.status_code == 201
data = resp.json()
assert data["intent"] == "park"
@pytest.mark.asyncio
async def test_get_queue(client: AsyncClient, test_user, auth_headers, test_db):
"""GET /ai-sessions/queue returns unclaimed handoffs."""
session = AISession(
user_id=test_user["user_data"]["id"],
account_id=test_user["user_data"]["account_id"],
session_type="guided",
intake_type="free_text",
intake_content={"text": "test"},
status="active",
confidence_tier="discovery",
conversation_messages=[],
)
test_db.add(session)
await test_db.commit()
# Create a handoff
await client.post(
f"/api/v1/ai-sessions/{session.id}/handoff",
headers=auth_headers,
json={"intent": "escalate", "engineer_notes": "Need help"},
)
resp = await client.get("/api/v1/ai-sessions/queue", headers=auth_headers)
assert resp.status_code == 200
data = resp.json()
assert len(data) >= 1