diff --git a/docs/superpowers/plans/2026-05-29-l1-ai-tree-builder-phase-2a.md b/docs/superpowers/plans/2026-05-29-l1-ai-tree-builder-phase-2a.md new file mode 100644 index 00000000..af4ce758 --- /dev/null +++ b/docs/superpowers/plans/2026-05-29-l1-ai-tree-builder-phase-2a.md @@ -0,0 +1,1966 @@ +# L1 AI Decision-Tree Builder — Phase 2A Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** When an L1 tech describes a problem with no matching published flow, build a yes/no decision tree in real time from generic L1 knowledge (constrained + escalate-early), walk it node-by-node, capture resolved trees as outcome-validated drafts, and route escalations to engineers. + +**Architecture:** Approach C — a dedicated `ai_tree_builder` service for constrained node-by-node generation, an `match_or_build` orchestrator that matches published flows first and gates generic building behind admin-configured categories, reusing `flow_matching_engine` (match), `knowledge_flywheel`/`FlowProposal` (capture), and `notification_service` (escalation). + +**Tech Stack:** Python 3.12 · FastAPI · SQLAlchemy 2.0 async · Alembic · PostgreSQL 16 (RLS) · React 19 + Vite + TS + Tailwind v4 · Playwright. + +**Source spec:** [`docs/superpowers/specs/2026-05-29-l1-ai-tree-builder-phase-2a-design.md`](../specs/2026-05-29-l1-ai-tree-builder-phase-2a-design.md) + +**Conventions (read before starting):** +- Migrations are **hand-written**: `alembic revision -m "msg"` then edit `upgrade()`/`downgrade()` by hand. **Never** `--autogenerate`, **never** `--rev-id`. Current head is `b3358ba0e48c`; each new migration chains from the previous. +- Backend tests run in the container: `docker exec resolutionflow_backend pytest -v`. The suite uses pytest-xdist in CI; single-module runs work locally. +- JSONB columns require **reassignment** (`x = [*x, item]`), not in-place mutation (see `record_step`). +- Commit after each task with the message shown. Git trailer: `Co-Authored-By: Claude Opus 4.7 `. +- Model tiers: `settings.get_model_for_action(key)` → `ACTION_MODEL_MAP[key]` → `AI_MODEL_TIERS[tier]`. `fast`=Haiku, `standard`=Sonnet. + +--- + +## File Structure + +**New backend files:** +- `backend/app/services/ai_tree_builder.py` — node Pydantic models, constrained system prompt, `generate_next_node`, per-node validation, `normalize_walked_path`. +- `backend/app/services/match_or_build.py` — orchestrator (`match_or_build`, `classify`). +- `backend/app/services/l1_category_service.py` — `DEFAULT_L1_CATEGORIES`, `HARD_FLOOR_FORBIDDEN`, get/set enabled categories. +- `backend/app/schemas/l1_categories.py` — category settings request/response. +- `backend/tests/test_ai_tree_builder.py`, `test_match_or_build.py`, `test_l1_category_service.py`, `test_l1_ai_build_flow.py` (integration). +- 3 Alembic migrations. + +**Modified backend files:** +- `backend/app/models/l1_walk_session.py` — `ai_build` in CHECK constraints. +- `backend/app/models/account.py` — `enabled_l1_categories` column. +- `backend/app/models/flow_proposal.py` — `l1_session_id`, nullable `source_session_id`, exactly-one CHECK. +- `backend/app/core/config.py` — `l1_realtime_build` + `l1_classify` action keys. +- `backend/app/api/deps.py` — `require_account_owner_or_admin`. +- `backend/app/api/endpoints/l1.py` — intake dispatch, `/sessions/{id}/next-node`, `/escalations`. +- `backend/app/api/endpoints/accounts.py` — `/me/l1-categories` GET/PATCH. +- `backend/app/schemas/l1.py` — `IntakeResponse.outcome`, `ai_build` literal, `NextNode*` schemas. +- `backend/app/services/l1_session_service.py` — `start_ai_build_session`, flywheel capture in `resolve`, engineer notification in `escalate`. +- `backend/app/services/notification_service.py` + `backend/app/schemas/notification.py` — `l1.session.escalated` event. + +**Modified frontend files:** +- `frontend/src/api/l1.ts`, `frontend/src/types/l1.ts` — next-node, outcome, categories. +- `frontend/src/pages/l1/L1Dashboard.tsx` — dispatch on intake `outcome`. +- `frontend/src/components/l1/L1WalkTreeVariant.tsx` — real node rendering + disclaimer. +- `frontend/src/components/flowpilot/ProposalDetail.tsx` — L1-sourced source block. +- `frontend/src/pages/EscalationQueuePage.tsx` — L1 escalations section. +- New: `frontend/src/pages/account/L1CategoriesPage.tsx` + route + nav. +- `frontend/e2e/l1-workspace.spec.ts` — AI build flow tests. + +--- + +## Task 1: Migration + model — `ai_build` session kind + +**Files:** +- Create: `backend/alembic/versions/_add_ai_build_session_kind.py` +- Modify: `backend/app/models/l1_walk_session.py:42-61` +- Test: `backend/tests/test_l1_ai_build_model.py` + +- [ ] **Step 1: Write the failing test** + +```python +# backend/tests/test_l1_ai_build_model.py +import uuid +import pytest +from app.models.l1_walk_session import L1WalkSession + + +def test_ai_build_session_kind_allowed_by_model_constraint(): + """ai_build is a valid session_kind with both target FKs null (like adhoc).""" + s = L1WalkSession( + account_id=uuid.uuid4(), + created_by_user_id=uuid.uuid4(), + ticket_id="t1", + ticket_kind="internal", + session_kind="ai_build", + ) + assert s.session_kind == "ai_build" + assert s.flow_id is None and s.flow_proposal_id is None +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `docker exec resolutionflow_backend pytest tests/test_l1_ai_build_model.py -v` +Expected: PASS at the Python level already (model has no enum on the attribute) — the real enforcement is the DB CHECK. If it errors on import, fix the import first. Treat this task's true verification as the migration roundtrip in Step 6. + +- [ ] **Step 3: Update model CHECK constraints** + +In `backend/app/models/l1_walk_session.py`, update the two constraints: + +```python + CheckConstraint( + "session_kind IN ('flow', 'proposal', 'adhoc', 'ai_build')", + name="ck_l1_walk_sessions_session_kind", + ), + CheckConstraint( + "(session_kind = 'flow' AND flow_id IS NOT NULL AND flow_proposal_id IS NULL) " + "OR (session_kind = 'proposal' AND flow_proposal_id IS NOT NULL AND flow_id IS NULL) " + "OR (session_kind IN ('adhoc', 'ai_build') AND flow_id IS NULL AND flow_proposal_id IS NULL)", + name="ck_l1_walk_sessions_target_consistency", + ), +``` + +- [ ] **Step 4: Create the migration** + +Run: `docker exec resolutionflow_backend alembic revision -m "add ai_build session kind"` +Then edit the generated file so `down_revision` is the current head (`b3358ba0e48c` unless a later task already advanced it) and the body drops+recreates the two CHECK constraints: + +```python +def upgrade() -> None: + op.drop_constraint("ck_l1_walk_sessions_session_kind", "l1_walk_sessions", type_="check") + op.create_check_constraint( + "ck_l1_walk_sessions_session_kind", "l1_walk_sessions", + "session_kind IN ('flow', 'proposal', 'adhoc', 'ai_build')", + ) + op.drop_constraint("ck_l1_walk_sessions_target_consistency", "l1_walk_sessions", type_="check") + op.create_check_constraint( + "ck_l1_walk_sessions_target_consistency", "l1_walk_sessions", + "(session_kind = 'flow' AND flow_id IS NOT NULL AND flow_proposal_id IS NULL) " + "OR (session_kind = 'proposal' AND flow_proposal_id IS NOT NULL AND flow_id IS NULL) " + "OR (session_kind IN ('adhoc', 'ai_build') AND flow_id IS NULL AND flow_proposal_id IS NULL)", + ) + + +def downgrade() -> None: + op.drop_constraint("ck_l1_walk_sessions_target_consistency", "l1_walk_sessions", type_="check") + op.create_check_constraint( + "ck_l1_walk_sessions_target_consistency", "l1_walk_sessions", + "(session_kind = 'flow' AND flow_id IS NOT NULL AND flow_proposal_id IS NULL) " + "OR (session_kind = 'proposal' AND flow_proposal_id IS NOT NULL AND flow_id IS NULL) " + "OR (session_kind = 'adhoc' AND flow_id IS NULL AND flow_proposal_id IS NULL)", + ) + op.drop_constraint("ck_l1_walk_sessions_session_kind", "l1_walk_sessions", type_="check") + op.create_check_constraint( + "ck_l1_walk_sessions_session_kind", "l1_walk_sessions", + "session_kind IN ('flow', 'proposal', 'adhoc')", + ) +``` + +- [ ] **Step 5: Apply the migration** + +Run: `docker exec resolutionflow_backend alembic upgrade head` +Expected: `Running upgrade b3358ba0e48c -> , add ai_build session kind` + +- [ ] **Step 6: Verify roundtrip + insert an ai_build row** + +Run: +```bash +docker exec resolutionflow_postgres psql -U postgres -d resolutionflow -c \ +"INSERT INTO l1_walk_sessions (id, account_id, created_by_user_id, ticket_id, ticket_kind, session_kind, walked_path, walk_notes, status, started_at, last_step_at) \ + SELECT gen_random_uuid(), a.id, u.id, 't-smoke', 'internal', 'ai_build', '[]'::jsonb, '[]'::jsonb, 'active', now(), now() \ + FROM accounts a JOIN users u ON u.account_id=a.id LIMIT 1 RETURNING id;" +``` +Expected: one row id returned (no CHECK violation). Then clean up: `DELETE FROM l1_walk_sessions WHERE ticket_id='t-smoke';` + +- [ ] **Step 7: Commit** + +```bash +git add backend/app/models/l1_walk_session.py backend/alembic/versions/ backend/tests/test_l1_ai_build_model.py +git commit -m "feat(l1): add ai_build session kind (model + migration)" +``` + +--- + +## Task 2: Migration + model — account `enabled_l1_categories` + +**Files:** +- Create: `backend/alembic/versions/_add_enabled_l1_categories.py` +- Modify: `backend/app/models/account.py` +- Test: `backend/tests/test_account_l1_categories_column.py` + +- [ ] **Step 1: Write the failing test** + +```python +# backend/tests/test_account_l1_categories_column.py +from app.models.account import Account + + +def test_account_has_enabled_l1_categories_default(): + a = Account(name="Acme", display_code="ABC12345") + # Column default is applied at flush; attribute may be None pre-flush. + assert hasattr(a, "enabled_l1_categories") +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `docker exec resolutionflow_backend pytest tests/test_account_l1_categories_column.py -v` +Expected: FAIL — `AttributeError`/no such attribute. + +- [ ] **Step 3: Add the model column** + +In `backend/app/models/account.py`, after `sso_config` (or near other JSONB columns), add: + +```python + enabled_l1_categories: Mapped[list[str]] = mapped_column( + JSONB(), nullable=False, + server_default=sa_text( + "'[\"password_reset\",\"account_lockout\",\"printer\"," + "\"email_outlook_client\",\"wifi_network_basics\",\"vpn_connect\"," + "\"teams_zoom_av\",\"browser_cache_cookies\",\"peripheral_reconnect\"," + "\"os_restart_update\"]'::jsonb" + ), + ) +``` + +Ensure imports exist at top of file: `from sqlalchemy.dialects.postgresql import JSONB` and `from sqlalchemy import text as sa_text` (add if missing). + +- [ ] **Step 4: Run test to verify it passes** + +Run: `docker exec resolutionflow_backend pytest tests/test_account_l1_categories_column.py -v` +Expected: PASS. + +- [ ] **Step 5: Create + apply migration** + +Run: `docker exec resolutionflow_backend alembic revision -m "add enabled_l1_categories to accounts"` +Edit body: + +```python +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +_DEFAULT = ('["password_reset","account_lockout","printer","email_outlook_client",' + '"wifi_network_basics","vpn_connect","teams_zoom_av","browser_cache_cookies",' + '"peripheral_reconnect","os_restart_update"]') + +def upgrade() -> None: + op.add_column("accounts", sa.Column( + "enabled_l1_categories", postgresql.JSONB(), nullable=False, + server_default=sa.text(f"'{_DEFAULT}'::jsonb"), + )) + +def downgrade() -> None: + op.drop_column("accounts", "enabled_l1_categories") +``` + +Run: `docker exec resolutionflow_backend alembic upgrade head` +Expected: upgrade applied; existing accounts backfill to the default list. + +- [ ] **Step 6: Commit** + +```bash +git add backend/app/models/account.py backend/alembic/versions/ backend/tests/test_account_l1_categories_column.py +git commit -m "feat(l1): add accounts.enabled_l1_categories with default allowlist" +``` + +--- + +## Task 3: Migration + model — FlowProposal L1 source linkage (Finding 1) + +**Files:** +- Create: `backend/alembic/versions/_flow_proposal_l1_source.py` +- Modify: `backend/app/models/flow_proposal.py:42-82` +- Test: `backend/tests/test_flow_proposal_l1_source.py` + +- [ ] **Step 1: Write the failing test** + +```python +# backend/tests/test_flow_proposal_l1_source.py +import uuid +from app.models.flow_proposal import FlowProposal + + +def test_flow_proposal_accepts_l1_session_id_without_source_session(): + p = FlowProposal( + account_id=uuid.uuid4(), + l1_session_id=uuid.uuid4(), + source_session_id=None, + proposal_type="new_flow", + title="AI L1 draft", + proposed_flow_data={"tree_structure": {"id": "root"}}, + source="ai_realtime_l1", + status="pending", + ) + assert p.l1_session_id is not None and p.source_session_id is None +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `docker exec resolutionflow_backend pytest tests/test_flow_proposal_l1_source.py -v` +Expected: FAIL — `TypeError`/unexpected kwarg `l1_session_id`. + +- [ ] **Step 3: Update the model** + +In `backend/app/models/flow_proposal.py`: make `source_session_id` nullable, add `l1_session_id`, add the exactly-one CHECK in `__table_args__`. + +```python + source_session_id: Mapped[Optional[uuid.UUID]] = mapped_column( + UUID(as_uuid=True), + ForeignKey("ai_sessions.id", ondelete="CASCADE"), + nullable=True, + index=True, + ) + l1_session_id: Mapped[Optional[uuid.UUID]] = mapped_column( + UUID(as_uuid=True), + ForeignKey("l1_walk_sessions.id", ondelete="SET NULL"), + nullable=True, + index=True, + ) +``` + +Add to `__table_args__` (alongside the existing source/linked_ticket checks): + +```python + CheckConstraint( + "(source_session_id IS NOT NULL) <> (l1_session_id IS NOT NULL)", + name="ck_flow_proposals_exactly_one_source", + ), +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `docker exec resolutionflow_backend pytest tests/test_flow_proposal_l1_source.py -v` +Expected: PASS. + +- [ ] **Step 5: Create + apply migration** + +Run: `docker exec resolutionflow_backend alembic revision -m "flow_proposal l1 source linkage"` +Edit body: + +```python +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +def upgrade() -> None: + op.add_column("flow_proposals", sa.Column( + "l1_session_id", postgresql.UUID(as_uuid=True), nullable=True)) + op.create_index("ix_flow_proposals_l1_session_id", "flow_proposals", ["l1_session_id"]) + op.create_foreign_key( + "fk_flow_proposals_l1_session_id", "flow_proposals", "l1_walk_sessions", + ["l1_session_id"], ["id"], ondelete="SET NULL") + op.alter_column("flow_proposals", "source_session_id", nullable=True) + op.create_check_constraint( + "ck_flow_proposals_exactly_one_source", "flow_proposals", + "(source_session_id IS NOT NULL) <> (l1_session_id IS NOT NULL)") + +def downgrade() -> None: + op.drop_constraint("ck_flow_proposals_exactly_one_source", "flow_proposals", type_="check") + op.alter_column("flow_proposals", "source_session_id", nullable=False) + op.drop_constraint("fk_flow_proposals_l1_session_id", "flow_proposals", type_="foreignkey") + op.drop_index("ix_flow_proposals_l1_session_id", "flow_proposals") + op.drop_column("flow_proposals", "l1_session_id") +``` + +Run: `docker exec resolutionflow_backend alembic upgrade head`. Expected: applied cleanly (no existing rows violate the new CHECK because all current proposals have a non-null `source_session_id` and null `l1_session_id`). + +- [ ] **Step 6: Verify Tree.source_session_id is nullable** + +Run: `docker exec resolutionflow_postgres psql -U postgres -d resolutionflow -c "\d trees" | grep source_session_id` +Expected: shows the column **without** `not null`. If it shows `not null`, add `op.alter_column("trees","source_session_id",nullable=True)` to this migration's `upgrade()` and re-run. (L1-promoted trees leave it NULL.) + +- [ ] **Step 7: Commit** + +```bash +git add backend/app/models/flow_proposal.py backend/alembic/versions/ backend/tests/test_flow_proposal_l1_source.py +git commit -m "feat(l1): FlowProposal l1_session_id source linkage (nullable source_session_id + exactly-one check)" +``` + +--- + +## Task 4: Category service + model action keys + +**Files:** +- Create: `backend/app/services/l1_category_service.py` +- Modify: `backend/app/core/config.py` (ACTION_MODEL_MAP) +- Test: `backend/tests/test_l1_category_service.py` + +- [ ] **Step 1: Write the failing test** + +```python +# backend/tests/test_l1_category_service.py +from app.services.l1_category_service import ( + DEFAULT_L1_CATEGORIES, HARD_FLOOR_FORBIDDEN, is_category_enabled, +) + + +def test_defaults_and_hard_floor_present(): + assert "password_reset" in DEFAULT_L1_CATEGORIES + assert "registry_edit" in HARD_FLOOR_FORBIDDEN # representative forbidden action key + assert len(DEFAULT_L1_CATEGORIES) == 10 + + +def test_is_category_enabled(): + enabled = ["printer", "vpn_connect"] + assert is_category_enabled("printer", enabled) is True + assert is_category_enabled("registry_edit", enabled) is False + assert is_category_enabled("unknown", enabled) is False +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `docker exec resolutionflow_backend pytest tests/test_l1_category_service.py -v` +Expected: FAIL — module not found. + +- [ ] **Step 3: Implement the service** + +```python +# backend/app/services/l1_category_service.py +"""L1 category allowlist + the always-forbidden hard floor. + +DEFAULT_L1_CATEGORIES seeds an account's enabled set. HARD_FLOOR_FORBIDDEN is a +category-independent safety floor the AI tree builder must never emit and admins +cannot enable. See spec §5.1/§5.2. +""" +from uuid import UUID + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.account import Account + +DEFAULT_L1_CATEGORIES: list[str] = [ + "password_reset", "account_lockout", "printer", "email_outlook_client", + "wifi_network_basics", "vpn_connect", "teams_zoom_av", + "browser_cache_cookies", "peripheral_reconnect", "os_restart_update", +] + +# Always-forbidden action classes (keys are stable identifiers; the human-readable +# phrasing lives in the builder system prompt). Admins cannot enable these. +HARD_FLOOR_FORBIDDEN: list[str] = [ + "registry_edit", "system_file_or_boot_edit", "data_or_disk_deletion", + "credential_or_mfa_change", "security_or_av_or_firewall_change", + "elevated_or_admin_script", "domain_dns_dhcp_change", + "server_or_production_config", "billing_or_license_change", +] + +# Substrings that, if present in a generated node's text, indicate a hard-floor +# violation. Used by ai_tree_builder per-node validation (defense in depth). +HARD_FLOOR_TEXT_PATTERNS: list[str] = [ + "regedit", "registry", "format ", "delete partition", "diskpart", + "reset password for", "disable firewall", "disable antivirus", "disable defender", + "run as administrator", "sudo ", "domain controller", "dns record", "dhcp scope", + "uninstall security", "bitlocker", +] + + +def is_category_enabled(category: str, enabled: list[str]) -> bool: + """A category is buildable only if explicitly enabled and not hard-floored.""" + if category in HARD_FLOOR_FORBIDDEN: + return False + return category in enabled + + +async def get_enabled_categories(account_id: UUID, db: AsyncSession) -> list[str]: + acct = (await db.execute(select(Account).where(Account.id == account_id))).scalar_one() + return list(acct.enabled_l1_categories or []) + + +async def set_enabled_categories( + account_id: UUID, categories: list[str], db: AsyncSession +) -> list[str]: + """Persist the enabled set, dropping anything unknown or hard-floored.""" + cleaned = [c for c in categories if c in DEFAULT_L1_CATEGORIES] + acct = (await db.execute(select(Account).where(Account.id == account_id))).scalar_one() + acct.enabled_l1_categories = cleaned + await db.flush() + return cleaned +``` + +- [ ] **Step 4: Add model action keys** + +In `backend/app/core/config.py`, add to `ACTION_MODEL_MAP`: + +```python + # L1 AI tree builder (Phase 2A): per-node generation is latency-sensitive + # on a live call → Sonnet; classification is a short label task → Haiku. + "l1_realtime_build": "standard", + "l1_classify": "fast", +``` + +- [ ] **Step 5: Run tests** + +Run: `docker exec resolutionflow_backend pytest tests/test_l1_category_service.py -v` +Expected: PASS (3 tests). + +- [ ] **Step 6: Commit** + +```bash +git add backend/app/services/l1_category_service.py backend/app/core/config.py backend/tests/test_l1_category_service.py +git commit -m "feat(l1): category service (defaults + hard floor) and AI action keys" +``` + +--- + +## Task 5: `ai_tree_builder` — node schema, prompt, generation, validation + +**Files:** +- Create: `backend/app/services/ai_tree_builder.py` +- Test: `backend/tests/test_ai_tree_builder.py` + +- [ ] **Step 1: Write the failing tests** + +```python +# backend/tests/test_ai_tree_builder.py +import pytest +from app.services import ai_tree_builder as atb + + +def test_validate_node_rejects_hard_floor_text(): + node = {"node_type": "instruction", "id": "n1", "text": "Open regedit and change the key", "next": "generate"} + with pytest.raises(atb.UnsafeNodeError): + atb.validate_node(node) + + +def test_validate_node_accepts_safe_instruction(): + node = {"node_type": "instruction", "id": "n1", "text": "Restart the printer.", "next": "generate"} + assert atb.validate_node(node)["node_type"] == "instruction" + + +def test_depth_cap_forces_escalate(): + walked = [{"node_type": "question", "id": f"n{i}", "text": "?", "answer": "no"} for i in range(atb.MAX_DEPTH)] + node = atb.escalate_if_depth_exceeded(walked) + assert node is not None and node["node_type"] == "escalate" + + +def test_normalize_walked_path_builds_valid_tree(): + walked = [ + {"node_type": "question", "id": "n1", "text": "Powered on?", "answer": "no"}, + {"node_type": "instruction", "id": "n2", "text": "Power it on.", "answer": "ack"}, + {"node_type": "resolved", "id": "n3", "text": "Fixed."}, + ] + tree = atb.normalize_walked_path(walked) + assert isinstance(tree, dict) and tree.get("id") == "n1" + # untraversed 'yes' branch of n1 became a needs_review stub + assert any(n["node_type"] == "needs_review" for n in tree["nodes"].values()) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `docker exec resolutionflow_backend pytest tests/test_ai_tree_builder.py -v` +Expected: FAIL — module not found. + +- [ ] **Step 3: Implement the builder** + +```python +# backend/app/services/ai_tree_builder.py +"""Constrained, node-by-node L1 decision-tree generation (spec §4/§5/§6.1). + +Each call produces ONE node given the problem, category, and full walked path. +Generation is constrained to safe/reversible L1 steps and biased to escalate +early. normalize_walked_path() turns a resolved walk into a valid tree object +for flywheel capture. +""" +import json +import logging +from typing import Any, Optional + +from app.core.ai_provider import get_ai_provider +from app.core.config import settings +from app.services.l1_category_service import HARD_FLOOR_TEXT_PATTERNS +from app.services.llm_utils import parse_llm_json + +logger = logging.getLogger(__name__) + +MAX_DEPTH = 12 +VALID_NODE_TYPES = {"question", "instruction", "resolved", "escalate"} + + +class UnsafeNodeError(ValueError): + """Raised when a generated node violates the hard floor or is malformed.""" + + +SYSTEM_PROMPT = """\ +You are an L1 helpdesk troubleshooting guide builder. Given a problem and the +steps already tried, produce the SINGLE next node of a yes/no decision tree. + +HARD RULES: +- Only safe, reversible, observe-or-restart-class steps: checking status, toggling, + restarting, reconnecting, re-entering credentials the USER already knows. +- NEVER produce steps that: edit the registry/system files/boot config; delete or + format data/disks; change credentials/MFA/security/firewall/AV; run elevated or + admin scripts; touch domain controllers/DNS/DHCP or production servers; or have + billing/license impact. These are out of L1 scope. +- When you run out of safe in-scope steps, DO NOT GUESS. Emit an "escalate" node. + +Return ONLY a JSON object for ONE node, one of: +{"node_type":"question","text":""} +{"node_type":"instruction","text":""} +{"node_type":"resolved","text":""} +{"node_type":"escalate","reason_category":"exhausted_safe_steps","text":""} +No prose, no markdown fences. +""" + + +def _build_context(problem_text: str, category: str, walked_path: list[dict]) -> str: + lines = [f"PROBLEM: {problem_text}", f"CATEGORY: {category}", "STEPS SO FAR:"] + if not walked_path: + lines.append("(none yet — produce the first diagnostic question)") + for i, step in enumerate(walked_path, 1): + ans = step.get("answer") + suffix = f" -> {ans}" if ans else "" + lines.append(f"{i}. [{step.get('node_type','?')}] {step.get('text','')}{suffix}") + return "\n".join(lines) + + +def validate_node(node: dict[str, Any]) -> dict[str, Any]: + """Shape + hard-floor validation. Raises UnsafeNodeError on violation.""" + if not isinstance(node, dict) or node.get("node_type") not in VALID_NODE_TYPES: + raise UnsafeNodeError(f"invalid node_type: {node!r}") + text = (node.get("text") or "").lower() + for pat in HARD_FLOOR_TEXT_PATTERNS: + if pat in text: + raise UnsafeNodeError(f"hard-floor pattern '{pat}' in node text") + return node + + +def escalate_if_depth_exceeded(walked_path: list[dict]) -> Optional[dict[str, Any]]: + if len(walked_path) >= MAX_DEPTH: + return { + "node_type": "escalate", + "reason_category": "depth_cap", + "text": "Reached the L1 troubleshooting depth limit — escalating to engineering.", + } + return None + + +async def generate_next_node( + problem_text: str, category: str, walked_path: list[dict] +) -> dict[str, Any]: + """Generate + validate the next node. Regenerate once on failure, then escalate.""" + capped = escalate_if_depth_exceeded(walked_path) + if capped: + return capped + + provider = get_ai_provider(settings.get_model_for_action("l1_realtime_build")) + context = _build_context(problem_text, category, walked_path) + + for attempt in range(2): + try: + raw, _, _ = await provider.generate_json( + system_prompt=SYSTEM_PROMPT, + messages=[{"role": "user", "content": context}], + max_tokens=1024, + ) + node = parse_llm_json(raw) + return validate_node(node) + except (UnsafeNodeError, ValueError) as e: + logger.warning("ai_tree_builder node attempt %d failed: %s", attempt + 1, e) + continue + + return { + "node_type": "escalate", + "reason_category": "generation_failed", + "text": "Could not generate a safe next step — escalating to engineering.", + } + + +def normalize_walked_path(walked_path: list[dict]) -> dict[str, Any]: + """Turn a resolved walk into a valid troubleshooting tree (spec §6.1). + + Root = first node's id; question nodes' traversed branch points to the next + node, the untraversed branch to a needs_review stub; terminal node ends it. + Returns {id, nodes: {id: node}} — a dict with an id (passes the proposal + approval guard). + """ + nodes: dict[str, Any] = {} + if not walked_path: + root_id = "root" + nodes[root_id] = {"id": root_id, "node_type": "needs_review", + "text": "Empty walk — needs authoring."} + return {"id": root_id, "nodes": nodes} + + stub_seq = 0 + for i, step in enumerate(walked_path): + nid = step.get("id") or f"n{i+1}" + ntype = step.get("node_type", "question") + nxt = walked_path[i + 1].get("id", f"n{i+2}") if i + 1 < len(walked_path) else None + node: dict[str, Any] = {"id": nid, "node_type": ntype, "text": step.get("text", "")} + if ntype == "question": + answer = (step.get("answer") or "").lower() + stub_seq += 1 + stub_id = f"review-{stub_seq}" + nodes[stub_id] = {"id": stub_id, "node_type": "needs_review", + "text": "Branch not explored during the originating call."} + node["yes_next"] = nxt if answer == "yes" else stub_id + node["no_next"] = nxt if answer == "no" else stub_id + elif ntype == "instruction": + node["next"] = nxt + nodes[nid] = node + + return {"id": walked_path[0].get("id", "n1"), "nodes": nodes} +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `docker exec resolutionflow_backend pytest tests/test_ai_tree_builder.py -v` +Expected: PASS (4 tests). `generate_next_node` is not unit-tested against a live model here; it is covered by the integration test in Task 11 with a mocked provider. + +- [ ] **Step 5: Commit** + +```bash +git add backend/app/services/ai_tree_builder.py backend/tests/test_ai_tree_builder.py +git commit -m "feat(l1): ai_tree_builder — constrained node generation, validation, normalize" +``` + +--- + +## Task 6: `match_or_build` orchestrator + `classify` + +**Files:** +- Create: `backend/app/services/match_or_build.py` +- Test: `backend/tests/test_match_or_build.py` + +- [ ] **Step 1: Write the failing tests** + +```python +# backend/tests/test_match_or_build.py +import uuid +import pytest +from unittest.mock import AsyncMock, patch +from app.services import match_or_build as mob + + +@pytest.mark.asyncio +async def test_match_wins_before_category_gate(): + """A strong published-flow match returns 'matched' even if category disabled.""" + with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock( + return_value=[{"tree_id": str(uuid.uuid4()), "tree_name": "VPN", "score": 0.9}])), \ + patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=[])): + res = await mob.match_or_build(uuid.uuid4(), "vpn down", None, "t1", db=AsyncMock(), force_build=False) + assert res["outcome"] == "matched" + assert res["session_kind"] == "flow" + + +@pytest.mark.asyncio +async def test_suggest_band(): + with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock( + return_value=[{"tree_id": str(uuid.uuid4()), "tree_name": "X", "score": 0.66}])): + res = await mob.match_or_build(uuid.uuid4(), "p", None, "t1", db=AsyncMock(), force_build=False) + assert res["outcome"] == "suggest" + + +@pytest.mark.asyncio +async def test_out_of_scope_when_category_disabled_on_build_path(): + with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock(return_value=[])), \ + patch.object(mob, "classify", new=AsyncMock(return_value="printer")), \ + patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=["vpn_connect"])): + res = await mob.match_or_build(uuid.uuid4(), "printer jam", None, "t1", db=AsyncMock(), force_build=False) + assert res["outcome"] == "out_of_scope" + + +@pytest.mark.asyncio +async def test_build_when_enabled_and_no_match(): + with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock(return_value=[])), \ + patch.object(mob, "classify", new=AsyncMock(return_value="printer")), \ + patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=["printer"])): + res = await mob.match_or_build(uuid.uuid4(), "printer jam", None, "t1", db=AsyncMock(), force_build=False) + assert res["outcome"] == "build" + assert res["session_kind"] == "ai_build" + + +@pytest.mark.asyncio +async def test_force_build_skips_match_but_still_gates(): + fm = AsyncMock(return_value=[{"tree_id": str(uuid.uuid4()), "tree_name": "X", "score": 0.99}]) + with patch.object(mob.flow_matching_engine, "find_matches", new=fm), \ + patch.object(mob, "classify", new=AsyncMock(return_value="printer")), \ + patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=["printer"])): + res = await mob.match_or_build(uuid.uuid4(), "p", None, "t1", db=AsyncMock(), force_build=True) + fm.assert_not_called() + assert res["outcome"] == "build" +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `docker exec resolutionflow_backend pytest tests/test_match_or_build.py -v` +Expected: FAIL — module not found. + +- [ ] **Step 3: Implement the orchestrator** + +```python +# backend/app/services/match_or_build.py +"""Intake orchestrator: match published flows first, gate generic build behind +the account's enabled categories (spec §3). Match runs BEFORE the category gate +so an authored flow is never blocked by category settings (Finding 4).""" +import logging +from typing import Any, Optional +from uuid import UUID + +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.ai_provider import get_ai_provider +from app.core.config import settings +from app.services import flow_matching_engine +from app.services.l1_category_service import ( + DEFAULT_L1_CATEGORIES, get_enabled_categories, is_category_enabled, +) +from app.services.llm_utils import parse_llm_json + +logger = logging.getLogger(__name__) + +MATCH_THRESHOLD = 0.75 +SUGGEST_THRESHOLD = 0.60 + +_CLASSIFY_PROMPT = ( + "Classify the IT support problem into exactly one of these category keys, " + "or 'unknown'. Return JSON {\"category\":\"\"} only.\nKEYS: " + + ", ".join(DEFAULT_L1_CATEGORIES) +) + + +async def classify(problem_text: str) -> str: + """Map a problem to a category key via a short model call; keyword fallback.""" + try: + provider = get_ai_provider(settings.get_model_for_action("l1_classify")) + raw, _, _ = await provider.generate_json( + system_prompt=_CLASSIFY_PROMPT, + messages=[{"role": "user", "content": problem_text}], + max_tokens=64, + ) + cat = parse_llm_json(raw).get("category", "unknown") + return cat if cat in DEFAULT_L1_CATEGORIES else "unknown" + except Exception as e: # noqa: BLE001 — fall back, never hard-fail intake + logger.warning("classify model call failed (%s); keyword fallback", e) + text = problem_text.lower() + for cat in DEFAULT_L1_CATEGORIES: + if any(tok in text for tok in cat.split("_")): + return cat + return "unknown" + + +async def match_or_build( + account_id: UUID, + problem_text: str, + problem_domain: Optional[str], + ticket_ref: str, + *, + db: AsyncSession, + force_build: bool = False, +) -> dict[str, Any]: + if not force_build: + hits = await flow_matching_engine.find_matches( + problem_text, problem_domain, account_id, db) + best = max(hits, key=lambda h: h["score"], default=None) if hits else None + if best and best["score"] >= MATCH_THRESHOLD: + return {"outcome": "matched", "flow_id": best["tree_id"], "session_kind": "flow"} + if best and best["score"] >= SUGGEST_THRESHOLD: + return {"outcome": "suggest", + "near_miss": {"flow_id": best["tree_id"], "flow_name": best["tree_name"], + "score": best["score"]}, + "can_build": True} + + category = await classify(problem_text) + enabled = await get_enabled_categories(account_id, db) + if not is_category_enabled(category, enabled): + return {"outcome": "out_of_scope", "category": category} + return {"outcome": "build", "session_kind": "ai_build", "category": category} +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `docker exec resolutionflow_backend pytest tests/test_match_or_build.py -v` +Expected: PASS (5 tests). + +- [ ] **Step 5: Commit** + +```bash +git add backend/app/services/match_or_build.py backend/tests/test_match_or_build.py +git commit -m "feat(l1): match_or_build orchestrator + classify (match-first, gate-on-build)" +``` + +--- + +## Task 7: Session service — `start_ai_build_session` + +**Files:** +- Modify: `backend/app/services/l1_session_service.py` +- Test: `backend/tests/test_l1_session_service.py` (add) + +- [ ] **Step 1: Write the failing test** + +```python +# add to backend/tests/test_l1_session_service.py +@pytest.mark.asyncio +async def test_start_ai_build_session(db_session, l1_user): + from app.services import l1_session_service as svc + s = await svc.start_ai_build_session( + db_session, account_id=l1_user.account_id, user=l1_user, + ticket_id="t-ai", ticket_kind="internal", + ) + assert s.session_kind == "ai_build" + assert s.flow_id is None and s.flow_proposal_id is None + assert s.status == "active" +``` + +(Use the same fixtures the existing tests in this file use for `db_session`/`l1_user`.) + +- [ ] **Step 2: Run test to verify it fails** + +Run: `docker exec resolutionflow_backend pytest tests/test_l1_session_service.py::test_start_ai_build_session -v` +Expected: FAIL — `AttributeError: start_ai_build_session`. + +- [ ] **Step 3: Implement (mirror `start_adhoc_session`)** + +In `backend/app/services/l1_session_service.py`, after `start_adhoc_session`: + +```python +async def start_ai_build_session( + db: AsyncSession, + *, + account_id: UUID, + user: User, + ticket_id: str, + ticket_kind: str, +) -> L1WalkSession: + """Start an AI-built tree session (nodes generated on demand via next-node).""" + session = L1WalkSession( + account_id=account_id, + created_by_user_id=user.id, + acting_as=_resolve_acting_as(user), + ticket_id=ticket_id, + ticket_kind=ticket_kind, + session_kind="ai_build", + ) + db.add(session) + await db.flush() + return session +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `docker exec resolutionflow_backend pytest tests/test_l1_session_service.py::test_start_ai_build_session -v` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add backend/app/services/l1_session_service.py backend/tests/test_l1_session_service.py +git commit -m "feat(l1): start_ai_build_session" +``` + +--- + +## Task 8: Session service — `advance_ai_build` (record answer + generate next node) + +**Files:** +- Modify: `backend/app/services/l1_session_service.py` +- Test: `backend/tests/test_l1_session_service.py` (add) + +- [ ] **Step 1: Write the failing test** + +```python +# add to backend/tests/test_l1_session_service.py +@pytest.mark.asyncio +async def test_advance_ai_build_appends_and_returns_next(db_session, l1_user, monkeypatch): + from app.services import l1_session_service as svc + from app.services import ai_tree_builder + s = await svc.start_ai_build_session( + db_session, account_id=l1_user.account_id, user=l1_user, + ticket_id="t-ai", ticket_kind="internal") + + async def fake_next(problem, category, walked): + return {"node_type": "resolved", "id": "done", "text": "Fixed."} + monkeypatch.setattr(ai_tree_builder, "generate_next_node", fake_next) + + next_node = await svc.advance_ai_build( + db_session, session_id=s.id, problem_text="printer", category="printer", + node_id="n1", answer="no", note=None) + assert next_node["node_type"] == "resolved" + refreshed = await db_session.get(type(s), s.id) + assert len(refreshed.walked_path) == 1 + assert refreshed.walked_path[0]["answer"] == "no" +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `docker exec resolutionflow_backend pytest tests/test_l1_session_service.py::test_advance_ai_build_appends_and_returns_next -v` +Expected: FAIL — `AttributeError: advance_ai_build`. + +- [ ] **Step 3: Implement** + +Add to `l1_session_service.py` (imports at top: `from app.services import ai_tree_builder`): + +```python +async def advance_ai_build( + db: AsyncSession, + *, + session_id: UUID, + problem_text: str, + category: str, + node_id: Optional[str] = None, + answer: Optional[str] = None, + note: Optional[str] = None, +) -> dict: + """Append the answered/acked node to walked_path, then generate the next node. + + On the first call (node_id is None) nothing is appended — we just generate the + first node. Returns the next node dict (caller persists current_node_id). + Raises ValueError on missing/inactive/non-ai_build session. + """ + session = await db.get(L1WalkSession, session_id) + if not session: + raise ValueError(f"L1WalkSession {session_id} not found") + if session.session_kind != "ai_build": + raise ValueError("advance_ai_build requires an ai_build session") + if session.status != "active": + raise ValueError(f"Session {session_id} is not active (status={session.status})") + + if node_id is not None: + # Find the text of the node being answered from current_node payload if + # the caller passed it via walk; otherwise store id+answer (text optional). + entry = {"node_type": "question" if answer in ("yes", "no") else "instruction", + "id": node_id, "answer": answer, "l1_note": note} + session.walked_path = [*session.walked_path, entry] + + next_node = await ai_tree_builder.generate_next_node( + problem_text, category, session.walked_path) + session.current_node_id = next_node.get("id") + session.last_step_at = datetime.now(timezone.utc) + await db.flush() + return next_node +``` + +> Note: the node `text` for traversed nodes is filled by the endpoint layer (Task 10) which knows the current node it served; `advance_ai_build` records the answer against the id. The endpoint passes the served node's text in `note`-adjacent payload if richer transcript is desired — keep Phase 2A minimal (id + answer). + +- [ ] **Step 4: Run test to verify it passes** + +Run: `docker exec resolutionflow_backend pytest tests/test_l1_session_service.py::test_advance_ai_build_appends_and_returns_next -v` +Expected: PASS. + +- [ ] **Step 5: Commit** + +```bash +git add backend/app/services/l1_session_service.py backend/tests/test_l1_session_service.py +git commit -m "feat(l1): advance_ai_build — record answer + generate next node" +``` + +--- + +## Task 9: Session service — flywheel capture on resolve + engineer notification on escalate + +**Files:** +- Modify: `backend/app/services/l1_session_service.py` (`resolve`, `escalate`) +- Modify: `backend/app/schemas/notification.py` (VALID_EVENTS) +- Modify: `backend/app/services/notification_service.py` (link + body) +- Test: `backend/tests/test_l1_session_service.py` (add) + +- [ ] **Step 1: Write the failing tests** + +```python +# add to backend/tests/test_l1_session_service.py +@pytest.mark.asyncio +async def test_resolve_ai_build_creates_outcome_validated_proposal(db_session, l1_user, monkeypatch): + from app.services import l1_session_service as svc + from app.models.flow_proposal import FlowProposal + from sqlalchemy import select + s = await svc.start_ai_build_session( + db_session, account_id=l1_user.account_id, user=l1_user, + ticket_id="t-ai", ticket_kind="internal") + s.walked_path = [ + {"node_type": "question", "id": "n1", "text": "On?", "answer": "no"}, + {"node_type": "resolved", "id": "n2", "text": "Fixed."}, + ] + await db_session.flush() + await svc.resolve(db_session, session_id=s.id, helpful=True, resolution_notes="ok") + props = (await db_session.execute( + select(FlowProposal).where(FlowProposal.l1_session_id == s.id))).scalars().all() + assert len(props) == 1 + assert props[0].source == "ai_realtime_l1" + assert props[0].validated_by_outcome is True + assert props[0].source_session_id is None + assert props[0].proposed_flow_data["tree_structure"]["id"] == "n1" + + +@pytest.mark.asyncio +async def test_escalate_notifies_engineers(db_session, l1_user, monkeypatch): + from app.services import l1_session_service as svc + calls = {} + async def fake_notify(event, account_id, payload, db, target_user_ids=None): + calls["event"] = event + calls["target_user_ids"] = target_user_ids + monkeypatch.setattr(svc, "notify", fake_notify) + s = await svc.start_ai_build_session( + db_session, account_id=l1_user.account_id, user=l1_user, + ticket_id="t-ai", ticket_kind="internal") + await svc.escalate(db_session, session_id=s.id, reason="stuck", reason_category="exhausted_safe_steps") + assert calls["event"] == "l1.session.escalated" + assert calls["target_user_ids"] is not None # explicit engineer recipients +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `docker exec resolutionflow_backend pytest tests/test_l1_session_service.py -k "ai_build_creates_outcome or notifies_engineers" -v` +Expected: FAIL — no proposal created / `notify` not called with the new event. + +- [ ] **Step 3a: Add notification event + link + body** + +`backend/app/schemas/notification.py` — add to `VALID_EVENTS`: +```python + "l1.session.escalated", +``` + +`backend/app/services/notification_service.py` — in `_build_notification_link` `links` dict add: +```python + "l1.session.escalated": "/escalations", +``` +and in the body-template builder (the `bodies` dict near `_build_notification_link`) add: +```python + "l1.session.escalated": "L1 escalated a ticket: {problem_summary}", +``` + +- [ ] **Step 3b: Flywheel capture in `resolve`** + +In `l1_session_service.resolve`, after the existing `proposal.validated_by_outcome` block and before the ticket close, add (imports: `from app.services import ai_tree_builder`, `from app.models.flow_proposal import FlowProposal` already present): + +```python + if helpful and session.session_kind == "ai_build" and session.walked_path: + tree_structure = ai_tree_builder.normalize_walked_path(session.walked_path) + db.add(FlowProposal( + account_id=session.account_id, + l1_session_id=session.id, + source_session_id=None, + proposal_type="new_flow", + title=(session.resolution_notes or "AI L1 resolution")[:255], + proposed_flow_data={"tree_structure": tree_structure, "match_keywords": []}, + source="ai_realtime_l1", + validated_by_outcome=True, + linked_ticket_id=session.ticket_id, + linked_ticket_kind=session.ticket_kind, + status="pending", + )) +``` + +> Dedupe via `_find_similar_pending_proposal` is a nice-to-have; Phase 2A inserts directly. If duplicate noise appears in QA, wire the existing dedupe helper here. + +- [ ] **Step 3c: Engineer notification in `escalate`** + +In `l1_session_service.escalate`, after `await log_audit(...)` and before the final `await db.flush()`, add (imports: `from app.services.notification_service import notify`, `from app.models.user import User`, `from sqlalchemy import select`): + +```python + eng_rows = await db.execute( + select(User.id).where( + User.account_id == session.account_id, + User.is_active.is_(True), + User.account_role.in_(("owner", "admin", "engineer")), + ) + ) + target_ids = [r[0] for r in eng_rows.all()] + await notify( + "l1.session.escalated", + session.account_id, + {"problem_summary": session.ticket_id, "session_id": str(session.id), + "reason_category": reason_category}, + db, + target_user_ids=target_ids, + ) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `docker exec resolutionflow_backend pytest tests/test_l1_session_service.py -k "ai_build_creates_outcome or notifies_engineers" -v` +Expected: PASS. + +- [ ] **Step 5: Run notification schema test** + +Run: `docker exec resolutionflow_backend pytest tests/ -k notification -v` +Expected: PASS (the new event is accepted by `validate_event_keys`). + +- [ ] **Step 6: Commit** + +```bash +git add backend/app/services/l1_session_service.py backend/app/schemas/notification.py backend/app/services/notification_service.py backend/tests/test_l1_session_service.py +git commit -m "feat(l1): flywheel capture on resolve + engineer notification on escalate" +``` + +--- + +## Task 10: API — intake dispatch, next-node, escalations; schemas + deps + +**Files:** +- Modify: `backend/app/schemas/l1.py` +- Modify: `backend/app/api/deps.py` +- Modify: `backend/app/api/endpoints/l1.py` +- Test: `backend/tests/test_l1_api_ai_build.py` + +- [ ] **Step 1: Write the failing test** + +```python +# backend/tests/test_l1_api_ai_build.py +import pytest +from unittest.mock import AsyncMock, patch + + +@pytest.mark.asyncio +async def test_intake_build_outcome_creates_ai_build_session(l1_client): + with patch("app.api.endpoints.l1.match_or_build.match_or_build", + new=AsyncMock(return_value={"outcome": "build", "session_kind": "ai_build", "category": "printer"})): + r = await l1_client.post("/api/v1/l1/intake", json={"problem_statement": "printer jam"}) + assert r.status_code == 200 + body = r.json() + assert body["outcome"] == "build" + assert body["session_kind"] == "ai_build" + assert body["session_id"] + + +@pytest.mark.asyncio +async def test_intake_out_of_scope(l1_client): + with patch("app.api.endpoints.l1.match_or_build.match_or_build", + new=AsyncMock(return_value={"outcome": "out_of_scope", "category": "unknown"})): + r = await l1_client.post("/api/v1/l1/intake", json={"problem_statement": "weird"}) + assert r.status_code == 200 + assert r.json()["outcome"] == "out_of_scope" +``` + +(Use the existing L1 client fixture pattern from `test_l1_api*`/conftest; `l1_client` is an authed AsyncClient for an `l1_tech` user.) + +- [ ] **Step 2: Run test to verify it fails** + +Run: `docker exec resolutionflow_backend pytest tests/test_l1_api_ai_build.py -v` +Expected: FAIL — `outcome` not in response / KeyError. + +- [ ] **Step 3a: Schemas** + +In `backend/app/schemas/l1.py`: +- Change `IntakeResponse.session_kind` literal to include `ai_build` and make `session_id`/`session_kind` optional (non-build outcomes have no session): + +```python +class IntakeResponse(BaseModel): + outcome: Literal["matched", "suggest", "out_of_scope", "build"] + session_id: Optional[UUID] = None + session_kind: Optional[Literal["flow", "proposal", "adhoc", "ai_build"]] = None + ticket_id: Optional[str] = None + ticket_kind: Optional[str] = None + flow_id: Optional[UUID] = None # for 'matched' + near_miss: Optional[dict] = None # for 'suggest' + category: Optional[str] = None # for 'out_of_scope' +``` +Add `NextNodeRequest` / `NextNodeResponse`: +```python +class NextNodeRequest(BaseModel): + node_id: Optional[str] = None + answer: Optional[str] = None # 'yes' | 'no' for questions + acknowledged: Optional[bool] = None + note: Optional[str] = None + +class NextNodeResponse(BaseModel): + node: dict + session_status: str +``` +Ensure `IntakeRequest` has an optional `force_build: bool = False` and `flow_id` is no longer required. + +- [ ] **Step 3b: Auth dep** + +In `backend/app/api/deps.py`, after `require_account_owner`: +```python +async def require_account_owner_or_admin( + current_user: Annotated[User, Depends(get_current_active_user)] +) -> User: + """Require account owner or account-admin (blocks engineers); super_admin bypass.""" + if current_user.is_super_admin: + return current_user + if current_user.account_role in ("owner", "admin"): + return current_user + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Account owner or admin access required", + ) +``` + +- [ ] **Step 3c: Rewrite intake + add next-node + escalations in `l1.py`** + +Replace the intake body to run the orchestrator (imports: `from app.services import match_or_build`): + +```python +@router.post("/intake", response_model=IntakeResponse) +async def intake( + payload: IntakeRequest, + db: Annotated[AsyncSession, Depends(get_db)], + user: Annotated[User, Depends(require_l1_or_coverage)], +): + """L1 intake: match a published flow, else gate + build, else suggest/out-of-scope.""" + result = await match_or_build.match_or_build( + user.account_id, payload.problem_statement, None, ticket_ref="", + db=db, force_build=payload.force_build, + ) + outcome = result["outcome"] + + if outcome in ("suggest", "out_of_scope"): + await db.commit() + return IntakeResponse(outcome=outcome, near_miss=result.get("near_miss"), + category=result.get("category")) + + # matched OR build → create a ticket and a session + ticket = await internal_ticket_service.create_ticket( + db, account_id=user.account_id, created_by_user_id=user.id, + problem_statement=payload.problem_statement, + customer_name=payload.customer_name, customer_contact=payload.customer_contact, + ) + if outcome == "matched": + session = await l1_session_service.start_flow_session( + db, account_id=user.account_id, user=user, flow_id=UUID(result["flow_id"]), + ticket_id=str(ticket.id), ticket_kind="internal") + else: # build + session = await l1_session_service.start_ai_build_session( + db, account_id=user.account_id, user=user, + ticket_id=str(ticket.id), ticket_kind="internal") + await db.commit() + return IntakeResponse( + outcome=outcome, session_id=session.id, session_kind=session.session_kind, + ticket_id=str(ticket.id), ticket_kind="internal", + flow_id=UUID(result["flow_id"]) if outcome == "matched" else None, + ) +``` + +Add next-node endpoint: +```python +@router.post("/sessions/{session_id}/next-node", response_model=NextNodeResponse) +async def next_node( + session_id: UUID, + payload: NextNodeRequest, + db: Annotated[AsyncSession, Depends(get_db)], + user: Annotated[User, Depends(require_l1_or_coverage)], +): + session = await _get_session_or_404(db, session_id, user) + # problem_text + category come from the linked internal ticket + stored category. + ticket = await internal_ticket_service.get_ticket(db, ticket_id=UUID(session.ticket_id)) + problem_text = ticket.problem_statement if ticket else "" + category = session.walked_path[0].get("category") if session.walked_path else None + try: + node = await l1_session_service.advance_ai_build( + db, session_id=session_id, problem_text=problem_text, + category=category or "unknown", node_id=payload.node_id, + answer=payload.answer, note=payload.note) + except ValueError as e: + raise HTTPException(status_code=http_status.HTTP_409_CONFLICT, detail=str(e)) + await db.commit() + return NextNodeResponse(node=node, session_status=session.status) + + +@router.get("/escalations", response_model=list[WalkSessionResponse]) +async def l1_escalations( + db: Annotated[AsyncSession, Depends(get_db)], + user: Annotated[User, Depends(require_engineer_or_admin)], + limit: int = 50, +): + rows = await db.execute( + select(L1WalkSession) + .where(L1WalkSession.account_id == user.account_id, + L1WalkSession.status == "escalated") + .order_by(L1WalkSession.resolved_at.desc()).limit(limit)) + return [_to_response(s) for s in rows.scalars()] +``` +Update the import line for deps: `from app.api.deps import get_db, require_l1_or_coverage, require_engineer_or_admin` and add the new schema imports (`NextNodeRequest, NextNodeResponse`). + +> Category persistence: store the resolved category on the first walked_path entry. In `advance_ai_build`, when `node_id is None` (first call), seed `walked_path` with a hidden meta entry `{"node_type":"meta","category":category}` OR persist category on the session. Simplest: pass category from intake by storing it — add a `category` field write in `start_ai_build_session` is out of scope; instead the endpoint seeds the first node call with the classified category by re-classifying once and caching in walked_path meta. **Decision for implementer:** add a nullable `meta` first entry on session creation in the intake `build` branch: after `start_ai_build_session`, call `advance_ai_build(..., node_id=None)` is NOT done here; instead store category by setting `session.walked_path=[{"node_type":"meta","category":result["category"]}]` before commit, and have `normalize_walked_path`/`generate_next_node` skip `meta` entries. Add a one-line filter in both. + +- [ ] **Step 4: Handle the `meta` entry** + +In `ai_tree_builder._build_context` and `normalize_walked_path`, skip entries with `node_type == "meta"`: +```python + walked_path = [s for s in walked_path if s.get("node_type") != "meta"] +``` +(add as the first line of both functions). In the next-node endpoint, read category from the meta entry: +```python + category = next((s.get("category") for s in session.walked_path if s.get("node_type") == "meta"), "unknown") +``` + +- [ ] **Step 5: Run tests to verify they pass** + +Run: `docker exec resolutionflow_backend pytest tests/test_l1_api_ai_build.py -v` +Expected: PASS. + +- [ ] **Step 6: Commit** + +```bash +git add backend/app/schemas/l1.py backend/app/api/deps.py backend/app/api/endpoints/l1.py backend/tests/test_l1_api_ai_build.py +git commit -m "feat(l1): intake dispatch + next-node + escalations endpoints, owner/admin dep" +``` + +--- + +## Task 11: Category settings API + +**Files:** +- Create: `backend/app/schemas/l1_categories.py` +- Modify: `backend/app/api/endpoints/accounts.py` +- Test: `backend/tests/test_l1_categories_api.py` + +- [ ] **Step 1: Write the failing test** + +```python +# backend/tests/test_l1_categories_api.py +import pytest + + +@pytest.mark.asyncio +async def test_get_categories(owner_client): + r = await owner_client.get("/api/v1/accounts/me/l1-categories") + assert r.status_code == 200 + body = r.json() + assert "enabled" in body and "available" in body and "hard_floor" in body + + +@pytest.mark.asyncio +async def test_patch_categories_owner_only(owner_client, engineer_client): + r = await engineer_client.patch("/api/v1/accounts/me/l1-categories", + json={"enabled": ["printer"]}) + assert r.status_code == 403 + r2 = await owner_client.patch("/api/v1/accounts/me/l1-categories", + json={"enabled": ["printer", "vpn_connect"]}) + assert r2.status_code == 200 + assert set(r2.json()["enabled"]) == {"printer", "vpn_connect"} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `docker exec resolutionflow_backend pytest tests/test_l1_categories_api.py -v` +Expected: FAIL — 404 (routes not defined). + +- [ ] **Step 3: Schema + endpoints** + +```python +# backend/app/schemas/l1_categories.py +from pydantic import BaseModel + +class L1CategoriesResponse(BaseModel): + enabled: list[str] + available: list[str] + hard_floor: list[str] + +class L1CategoriesUpdate(BaseModel): + enabled: list[str] +``` + +In `backend/app/api/endpoints/accounts.py` (imports: the category service + new deps/schemas): +```python +@router.get("/me/l1-categories", response_model=L1CategoriesResponse) +async def get_l1_categories( + db: Annotated[AsyncSession, Depends(get_db)], + user: Annotated[User, Depends(require_l1_or_above)], +): + enabled = await l1_category_service.get_enabled_categories(user.account_id, db) + return L1CategoriesResponse( + enabled=enabled, + available=l1_category_service.DEFAULT_L1_CATEGORIES, + hard_floor=l1_category_service.HARD_FLOOR_FORBIDDEN, + ) + + +@router.patch("/me/l1-categories", response_model=L1CategoriesResponse) +async def set_l1_categories( + payload: L1CategoriesUpdate, + db: Annotated[AsyncSession, Depends(get_db)], + user: Annotated[User, Depends(require_account_owner_or_admin)], +): + enabled = await l1_category_service.set_enabled_categories(user.account_id, payload.enabled, db) + await db.commit() + return L1CategoriesResponse( + enabled=enabled, + available=l1_category_service.DEFAULT_L1_CATEGORIES, + hard_floor=l1_category_service.HARD_FLOOR_FORBIDDEN, + ) +``` +Add imports: `from app.services import l1_category_service`, `from app.api.deps import require_l1_or_above, require_account_owner_or_admin`, `from app.schemas.l1_categories import L1CategoriesResponse, L1CategoriesUpdate`. + +- [ ] **Step 4: Run test to verify it passes** + +Run: `docker exec resolutionflow_backend pytest tests/test_l1_categories_api.py -v` +Expected: PASS. (If `engineer_client`/`owner_client` fixtures don't exist, add them mirroring `l1_client` with `account_role` `engineer`/`owner`.) + +- [ ] **Step 5: Commit** + +```bash +git add backend/app/schemas/l1_categories.py backend/app/api/endpoints/accounts.py backend/tests/test_l1_categories_api.py +git commit -m "feat(l1): account L1 category settings API (owner/admin write)" +``` + +--- + +## Task 12: Backend integration test — full intake→build→resolve and →escalate + +**Files:** +- Test: `backend/tests/test_l1_ai_build_flow.py` + +- [ ] **Step 1: Write the integration test** + +```python +# backend/tests/test_l1_ai_build_flow.py +import pytest +from unittest.mock import AsyncMock, patch +from sqlalchemy import select +from app.models.flow_proposal import FlowProposal + + +@pytest.mark.asyncio +async def test_intake_build_walk_resolve_creates_proposal(l1_client, db_session, monkeypatch): + from app.services import ai_tree_builder + # 1. force a build outcome + with patch("app.api.endpoints.l1.match_or_build.match_or_build", + new=AsyncMock(return_value={"outcome": "build", "session_kind": "ai_build", "category": "printer"})): + r = await l1_client.post("/api/v1/l1/intake", json={"problem_statement": "printer jam"}) + sid = r.json()["session_id"] + + # 2. drive next-node deterministically to a resolved node + seq = iter([ + {"node_type": "question", "id": "n1", "text": "Powered on?"}, + {"node_type": "resolved", "id": "n2", "text": "Fixed."}, + ]) + async def fake_next(problem, category, walked): + return next(seq) + monkeypatch.setattr(ai_tree_builder, "generate_next_node", fake_next) + + r1 = await l1_client.post(f"/api/v1/l1/sessions/{sid}/next-node", json={}) + assert r1.json()["node"]["node_type"] == "question" + r2 = await l1_client.post(f"/api/v1/l1/sessions/{sid}/next-node", + json={"node_id": "n1", "answer": "no"}) + assert r2.json()["node"]["node_type"] == "resolved" + + # 3. resolve → proposal + await l1_client.post(f"/api/v1/l1/sessions/{sid}/resolve", + json={"helpful": True, "resolution_notes": "ok"}) + props = (await db_session.execute( + select(FlowProposal).where(FlowProposal.source == "ai_realtime_l1"))).scalars().all() + assert len(props) >= 1 +``` + +- [ ] **Step 2: Run test** + +Run: `docker exec resolutionflow_backend pytest tests/test_l1_ai_build_flow.py -v` +Expected: PASS. Fix any wiring gaps surfaced here (this is the end-to-end backend gate). + +- [ ] **Step 3: Run the full L1 backend suite for regressions** + +Run: `docker exec resolutionflow_backend pytest tests/ -k "l1 or match_or_build or ai_tree_builder or notification" -q` +Expected: all pass. + +- [ ] **Step 4: Commit** + +```bash +git add backend/tests/test_l1_ai_build_flow.py +git commit -m "test(l1): integration — intake build → walk → resolve → proposal" +``` + +--- + +## Task 13: Frontend — API client + types + +**Files:** +- Modify: `frontend/src/types/l1.ts`, `frontend/src/api/l1.ts` + +- [ ] **Step 1: Add types** + +In `frontend/src/types/l1.ts`: +```typescript +export type IntakeOutcome = 'matched' | 'suggest' | 'out_of_scope' | 'build' + +export interface IntakeResult { + outcome: IntakeOutcome + session_id?: string + session_kind?: 'flow' | 'proposal' | 'adhoc' | 'ai_build' + ticket_id?: string + ticket_kind?: string + flow_id?: string + near_miss?: { flow_id: string; flow_name: string; score: number } + category?: string +} + +export type TreeNode = + | { node_type: 'question'; id: string; text: string } + | { node_type: 'instruction'; id: string; text: string } + | { node_type: 'resolved'; id: string; text: string } + | { node_type: 'escalate'; id: string; reason_category?: string; text: string } + | { node_type: 'needs_review'; id: string; text: string } + +export interface NextNodeResult { node: TreeNode; session_status: string } + +export interface L1Categories { enabled: string[]; available: string[]; hard_floor: string[] } +``` + +- [ ] **Step 2: Add API methods** + +In `frontend/src/api/l1.ts`: +```typescript + nextNode: (sessionId: string, body: { node_id?: string; answer?: 'yes' | 'no'; acknowledged?: boolean; note?: string }) => + apiClient.post(`/l1/sessions/${sessionId}/next-node`, body).then(r => r.data), + + getCategories: () => + apiClient.get('/accounts/me/l1-categories').then(r => r.data), + + setCategories: (enabled: string[]) => + apiClient.patch('/accounts/me/l1-categories', { enabled }).then(r => r.data), + + escalations: () => + apiClient.get('/l1/escalations').then(r => r.data), +``` +Update the existing `intake` method's return type to `IntakeResult`. + +- [ ] **Step 3: Type-check** + +Run: `docker exec resolutionflow_frontend sh -c 'cd /app && npx tsc --noEmit -p tsconfig.app.json'` +Expected: clean (callers updated in Tasks 14-15; if tsc flags `intake` callers, proceed — they're fixed next). + +- [ ] **Step 4: Commit** + +```bash +git add frontend/src/types/l1.ts frontend/src/api/l1.ts +git commit -m "feat(l1): frontend api/types for next-node, outcome, categories" +``` + +--- + +## Task 14: Frontend — L1Dashboard intake dispatch + +**Files:** +- Modify: `frontend/src/pages/l1/L1Dashboard.tsx` + +- [ ] **Step 1: Replace `handleStart` to dispatch on outcome** + +```typescript + const handleStart = async () => { + if (!problem.trim()) return + setSubmitting(true) + try { + const res = await l1Api.intake({ + problem_statement: problem.trim(), + customer_name: customerName.trim() || undefined, + customer_contact: customerContact.trim() || undefined, + }) + if (res.outcome === 'matched' || res.outcome === 'build') { + navigate(`/l1/walk/${res.session_id}`) + } else if (res.outcome === 'suggest') { + setSuggestion(res.near_miss ?? null) // render an inline prompt (below) + } else if (res.outcome === 'out_of_scope') { + setOutOfScope(res.category ?? 'unknown') + } + } catch (err) { + const detail = (err as { response?: { data?: { detail?: string } } }).response?.data?.detail + toast.error(typeof detail === 'string' ? detail : 'Failed to start. Try again.') + } finally { + setSubmitting(false) + } + } + + const buildNew = async () => { + setSuggestion(null) + const res = await l1Api.intake({ problem_statement: problem.trim(), force_build: true }) + if (res.outcome === 'build') navigate(`/l1/walk/${res.session_id}`) + else if (res.outcome === 'out_of_scope') setOutOfScope(res.category ?? 'unknown') + } +``` + +Add state near the top: `const [suggestion, setSuggestion] = useState<{flow_id:string;flow_name:string;score:number}|null>(null)` and `const [outOfScope, setOutOfScope] = useState(null)`. Add `force_build` as an optional field in the `l1Api.intake` body type. + +- [ ] **Step 2: Render the suggest + out-of-scope prompts** + +Below the intake card, add: +```tsx +{suggestion && ( +
+

