feat(l1): intake dispatch via match_or_build + next-node + escalations endpoints

- /intake now runs match_or_build (matched/suggest/out_of_scope/build); build
  seeds the classified category as a hidden meta walked_path entry, matched starts
  a flow session, suggest/out_of_scope return prompt data with no session.
- New POST /sessions/{id}/next-node (threads node_text to advance_ai_build) and
  GET /escalations (engineer-or-above) for the handoff queue.
- New IntakeResponse(outcome=...)/NextNodeRequest/NextNodeResponse schemas and
  require_account_owner_or_admin dep.
- Reconcile Phase-1 intake tests to the new contract (mock match_or_build); add
  test_l1_api_ai_build.py covering build/out_of_scope/suggest/next-node/escalations.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-30 03:54:23 -04:00
parent af3b1c0123
commit 633a208742
4 changed files with 245 additions and 13 deletions

View File

@@ -276,6 +276,20 @@ async def require_account_owner(
)
async def require_account_owner_or_admin(
current_user: Annotated[User, Depends(get_current_active_user)]
) -> User:
"""Require account owner or account-admin (blocks engineers); super_admin bypass."""
if current_user.is_super_admin:
return current_user
if current_user.account_role in ("owner", "admin"):
return current_user
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Account owner or admin access required",
)
def get_service_account_id(request: Request) -> Optional[UUID]:
"""Return the cached ResolutionFlow service account UUID from app.state.

View File

@@ -9,7 +9,7 @@ from fastapi import APIRouter, Depends, HTTPException, status as http_status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_db, require_l1_or_coverage
from app.api.deps import get_db, require_engineer_or_admin, require_l1_or_coverage
from app.models.l1_walk_session import L1WalkSession
from app.models.user import User
from app.schemas.l1 import (
@@ -17,13 +17,15 @@ from app.schemas.l1 import (
EscalateWithoutWalkRequest,
IntakeRequest,
IntakeResponse,
NextNodeRequest,
NextNodeResponse,
NotesRequest,
QueueRow,
ResolveRequest,
StepRequest,
WalkSessionResponse,
)
from app.services import internal_ticket_service, l1_session_service
from app.services import internal_ticket_service, l1_session_service, match_or_build
router = APIRouter(prefix="/l1", tags=["l1"])
@@ -72,11 +74,34 @@ async def intake(
db: Annotated[AsyncSession, Depends(get_db)],
user: Annotated[User, Depends(require_l1_or_coverage)],
):
"""L1 intake: creates an internal ticket and starts a walk session.
"""L1 intake (Phase 2A): match a published flow, else gate + build.
Phase 1: internal-ticket only (PSA support follows in Phase 2 escalation polish).
If `flow_id` is provided, starts a flow session; otherwise an adhoc session.
Runs the match_or_build orchestrator. Outcomes:
- matched → create ticket + flow session, walk the published flow.
- build → create ticket + ai_build session (category persisted as a hidden
meta entry on walked_path for /next-node), walk an AI-built tree.
- suggest → near-miss prompt; no session created.
- out_of_scope → category disabled/unknown; no session created.
"""
result = await match_or_build.match_or_build(
user.account_id,
payload.problem_statement,
None,
ticket_ref="",
db=db,
force_build=payload.force_build,
)
outcome = result["outcome"]
if outcome in ("suggest", "out_of_scope"):
await db.commit()
return IntakeResponse(
outcome=outcome,
near_miss=result.get("near_miss"),
category=result.get("category"),
)
# matched OR build → create a ticket and a session
ticket = await internal_ticket_service.create_ticket(
db,
account_id=user.account_id,
@@ -85,29 +110,38 @@ async def intake(
customer_name=payload.customer_name,
customer_contact=payload.customer_contact,
)
if payload.flow_id is not None:
if outcome == "matched":
session = await l1_session_service.start_flow_session(
db,
account_id=user.account_id,
user=user,
flow_id=payload.flow_id,
flow_id=UUID(result["flow_id"]),
ticket_id=str(ticket.id),
ticket_kind="internal",
)
else:
session = await l1_session_service.start_adhoc_session(
else: # build
session = await l1_session_service.start_ai_build_session(
db,
account_id=user.account_id,
user=user,
ticket_id=str(ticket.id),
ticket_kind="internal",
)
# Persist the classified category as a hidden meta entry so /next-node
# can recover it (no dedicated column; ai_tree_builder skips meta entries).
session.walked_path = [
{"node_type": "meta", "category": result.get("category", "unknown")}
]
await db.flush()
await db.commit()
return IntakeResponse(
outcome=outcome,
session_id=session.id,
session_kind=session.session_kind,
ticket_id=str(ticket.id),
ticket_kind="internal",
flow_id=UUID(result["flow_id"]) if outcome == "matched" else None,
)

View File

@@ -11,13 +11,31 @@ class IntakeRequest(BaseModel):
customer_name: Optional[str] = None
customer_contact: Optional[str] = None
flow_id: Optional[UUID] = None
force_build: bool = False
class IntakeResponse(BaseModel):
session_id: UUID
session_kind: Literal["flow", "proposal", "adhoc"]
ticket_id: str
ticket_kind: Literal["psa", "internal"]
outcome: Literal["matched", "suggest", "out_of_scope", "build"]
session_id: Optional[UUID] = None
session_kind: Optional[Literal["flow", "proposal", "adhoc", "ai_build"]] = None
ticket_id: Optional[str] = None
ticket_kind: Optional[str] = None
flow_id: Optional[UUID] = None # for 'matched'
near_miss: Optional[dict] = None # for 'suggest'
category: Optional[str] = None # for 'out_of_scope'
class NextNodeRequest(BaseModel):
node_id: Optional[str] = None
node_text: Optional[str] = None # rendered text of the node being answered (carry-forward Task 8)
answer: Optional[str] = None # 'yes' | 'no' for questions
acknowledged: Optional[bool] = None
note: Optional[str] = None
class NextNodeResponse(BaseModel):
node: dict
session_status: str
class StepRequest(BaseModel):

View File

@@ -0,0 +1,166 @@
"""Tests for the Phase 2A L1 AI-build API surface.
Covers intake dispatch (match_or_build outcomes), the next-node endpoint, and
the engineer escalations list. The orchestrator and node generator are mocked —
this exercises the endpoint wiring, not the AI.
"""
import uuid
from unittest.mock import AsyncMock, patch
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.security import create_access_token, get_password_hash
from app.models.account import Account
from app.models.user import User
async def _seed_user(test_db: AsyncSession, *, account_role="l1_tech"):
account = Account(name="L1 Co", display_code=f"L1{uuid.uuid4().hex[:6].upper()}")
test_db.add(account)
await test_db.flush()
user = User(
account_id=account.id,
email=f"l1-{uuid.uuid4().hex[:8]}@example.com",
hashed_password=get_password_hash("pw"),
full_name="L1 Tech",
account_role=account_role,
is_active=True,
is_verified=True,
)
test_db.add(user)
await test_db.flush()
return account, user
def _auth(user: User) -> dict:
return {"Authorization": f"Bearer {create_access_token(subject=str(user.id))}"}
@pytest.mark.asyncio
async def test_intake_build_outcome_creates_ai_build_session(
client: AsyncClient, test_db: AsyncSession
):
"""intake → match_or_build returns 'build' → an ai_build session is created."""
account, user = await _seed_user(test_db)
with patch(
"app.api.endpoints.l1.match_or_build.match_or_build",
new=AsyncMock(return_value={"outcome": "build", "session_kind": "ai_build",
"category": "printer"}),
):
r = await client.post(
"/api/v1/l1/intake",
json={"problem_statement": "printer jam"},
headers=_auth(user),
)
assert r.status_code == 200
body = r.json()
assert body["outcome"] == "build"
assert body["session_kind"] == "ai_build"
assert body["session_id"]
@pytest.mark.asyncio
async def test_intake_out_of_scope(client: AsyncClient, test_db: AsyncSession):
"""intake → 'out_of_scope' → no session, surfaced to the caller."""
account, user = await _seed_user(test_db)
with patch(
"app.api.endpoints.l1.match_or_build.match_or_build",
new=AsyncMock(return_value={"outcome": "out_of_scope", "category": "unknown"}),
):
r = await client.post(
"/api/v1/l1/intake",
json={"problem_statement": "weird thing"},
headers=_auth(user),
)
assert r.status_code == 200
body = r.json()
assert body["outcome"] == "out_of_scope"
assert body["session_id"] is None
@pytest.mark.asyncio
async def test_intake_suggest_returns_near_miss(client: AsyncClient, test_db: AsyncSession):
"""intake → 'suggest' → near_miss prompt, no session."""
account, user = await _seed_user(test_db)
near = {"flow_id": str(uuid.uuid4()), "flow_name": "VPN", "score": 0.66}
with patch(
"app.api.endpoints.l1.match_or_build.match_or_build",
new=AsyncMock(return_value={"outcome": "suggest", "near_miss": near, "can_build": True}),
):
r = await client.post(
"/api/v1/l1/intake",
json={"problem_statement": "vpn"},
headers=_auth(user),
)
assert r.status_code == 200
body = r.json()
assert body["outcome"] == "suggest"
assert body["near_miss"]["flow_name"] == "VPN"
@pytest.mark.asyncio
async def test_next_node_returns_generated_node(client: AsyncClient, test_db: AsyncSession):
"""After a build intake, /next-node returns the node from advance_ai_build."""
account, user = await _seed_user(test_db)
with patch(
"app.api.endpoints.l1.match_or_build.match_or_build",
new=AsyncMock(return_value={"outcome": "build", "session_kind": "ai_build",
"category": "printer"}),
):
r = await client.post(
"/api/v1/l1/intake",
json={"problem_statement": "printer jam"},
headers=_auth(user),
)
sid = r.json()["session_id"]
with patch(
"app.api.endpoints.l1.l1_session_service.advance_ai_build",
new=AsyncMock(return_value={"node_type": "question", "id": "n1", "text": "Powered on?"}),
):
r2 = await client.post(
f"/api/v1/l1/sessions/{sid}/next-node",
json={},
headers=_auth(user),
)
assert r2.status_code == 200
assert r2.json()["node"]["node_type"] == "question"
@pytest.mark.asyncio
async def test_escalations_lists_escalated_sessions_for_engineer(
client: AsyncClient, test_db: AsyncSession
):
"""GET /l1/escalations returns escalated L1 sessions; requires engineer-or-above."""
from datetime import datetime, timezone
from app.models.l1_walk_session import L1WalkSession
account, engineer = await _seed_user(test_db, account_role="engineer")
now = datetime.now(timezone.utc)
sess = L1WalkSession(
account_id=account.id,
created_by_user_id=engineer.id,
ticket_id="t-esc",
ticket_kind="internal",
session_kind="ai_build",
status="escalated",
started_at=now,
last_step_at=now,
escalated_at=now,
)
test_db.add(sess)
await test_db.flush()
r = await client.get("/api/v1/l1/escalations", headers=_auth(engineer))
assert r.status_code == 200
rows = r.json()
assert any(row["id"] == str(sess.id) for row in rows)
@pytest.mark.asyncio
async def test_escalations_forbidden_for_l1_tech(client: AsyncClient, test_db: AsyncSession):
"""An l1_tech (not engineer-or-above) is rejected from the escalations queue."""
account, l1 = await _seed_user(test_db, account_role="l1_tech")
r = await client.get("/api/v1/l1/escalations", headers=_auth(l1))
assert r.status_code == 403