resolutionflow/docs/superpowers/plans/2026-05-29-l1-ai-tree-builder-phase-2a.md
Michael Chihlas 23dbcec86e docs(plan): L1 AI decision-tree builder — Phase 2A implementation plan
19 TDD tasks from the approved spec: 3 migrations (ai_build kind, account
categories, FlowProposal l1_session_id), ai_tree_builder (constrained node
gen + validation + normalize), match_or_build orchestrator (match-first,
gate-on-build), session-service ai_build start/advance, flywheel capture on
resolve, engineer escalation notification, category settings API, and the
frontend walker/dispatch/settings/escalations surfaces + e2e.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-29 03:16:10 -04:00


L1 AI Decision-Tree Builder — Phase 2A Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: When an L1 tech describes a problem with no matching published flow, build a yes/no decision tree in real time from generic L1 knowledge (constrained + escalate-early), walk it node-by-node, capture resolved trees as outcome-validated drafts, and route escalations to engineers.

Architecture: Approach C, a dedicated ai_tree_builder service for constrained node-by-node generation plus a match_or_build orchestrator that matches published flows first and gates generic building behind admin-configured categories. It reuses flow_matching_engine (match), knowledge_flywheel/FlowProposal (capture), and notification_service (escalation).
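
The match-first, gate-on-build ordering can be sketched as a pure function. This is an illustration only, not the orchestrator itself: decide_outcome and its parameters are hypothetical names, the 0.75 and 0.60 cut-offs are the thresholds this plan defines in Task 6, and treating them as inclusive lower bounds is an assumption.

```python
from typing import Optional

MATCH_THRESHOLD = 0.75    # at or above: walk the published flow
SUGGEST_THRESHOLD = 0.60  # at or above (but below match): offer it as a suggestion


def decide_outcome(
    best_score: Optional[float],
    category: str,
    enabled_categories: list[str],
    force_build: bool = False,
) -> str:
    """Hypothetical helper: return 'matched', 'suggest', 'build', or 'out_of_scope'."""
    # Matching runs first so an authored flow is never blocked by category settings.
    if not force_build and best_score is not None:
        if best_score >= MATCH_THRESHOLD:
            return "matched"
        if best_score >= SUGGEST_THRESHOLD:
            return "suggest"
    # Only the generic build path is gated by the admin-configured categories.
    return "build" if category in enabled_categories else "out_of_scope"
```

Keeping the gate after the match step means disabling a category only stops generic AI building; it does not hide flows an engineer has already authored.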

Tech Stack: Python 3.12 · FastAPI · SQLAlchemy 2.0 async · Alembic · PostgreSQL 16 (RLS) · React 19 + Vite + TS + Tailwind v4 · Playwright.

Source spec: docs/superpowers/specs/2026-05-29-l1-ai-tree-builder-phase-2a-design.md

Conventions (read before starting):

  • Migrations are hand-written: alembic revision -m "msg" then edit upgrade()/downgrade() by hand. Never --autogenerate, never --rev-id. Current head is b3358ba0e48c; each new migration chains from the previous.
  • Backend tests run in the container: docker exec resolutionflow_backend pytest <path> -v. The suite uses pytest-xdist in CI; single-module runs work locally.
  • JSONB columns require reassignment (x = [*x, item]), not in-place mutation (see record_step).
  • Commit after each task with the message shown. Git trailer: Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>.
  • Model tiers: settings.get_model_for_action(key) → ACTION_MODEL_MAP[key] → AI_MODEL_TIERS[tier]. fast=Haiku, standard=Sonnet.
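
The model-tier lookup in the last convention is a two-step dictionary resolution. A minimal sketch, assuming plain dicts and a free function: the real settings object exposes this as a method, and the model identifiers below are placeholders rather than real model names.

```python
# Tier names and action keys follow the plan; the model identifiers are placeholders.
AI_MODEL_TIERS: dict[str, str] = {
    "fast": "haiku-placeholder",
    "standard": "sonnet-placeholder",
}

ACTION_MODEL_MAP: dict[str, str] = {
    "l1_realtime_build": "standard",
    "l1_classify": "fast",
}


def get_model_for_action(key: str) -> str:
    """Resolve action key -> tier -> model identifier."""
    tier = ACTION_MODEL_MAP[key]   # KeyError here means the action key was never registered
    return AI_MODEL_TIERS[tier]


print(get_model_for_action("l1_realtime_build"))
print(get_model_for_action("l1_classify"))
```

Because callers only ever name an action key, swapping the model behind a tier needs no change at the call sites.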

File Structure

New backend files:

  • backend/app/services/ai_tree_builder.py — node Pydantic models, constrained system prompt, generate_next_node, per-node validation, normalize_walked_path.
  • backend/app/services/match_or_build.py — orchestrator (match_or_build, classify).
  • backend/app/services/l1_category_service.py: DEFAULT_L1_CATEGORIES, HARD_FLOOR_FORBIDDEN, get/set enabled categories.
  • backend/app/schemas/l1_categories.py — category settings request/response.
  • backend/tests/test_ai_tree_builder.py, test_match_or_build.py, test_l1_category_service.py, test_l1_ai_build_flow.py (integration).
  • 3 Alembic migrations.

Modified backend files:

  • backend/app/models/l1_walk_session.py: ai_build in CHECK constraints.
  • backend/app/models/account.py: enabled_l1_categories column.
  • backend/app/models/flow_proposal.py: l1_session_id, nullable source_session_id, exactly-one CHECK.
  • backend/app/core/config.py: l1_realtime_build + l1_classify action keys.
  • backend/app/api/deps.py: require_account_owner_or_admin.
  • backend/app/api/endpoints/l1.py — intake dispatch, /sessions/{id}/next-node, /escalations.
  • backend/app/api/endpoints/accounts.py: /me/l1-categories GET/PATCH.
  • backend/app/schemas/l1.py: IntakeResponse.outcome, ai_build literal, NextNode* schemas.
  • backend/app/services/l1_session_service.py: start_ai_build_session, flywheel capture in resolve, engineer notification in escalate.
  • backend/app/services/notification_service.py + backend/app/schemas/notification.py: l1.session.escalated event.

Modified frontend files:

  • frontend/src/api/l1.ts, frontend/src/types/l1.ts — next-node, outcome, categories.
  • frontend/src/pages/l1/L1Dashboard.tsx — dispatch on intake outcome.
  • frontend/src/components/l1/L1WalkTreeVariant.tsx — real node rendering + disclaimer.
  • frontend/src/components/flowpilot/ProposalDetail.tsx — L1-sourced source block.
  • frontend/src/pages/EscalationQueuePage.tsx — L1 escalations section.
  • New: frontend/src/pages/account/L1CategoriesPage.tsx + route + nav.
  • frontend/e2e/l1-workspace.spec.ts — AI build flow tests.

Task 1: Migration + model — ai_build session kind

Files:

  • Create: backend/alembic/versions/<rev>_add_ai_build_session_kind.py

  • Modify: backend/app/models/l1_walk_session.py:42-61

  • Test: backend/tests/test_l1_ai_build_model.py

  • Step 1: Write the failing test

# backend/tests/test_l1_ai_build_model.py
import uuid
import pytest
from app.models.l1_walk_session import L1WalkSession


def test_ai_build_session_kind_allowed_by_model_constraint():
    """ai_build is a valid session_kind with both target FKs null (like adhoc)."""
    s = L1WalkSession(
        account_id=uuid.uuid4(),
        created_by_user_id=uuid.uuid4(),
        ticket_id="t1",
        ticket_kind="internal",
        session_kind="ai_build",
    )
    assert s.session_kind == "ai_build"
    assert s.flow_id is None and s.flow_proposal_id is None
  • Step 2: Run test to verify it fails

Run: docker exec resolutionflow_backend pytest tests/test_l1_ai_build_model.py -v
Expected: despite the step title, this test already PASSES at the Python level, because the model attribute has no enum and the real enforcement is the DB CHECK. If it errors on import, fix the import first. Treat the migration roundtrip in Step 6 as this task's true verification.

  • Step 3: Update model CHECK constraints

In backend/app/models/l1_walk_session.py, update the two constraints:

        CheckConstraint(
            "session_kind IN ('flow', 'proposal', 'adhoc', 'ai_build')",
            name="ck_l1_walk_sessions_session_kind",
        ),
        CheckConstraint(
            "(session_kind = 'flow' AND flow_id IS NOT NULL AND flow_proposal_id IS NULL) "
            "OR (session_kind = 'proposal' AND flow_proposal_id IS NOT NULL AND flow_id IS NULL) "
            "OR (session_kind IN ('adhoc', 'ai_build') AND flow_id IS NULL AND flow_proposal_id IS NULL)",
            name="ck_l1_walk_sessions_target_consistency",
        ),
  • Step 4: Create the migration

Run: docker exec resolutionflow_backend alembic revision -m "add ai_build session kind" Then edit the generated file so down_revision is the current head (b3358ba0e48c unless a later task already advanced it) and the body drops+recreates the two CHECK constraints:

def upgrade() -> None:
    op.drop_constraint("ck_l1_walk_sessions_session_kind", "l1_walk_sessions", type_="check")
    op.create_check_constraint(
        "ck_l1_walk_sessions_session_kind", "l1_walk_sessions",
        "session_kind IN ('flow', 'proposal', 'adhoc', 'ai_build')",
    )
    op.drop_constraint("ck_l1_walk_sessions_target_consistency", "l1_walk_sessions", type_="check")
    op.create_check_constraint(
        "ck_l1_walk_sessions_target_consistency", "l1_walk_sessions",
        "(session_kind = 'flow' AND flow_id IS NOT NULL AND flow_proposal_id IS NULL) "
        "OR (session_kind = 'proposal' AND flow_proposal_id IS NOT NULL AND flow_id IS NULL) "
        "OR (session_kind IN ('adhoc', 'ai_build') AND flow_id IS NULL AND flow_proposal_id IS NULL)",
    )


def downgrade() -> None:
    op.drop_constraint("ck_l1_walk_sessions_target_consistency", "l1_walk_sessions", type_="check")
    op.create_check_constraint(
        "ck_l1_walk_sessions_target_consistency", "l1_walk_sessions",
        "(session_kind = 'flow' AND flow_id IS NOT NULL AND flow_proposal_id IS NULL) "
        "OR (session_kind = 'proposal' AND flow_proposal_id IS NOT NULL AND flow_id IS NULL) "
        "OR (session_kind = 'adhoc' AND flow_id IS NULL AND flow_proposal_id IS NULL)",
    )
    op.drop_constraint("ck_l1_walk_sessions_session_kind", "l1_walk_sessions", type_="check")
    op.create_check_constraint(
        "ck_l1_walk_sessions_session_kind", "l1_walk_sessions",
        "session_kind IN ('flow', 'proposal', 'adhoc')",
    )
  • Step 5: Apply the migration

Run: docker exec resolutionflow_backend alembic upgrade head Expected: Running upgrade b3358ba0e48c -> <rev>, add ai_build session kind

  • Step 6: Verify roundtrip + insert an ai_build row

Run:

docker exec resolutionflow_postgres psql -U postgres -d resolutionflow -c \
"INSERT INTO l1_walk_sessions (id, account_id, created_by_user_id, ticket_id, ticket_kind, session_kind, walked_path, walk_notes, status, started_at, last_step_at) \
 SELECT gen_random_uuid(), a.id, u.id, 't-smoke', 'internal', 'ai_build', '[]'::jsonb, '[]'::jsonb, 'active', now(), now() \
 FROM accounts a JOIN users u ON u.account_id=a.id LIMIT 1 RETURNING id;"

Expected: one row id returned (no CHECK violation). Then clean up: DELETE FROM l1_walk_sessions WHERE ticket_id='t-smoke';

  • Step 7: Commit
git add backend/app/models/l1_walk_session.py backend/alembic/versions/ backend/tests/test_l1_ai_build_model.py
git commit -m "feat(l1): add ai_build session kind (model + migration)"
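
The effect of the two CHECK constraints from Task 1 can be tried out locally without the container. The sketch below uses an in-memory SQLite table as a stand-in for PostgreSQL, so it only illustrates the constraint logic: the table is reduced to three columns, and the accepts helper is hypothetical.

```python
import sqlite3

# SQLite stand-in for illustration; the real table lives in PostgreSQL and has more columns.
DDL = """
CREATE TABLE l1_walk_sessions (
    session_kind TEXT NOT NULL,
    flow_id TEXT,
    flow_proposal_id TEXT,
    CONSTRAINT ck_l1_walk_sessions_session_kind
        CHECK (session_kind IN ('flow', 'proposal', 'adhoc', 'ai_build')),
    CONSTRAINT ck_l1_walk_sessions_target_consistency CHECK (
        (session_kind = 'flow' AND flow_id IS NOT NULL AND flow_proposal_id IS NULL)
        OR (session_kind = 'proposal' AND flow_proposal_id IS NOT NULL AND flow_id IS NULL)
        OR (session_kind IN ('adhoc', 'ai_build') AND flow_id IS NULL AND flow_proposal_id IS NULL)
    )
)
"""


def accepts(conn, kind, flow_id=None, flow_proposal_id=None) -> bool:
    """Hypothetical helper: True if the row satisfies both CHECK constraints."""
    try:
        conn.execute(
            "INSERT INTO l1_walk_sessions VALUES (?, ?, ?)",
            (kind, flow_id, flow_proposal_id),
        )
        return True
    except sqlite3.IntegrityError:
        return False


conn = sqlite3.connect(":memory:")
conn.execute(DDL)
print(accepts(conn, "ai_build"))                # both targets NULL, like adhoc
print(accepts(conn, "ai_build", flow_id="f1"))  # ai_build must not carry a flow_id
print(accepts(conn, "flow", flow_id="f1"))      # flow requires flow_id
print(accepts(conn, "flow"))                    # flow without flow_id is rejected
```

This does not replace the psql smoke test in Step 6, which exercises the actual migrated schema.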

Task 2: Migration + model — account enabled_l1_categories

Files:

  • Create: backend/alembic/versions/<rev>_add_enabled_l1_categories.py

  • Modify: backend/app/models/account.py

  • Test: backend/tests/test_account_l1_categories_column.py

  • Step 1: Write the failing test

# backend/tests/test_account_l1_categories_column.py
from app.models.account import Account


def test_account_has_enabled_l1_categories_default():
    a = Account(name="Acme", display_code="ABC12345")
    # The server_default is applied by the database on INSERT; the attribute is None before flush.
    assert hasattr(a, "enabled_l1_categories")
  • Step 2: Run test to verify it fails

Run: docker exec resolutionflow_backend pytest tests/test_account_l1_categories_column.py -v
Expected: FAIL with an AssertionError, because hasattr() returns False until the column is added to the model.

  • Step 3: Add the model column

In backend/app/models/account.py, after sso_config (or near other JSONB columns), add:

    enabled_l1_categories: Mapped[list[str]] = mapped_column(
        JSONB(), nullable=False,
        server_default=sa_text(
            "'[\"password_reset\",\"account_lockout\",\"printer\","
            "\"email_outlook_client\",\"wifi_network_basics\",\"vpn_connect\","
            "\"teams_zoom_av\",\"browser_cache_cookies\",\"peripheral_reconnect\","
            "\"os_restart_update\"]'::jsonb"
        ),
    )

Ensure imports exist at top of file: from sqlalchemy.dialects.postgresql import JSONB and from sqlalchemy import text as sa_text (add if missing).

  • Step 4: Run test to verify it passes

Run: docker exec resolutionflow_backend pytest tests/test_account_l1_categories_column.py -v Expected: PASS.

  • Step 5: Create + apply migration

Run: docker exec resolutionflow_backend alembic revision -m "add enabled_l1_categories to accounts" Edit body:

import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

_DEFAULT = ('["password_reset","account_lockout","printer","email_outlook_client",'
            '"wifi_network_basics","vpn_connect","teams_zoom_av","browser_cache_cookies",'
            '"peripheral_reconnect","os_restart_update"]')

def upgrade() -> None:
    op.add_column("accounts", sa.Column(
        "enabled_l1_categories", postgresql.JSONB(), nullable=False,
        server_default=sa.text(f"'{_DEFAULT}'::jsonb"),
    ))

def downgrade() -> None:
    op.drop_column("accounts", "enabled_l1_categories")

Run: docker exec resolutionflow_backend alembic upgrade head Expected: upgrade applied; existing accounts backfill to the default list.

  • Step 6: Commit
git add backend/app/models/account.py backend/alembic/versions/ backend/tests/test_account_l1_categories_column.py
git commit -m "feat(l1): add accounts.enabled_l1_categories with default allowlist"

Task 3: Migration + model — FlowProposal L1 source linkage (Finding 1)

Files:

  • Create: backend/alembic/versions/<rev>_flow_proposal_l1_source.py

  • Modify: backend/app/models/flow_proposal.py:42-82

  • Test: backend/tests/test_flow_proposal_l1_source.py

  • Step 1: Write the failing test

# backend/tests/test_flow_proposal_l1_source.py
import uuid
from app.models.flow_proposal import FlowProposal


def test_flow_proposal_accepts_l1_session_id_without_source_session():
    p = FlowProposal(
        account_id=uuid.uuid4(),
        l1_session_id=uuid.uuid4(),
        source_session_id=None,
        proposal_type="new_flow",
        title="AI L1 draft",
        proposed_flow_data={"tree_structure": {"id": "root"}},
        source="ai_realtime_l1",
        status="pending",
    )
    assert p.l1_session_id is not None and p.source_session_id is None
  • Step 2: Run test to verify it fails

Run: docker exec resolutionflow_backend pytest tests/test_flow_proposal_l1_source.py -v Expected: FAIL — TypeError/unexpected kwarg l1_session_id.

  • Step 3: Update the model

In backend/app/models/flow_proposal.py: make source_session_id nullable, add l1_session_id, add the exactly-one CHECK in __table_args__.

    source_session_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("ai_sessions.id", ondelete="CASCADE"),
        nullable=True,
        index=True,
    )
    l1_session_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("l1_walk_sessions.id", ondelete="SET NULL"),
        nullable=True,
        index=True,
    )

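Add to __table_args__ (alongside the existing source/linked_ticket checks). Note the interaction with ondelete="SET NULL" above: deleting an L1 session would null l1_session_id on its proposals, which violates this CHECK for L1-sourced rows, so PostgreSQL rejects the delete until those proposals are removed or re-linked: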

        CheckConstraint(
            "(source_session_id IS NOT NULL) <> (l1_session_id IS NOT NULL)",
            name="ck_flow_proposals_exactly_one_source",
        ),
  • Step 4: Run test to verify it passes

Run: docker exec resolutionflow_backend pytest tests/test_flow_proposal_l1_source.py -v Expected: PASS.

  • Step 5: Create + apply migration

Run: docker exec resolutionflow_backend alembic revision -m "flow_proposal l1 source linkage" Edit body:

import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

def upgrade() -> None:
    op.add_column("flow_proposals", sa.Column(
        "l1_session_id", postgresql.UUID(as_uuid=True), nullable=True))
    op.create_index("ix_flow_proposals_l1_session_id", "flow_proposals", ["l1_session_id"])
    op.create_foreign_key(
        "fk_flow_proposals_l1_session_id", "flow_proposals", "l1_walk_sessions",
        ["l1_session_id"], ["id"], ondelete="SET NULL")
    op.alter_column("flow_proposals", "source_session_id", nullable=True)
    op.create_check_constraint(
        "ck_flow_proposals_exactly_one_source", "flow_proposals",
        "(source_session_id IS NOT NULL) <> (l1_session_id IS NOT NULL)")

def downgrade() -> None:
    op.drop_constraint("ck_flow_proposals_exactly_one_source", "flow_proposals", type_="check")
    op.alter_column("flow_proposals", "source_session_id", nullable=False)
    op.drop_constraint("fk_flow_proposals_l1_session_id", "flow_proposals", type_="foreignkey")
    op.drop_index("ix_flow_proposals_l1_session_id", "flow_proposals")
    op.drop_column("flow_proposals", "l1_session_id")

Run: docker exec resolutionflow_backend alembic upgrade head. Expected: applied cleanly (no existing rows violate the new CHECK because all current proposals have a non-null source_session_id and null l1_session_id).

  • Step 6: Verify Tree.source_session_id is nullable

Run: docker exec resolutionflow_postgres psql -U postgres -d resolutionflow -c "\d trees" | grep source_session_id Expected: shows the column without not null. If it shows not null, add op.alter_column("trees","source_session_id",nullable=True) to this migration's upgrade() and re-run. (L1-promoted trees leave it NULL.)

  • Step 7: Commit
git add backend/app/models/flow_proposal.py backend/alembic/versions/ backend/tests/test_flow_proposal_l1_source.py
git commit -m "feat(l1): FlowProposal l1_session_id source linkage (nullable source_session_id + exactly-one check)"
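
The exactly-one CHECK from Task 3 is a boolean XOR over the two source columns. A small Python mirror of the SQL expression makes the truth table explicit; exactly_one_source is a hypothetical name used only for this illustration.

```python
from typing import Optional
from uuid import UUID, uuid4


def exactly_one_source(
    source_session_id: Optional[UUID], l1_session_id: Optional[UUID]
) -> bool:
    """Python mirror of (source_session_id IS NOT NULL) <> (l1_session_id IS NOT NULL)."""
    return (source_session_id is not None) != (l1_session_id is not None)


cases = [(uuid4(), None), (None, uuid4()), (None, None), (uuid4(), uuid4())]
for source, l1 in cases:
    # Columns: has source_session_id, has l1_session_id, passes the CHECK
    print(source is not None, l1 is not None, exactly_one_source(source, l1))
```

Only the first two rows pass: a proposal must come from either an AI session or an L1 walk session, never both and never neither.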

Task 4: Category service + model action keys

Files:

  • Create: backend/app/services/l1_category_service.py

  • Modify: backend/app/core/config.py (ACTION_MODEL_MAP)

  • Test: backend/tests/test_l1_category_service.py

  • Step 1: Write the failing test

# backend/tests/test_l1_category_service.py
from app.services.l1_category_service import (
    DEFAULT_L1_CATEGORIES, HARD_FLOOR_FORBIDDEN, is_category_enabled,
)


def test_defaults_and_hard_floor_present():
    assert "password_reset" in DEFAULT_L1_CATEGORIES
    assert "registry_edit" in HARD_FLOOR_FORBIDDEN  # representative forbidden action key
    assert len(DEFAULT_L1_CATEGORIES) == 10


def test_is_category_enabled():
    enabled = ["printer", "vpn_connect"]
    assert is_category_enabled("printer", enabled) is True
    assert is_category_enabled("registry_edit", enabled) is False
    assert is_category_enabled("unknown", enabled) is False
  • Step 2: Run test to verify it fails

Run: docker exec resolutionflow_backend pytest tests/test_l1_category_service.py -v Expected: FAIL — module not found.

  • Step 3: Implement the service
# backend/app/services/l1_category_service.py
"""L1 category allowlist + the always-forbidden hard floor.

DEFAULT_L1_CATEGORIES seeds an account's enabled set. HARD_FLOOR_FORBIDDEN is a
category-independent safety floor the AI tree builder must never emit and admins
cannot enable. See spec §5.1/§5.2.
"""
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.account import Account

DEFAULT_L1_CATEGORIES: list[str] = [
    "password_reset", "account_lockout", "printer", "email_outlook_client",
    "wifi_network_basics", "vpn_connect", "teams_zoom_av",
    "browser_cache_cookies", "peripheral_reconnect", "os_restart_update",
]

# Always-forbidden action classes (keys are stable identifiers; the human-readable
# phrasing lives in the builder system prompt). Admins cannot enable these.
HARD_FLOOR_FORBIDDEN: list[str] = [
    "registry_edit", "system_file_or_boot_edit", "data_or_disk_deletion",
    "credential_or_mfa_change", "security_or_av_or_firewall_change",
    "elevated_or_admin_script", "domain_dns_dhcp_change",
    "server_or_production_config", "billing_or_license_change",
]

# Substrings that, if present in a generated node's text, indicate a hard-floor
# violation. Used by ai_tree_builder per-node validation (defense in depth).
HARD_FLOOR_TEXT_PATTERNS: list[str] = [
    "regedit", "registry", "format ", "delete partition", "diskpart",
    "reset password for", "disable firewall", "disable antivirus", "disable defender",
    "run as administrator", "sudo ", "domain controller", "dns record", "dhcp scope",
    "uninstall security", "bitlocker",
]


def is_category_enabled(category: str, enabled: list[str]) -> bool:
    """A category is buildable only if explicitly enabled and not hard-floored."""
    if category in HARD_FLOOR_FORBIDDEN:
        return False
    return category in enabled


async def get_enabled_categories(account_id: UUID, db: AsyncSession) -> list[str]:
    acct = (await db.execute(select(Account).where(Account.id == account_id))).scalar_one()
    return list(acct.enabled_l1_categories or [])


async def set_enabled_categories(
    account_id: UUID, categories: list[str], db: AsyncSession
) -> list[str]:
    """Persist the enabled set, dropping anything unknown or hard-floored."""
    cleaned = [c for c in categories if c in DEFAULT_L1_CATEGORIES]
    acct = (await db.execute(select(Account).where(Account.id == account_id))).scalar_one()
    acct.enabled_l1_categories = cleaned
    await db.flush()
    return cleaned
  • Step 4: Add model action keys

In backend/app/core/config.py, add to ACTION_MODEL_MAP:

        # L1 AI tree builder (Phase 2A): per-node generation is latency-sensitive
        # on a live call → Sonnet; classification is a short label task → Haiku.
        "l1_realtime_build": "standard",
        "l1_classify": "fast",
  • Step 5: Run tests

Run: docker exec resolutionflow_backend pytest tests/test_l1_category_service.py -v Expected: PASS (2 tests).

  • Step 6: Commit
git add backend/app/services/l1_category_service.py backend/app/core/config.py backend/tests/test_l1_category_service.py
git commit -m "feat(l1): category service (defaults + hard floor) and AI action keys"
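
The filtering behaviour of the category service can be seen without a database. The sketch below copies the two constant lists and is_category_enabled from Task 4; clean_categories is a hypothetical pure version of the list comprehension inside set_enabled_categories.

```python
DEFAULT_L1_CATEGORIES = [
    "password_reset", "account_lockout", "printer", "email_outlook_client",
    "wifi_network_basics", "vpn_connect", "teams_zoom_av",
    "browser_cache_cookies", "peripheral_reconnect", "os_restart_update",
]
HARD_FLOOR_FORBIDDEN = [
    "registry_edit", "system_file_or_boot_edit", "data_or_disk_deletion",
    "credential_or_mfa_change", "security_or_av_or_firewall_change",
    "elevated_or_admin_script", "domain_dns_dhcp_change",
    "server_or_production_config", "billing_or_license_change",
]


def is_category_enabled(category: str, enabled: list[str]) -> bool:
    """A category is buildable only if explicitly enabled and not hard-floored."""
    if category in HARD_FLOOR_FORBIDDEN:
        return False
    return category in enabled


def clean_categories(categories: list[str]) -> list[str]:
    """Hypothetical pure version of the filter in set_enabled_categories."""
    return [c for c in categories if c in DEFAULT_L1_CATEGORIES]


requested = ["printer", "registry_edit", "made_up_key", "vpn_connect"]
cleaned = clean_categories(requested)
print(cleaned)  # ['printer', 'vpn_connect']

# Even if a forbidden key somehow reached the stored list, the read-side check rejects it.
print(is_category_enabled("registry_edit", ["registry_edit"]))  # False
```

The write-side filter works because the two lists are disjoint; the read-side hard-floor check is the second layer if that ever stops being true.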

Task 5: ai_tree_builder — node schema, prompt, generation, validation

Files:

  • Create: backend/app/services/ai_tree_builder.py

  • Test: backend/tests/test_ai_tree_builder.py

  • Step 1: Write the failing tests

# backend/tests/test_ai_tree_builder.py
import pytest
from app.services import ai_tree_builder as atb


def test_validate_node_rejects_hard_floor_text():
    node = {"node_type": "instruction", "id": "n1", "text": "Open regedit and change the key", "next": "generate"}
    with pytest.raises(atb.UnsafeNodeError):
        atb.validate_node(node)


def test_validate_node_accepts_safe_instruction():
    node = {"node_type": "instruction", "id": "n1", "text": "Restart the printer.", "next": "generate"}
    assert atb.validate_node(node)["node_type"] == "instruction"


def test_depth_cap_forces_escalate():
    walked = [{"node_type": "question", "id": f"n{i}", "text": "?", "answer": "no"} for i in range(atb.MAX_DEPTH)]
    node = atb.escalate_if_depth_exceeded(walked)
    assert node is not None and node["node_type"] == "escalate"


def test_normalize_walked_path_builds_valid_tree():
    walked = [
        {"node_type": "question", "id": "n1", "text": "Powered on?", "answer": "no"},
        {"node_type": "instruction", "id": "n2", "text": "Power it on.", "answer": "ack"},
        {"node_type": "resolved", "id": "n3", "text": "Fixed."},
    ]
    tree = atb.normalize_walked_path(walked)
    assert isinstance(tree, dict) and tree.get("id") == "n1"
    # untraversed 'yes' branch of n1 became a needs_review stub
    assert any(n["node_type"] == "needs_review" for n in tree["nodes"].values())
  • Step 2: Run tests to verify they fail

Run: docker exec resolutionflow_backend pytest tests/test_ai_tree_builder.py -v Expected: FAIL — module not found.

  • Step 3: Implement the builder
# backend/app/services/ai_tree_builder.py
"""Constrained, node-by-node L1 decision-tree generation (spec §4/§5/§6.1).

Each call produces ONE node given the problem, category, and full walked path.
Generation is constrained to safe/reversible L1 steps and biased to escalate
early. normalize_walked_path() turns a resolved walk into a valid tree object
for flywheel capture.
"""
import json
import logging
from typing import Any, Optional

from app.core.ai_provider import get_ai_provider
from app.core.config import settings
from app.services.l1_category_service import HARD_FLOOR_TEXT_PATTERNS
from app.services.llm_utils import parse_llm_json

logger = logging.getLogger(__name__)

MAX_DEPTH = 12
VALID_NODE_TYPES = {"question", "instruction", "resolved", "escalate"}


class UnsafeNodeError(ValueError):
    """Raised when a generated node violates the hard floor or is malformed."""


SYSTEM_PROMPT = """\
You are an L1 helpdesk troubleshooting guide builder. Given a problem and the
steps already tried, produce the SINGLE next node of a yes/no decision tree.

HARD RULES:
- Only safe, reversible, observe-or-restart-class steps: checking status, toggling,
  restarting, reconnecting, re-entering credentials the USER already knows.
- NEVER produce steps that: edit the registry/system files/boot config; delete or
  format data/disks; change credentials/MFA/security/firewall/AV; run elevated or
  admin scripts; touch domain controllers/DNS/DHCP or production servers; or have
  billing/license impact. These are out of L1 scope.
- When you run out of safe in-scope steps, DO NOT GUESS. Emit an "escalate" node.

Return ONLY a JSON object for ONE node, one of:
{"node_type":"question","text":"<yes/no question>"}
{"node_type":"instruction","text":"<one safe reversible action>"}
{"node_type":"resolved","text":"<confirmation the issue is fixed>"}
{"node_type":"escalate","reason_category":"exhausted_safe_steps","text":"<why>"}
No prose, no markdown fences.
"""


def _build_context(problem_text: str, category: str, walked_path: list[dict]) -> str:
    lines = [f"PROBLEM: {problem_text}", f"CATEGORY: {category}", "STEPS SO FAR:"]
    if not walked_path:
        lines.append("(none yet — produce the first diagnostic question)")
    for i, step in enumerate(walked_path, 1):
        ans = step.get("answer")
        suffix = f" -> {ans}" if ans else ""
        lines.append(f"{i}. [{step.get('node_type','?')}] {step.get('text','')}{suffix}")
    return "\n".join(lines)


def validate_node(node: dict[str, Any]) -> dict[str, Any]:
    """Shape + hard-floor validation. Raises UnsafeNodeError on violation."""
    if not isinstance(node, dict) or node.get("node_type") not in VALID_NODE_TYPES:
        raise UnsafeNodeError(f"invalid node_type: {node!r}")
    text = (node.get("text") or "").lower()
    for pat in HARD_FLOOR_TEXT_PATTERNS:
        if pat in text:
            raise UnsafeNodeError(f"hard-floor pattern '{pat}' in node text")
    return node


def escalate_if_depth_exceeded(walked_path: list[dict]) -> Optional[dict[str, Any]]:
    if len(walked_path) >= MAX_DEPTH:
        return {
            "node_type": "escalate",
            "reason_category": "depth_cap",
            "text": "Reached the L1 troubleshooting depth limit — escalating to engineering.",
        }
    return None


async def generate_next_node(
    problem_text: str, category: str, walked_path: list[dict]
) -> dict[str, Any]:
    """Generate + validate the next node. Regenerate once on failure, then escalate."""
    capped = escalate_if_depth_exceeded(walked_path)
    if capped:
        return capped

    provider = get_ai_provider(settings.get_model_for_action("l1_realtime_build"))
    context = _build_context(problem_text, category, walked_path)

    for attempt in range(2):
        try:
            raw, _, _ = await provider.generate_json(
                system_prompt=SYSTEM_PROMPT,
                messages=[{"role": "user", "content": context}],
                max_tokens=1024,
            )
            node = parse_llm_json(raw)
            return validate_node(node)
        except (UnsafeNodeError, ValueError) as e:
            logger.warning("ai_tree_builder node attempt %d failed: %s", attempt + 1, e)
            continue

    return {
        "node_type": "escalate",
        "reason_category": "generation_failed",
        "text": "Could not generate a safe next step — escalating to engineering.",
    }


def normalize_walked_path(walked_path: list[dict]) -> dict[str, Any]:
    """Turn a resolved walk into a valid troubleshooting tree (spec §6.1).

    Root = first node's id; question nodes' traversed branch points to the next
    node, the untraversed branch to a needs_review stub; terminal node ends it.
    Returns {id, nodes: {id: node}} — a dict with an id (passes the proposal
    approval guard).
    """
    nodes: dict[str, Any] = {}
    if not walked_path:
        root_id = "root"
        nodes[root_id] = {"id": root_id, "node_type": "needs_review",
                          "text": "Empty walk — needs authoring."}
        return {"id": root_id, "nodes": nodes}

    stub_seq = 0
    for i, step in enumerate(walked_path):
        nid = step.get("id") or f"n{i+1}"
        ntype = step.get("node_type", "question")
        # Use the same "id or positional fallback" rule as nid so links always match node keys.
        nxt = (walked_path[i + 1].get("id") or f"n{i+2}") if i + 1 < len(walked_path) else None
        node: dict[str, Any] = {"id": nid, "node_type": ntype, "text": step.get("text", "")}
        if ntype == "question":
            answer = (step.get("answer") or "").lower()
            stub_seq += 1
            stub_id = f"review-{stub_seq}"
            nodes[stub_id] = {"id": stub_id, "node_type": "needs_review",
                              "text": "Branch not explored during the originating call."}
            node["yes_next"] = nxt if answer == "yes" else stub_id
            node["no_next"] = nxt if answer == "no" else stub_id
        elif ntype == "instruction":
            node["next"] = nxt
        nodes[nid] = node

    return {"id": walked_path[0].get("id") or "n1", "nodes": nodes}
  • Step 4: Run tests to verify they pass

Run: docker exec resolutionflow_backend pytest tests/test_ai_tree_builder.py -v Expected: PASS (4 tests). generate_next_node is not unit-tested against a live model here; it is covered by the integration test in Task 11 with a mocked provider.

  • Step 5: Commit
git add backend/app/services/ai_tree_builder.py backend/tests/test_ai_tree_builder.py
git commit -m "feat(l1): ai_tree_builder — constrained node generation, validation, normalize"
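
The "regenerate once, then escalate" behaviour of generate_next_node can be exercised without a live model. The sketch below is a simplified stand-in, not the service code: validate_node is trimmed to a small subset of the hard-floor patterns, and next_node_with_retry and scripted are hypothetical names for the retry loop and a fake provider.

```python
import asyncio
from typing import Any, Awaitable, Callable

VALID_NODE_TYPES = {"question", "instruction", "resolved", "escalate"}
PATTERNS = ["regedit", "registry", "sudo "]  # small subset, for illustration


class UnsafeNodeError(ValueError):
    """Raised when a node is malformed or trips a hard-floor pattern."""


def validate_node(node: dict[str, Any]) -> dict[str, Any]:
    if not isinstance(node, dict) or node.get("node_type") not in VALID_NODE_TYPES:
        raise UnsafeNodeError(f"invalid node_type: {node!r}")
    text = (node.get("text") or "").lower()
    if any(pat in text for pat in PATTERNS):
        raise UnsafeNodeError("hard-floor pattern in node text")
    return node


async def next_node_with_retry(
    generate: Callable[[], Awaitable[dict[str, Any]]], attempts: int = 2
) -> dict[str, Any]:
    """Try generate up to attempts times; fall back to an escalate node."""
    for _ in range(attempts):
        try:
            return validate_node(await generate())
        except ValueError:  # UnsafeNodeError is a ValueError subclass
            continue
    return {"node_type": "escalate", "reason_category": "generation_failed",
            "text": "Could not generate a safe next step."}


def scripted(replies: list[dict[str, Any]]) -> Callable[[], Awaitable[dict[str, Any]]]:
    """Hypothetical fake provider that returns the scripted replies in order."""
    queue = list(replies)

    async def _generate() -> dict[str, Any]:
        return queue.pop(0)

    return _generate


unsafe = {"node_type": "instruction", "text": "Open regedit and edit the key"}
safe = {"node_type": "instruction", "text": "Restart the printer."}

recovered = asyncio.run(next_node_with_retry(scripted([unsafe, safe])))
gave_up = asyncio.run(next_node_with_retry(scripted([unsafe, unsafe])))
print(recovered["node_type"], gave_up["node_type"])  # instruction escalate
```

Failing closed to an escalate node means a misbehaving model can slow an L1 call down, but cannot put an unvalidated step in front of the technician.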

Task 6: match_or_build orchestrator + classify

Files:

  • Create: backend/app/services/match_or_build.py

  • Test: backend/tests/test_match_or_build.py

  • Step 1: Write the failing tests

# backend/tests/test_match_or_build.py
import uuid
import pytest
from unittest.mock import AsyncMock, patch
from app.services import match_or_build as mob


@pytest.mark.asyncio
async def test_match_wins_before_category_gate():
    """A strong published-flow match returns 'matched' even if category disabled."""
    with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock(
            return_value=[{"tree_id": str(uuid.uuid4()), "tree_name": "VPN", "score": 0.9}])), \
         patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=[])):
        res = await mob.match_or_build(uuid.uuid4(), "vpn down", None, "t1", db=AsyncMock(), force_build=False)
    assert res["outcome"] == "matched"
    assert res["session_kind"] == "flow"


@pytest.mark.asyncio
async def test_suggest_band():
    with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock(
            return_value=[{"tree_id": str(uuid.uuid4()), "tree_name": "X", "score": 0.66}])):
        res = await mob.match_or_build(uuid.uuid4(), "p", None, "t1", db=AsyncMock(), force_build=False)
    assert res["outcome"] == "suggest"


@pytest.mark.asyncio
async def test_out_of_scope_when_category_disabled_on_build_path():
    with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock(return_value=[])), \
         patch.object(mob, "classify", new=AsyncMock(return_value="printer")), \
         patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=["vpn_connect"])):
        res = await mob.match_or_build(uuid.uuid4(), "printer jam", None, "t1", db=AsyncMock(), force_build=False)
    assert res["outcome"] == "out_of_scope"


@pytest.mark.asyncio
async def test_build_when_enabled_and_no_match():
    with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock(return_value=[])), \
         patch.object(mob, "classify", new=AsyncMock(return_value="printer")), \
         patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=["printer"])):
        res = await mob.match_or_build(uuid.uuid4(), "printer jam", None, "t1", db=AsyncMock(), force_build=False)
    assert res["outcome"] == "build"
    assert res["session_kind"] == "ai_build"


@pytest.mark.asyncio
async def test_force_build_skips_match_but_still_gates():
    fm = AsyncMock(return_value=[{"tree_id": str(uuid.uuid4()), "tree_name": "X", "score": 0.99}])
    with patch.object(mob.flow_matching_engine, "find_matches", new=fm), \
         patch.object(mob, "classify", new=AsyncMock(return_value="printer")), \
         patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=["printer"])):
        res = await mob.match_or_build(uuid.uuid4(), "p", None, "t1", db=AsyncMock(), force_build=True)
    fm.assert_not_called()
    assert res["outcome"] == "build"
  • Step 2: Run tests to verify they fail

Run: docker exec resolutionflow_backend pytest tests/test_match_or_build.py -v
Expected: FAIL (module not found).

  • Step 3: Implement the orchestrator
# backend/app/services/match_or_build.py
"""Intake orchestrator: match published flows first, gate generic build behind
the account's enabled categories (spec §3). Match runs BEFORE the category gate
so an authored flow is never blocked by category settings (Finding 4)."""
import logging
from typing import Any, Optional
from uuid import UUID

from sqlalchemy.ext.asyncio import AsyncSession

from app.core.ai_provider import get_ai_provider
from app.core.config import settings
from app.services import flow_matching_engine
from app.services.l1_category_service import (
    DEFAULT_L1_CATEGORIES, get_enabled_categories, is_category_enabled,
)
from app.services.llm_utils import parse_llm_json

logger = logging.getLogger(__name__)

MATCH_THRESHOLD = 0.75
SUGGEST_THRESHOLD = 0.60

_CLASSIFY_PROMPT = (
    "Classify the IT support problem into exactly one of these category keys, "
    "or 'unknown'. Return JSON {\"category\":\"<key>\"} only.\nKEYS: "
    + ", ".join(DEFAULT_L1_CATEGORIES)
)


async def classify(problem_text: str) -> str:
    """Map a problem to a category key via a short model call; keyword fallback."""
    try:
        provider = get_ai_provider(settings.get_model_for_action("l1_classify"))
        raw, _, _ = await provider.generate_json(
            system_prompt=_CLASSIFY_PROMPT,
            messages=[{"role": "user", "content": problem_text}],
            max_tokens=64,
        )
        cat = parse_llm_json(raw).get("category", "unknown")
        return cat if cat in DEFAULT_L1_CATEGORIES else "unknown"
    except Exception as e:  # noqa: BLE001 — fall back, never hard-fail intake
        logger.warning("classify model call failed (%s); keyword fallback", e)
        text = problem_text.lower()
        for cat in DEFAULT_L1_CATEGORIES:
            if any(tok in text for tok in cat.split("_")):
                return cat
        return "unknown"


async def match_or_build(
    account_id: UUID,
    problem_text: str,
    problem_domain: Optional[str],
    ticket_ref: str,
    *,
    db: AsyncSession,
    force_build: bool = False,
) -> dict[str, Any]:
    if not force_build:
        hits = await flow_matching_engine.find_matches(
            problem_text, problem_domain, account_id, db)
        best = max(hits, key=lambda h: h["score"], default=None) if hits else None
        if best and best["score"] >= MATCH_THRESHOLD:
            return {"outcome": "matched", "flow_id": best["tree_id"], "session_kind": "flow"}
        if best and best["score"] >= SUGGEST_THRESHOLD:
            return {"outcome": "suggest",
                    "near_miss": {"flow_id": best["tree_id"], "flow_name": best["tree_name"],
                                  "score": best["score"]},
                    "can_build": True}

    category = await classify(problem_text)
    enabled = await get_enabled_categories(account_id, db)
    if not is_category_enabled(category, enabled):
        return {"outcome": "out_of_scope", "category": category}
    return {"outcome": "build", "session_kind": "ai_build", "category": category}
  • Step 4: Run tests to verify they pass

Run: docker exec resolutionflow_backend pytest tests/test_match_or_build.py -v
Expected: PASS (5 tests).
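
The two thresholds in the orchestrator split the best match score into three bands. A minimal sketch of that banding, assuming only the constants shown in Step 3 (the helper name score_band and the "gate" label are illustrative and not part of the plan):

```python
from typing import Optional

MATCH_THRESHOLD = 0.75
SUGGEST_THRESHOLD = 0.60


def score_band(best_score: Optional[float]) -> str:
    """Return the branch a best-match score would take in match_or_build."""
    if best_score is not None and best_score >= MATCH_THRESHOLD:
        return "matched"
    if best_score is not None and best_score >= SUGGEST_THRESHOLD:
        return "suggest"
    # No hits, or a score below both thresholds: fall through to
    # classify + the category gate ("gate" is a label used only here).
    return "gate"


print(score_band(0.99), score_band(0.66), score_band(0.59), score_band(None))
```

Both comparisons are inclusive, so a score of exactly 0.75 is a match and exactly 0.60 is a suggestion.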

  • Step 5: Commit
git add backend/app/services/match_or_build.py backend/tests/test_match_or_build.py
git commit -m "feat(l1): match_or_build orchestrator + classify (match-first, gate-on-build)"
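
The keyword fallback in classify splits each category key on underscores and does substring matching, which is coarse. A small sketch of that behaviour, assuming a hypothetical three-key category list (the real keys live in DEFAULT_L1_CATEGORIES and may differ):

```python
# Hypothetical keys for illustration only.
SAMPLE_CATEGORIES = ["printer", "vpn_connect", "password_reset"]


def keyword_fallback(problem_text: str, categories: list[str]) -> str:
    """Same loop as the except-branch of classify, extracted for inspection."""
    text = problem_text.lower()
    for cat in categories:
        # "vpn_connect" contributes the tokens "vpn" and "connect".
        if any(tok in text for tok in cat.split("_")):
            return cat
    return "unknown"


print(keyword_fallback("Printer jam on floor 2", SAMPLE_CATEGORIES))
print(keyword_fallback("Cannot connect to wifi", SAMPLE_CATEGORIES))
print(keyword_fallback("Laptop screen cracked", SAMPLE_CATEGORIES))
```

With these sample keys the wifi problem lands in vpn_connect because "connect" is a substring of the text. The fallback only runs when the model call fails, and the category gate still applies afterwards, so this may be acceptable for Phase 2A; it is worth keeping in mind when reviewing out-of-scope decisions.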

Task 7: Session service — start_ai_build_session

Files:

  • Modify: backend/app/services/l1_session_service.py

  • Test: backend/tests/test_l1_session_service.py (add)

  • Step 1: Write the failing test

# add to backend/tests/test_l1_session_service.py
@pytest.mark.asyncio
async def test_start_ai_build_session(db_session, l1_user):
    from app.services import l1_session_service as svc
    s = await svc.start_ai_build_session(
        db_session, account_id=l1_user.account_id, user=l1_user,
        ticket_id="t-ai", ticket_kind="internal",
    )
    assert s.session_kind == "ai_build"
    assert s.flow_id is None and s.flow_proposal_id is None
    assert s.status == "active"

(Use the same fixtures the existing tests in this file use for db_session/l1_user.)

  • Step 2: Run test to verify it fails

Run: docker exec resolutionflow_backend pytest tests/test_l1_session_service.py::test_start_ai_build_session -v
Expected: FAIL (AttributeError: start_ai_build_session).

  • Step 3: Implement (mirror start_adhoc_session)

In backend/app/services/l1_session_service.py, after start_adhoc_session:

async def start_ai_build_session(
    db: AsyncSession,
    *,
    account_id: UUID,
    user: User,
    ticket_id: str,
    ticket_kind: str,
) -> L1WalkSession:
    """Start an AI-built tree session (nodes generated on demand via next-node)."""
    session = L1WalkSession(
        account_id=account_id,
        created_by_user_id=user.id,
        acting_as=_resolve_acting_as(user),
        ticket_id=ticket_id,
        ticket_kind=ticket_kind,
        session_kind="ai_build",
    )
    db.add(session)
    await db.flush()
    return session
  • Step 4: Run test to verify it passes

Run: docker exec resolutionflow_backend pytest tests/test_l1_session_service.py::test_start_ai_build_session -v
Expected: PASS.

  • Step 5: Commit
git add backend/app/services/l1_session_service.py backend/tests/test_l1_session_service.py
git commit -m "feat(l1): start_ai_build_session"

Task 8: Session service — advance_ai_build (record answer + generate next node)

Files:

  • Modify: backend/app/services/l1_session_service.py

  • Test: backend/tests/test_l1_session_service.py (add)

  • Step 1: Write the failing test

# add to backend/tests/test_l1_session_service.py
@pytest.mark.asyncio
async def test_advance_ai_build_appends_and_returns_next(db_session, l1_user, monkeypatch):
    from app.services import l1_session_service as svc
    from app.services import ai_tree_builder
    s = await svc.start_ai_build_session(
        db_session, account_id=l1_user.account_id, user=l1_user,
        ticket_id="t-ai", ticket_kind="internal")

    async def fake_next(problem, category, walked):
        return {"node_type": "resolved", "id": "done", "text": "Fixed."}
    monkeypatch.setattr(ai_tree_builder, "generate_next_node", fake_next)

    next_node = await svc.advance_ai_build(
        db_session, session_id=s.id, problem_text="printer", category="printer",
        node_id="n1", answer="no", note=None)
    assert next_node["node_type"] == "resolved"
    refreshed = await db_session.get(type(s), s.id)
    assert len(refreshed.walked_path) == 1
    assert refreshed.walked_path[0]["answer"] == "no"
  • Step 2: Run test to verify it fails

Run: docker exec resolutionflow_backend pytest tests/test_l1_session_service.py::test_advance_ai_build_appends_and_returns_next -v
Expected: FAIL (AttributeError: advance_ai_build).

  • Step 3: Implement

Add to l1_session_service.py (imports at top: from app.services import ai_tree_builder):

async def advance_ai_build(
    db: AsyncSession,
    *,
    session_id: UUID,
    problem_text: str,
    category: str,
    node_id: Optional[str] = None,
    answer: Optional[str] = None,
    note: Optional[str] = None,
) -> dict:
    """Append the answered/acked node to walked_path, then generate the next node.

    On the first call (node_id is None) nothing is appended — we just generate the
    first node. Returns the next node dict (caller persists current_node_id).
    Raises ValueError on missing/inactive/non-ai_build session.
    """
    session = await db.get(L1WalkSession, session_id)
    if not session:
        raise ValueError(f"L1WalkSession {session_id} not found")
    if session.session_kind != "ai_build":
        raise ValueError("advance_ai_build requires an ai_build session")
    if session.status != "active":
        raise ValueError(f"Session {session_id} is not active (status={session.status})")

    if node_id is not None:
        # Record id + answer only. Node text is optional and is not stored here
        # in Phase 2A (see the note after this function).
        entry = {"node_type": "question" if answer in ("yes", "no") else "instruction",
                 "id": node_id, "answer": answer, "l1_note": note}
        session.walked_path = [*session.walked_path, entry]

    next_node = await ai_tree_builder.generate_next_node(
        problem_text, category, session.walked_path)
    session.current_node_id = next_node.get("id")
    session.last_step_at = datetime.now(timezone.utc)
    await db.flush()
    return next_node

Note: advance_ai_build records the answer against the node id only; it does not store the node's text. The endpoint layer (Task 10) knows which node it served and is the place to add that text if a richer transcript is wanted. Keep Phase 2A minimal (id + answer).
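
The append step can be sketched as a pure function. This is an illustration of the entry shape used in advance_ai_build, not a replacement for it; the helper name append_answer is hypothetical. It builds a new list instead of mutating the old one, mirroring the reassignment in Step 3. One reason that pattern is commonly used: SQLAlchemy does not detect in-place mutation of a plain JSON column unless the mutable extension is configured (whether walked_path is such a column is an assumption here).

```python
from typing import Any, Optional


def append_answer(
    walked_path: list[dict[str, Any]],
    node_id: str,
    answer: Optional[str],
    note: Optional[str] = None,
) -> list[dict[str, Any]]:
    """Return a new path with one entry appended; the input list is untouched."""
    entry = {
        # A yes/no answer implies a question node; anything else is treated
        # as an acknowledged instruction.
        "node_type": "question" if answer in ("yes", "no") else "instruction",
        "id": node_id,
        "answer": answer,
        "l1_note": note,
    }
    return [*walked_path, entry]


before: list[dict[str, Any]] = []
after = append_answer(before, "n1", "no")
print(len(before), len(after), after[0]["node_type"])
```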

  • Step 4: Run test to verify it passes

Run: docker exec resolutionflow_backend pytest tests/test_l1_session_service.py::test_advance_ai_build_appends_and_returns_next -v
Expected: PASS.

  • Step 5: Commit
git add backend/app/services/l1_session_service.py backend/tests/test_l1_session_service.py
git commit -m "feat(l1): advance_ai_build — record answer + generate next node"

Task 9: Session service — flywheel capture on resolve + engineer notification on escalate

Files:

  • Modify: backend/app/services/l1_session_service.py (resolve, escalate)

  • Modify: backend/app/schemas/notification.py (VALID_EVENTS)

  • Modify: backend/app/services/notification_service.py (link + body)

  • Test: backend/tests/test_l1_session_service.py (add)

  • Step 1: Write the failing tests

# add to backend/tests/test_l1_session_service.py
@pytest.mark.asyncio
async def test_resolve_ai_build_creates_outcome_validated_proposal(db_session, l1_user, monkeypatch):
    from app.services import l1_session_service as svc
    from app.models.flow_proposal import FlowProposal
    from sqlalchemy import select
    s = await svc.start_ai_build_session(
        db_session, account_id=l1_user.account_id, user=l1_user,
        ticket_id="t-ai", ticket_kind="internal")
    s.walked_path = [
        {"node_type": "question", "id": "n1", "text": "On?", "answer": "no"},
        {"node_type": "resolved", "id": "n2", "text": "Fixed."},
    ]
    await db_session.flush()
    await svc.resolve(db_session, session_id=s.id, helpful=True, resolution_notes="ok")
    props = (await db_session.execute(
        select(FlowProposal).where(FlowProposal.l1_session_id == s.id))).scalars().all()
    assert len(props) == 1
    assert props[0].source == "ai_realtime_l1"
    assert props[0].validated_by_outcome is True
    assert props[0].source_session_id is None
    assert props[0].proposed_flow_data["tree_structure"]["id"] == "n1"


@pytest.mark.asyncio
async def test_escalate_notifies_engineers(db_session, l1_user, monkeypatch):
    from app.services import l1_session_service as svc
    calls = {}
    async def fake_notify(event, account_id, payload, db, target_user_ids=None):
        calls["event"] = event
        calls["target_user_ids"] = target_user_ids
    monkeypatch.setattr(svc, "notify", fake_notify)
    s = await svc.start_ai_build_session(
        db_session, account_id=l1_user.account_id, user=l1_user,
        ticket_id="t-ai", ticket_kind="internal")
    await svc.escalate(db_session, session_id=s.id, reason="stuck", reason_category="exhausted_safe_steps")
    assert calls["event"] == "l1.session.escalated"
    assert calls["target_user_ids"] is not None  # explicit engineer recipients
  • Step 2: Run tests to verify they fail

Run: docker exec resolutionflow_backend pytest tests/test_l1_session_service.py -k "ai_build_creates_outcome or notifies_engineers" -v
Expected: FAIL (no proposal created; notify not called with the new event).

  • Step 3a: Add notification event + link + body

backend/app/schemas/notification.py — add to VALID_EVENTS:

    "l1.session.escalated",

backend/app/services/notification_service.py — in _build_notification_link links dict add:

        "l1.session.escalated": "/escalations",

and in the body-template builder (the bodies dict near _build_notification_link) add:

        "l1.session.escalated": "L1 escalated a ticket: {problem_summary}",
  • Step 3b: Flywheel capture in resolve

In l1_session_service.resolve, after the existing proposal.validated_by_outcome block and before the ticket close, add (imports: from app.services import ai_tree_builder, from app.models.flow_proposal import FlowProposal already present):

    if helpful and session.session_kind == "ai_build" and session.walked_path:
        tree_structure = ai_tree_builder.normalize_walked_path(session.walked_path)
        db.add(FlowProposal(
            account_id=session.account_id,
            l1_session_id=session.id,
            source_session_id=None,
            proposal_type="new_flow",
            title=(session.resolution_notes or "AI L1 resolution")[:255],
            proposed_flow_data={"tree_structure": tree_structure, "match_keywords": []},
            source="ai_realtime_l1",
            validated_by_outcome=True,
            linked_ticket_id=session.ticket_id,
            linked_ticket_kind=session.ticket_kind,
            status="pending",
        ))

Dedupe via _find_similar_pending_proposal is a nice-to-have; Phase 2A inserts directly. If duplicate noise appears in QA, wire the existing dedupe helper here.

  • Step 3c: Engineer notification in escalate

In l1_session_service.escalate, after await log_audit(...) and before the final await db.flush(), add (imports: from app.services.notification_service import notify, from app.models.user import User, from sqlalchemy import select):

    eng_rows = await db.execute(
        select(User.id).where(
            User.account_id == session.account_id,
            User.is_active.is_(True),
            User.account_role.in_(("owner", "admin", "engineer")),
        )
    )
    target_ids = [r[0] for r in eng_rows.all()]
    await notify(
        "l1.session.escalated",
        session.account_id,
        {"problem_summary": session.ticket_id, "session_id": str(session.id),
         "reason_category": reason_category},
        db,
        target_user_ids=target_ids,
    )
  • Step 4: Run tests to verify they pass

Run: docker exec resolutionflow_backend pytest tests/test_l1_session_service.py -k "ai_build_creates_outcome or notifies_engineers" -v
Expected: PASS.
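
To see what an engineer would read, the body template from Step 3a can be filled with the payload from Step 3c. This sketch assumes the notification service renders bodies with str.format, which the plan does not confirm; the session id value is a placeholder.

```python
BODY_TEMPLATE = "L1 escalated a ticket: {problem_summary}"

payload = {
    "problem_summary": "t-ai",  # Step 3c passes session.ticket_id here
    "session_id": "placeholder-session-id",  # placeholder value
    "reason_category": "exhausted_safe_steps",
}

# str.format ignores keyword arguments that the template does not reference,
# so the extra payload keys are harmless under this assumption.
body = BODY_TEMPLATE.format(**payload)
print(body)
```

Because problem_summary carries the ticket id, the rendered body shows the id, not the problem text. If a human-readable summary is wanted, the payload is where that would change.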

  • Step 5: Run notification schema test

Run: docker exec resolutionflow_backend pytest tests/ -k notification -v
Expected: PASS (the new event is accepted by validate_event_keys).

  • Step 6: Commit
git add backend/app/services/l1_session_service.py backend/app/schemas/notification.py backend/app/services/notification_service.py backend/tests/test_l1_session_service.py
git commit -m "feat(l1): flywheel capture on resolve + engineer notification on escalate"

Task 10: API — intake dispatch, next-node, escalations; schemas + deps

Files:

  • Modify: backend/app/schemas/l1.py

  • Modify: backend/app/api/deps.py

  • Modify: backend/app/api/endpoints/l1.py

  • Test: backend/tests/test_l1_api_ai_build.py

  • Step 1: Write the failing test

# backend/tests/test_l1_api_ai_build.py
import pytest
from unittest.mock import AsyncMock, patch


@pytest.mark.asyncio
async def test_intake_build_outcome_creates_ai_build_session(l1_client):
    with patch("app.api.endpoints.l1.match_or_build.match_or_build",
               new=AsyncMock(return_value={"outcome": "build", "session_kind": "ai_build", "category": "printer"})):
        r = await l1_client.post("/api/v1/l1/intake", json={"problem_statement": "printer jam"})
    assert r.status_code == 200
    body = r.json()
    assert body["outcome"] == "build"
    assert body["session_kind"] == "ai_build"
    assert body["session_id"]


@pytest.mark.asyncio
async def test_intake_out_of_scope(l1_client):
    with patch("app.api.endpoints.l1.match_or_build.match_or_build",
               new=AsyncMock(return_value={"outcome": "out_of_scope", "category": "unknown"})):
        r = await l1_client.post("/api/v1/l1/intake", json={"problem_statement": "weird"})
    assert r.status_code == 200
    assert r.json()["outcome"] == "out_of_scope"

(Use the existing L1 client fixture pattern from test_l1_api*/conftest; l1_client is an authed AsyncClient for an l1_tech user.)

  • Step 2: Run test to verify it fails

Run: docker exec resolutionflow_backend pytest tests/test_l1_api_ai_build.py -v
Expected: FAIL (outcome missing from the response, KeyError).

  • Step 3a: Schemas

In backend/app/schemas/l1.py:

  • Change IntakeResponse.session_kind literal to include ai_build and make session_id/session_kind optional (the suggest and out_of_scope outcomes create no session; matched and build do):
class IntakeResponse(BaseModel):
    outcome: Literal["matched", "suggest", "out_of_scope", "build"]
    session_id: Optional[UUID] = None
    session_kind: Optional[Literal["flow", "proposal", "adhoc", "ai_build"]] = None
    ticket_id: Optional[str] = None
    ticket_kind: Optional[str] = None
    flow_id: Optional[UUID] = None           # for 'matched'
    near_miss: Optional[dict] = None          # for 'suggest'
    category: Optional[str] = None            # for 'out_of_scope'

Add NextNodeRequest / NextNodeResponse:

class NextNodeRequest(BaseModel):
    node_id: Optional[str] = None
    answer: Optional[str] = None      # 'yes' | 'no' for questions
    acknowledged: Optional[bool] = None
    note: Optional[str] = None

class NextNodeResponse(BaseModel):
    node: dict
    session_status: str

Ensure IntakeRequest has an optional force_build: bool = False and flow_id is no longer required.
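
For orientation, the four outcomes populate different subsets of IntakeResponse. The shapes below are illustrative only, written as plain dicts with placeholder values; they follow the intake handler in Step 3c, and the helper has_session is hypothetical.

```python
# Placeholder ids and names; only the set of populated keys matters here.
EXAMPLE_RESPONSES = {
    "matched": {"outcome": "matched", "session_id": "<session-uuid>",
                "session_kind": "flow", "ticket_id": "<ticket-id>",
                "ticket_kind": "internal", "flow_id": "<flow-uuid>"},
    "build": {"outcome": "build", "session_id": "<session-uuid>",
              "session_kind": "ai_build", "ticket_id": "<ticket-id>",
              "ticket_kind": "internal"},
    "suggest": {"outcome": "suggest",
                "near_miss": {"flow_id": "<flow-uuid>", "flow_name": "X",
                              "score": 0.66}},
    "out_of_scope": {"outcome": "out_of_scope", "category": "printer"},
}


def has_session(response: dict) -> bool:
    """True when the client can navigate straight to the walker."""
    return response.get("session_id") is not None


for name, resp in EXAMPLE_RESPONSES.items():
    print(name, has_session(resp))
```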

  • Step 3b: Auth dep

In backend/app/api/deps.py, after require_account_owner:

async def require_account_owner_or_admin(
    current_user: Annotated[User, Depends(get_current_active_user)]
) -> User:
    """Require account owner or account-admin (blocks engineers); super_admin bypass."""
    if current_user.is_super_admin:
        return current_user
    if current_user.account_role in ("owner", "admin"):
        return current_user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Account owner or admin access required",
    )
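
The access rule in this dependency reduces to a small predicate, sketched here without FastAPI so it can be checked in isolation. The function name is hypothetical, and the l1_tech role string is taken from the fixture description in Step 1.

```python
def can_edit_l1_categories(account_role: str, is_super_admin: bool = False) -> bool:
    """Mirror of require_account_owner_or_admin as a pure predicate."""
    # Super admins bypass the role check; otherwise only owner/admin pass.
    return is_super_admin or account_role in ("owner", "admin")


for role in ("owner", "admin", "engineer", "l1_tech"):
    print(role, can_edit_l1_categories(role))
```

In the real dependency a False result corresponds to the 403 response.
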
  • Step 3c: Rewrite intake + add next-node + escalations in l1.py

Replace the intake body to run the orchestrator (imports: from app.services import match_or_build):

@router.post("/intake", response_model=IntakeResponse)
async def intake(
    payload: IntakeRequest,
    db: Annotated[AsyncSession, Depends(get_db)],
    user: Annotated[User, Depends(require_l1_or_coverage)],
):
    """L1 intake: match a published flow, else gate + build, else suggest/out-of-scope."""
    result = await match_or_build.match_or_build(
        user.account_id, payload.problem_statement, None, ticket_ref="",
        db=db, force_build=payload.force_build,
    )
    outcome = result["outcome"]

    if outcome in ("suggest", "out_of_scope"):
        await db.commit()
        return IntakeResponse(outcome=outcome, near_miss=result.get("near_miss"),
                              category=result.get("category"))

    # matched OR build → create a ticket and a session
    ticket = await internal_ticket_service.create_ticket(
        db, account_id=user.account_id, created_by_user_id=user.id,
        problem_statement=payload.problem_statement,
        customer_name=payload.customer_name, customer_contact=payload.customer_contact,
    )
    if outcome == "matched":
        session = await l1_session_service.start_flow_session(
            db, account_id=user.account_id, user=user, flow_id=UUID(result["flow_id"]),
            ticket_id=str(ticket.id), ticket_kind="internal")
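    else:  # build
        session = await l1_session_service.start_ai_build_session(
            db, account_id=user.account_id, user=user,
            ticket_id=str(ticket.id), ticket_kind="internal")
        # Persist the classified category as a hidden meta entry so next-node
        # can read it back (see "Category persistence" below).
        session.walked_path = [{"node_type": "meta", "category": result["category"]}]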
    await db.commit()
    return IntakeResponse(
        outcome=outcome, session_id=session.id, session_kind=session.session_kind,
        ticket_id=str(ticket.id), ticket_kind="internal",
        flow_id=UUID(result["flow_id"]) if outcome == "matched" else None,
    )

Add next-node endpoint:

@router.post("/sessions/{session_id}/next-node", response_model=NextNodeResponse)
async def next_node(
    session_id: UUID,
    payload: NextNodeRequest,
    db: Annotated[AsyncSession, Depends(get_db)],
    user: Annotated[User, Depends(require_l1_or_coverage)],
):
    session = await _get_session_or_404(db, session_id, user)
    # problem_text + category come from the linked internal ticket + stored category.
    ticket = await internal_ticket_service.get_ticket(db, ticket_id=UUID(session.ticket_id))
    problem_text = ticket.problem_statement if ticket else ""
    category = session.walked_path[0].get("category") if session.walked_path else None
    try:
        node = await l1_session_service.advance_ai_build(
            db, session_id=session_id, problem_text=problem_text,
            category=category or "unknown", node_id=payload.node_id,
            answer=payload.answer, note=payload.note)
    except ValueError as e:
        raise HTTPException(status_code=http_status.HTTP_409_CONFLICT, detail=str(e))
    await db.commit()
    return NextNodeResponse(node=node, session_status=session.status)


@router.get("/escalations", response_model=list[WalkSessionResponse])
async def l1_escalations(
    db: Annotated[AsyncSession, Depends(get_db)],
    user: Annotated[User, Depends(require_engineer_or_admin)],
    limit: int = 50,
):
    rows = await db.execute(
        select(L1WalkSession)
        .where(L1WalkSession.account_id == user.account_id,
               L1WalkSession.status == "escalated")
        .order_by(L1WalkSession.resolved_at.desc()).limit(limit))
    return [_to_response(s) for s in rows.scalars()]

Update the import line for deps: from app.api.deps import get_db, require_l1_or_coverage, require_engineer_or_admin and add the new schema imports (NextNodeRequest, NextNodeResponse).

Category persistence: the category classified at intake must be available on every later next-node call. Adding a dedicated category field in start_ai_build_session is out of scope, so the category is stored as a hidden meta entry at the head of walked_path. Decision for implementer: in the intake build branch, after start_ai_build_session and before the commit, set session.walked_path=[{"node_type":"meta","category":result["category"]}]. Do not call advance_ai_build(..., node_id=None) from intake. normalize_walked_path/generate_next_node must skip meta entries; add a one-line filter in both (Step 4).

  • Step 4: Handle the meta entry

In ai_tree_builder._build_context and normalize_walked_path, skip entries with node_type == "meta":

    walked_path = [s for s in walked_path if s.get("node_type") != "meta"]

(add as the first line of both functions). In the next-node endpoint, read category from the meta entry:

    category = next((s.get("category") for s in session.walked_path if s.get("node_type") == "meta"), "unknown")
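
The meta-entry convention can be sketched end to end with plain lists. The helper names below are hypothetical; the filter and the lookup are the same expressions used in this step.

```python
from typing import Any


def seed_meta(category: str) -> list[dict[str, Any]]:
    """Initial walked_path written by the intake build branch."""
    return [{"node_type": "meta", "category": category}]


def visible_steps(walked_path: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """What node generation and normalization should see: no meta entries."""
    return [s for s in walked_path if s.get("node_type") != "meta"]


def category_of(walked_path: list[dict[str, Any]]) -> str:
    """Read the stored category back; 'unknown' when no meta entry exists."""
    return next(
        (s.get("category") for s in walked_path if s.get("node_type") == "meta"),
        "unknown",
    )


path = seed_meta("printer")
path = [*path, {"node_type": "question", "id": "n1", "answer": "no", "l1_note": None}]
print(category_of(path), len(path), len(visible_steps(path)))
```

One consequence to keep in mind: once intake seeds the meta entry, len(session.walked_path) is one larger than the number of answered nodes, so any assertion on raw path length in an API-level test needs to account for it.
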
  • Step 5: Run tests to verify they pass

Run: docker exec resolutionflow_backend pytest tests/test_l1_api_ai_build.py -v
Expected: PASS.

  • Step 6: Commit
git add backend/app/schemas/l1.py backend/app/api/deps.py backend/app/api/endpoints/l1.py backend/tests/test_l1_api_ai_build.py
git commit -m "feat(l1): intake dispatch + next-node + escalations endpoints, owner/admin dep"

Task 11: Category settings API

Files:

  • Create: backend/app/schemas/l1_categories.py

  • Modify: backend/app/api/endpoints/accounts.py

  • Test: backend/tests/test_l1_categories_api.py

  • Step 1: Write the failing test

# backend/tests/test_l1_categories_api.py
import pytest


@pytest.mark.asyncio
async def test_get_categories(owner_client):
    r = await owner_client.get("/api/v1/accounts/me/l1-categories")
    assert r.status_code == 200
    body = r.json()
    assert "enabled" in body and "available" in body and "hard_floor" in body


@pytest.mark.asyncio
async def test_patch_categories_owner_only(owner_client, engineer_client):
    r = await engineer_client.patch("/api/v1/accounts/me/l1-categories",
                                    json={"enabled": ["printer"]})
    assert r.status_code == 403
    r2 = await owner_client.patch("/api/v1/accounts/me/l1-categories",
                                  json={"enabled": ["printer", "vpn_connect"]})
    assert r2.status_code == 200
    assert set(r2.json()["enabled"]) == {"printer", "vpn_connect"}
  • Step 2: Run test to verify it fails

Run: docker exec resolutionflow_backend pytest tests/test_l1_categories_api.py -v
Expected: FAIL with 404 (routes not defined).

  • Step 3: Schema + endpoints
# backend/app/schemas/l1_categories.py
from pydantic import BaseModel

class L1CategoriesResponse(BaseModel):
    enabled: list[str]
    available: list[str]
    hard_floor: list[str]

class L1CategoriesUpdate(BaseModel):
    enabled: list[str]

In backend/app/api/endpoints/accounts.py (imports: the category service + new deps/schemas):

@router.get("/me/l1-categories", response_model=L1CategoriesResponse)
async def get_l1_categories(
    db: Annotated[AsyncSession, Depends(get_db)],
    user: Annotated[User, Depends(require_l1_or_above)],
):
    enabled = await l1_category_service.get_enabled_categories(user.account_id, db)
    return L1CategoriesResponse(
        enabled=enabled,
        available=l1_category_service.DEFAULT_L1_CATEGORIES,
        hard_floor=l1_category_service.HARD_FLOOR_FORBIDDEN,
    )


@router.patch("/me/l1-categories", response_model=L1CategoriesResponse)
async def set_l1_categories(
    payload: L1CategoriesUpdate,
    db: Annotated[AsyncSession, Depends(get_db)],
    user: Annotated[User, Depends(require_account_owner_or_admin)],
):
    enabled = await l1_category_service.set_enabled_categories(user.account_id, payload.enabled, db)
    await db.commit()
    return L1CategoriesResponse(
        enabled=enabled,
        available=l1_category_service.DEFAULT_L1_CATEGORIES,
        hard_floor=l1_category_service.HARD_FLOOR_FORBIDDEN,
    )

Add imports: from app.services import l1_category_service, from app.api.deps import require_l1_or_above, require_account_owner_or_admin, from app.schemas.l1_categories import L1CategoriesResponse, L1CategoriesUpdate.

  • Step 4: Run test to verify it passes

Run: docker exec resolutionflow_backend pytest tests/test_l1_categories_api.py -v
Expected: PASS. (If the engineer_client/owner_client fixtures don't exist, add them by mirroring l1_client with account_role engineer/owner.)

  • Step 5: Commit
git add backend/app/schemas/l1_categories.py backend/app/api/endpoints/accounts.py backend/tests/test_l1_categories_api.py
git commit -m "feat(l1): account L1 category settings API (owner/admin write)"

Task 12: Backend integration test — full intake→build→resolve and →escalate

Files:

  • Test: backend/tests/test_l1_ai_build_flow.py

  • Step 1: Write the integration test

# backend/tests/test_l1_ai_build_flow.py
import pytest
from unittest.mock import AsyncMock, patch
from sqlalchemy import select
from app.models.flow_proposal import FlowProposal


@pytest.mark.asyncio
async def test_intake_build_walk_resolve_creates_proposal(l1_client, db_session, monkeypatch):
    from app.services import ai_tree_builder
    # 1. force a build outcome
    with patch("app.api.endpoints.l1.match_or_build.match_or_build",
               new=AsyncMock(return_value={"outcome": "build", "session_kind": "ai_build", "category": "printer"})):
        r = await l1_client.post("/api/v1/l1/intake", json={"problem_statement": "printer jam"})
    sid = r.json()["session_id"]

    # 2. drive next-node deterministically to a resolved node
    seq = iter([
        {"node_type": "question", "id": "n1", "text": "Powered on?"},
        {"node_type": "resolved", "id": "n2", "text": "Fixed."},
    ])
    async def fake_next(problem, category, walked):
        return next(seq)
    monkeypatch.setattr(ai_tree_builder, "generate_next_node", fake_next)

    r1 = await l1_client.post(f"/api/v1/l1/sessions/{sid}/next-node", json={})
    assert r1.json()["node"]["node_type"] == "question"
    r2 = await l1_client.post(f"/api/v1/l1/sessions/{sid}/next-node",
                              json={"node_id": "n1", "answer": "no"})
    assert r2.json()["node"]["node_type"] == "resolved"

    # 3. resolve → proposal
    await l1_client.post(f"/api/v1/l1/sessions/{sid}/resolve",
                         json={"helpful": True, "resolution_notes": "ok"})
    props = (await db_session.execute(
        select(FlowProposal).where(FlowProposal.source == "ai_realtime_l1"))).scalars().all()
    assert len(props) >= 1
  • Step 2: Run test

Run: docker exec resolutionflow_backend pytest tests/test_l1_ai_build_flow.py -v
Expected: PASS. Fix any wiring gaps surfaced here (this is the end-to-end backend gate).
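
The integration test drives generate_next_node from an iterator. A slightly more defensive variant of that stub is sketched below: it fails with a clear message if the endpoint asks for more nodes than the test scripted. The factory name make_scripted_next is hypothetical; the stub signature matches fake_next in Step 1.

```python
import asyncio


def make_scripted_next(nodes):
    """Build an async stub that returns the given nodes one per call, in order."""
    seq = iter(nodes)

    async def fake_next(problem, category, walked):
        node = next(seq, None)  # None marks an exhausted script
        if node is None:
            raise AssertionError("generate_next_node called more often than scripted")
        return node

    return fake_next


async def demo():
    fake = make_scripted_next([
        {"node_type": "question", "id": "n1", "text": "Powered on?"},
        {"node_type": "resolved", "id": "n2", "text": "Fixed."},
    ])
    first = await fake("printer jam", "printer", [])
    second = await fake("printer jam", "printer", [{"id": "n1", "answer": "no"}])
    return first["node_type"], second["node_type"]


print(asyncio.run(demo()))
```

In the test, the stub would be installed the same way as in Step 1, with monkeypatch.setattr(ai_tree_builder, "generate_next_node", make_scripted_next([...])).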

  • Step 3: Run the full L1 backend suite for regressions

Run: docker exec resolutionflow_backend pytest tests/ -k "l1 or match_or_build or ai_tree_builder or notification" -q
Expected: all pass.

  • Step 4: Commit
git add backend/tests/test_l1_ai_build_flow.py
git commit -m "test(l1): integration — intake build → walk → resolve → proposal"

Task 13: Frontend — API client + types

Files:

  • Modify: frontend/src/types/l1.ts, frontend/src/api/l1.ts

  • Step 1: Add types

In frontend/src/types/l1.ts:

export type IntakeOutcome = 'matched' | 'suggest' | 'out_of_scope' | 'build'

export interface IntakeResult {
  outcome: IntakeOutcome
  session_id?: string
  session_kind?: 'flow' | 'proposal' | 'adhoc' | 'ai_build'
  ticket_id?: string
  ticket_kind?: string
  flow_id?: string
  near_miss?: { flow_id: string; flow_name: string; score: number }
  category?: string
}

export type TreeNode =
  | { node_type: 'question'; id: string; text: string }
  | { node_type: 'instruction'; id: string; text: string }
  | { node_type: 'resolved'; id: string; text: string }
  | { node_type: 'escalate'; id: string; reason_category?: string; text: string }
  | { node_type: 'needs_review'; id: string; text: string }

export interface NextNodeResult { node: TreeNode; session_status: string }

export interface L1Categories { enabled: string[]; available: string[]; hard_floor: string[] }
  • Step 2: Add API methods

In frontend/src/api/l1.ts:

  nextNode: (sessionId: string, body: { node_id?: string; answer?: 'yes' | 'no'; acknowledged?: boolean; note?: string }) =>
    apiClient.post<NextNodeResult>(`/l1/sessions/${sessionId}/next-node`, body).then(r => r.data),

  getCategories: () =>
    apiClient.get<L1Categories>('/accounts/me/l1-categories').then(r => r.data),

  setCategories: (enabled: string[]) =>
    apiClient.patch<L1Categories>('/accounts/me/l1-categories', { enabled }).then(r => r.data),

  escalations: () =>
    apiClient.get<WalkSession[]>('/l1/escalations').then(r => r.data),

Update the existing intake method's return type to IntakeResult.

  • Step 3: Type-check

Run: docker exec resolutionflow_frontend sh -c 'cd /app && npx tsc --noEmit -p tsconfig.app.json'
Expected: clean, apart from possible errors at existing intake callers. Those callers are updated in Tasks 14-15, so proceed if tsc flags only them.

  • Step 4: Commit
git add frontend/src/types/l1.ts frontend/src/api/l1.ts
git commit -m "feat(l1): frontend api/types for next-node, outcome, categories"

Task 14: Frontend — L1Dashboard intake dispatch

Files:

  • Modify: frontend/src/pages/l1/L1Dashboard.tsx

  • Step 1: Replace handleStart to dispatch on outcome

  const handleStart = async () => {
    if (!problem.trim()) return
    setSubmitting(true)
    try {
      const res = await l1Api.intake({
        problem_statement: problem.trim(),
        customer_name: customerName.trim() || undefined,
        customer_contact: customerContact.trim() || undefined,
      })
      if (res.outcome === 'matched' || res.outcome === 'build') {
        navigate(`/l1/walk/${res.session_id}`)
      } else if (res.outcome === 'suggest') {
        setSuggestion(res.near_miss ?? null)   // render an inline prompt (below)
      } else if (res.outcome === 'out_of_scope') {
        setOutOfScope(res.category ?? 'unknown')
      }
    } catch (err) {
      const detail = (err as { response?: { data?: { detail?: string } } }).response?.data?.detail
      toast.error(typeof detail === 'string' ? detail : 'Failed to start. Try again.')
    } finally {
      setSubmitting(false)
    }
  }

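  // "Build new" from the suggest prompt: skip matching and force an AI build.
  const buildNew = async () => {
    setSuggestion(null)
    setSubmitting(true)
    try {
      const res = await l1Api.intake({
        problem_statement: problem.trim(),
        customer_name: customerName.trim() || undefined,
        customer_contact: customerContact.trim() || undefined,
        force_build: true,
      })
      if (res.outcome === 'build') navigate(`/l1/walk/${res.session_id}`)
      else if (res.outcome === 'out_of_scope') setOutOfScope(res.category ?? 'unknown')
    } catch {
      toast.error('Failed to start. Try again.')
    } finally {
      setSubmitting(false)
    }
  }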

Add state near the top: const [suggestion, setSuggestion] = useState<{flow_id:string;flow_name:string;score:number}|null>(null) and const [outOfScope, setOutOfScope] = useState<string|null>(null). Add force_build as an optional field in the l1Api.intake body type.
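
The outcome dispatch can also be expressed as a pure function, which makes it easy to unit-test without rendering the dashboard. This is a sketch under assumptions: planIntakeAction and IntakeAction are hypothetical names, and the explicit error for a missing session_id is an added guard (session_id is optional in IntakeResult), not something the spec requires.

```typescript
type IntakeOutcome = 'matched' | 'suggest' | 'out_of_scope' | 'build'
interface NearMiss { flow_id: string; flow_name: string; score: number }
interface IntakeResultLike {
  outcome: IntakeOutcome
  session_id?: string
  near_miss?: NearMiss
  category?: string
}

type IntakeAction =
  | { kind: 'navigate'; path: string }
  | { kind: 'suggest'; nearMiss: NearMiss | null }
  | { kind: 'out_of_scope'; category: string }
  | { kind: 'error'; message: string }

// Hypothetical helper: decide what the dashboard should do with an intake response.
function planIntakeAction(res: IntakeResultLike): IntakeAction {
  switch (res.outcome) {
    case 'matched':
    case 'build':
      // Guard against navigating to /l1/walk/undefined.
      return res.session_id
        ? { kind: 'navigate', path: `/l1/walk/${res.session_id}` }
        : { kind: 'error', message: `outcome "${res.outcome}" returned no session_id` }
    case 'suggest':
      return { kind: 'suggest', nearMiss: res.near_miss ?? null }
    case 'out_of_scope':
      return { kind: 'out_of_scope', category: res.category ?? 'unknown' }
  }
}
```

In handleStart, the component would then switch on the returned kind and call navigate, setSuggestion, setOutOfScope, or toast.error accordingly.
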

  • Step 2: Render the suggest + out-of-scope prompts

Below the intake card, add:

{suggestion && (
  <div className="rounded-lg border border-default bg-card p-4 space-y-3">
    <p className="text-sm">Found a similar flow: <strong>{suggestion.flow_name}</strong>.</p>
    <div className="flex gap-2">
      <button className="rounded-md bg-accent text-white px-4 py-2 text-sm"
        onClick={() => navigate('/l1/walk/use-flow', { state: { flowId: suggestion.flow_id } })}>
        Use this flow
      </button>
      <button className="rounded-md border border-default px-4 py-2 text-sm" onClick={buildNew}>
        Build new
      </button>
    </div>
  </div>
)}
{outOfScope && (
  <div className="rounded-lg border border-default bg-card p-4 space-y-3">
    <p className="text-sm">This problem isn’t in your enabled L1 categories. Start an ad-hoc walk or escalate.</p>
    {/* reuse existing adhoc/escalate CTAs from Phase 1 */}
  </div>
)}

For "Use this flow": only the matched outcome creates a session, and in that case handleStart has already navigated. A suggest outcome carries a near_miss but no session to walk, so treat the /l1/walk/use-flow navigation in the onClick above as a placeholder. Implementer: on "Use this flow", POST intake again with the original problem text and navigate to the session it returns. A second call is not guaranteed to come back as matched, so handle the other outcomes the same way handleStart does. This is acceptable for Phase 2A.

  • Step 3: Type-check + lint

Run: docker exec resolutionflow_frontend sh -c 'cd /app && npx tsc --noEmit -p tsconfig.app.json && npx eslint src/pages/l1/L1Dashboard.tsx'
Expected: clean.

  • Step 4: Commit
git add frontend/src/pages/l1/L1Dashboard.tsx
git commit -m "feat(l1): dashboard intake dispatch on match_or_build outcome"

Task 15: Frontend — L1WalkTreeVariant real node rendering + disclaimer

Files:

  • Modify: frontend/src/components/l1/L1WalkTreeVariant.tsx

  • Step 1: Drive nodes from /next-node

Replace the synthetic stepping. On mount, if session.session_kind === 'ai_build', fetch the first node (l1Api.nextNode(session.id, {})). On answer/ack, POST the current node id + answer, render the returned node. Terminal nodes (resolved/escalate/needs_review) switch to the existing Resolve/Escalate modal affordances.

const [node, setNode] = useState<TreeNode | null>(null)
const [loading, setLoading] = useState(false)

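useEffect(() => {
  if (session.session_kind !== 'ai_build') return
  let cancelled = false   // ignore a response that arrives after unmount or a session change
  setLoading(true)
  l1Api.nextNode(session.id, {})
    .then(r => { if (!cancelled) setNode(r.node) })
    .catch(() => { if (!cancelled) setNode(null) })   // leave the walker empty rather than throw
    .finally(() => { if (!cancelled) setLoading(false) })
  return () => { cancelled = true }
}, [session.id, session.session_kind])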

const answer = async (a: 'yes' | 'no') => {
  if (!node) return
  setLoading(true)
  try {
    const r = await l1Api.nextNode(session.id, { node_id: node.id, answer: a })
    setNode(r.node)
  } finally { setLoading(false) }
}

const acknowledge = async () => {
  if (!node) return
  setLoading(true)
  try {
    const r = await l1Api.nextNode(session.id, { node_id: node.id, acknowledged: true })
    setNode(r.node)
  } finally { setLoading(false) }
}
  • Step 2: Render by node_type + disclaimer banner
{session.session_kind === 'ai_build' && (
  <div className="rounded-md border border-amber-500/30 bg-amber-500/10 px-4 py-2 text-xs text-amber-200">
    These are high-confidence troubleshooting steps, but they come from outside your
    organization’s knowledge base. Review them before acting. When in doubt, escalate early.
  </div>
)}
{loading && <p className="text-sm text-muted-foreground">Thinking through the next step…</p>}
{node?.node_type === 'question' && (
  <>
    <p className="text-lg">{node.text}</p>
    <div className="flex gap-3">
      <button onClick={() => answer('yes')} className="rounded-md bg-accent text-white px-5 py-2">Yes</button>
      <button onClick={() => answer('no')} className="rounded-md border border-default px-5 py-2">No</button>
    </div>
  </>
)}
{node?.node_type === 'instruction' && (
  <>
    <p className="text-lg">{node.text}</p>
    <button onClick={acknowledge} className="rounded-md bg-accent text-white px-5 py-2">Done, next</button>
  </>
)}
{(node?.node_type === 'resolved') && (
  <ResolveCta sessionId={session.id} prefillNote={node.text} />  /* opens existing Resolve modal */
)}
{(node?.node_type === 'escalate' || node?.node_type === 'needs_review') && (
  <EscalateCta sessionId={session.id} reason={node.text} />     /* opens existing Escalate modal */
)}

Wire ResolveCta/EscalateCta to the existing WalkModals Resolve/Escalate handlers already in this component (reuse, don't duplicate).
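
The render branches above amount to a mapping from node_type to a single control. A sketch of that mapping as a pure function follows; controlFor and WalkerControl are hypothetical names, and the never check is there so the compiler flags any node_type added to the union later without a matching branch.

```typescript
// Local copy of the TreeNode union from frontend/src/types/l1.ts.
type TreeNode =
  | { node_type: 'question'; id: string; text: string }
  | { node_type: 'instruction'; id: string; text: string }
  | { node_type: 'resolved'; id: string; text: string }
  | { node_type: 'escalate'; id: string; reason_category?: string; text: string }
  | { node_type: 'needs_review'; id: string; text: string }

type WalkerControl = 'yes_no' | 'acknowledge' | 'resolve' | 'escalate'

// Hypothetical helper: which control the walker shows for a node.
function controlFor(node: TreeNode): WalkerControl {
  switch (node.node_type) {
    case 'question':
      return 'yes_no'
    case 'instruction':
      return 'acknowledge'
    case 'resolved':
      return 'resolve'
    case 'escalate':
    case 'needs_review':
      // Both route to the existing Escalate modal in Phase 2A.
      return 'escalate'
    default: {
      // Compile-time exhaustiveness check: unreachable while the union is fully handled.
      const unreachable: never = node
      return unreachable
    }
  }
}
```
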

  • Step 3: Type-check + lint

Run: docker exec resolutionflow_frontend sh -c 'cd /app && npx tsc --noEmit -p tsconfig.app.json && npx eslint src/components/l1/L1WalkTreeVariant.tsx'
Expected: clean.

  • Step 4: Commit
git add frontend/src/components/l1/L1WalkTreeVariant.tsx
git commit -m "feat(l1): walker renders AI-built nodes via next-node + disclaimer banner"

Task 16: Frontend — admin category settings page

Files:

  • Create: frontend/src/pages/account/L1CategoriesPage.tsx

  • Modify: router + account nav (follow the existing /account/* child-route pattern)

  • Step 1: Build the page

// frontend/src/pages/account/L1CategoriesPage.tsx
import { useEffect, useState } from 'react'
import { l1Api } from '@/api/l1'
import { toast } from '@/lib/toast'
import type { L1Categories } from '@/types/l1'

export default function L1CategoriesPage() {
  const [data, setData] = useState<L1Categories | null>(null)
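  useEffect(() => {
    // Surface load failures instead of leaving the page blank with no feedback.
    l1Api.getCategories().then(setData).catch(() => toast.error('Failed to load L1 categories'))
  }, [])
  if (!data) return null
  const toggle = async (cat: string) => {
    const enabled = data.enabled.includes(cat)
      ? data.enabled.filter(c => c !== cat) : [...data.enabled, cat]
    try {
      const updated = await l1Api.setCategories(enabled)
      setData({ ...data, enabled: updated.enabled })
      toast.success('L1 categories updated')
    } catch {
      toast.error('Failed to update L1 categories')
    }
  }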
  return (
    <div className="max-w-2xl space-y-6">
      <h1 className="font-heading text-2xl font-bold">L1 AI build categories</h1>
      <p className="text-sm text-muted-foreground">
        Problems in enabled categories can be built into AI troubleshooting trees when no
        flow exists. Disabled categories fall back to ad-hoc or escalation.
      </p>
      <div className="space-y-2">
        {data.available.map(cat => (
          <label key={cat} className="flex items-center gap-3 rounded-md border border-default bg-card px-4 py-3">
            <input type="checkbox" checked={data.enabled.includes(cat)} onChange={() => toggle(cat)} />
            <span className="text-sm">{cat.replace(/_/g, ' ')}</span>
          </label>
        ))}
      </div>
      <div>
        <h2 className="font-heading text-sm font-semibold mb-2">Always excluded (safety)</h2>
        <ul className="text-xs text-muted-foreground list-disc pl-5">
          {data.hard_floor.map(h => <li key={h}>{h.replace(/_/g, ' ')}</li>)}
        </ul>
      </div>
    </div>
  )
}
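
The toggle logic in the page is small enough to factor into pure functions if it needs unit tests. This is a sketch under assumptions: toggleCategory and labelFor are hypothetical names, the category strings in the usage are made up, and the hard-floor guard is a defensive client-side addition that assumes the server remains the source of truth for what can be enabled.

```typescript
// Hypothetical helper: return the next enabled list after clicking a category.
function toggleCategory(
  enabled: readonly string[],
  cat: string,
  hardFloor: readonly string[] = [],
): string[] {
  // Assumption: hard-floor categories are never enabled from the client.
  if (hardFloor.includes(cat)) return enabled.filter(c => c !== cat)
  return enabled.includes(cat) ? enabled.filter(c => c !== cat) : [...enabled, cat]
}

// Hypothetical helper: same display transform the page applies inline.
function labelFor(cat: string): string {
  return cat.replace(/_/g, ' ')
}

// Usage with made-up category names.
const nextEnabled = toggleCategory(['password_reset'], 'printer_issue')
const nextLabels = nextEnabled.map(labelFor)
```
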
  • Step 2: Register route + nav

Add a lazy import + a child route under the /account subtree in frontend/src/router.tsx (mirror existing account children, e.g. { path: 'l1-categories', element: page(L1CategoriesPage) } under the AccountLayout route), and a nav entry in the account settings sidebar/menu following the existing pattern. Gate visibility to owner/admin in the menu (reuse usePermissions).

  • Step 3: Type-check + lint + build

Run: docker exec resolutionflow_frontend sh -c 'cd /app && npx tsc --noEmit -p tsconfig.app.json && npx eslint src/pages/account/L1CategoriesPage.tsx && npm run build'
Expected: tsc and eslint clean, build succeeds.

  • Step 4: Commit
git add frontend/src/pages/account/L1CategoriesPage.tsx frontend/src/router.tsx
git commit -m "feat(l1): admin L1 category settings page"

Task 17: Frontend — ProposalDetail L1 source + engineer escalations section

Files:

  • Modify: frontend/src/components/flowpilot/ProposalDetail.tsx

  • Modify: frontend/src/pages/EscalationQueuePage.tsx

  • Step 1: ProposalDetail — L1-sourced source block (Finding 1)

Where it currently renders the /pilot/{source_session_id} link, branch on the new l1_session_id:

{proposal.l1_session_id ? (
  <div className="text-sm text-text-muted">
    Source: AI L1 walk (outcome-validated). Unexplored branches are marked
    <span className="font-medium"> needs review</span> below.
  </div>
) : proposal.source_session_id ? (
  <Link to={`/pilot/${proposal.source_session_id}`} target="_blank" className="...">
    {/* existing link */}
  </Link>
) : null}

Add l1_session_id?: string | null to the proposal type used here.

  • Step 2: EscalationQueuePage — L1 escalations section

Fetch l1Api.escalations() and render a section above/below the existing queue:

const [l1Escalations, setL1Escalations] = useState<WalkSession[]>([])
useEffect(() => { l1Api.escalations().then(setL1Escalations).catch(() => setL1Escalations([])) }, [])
// render: problem (from ticket), walked-path length, escalated-at, reason

Each row shows the walked-path summary and links to a read-only view (Phase 2A: a simple expandable row is sufficient; no new route required).

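  • Step 3: Type-check + lint + build

Run: docker exec resolutionflow_frontend sh -c 'cd /app && npx tsc --noEmit -p tsconfig.app.json && npx eslint src/components/flowpilot/ProposalDetail.tsx src/pages/EscalationQueuePage.tsx && npm run build'
Expected: tsc and eslint clean, build succeeds.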

  • Step 4: Commit
git add frontend/src/components/flowpilot/ProposalDetail.tsx frontend/src/pages/EscalationQueuePage.tsx
git commit -m "feat(l1): proposal L1 source block + engineer L1-escalations section"

Task 18: E2E — AI build flow

Files:

  • Modify: frontend/e2e/l1-workspace.spec.ts

  • Step 1: Add an AI-build e2e test

Because the builder calls a live model, stub the network at the Playwright layer: intercept POST **/l1/intake to return {outcome:'build', session_kind:'ai_build', session_id:<seeded>} and POST **/l1/sessions/*/next-node to return scripted nodes (question → resolved). Assert: L1 lands on the walker, sees the disclaimer banner, answers the question, reaches the resolved CTA.

test('L1 AI build: intake → answer node → resolve CTA', async ({ page }) => {
  await login(page, L1_EMAIL)
  await page.route('**/api/v1/l1/intake', route => route.fulfill({
    status: 200, contentType: 'application/json',
    body: JSON.stringify({ outcome: 'build', session_kind: 'ai_build', session_id: 'e2e-sess', ticket_id: 't', ticket_kind: 'internal' }),
  }))
  let call = 0
  await page.route('**/api/v1/l1/sessions/*/next-node', route => {
    call += 1
    // First call returns the question; every later call returns the terminal node.
    const node = call === 1
      ? { node_type: 'question', id: 'n1', text: 'Is it powered on?' }
      : { node_type: 'resolved', id: 'n2', text: 'Resolved.' }
    // Return the promise so a failed fulfill is not silently dropped.
    return route.fulfill({ status: 200, contentType: 'application/json',
      body: JSON.stringify({ node, session_status: 'active' }) })
  })
  // also stub GET session fetch the walker does on load, if any, to return an ai_build session
  await page.goto('/l1')
  await page.getByPlaceholder(/What's the user calling about/i).fill('printer jam')
  await page.getByRole('button', { name: /Start walk/i }).click()
  await expect(page.getByText(/outside your organization.?s knowledge base/i)).toBeVisible()
  await expect(page.getByText('Is it powered on?')).toBeVisible()
  await page.getByRole('button', { name: 'No', exact: true }).click()
  await expect(page.getByText(/Resolved\./i)).toBeVisible()
})

Adjust selectors/route patterns to the actual walker data-loading (stub the session GET the walker performs so it reports session_kind: 'ai_build').
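
If the scripted walk grows beyond two nodes, the call counter can be replaced with a small sequencer. This is a sketch only: scriptedNodes is a hypothetical helper, not part of the existing e2e utilities, and it repeats the last node once the script is exhausted so that extra requests do not fail the test for the wrong reason.

```typescript
type ScriptedNode = { node_type: string; id: string; text: string }

// Hypothetical helper: returns a function that yields the next scripted node per call.
function scriptedNodes(script: readonly ScriptedNode[]): () => ScriptedNode {
  if (script.length === 0) throw new Error('script must contain at least one node')
  let call = 0
  return () => {
    // Clamp to the last entry once the script runs out.
    const node = script[Math.min(call, script.length - 1)]
    if (!node) throw new Error('unreachable: script index out of range')
    call += 1
    return node
  }
}

const nextScripted = scriptedNodes([
  { node_type: 'question', id: 'n1', text: 'Is it powered on?' },
  { node_type: 'instruction', id: 'n2', text: 'Reseat the power cable.' },
  { node_type: 'resolved', id: 'n3', text: 'Resolved.' },
])
```

Inside the page.route handler, the response body would then be JSON.stringify({ node: nextScripted(), session_status: 'active' }).
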

  • Step 2: Run e2e locally only if chromium available; otherwise rely on CI

This container cannot launch chromium (sandbox). Push and let CI run npm run test:e2e. Do not block on local e2e.

  • Step 3: Commit
git add frontend/e2e/l1-workspace.spec.ts
git commit -m "test(l1): e2e AI build flow (network-stubbed)"

Task 19: Final verification

  • Step 1: Backend suite

Run: docker exec resolutionflow_backend pytest tests/ -q
Expected: all pass (note any pre-existing xdist-only failures per Phase-1 acceptance report §7).

  • Step 2: Frontend gates

Run: docker exec resolutionflow_frontend sh -c 'cd /app && npx tsc --noEmit -p tsconfig.app.json && npm run lint && npm run build'
Expected: tsc clean, lint 0 errors, build succeeds.

  • Step 3: Migration roundtrip on a clean DB

Run: docker exec resolutionflow_backend alembic downgrade -3 && docker exec resolutionflow_backend alembic upgrade head
Expected: clean down+up for the three new migrations (run against a DB without ai_build/L1-proposal rows, or accept the documented downgrade caveat).

  • Step 4: Open PR

Push the branch and open a PR to main summarizing Phase 2A, linking the spec, and listing the deferred items (KB grounding/connectors, PSA reassign, escalation package, AI chat handoff, proposal-matching).


Self-Review notes (author)

  • Spec coverage: §3 match_or_build → Task 6/10; §4 streaming + node schema → Task 5/8/10/15; §5 safety (classify, constrained prompt, validation, depth cap, disclaimer) → Task 4/5/15; §6 flywheel + §6.1 normalize + §6.2 linkage → Task 3/9; §7 escalation handoff → Task 9/10/17; §8 migrations → Task 1/2/3; §9 API → Task 10/11; §10 frontend → Task 13-17; §11 testing → throughout + Task 12/18.
  • Known soft spots flagged for the implementer: category persistence via a meta walked_path entry (Task 10 Step 3c/Step 4) and the "Use this flow" suggest path (Task 14 Step 2) are the two places to validate carefully during review.
  • Model calls are mocked/stubbed in tests; a live constrained-decoding smoke test + the Sonnet-vs-Opus benchmark for l1_realtime_build should run in staging before wide enablement (spec §5.3).