feat(l1): AI decision-tree builder — Phase 2A #193

Merged
chihlasm merged 42 commits from feat/l1-ai-tree-builder-phase-2a into main 2026-06-12 23:41:16 +00:00
Showing only changes of commit 04b5511bdd - Show all commits

View File

@@ -0,0 +1,161 @@
"""End-to-end backend integration test for the L1 AI-build flow (Phase 2A).
Drives the real endpoint + service path — intake (build) → next-node walk →
resolve — and asserts an outcome-validated FlowProposal is captured. Only the AI
boundary is mocked: match_or_build's outcome and ai_tree_builder.generate_next_node.
A second test drives intake → escalate and asserts the engineer notification fires
and the session surfaces in GET /l1/escalations.
"""
import uuid
from unittest.mock import AsyncMock, patch
import pytest
from httpx import AsyncClient
from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.flow_proposal import FlowProposal
from app.models.subscription import Subscription
from app.models.user import User
async def _register(client: AsyncClient, *, email: str) -> dict:
resp = await client.post(
"/api/v1/auth/register",
json={"email": email, "password": "TestPassword123!", "name": "Test User"},
)
assert resp.status_code in (200, 201), resp.text
return resp.json()
async def _login(client: AsyncClient, *, email: str) -> dict:
resp = await client.post(
"/api/v1/auth/login/json",
json={"email": email, "password": "TestPassword123!"},
)
assert resp.status_code == 200, resp.text
return {"Authorization": f"Bearer {resp.json()['access_token']}"}
async def _ensure_subscription(db: AsyncSession, account_id: uuid.UUID) -> None:
await db.execute(delete(Subscription).where(Subscription.account_id == account_id))
db.add(Subscription(account_id=account_id, plan="pro", status="active"))
await db.commit()
async def _make_user(client: AsyncClient, db: AsyncSession, *, email: str, account_role: str) -> dict:
data = await _register(client, email=email)
uid = uuid.UUID(data["id"])
acct_id = uuid.UUID(data["account_id"])
user = (await db.execute(select(User).where(User.id == uid))).scalar_one()
user.account_role = account_role
await db.commit()
await _ensure_subscription(db, acct_id)
headers = await _login(client, email=email)
return {"headers": headers, "account_id": acct_id, "user_id": uid}
@pytest.mark.asyncio
async def test_intake_build_walk_resolve_creates_proposal(client: AsyncClient, test_db: AsyncSession):
"""intake(build) → answer a question node → reach resolved → resolve → proposal."""
info = await _make_user(client, test_db, email="flow_resolve@example.com", account_role="l1_tech")
headers = info["headers"]
# 1. force a build outcome at intake (real ticket + ai_build session created)
with patch(
"app.api.endpoints.l1.match_or_build.match_or_build",
new=AsyncMock(return_value={"outcome": "build", "session_kind": "ai_build",
"category": "printer"}),
):
r = await client.post("/api/v1/l1/intake",
json={"problem_statement": "printer jam"}, headers=headers)
assert r.status_code == 200, r.text
sid = r.json()["session_id"]
# 2. drive next-node deterministically: first a question, then a resolved terminal
seq = iter([
{"node_type": "question", "id": "n1", "text": "Is the printer powered on?"},
{"node_type": "resolved", "id": "n2", "text": "Printer prints a test page."},
])
async def fake_next(problem_text, category, walked_path):
return next(seq)
with patch("app.services.ai_tree_builder.generate_next_node", new=fake_next):
r1 = await client.post(f"/api/v1/l1/sessions/{sid}/next-node",
json={}, headers=headers)
assert r1.status_code == 200, r1.text
assert r1.json()["node"]["node_type"] == "question"
r2 = await client.post(
f"/api/v1/l1/sessions/{sid}/next-node",
json={"node_id": "n1", "node_text": "Is the printer powered on?", "answer": "no"},
headers=headers,
)
assert r2.status_code == 200, r2.text
assert r2.json()["node"]["node_type"] == "resolved"
# 3. resolve helpful → outcome-validated proposal captured
rr = await client.post(f"/api/v1/l1/sessions/{sid}/resolve",
json={"helpful": True, "resolution_notes": "Powered it on."},
headers=headers)
assert rr.status_code == 200, rr.text
assert rr.json()["status"] == "resolved"
props = (await test_db.execute(
select(FlowProposal).where(FlowProposal.source == "ai_realtime_l1")
)).scalars().all()
assert len(props) == 1
p = props[0]
assert p.validated_by_outcome is True
assert p.source_session_id is None
assert str(p.l1_session_id) == sid
# the walked question 'n1' becomes the captured tree root (meta entry skipped)
assert p.proposed_flow_data["tree_structure"]["id"] == "n1"
@pytest.mark.asyncio
async def test_intake_build_escalate_notifies_and_lists(client: AsyncClient, test_db: AsyncSession):
"""intake(build) → escalate → notify fires for engineers → appears in GET /escalations."""
# an engineer in the same account is the escalation recipient + the queue viewer
l1 = await _make_user(client, test_db, email="flow_esc_l1@example.com", account_role="l1_tech")
eng_data = await _register(client, email="flow_esc_eng@example.com")
eng_uid = uuid.UUID(eng_data["id"])
# put the engineer in the L1 tech's account
eng = (await test_db.execute(select(User).where(User.id == eng_uid))).scalar_one()
eng.account_id = l1["account_id"]
eng.account_role = "engineer"
await test_db.commit()
eng_headers = await _login(client, email="flow_esc_eng@example.com")
with patch(
"app.api.endpoints.l1.match_or_build.match_or_build",
new=AsyncMock(return_value={"outcome": "build", "session_kind": "ai_build",
"category": "printer"}),
):
r = await client.post("/api/v1/l1/intake",
json={"problem_statement": "weird driver fault"},
headers=l1["headers"])
assert r.status_code == 200, r.text
sid = r.json()["session_id"]
captured = {}
async def fake_notify(event, account_id, payload, db, target_user_ids=None):
captured["event"] = event
captured["target_user_ids"] = target_user_ids
with patch("app.services.l1_session_service.notify", new=fake_notify):
re_ = await client.post(f"/api/v1/l1/sessions/{sid}/escalate",
json={"reason_category": "exhausted_safe_steps",
"reason": "Beyond L1 scope"},
headers=l1["headers"])
assert re_.status_code == 200, re_.text
assert re_.json()["status"] == "escalated"
assert captured["event"] == "l1.session.escalated"
assert eng_uid in (captured["target_user_ids"] or [])
# engineer sees it in the escalations queue
q = await client.get("/api/v1/l1/escalations", headers=eng_headers)
assert q.status_code == 200, q.text
assert any(row["id"] == sid for row in q.json())