From 96973c79686dd463cb34091792109f0962a462fe Mon Sep 17 00:00:00 2001 From: Michael Chihlas Date: Thu, 28 May 2026 13:33:18 -0400 Subject: [PATCH] feat(l1): L1 endpoint surface (intake/queue/step/notes/resolve/escalate) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mounts /api/v1/l1/* with require_l1_or_coverage on every route. Intake creates an internal ticket and starts a flow OR adhoc session (PSA queue merge follows in Phase 2). Step/notes/resolve/escalate delegate to l1_session_service. escalate-without-walk creates an immediately- escalated session for the BuildAbortedNoKB path. ValueError from services → 400. Cross-account session access → 404. Co-Authored-By: Claude Opus 4.7 --- backend/app/api/endpoints/l1.py | 268 +++++++++++++++++++++ backend/app/api/router.py | 2 + backend/app/schemas/l1.py | 72 ++++++ backend/tests/test_l1_endpoints.py | 362 +++++++++++++++++++++++++++++ 4 files changed, 704 insertions(+) create mode 100644 backend/app/api/endpoints/l1.py create mode 100644 backend/app/schemas/l1.py create mode 100644 backend/tests/test_l1_endpoints.py diff --git a/backend/app/api/endpoints/l1.py b/backend/app/api/endpoints/l1.py new file mode 100644 index 00000000..cca69a70 --- /dev/null +++ b/backend/app/api/endpoints/l1.py @@ -0,0 +1,268 @@ +"""L1 Workspace endpoints (Phase 1). + +PSA-merge queue support + AI build path are deferred to Phase 2. +""" +from typing import Annotated, Optional +from uuid import UUID + +from fastapi import APIRouter, Depends, HTTPException, status as http_status +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.api.deps import get_db, require_l1_or_coverage +from app.models.l1_walk_session import L1WalkSession +from app.models.user import User +from app.schemas.l1 import ( + EscalateRequest, + EscalateWithoutWalkRequest, + IntakeRequest, + IntakeResponse, + NotesRequest, + QueueRow, + ResolveRequest, + StepRequest, + WalkSessionResponse, +) +from app.services import internal_ticket_service, l1_session_service + + +router = APIRouter(prefix="/l1", tags=["l1"]) + + +def _to_response(session: L1WalkSession) -> WalkSessionResponse: + return WalkSessionResponse( + id=session.id, + session_kind=session.session_kind, + flow_id=session.flow_id, + flow_proposal_id=session.flow_proposal_id, + current_node_id=session.current_node_id, + walked_path=session.walked_path or [], + walk_notes=session.walk_notes or [], + status=session.status, + started_at=session.started_at, + last_step_at=session.last_step_at, + resolved_at=session.resolved_at, + ) + + +async def _get_session_or_404( + db: AsyncSession, session_id: UUID, user: User +) -> L1WalkSession: + session = await db.get(L1WalkSession, session_id) + if session is None or session.account_id != user.account_id: + raise HTTPException( + status_code=http_status.HTTP_404_NOT_FOUND, + detail="Session not found", + ) + return session + + +@router.post("/intake", response_model=IntakeResponse) +async def intake( + payload: IntakeRequest, + db: Annotated[AsyncSession, Depends(get_db)], + user: Annotated[User, Depends(require_l1_or_coverage)], +): + """L1 intake: creates an internal ticket and starts a walk session. + + Phase 1: internal-ticket only (PSA support follows in Phase 2 escalation polish). + If `flow_id` is provided, starts a flow session; otherwise an adhoc session. + """ + ticket = await internal_ticket_service.create_ticket( + db, + account_id=user.account_id, + created_by_user_id=user.id, + problem_statement=payload.problem_statement, + customer_name=payload.customer_name, + customer_contact=payload.customer_contact, + ) + if payload.flow_id is not None: + session = await l1_session_service.start_flow_session( + db, + account_id=user.account_id, + user=user, + flow_id=payload.flow_id, + ticket_id=str(ticket.id), + ticket_kind="internal", + ) + else: + session = await l1_session_service.start_adhoc_session( + db, + account_id=user.account_id, + user=user, + ticket_id=str(ticket.id), + ticket_kind="internal", + ) + await db.commit() + return IntakeResponse( + session_id=session.id, + session_kind=session.session_kind, + ticket_id=str(ticket.id), + ticket_kind="internal", + ) + + +@router.get("/queue", response_model=list[QueueRow]) +async def queue( + db: Annotated[AsyncSession, Depends(get_db)], + user: Annotated[User, Depends(require_l1_or_coverage)], + status_filter: Optional[str] = None, + limit: int = 50, +): + """Phase 1 queue: internal tickets only. PSA-fed rows in Phase 2.""" + tickets = await internal_ticket_service.list_tickets_for_account( + db, + account_id=user.account_id, + status=status_filter, + limit=limit, + ) + return [ + QueueRow( + ticket_id=str(t.id), + ticket_kind="internal", + problem_statement=t.problem_statement, + customer_name=t.customer_name, + status=t.status, + created_at=t.created_at, + ) + for t in tickets + ] + + +@router.get("/sessions/active", response_model=list[WalkSessionResponse]) +async def list_active_sessions( + db: Annotated[AsyncSession, Depends(get_db)], + user: Annotated[User, Depends(require_l1_or_coverage)], +): + """The caller's currently-active sessions (for the dashboard 'Resume in progress' widget).""" + stmt = ( + select(L1WalkSession) + .where(L1WalkSession.created_by_user_id == user.id) + .where(L1WalkSession.status == "active") + .order_by(L1WalkSession.last_step_at.desc()) + .limit(20) + ) + result = await db.execute(stmt) + return [_to_response(s) for s in result.scalars()] + + +@router.get("/sessions/{session_id}", response_model=WalkSessionResponse) +async def get_session( + session_id: UUID, + db: Annotated[AsyncSession, Depends(get_db)], + user: Annotated[User, Depends(require_l1_or_coverage)], +): + session = await _get_session_or_404(db, session_id, user) + return _to_response(session) + + +@router.post("/sessions/{session_id}/step", response_model=WalkSessionResponse) +async def post_step( + session_id: UUID, + payload: StepRequest, + db: Annotated[AsyncSession, Depends(get_db)], + user: Annotated[User, Depends(require_l1_or_coverage)], +): + await _get_session_or_404(db, session_id, user) + try: + updated = await l1_session_service.record_step( + db, + session_id=session_id, + node_id=payload.node_id, + question=payload.question, + answer=payload.answer, + note=payload.note, + ) + except ValueError as exc: + raise HTTPException(status_code=http_status.HTTP_400_BAD_REQUEST, detail=str(exc)) + await db.commit() + return _to_response(updated) + + +@router.post("/sessions/{session_id}/notes", response_model=WalkSessionResponse) +async def post_notes( + session_id: UUID, + payload: NotesRequest, + db: Annotated[AsyncSession, Depends(get_db)], + user: Annotated[User, Depends(require_l1_or_coverage)], +): + await _get_session_or_404(db, session_id, user) + try: + updated = await l1_session_service.update_notes( + db, + session_id=session_id, + notes=payload.notes, + ) + except ValueError as exc: + raise HTTPException(status_code=http_status.HTTP_400_BAD_REQUEST, detail=str(exc)) + await db.commit() + return _to_response(updated) + + +@router.post("/sessions/{session_id}/resolve", response_model=WalkSessionResponse) +async def post_resolve( + session_id: UUID, + payload: ResolveRequest, + db: Annotated[AsyncSession, Depends(get_db)], + user: Annotated[User, Depends(require_l1_or_coverage)], +): + await _get_session_or_404(db, session_id, user) + try: + updated = await l1_session_service.resolve( + db, + session_id=session_id, + helpful=payload.helpful, + resolution_notes=payload.resolution_notes, + ) + except ValueError as exc: + raise HTTPException(status_code=http_status.HTTP_400_BAD_REQUEST, detail=str(exc)) + await db.commit() + return _to_response(updated) + + +@router.post("/sessions/{session_id}/escalate", response_model=WalkSessionResponse) +async def post_escalate( + session_id: UUID, + payload: EscalateRequest, + db: Annotated[AsyncSession, Depends(get_db)], + user: Annotated[User, Depends(require_l1_or_coverage)], +): + await _get_session_or_404(db, session_id, user) + try: + updated = await l1_session_service.escalate( + db, + session_id=session_id, + reason=payload.reason or "", + reason_category=payload.reason_category, + ) + except ValueError as exc: + raise HTTPException(status_code=http_status.HTTP_400_BAD_REQUEST, detail=str(exc)) + await db.commit() + return _to_response(updated) + + +@router.post("/escalate-without-walk", response_model=WalkSessionResponse) +async def post_escalate_without_walk( + payload: EscalateWithoutWalkRequest, + db: Annotated[AsyncSession, Depends(get_db)], + user: Annotated[User, Depends(require_l1_or_coverage)], +): + ticket = await internal_ticket_service.create_ticket( + db, + account_id=user.account_id, + created_by_user_id=user.id, + problem_statement=payload.problem_statement, + customer_name=payload.customer_name, + customer_contact=payload.customer_contact, + ) + session = await l1_session_service.escalate_without_walk( + db, + account_id=user.account_id, + user=user, + ticket_id=str(ticket.id), + ticket_kind="internal", + reason_category=payload.reason_category, + reason=payload.reason, + ) + await db.commit() + return _to_response(session) diff --git a/backend/app/api/router.py b/backend/app/api/router.py index f8f13a35..60e417a3 100644 --- a/backend/app/api/router.py +++ b/backend/app/api/router.py @@ -8,6 +8,7 @@ from app.api.deps import ( from app.api.endpoints import ( admin, admin_audit, + l1, admin_categories, admin_dashboard, admin_feature_flags, @@ -185,3 +186,4 @@ api_router.include_router(beta_feedback.router, dependencies=_tenant_deps) api_router.include_router(session_branches.router, dependencies=_pro_deps) api_router.include_router(session_handoffs.router, dependencies=_pro_deps) api_router.include_router(device_types.router, dependencies=_tenant_deps) +api_router.include_router(l1.router, dependencies=_tenant_deps) diff --git a/backend/app/schemas/l1.py b/backend/app/schemas/l1.py new file mode 100644 index 00000000..cbb74bad --- /dev/null +++ b/backend/app/schemas/l1.py @@ -0,0 +1,72 @@ +"""Pydantic schemas for the /l1/* endpoint surface.""" +from datetime import datetime +from typing import Any, Literal, Optional +from uuid import UUID + +from pydantic import BaseModel, Field + + +class IntakeRequest(BaseModel): + problem_statement: str = Field(..., min_length=1) + customer_name: Optional[str] = None + customer_contact: Optional[str] = None + flow_id: Optional[UUID] = None + + +class IntakeResponse(BaseModel): + session_id: UUID + session_kind: Literal["flow", "proposal", "adhoc"] + ticket_id: str + ticket_kind: Literal["psa", "internal"] + + +class StepRequest(BaseModel): + node_id: str + question: str + answer: str + note: Optional[str] = None + + +class NotesRequest(BaseModel): + notes: list[dict[str, Any]] + + +class ResolveRequest(BaseModel): + helpful: bool + resolution_notes: str + + +class EscalateRequest(BaseModel): + reason: Optional[str] = None + reason_category: str = Field(..., min_length=1) + + +class EscalateWithoutWalkRequest(BaseModel): + problem_statement: str = Field(..., min_length=1) + customer_name: Optional[str] = None + customer_contact: Optional[str] = None + reason_category: str = Field(..., min_length=1) + reason: Optional[str] = None + + +class WalkSessionResponse(BaseModel): + id: UUID + session_kind: str + flow_id: Optional[UUID] + flow_proposal_id: Optional[UUID] + current_node_id: Optional[str] + walked_path: list[dict[str, Any]] + walk_notes: list[dict[str, Any]] + status: str + started_at: datetime + last_step_at: datetime + resolved_at: Optional[datetime] + + +class QueueRow(BaseModel): + ticket_id: str + ticket_kind: Literal["psa", "internal"] + problem_statement: Optional[str] = None + customer_name: Optional[str] = None + status: str + created_at: Optional[datetime] = None diff --git a/backend/tests/test_l1_endpoints.py b/backend/tests/test_l1_endpoints.py new file mode 100644 index 00000000..8a3716d0 --- /dev/null +++ b/backend/tests/test_l1_endpoints.py @@ -0,0 +1,362 @@ +"""Integration tests for the /l1/* endpoint surface (Task 15). + +All tests use the `client` + `test_db` fixtures from conftest. +""" +import uuid +from datetime import datetime, timezone, timedelta + +import pytest +from httpx import AsyncClient +from sqlalchemy import delete +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.subscription import Subscription +from app.models.user import User +from app.models.l1_walk_session import L1WalkSession + + +# --------------------------------------------------------------------------- +# Test-local helpers +# --------------------------------------------------------------------------- + +async def _register(client: AsyncClient, *, email: str, password: str = "TestPassword123!", name: str = "Test User") -> dict: + resp = await client.post("/api/v1/auth/register", json={"email": email, "password": password, "name": name}) + assert resp.status_code in (200, 201), resp.text + return resp.json() + + +async def _login(client: AsyncClient, *, email: str, password: str = "TestPassword123!") -> dict: + resp = await client.post("/api/v1/auth/login/json", json={"email": email, "password": password}) + assert resp.status_code == 200, resp.text + return {"Authorization": f"Bearer {resp.json()['access_token']}"} + + +async def _ensure_subscription(db: AsyncSession, account_id: uuid.UUID) -> None: + """Ensure account has an active Pro subscription.""" + await db.execute(delete(Subscription).where(Subscription.account_id == account_id)) + db.add(Subscription(account_id=account_id, plan="pro", status="active")) + await db.commit() + + +async def _make_l1_user( + client: AsyncClient, + db: AsyncSession, + *, + email: str, + account_id: uuid.UUID | None = None, +) -> dict: + """Register a user, set role=l1_tech, ensure subscription. + + If account_id is given, inserts a second user directly into that account. + Otherwise registers a fresh user via the API (new account) and returns + both user data and login headers. + """ + if account_id is None: + user_data = await _register(client, email=email) + uid = uuid.UUID(user_data["id"]) + acct_id = uuid.UUID(user_data["account_id"]) + # Promote to l1_tech + from sqlalchemy import select as sa_select + result = await db.execute(sa_select(User).where(User.id == uid)) + user = result.scalar_one() + user.account_role = "l1_tech" + await db.commit() + await _ensure_subscription(db, acct_id) + headers = await _login(client, email=email) + return {"user_data": user_data, "headers": headers, "account_id": acct_id} + else: + # Insert directly into an existing account + s = str(uuid.uuid4())[:8] + user = User( + id=uuid.uuid4(), + email=email, + name=f"L1 Tech {s}", + account_id=account_id, + account_role="l1_tech", + role="engineer", + is_active=True, + hashed_password="$2b$12$placeholder.placeholder.placeholder.placeholder.plac", + ) + db.add(user) + await db.commit() + return {"user_data": {"id": str(user.id), "account_id": str(account_id)}, "headers": None} + + +# --------------------------------------------------------------------------- +# 1. Intake without flow_id → 200 + session_kind='adhoc' +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_intake_adhoc(client: AsyncClient, test_db: AsyncSession): + """POST /l1/intake without flow_id creates adhoc session.""" + info = await _make_l1_user(client, test_db, email="l1intake@example.com") + headers = info["headers"] + + resp = await client.post( + "/api/v1/l1/intake", + json={"problem_statement": "Printer won't turn on", "customer_name": "Alice"}, + headers=headers, + ) + assert resp.status_code == 200, resp.text + body = resp.json() + assert body["session_kind"] == "adhoc" + assert body["ticket_kind"] == "internal" + assert "session_id" in body + assert "ticket_id" in body + + +# --------------------------------------------------------------------------- +# 2. Intake without auth → 401 +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_intake_no_auth(client: AsyncClient, test_db: AsyncSession): + """POST /l1/intake without token → 401.""" + resp = await client.post( + "/api/v1/l1/intake", + json={"problem_statement": "Test"}, + ) + assert resp.status_code == 401 + + +# --------------------------------------------------------------------------- +# 3. Intake as viewer → 403 +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_intake_viewer_forbidden(client: AsyncClient, test_db: AsyncSession): + """POST /l1/intake as viewer role → 403.""" + user_data = await _register(client, email="viewer_l1@example.com") + uid = uuid.UUID(user_data["id"]) + acct_id = uuid.UUID(user_data["account_id"]) + + from sqlalchemy import select as sa_select + result = await test_db.execute(sa_select(User).where(User.id == uid)) + user = result.scalar_one() + user.account_role = "viewer" + await test_db.commit() + await _ensure_subscription(test_db, acct_id) + + headers = await _login(client, email="viewer_l1@example.com") + resp = await client.post( + "/api/v1/l1/intake", + json={"problem_statement": "Test"}, + headers=headers, + ) + assert resp.status_code == 403 + + +# --------------------------------------------------------------------------- +# 4. Step on adhoc session → 400 (cannot step an adhoc) +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_step_on_adhoc_returns_400(client: AsyncClient, test_db: AsyncSession): + """POST /l1/sessions/{id}/step on adhoc session → 400.""" + info = await _make_l1_user(client, test_db, email="l1step@example.com") + headers = info["headers"] + + # Create adhoc session via intake + resp = await client.post( + "/api/v1/l1/intake", + json={"problem_statement": "Adhoc issue"}, + headers=headers, + ) + assert resp.status_code == 200, resp.text + session_id = resp.json()["session_id"] + + resp = await client.post( + f"/api/v1/l1/sessions/{session_id}/step", + json={"node_id": "node1", "question": "Q?", "answer": "A"}, + headers=headers, + ) + assert resp.status_code == 400 + assert "adhoc" in resp.json()["detail"] + + +# --------------------------------------------------------------------------- +# 5. Notes on adhoc session → 200, walk_notes updated +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_notes_on_adhoc_session(client: AsyncClient, test_db: AsyncSession): + """POST /l1/sessions/{id}/notes → 200 and walk_notes is updated.""" + info = await _make_l1_user(client, test_db, email="l1notes@example.com") + headers = info["headers"] + + resp = await client.post( + "/api/v1/l1/intake", + json={"problem_statement": "Notes test"}, + headers=headers, + ) + assert resp.status_code == 200, resp.text + session_id = resp.json()["session_id"] + + notes_payload = [{"text": "Customer called about printer", "ts": "2026-05-28T10:00:00Z"}] + resp = await client.post( + f"/api/v1/l1/sessions/{session_id}/notes", + json={"notes": notes_payload}, + headers=headers, + ) + assert resp.status_code == 200, resp.text + body = resp.json() + assert body["walk_notes"] == notes_payload + + +# --------------------------------------------------------------------------- +# 6. Resolve with helpful=True → 200; GET shows status=resolved +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_resolve_session(client: AsyncClient, test_db: AsyncSession): + """POST /l1/sessions/{id}/resolve → 200; subsequent GET shows resolved.""" + info = await _make_l1_user(client, test_db, email="l1resolve@example.com") + headers = info["headers"] + + resp = await client.post( + "/api/v1/l1/intake", + json={"problem_statement": "Resolve test"}, + headers=headers, + ) + assert resp.status_code == 200, resp.text + session_id = resp.json()["session_id"] + + resp = await client.post( + f"/api/v1/l1/sessions/{session_id}/resolve", + json={"helpful": True, "resolution_notes": "Restarted the printer."}, + headers=headers, + ) + assert resp.status_code == 200, resp.text + assert resp.json()["status"] == "resolved" + + # GET should also show resolved + resp = await client.get(f"/api/v1/l1/sessions/{session_id}", headers=headers) + assert resp.status_code == 200 + assert resp.json()["status"] == "resolved" + + +# --------------------------------------------------------------------------- +# 7. Escalate session → 200; status=escalated +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_escalate_session(client: AsyncClient, test_db: AsyncSession): + """POST /l1/sessions/{id}/escalate → 200; status becomes escalated.""" + info = await _make_l1_user(client, test_db, email="l1escalate@example.com") + headers = info["headers"] + + resp = await client.post( + "/api/v1/l1/intake", + json={"problem_statement": "Escalation test"}, + headers=headers, + ) + assert resp.status_code == 200, resp.text + session_id = resp.json()["session_id"] + + resp = await client.post( + f"/api/v1/l1/sessions/{session_id}/escalate", + json={"reason_category": "needs_l2", "reason": "Beyond L1 scope"}, + headers=headers, + ) + assert resp.status_code == 200, resp.text + body = resp.json() + assert body["status"] == "escalated" + + +# --------------------------------------------------------------------------- +# 8. escalate-without-walk → 200 + session in escalated status +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_escalate_without_walk(client: AsyncClient, test_db: AsyncSession): + """POST /l1/escalate-without-walk → 200 + session.status=escalated.""" + info = await _make_l1_user(client, test_db, email="l1eww@example.com") + headers = info["headers"] + + resp = await client.post( + "/api/v1/l1/escalate-without-walk", + json={ + "problem_statement": "No KB available", + "reason_category": "no_kb", + "reason": "No knowledge base content matched", + }, + headers=headers, + ) + assert resp.status_code == 200, resp.text + body = resp.json() + assert body["status"] == "escalated" + assert body["session_kind"] == "adhoc" + + +# --------------------------------------------------------------------------- +# 9. List active sessions returns L1's active sessions ordered by last_step_at DESC +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_list_active_sessions_ordered(client: AsyncClient, test_db: AsyncSession): + """GET /l1/sessions/active returns active sessions ordered by last_step_at DESC.""" + info = await _make_l1_user(client, test_db, email="l1active@example.com") + headers = info["headers"] + user_id = uuid.UUID(info["user_data"]["id"]) + account_id = info["account_id"] + + # Create two sessions with controlled timestamps directly in DB + now = datetime.now(timezone.utc) + s1 = L1WalkSession( + id=uuid.uuid4(), + account_id=account_id, + created_by_user_id=user_id, + ticket_id=str(uuid.uuid4()), + ticket_kind="internal", + session_kind="adhoc", + status="active", + started_at=now - timedelta(minutes=10), + last_step_at=now - timedelta(minutes=5), + ) + s2 = L1WalkSession( + id=uuid.uuid4(), + account_id=account_id, + created_by_user_id=user_id, + ticket_id=str(uuid.uuid4()), + ticket_kind="internal", + session_kind="adhoc", + status="active", + started_at=now - timedelta(minutes=20), + last_step_at=now - timedelta(minutes=1), + ) + test_db.add_all([s1, s2]) + await test_db.commit() + + resp = await client.get("/api/v1/l1/sessions/active", headers=headers) + assert resp.status_code == 200, resp.text + bodies = resp.json() + ids = [b["id"] for b in bodies] + # s2 has the more recent last_step_at → should come first + assert ids.index(str(s2.id)) < ids.index(str(s1.id)) + + +# --------------------------------------------------------------------------- +# 10. GET session from different account → 404 (tenancy) +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_get_session_cross_account_returns_404(client: AsyncClient, test_db: AsyncSession): + """GET /l1/sessions/{id} from a different account → 404.""" + # Account A: creates a session + info_a = await _make_l1_user(client, test_db, email="l1tenanta@example.com") + headers_a = info_a["headers"] + + resp = await client.post( + "/api/v1/l1/intake", + json={"problem_statement": "Account A issue"}, + headers=headers_a, + ) + assert resp.status_code == 200, resp.text + session_id = resp.json()["session_id"] + + # Account B: different user in a different account + info_b = await _make_l1_user(client, test_db, email="l1tenantb@example.com") + headers_b = info_b["headers"] + + resp = await client.get(f"/api/v1/l1/sessions/{session_id}", headers=headers_b) + assert resp.status_code == 404