19 TDD tasks from the approved spec: 3 migrations (ai_build kind, account categories, FlowProposal l1_session_id), ai_tree_builder (constrained node gen + validation + normalize), match_or_build orchestrator (match-first, gate-on-build), session-service ai_build start/advance, flywheel capture on resolve, engineer escalation notification, category settings API, and the frontend walker/dispatch/settings/escalations surfaces + e2e. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
L1 AI Decision-Tree Builder — Phase 2A Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: When an L1 tech describes a problem with no matching published flow, build a yes/no decision tree in real time from generic L1 knowledge (constrained + escalate-early), walk it node-by-node, capture resolved trees as outcome-validated drafts, and route escalations to engineers.
Architecture: Approach C: a dedicated ai_tree_builder service for constrained node-by-node generation, plus a match_or_build orchestrator that matches published flows first and gates generic building behind admin-configured categories. It reuses flow_matching_engine (match), knowledge_flywheel/FlowProposal (capture), and notification_service (escalation).
Tech Stack: Python 3.12 · FastAPI · SQLAlchemy 2.0 async · Alembic · PostgreSQL 16 (RLS) · React 19 + Vite + TS + Tailwind v4 · Playwright.
Source spec: docs/superpowers/specs/2026-05-29-l1-ai-tree-builder-phase-2a-design.md
Conventions (read before starting):
- Migrations are hand-written: alembic revision -m "msg", then edit upgrade()/downgrade() by hand. Never --autogenerate, never --rev-id. Current head is b3358ba0e48c; each new migration chains from the previous.
- Backend tests run in the container: docker exec resolutionflow_backend pytest <path> -v. The suite uses pytest-xdist in CI; single-module runs work locally.
- JSONB columns require reassignment (x = [*x, item]), not in-place mutation (see record_step).
- Commit after each task with the message shown. Git trailer: Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>.
- Model tiers: settings.get_model_for_action(key) → ACTION_MODEL_MAP[key] → AI_MODEL_TIERS[tier]. fast = Haiku, standard = Sonnet.
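The model-tier convention above is a two-step lookup. The following is a minimal sketch of that chain, assuming both maps are plain dicts; the model identifiers are placeholders invented for illustration, and the real values in backend/app/core/config.py may differ.

```python
# Hypothetical stand-ins for the maps in backend/app/core/config.py.
AI_MODEL_TIERS = {
    "fast": "haiku-placeholder-id",       # assumption: the real model id differs
    "standard": "sonnet-placeholder-id",  # assumption: the real model id differs
}
ACTION_MODEL_MAP = {
    "l1_realtime_build": "standard",  # action keys added in Task 4
    "l1_classify": "fast",
}


def get_model_for_action(key: str) -> str:
    """Resolve action key -> tier -> model id, mirroring the documented chain."""
    tier = ACTION_MODEL_MAP[key]
    return AI_MODEL_TIERS[tier]
```

An unknown action key raises KeyError in this sketch; whether the real settings object falls back to a default tier is not stated in the plan.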
File Structure
New backend files:
- backend/app/services/ai_tree_builder.py: node Pydantic models, constrained system prompt, generate_next_node, per-node validation, normalize_walked_path.
- backend/app/services/match_or_build.py: orchestrator (match_or_build, classify).
- backend/app/services/l1_category_service.py: DEFAULT_L1_CATEGORIES, HARD_FLOOR_FORBIDDEN, get/set enabled categories.
- backend/app/schemas/l1_categories.py: category settings request/response.
- backend/tests/test_ai_tree_builder.py, test_match_or_build.py, test_l1_category_service.py, test_l1_ai_build_flow.py (integration).
- 3 Alembic migrations.
Modified backend files:
- backend/app/models/l1_walk_session.py: ai_build in CHECK constraints.
- backend/app/models/account.py: enabled_l1_categories column.
- backend/app/models/flow_proposal.py: l1_session_id, nullable source_session_id, exactly-one CHECK.
- backend/app/core/config.py: l1_realtime_build + l1_classify action keys.
- backend/app/api/deps.py: require_account_owner_or_admin.
- backend/app/api/endpoints/l1.py: intake dispatch, /sessions/{id}/next-node, /escalations.
- backend/app/api/endpoints/accounts.py: /me/l1-categories GET/PATCH.
- backend/app/schemas/l1.py: IntakeResponse.outcome, ai_build literal, NextNode* schemas.
- backend/app/services/l1_session_service.py: start_ai_build_session, flywheel capture in resolve, engineer notification in escalate.
- backend/app/services/notification_service.py + backend/app/schemas/notification.py: l1.session.escalated event.
Modified frontend files:
- frontend/src/api/l1.ts, frontend/src/types/l1.ts: next-node, outcome, categories.
- frontend/src/pages/l1/L1Dashboard.tsx: dispatch on intake outcome.
- frontend/src/components/l1/L1WalkTreeVariant.tsx: real node rendering + disclaimer.
- frontend/src/components/flowpilot/ProposalDetail.tsx: L1-sourced source block.
- frontend/src/pages/EscalationQueuePage.tsx: L1 escalations section.
- New: frontend/src/pages/account/L1CategoriesPage.tsx + route + nav.
- frontend/e2e/l1-workspace.spec.ts: AI build flow tests.
Task 1: Migration + model — ai_build session kind
Files:
- Create: backend/alembic/versions/<rev>_add_ai_build_session_kind.py
- Modify: backend/app/models/l1_walk_session.py:42-61
- Test: backend/tests/test_l1_ai_build_model.py
- Step 1: Write the failing test
# backend/tests/test_l1_ai_build_model.py
import uuid

import pytest

from app.models.l1_walk_session import L1WalkSession


def test_ai_build_session_kind_allowed_by_model_constraint():
    """ai_build is a valid session_kind with both target FKs null (like adhoc)."""
    s = L1WalkSession(
        account_id=uuid.uuid4(),
        created_by_user_id=uuid.uuid4(),
        ticket_id="t1",
        ticket_kind="internal",
        session_kind="ai_build",
    )
    assert s.session_kind == "ai_build"
    assert s.flow_id is None and s.flow_proposal_id is None
- Step 2: Run test to verify it fails
Run: docker exec resolutionflow_backend pytest tests/test_l1_ai_build_model.py -v
Expected: PASS at the Python level already (model has no enum on the attribute) — the real enforcement is the DB CHECK. If it errors on import, fix the import first. Treat this task's true verification as the migration roundtrip in Step 6.
- Step 3: Update model CHECK constraints
In backend/app/models/l1_walk_session.py, update the two constraints:
    CheckConstraint(
        "session_kind IN ('flow', 'proposal', 'adhoc', 'ai_build')",
        name="ck_l1_walk_sessions_session_kind",
    ),
    CheckConstraint(
        "(session_kind = 'flow' AND flow_id IS NOT NULL AND flow_proposal_id IS NULL) "
        "OR (session_kind = 'proposal' AND flow_proposal_id IS NOT NULL AND flow_id IS NULL) "
        "OR (session_kind IN ('adhoc', 'ai_build') AND flow_id IS NULL AND flow_proposal_id IS NULL)",
        name="ck_l1_walk_sessions_target_consistency",
    ),
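To make the target-consistency rule easier to reason about, here is a small Python mirror of the SQL expression above. It is illustrative only: the function name is hypothetical, and the database CHECK remains the real enforcement.

```python
from typing import Optional
from uuid import UUID, uuid4


def target_is_consistent(
    session_kind: str,
    flow_id: Optional[UUID],
    flow_proposal_id: Optional[UUID],
) -> bool:
    """Python mirror of ck_l1_walk_sessions_target_consistency (sketch)."""
    if session_kind == "flow":
        return flow_id is not None and flow_proposal_id is None
    if session_kind == "proposal":
        return flow_proposal_id is not None and flow_id is None
    if session_kind in ("adhoc", "ai_build"):
        return flow_id is None and flow_proposal_id is None
    # Unknown kinds are rejected separately by the session_kind CHECK.
    return False
```

An ai_build session therefore behaves like adhoc: it is valid only when both target foreign keys are NULL.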
- Step 4: Create the migration
Run: docker exec resolutionflow_backend alembic revision -m "add ai_build session kind"
Then edit the generated file so down_revision is the current head (b3358ba0e48c unless a later task already advanced it) and the body drops+recreates the two CHECK constraints:
def upgrade() -> None:
    op.drop_constraint("ck_l1_walk_sessions_session_kind", "l1_walk_sessions", type_="check")
    op.create_check_constraint(
        "ck_l1_walk_sessions_session_kind", "l1_walk_sessions",
        "session_kind IN ('flow', 'proposal', 'adhoc', 'ai_build')",
    )
    op.drop_constraint("ck_l1_walk_sessions_target_consistency", "l1_walk_sessions", type_="check")
    op.create_check_constraint(
        "ck_l1_walk_sessions_target_consistency", "l1_walk_sessions",
        "(session_kind = 'flow' AND flow_id IS NOT NULL AND flow_proposal_id IS NULL) "
        "OR (session_kind = 'proposal' AND flow_proposal_id IS NOT NULL AND flow_id IS NULL) "
        "OR (session_kind IN ('adhoc', 'ai_build') AND flow_id IS NULL AND flow_proposal_id IS NULL)",
    )


def downgrade() -> None:
    op.drop_constraint("ck_l1_walk_sessions_target_consistency", "l1_walk_sessions", type_="check")
    op.create_check_constraint(
        "ck_l1_walk_sessions_target_consistency", "l1_walk_sessions",
        "(session_kind = 'flow' AND flow_id IS NOT NULL AND flow_proposal_id IS NULL) "
        "OR (session_kind = 'proposal' AND flow_proposal_id IS NOT NULL AND flow_id IS NULL) "
        "OR (session_kind = 'adhoc' AND flow_id IS NULL AND flow_proposal_id IS NULL)",
    )
    op.drop_constraint("ck_l1_walk_sessions_session_kind", "l1_walk_sessions", type_="check")
    op.create_check_constraint(
        "ck_l1_walk_sessions_session_kind", "l1_walk_sessions",
        "session_kind IN ('flow', 'proposal', 'adhoc')",
    )
- Step 5: Apply the migration
Run: docker exec resolutionflow_backend alembic upgrade head
Expected: Running upgrade b3358ba0e48c -> <rev>, add ai_build session kind
- Step 6: Verify roundtrip + insert an ai_build row
Run:
docker exec resolutionflow_postgres psql -U postgres -d resolutionflow -c \
"INSERT INTO l1_walk_sessions (id, account_id, created_by_user_id, ticket_id, ticket_kind, session_kind, walked_path, walk_notes, status, started_at, last_step_at) \
SELECT gen_random_uuid(), a.id, u.id, 't-smoke', 'internal', 'ai_build', '[]'::jsonb, '[]'::jsonb, 'active', now(), now() \
FROM accounts a JOIN users u ON u.account_id=a.id LIMIT 1 RETURNING id;"
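Expected: one row id returned (no CHECK violation). Then clean up: DELETE FROM l1_walk_sessions WHERE ticket_id='t-smoke'; To exercise the roundtrip itself, run alembic downgrade -1 followed by alembic upgrade head in the backend container after the cleanup. The downgrade recreates the stricter CHECK constraints, so it fails if any ai_build row remains.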
- Step 7: Commit
git add backend/app/models/l1_walk_session.py backend/alembic/versions/ backend/tests/test_l1_ai_build_model.py
git commit -m "feat(l1): add ai_build session kind (model + migration)"
Task 2: Migration + model — account enabled_l1_categories
Files:
- Create: backend/alembic/versions/<rev>_add_enabled_l1_categories.py
- Modify: backend/app/models/account.py
- Test: backend/tests/test_account_l1_categories_column.py
- Step 1: Write the failing test
# backend/tests/test_account_l1_categories_column.py
from app.models.account import Account


def test_account_has_enabled_l1_categories_default():
    a = Account(name="Acme", display_code="ABC12345")
    # Column default is applied at flush; attribute may be None pre-flush.
    assert hasattr(a, "enabled_l1_categories")
- Step 2: Run test to verify it fails
Run: docker exec resolutionflow_backend pytest tests/test_account_l1_categories_column.py -v
Expected: FAIL with AssertionError (hasattr returns False until the column is added to the model).
- Step 3: Add the model column
In backend/app/models/account.py, after sso_config (or near other JSONB columns), add:
    enabled_l1_categories: Mapped[list[str]] = mapped_column(
        JSONB(), nullable=False,
        server_default=sa_text(
            "'[\"password_reset\",\"account_lockout\",\"printer\","
            "\"email_outlook_client\",\"wifi_network_basics\",\"vpn_connect\","
            "\"teams_zoom_av\",\"browser_cache_cookies\",\"peripheral_reconnect\","
            "\"os_restart_update\"]'::jsonb"
        ),
    )
Ensure imports exist at top of file: from sqlalchemy.dialects.postgresql import JSONB and from sqlalchemy import text as sa_text (add if missing).
- Step 4: Run test to verify it passes
Run: docker exec resolutionflow_backend pytest tests/test_account_l1_categories_column.py -v
Expected: PASS.
- Step 5: Create + apply migration
Run: docker exec resolutionflow_backend alembic revision -m "add enabled_l1_categories to accounts"
Edit body:
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

_DEFAULT = ('["password_reset","account_lockout","printer","email_outlook_client",'
            '"wifi_network_basics","vpn_connect","teams_zoom_av","browser_cache_cookies",'
            '"peripheral_reconnect","os_restart_update"]')


def upgrade() -> None:
    op.add_column("accounts", sa.Column(
        "enabled_l1_categories", postgresql.JSONB(), nullable=False,
        server_default=sa.text(f"'{_DEFAULT}'::jsonb"),
    ))


def downgrade() -> None:
    op.drop_column("accounts", "enabled_l1_categories")
Run: docker exec resolutionflow_backend alembic upgrade head
Expected: upgrade applied; existing accounts backfill to the default list.
- Step 6: Commit
git add backend/app/models/account.py backend/alembic/versions/ backend/tests/test_account_l1_categories_column.py
git commit -m "feat(l1): add accounts.enabled_l1_categories with default allowlist"
Task 3: Migration + model — FlowProposal L1 source linkage (Finding 1)
Files:
- Create: backend/alembic/versions/<rev>_flow_proposal_l1_source.py
- Modify: backend/app/models/flow_proposal.py:42-82
- Test: backend/tests/test_flow_proposal_l1_source.py
- Step 1: Write the failing test
# backend/tests/test_flow_proposal_l1_source.py
import uuid

from app.models.flow_proposal import FlowProposal


def test_flow_proposal_accepts_l1_session_id_without_source_session():
    p = FlowProposal(
        account_id=uuid.uuid4(),
        l1_session_id=uuid.uuid4(),
        source_session_id=None,
        proposal_type="new_flow",
        title="AI L1 draft",
        proposed_flow_data={"tree_structure": {"id": "root"}},
        source="ai_realtime_l1",
        status="pending",
    )
    assert p.l1_session_id is not None and p.source_session_id is None
- Step 2: Run test to verify it fails
Run: docker exec resolutionflow_backend pytest tests/test_flow_proposal_l1_source.py -v
Expected: FAIL — TypeError/unexpected kwarg l1_session_id.
- Step 3: Update the model
In backend/app/models/flow_proposal.py: make source_session_id nullable, add l1_session_id, add the exactly-one CHECK in __table_args__.
    source_session_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("ai_sessions.id", ondelete="CASCADE"),
        nullable=True,
        index=True,
    )
    l1_session_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("l1_walk_sessions.id", ondelete="SET NULL"),
        nullable=True,
        index=True,
    )
Add to __table_args__ (alongside the existing source/linked_ticket checks):
    CheckConstraint(
        "(source_session_id IS NOT NULL) <> (l1_session_id IS NOT NULL)",
        name="ck_flow_proposals_exactly_one_source",
    ),
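The SQL expression compares two booleans with <>, which is an exclusive-or: the row is valid only when exactly one source column is set. The same rule as a Python sketch (the function name is hypothetical and exists only for illustration):

```python
from typing import Optional
from uuid import UUID, uuid4


def has_exactly_one_source(
    source_session_id: Optional[UUID],
    l1_session_id: Optional[UUID],
) -> bool:
    """Mirror of ck_flow_proposals_exactly_one_source: XOR of the two IS NOT NULL tests."""
    return (source_session_id is not None) != (l1_session_id is not None)
```

Both-set and both-NULL rows are rejected; engineer-sourced and L1-sourced proposals each set one column.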
- Step 4: Run test to verify it passes
Run: docker exec resolutionflow_backend pytest tests/test_flow_proposal_l1_source.py -v
Expected: PASS.
- Step 5: Create + apply migration
Run: docker exec resolutionflow_backend alembic revision -m "flow_proposal l1 source linkage"
Edit body:
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql


def upgrade() -> None:
    op.add_column("flow_proposals", sa.Column(
        "l1_session_id", postgresql.UUID(as_uuid=True), nullable=True))
    op.create_index("ix_flow_proposals_l1_session_id", "flow_proposals", ["l1_session_id"])
    op.create_foreign_key(
        "fk_flow_proposals_l1_session_id", "flow_proposals", "l1_walk_sessions",
        ["l1_session_id"], ["id"], ondelete="SET NULL")
    op.alter_column("flow_proposals", "source_session_id", nullable=True)
    op.create_check_constraint(
        "ck_flow_proposals_exactly_one_source", "flow_proposals",
        "(source_session_id IS NOT NULL) <> (l1_session_id IS NOT NULL)")


def downgrade() -> None:
    op.drop_constraint("ck_flow_proposals_exactly_one_source", "flow_proposals", type_="check")
    op.alter_column("flow_proposals", "source_session_id", nullable=False)
    op.drop_constraint("fk_flow_proposals_l1_session_id", "flow_proposals", type_="foreignkey")
    op.drop_index("ix_flow_proposals_l1_session_id", "flow_proposals")
    op.drop_column("flow_proposals", "l1_session_id")
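Run: docker exec resolutionflow_backend alembic upgrade head. Expected: applied cleanly (no existing rows violate the new CHECK because all current proposals have a non-null source_session_id and null l1_session_id). Note that ondelete="SET NULL" on l1_session_id interacts with the exactly-one CHECK: deleting the L1 session behind an L1-sourced proposal would leave both columns NULL, so PostgreSQL rejects that delete with a CHECK violation. Confirm against the spec that blocking such deletes is the intended behavior.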
- Step 6: Verify Tree.source_session_id is nullable
Run: docker exec resolutionflow_postgres psql -U postgres -d resolutionflow -c "\d trees" | grep source_session_id
Expected: shows the column without not null. If it shows not null, add op.alter_column("trees","source_session_id",nullable=True) to this migration's upgrade() and re-run. (L1-promoted trees leave it NULL.)
- Step 7: Commit
git add backend/app/models/flow_proposal.py backend/alembic/versions/ backend/tests/test_flow_proposal_l1_source.py
git commit -m "feat(l1): FlowProposal l1_session_id source linkage (nullable source_session_id + exactly-one check)"
Task 4: Category service + model action keys
Files:
- Create: backend/app/services/l1_category_service.py
- Modify: backend/app/core/config.py (ACTION_MODEL_MAP)
- Test: backend/tests/test_l1_category_service.py
- Step 1: Write the failing test
# backend/tests/test_l1_category_service.py
from app.services.l1_category_service import (
    DEFAULT_L1_CATEGORIES, HARD_FLOOR_FORBIDDEN, is_category_enabled,
)


def test_defaults_and_hard_floor_present():
    assert "password_reset" in DEFAULT_L1_CATEGORIES
    assert "registry_edit" in HARD_FLOOR_FORBIDDEN  # representative forbidden action key
    assert len(DEFAULT_L1_CATEGORIES) == 10


def test_is_category_enabled():
    enabled = ["printer", "vpn_connect"]
    assert is_category_enabled("printer", enabled) is True
    assert is_category_enabled("registry_edit", enabled) is False
    assert is_category_enabled("unknown", enabled) is False
- Step 2: Run test to verify it fails
Run: docker exec resolutionflow_backend pytest tests/test_l1_category_service.py -v
Expected: FAIL — module not found.
- Step 3: Implement the service
# backend/app/services/l1_category_service.py
"""L1 category allowlist + the always-forbidden hard floor.

DEFAULT_L1_CATEGORIES seeds an account's enabled set. HARD_FLOOR_FORBIDDEN is a
category-independent safety floor the AI tree builder must never emit and admins
cannot enable. See spec §5.1/§5.2.
"""
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.account import Account

DEFAULT_L1_CATEGORIES: list[str] = [
    "password_reset", "account_lockout", "printer", "email_outlook_client",
    "wifi_network_basics", "vpn_connect", "teams_zoom_av",
    "browser_cache_cookies", "peripheral_reconnect", "os_restart_update",
]

# Always-forbidden action classes (keys are stable identifiers; the human-readable
# phrasing lives in the builder system prompt). Admins cannot enable these.
HARD_FLOOR_FORBIDDEN: list[str] = [
    "registry_edit", "system_file_or_boot_edit", "data_or_disk_deletion",
    "credential_or_mfa_change", "security_or_av_or_firewall_change",
    "elevated_or_admin_script", "domain_dns_dhcp_change",
    "server_or_production_config", "billing_or_license_change",
]

# Substrings that, if present in a generated node's text, indicate a hard-floor
# violation. Used by ai_tree_builder per-node validation (defense in depth).
HARD_FLOOR_TEXT_PATTERNS: list[str] = [
    "regedit", "registry", "format ", "delete partition", "diskpart",
    "reset password for", "disable firewall", "disable antivirus", "disable defender",
    "run as administrator", "sudo ", "domain controller", "dns record", "dhcp scope",
    "uninstall security", "bitlocker",
]


def is_category_enabled(category: str, enabled: list[str]) -> bool:
    """A category is buildable only if explicitly enabled and not hard-floored."""
    if category in HARD_FLOOR_FORBIDDEN:
        return False
    return category in enabled


async def get_enabled_categories(account_id: UUID, db: AsyncSession) -> list[str]:
    acct = (await db.execute(select(Account).where(Account.id == account_id))).scalar_one()
    return list(acct.enabled_l1_categories or [])


async def set_enabled_categories(
    account_id: UUID, categories: list[str], db: AsyncSession
) -> list[str]:
    """Persist the enabled set, dropping anything unknown or hard-floored."""
    cleaned = [c for c in categories if c in DEFAULT_L1_CATEGORIES]
    acct = (await db.execute(select(Account).where(Account.id == account_id))).scalar_one()
    acct.enabled_l1_categories = cleaned
    await db.flush()
    return cleaned
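Because HARD_FLOOR_TEXT_PATTERNS is matched as lowercase substrings, the check is deliberately coarse. The sketch below copies a subset of the patterns and uses a hypothetical helper name to show both a true hit and a benign sentence that still trips a pattern. It is a standalone illustration, not the validation code from Task 5.

```python
from typing import Optional

# Subset of HARD_FLOOR_TEXT_PATTERNS, copied from the service above.
PATTERNS = ["regedit", "registry", "format ", "sudo ", "bitlocker"]


def first_hard_floor_hit(text: str) -> Optional[str]:
    """Return the first pattern found in the lowercased text, or None."""
    lowered = text.lower()
    for pat in PATTERNS:
        if pat in lowered:
            return pat
    return None


unsafe = first_hard_floor_hit("Open Regedit and change the key")
safe = first_hard_floor_hit("Restart the printer.")
# Benign wording can still match: "format " appears inside "date format in".
over_blocked = first_hard_floor_hit("Check the date format in Outlook.")
```

Over-blocking errs on the safe side here, since a rejected node is regenerated once and then escalated rather than shown to the tech.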
- Step 4: Add model action keys
In backend/app/core/config.py, add to ACTION_MODEL_MAP:
    # L1 AI tree builder (Phase 2A): per-node generation is latency-sensitive
    # on a live call → Sonnet; classification is a short label task → Haiku.
    "l1_realtime_build": "standard",
    "l1_classify": "fast",
- Step 5: Run tests
Run: docker exec resolutionflow_backend pytest tests/test_l1_category_service.py -v
Expected: PASS (2 tests).
- Step 6: Commit
git add backend/app/services/l1_category_service.py backend/app/core/config.py backend/tests/test_l1_category_service.py
git commit -m "feat(l1): category service (defaults + hard floor) and AI action keys"
Task 5: ai_tree_builder — node schema, prompt, generation, validation
Files:
- Create: backend/app/services/ai_tree_builder.py
- Test: backend/tests/test_ai_tree_builder.py
- Step 1: Write the failing tests
# backend/tests/test_ai_tree_builder.py
import pytest

from app.services import ai_tree_builder as atb


def test_validate_node_rejects_hard_floor_text():
    node = {"node_type": "instruction", "id": "n1", "text": "Open regedit and change the key", "next": "generate"}
    with pytest.raises(atb.UnsafeNodeError):
        atb.validate_node(node)


def test_validate_node_accepts_safe_instruction():
    node = {"node_type": "instruction", "id": "n1", "text": "Restart the printer.", "next": "generate"}
    assert atb.validate_node(node)["node_type"] == "instruction"


def test_depth_cap_forces_escalate():
    walked = [{"node_type": "question", "id": f"n{i}", "text": "?", "answer": "no"} for i in range(atb.MAX_DEPTH)]
    node = atb.escalate_if_depth_exceeded(walked)
    assert node is not None and node["node_type"] == "escalate"


def test_normalize_walked_path_builds_valid_tree():
    walked = [
        {"node_type": "question", "id": "n1", "text": "Powered on?", "answer": "no"},
        {"node_type": "instruction", "id": "n2", "text": "Power it on.", "answer": "ack"},
        {"node_type": "resolved", "id": "n3", "text": "Fixed."},
    ]
    tree = atb.normalize_walked_path(walked)
    assert isinstance(tree, dict) and tree.get("id") == "n1"
    # untraversed 'yes' branch of n1 became a needs_review stub
    assert any(n["node_type"] == "needs_review" for n in tree["nodes"].values())
- Step 2: Run tests to verify they fail
Run: docker exec resolutionflow_backend pytest tests/test_ai_tree_builder.py -v
Expected: FAIL — module not found.
- Step 3: Implement the builder
# backend/app/services/ai_tree_builder.py
"""Constrained, node-by-node L1 decision-tree generation (spec §4/§5/§6.1).

Each call produces ONE node given the problem, category, and full walked path.
Generation is constrained to safe/reversible L1 steps and biased to escalate
early. normalize_walked_path() turns a resolved walk into a valid tree object
for flywheel capture.
"""
import json
import logging
from typing import Any, Optional

from app.core.ai_provider import get_ai_provider
from app.core.config import settings
from app.services.l1_category_service import HARD_FLOOR_TEXT_PATTERNS
from app.services.llm_utils import parse_llm_json

logger = logging.getLogger(__name__)

MAX_DEPTH = 12
VALID_NODE_TYPES = {"question", "instruction", "resolved", "escalate"}


class UnsafeNodeError(ValueError):
    """Raised when a generated node violates the hard floor or is malformed."""


SYSTEM_PROMPT = """\
You are an L1 helpdesk troubleshooting guide builder. Given a problem and the
steps already tried, produce the SINGLE next node of a yes/no decision tree.

HARD RULES:
- Only safe, reversible, observe-or-restart-class steps: checking status, toggling,
  restarting, reconnecting, re-entering credentials the USER already knows.
- NEVER produce steps that: edit the registry/system files/boot config; delete or
  format data/disks; change credentials/MFA/security/firewall/AV; run elevated or
  admin scripts; touch domain controllers/DNS/DHCP or production servers; or have
  billing/license impact. These are out of L1 scope.
- When you run out of safe in-scope steps, DO NOT GUESS. Emit an "escalate" node.

Return ONLY a JSON object for ONE node, one of:
{"node_type":"question","text":"<yes/no question>"}
{"node_type":"instruction","text":"<one safe reversible action>"}
{"node_type":"resolved","text":"<confirmation the issue is fixed>"}
{"node_type":"escalate","reason_category":"exhausted_safe_steps","text":"<why>"}
No prose, no markdown fences.
"""


def _build_context(problem_text: str, category: str, walked_path: list[dict]) -> str:
    lines = [f"PROBLEM: {problem_text}", f"CATEGORY: {category}", "STEPS SO FAR:"]
    if not walked_path:
        lines.append("(none yet — produce the first diagnostic question)")
    for i, step in enumerate(walked_path, 1):
        ans = step.get("answer")
        suffix = f" -> {ans}" if ans else ""
        lines.append(f"{i}. [{step.get('node_type','?')}] {step.get('text','')}{suffix}")
    return "\n".join(lines)


def validate_node(node: dict[str, Any]) -> dict[str, Any]:
    """Shape + hard-floor validation. Raises UnsafeNodeError on violation."""
    if not isinstance(node, dict) or node.get("node_type") not in VALID_NODE_TYPES:
        raise UnsafeNodeError(f"invalid node_type: {node!r}")
    text = (node.get("text") or "").lower()
    for pat in HARD_FLOOR_TEXT_PATTERNS:
        if pat in text:
            raise UnsafeNodeError(f"hard-floor pattern '{pat}' in node text")
    return node


def escalate_if_depth_exceeded(walked_path: list[dict]) -> Optional[dict[str, Any]]:
    if len(walked_path) >= MAX_DEPTH:
        return {
            "node_type": "escalate",
            "reason_category": "depth_cap",
            "text": "Reached the L1 troubleshooting depth limit — escalating to engineering.",
        }
    return None


async def generate_next_node(
    problem_text: str, category: str, walked_path: list[dict]
) -> dict[str, Any]:
    """Generate + validate the next node. Regenerate once on failure, then escalate."""
    capped = escalate_if_depth_exceeded(walked_path)
    if capped:
        return capped
    provider = get_ai_provider(settings.get_model_for_action("l1_realtime_build"))
    context = _build_context(problem_text, category, walked_path)
    for attempt in range(2):
        try:
            raw, _, _ = await provider.generate_json(
                system_prompt=SYSTEM_PROMPT,
                messages=[{"role": "user", "content": context}],
                max_tokens=1024,
            )
            node = parse_llm_json(raw)
            return validate_node(node)
        except (UnsafeNodeError, ValueError) as e:
            logger.warning("ai_tree_builder node attempt %d failed: %s", attempt + 1, e)
            continue
    return {
        "node_type": "escalate",
        "reason_category": "generation_failed",
        "text": "Could not generate a safe next step — escalating to engineering.",
    }


def normalize_walked_path(walked_path: list[dict]) -> dict[str, Any]:
    """Turn a resolved walk into a valid troubleshooting tree (spec §6.1).

    Root = first node's id; question nodes' traversed branch points to the next
    node, the untraversed branch to a needs_review stub; terminal node ends it.
    Returns {id, nodes: {id: node}}, a dict with an id (passes the proposal
    approval guard).
    """
    nodes: dict[str, Any] = {}
    if not walked_path:
        root_id = "root"
        nodes[root_id] = {"id": root_id, "node_type": "needs_review",
                          "text": "Empty walk — needs authoring."}
        return {"id": root_id, "nodes": nodes}
    stub_seq = 0
    for i, step in enumerate(walked_path):
        nid = step.get("id") or f"n{i+1}"
        ntype = step.get("node_type", "question")
        nxt = walked_path[i + 1].get("id", f"n{i+2}") if i + 1 < len(walked_path) else None
        node: dict[str, Any] = {"id": nid, "node_type": ntype, "text": step.get("text", "")}
        if ntype == "question":
            answer = (step.get("answer") or "").lower()
            stub_seq += 1
            stub_id = f"review-{stub_seq}"
            nodes[stub_id] = {"id": stub_id, "node_type": "needs_review",
                              "text": "Branch not explored during the originating call."}
            node["yes_next"] = nxt if answer == "yes" else stub_id
            node["no_next"] = nxt if answer == "no" else stub_id
        elif ntype == "instruction":
            node["next"] = nxt
        nodes[nid] = node
    return {"id": walked_path[0].get("id", "n1"), "nodes": nodes}
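As a worked example, the tree below is what normalize_walked_path() should return for the three-step walk used in the Step 1 test, traced by hand from the implementation above. The dangling_refs helper is hypothetical and not part of the plan; it only checks that every pointer in the tree resolves to a defined node.

```python
from typing import Any

# Hand-traced result for the walk: n1 (question, answer "no") -> n2 -> n3.
expected_tree: dict[str, Any] = {
    "id": "n1",
    "nodes": {
        "review-1": {"id": "review-1", "node_type": "needs_review",
                     "text": "Branch not explored during the originating call."},
        "n1": {"id": "n1", "node_type": "question", "text": "Powered on?",
               "yes_next": "review-1", "no_next": "n2"},
        "n2": {"id": "n2", "node_type": "instruction", "text": "Power it on.",
               "next": "n3"},
        "n3": {"id": "n3", "node_type": "resolved", "text": "Fixed."},
    },
}


def dangling_refs(tree: dict[str, Any]) -> list[str]:
    """Hypothetical helper: list pointer targets that are not defined nodes."""
    nodes = tree["nodes"]
    missing: list[str] = []
    if tree["id"] not in nodes:
        missing.append(tree["id"])
    for node in nodes.values():
        for key in ("yes_next", "no_next", "next"):
            target = node.get(key)
            if target is not None and target not in nodes:
                missing.append(target)
    return missing
```

The untraversed "yes" branch of n1 points at the review-1 stub, which is what lets a reviewer see which branches were never exercised on the originating call.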
- Step 4: Run tests to verify they pass
Run: docker exec resolutionflow_backend pytest tests/test_ai_tree_builder.py -v
Expected: PASS (4 tests). generate_next_node is not unit-tested against a live model here; it is covered by the integration test in Task 11 with a mocked provider.
- Step 5: Commit
git add backend/app/services/ai_tree_builder.py backend/tests/test_ai_tree_builder.py
git commit -m "feat(l1): ai_tree_builder — constrained node generation, validation, normalize"
Task 6: match_or_build orchestrator + classify
Files:
- Create: backend/app/services/match_or_build.py
- Test: backend/tests/test_match_or_build.py
- Step 1: Write the failing tests
# backend/tests/test_match_or_build.py
import uuid
from unittest.mock import AsyncMock, patch

import pytest

from app.services import match_or_build as mob


@pytest.mark.asyncio
async def test_match_wins_before_category_gate():
    """A strong published-flow match returns 'matched' even if category disabled."""
    with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock(
            return_value=[{"tree_id": str(uuid.uuid4()), "tree_name": "VPN", "score": 0.9}])), \
         patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=[])):
        res = await mob.match_or_build(uuid.uuid4(), "vpn down", None, "t1", db=AsyncMock(), force_build=False)
    assert res["outcome"] == "matched"
    assert res["session_kind"] == "flow"


@pytest.mark.asyncio
async def test_suggest_band():
    with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock(
            return_value=[{"tree_id": str(uuid.uuid4()), "tree_name": "X", "score": 0.66}])):
        res = await mob.match_or_build(uuid.uuid4(), "p", None, "t1", db=AsyncMock(), force_build=False)
    assert res["outcome"] == "suggest"


@pytest.mark.asyncio
async def test_out_of_scope_when_category_disabled_on_build_path():
    with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock(return_value=[])), \
         patch.object(mob, "classify", new=AsyncMock(return_value="printer")), \
         patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=["vpn_connect"])):
        res = await mob.match_or_build(uuid.uuid4(), "printer jam", None, "t1", db=AsyncMock(), force_build=False)
    assert res["outcome"] == "out_of_scope"


@pytest.mark.asyncio
async def test_build_when_enabled_and_no_match():
    with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock(return_value=[])), \
         patch.object(mob, "classify", new=AsyncMock(return_value="printer")), \
         patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=["printer"])):
        res = await mob.match_or_build(uuid.uuid4(), "printer jam", None, "t1", db=AsyncMock(), force_build=False)
    assert res["outcome"] == "build"
    assert res["session_kind"] == "ai_build"


@pytest.mark.asyncio
async def test_force_build_skips_match_but_still_gates():
    fm = AsyncMock(return_value=[{"tree_id": str(uuid.uuid4()), "tree_name": "X", "score": 0.99}])
    with patch.object(mob.flow_matching_engine, "find_matches", new=fm), \
         patch.object(mob, "classify", new=AsyncMock(return_value="printer")), \
         patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=["printer"])):
        res = await mob.match_or_build(uuid.uuid4(), "p", None, "t1", db=AsyncMock(), force_build=True)
    fm.assert_not_called()
    assert res["outcome"] == "build"
- Step 2: Run tests to verify they fail
Run: docker exec resolutionflow_backend pytest tests/test_match_or_build.py -v
Expected: FAIL — module not found.
- Step 3: Implement the orchestrator
# backend/app/services/match_or_build.py
"""Intake orchestrator: match published flows first, gate generic build behind
the account's enabled categories (spec §3). Match runs BEFORE the category gate
so an authored flow is never blocked by category settings (Finding 4)."""
import logging
from typing import Any, Optional
from uuid import UUID

from sqlalchemy.ext.asyncio import AsyncSession

from app.core.ai_provider import get_ai_provider
from app.core.config import settings
from app.services import flow_matching_engine
from app.services.l1_category_service import (
    DEFAULT_L1_CATEGORIES, get_enabled_categories, is_category_enabled,
)
from app.services.llm_utils import parse_llm_json

logger = logging.getLogger(__name__)

MATCH_THRESHOLD = 0.75
SUGGEST_THRESHOLD = 0.60

_CLASSIFY_PROMPT = (
    "Classify the IT support problem into exactly one of these category keys, "
    "or 'unknown'. Return JSON {\"category\":\"<key>\"} only.\nKEYS: "
    + ", ".join(DEFAULT_L1_CATEGORIES)
)


async def classify(problem_text: str) -> str:
    """Map a problem to a category key via a short model call; keyword fallback."""
    try:
        provider = get_ai_provider(settings.get_model_for_action("l1_classify"))
        raw, _, _ = await provider.generate_json(
            system_prompt=_CLASSIFY_PROMPT,
            messages=[{"role": "user", "content": problem_text}],
            max_tokens=64,
        )
        cat = parse_llm_json(raw).get("category", "unknown")
        return cat if cat in DEFAULT_L1_CATEGORIES else "unknown"
    except Exception as e:  # noqa: BLE001 — fall back, never hard-fail intake
        logger.warning("classify model call failed (%s); keyword fallback", e)
    # Reached only when the model call failed: match on category-key tokens.
    text = problem_text.lower()
    for cat in DEFAULT_L1_CATEGORIES:
        if any(tok in text for tok in cat.split("_")):
            return cat
    return "unknown"


async def match_or_build(
    account_id: UUID,
    problem_text: str,
    problem_domain: Optional[str],
    ticket_ref: str,
    *,
    db: AsyncSession,
    force_build: bool = False,
) -> dict[str, Any]:
    if not force_build:
        hits = await flow_matching_engine.find_matches(
            problem_text, problem_domain, account_id, db)
        best = max(hits, key=lambda h: h["score"], default=None) if hits else None
        if best and best["score"] >= MATCH_THRESHOLD:
            return {"outcome": "matched", "flow_id": best["tree_id"], "session_kind": "flow"}
        if best and best["score"] >= SUGGEST_THRESHOLD:
            return {"outcome": "suggest",
                    "near_miss": {"flow_id": best["tree_id"], "flow_name": best["tree_name"],
                                  "score": best["score"]},
                    "can_build": True}
    category = await classify(problem_text)
    enabled = await get_enabled_categories(account_id, db)
    if not is_category_enabled(category, enabled):
        return {"outcome": "out_of_scope", "category": category}
    return {"outcome": "build", "session_kind": "ai_build", "category": category}
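The two thresholds above split the best match score into three bands. The sketch below isolates that banding as a pure function so the boundaries are easy to check; `dispatch_on_score` and the `"gate"` label are hypothetical names for illustration only (the plan inlines this logic in `match_or_build`).

```python
from typing import Optional

MATCH_THRESHOLD = 0.75
SUGGEST_THRESHOLD = 0.60


def dispatch_on_score(best_score: Optional[float], force_build: bool = False) -> str:
    """Return the branch match_or_build would take for a best match score.

    Hypothetical helper: "gate" stands for the classify + category gate,
    which then yields either "build" or "out_of_scope".
    """
    if not force_build and best_score is not None:
        if best_score >= MATCH_THRESHOLD:
            return "matched"
        if best_score >= SUGGEST_THRESHOLD:
            return "suggest"
    # No usable match, or the tech explicitly chose to build.
    return "gate"
```

Both comparisons are inclusive, so a score of exactly 0.75 matches and exactly 0.60 suggests. With `force_build=True` the match step is skipped entirely, which is what lets an L1 tech override a near-miss suggestion.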
- Step 4: Run tests to verify they pass
Run: docker exec resolutionflow_backend pytest tests/test_match_or_build.py -v
Expected: PASS (5 tests).
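The keyword fallback in `classify` is deliberately loose: it splits each category key on underscores and returns the first category with any token appearing as a substring of the problem text. A minimal sketch of that behaviour follows, assuming a sample category list (`printer` and `vpn_connect` appear in this plan; `password_reset` is a made-up key for illustration, and the real list lives in `DEFAULT_L1_CATEGORIES`).

```python
def keyword_fallback(problem_text: str, categories: list[str]) -> str:
    """Return the first category with a key token found in the text."""
    text = problem_text.lower()
    for cat in categories:
        # "vpn_connect" contributes the tokens "vpn" and "connect".
        if any(tok in text for tok in cat.split("_")):
            return cat
    return "unknown"


# Assumed sample keys; not the real DEFAULT_L1_CATEGORIES.
SAMPLE_CATEGORIES = ["printer", "vpn_connect", "password_reset"]
```

Because a single token is enough, "Cannot connect to wifi" lands in `vpn_connect` via the token `connect`. That may be acceptable for a fallback that only runs when the model call fails, but list order and generic tokens affect the result.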
- Step 5: Commit
git add backend/app/services/match_or_build.py backend/tests/test_match_or_build.py
git commit -m "feat(l1): match_or_build orchestrator + classify (match-first, gate-on-build)"
Task 7: Session service — start_ai_build_session
Files:
- Modify: backend/app/services/l1_session_service.py
- Test: backend/tests/test_l1_session_service.py (add)
- Step 1: Write the failing test
# add to backend/tests/test_l1_session_service.py
@pytest.mark.asyncio
async def test_start_ai_build_session(db_session, l1_user):
    from app.services import l1_session_service as svc
    s = await svc.start_ai_build_session(
        db_session, account_id=l1_user.account_id, user=l1_user,
        ticket_id="t-ai", ticket_kind="internal",
    )
    assert s.session_kind == "ai_build"
    assert s.flow_id is None and s.flow_proposal_id is None
    assert s.status == "active"
(Use the same fixtures the existing tests in this file use for db_session/l1_user.)
- Step 2: Run test to verify it fails
Run: docker exec resolutionflow_backend pytest tests/test_l1_session_service.py::test_start_ai_build_session -v
Expected: FAIL — AttributeError: start_ai_build_session.
- Step 3: Implement (mirror start_adhoc_session)
In backend/app/services/l1_session_service.py, after start_adhoc_session:
async def start_ai_build_session(
    db: AsyncSession,
    *,
    account_id: UUID,
    user: User,
    ticket_id: str,
    ticket_kind: str,
) -> L1WalkSession:
    """Start an AI-built tree session (nodes generated on demand via next-node)."""
    session = L1WalkSession(
        account_id=account_id,
        created_by_user_id=user.id,
        acting_as=_resolve_acting_as(user),
        ticket_id=ticket_id,
        ticket_kind=ticket_kind,
        session_kind="ai_build",
    )
    db.add(session)
    await db.flush()
    return session
- Step 4: Run test to verify it passes
Run: docker exec resolutionflow_backend pytest tests/test_l1_session_service.py::test_start_ai_build_session -v
Expected: PASS.
- Step 5: Commit
git add backend/app/services/l1_session_service.py backend/tests/test_l1_session_service.py
git commit -m "feat(l1): start_ai_build_session"
Task 8: Session service — advance_ai_build (record answer + generate next node)
Files:
- Modify: backend/app/services/l1_session_service.py
- Test: backend/tests/test_l1_session_service.py (add)
- Step 1: Write the failing test
# add to backend/tests/test_l1_session_service.py
@pytest.mark.asyncio
async def test_advance_ai_build_appends_and_returns_next(db_session, l1_user, monkeypatch):
    from app.services import l1_session_service as svc
    from app.services import ai_tree_builder
    s = await svc.start_ai_build_session(
        db_session, account_id=l1_user.account_id, user=l1_user,
        ticket_id="t-ai", ticket_kind="internal")

    async def fake_next(problem, category, walked):
        return {"node_type": "resolved", "id": "done", "text": "Fixed."}

    monkeypatch.setattr(ai_tree_builder, "generate_next_node", fake_next)
    next_node = await svc.advance_ai_build(
        db_session, session_id=s.id, problem_text="printer", category="printer",
        node_id="n1", answer="no", note=None)
    assert next_node["node_type"] == "resolved"
    refreshed = await db_session.get(type(s), s.id)
    assert len(refreshed.walked_path) == 1
    assert refreshed.walked_path[0]["answer"] == "no"
- Step 2: Run test to verify it fails
Run: docker exec resolutionflow_backend pytest tests/test_l1_session_service.py::test_advance_ai_build_appends_and_returns_next -v
Expected: FAIL — AttributeError: advance_ai_build.
- Step 3: Implement
Add to l1_session_service.py (imports at top: from app.services import ai_tree_builder):
async def advance_ai_build(
    db: AsyncSession,
    *,
    session_id: UUID,
    problem_text: str,
    category: str,
    node_id: Optional[str] = None,
    answer: Optional[str] = None,
    note: Optional[str] = None,
) -> dict:
    """Append the answered/acked node to walked_path, then generate the next node.

    On the first call (node_id is None) nothing is appended — we just generate the
    first node. Sets current_node_id on the session and returns the next node dict.
    Raises ValueError on missing/inactive/non-ai_build session.
    """
    session = await db.get(L1WalkSession, session_id)
    if not session:
        raise ValueError(f"L1WalkSession {session_id} not found")
    if session.session_kind != "ai_build":
        raise ValueError("advance_ai_build requires an ai_build session")
    if session.status != "active":
        raise ValueError(f"Session {session_id} is not active (status={session.status})")
    if node_id is not None:
        # Phase 2A stores id + answer only; the served node's text is known to
        # the endpoint layer (Task 10), not to this function.
        entry = {"node_type": "question" if answer in ("yes", "no") else "instruction",
                 "id": node_id, "answer": answer, "l1_note": note}
        # Reassign (do not append in place) so the JSONB change is detected.
        session.walked_path = [*session.walked_path, entry]
    next_node = await ai_tree_builder.generate_next_node(
        problem_text, category, session.walked_path)
    session.current_node_id = next_node.get("id")
    session.last_step_at = datetime.now(timezone.utc)
    await db.flush()
    return next_node
Note: advance_ai_build records only the answer against the node id. The text of traversed nodes is known to the endpoint layer (Task 10), which served the current node; if a richer transcript is wanted, the endpoint can pass the served node's text in a payload alongside note. Keep Phase 2A minimal (id + answer).
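The entry-building step can be sketched in isolation. This is a minimal illustration, assuming a plain list stands in for the JSONB `walked_path` column; `append_answer` is a hypothetical helper name, not part of the service. It returns a new list rather than mutating the old one, matching the plan's convention that JSONB columns are reassigned.

```python
from typing import Any, Optional


def append_answer(
    walked_path: list[dict[str, Any]],
    node_id: str,
    answer: Optional[str],
    note: Optional[str] = None,
) -> list[dict[str, Any]]:
    """Return a new walked_path with the answered or acknowledged node appended."""
    entry = {
        # A yes/no answer marks a question; anything else is an acknowledged instruction.
        "node_type": "question" if answer in ("yes", "no") else "instruction",
        "id": node_id,
        "answer": answer,
        "l1_note": note,
    }
    # Build a fresh list so the caller can assign it back to the column.
    return [*walked_path, entry]
```

A caller would write `session.walked_path = append_answer(session.walked_path, "n1", "no")`, so the attribute is assigned a new object instead of being appended to in place.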
- Step 4: Run test to verify it passes
Run: docker exec resolutionflow_backend pytest tests/test_l1_session_service.py::test_advance_ai_build_appends_and_returns_next -v
Expected: PASS.
- Step 5: Commit
git add backend/app/services/l1_session_service.py backend/tests/test_l1_session_service.py
git commit -m "feat(l1): advance_ai_build — record answer + generate next node"
Task 9: Session service — flywheel capture on resolve + engineer notification on escalate
Files:
- Modify: backend/app/services/l1_session_service.py (resolve, escalate)
- Modify: backend/app/schemas/notification.py (VALID_EVENTS)
- Modify: backend/app/services/notification_service.py (link + body)
- Test: backend/tests/test_l1_session_service.py (add)
- Step 1: Write the failing tests
# add to backend/tests/test_l1_session_service.py
@pytest.mark.asyncio
async def test_resolve_ai_build_creates_outcome_validated_proposal(db_session, l1_user, monkeypatch):
    from app.services import l1_session_service as svc
    from app.models.flow_proposal import FlowProposal
    from sqlalchemy import select
    s = await svc.start_ai_build_session(
        db_session, account_id=l1_user.account_id, user=l1_user,
        ticket_id="t-ai", ticket_kind="internal")
    s.walked_path = [
        {"node_type": "question", "id": "n1", "text": "On?", "answer": "no"},
        {"node_type": "resolved", "id": "n2", "text": "Fixed."},
    ]
    await db_session.flush()
    await svc.resolve(db_session, session_id=s.id, helpful=True, resolution_notes="ok")
    props = (await db_session.execute(
        select(FlowProposal).where(FlowProposal.l1_session_id == s.id))).scalars().all()
    assert len(props) == 1
    assert props[0].source == "ai_realtime_l1"
    assert props[0].validated_by_outcome is True
    assert props[0].source_session_id is None
    assert props[0].proposed_flow_data["tree_structure"]["id"] == "n1"


@pytest.mark.asyncio
async def test_escalate_notifies_engineers(db_session, l1_user, monkeypatch):
    from app.services import l1_session_service as svc
    calls = {}

    async def fake_notify(event, account_id, payload, db, target_user_ids=None):
        calls["event"] = event
        calls["target_user_ids"] = target_user_ids

    monkeypatch.setattr(svc, "notify", fake_notify)
    s = await svc.start_ai_build_session(
        db_session, account_id=l1_user.account_id, user=l1_user,
        ticket_id="t-ai", ticket_kind="internal")
    await svc.escalate(db_session, session_id=s.id, reason="stuck", reason_category="exhausted_safe_steps")
    assert calls["event"] == "l1.session.escalated"
    assert calls["target_user_ids"] is not None  # explicit engineer recipients
- Step 2: Run tests to verify they fail
Run: docker exec resolutionflow_backend pytest tests/test_l1_session_service.py -k "ai_build_creates_outcome or notifies_engineers" -v
Expected: FAIL — no proposal created / notify not called with the new event.
- Step 3a: Add notification event + link + body
backend/app/schemas/notification.py — add to VALID_EVENTS:
"l1.session.escalated",
backend/app/services/notification_service.py — in _build_notification_link links dict add:
"l1.session.escalated": "/escalations",
and in the body-template builder (the bodies dict near _build_notification_link) add:
"l1.session.escalated": "L1 escalated a ticket: {problem_summary}",
- Step 3b: Flywheel capture in resolve
In l1_session_service.resolve, after the existing proposal.validated_by_outcome block and before the ticket close, add (imports: from app.services import ai_tree_builder, from app.models.flow_proposal import FlowProposal already present):
    if helpful and session.session_kind == "ai_build" and session.walked_path:
        tree_structure = ai_tree_builder.normalize_walked_path(session.walked_path)
        db.add(FlowProposal(
            account_id=session.account_id,
            l1_session_id=session.id,
            source_session_id=None,
            proposal_type="new_flow",
            title=(session.resolution_notes or "AI L1 resolution")[:255],
            proposed_flow_data={"tree_structure": tree_structure, "match_keywords": []},
            source="ai_realtime_l1",
            validated_by_outcome=True,
            linked_ticket_id=session.ticket_id,
            linked_ticket_kind=session.ticket_kind,
            status="pending",
        ))
Dedupe via _find_similar_pending_proposal is a nice-to-have; Phase 2A inserts directly. If duplicate noise appears in QA, wire the existing dedupe helper here.
- Step 3c: Engineer notification in escalate
In l1_session_service.escalate, after await log_audit(...) and before the final await db.flush(), add (imports: from app.services.notification_service import notify, from app.models.user import User, from sqlalchemy import select):
    eng_rows = await db.execute(
        select(User.id).where(
            User.account_id == session.account_id,
            User.is_active.is_(True),
            User.account_role.in_(("owner", "admin", "engineer")),
        )
    )
    target_ids = [r[0] for r in eng_rows.all()]
    await notify(
        "l1.session.escalated",
        session.account_id,
        {"problem_summary": session.ticket_id, "session_id": str(session.id),
         "reason_category": reason_category},
        db,
        target_user_ids=target_ids,
    )
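The recipient query and body template can be illustrated without a database. The sketch below is an in-memory analogue under stated assumptions: `FakeUser` is a hypothetical stand-in for the `User` model, and `render_body` assumes the body template is filled with `str.format`, which this plan does not confirm for notification_service.

```python
from dataclasses import dataclass

ENGINEER_ROLES = ("owner", "admin", "engineer")
BODY_TEMPLATE = "L1 escalated a ticket: {problem_summary}"


@dataclass
class FakeUser:
    """Hypothetical stand-in for the User model."""
    id: int
    account_id: int
    account_role: str
    is_active: bool = True


def escalation_recipients(users: list[FakeUser], account_id: int) -> list[int]:
    """Mirror the query filter: same account, active, engineer-or-above role."""
    return [
        u.id for u in users
        if u.account_id == account_id and u.is_active and u.account_role in ENGINEER_ROLES
    ]


def render_body(payload: dict) -> str:
    """Assumed rendering; extra payload keys are ignored by str.format."""
    return BODY_TEMPLATE.format(**payload)
```

Note that the snippet above passes `session.ticket_id` as `problem_summary`, so under this assumed rendering the notification body would show the ticket id rather than a textual summary.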
- Step 4: Run tests to verify they pass
Run: docker exec resolutionflow_backend pytest tests/test_l1_session_service.py -k "ai_build_creates_outcome or notifies_engineers" -v
Expected: PASS.
- Step 5: Run notification schema test
Run: docker exec resolutionflow_backend pytest tests/ -k notification -v
Expected: PASS (the new event is accepted by validate_event_keys).
- Step 6: Commit
git add backend/app/services/l1_session_service.py backend/app/schemas/notification.py backend/app/services/notification_service.py backend/tests/test_l1_session_service.py
git commit -m "feat(l1): flywheel capture on resolve + engineer notification on escalate"
Task 10: API — intake dispatch, next-node, escalations; schemas + deps
Files:
- Modify: backend/app/schemas/l1.py
- Modify: backend/app/api/deps.py
- Modify: backend/app/api/endpoints/l1.py
- Test: backend/tests/test_l1_api_ai_build.py
- Step 1: Write the failing test
# backend/tests/test_l1_api_ai_build.py
import pytest
from unittest.mock import AsyncMock, patch


@pytest.mark.asyncio
async def test_intake_build_outcome_creates_ai_build_session(l1_client):
    with patch("app.api.endpoints.l1.match_or_build.match_or_build",
               new=AsyncMock(return_value={"outcome": "build", "session_kind": "ai_build", "category": "printer"})):
        r = await l1_client.post("/api/v1/l1/intake", json={"problem_statement": "printer jam"})
    assert r.status_code == 200
    body = r.json()
    assert body["outcome"] == "build"
    assert body["session_kind"] == "ai_build"
    assert body["session_id"]


@pytest.mark.asyncio
async def test_intake_out_of_scope(l1_client):
    with patch("app.api.endpoints.l1.match_or_build.match_or_build",
               new=AsyncMock(return_value={"outcome": "out_of_scope", "category": "unknown"})):
        r = await l1_client.post("/api/v1/l1/intake", json={"problem_statement": "weird"})
    assert r.status_code == 200
    assert r.json()["outcome"] == "out_of_scope"
(Use the existing L1 client fixture pattern from test_l1_api*/conftest; l1_client is an authed AsyncClient for an l1_tech user.)
- Step 2: Run test to verify it fails
Run: docker exec resolutionflow_backend pytest tests/test_l1_api_ai_build.py -v
Expected: FAIL — outcome not in response / KeyError.
- Step 3a: Schemas
In backend/app/schemas/l1.py:
- Change the IntakeResponse.session_kind literal to include ai_build and make session_id/session_kind optional (non-build outcomes have no session):
class IntakeResponse(BaseModel):
    outcome: Literal["matched", "suggest", "out_of_scope", "build"]
    session_id: Optional[UUID] = None
    session_kind: Optional[Literal["flow", "proposal", "adhoc", "ai_build"]] = None
    ticket_id: Optional[str] = None
    ticket_kind: Optional[str] = None
    flow_id: Optional[UUID] = None      # for 'matched'
    near_miss: Optional[dict] = None    # for 'suggest'
    category: Optional[str] = None      # for 'out_of_scope'
Add NextNodeRequest / NextNodeResponse:
class NextNodeRequest(BaseModel):
    node_id: Optional[str] = None
    answer: Optional[str] = None        # 'yes' | 'no' for questions
    acknowledged: Optional[bool] = None
    note: Optional[str] = None


class NextNodeResponse(BaseModel):
    node: dict
    session_status: str
Ensure IntakeRequest has an optional force_build: bool = False and flow_id is no longer required.
- Step 3b: Auth dep
In backend/app/api/deps.py, after require_account_owner:
async def require_account_owner_or_admin(
    current_user: Annotated[User, Depends(get_current_active_user)]
) -> User:
    """Require account owner or account-admin (blocks engineers); super_admin bypass."""
    if current_user.is_super_admin:
        return current_user
    if current_user.account_role in ("owner", "admin"):
        return current_user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Account owner or admin access required",
    )
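The rule this dependency enforces reduces to a small predicate, sketched below as a pure function so it can be checked without FastAPI. `may_edit_categories` is a hypothetical name for illustration; the role strings are the ones used in this plan.

```python
def may_edit_categories(account_role: str, is_super_admin: bool = False) -> bool:
    """True when the dependency would let the request through.

    Super admins bypass the role check; otherwise only owner and admin pass,
    so engineers and L1 techs would receive a 403.
    """
    return is_super_admin or account_role in ("owner", "admin")
```

Unlike the escalation recipient filter, which includes engineers, this check excludes them: engineers are notified of escalations but cannot change which categories L1 may build for.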
- Step 3c: Rewrite intake + add next-node + escalations in l1.py
Replace the intake body to run the orchestrator (imports: from app.services import match_or_build):
@router.post("/intake", response_model=IntakeResponse)
async def intake(
    payload: IntakeRequest,
    db: Annotated[AsyncSession, Depends(get_db)],
    user: Annotated[User, Depends(require_l1_or_coverage)],
):
    """L1 intake: match a published flow, else gate + build, else suggest/out-of-scope."""
    result = await match_or_build.match_or_build(
        user.account_id, payload.problem_statement, None, ticket_ref="",
        db=db, force_build=payload.force_build,
    )
    outcome = result["outcome"]
    if outcome in ("suggest", "out_of_scope"):
        await db.commit()
        return IntakeResponse(outcome=outcome, near_miss=result.get("near_miss"),
                              category=result.get("category"))
    # matched OR build → create a ticket and a session
    ticket = await internal_ticket_service.create_ticket(
        db, account_id=user.account_id, created_by_user_id=user.id,
        problem_statement=payload.problem_statement,
        customer_name=payload.customer_name, customer_contact=payload.customer_contact,
    )
    if outcome == "matched":
        session = await l1_session_service.start_flow_session(
            db, account_id=user.account_id, user=user, flow_id=UUID(result["flow_id"]),
            ticket_id=str(ticket.id), ticket_kind="internal")
    else:  # build
        session = await l1_session_service.start_ai_build_session(
            db, account_id=user.account_id, user=user,
            ticket_id=str(ticket.id), ticket_kind="internal")
    await db.commit()
    return IntakeResponse(
        outcome=outcome, session_id=session.id, session_kind=session.session_kind,
        ticket_id=str(ticket.id), ticket_kind="internal",
        flow_id=UUID(result["flow_id"]) if outcome == "matched" else None,
    )
Add next-node endpoint:
@router.post("/sessions/{session_id}/next-node", response_model=NextNodeResponse)
async def next_node(
    session_id: UUID,
    payload: NextNodeRequest,
    db: Annotated[AsyncSession, Depends(get_db)],
    user: Annotated[User, Depends(require_l1_or_coverage)],
):
    session = await _get_session_or_404(db, session_id, user)
    # problem_text + category come from the linked internal ticket + stored category.
    ticket = await internal_ticket_service.get_ticket(db, ticket_id=UUID(session.ticket_id))
    problem_text = ticket.problem_statement if ticket else ""
    category = session.walked_path[0].get("category") if session.walked_path else None
    try:
        node = await l1_session_service.advance_ai_build(
            db, session_id=session_id, problem_text=problem_text,
            category=category or "unknown", node_id=payload.node_id,
            answer=payload.answer, note=payload.note)
    except ValueError as e:
        raise HTTPException(status_code=http_status.HTTP_409_CONFLICT, detail=str(e))
    await db.commit()
    return NextNodeResponse(node=node, session_status=session.status)


@router.get("/escalations", response_model=list[WalkSessionResponse])
async def l1_escalations(
    db: Annotated[AsyncSession, Depends(get_db)],
    user: Annotated[User, Depends(require_engineer_or_admin)],
    limit: int = 50,
):
    rows = await db.execute(
        select(L1WalkSession)
        .where(L1WalkSession.account_id == user.account_id,
               L1WalkSession.status == "escalated")
        .order_by(L1WalkSession.resolved_at.desc()).limit(limit))
    return [_to_response(s) for s in rows.scalars()]
Update the import line for deps: from app.api.deps import get_db, require_l1_or_coverage, require_engineer_or_admin and add the new schema imports (NextNodeRequest, NextNodeResponse).
Category persistence: the next-node endpoint needs the category that intake classified, and adding a category field write to start_ai_build_session is out of scope. Decision for implementer: store the category as a hidden meta entry at the start of walked_path. In the intake build branch, after start_ai_build_session and before the commit, set session.walked_path = [{"node_type": "meta", "category": result["category"]}]. Do not call advance_ai_build(..., node_id=None) from intake. Have normalize_walked_path and generate_next_node skip meta entries by adding a one-line filter in both.
- Step 4: Handle the meta entry
In ai_tree_builder._build_context and normalize_walked_path, skip entries with node_type == "meta":
walked_path = [s for s in walked_path if s.get("node_type") != "meta"]
(add as the first line of both functions). In the next-node endpoint, read category from the meta entry:
category = next((s.get("category") for s in session.walked_path if s.get("node_type") == "meta"), "unknown")
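The two one-liners above can be exercised together on a sample path. This is a minimal sketch with hypothetical helper names (`strip_meta`, `category_of`); the plan itself inlines both expressions rather than defining helpers.

```python
from typing import Any


def strip_meta(walked_path: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Drop hidden meta entries so only real nodes reach the builder."""
    return [s for s in walked_path if s.get("node_type") != "meta"]


def category_of(walked_path: list[dict[str, Any]]) -> str:
    """Read the category from the first meta entry, defaulting to 'unknown'."""
    return next(
        (s.get("category") for s in walked_path if s.get("node_type") == "meta"),
        "unknown",
    )


# Sample path as intake would seed it, followed by one answered question.
SAMPLE_PATH = [
    {"node_type": "meta", "category": "printer"},
    {"node_type": "question", "id": "n1", "answer": "no", "l1_note": None},
]
```

One consequence worth keeping in mind: a freshly seeded session has a non-empty `walked_path` even before any node is answered, so code that treats a truthy `walked_path` as "the tech walked at least one step" should filter meta entries first.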
- Step 5: Run tests to verify they pass
Run: docker exec resolutionflow_backend pytest tests/test_l1_api_ai_build.py -v
Expected: PASS.
- Step 6: Commit
git add backend/app/schemas/l1.py backend/app/api/deps.py backend/app/api/endpoints/l1.py backend/tests/test_l1_api_ai_build.py
git commit -m "feat(l1): intake dispatch + next-node + escalations endpoints, owner/admin dep"
Task 11: Category settings API
Files:
- Create: backend/app/schemas/l1_categories.py
- Modify: backend/app/api/endpoints/accounts.py
- Test: backend/tests/test_l1_categories_api.py
- Step 1: Write the failing test
# backend/tests/test_l1_categories_api.py
import pytest


@pytest.mark.asyncio
async def test_get_categories(owner_client):
    r = await owner_client.get("/api/v1/accounts/me/l1-categories")
    assert r.status_code == 200
    body = r.json()
    assert "enabled" in body and "available" in body and "hard_floor" in body


@pytest.mark.asyncio
async def test_patch_categories_owner_only(owner_client, engineer_client):
    r = await engineer_client.patch("/api/v1/accounts/me/l1-categories",
                                    json={"enabled": ["printer"]})
    assert r.status_code == 403
    r2 = await owner_client.patch("/api/v1/accounts/me/l1-categories",
                                  json={"enabled": ["printer", "vpn_connect"]})
    assert r2.status_code == 200
    assert set(r2.json()["enabled"]) == {"printer", "vpn_connect"}
- Step 2: Run test to verify it fails
Run: docker exec resolutionflow_backend pytest tests/test_l1_categories_api.py -v
Expected: FAIL — 404 (routes not defined).
- Step 3: Schema + endpoints
# backend/app/schemas/l1_categories.py
from pydantic import BaseModel


class L1CategoriesResponse(BaseModel):
    enabled: list[str]
    available: list[str]
    hard_floor: list[str]


class L1CategoriesUpdate(BaseModel):
    enabled: list[str]
In backend/app/api/endpoints/accounts.py (imports: the category service + new deps/schemas):
@router.get("/me/l1-categories", response_model=L1CategoriesResponse)
async def get_l1_categories(
    db: Annotated[AsyncSession, Depends(get_db)],
    user: Annotated[User, Depends(require_l1_or_above)],
):
    enabled = await l1_category_service.get_enabled_categories(user.account_id, db)
    return L1CategoriesResponse(
        enabled=enabled,
        available=l1_category_service.DEFAULT_L1_CATEGORIES,
        hard_floor=l1_category_service.HARD_FLOOR_FORBIDDEN,
    )


@router.patch("/me/l1-categories", response_model=L1CategoriesResponse)
async def set_l1_categories(
    payload: L1CategoriesUpdate,
    db: Annotated[AsyncSession, Depends(get_db)],
    user: Annotated[User, Depends(require_account_owner_or_admin)],
):
    enabled = await l1_category_service.set_enabled_categories(user.account_id, payload.enabled, db)
    await db.commit()
    return L1CategoriesResponse(
        enabled=enabled,
        available=l1_category_service.DEFAULT_L1_CATEGORIES,
        hard_floor=l1_category_service.HARD_FLOOR_FORBIDDEN,
    )
Add imports: from app.services import l1_category_service, from app.api.deps import require_l1_or_above, require_account_owner_or_admin, from app.schemas.l1_categories import L1CategoriesResponse, L1CategoriesUpdate.
- Step 4: Run test to verify it passes
Run: docker exec resolutionflow_backend pytest tests/test_l1_categories_api.py -v
Expected: PASS. (If engineer_client/owner_client fixtures don't exist, add them mirroring l1_client with account_role engineer/owner.)
- Step 5: Commit
git add backend/app/schemas/l1_categories.py backend/app/api/endpoints/accounts.py backend/tests/test_l1_categories_api.py
git commit -m "feat(l1): account L1 category settings API (owner/admin write)"
Task 12: Backend integration test — full intake→build→resolve and →escalate
Files:
- Test: backend/tests/test_l1_ai_build_flow.py
- Step 1: Write the integration test
# backend/tests/test_l1_ai_build_flow.py
import pytest
from unittest.mock import AsyncMock, patch
from sqlalchemy import select
from app.models.flow_proposal import FlowProposal


@pytest.mark.asyncio
async def test_intake_build_walk_resolve_creates_proposal(l1_client, db_session, monkeypatch):
    from app.services import ai_tree_builder
    # 1. force a build outcome
    with patch("app.api.endpoints.l1.match_or_build.match_or_build",
               new=AsyncMock(return_value={"outcome": "build", "session_kind": "ai_build", "category": "printer"})):
        r = await l1_client.post("/api/v1/l1/intake", json={"problem_statement": "printer jam"})
    sid = r.json()["session_id"]
    # 2. drive next-node deterministically to a resolved node
    seq = iter([
        {"node_type": "question", "id": "n1", "text": "Powered on?"},
        {"node_type": "resolved", "id": "n2", "text": "Fixed."},
    ])

    async def fake_next(problem, category, walked):
        return next(seq)

    monkeypatch.setattr(ai_tree_builder, "generate_next_node", fake_next)
    r1 = await l1_client.post(f"/api/v1/l1/sessions/{sid}/next-node", json={})
    assert r1.json()["node"]["node_type"] == "question"
    r2 = await l1_client.post(f"/api/v1/l1/sessions/{sid}/next-node",
                              json={"node_id": "n1", "answer": "no"})
    assert r2.json()["node"]["node_type"] == "resolved"
    # 3. resolve → proposal
    await l1_client.post(f"/api/v1/l1/sessions/{sid}/resolve",
                         json={"helpful": True, "resolution_notes": "ok"})
    props = (await db_session.execute(
        select(FlowProposal).where(FlowProposal.source == "ai_realtime_l1"))).scalars().all()
    assert len(props) >= 1
- Step 2: Run test
Run: docker exec resolutionflow_backend pytest tests/test_l1_ai_build_flow.py -v
Expected: PASS. Fix any wiring gaps surfaced here (this is the end-to-end backend gate).
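The integration test makes node generation deterministic by replacing `generate_next_node` with an async fake that replays a fixed script. The pattern is sketched below on its own, runnable without the app; `make_scripted_generator` and `demo` are hypothetical names, and the fake's three parameters mirror the call signature used in this plan.

```python
import asyncio
from typing import Any


def make_scripted_generator(nodes: list[dict[str, Any]]):
    """Return an async fake that yields the scripted nodes in order."""
    seq = iter(nodes)

    async def fake_next(problem: str, category: str, walked: list) -> dict[str, Any]:
        # Arguments are ignored; each call simply returns the next scripted node.
        return next(seq)

    return fake_next


async def demo() -> tuple[dict[str, Any], dict[str, Any]]:
    fake = make_scripted_generator([
        {"node_type": "question", "id": "n1", "text": "Powered on?"},
        {"node_type": "resolved", "id": "n2", "text": "Fixed."},
    ])
    first = await fake("printer jam", "printer", [])
    second = await fake("printer jam", "printer", [{"id": "n1", "answer": "no"}])
    return first, second
```

Since the script is finite, a call beyond the last node fails instead of returning a stale node, which tends to surface unexpected extra calls to the generator during the test.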
- Step 3: Run the full L1 backend suite for regressions
Run: docker exec resolutionflow_backend pytest tests/ -k "l1 or match_or_build or ai_tree_builder or notification" -q
Expected: all pass.
- Step 4: Commit
git add backend/tests/test_l1_ai_build_flow.py
git commit -m "test(l1): integration — intake build → walk → resolve → proposal"
Task 13: Frontend — API client + types
Files:
- Modify: frontend/src/types/l1.ts, frontend/src/api/l1.ts
- Step 1: Add types
In frontend/src/types/l1.ts:
export type IntakeOutcome = 'matched' | 'suggest' | 'out_of_scope' | 'build'

export interface IntakeResult {
  outcome: IntakeOutcome
  session_id?: string
  session_kind?: 'flow' | 'proposal' | 'adhoc' | 'ai_build'
  ticket_id?: string
  ticket_kind?: string
  flow_id?: string
  near_miss?: { flow_id: string; flow_name: string; score: number }
  category?: string
}

export type TreeNode =
  | { node_type: 'question'; id: string; text: string }
  | { node_type: 'instruction'; id: string; text: string }
  | { node_type: 'resolved'; id: string; text: string }
  | { node_type: 'escalate'; id: string; reason_category?: string; text: string }
  | { node_type: 'needs_review'; id: string; text: string }

export interface NextNodeResult { node: TreeNode; session_status: string }
export interface L1Categories { enabled: string[]; available: string[]; hard_floor: string[] }
- Step 2: Add API methods
In frontend/src/api/l1.ts:
  nextNode: (sessionId: string, body: { node_id?: string; answer?: 'yes' | 'no'; acknowledged?: boolean; note?: string }) =>
    apiClient.post<NextNodeResult>(`/l1/sessions/${sessionId}/next-node`, body).then(r => r.data),
  getCategories: () =>
    apiClient.get<L1Categories>('/accounts/me/l1-categories').then(r => r.data),
  setCategories: (enabled: string[]) =>
    apiClient.patch<L1Categories>('/accounts/me/l1-categories', { enabled }).then(r => r.data),
  escalations: () =>
    apiClient.get<WalkSession[]>('/l1/escalations').then(r => r.data),
Update the existing intake method's return type to IntakeResult.
- Step 3: Type-check
Run: docker exec resolutionflow_frontend sh -c 'cd /app && npx tsc --noEmit -p tsconfig.app.json'
Expected: clean (callers updated in Tasks 14-15; if tsc flags intake callers, proceed — they're fixed next).
- Step 4: Commit
git add frontend/src/types/l1.ts frontend/src/api/l1.ts
git commit -m "feat(l1): frontend api/types for next-node, outcome, categories"
Task 14: Frontend — L1Dashboard intake dispatch
Files:
- Modify: frontend/src/pages/l1/L1Dashboard.tsx
- Step 1: Replace handleStart to dispatch on outcome
const handleStart = async () => {
  if (!problem.trim()) return
  setSubmitting(true)
  try {
    const res = await l1Api.intake({
      problem_statement: problem.trim(),
      customer_name: customerName.trim() || undefined,
      customer_contact: customerContact.trim() || undefined,
    })
    if (res.outcome === 'matched' || res.outcome === 'build') {
      navigate(`/l1/walk/${res.session_id}`)
    } else if (res.outcome === 'suggest') {
      setSuggestion(res.near_miss ?? null) // render an inline prompt (below)
    } else if (res.outcome === 'out_of_scope') {
      setOutOfScope(res.category ?? 'unknown')
    }
  } catch (err) {
    const detail = (err as { response?: { data?: { detail?: string } } }).response?.data?.detail
    toast.error(typeof detail === 'string' ? detail : 'Failed to start. Try again.')
  } finally {
    setSubmitting(false)
  }
}

const buildNew = async () => {
  setSuggestion(null)
  const res = await l1Api.intake({ problem_statement: problem.trim(), force_build: true })
  if (res.outcome === 'build') navigate(`/l1/walk/${res.session_id}`)
  else if (res.outcome === 'out_of_scope') setOutOfScope(res.category ?? 'unknown')
}
Add state near the top: const [suggestion, setSuggestion] = useState<{flow_id:string;flow_name:string;score:number}|null>(null) and const [outOfScope, setOutOfScope] = useState<string|null>(null). Add force_build as an optional field in the l1Api.intake body type.
- Step 2: Render the suggest + out-of-scope prompts
Below the intake card, add:
{suggestion && (
  <div className="rounded-lg border border-default bg-card p-4 space-y-3">
    <p className="text-sm">Found a similar flow: <strong>{suggestion.flow_name}</strong>.</p>
    <div className="flex gap-2">
      <button className="rounded-md bg-accent text-white px-4 py-2 text-sm"
        onClick={() => navigate('/l1/walk/use-flow', { state: { flowId: suggestion.flow_id } })}>
        Use this flow
      </button>
      <button className="rounded-md border border-default px-4 py-2 text-sm" onClick={buildNew}>
        Build new
      </button>
    </div>
  </div>
)}
{outOfScope && (
  <div className="rounded-lg border border-default bg-card p-4 space-y-3">
    <p className="text-sm">This problem isn’t in your enabled L1 categories. Start an ad-hoc walk or escalate.</p>
    {/* reuse existing adhoc/escalate CTAs from Phase 1 */}
  </div>
)}
For "Use this flow", reuse the Phase-1 matched-flow path. When outcome === 'matched' the session already exists and we have already navigated, but a suggest outcome creates no session, so one still has to be started. Implementer: on "Use this flow", POST intake again with the original text and navigate when it returns matched with a session. A repeat match is not guaranteed: a near miss scored below MATCH_THRESHOLD, so the same text may come back as suggest again. Acceptable for Phase 2A.
- Step 3: Type-check + lint
Run: docker exec resolutionflow_frontend sh -c 'cd /app && npx tsc --noEmit -p tsconfig.app.json && npx eslint src/pages/l1/L1Dashboard.tsx'
Expected: clean.
- Step 4: Commit
git add frontend/src/pages/l1/L1Dashboard.tsx
git commit -m "feat(l1): dashboard intake dispatch on match_or_build outcome"
Task 15: Frontend — L1WalkTreeVariant real node rendering + disclaimer
Files:
- Modify: frontend/src/components/l1/L1WalkTreeVariant.tsx
- Step 1: Drive nodes from /next-node
Replace the synthetic stepping. On mount, if session.session_kind === 'ai_build', fetch the first node (l1Api.nextNode(session.id, {})). On answer/ack, POST the current node id + answer, render the returned node. Terminal nodes (resolved/escalate/needs_review) switch to the existing Resolve/Escalate modal affordances.
const [node, setNode] = useState<TreeNode | null>(null)
const [loading, setLoading] = useState(false)

useEffect(() => {
  if (session.session_kind !== 'ai_build') return
  setLoading(true)
  l1Api.nextNode(session.id, {}).then(r => setNode(r.node)).finally(() => setLoading(false))
}, [session.id, session.session_kind])

const answer = async (a: 'yes' | 'no') => {
  if (!node) return
  setLoading(true)
  try {
    const r = await l1Api.nextNode(session.id, { node_id: node.id, answer: a })
    setNode(r.node)
  } finally { setLoading(false) }
}

const acknowledge = async () => {
  if (!node) return
  setLoading(true)
  try {
    const r = await l1Api.nextNode(session.id, { node_id: node.id, acknowledged: true })
    setNode(r.node)
  } finally { setLoading(false) }
}
- Step 2: Render by node_type + disclaimer banner
{session.session_kind === 'ai_build' && (
  <div className="rounded-md border border-amber-500/30 bg-amber-500/10 px-4 py-2 text-xs text-amber-200">
    These are high-confidence troubleshooting steps, but they come from outside your
    organization’s knowledge base — review them before acting. When in doubt, escalate early.
  </div>
)}
{loading && <p className="text-sm text-muted-foreground">Thinking through the next step…</p>}
{node?.node_type === 'question' && (
  <>
    <p className="text-lg">{node.text}</p>
    <div className="flex gap-3">
      <button onClick={() => answer('yes')} className="rounded-md bg-accent text-white px-5 py-2">Yes</button>
      <button onClick={() => answer('no')} className="rounded-md border border-default px-5 py-2">No</button>
    </div>
  </>
)}
{node?.node_type === 'instruction' && (
  <>
    <p className="text-lg">{node.text}</p>
    <button onClick={acknowledge} className="rounded-md bg-accent text-white px-5 py-2">Done — next</button>
  </>
)}
{(node?.node_type === 'resolved') && (
  <ResolveCta sessionId={session.id} prefillNote={node.text} /> /* opens existing Resolve modal */
)}
{(node?.node_type === 'escalate' || node?.node_type === 'needs_review') && (
  <EscalateCta sessionId={session.id} reason={node.text} /> /* opens existing Escalate modal */
)}
Wire ResolveCta/EscalateCta to the existing WalkModals Resolve/Escalate handlers already in this component (reuse, don't duplicate).
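The render branches above amount to a mapping from node_type to one of four affordances. A sketch of that mapping as an exhaustive switch, so that adding a node type later fails the type-check instead of silently rendering nothing; Affordance and affordanceFor are hypothetical names, and the node types are the five listed in Step 1:

```typescript
type NodeType = 'question' | 'instruction' | 'resolved' | 'escalate' | 'needs_review'

// Hypothetical labels for what the walker shows for each node type.
type Affordance = 'yes_no' | 'acknowledge' | 'resolve_cta' | 'escalate_cta'

function affordanceFor(nodeType: NodeType): Affordance {
  switch (nodeType) {
    case 'question':
      return 'yes_no'
    case 'instruction':
      return 'acknowledge'
    case 'resolved':
      return 'resolve_cta'
    case 'escalate':
    case 'needs_review':
      // Both route to the existing Escalate modal, as in the JSX above.
      return 'escalate_cta'
    default: {
      // Compile-time exhaustiveness check: a new NodeType member breaks this assignment.
      const unhandled: never = nodeType
      throw new Error(`Unhandled node_type: ${String(unhandled)}`)
    }
  }
}
```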
- Step 3: Type-check + lint
Run: docker exec resolutionflow_frontend sh -c 'cd /app && npx tsc --noEmit -p tsconfig.app.json && npx eslint src/components/l1/L1WalkTreeVariant.tsx'
Expected: clean.
- Step 4: Commit
git add frontend/src/components/l1/L1WalkTreeVariant.tsx
git commit -m "feat(l1): walker renders AI-built nodes via next-node + disclaimer banner"
Task 16: Frontend — admin category settings page
Files:
- Create: frontend/src/pages/account/L1CategoriesPage.tsx
- Modify: router + account nav (follow the existing /account/* child-route pattern)
- Step 1: Build the page
// frontend/src/pages/account/L1CategoriesPage.tsx
import { useEffect, useState } from 'react'
import { l1Api } from '@/api/l1'
import { toast } from '@/lib/toast'
import type { L1Categories } from '@/types/l1'

export default function L1CategoriesPage() {
  const [data, setData] = useState<L1Categories | null>(null)
  useEffect(() => { l1Api.getCategories().then(setData) }, [])
  if (!data) return null

  // Flip one category and persist the full enabled list.
  const toggle = async (cat: string) => {
    const enabled = data.enabled.includes(cat)
      ? data.enabled.filter(c => c !== cat) : [...data.enabled, cat]
    const updated = await l1Api.setCategories(enabled)
    setData({ ...data, enabled: updated.enabled })
    toast.success('L1 categories updated')
  }

  return (
    <div className="max-w-2xl space-y-6">
      <h1 className="font-heading text-2xl font-bold">L1 AI build categories</h1>
      <p className="text-sm text-muted-foreground">
        Problems in enabled categories can be built into AI troubleshooting trees when no
        flow exists. Disabled categories fall back to ad-hoc or escalation.
      </p>
      <div className="space-y-2">
        {data.available.map(cat => (
          <label key={cat} className="flex items-center gap-3 rounded-md border border-default bg-card px-4 py-3">
            <input type="checkbox" checked={data.enabled.includes(cat)} onChange={() => toggle(cat)} />
            <span className="text-sm">{cat.replace(/_/g, ' ')}</span>
          </label>
        ))}
      </div>
      <div>
        <h2 className="font-heading text-sm font-semibold mb-2">Always excluded (safety)</h2>
        <ul className="text-xs text-muted-foreground list-disc pl-5">
          {data.hard_floor.map(h => <li key={h}>{h.replace(/_/g, ' ')}</li>)}
        </ul>
      </div>
    </div>
  )
}
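The toggle logic in the page can be pulled out as a pure function, which makes it unit-testable without rendering. A sketch under the assumption that L1Categories has the three string-array fields the page reads; nextEnabled is a hypothetical helper. It differs from the inline version in two ways: it ignores categories that are not in available, and it returns the list in the display order of available instead of appending.

```typescript
// Assumed shape, matching the fields the page reads from the API response.
interface L1Categories {
  available: string[]
  enabled: string[]
  hard_floor: string[]
}

// Return the enabled list after flipping one category.
function nextEnabled(data: L1Categories, cat: string): string[] {
  // Unknown categories are ignored rather than sent to the API.
  if (!data.available.includes(cat)) return data.enabled
  const on = new Set(data.enabled)
  if (on.has(cat)) on.delete(cat)
  else on.add(cat)
  // Filtering `available` keeps a stable order regardless of click order.
  return data.available.filter(c => on.has(c))
}
```

In the page, toggle would then call l1Api.setCategories(nextEnabled(data, cat)).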
- Step 2: Register route + nav
Add a lazy import + a child route under the /account subtree in frontend/src/router.tsx (mirror existing account children, e.g. { path: 'l1-categories', element: page(L1CategoriesPage) } under the AccountLayout route), and a nav entry in the account settings sidebar/menu following the existing pattern. Gate visibility to owner/admin in the menu (reuse usePermissions).
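The menu gating can be sketched as a role filter over nav entries. Everything here is an assumption for illustration: the Role names, the NavEntry shape and visibleEntries are hypothetical, and the real check should go through usePermissions as the step says. Only the l1-categories path comes from the route above.

```typescript
// Assumed role names; the real set comes from the app's permission model.
type Role = 'owner' | 'admin' | 'member'

interface NavEntry {
  label: string
  to: string
  roles?: Role[] // omitted means visible to everyone
}

const accountNav: NavEntry[] = [
  { label: 'Profile', to: '/account/profile' }, // hypothetical existing entry
  { label: 'L1 categories', to: '/account/l1-categories', roles: ['owner', 'admin'] },
]

// Keep only the entries the given role is allowed to see.
function visibleEntries(entries: NavEntry[], role: Role): NavEntry[] {
  return entries.filter(e => !e.roles || e.roles.includes(role))
}
```

Hiding the menu entry is a convenience only; the category settings API still has to enforce owner/admin on the server.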
- Step 3: Type-check + lint
Run: docker exec resolutionflow_frontend sh -c 'cd /app && npx tsc --noEmit -p tsconfig.app.json && npx eslint src/pages/account/L1CategoriesPage.tsx'
Expected: clean.
- Step 4: Commit
git add frontend/src/pages/account/L1CategoriesPage.tsx frontend/src/router.tsx  # plus the account nav file edited in Step 2
git commit -m "feat(l1): admin L1 category settings page"
Task 17: Frontend — ProposalDetail L1 source + engineer escalations section
Files:
- Modify: frontend/src/components/flowpilot/ProposalDetail.tsx
- Modify: frontend/src/pages/EscalationQueuePage.tsx
- Step 1: L1-sourced source block in ProposalDetail (Finding 1)
Where it currently renders the /pilot/{source_session_id} link, branch on the new l1_session_id:
{proposal.l1_session_id ? (
  <div className="text-sm text-text-muted">
    Source: AI L1 walk (outcome-validated). Unexplored branches are marked
    <span className="font-medium"> needs review</span> below.
  </div>
) : proposal.source_session_id ? (
  <Link to={`/pilot/${proposal.source_session_id}`} target="_blank" className="...">
    {/* existing link */}
  </Link>
) : null}
Add l1_session_id?: string | null to the proposal type used here.
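The branch order matters: l1_session_id wins over source_session_id when both are set. A sketch of the same decision as a pure function returning a discriminated union; ProposalRefs, ProposalSource and proposalSource are hypothetical names, and only the two id fields and the /pilot/ path come from this step:

```typescript
// Only the two fields the source block inspects.
interface ProposalRefs {
  l1_session_id?: string | null
  source_session_id?: string | null
}

type ProposalSource =
  | { kind: 'l1_walk' }                      // render the "AI L1 walk" text block
  | { kind: 'pilot_session'; href: string }  // render the existing /pilot link
  | { kind: 'none' }                         // render nothing

function proposalSource(p: ProposalRefs): ProposalSource {
  // L1 linkage takes precedence, mirroring the ternary in the JSX.
  if (p.l1_session_id) return { kind: 'l1_walk' }
  if (p.source_session_id) return { kind: 'pilot_session', href: `/pilot/${p.source_session_id}` }
  return { kind: 'none' }
}
```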
- Step 2: EscalationQueuePage — L1 escalations section
Fetch l1Api.escalations() and render a section above/below the existing queue:
const [l1Escalations, setL1Escalations] = useState<WalkSession[]>([])
useEffect(() => { l1Api.escalations().then(setL1Escalations).catch(() => setL1Escalations([])) }, [])
// render: problem (from ticket), walked-path length, escalated-at, reason
Each row shows the walked-path summary and links to a read-only view (Phase 2A: a simple expandable row is sufficient; no new route required).
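A sketch of turning the fetched sessions into display rows, newest escalation first. The field names on L1EscalationLike are assumptions (the real WalkSession type may differ), and toEscalationRows is a hypothetical helper. The sort compares escalated_at as strings, which is only chronological if the API returns ISO-8601 timestamps in a single timezone.

```typescript
// Assumed subset of the session payload; adjust to the real WalkSession type.
interface L1EscalationLike {
  id: string
  problem: string
  walked_path: unknown[]
  escalated_at: string // assumed ISO-8601, e.g. "2026-05-29T12:30:00Z"
  reason: string | null
}

interface EscalationRow {
  id: string
  problem: string
  steps: number
  escalatedAt: string
  reason: string
}

function toEscalationRows(sessions: L1EscalationLike[]): EscalationRow[] {
  return [...sessions] // copy so the caller's array is not reordered
    .sort((a, b) => (a.escalated_at < b.escalated_at ? 1 : a.escalated_at > b.escalated_at ? -1 : 0))
    .map(s => ({
      id: s.id,
      problem: s.problem,
      // Counts every entry; if walked_path also stores non-step entries, filter them first.
      steps: s.walked_path.length,
      escalatedAt: s.escalated_at,
      reason: s.reason ?? 'No reason recorded',
    }))
}
```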
- Step 3: Type-check + build
Run: docker exec resolutionflow_frontend sh -c 'cd /app && npx tsc --noEmit -p tsconfig.app.json && npm run build'
Expected: tsc clean, build succeeds.
- Step 4: Commit
git add frontend/src/components/flowpilot/ProposalDetail.tsx frontend/src/pages/EscalationQueuePage.tsx
git commit -m "feat(l1): proposal L1 source block + engineer L1-escalations section"
Task 18: E2E — AI build flow
Files:
- Modify: frontend/e2e/l1-workspace.spec.ts
- Step 1: Add an AI-build e2e test
Because the builder calls a live model, stub the network at the Playwright layer: intercept POST **/l1/intake to return {outcome:'build', session_kind:'ai_build', session_id:<seeded>} and POST **/l1/sessions/*/next-node to return scripted nodes (question → resolved). Assert: L1 lands on the walker, sees the disclaimer banner, answers the question, reaches the resolved CTA.
test('L1 AI build: intake → answer node → resolve CTA', async ({ page }) => {
  await login(page, L1_EMAIL)

  // Stub intake so the workspace routes straight into an AI-build session.
  await page.route('**/api/v1/l1/intake', route => route.fulfill({
    status: 200, contentType: 'application/json',
    body: JSON.stringify({ outcome: 'build', session_kind: 'ai_build', session_id: 'e2e-sess', ticket_id: 't', ticket_kind: 'internal' }),
  }))

  // Scripted builder: the first call returns a question, every later call a resolved node.
  let call = 0
  await page.route('**/api/v1/l1/sessions/*/next-node', async route => {
    call += 1
    const node = call === 1
      ? { node_type: 'question', id: 'n1', text: 'Is it powered on?' }
      : { node_type: 'resolved', id: 'n2', text: 'Resolved.' }
    // Await the fulfill so a failure surfaces in the test rather than as a floating promise.
    await route.fulfill({ status: 200, contentType: 'application/json',
      body: JSON.stringify({ node, session_status: 'active' }) })
  })
  // also stub GET session fetch the walker does on load, if any, to return an ai_build session

  await page.goto('/l1')
  await page.getByPlaceholder(/What's the user calling about/i).fill('printer jam')
  await page.getByRole('button', { name: /Start walk/i }).click()

  await expect(page.getByText(/outside your organization’s knowledge base/i)).toBeVisible()
  await expect(page.getByText('Is it powered on?')).toBeVisible()
  await page.getByRole('button', { name: 'No' }).click()
  await expect(page.getByText(/Resolved\./i)).toBeVisible()
})
Adjust selectors/route patterns to the actual walker data-loading (stub the session GET the walker performs so it reports session_kind: 'ai_build').
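For longer scripts, the call counter in the stub can be replaced with a small responder that walks a list of nodes and repeats the last one once the script runs out. A sketch; scriptedNodes and ScriptedNode are hypothetical names, and the response envelope simply mirrors the stub above:

```typescript
interface ScriptedNode {
  node_type: string
  id: string
  text: string
}

// Returns a function that yields the next scripted /next-node payload on each call.
function scriptedNodes(script: ScriptedNode[]) {
  if (script.length === 0) throw new Error('script must contain at least one node')
  let call = 0
  return function next() {
    // Clamp to the final node so extra requests keep returning the terminal node.
    const node = script[Math.min(call, script.length - 1)]
    call += 1
    return { node, session_status: 'active' }
  }
}
```

Inside the route handler the body then becomes JSON.stringify(next()), and a question → instruction → resolved walk is a three-element array.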
- Step 2: Run e2e locally only if chromium available; otherwise rely on CI
This container cannot launch chromium (sandbox). Push and let CI run npm run test:e2e. Do not block on local e2e.
- Step 3: Commit
git add frontend/e2e/l1-workspace.spec.ts
git commit -m "test(l1): e2e AI build flow (network-stubbed)"
Task 19: Final verification
- Step 1: Backend suite
Run: docker exec resolutionflow_backend pytest tests/ -q
Expected: all pass (note any pre-existing xdist-only failures per Phase-1 acceptance report §7).
- Step 2: Frontend gates
Run: docker exec resolutionflow_frontend sh -c 'cd /app && npx tsc --noEmit -p tsconfig.app.json && npm run lint && npm run build'
Expected: tsc clean, lint 0 errors, build succeeds.
- Step 3: Migration roundtrip on a clean DB
Run: docker exec resolutionflow_backend alembic downgrade -3 && docker exec resolutionflow_backend alembic upgrade head
Expected: clean down+up for the three new migrations (run against a DB without ai_build/L1-proposal rows, or accept the documented downgrade caveat).
- Step 4: Open PR
Push the branch and open a PR to main summarizing Phase 2A, linking the spec, and listing the deferred items (KB grounding/connectors, PSA reassign, escalation package, AI chat handoff, proposal-matching).
Self-Review notes (author)
- Spec coverage: §3 match_or_build → Task 6/10; §4 streaming + node schema → Task 5/8/10/15; §5 safety (classify, constrained prompt, validation, depth cap, disclaimer) → Task 4/5/15; §6 flywheel + §6.1 normalize + §6.2 linkage → Task 3/9; §7 escalation handoff → Task 9/10/17; §8 migrations → Task 1/2/3; §9 API → Task 10/11; §10 frontend → Task 13-17; §11 testing → throughout + Task 12/18.
- Known soft spots flagged for the implementer: category persistence via a meta walked_path entry (Task 10 Step 3c/Step 4) and the "Use this flow" suggest path (Task 14 Step 2) are the two places to validate carefully during review.
- Model calls are mocked/stubbed in tests; a live constrained-decoding smoke test + the Sonnet-vs-Opus benchmark for l1_realtime_build should run in staging before wide enablement (spec §5.3).