Found a similar flow: {suggestion.flow_name}.

+
+ + +
+
+)} +{outOfScope && ( +
+

This problem isn’t in your enabled L1 categories. Start an ad-hoc walk or escalate.

+ {/* reuse existing adhoc/escalate CTAs from Phase 1 */} +
+)} +``` + +> For "Use this flow", reuse the Phase-1 matched-flow path: re-call intake is unnecessary — the matched outcome already created a session. Simplest Phase 2A: when `outcome==='matched'` we already navigated; for `suggest → Use this flow`, call `l1Api.intake({problem_statement, ...})` is the matched path again is not guaranteed. Implementer: on "Use this flow", POST intake with the original text (it will match again and return `matched` with a session) — acceptable for Phase 2A. + +- [ ] **Step 3: Type-check + lint** + +Run: `docker exec resolutionflow_frontend sh -c 'cd /app && npx tsc --noEmit -p tsconfig.app.json && npx eslint src/pages/l1/L1Dashboard.tsx'` +Expected: clean. + +- [ ] **Step 4: Commit** + +```bash +git add frontend/src/pages/l1/L1Dashboard.tsx +git commit -m "feat(l1): dashboard intake dispatch on match_or_build outcome" +``` + +--- + +## Task 15: Frontend — L1WalkTreeVariant real node rendering + disclaimer + +**Files:** +- Modify: `frontend/src/components/l1/L1WalkTreeVariant.tsx` + +- [ ] **Step 1: Drive nodes from `/next-node`** + +Replace the synthetic stepping. On mount, if `session.session_kind === 'ai_build'`, fetch the first node (`l1Api.nextNode(session.id, {})`). On answer/ack, POST the current node id + answer, render the returned node. Terminal nodes (`resolved`/`escalate`/`needs_review`) switch to the existing Resolve/Escalate modal affordances. + +```tsx +const [node, setNode] = useState(null) +const [loading, setLoading] = useState(false) + +useEffect(() => { + if (session.session_kind !== 'ai_build') return + setLoading(true) + l1Api.nextNode(session.id, {}).then(r => setNode(r.node)).finally(() => setLoading(false)) +}, [session.id, session.session_kind]) + +const answer = async (a: 'yes' | 'no') => { + if (!node) return + setLoading(true) + try { + const r = await l1Api.nextNode(session.id, { node_id: node.id, answer: a }) + setNode(r.node) + } finally { setLoading(false) } +} + +const acknowledge = async () => { + if (!node) return + setLoading(true) + try { + const r = await l1Api.nextNode(session.id, { node_id: node.id, acknowledged: true }) + setNode(r.node) + } finally { setLoading(false) } +} +``` + +- [ ] **Step 2: Render by node_type + disclaimer banner** + +```tsx +{session.session_kind === 'ai_build' && ( +
+ These are high-confidence troubleshooting steps, but they come from outside your + organization’s knowledge base — review them before acting. When in doubt, escalate early. +
+)} +{loading &&

Thinking through the next step…

} +{node?.node_type === 'question' && ( + <> +

{node.text}

+
+ + +
+ +)} +{node?.node_type === 'instruction' && ( + <> +

{node.text}

+ + +)} +{(node?.node_type === 'resolved') && ( + /* opens existing Resolve modal */ +)} +{(node?.node_type === 'escalate' || node?.node_type === 'needs_review') && ( + /* opens existing Escalate modal */ +)} +``` +Wire `ResolveCta`/`EscalateCta` to the existing `WalkModals` Resolve/Escalate handlers already in this component (reuse, don't duplicate). + +- [ ] **Step 3: Type-check + lint** + +Run: `docker exec resolutionflow_frontend sh -c 'cd /app && npx tsc --noEmit -p tsconfig.app.json && npx eslint src/components/l1/L1WalkTreeVariant.tsx'` +Expected: clean. + +- [ ] **Step 4: Commit** + +```bash +git add frontend/src/components/l1/L1WalkTreeVariant.tsx +git commit -m "feat(l1): walker renders AI-built nodes via next-node + disclaimer banner" +``` + +--- + +## Task 16: Frontend — admin category settings page + +**Files:** +- Create: `frontend/src/pages/account/L1CategoriesPage.tsx` +- Modify: router + account nav (follow the existing `/account/*` child-route pattern) + +- [ ] **Step 1: Build the page** + +```tsx +// frontend/src/pages/account/L1CategoriesPage.tsx +import { useEffect, useState } from 'react' +import { l1Api } from '@/api/l1' +import { toast } from '@/lib/toast' +import type { L1Categories } from '@/types/l1' + +export default function L1CategoriesPage() { + const [data, setData] = useState(null) + useEffect(() => { l1Api.getCategories().then(setData) }, []) + if (!data) return null + const toggle = async (cat: string) => { + const enabled = data.enabled.includes(cat) + ? data.enabled.filter(c => c !== cat) : [...data.enabled, cat] + const updated = await l1Api.setCategories(enabled) + setData({ ...data, enabled: updated.enabled }) + toast.success('L1 categories updated') + } + return ( +
+

L1 AI build categories

+

+ Problems in enabled categories can be built into AI troubleshooting trees when no + flow exists. Disabled categories fall back to ad-hoc or escalation. +

+
+ {data.available.map(cat => ( + + ))} +
+
+

Always excluded (safety)

+
    + {data.hard_floor.map(h =>
  • {h.replace(/_/g, ' ')}
  • )} +
+
+
+ ) +} +``` + +- [ ] **Step 2: Register route + nav** + +Add a lazy import + a child route under the `/account` subtree in `frontend/src/router.tsx` (mirror existing account children, e.g. `{ path: 'l1-categories', element: page(L1CategoriesPage) }` under the AccountLayout route), and a nav entry in the account settings sidebar/menu following the existing pattern. Gate visibility to owner/admin in the menu (reuse `usePermissions`). + +- [ ] **Step 3: Type-check + lint + build** + +Run: `docker exec resolutionflow_frontend sh -c 'cd /app && npx tsc --noEmit -p tsconfig.app.json && npx eslint src/pages/account/L1CategoriesPage.tsx'` +Expected: clean. + +- [ ] **Step 4: Commit** + +```bash +git add frontend/src/pages/account/L1CategoriesPage.tsx frontend/src/router.tsx +git commit -m "feat(l1): admin L1 category settings page" +``` + +--- + +## Task 17: Frontend — ProposalDetail L1 source + engineer escalations section + +**Files:** +- Modify: `frontend/src/components/flowpilot/ProposalDetail.tsx` +- Modify: `frontend/src/pages/EscalationQueuePage.tsx` + +- [ ] **Step 1: ProposalDetail — L1-sourced source block (Finding 1)** + +Where it currently renders the `/pilot/{source_session_id}` link, branch on the new `l1_session_id`: +```tsx +{proposal.l1_session_id ? ( +
+ Source: AI L1 walk (outcome-validated). Unexplored branches are marked + needs review below. +
+) : proposal.source_session_id ? ( + + {/* existing link */} + +) : null} +``` +Add `l1_session_id?: string | null` to the proposal type used here. + +- [ ] **Step 2: EscalationQueuePage — L1 escalations section** + +Fetch `l1Api.escalations()` and render a section above/below the existing queue: +```tsx +const [l1Escalations, setL1Escalations] = useState([]) +useEffect(() => { l1Api.escalations().then(setL1Escalations).catch(() => setL1Escalations([])) }, []) +// render: problem (from ticket), walked-path length, escalated-at, reason +``` +Each row shows the walked-path summary and links to a read-only view (Phase 2A: a simple expandable row is sufficient; no new route required). + +- [ ] **Step 3: Type-check + lint + build** + +Run: `docker exec resolutionflow_frontend sh -c 'cd /app && npx tsc --noEmit -p tsconfig.app.json && npm run build'` +Expected: tsc clean, build succeeds. + +- [ ] **Step 4: Commit** + +```bash +git add frontend/src/components/flowpilot/ProposalDetail.tsx frontend/src/pages/EscalationQueuePage.tsx +git commit -m "feat(l1): proposal L1 source block + engineer L1-escalations section" +``` + +--- + +## Task 18: E2E — AI build flow + +**Files:** +- Modify: `frontend/e2e/l1-workspace.spec.ts` + +- [ ] **Step 1: Add an AI-build e2e test** + +Because the builder calls a live model, stub the network at the Playwright layer: intercept `POST **/l1/intake` to return `{outcome:'build', session_kind:'ai_build', session_id:}` and `POST **/l1/sessions/*/next-node` to return scripted nodes (question → resolved). Assert: L1 lands on the walker, sees the disclaimer banner, answers the question, reaches the resolved CTA. + +```typescript +test('L1 AI build: intake → answer node → resolve CTA', async ({ page }) => { + await login(page, L1_EMAIL) + await page.route('**/api/v1/l1/intake', route => route.fulfill({ + status: 200, contentType: 'application/json', + body: JSON.stringify({ outcome: 'build', session_kind: 'ai_build', session_id: 'e2e-sess', ticket_id: 't', ticket_kind: 'internal' }), + })) + let call = 0 + await page.route('**/api/v1/l1/sessions/*/next-node', route => { + call += 1 + const node = call === 1 + ? { node_type: 'question', id: 'n1', text: 'Is it powered on?' } + : { node_type: 'resolved', id: 'n2', text: 'Resolved.' } + route.fulfill({ status: 200, contentType: 'application/json', + body: JSON.stringify({ node, session_status: 'active' }) }) + }) + // also stub GET session fetch the walker does on load, if any, to return an ai_build session + await page.goto('/l1') + await page.getByPlaceholder(/What's the user calling about/i).fill('printer jam') + await page.getByRole('button', { name: /Start walk/i }).click() + await expect(page.getByText(/outside your organization’s knowledge base/i)).toBeVisible() + await expect(page.getByText('Is it powered on?')).toBeVisible() + await page.getByRole('button', { name: 'No' }).click() + await expect(page.getByText(/Resolved\./i)).toBeVisible() +}) +``` +Adjust selectors/route patterns to the actual walker data-loading (stub the session GET the walker performs so it reports `session_kind: 'ai_build'`). + +- [ ] **Step 2: Run e2e locally only if chromium available; otherwise rely on CI** + +This container cannot launch chromium (sandbox). Push and let CI run `npm run test:e2e`. Do not block on local e2e. + +- [ ] **Step 3: Commit** + +```bash +git add frontend/e2e/l1-workspace.spec.ts +git commit -m "test(l1): e2e AI build flow (network-stubbed)" +``` + +--- + +## Task 19: Final verification + +- [ ] **Step 1: Backend suite** + +Run: `docker exec resolutionflow_backend pytest tests/ -q` +Expected: all pass (note any pre-existing xdist-only failures per Phase-1 acceptance report §7). + +- [ ] **Step 2: Frontend gates** + +Run: `docker exec resolutionflow_frontend sh -c 'cd /app && npx tsc --noEmit -p tsconfig.app.json && npm run lint && npm run build'` +Expected: tsc clean, lint 0 errors, build succeeds. + +- [ ] **Step 3: Migration roundtrip on a clean DB** + +Run: `docker exec resolutionflow_backend alembic downgrade -3 && docker exec resolutionflow_backend alembic upgrade head` +Expected: clean down+up for the three new migrations (run against a DB without `ai_build`/L1-proposal rows, or accept the documented downgrade caveat). + +- [ ] **Step 4: Open PR** + +Push the branch and open a PR to `main` summarizing Phase 2A, linking the spec, and listing the deferred items (KB grounding/connectors, PSA reassign, escalation package, AI chat handoff, proposal-matching). + +--- + +## Self-Review notes (author) + +- **Spec coverage:** §3 match_or_build → Task 6/10; §4 streaming + node schema → Task 5/8/10/15; §5 safety (classify, constrained prompt, validation, depth cap, disclaimer) → Task 4/5/15; §6 flywheel + §6.1 normalize + §6.2 linkage → Task 3/9; §7 escalation handoff → Task 9/10/17; §8 migrations → Task 1/2/3; §9 API → Task 10/11; §10 frontend → Task 13-17; §11 testing → throughout + Task 12/18. +- **Known soft spots flagged for the implementer:** category persistence via a `meta` walked_path entry (Task 10 Step 3c/Step 4) and the "Use this flow" suggest path (Task 14 Step 2) are the two places to validate carefully during review. +- **Model calls** are mocked/stubbed in tests; a live constrained-decoding smoke test + the Sonnet-vs-Opus benchmark for `l1_realtime_build` should run in staging before wide enablement (spec §5.3).