diff --git a/backend/tests/test_l1_ai_build_flow.py b/backend/tests/test_l1_ai_build_flow.py new file mode 100644 index 00000000..6bb24b5b --- /dev/null +++ b/backend/tests/test_l1_ai_build_flow.py @@ -0,0 +1,161 @@ +"""End-to-end backend integration test for the L1 AI-build flow (Phase 2A). + +Drives the real endpoint + service path — intake (build) → next-node walk → +resolve — and asserts an outcome-validated FlowProposal is captured. Only the AI +boundary is mocked: match_or_build's outcome and ai_tree_builder.generate_next_node. +A second test drives intake → escalate and asserts the engineer notification fires +and the session surfaces in GET /l1/escalations. +""" +import uuid +from unittest.mock import AsyncMock, patch + +import pytest +from httpx import AsyncClient +from sqlalchemy import delete, select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.flow_proposal import FlowProposal +from app.models.subscription import Subscription +from app.models.user import User + + +async def _register(client: AsyncClient, *, email: str) -> dict: + resp = await client.post( + "/api/v1/auth/register", + json={"email": email, "password": "TestPassword123!", "name": "Test User"}, + ) + assert resp.status_code in (200, 201), resp.text + return resp.json() + + +async def _login(client: AsyncClient, *, email: str) -> dict: + resp = await client.post( + "/api/v1/auth/login/json", + json={"email": email, "password": "TestPassword123!"}, + ) + assert resp.status_code == 200, resp.text + return {"Authorization": f"Bearer {resp.json()['access_token']}"} + + +async def _ensure_subscription(db: AsyncSession, account_id: uuid.UUID) -> None: + await db.execute(delete(Subscription).where(Subscription.account_id == account_id)) + db.add(Subscription(account_id=account_id, plan="pro", status="active")) + await db.commit() + + +async def _make_user(client: AsyncClient, db: AsyncSession, *, email: str, account_role: str) -> dict: + data = await _register(client, email=email) + uid = uuid.UUID(data["id"]) + acct_id = uuid.UUID(data["account_id"]) + user = (await db.execute(select(User).where(User.id == uid))).scalar_one() + user.account_role = account_role + await db.commit() + await _ensure_subscription(db, acct_id) + headers = await _login(client, email=email) + return {"headers": headers, "account_id": acct_id, "user_id": uid} + + +@pytest.mark.asyncio +async def test_intake_build_walk_resolve_creates_proposal(client: AsyncClient, test_db: AsyncSession): + """intake(build) → answer a question node → reach resolved → resolve → proposal.""" + info = await _make_user(client, test_db, email="flow_resolve@example.com", account_role="l1_tech") + headers = info["headers"] + + # 1. force a build outcome at intake (real ticket + ai_build session created) + with patch( + "app.api.endpoints.l1.match_or_build.match_or_build", + new=AsyncMock(return_value={"outcome": "build", "session_kind": "ai_build", + "category": "printer"}), + ): + r = await client.post("/api/v1/l1/intake", + json={"problem_statement": "printer jam"}, headers=headers) + assert r.status_code == 200, r.text + sid = r.json()["session_id"] + + # 2. drive next-node deterministically: first a question, then a resolved terminal + seq = iter([ + {"node_type": "question", "id": "n1", "text": "Is the printer powered on?"}, + {"node_type": "resolved", "id": "n2", "text": "Printer prints a test page."}, + ]) + + async def fake_next(problem_text, category, walked_path): + return next(seq) + + with patch("app.services.ai_tree_builder.generate_next_node", new=fake_next): + r1 = await client.post(f"/api/v1/l1/sessions/{sid}/next-node", + json={}, headers=headers) + assert r1.status_code == 200, r1.text + assert r1.json()["node"]["node_type"] == "question" + + r2 = await client.post( + f"/api/v1/l1/sessions/{sid}/next-node", + json={"node_id": "n1", "node_text": "Is the printer powered on?", "answer": "no"}, + headers=headers, + ) + assert r2.status_code == 200, r2.text + assert r2.json()["node"]["node_type"] == "resolved" + + # 3. resolve helpful → outcome-validated proposal captured + rr = await client.post(f"/api/v1/l1/sessions/{sid}/resolve", + json={"helpful": True, "resolution_notes": "Powered it on."}, + headers=headers) + assert rr.status_code == 200, rr.text + assert rr.json()["status"] == "resolved" + + props = (await test_db.execute( + select(FlowProposal).where(FlowProposal.source == "ai_realtime_l1") + )).scalars().all() + assert len(props) == 1 + p = props[0] + assert p.validated_by_outcome is True + assert p.source_session_id is None + assert str(p.l1_session_id) == sid + # the walked question 'n1' becomes the captured tree root (meta entry skipped) + assert p.proposed_flow_data["tree_structure"]["id"] == "n1" + + +@pytest.mark.asyncio +async def test_intake_build_escalate_notifies_and_lists(client: AsyncClient, test_db: AsyncSession): + """intake(build) → escalate → notify fires for engineers → appears in GET /escalations.""" + # an engineer in the same account is the escalation recipient + the queue viewer + l1 = await _make_user(client, test_db, email="flow_esc_l1@example.com", account_role="l1_tech") + eng_data = await _register(client, email="flow_esc_eng@example.com") + eng_uid = uuid.UUID(eng_data["id"]) + # put the engineer in the L1 tech's account + eng = (await test_db.execute(select(User).where(User.id == eng_uid))).scalar_one() + eng.account_id = l1["account_id"] + eng.account_role = "engineer" + await test_db.commit() + eng_headers = await _login(client, email="flow_esc_eng@example.com") + + with patch( + "app.api.endpoints.l1.match_or_build.match_or_build", + new=AsyncMock(return_value={"outcome": "build", "session_kind": "ai_build", + "category": "printer"}), + ): + r = await client.post("/api/v1/l1/intake", + json={"problem_statement": "weird driver fault"}, + headers=l1["headers"]) + assert r.status_code == 200, r.text + sid = r.json()["session_id"] + + captured = {} + + async def fake_notify(event, account_id, payload, db, target_user_ids=None): + captured["event"] = event + captured["target_user_ids"] = target_user_ids + + with patch("app.services.l1_session_service.notify", new=fake_notify): + re_ = await client.post(f"/api/v1/l1/sessions/{sid}/escalate", + json={"reason_category": "exhausted_safe_steps", + "reason": "Beyond L1 scope"}, + headers=l1["headers"]) + assert re_.status_code == 200, re_.text + assert re_.json()["status"] == "escalated" + assert captured["event"] == "l1.session.escalated" + assert eng_uid in (captured["target_user_ids"] or []) + + # engineer sees it in the escalations queue + q = await client.get("/api/v1/l1/escalations", headers=eng_headers) + assert q.status_code == 200, q.text + assert any(row["id"] == sid for row in q.json())