fix(l1): resolve PR #193 backend review findings (1,4,5,6,7,8,9,10)

Server-assigns a uuid4 id to every AI-generated node (Finding 1 showstopper:
nodes had no id but the advance protocol keys on node_id, so ai_build walks
never advanced past question 1). Replaces the hidden {"node_type":"meta"}
walked_path convention with real category/problem_text/pending_node columns on
l1_walk_sessions (migration 61dda4f615c6) — fixes junk proposals + off-by-one
depth cap (Findings 8,9), and pending_node replays the served node on re-mount
(no duplicate paid LLM call). Intake honors explicit flow_id and adhoc=True
(Findings 4,5); flow_proposals.l1_session_id FK -> CASCADE (Finding 6 time
bomb); L1 category GET is owner+admin like PATCH and require_account_owner_or_admin
delegates to User.can_manage_account (Finding 7); escalate falls back to default
recipients + filters deleted_at + warns when empty (Finding 10). Cleanups: dead
ticket_ref removed, IntakeResponse per-outcome validator, unused acknowledged
dropped, escalations partial index, restored a deleted audit assertion.

Full Phase 2A backend set: 110 passed / 0 failed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-09 15:55:45 -04:00
parent 42a4536c63
commit ac89e7b2fa
17 changed files with 592 additions and 80 deletions

View File

@@ -0,0 +1,92 @@
"""l1 ai_build columns (category/problem_text/pending_node) + l1_session FK cascade
Two changes that ship together for the Phase 2A L1 AI tree builder:
1. Add real ``category`` / ``problem_text`` / ``pending_node`` columns to
``l1_walk_sessions``. These replace the former hidden
``{"node_type": "meta"}`` walked_path entry that smuggled the intake category:
that convention leaked into every consumer that forgot to skip it (junk
proposals, off-by-one depth cap, blank escalation rows). ``pending_node``
persists the served-but-unanswered node so a refresh / StrictMode double-mount
replays it instead of firing a fresh paid LLM call.
2. Flip ``flow_proposals.l1_session_id`` FK from SET NULL to CASCADE. Under the
exactly-one-source CHECK an L1-sourced proposal has ``source_session_id`` NULL,
so a SET NULL on l1_session deletion would NULL both columns and the
non-deferrable CHECK would abort the DELETE — making the session undeletable.
Also adds a partial index for the engineer escalations list.
Revision ID: 61dda4f615c6
Revises: 1fd88a68b145
Create Date: 2026-06-09
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = '61dda4f615c6'
down_revision: Union[str, None] = '1fd88a68b145'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# 1. New ai_build context columns on l1_walk_sessions.
op.add_column(
"l1_walk_sessions",
sa.Column("category", sa.String(length=100), nullable=True),
)
op.add_column(
"l1_walk_sessions",
sa.Column("problem_text", sa.Text(), nullable=True),
)
op.add_column(
"l1_walk_sessions",
sa.Column("pending_node", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
)
# Partial index for GET /l1/escalations (engineer handoff queue).
op.create_index(
"ix_l1_walk_sessions_escalated",
"l1_walk_sessions",
["account_id", sa.text("last_step_at DESC")],
postgresql_where=sa.text("status = 'escalated'"),
)
# 2. flow_proposals.l1_session_id: SET NULL -> CASCADE.
op.drop_constraint(
"fk_flow_proposals_l1_session_id", "flow_proposals", type_="foreignkey"
)
op.create_foreign_key(
"fk_flow_proposals_l1_session_id",
"flow_proposals",
"l1_walk_sessions",
["l1_session_id"],
["id"],
ondelete="CASCADE",
)
def downgrade() -> None:
op.drop_constraint(
"fk_flow_proposals_l1_session_id", "flow_proposals", type_="foreignkey"
)
op.create_foreign_key(
"fk_flow_proposals_l1_session_id",
"flow_proposals",
"l1_walk_sessions",
["l1_session_id"],
["id"],
ondelete="SET NULL",
)
op.drop_index("ix_l1_walk_sessions_escalated", table_name="l1_walk_sessions")
op.drop_column("l1_walk_sessions", "pending_node")
op.drop_column("l1_walk_sessions", "problem_text")
op.drop_column("l1_walk_sessions", "category")

View File

@@ -279,10 +279,11 @@ async def require_account_owner(
async def require_account_owner_or_admin( async def require_account_owner_or_admin(
current_user: Annotated[User, Depends(get_current_active_user)] current_user: Annotated[User, Depends(get_current_active_user)]
) -> User: ) -> User:
"""Require account owner or account-admin (blocks engineers); super_admin bypass.""" """Require account owner or account-admin (blocks engineers); super_admin bypass.
if current_user.is_super_admin:
return current_user Delegates to ``User.can_manage_account`` so the rule lives in exactly one place.
if current_user.account_role in ("owner", "admin"): """
if current_user.can_manage_account:
return current_user return current_user
raise HTTPException( raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, status_code=status.HTTP_403_FORBIDDEN,

View File

@@ -28,7 +28,6 @@ from app.api.deps import (
require_account_owner, require_account_owner,
require_account_owner_or_admin, require_account_owner_or_admin,
require_engineer_or_admin, require_engineer_or_admin,
require_l1_or_above,
) )
from app.services import l1_category_service from app.services import l1_category_service
from app.services.seat_enforcement import check_seat_available, get_seat_usage from app.services.seat_enforcement import check_seat_available, get_seat_usage
@@ -175,12 +174,13 @@ async def get_my_account_seat_usage(
@router.get("/me/l1-categories", response_model=L1CategoriesResponse) @router.get("/me/l1-categories", response_model=L1CategoriesResponse)
async def get_l1_categories( async def get_l1_categories(
db: Annotated[AsyncSession, Depends(get_db)], db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(require_l1_or_above)], current_user: Annotated[User, Depends(require_account_owner_or_admin)],
): ):
"""The account's enabled L1 AI-build categories + the available + hard-floor lists. """The account's enabled L1 AI-build categories + the available + hard-floor lists.
Readable by any L1-or-above user (the walker needs to know what's buildable); Owner/admin only — this is a settings surface, and read and write must agree
only owners/admins may change it (PATCH below). (the walker gates server-side via match_or_build, it never fetches this). Same
dep as PATCH so account admins can both read and save (Finding 7).
""" """
enabled = await l1_category_service.get_enabled_categories(current_user.account_id, db) enabled = await l1_category_service.get_enabled_categories(current_user.account_id, db)
return L1CategoriesResponse( return L1CategoriesResponse(

View File

@@ -35,6 +35,8 @@ def _to_response(session: L1WalkSession) -> WalkSessionResponse:
return WalkSessionResponse( return WalkSessionResponse(
id=session.id, id=session.id,
session_kind=session.session_kind, session_kind=session.session_kind,
category=session.category,
problem_text=session.problem_text,
flow_id=session.flow_id, flow_id=session.flow_id,
flow_proposal_id=session.flow_proposal_id, flow_proposal_id=session.flow_proposal_id,
current_node_id=session.current_node_id, current_node_id=session.current_node_id,
@@ -68,6 +70,17 @@ async def _get_session_or_404(
return session return session
async def _create_intake_ticket(db: AsyncSession, payload: IntakeRequest, user: User):
return await internal_ticket_service.create_ticket(
db,
account_id=user.account_id,
created_by_user_id=user.id,
problem_statement=payload.problem_statement,
customer_name=payload.customer_name,
customer_contact=payload.customer_contact,
)
@router.post("/intake", response_model=IntakeResponse) @router.post("/intake", response_model=IntakeResponse)
async def intake( async def intake(
payload: IntakeRequest, payload: IntakeRequest,
@@ -76,18 +89,49 @@ async def intake(
): ):
"""L1 intake (Phase 2A): match a published flow, else gate + build. """L1 intake (Phase 2A): match a published flow, else gate + build.
Runs the match_or_build orchestrator. Outcomes: Two explicit shortcuts run before the matcher (the client already knows what
it wants, so re-running the embedding + pgvector + keyword pipeline would be
wasteful and — for flow_id — can't reliably re-derive the same flow):
- flow_id set → start that published flow directly (suggest card's "Use this flow").
- adhoc=True → start a free-form ad-hoc walk (out_of_scope prompt's fallback).
Otherwise match_or_build dispatches:
- matched → create ticket + flow session, walk the published flow. - matched → create ticket + flow session, walk the published flow.
- build → create ticket + ai_build session (category persisted as a hidden - build → create ticket + ai_build session (category + problem_text stored
meta entry on walked_path for /next-node), walk an AI-built tree. on the session for /next-node), walk an AI-built tree.
- suggest → near-miss prompt; no session created. - suggest → near-miss prompt; no session created.
- out_of_scope → category disabled/unknown; no session created. - out_of_scope → category disabled/unknown; no session created.
""" """
# Explicit flow_id: bypass the matcher, walk the flow the client already holds.
if payload.flow_id is not None:
ticket = await _create_intake_ticket(db, payload, user)
session = await l1_session_service.start_flow_session(
db, account_id=user.account_id, user=user, flow_id=payload.flow_id,
ticket_id=str(ticket.id), ticket_kind="internal",
)
await db.commit()
return IntakeResponse(
outcome="matched", session_id=session.id, session_kind=session.session_kind,
ticket_id=str(ticket.id), ticket_kind="internal", flow_id=payload.flow_id,
)
# Explicit ad-hoc walk: the out_of_scope fallback ("Walk it ad-hoc").
if payload.adhoc:
ticket = await _create_intake_ticket(db, payload, user)
session = await l1_session_service.start_adhoc_session(
db, account_id=user.account_id, user=user,
ticket_id=str(ticket.id), ticket_kind="internal",
)
await db.commit()
return IntakeResponse(
outcome="adhoc", session_id=session.id, session_kind=session.session_kind,
ticket_id=str(ticket.id), ticket_kind="internal",
)
result = await match_or_build.match_or_build( result = await match_or_build.match_or_build(
user.account_id, user.account_id,
payload.problem_statement, payload.problem_statement,
None, None,
ticket_ref="",
db=db, db=db,
force_build=payload.force_build, force_build=payload.force_build,
) )
@@ -102,14 +146,7 @@ async def intake(
) )
# matched OR build → create a ticket and a session # matched OR build → create a ticket and a session
ticket = await internal_ticket_service.create_ticket( ticket = await _create_intake_ticket(db, payload, user)
db,
account_id=user.account_id,
created_by_user_id=user.id,
problem_statement=payload.problem_statement,
customer_name=payload.customer_name,
customer_contact=payload.customer_contact,
)
if outcome == "matched": if outcome == "matched":
session = await l1_session_service.start_flow_session( session = await l1_session_service.start_flow_session(
db, db,
@@ -126,13 +163,9 @@ async def intake(
user=user, user=user,
ticket_id=str(ticket.id), ticket_id=str(ticket.id),
ticket_kind="internal", ticket_kind="internal",
category=result.get("category", "unknown"),
problem_text=payload.problem_statement,
) )
# Persist the classified category as a hidden meta entry so /next-node
# can recover it (no dedicated column; ai_tree_builder skips meta entries).
session.walked_path = [
{"node_type": "meta", "category": result.get("category", "unknown")}
]
await db.flush()
await db.commit() await db.commit()
return IntakeResponse( return IntakeResponse(
@@ -293,27 +326,18 @@ async def next_node(
): ):
"""Record the answer/ack on the current node, then generate the next node. """Record the answer/ack on the current node, then generate the next node.
problem_text comes from the linked internal ticket; category from the hidden problem_text + category are read straight off the session (stored at intake) —
meta entry seeded at intake (ai_tree_builder skips meta entries). node_text is no ticket re-fetch, no walked_path scan. node_text is the rendered text of the
the rendered text of the node being answered (the client holds it) so the node being answered (the client holds it) so the walked path and the captured
walked path and the captured tree stay legible. tree stay legible.
""" """
session = await _get_session_or_404(db, session_id, user) session = await _get_session_or_404(db, session_id, user)
ticket = await internal_ticket_service.get_ticket(
db, ticket_id=UUID(session.ticket_id)
)
problem_text = ticket.problem_statement if ticket else ""
category = next(
(s.get("category") for s in (session.walked_path or [])
if s.get("node_type") == "meta"),
"unknown",
)
try: try:
node = await l1_session_service.advance_ai_build( node = await l1_session_service.advance_ai_build(
db, db,
session_id=session_id, session_id=session_id,
problem_text=problem_text, problem_text=session.problem_text or "",
category=category or "unknown", category=session.category or "unknown",
node_id=payload.node_id, node_id=payload.node_id,
node_text=payload.node_text, node_text=payload.node_text,
answer=payload.answer, answer=payload.answer,

View File

@@ -86,7 +86,13 @@ class FlowProposal(Base):
) )
l1_session_id: Mapped[Optional[uuid.UUID]] = mapped_column( l1_session_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True), UUID(as_uuid=True),
ForeignKey("l1_walk_sessions.id", ondelete="SET NULL"), # CASCADE, not SET NULL: the exactly-one-source CHECK below means an
# L1-sourced proposal has source_session_id NULL by construction, so a
# SET NULL on l1_session deletion would NULL both columns and the
# non-deferrable CHECK would abort the DELETE — making any L1 session
# referenced by a proposal undeletable (hard_delete_user, GDPR purge).
# The proposal dies with its source, matching source_session_id's CASCADE.
ForeignKey("l1_walk_sessions.id", ondelete="CASCADE"),
nullable=True, nullable=True,
index=True, index=True,
) )

View File

@@ -8,8 +8,7 @@ import uuid
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Any, Optional, TYPE_CHECKING from typing import Any, Optional, TYPE_CHECKING
import sqlalchemy as sa from sqlalchemy import String, Text, DateTime, Boolean, ForeignKey, CheckConstraint, Index
from sqlalchemy import String, Text, DateTime, Boolean, ForeignKey, CheckConstraint
from sqlalchemy import text as sa_text from sqlalchemy import text as sa_text
from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID, JSONB from sqlalchemy.dialects.postgresql import UUID, JSONB
@@ -59,6 +58,12 @@ class L1WalkSession(Base):
"OR (session_kind IN ('adhoc', 'ai_build') AND flow_id IS NULL AND flow_proposal_id IS NULL)", "OR (session_kind IN ('adhoc', 'ai_build') AND flow_id IS NULL AND flow_proposal_id IS NULL)",
name="ck_l1_walk_sessions_target_consistency", name="ck_l1_walk_sessions_target_consistency",
), ),
# Partial index backing GET /l1/escalations (the engineer handoff queue).
Index(
"ix_l1_walk_sessions_escalated",
"account_id", sa_text("last_step_at DESC"),
postgresql_where=sa_text("status = 'escalated'"),
),
) )
id: Mapped[uuid.UUID] = mapped_column( id: Mapped[uuid.UUID] = mapped_column(
@@ -86,6 +91,14 @@ class L1WalkSession(Base):
# ── Session kind + target ── # ── Session kind + target ──
session_kind: Mapped[str] = mapped_column(String(20), nullable=False) session_kind: Mapped[str] = mapped_column(String(20), nullable=False)
# AI-build context (ai_build sessions only). Persisted at intake so /next-node
# never has to re-fetch the ticket or scan walked_path to recover them — they
# are immutable for the life of the session. Replaces the former hidden
# ``{"node_type":"meta"}`` walked_path entry (deleted: it leaked into every
# consumer that forgot to skip it — junk proposals, off-by-one depth cap,
# blank escalation rows).
category: Mapped[Optional[str]] = mapped_column(String(100), nullable=True)
problem_text: Mapped[Optional[str]] = mapped_column(Text(), nullable=True)
flow_id: Mapped[Optional[uuid.UUID]] = mapped_column( flow_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True), UUID(as_uuid=True),
ForeignKey("trees.id", ondelete="SET NULL"), ForeignKey("trees.id", ondelete="SET NULL"),
@@ -99,6 +112,12 @@ class L1WalkSession(Base):
# ── Navigation state ── # ── Navigation state ──
current_node_id: Mapped[Optional[str]] = mapped_column(String(100), nullable=True) current_node_id: Mapped[Optional[str]] = mapped_column(String(100), nullable=True)
# The node served to the tech but not yet answered (ai_build only). Replayed on
# the next /next-node call with node_id=None so a refresh / StrictMode double-mount
# doesn't fire a fresh paid LLM call (and possibly swap the question mid-answer).
pending_node: Mapped[Optional[dict[str, Any]]] = mapped_column(
JSONB(), nullable=True,
)
walked_path: Mapped[list[dict[str, Any]]] = mapped_column( walked_path: Mapped[list[dict[str, Any]]] = mapped_column(
JSONB(), nullable=False, server_default=sa_text("'[]'::jsonb"), JSONB(), nullable=False, server_default=sa_text("'[]'::jsonb"),
) )

View File

@@ -3,33 +3,54 @@ from datetime import datetime
from typing import Any, Literal, Optional from typing import Any, Literal, Optional
from uuid import UUID from uuid import UUID
from pydantic import BaseModel, Field from pydantic import BaseModel, Field, model_validator
class IntakeRequest(BaseModel): class IntakeRequest(BaseModel):
problem_statement: str = Field(..., min_length=1) problem_statement: str = Field(..., min_length=1)
customer_name: Optional[str] = None customer_name: Optional[str] = None
customer_contact: Optional[str] = None customer_contact: Optional[str] = None
# When set, bypass the matcher and start this published flow directly (the
# suggest card's "Use this flow" — the client already holds the flow id).
flow_id: Optional[UUID] = None flow_id: Optional[UUID] = None
# When True, start an ad-hoc free-form walk (the out_of_scope prompt's
# "Walk it ad-hoc" fallback). Mutually informative with flow_id/force_build;
# flow_id takes precedence if both are somehow set.
adhoc: bool = False
force_build: bool = False force_build: bool = False
# Outcomes that start a session (and therefore must carry session_id + ticket).
_SESSION_OUTCOMES = {"matched", "build", "adhoc"}
class IntakeResponse(BaseModel): class IntakeResponse(BaseModel):
outcome: Literal["matched", "suggest", "out_of_scope", "build"] outcome: Literal["matched", "suggest", "out_of_scope", "build", "adhoc"]
session_id: Optional[UUID] = None session_id: Optional[UUID] = None
session_kind: Optional[Literal["flow", "proposal", "adhoc", "ai_build"]] = None session_kind: Optional[Literal["flow", "proposal", "adhoc", "ai_build"]] = None
ticket_id: Optional[str] = None ticket_id: Optional[str] = None
ticket_kind: Optional[str] = None ticket_kind: Optional[Literal["psa", "internal"]] = None
flow_id: Optional[UUID] = None # for 'matched' flow_id: Optional[UUID] = None # for 'matched'
near_miss: Optional[dict] = None # for 'suggest' near_miss: Optional[dict] = None # for 'suggest'
category: Optional[str] = None # for 'out_of_scope' category: Optional[str] = None # for 'out_of_scope'
@model_validator(mode="after")
def _check_outcome_invariants(self) -> "IntakeResponse":
"""Restore the per-outcome contract the frontend depends on: a session
outcome MUST carry the session_id + ticket the walker navigates to, so a
backend regression surfaces here instead of as /l1/walk/undefined."""
if self.outcome in _SESSION_OUTCOMES:
if self.session_id is None or self.ticket_id is None:
raise ValueError(
f"intake outcome '{self.outcome}' requires session_id + ticket_id"
)
return self
class NextNodeRequest(BaseModel): class NextNodeRequest(BaseModel):
node_id: Optional[str] = None node_id: Optional[str] = None
node_text: Optional[str] = None # rendered text of the node being answered (carry-forward Task 8) node_text: Optional[str] = None # rendered text of the node being answered (carry-forward Task 8)
answer: Optional[str] = None # 'yes' | 'no' for questions answer: Optional[str] = None # 'yes' | 'no' for questions; None acks an instruction
acknowledged: Optional[bool] = None
note: Optional[str] = None note: Optional[str] = None
@@ -70,6 +91,8 @@ class EscalateWithoutWalkRequest(BaseModel):
class WalkSessionResponse(BaseModel): class WalkSessionResponse(BaseModel):
id: UUID id: UUID
session_kind: str session_kind: str
category: Optional[str] = None
problem_text: Optional[str] = None
flow_id: Optional[UUID] flow_id: Optional[UUID]
flow_proposal_id: Optional[UUID] flow_proposal_id: Optional[UUID]
current_node_id: Optional[str] current_node_id: Optional[str]

View File

@@ -7,6 +7,7 @@ for flywheel capture.
""" """
import logging import logging
from typing import Any, Optional from typing import Any, Optional
from uuid import uuid4
from app.core.ai_provider import get_ai_provider from app.core.ai_provider import get_ai_provider
from app.core.config import settings from app.core.config import settings
@@ -45,19 +46,21 @@ No prose, no markdown fences.
""" """
def _strip_meta(walked_path: list[dict]) -> list[dict]: def _assign_id(node: dict[str, Any]) -> dict[str, Any]:
"""Drop the hidden ``meta`` entry (category carrier) the intake endpoint seeds. """Stamp a stable server-side id on a generated node (Finding 1).
The first walked_path entry on an ai_build session may be a The SYSTEM_PROMPT never asks the model for an id — and we must not, since a
``{"node_type": "meta", "category": ...}`` marker used to persist the model-invented id is neither stable nor trustworthy. But the advance protocol
classified category; it is not a real walk step and must be excluded from keys on ``node_id``: without one, the answer to every node is discarded and
both model context and tree normalization. the walk can never progress past the first question. So every node the builder
hands back — generated, depth-capped, or generation-failed — gets an id here.
""" """
return [s for s in walked_path if s.get("node_type") != "meta"] if not node.get("id"):
node["id"] = uuid4().hex[:8]
return node
def _build_context(problem_text: str, category: str, walked_path: list[dict]) -> str: def _build_context(problem_text: str, category: str, walked_path: list[dict]) -> str:
walked_path = _strip_meta(walked_path)
lines = [f"PROBLEM: {problem_text}", f"CATEGORY: {category}", "STEPS SO FAR:"] lines = [f"PROBLEM: {problem_text}", f"CATEGORY: {category}", "STEPS SO FAR:"]
if not walked_path: if not walked_path:
lines.append("(none yet — produce the first diagnostic question)") lines.append("(none yet — produce the first diagnostic question)")
@@ -81,11 +84,11 @@ def validate_node(node: dict[str, Any]) -> dict[str, Any]:
def escalate_if_depth_exceeded(walked_path: list[dict]) -> Optional[dict[str, Any]]: def escalate_if_depth_exceeded(walked_path: list[dict]) -> Optional[dict[str, Any]]:
if len(walked_path) >= MAX_DEPTH: if len(walked_path) >= MAX_DEPTH:
return { return _assign_id({
"node_type": "escalate", "node_type": "escalate",
"reason_category": "depth_cap", "reason_category": "depth_cap",
"text": "Reached the L1 troubleshooting depth limit — escalating to engineering.", "text": "Reached the L1 troubleshooting depth limit — escalating to engineering.",
} })
return None return None
@@ -108,16 +111,16 @@ async def generate_next_node(
max_tokens=1024, max_tokens=1024,
) )
node = parse_llm_json(raw) node = parse_llm_json(raw)
return validate_node(node) return _assign_id(validate_node(node))
except Exception as e: except Exception as e:
logger.warning("ai_tree_builder node attempt %d failed: %s", attempt + 1, e) logger.warning("ai_tree_builder node attempt %d failed: %s", attempt + 1, e)
continue continue
return { return _assign_id({
"node_type": "escalate", "node_type": "escalate",
"reason_category": "generation_failed", "reason_category": "generation_failed",
"text": "Could not generate a safe next step — escalating to engineering.", "text": "Could not generate a safe next step — escalating to engineering.",
} })
def normalize_walked_path(walked_path: list[dict]) -> dict[str, Any]: def normalize_walked_path(walked_path: list[dict]) -> dict[str, Any]:
@@ -128,7 +131,6 @@ def normalize_walked_path(walked_path: list[dict]) -> dict[str, Any]:
Returns {id, nodes: {id: node}} — a dict with an id (passes the proposal Returns {id, nodes: {id: node}} — a dict with an id (passes the proposal
approval guard). approval guard).
""" """
walked_path = _strip_meta(walked_path)
nodes: dict[str, Any] = {} nodes: dict[str, Any] = {}
if not walked_path: if not walked_path:
root_id = "root" root_id = "root"

View File

@@ -3,6 +3,7 @@
start_* functions live in T12; step/notes are T13; resolve/escalate are T14. start_* functions live in T12; step/notes are T13; resolve/escalate are T14.
""" """
import json import json
import logging
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Optional from typing import Optional
from uuid import UUID from uuid import UUID
@@ -18,6 +19,8 @@ from app.services import ai_tree_builder
from app.services import internal_ticket_service from app.services import internal_ticket_service
from app.services.notification_service import notify from app.services.notification_service import notify
logger = logging.getLogger(__name__)
def _resolve_acting_as(user: User) -> Optional[str]: def _resolve_acting_as(user: User) -> Optional[str]:
"""An engineer (whether covering or not) gets tagged for audit when using L1 surface. """An engineer (whether covering or not) gets tagged for audit when using L1 surface.
@@ -108,8 +111,15 @@ async def start_ai_build_session(
user: User, user: User,
ticket_id: str, ticket_id: str,
ticket_kind: str, ticket_kind: str,
category: Optional[str] = None,
problem_text: Optional[str] = None,
) -> L1WalkSession: ) -> L1WalkSession:
"""Start an AI-built tree session (nodes generated on demand via next-node).""" """Start an AI-built tree session (nodes generated on demand via next-node).
``category`` and ``problem_text`` are the immutable AI-build context, stored
once here so /next-node never re-derives them (no ticket re-fetch, no
walked_path scan, no hidden meta entry).
"""
session = L1WalkSession( session = L1WalkSession(
account_id=account_id, account_id=account_id,
created_by_user_id=user.id, created_by_user_id=user.id,
@@ -117,6 +127,8 @@ async def start_ai_build_session(
ticket_id=ticket_id, ticket_id=ticket_id,
ticket_kind=ticket_kind, ticket_kind=ticket_kind,
session_kind="ai_build", session_kind="ai_build",
category=category,
problem_text=problem_text,
) )
db.add(session) db.add(session)
await db.flush() await db.flush()
@@ -144,6 +156,11 @@ async def advance_ai_build(
the caller/endpoint, which holds the served node. Storing it here ensures that the caller/endpoint, which holds the served node. Storing it here ensures that
later nodes receive full prior-step context via ``ai_tree_builder._build_context`` later nodes receive full prior-step context via ``ai_tree_builder._build_context``
and that captured flywheel trees (``normalize_walked_path``) have meaningful text. and that captured flywheel trees (``normalize_walked_path``) have meaningful text.
Pending-node replay (Finding 8): the node served but not yet answered is stored
on ``session.pending_node``. When node_id is None and a pending node exists (a
refresh, a StrictMode double-mount, or back/forward), we replay it instead of
firing a fresh paid LLM call that might also swap the question mid-answer.
""" """
session = await db.get(L1WalkSession, session_id) session = await db.get(L1WalkSession, session_id)
if not session: if not session:
@@ -168,9 +185,14 @@ async def advance_ai_build(
} }
# JSONB requires assigning a new list — in-place mutation isn't tracked # JSONB requires assigning a new list — in-place mutation isn't tracked
session.walked_path = [*session.walked_path, entry] session.walked_path = [*session.walked_path, entry]
session.pending_node = None # the served node has now been answered
elif session.pending_node is not None:
# Re-mount before answering — return the already-served node verbatim.
return session.pending_node
next_node = await ai_tree_builder.generate_next_node( next_node = await ai_tree_builder.generate_next_node(
problem_text, category, session.walked_path) problem_text, category, session.walked_path)
session.pending_node = next_node
session.current_node_id = next_node.get("id") session.current_node_id = next_node.get("id")
session.last_step_at = datetime.now(timezone.utc) session.last_step_at = datetime.now(timezone.utc)
await db.flush() await db.flush()
@@ -361,24 +383,36 @@ async def escalate(
) )
# Notify engineers (owner/admin/engineer roles) about the escalation. # Notify engineers (owner/admin/engineer roles) about the escalation.
# Filter soft-deleted users too (is_active alone misses them — handoff_manager
# does the same): a deleted engineer must not be paged.
eng_rows = await db.execute( eng_rows = await db.execute(
select(User.id).where( select(User.id).where(
User.account_id == session.account_id, User.account_id == session.account_id,
User.is_active.is_(True), User.is_active.is_(True),
User.deleted_at.is_(None),
User.account_role.in_(("owner", "admin", "engineer")), User.account_role.in_(("owner", "admin", "engineer")),
) )
) )
target_ids = [r[0] for r in eng_rows.all()] target_ids = [r[0] for r in eng_rows.all()]
if not target_ids:
# No eligible engineer. Passing [] to notify() would suppress the in-app
# notification entirely (explicit-empty is honored). Fall back to the
# default owner/admin recipient set instead of silently dropping it.
logger.warning(
"L1 escalation for session %s has no active engineer recipients; "
"falling back to default owner/admin notification set.",
session.id,
)
await notify( await notify(
"l1.session.escalated", "l1.session.escalated",
session.account_id, session.account_id,
{ {
"problem_summary": session.ticket_id, "problem_summary": session.problem_text or session.ticket_id,
"session_id": str(session.id), "session_id": str(session.id),
"reason_category": reason_category, "reason_category": reason_category,
}, },
db, db,
target_user_ids=target_ids, target_user_ids=target_ids or None,
) )
await db.flush() await db.flush()

View File

@@ -52,7 +52,6 @@ async def match_or_build(
account_id: UUID, account_id: UUID,
problem_text: str, problem_text: str,
problem_domain: Optional[str], problem_domain: Optional[str],
ticket_ref: str, # passed through for caller/session use; not consumed here (Task 10)
*, *,
db: AsyncSession, db: AsyncSession,
force_build: bool = False, force_build: bool = False,

View File

@@ -2,6 +2,52 @@ import pytest
from app.services import ai_tree_builder as atb from app.services import ai_tree_builder as atb
class _FakeProvider:
def __init__(self, raw):
self._raw = raw
async def generate_json(self, *, system_prompt, messages, max_tokens):
return self._raw, None, None
@pytest.mark.asyncio
async def test_generate_next_node_assigns_id_when_model_omits_it(monkeypatch):
"""The SYSTEM_PROMPT never asks the model for an id (Finding 1). The server
must assign one to every generated node, or the advance protocol — which keys
on node_id — can never record an answer and the walk stalls on question 1."""
monkeypatch.setattr(
atb, "get_ai_provider",
lambda *a, **k: _FakeProvider('{"node_type":"question","text":"Plugged in?"}'),
)
node = await atb.generate_next_node("printer down", "printer", [])
assert node["node_type"] == "question"
assert node.get("id"), "generated node must carry a server-assigned id"
@pytest.mark.asyncio
async def test_generate_next_node_depth_cap_node_has_id(monkeypatch):
"""The depth-cap escalate node must also carry an id (it is persisted as
current_node_id and may be appended to walked_path)."""
walked = [{"node_type": "question", "id": f"n{i}", "text": "?", "answer": "no"}
for i in range(atb.MAX_DEPTH)]
node = await atb.generate_next_node("x", "printer", walked)
assert node["node_type"] == "escalate"
assert node.get("id")
@pytest.mark.asyncio
async def test_generate_next_node_generation_failed_node_has_id(monkeypatch):
"""When both generation attempts fail, the fallback escalate node carries an id."""
monkeypatch.setattr(
atb, "get_ai_provider",
lambda *a, **k: _FakeProvider("not json at all"),
)
node = await atb.generate_next_node("x", "printer", [])
assert node["node_type"] == "escalate"
assert node["reason_category"] == "generation_failed"
assert node.get("id")
def test_validate_node_rejects_hard_floor_text(): def test_validate_node_rejects_hard_floor_text():
node = {"node_type": "instruction", "id": "n1", "text": "Open regedit and change the key", "next": "generate"} node = {"node_type": "instruction", "id": "n1", "text": "Open regedit and change the key", "next": "generate"}
with pytest.raises(atb.UnsafeNodeError): with pytest.raises(atb.UnsafeNodeError):

View File

@@ -1,5 +1,13 @@
import uuid import uuid
import pytest
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.account import Account
from app.models.flow_proposal import FlowProposal from app.models.flow_proposal import FlowProposal
from app.models.l1_walk_session import L1WalkSession
from app.models.user import User
def test_flow_proposal_accepts_l1_session_id_without_source_session(): def test_flow_proposal_accepts_l1_session_id_without_source_session():
@@ -14,3 +22,44 @@ def test_flow_proposal_accepts_l1_session_id_without_source_session():
status="pending", status="pending",
) )
assert p.l1_session_id is not None and p.source_session_id is None assert p.l1_session_id is not None and p.source_session_id is None
@pytest.mark.asyncio
async def test_deleting_l1_session_cascades_proposal_not_check_violation(test_db: AsyncSession):
"""Finding 6: an L1-sourced proposal has source_session_id NULL by the exactly-one
CHECK. With ondelete=CASCADE the proposal dies with its session; the old SET NULL
would have NULLed both columns and aborted the DELETE on the CHECK (time bomb)."""
s = str(uuid.uuid4())[:8]
account = Account(id=uuid.uuid4(), name=f"Acct {s}", display_code=s.upper())
test_db.add(account)
await test_db.flush()
user = User(
id=uuid.uuid4(), email=f"u-{uuid.uuid4()}@example.com", name="U",
account_id=account.id, account_role="l1_tech", role="engineer", is_active=True,
)
test_db.add(user)
await test_db.flush()
session = L1WalkSession(
account_id=account.id, created_by_user_id=user.id,
ticket_id="t-cascade", ticket_kind="internal", session_kind="ai_build",
)
test_db.add(session)
await test_db.flush()
proposal = FlowProposal(
account_id=account.id, l1_session_id=session.id, source_session_id=None,
proposal_type="new_flow", title="AI L1 draft",
proposed_flow_data={"tree_structure": {"id": "root"}},
source="ai_realtime_l1", status="pending",
)
test_db.add(proposal)
await test_db.flush()
pid = proposal.id
# Delete the session — must succeed and cascade to the proposal.
await test_db.delete(session)
await test_db.flush()
remaining = (await test_db.execute(
select(FlowProposal).where(FlowProposal.id == pid)
)).scalar_one_or_none()
assert remaining is None

View File

@@ -155,3 +155,73 @@ async def test_escalations_forbidden_for_l1_tech(client: AsyncClient, test_db: A
info = await _make_user(client, test_db, email="aib_l1@example.com", account_role="l1_tech") info = await _make_user(client, test_db, email="aib_l1@example.com", account_role="l1_tech")
r = await client.get("/api/v1/l1/escalations", headers=info["headers"]) r = await client.get("/api/v1/l1/escalations", headers=info["headers"])
assert r.status_code == 403, r.text assert r.status_code == 403, r.text
@pytest.mark.asyncio
async def test_intake_with_flow_id_starts_flow_directly(client: AsyncClient, test_db: AsyncSession):
"""Finding 4: an explicit flow_id bypasses the matcher and starts that flow."""
from app.models.tree import Tree
info = await _make_user(client, test_db, email="aib_flowid@example.com", account_role="l1_tech")
tree = Tree(
id=uuid.uuid4(), name="VPN Flow", account_id=info["account_id"],
author_id=info["user_id"], tree_type="troubleshooting",
tree_structure={"nodes": [], "edges": []}, visibility="team", status="published",
)
test_db.add(tree)
await test_db.commit()
# match_or_build must NOT be called when flow_id is supplied.
with patch(
"app.api.endpoints.l1.match_or_build.match_or_build",
new=AsyncMock(side_effect=AssertionError("matcher should be bypassed")),
):
r = await client.post(
"/api/v1/l1/intake",
json={"problem_statement": "vpn down", "flow_id": str(tree.id)},
headers=info["headers"],
)
assert r.status_code == 200, r.text
body = r.json()
assert body["outcome"] == "matched"
assert body["session_kind"] == "flow"
assert body["flow_id"] == str(tree.id)
assert body["session_id"]
@pytest.mark.asyncio
async def test_intake_adhoc_starts_adhoc_session(client: AsyncClient, test_db: AsyncSession):
"""Finding 5: adhoc=True starts a free-form ad-hoc walk (out_of_scope fallback)."""
info = await _make_user(client, test_db, email="aib_adhoc@example.com", account_role="l1_tech")
with patch(
"app.api.endpoints.l1.match_or_build.match_or_build",
new=AsyncMock(side_effect=AssertionError("matcher should be bypassed")),
):
r = await client.post(
"/api/v1/l1/intake",
json={"problem_statement": "weird thing", "adhoc": True},
headers=info["headers"],
)
assert r.status_code == 200, r.text
body = r.json()
assert body["outcome"] == "adhoc"
assert body["session_kind"] == "adhoc"
assert body["session_id"]
@pytest.mark.asyncio
async def test_intake_build_persists_category_and_problem_text(client: AsyncClient, test_db: AsyncSession):
"""Root cause B: build stores category + problem_text on the session (no meta entry)."""
info = await _make_user(client, test_db, email="aib_cols@example.com", account_role="l1_tech")
with patch(
"app.api.endpoints.l1.match_or_build.match_or_build",
new=AsyncMock(return_value={"outcome": "build", "session_kind": "ai_build",
"category": "printer"}),
):
r = await client.post("/api/v1/l1/intake",
json={"problem_statement": "printer jam"}, headers=info["headers"])
sid = r.json()["session_id"]
sess = await test_db.get(L1WalkSession, uuid.UUID(sid))
assert sess.category == "printer"
assert sess.problem_text == "printer jam"
# No hidden meta entry smuggled into walked_path.
assert sess.walked_path == []

View File

@@ -1,6 +1,6 @@
"""Tests for the account L1 AI-build category settings API (Phase 2A). """Tests for the account L1 AI-build category settings API (Phase 2A).
GET /accounts/me/l1-categories — readable by L1-or-above. GET /accounts/me/l1-categories — owner/admin only (Finding 7: read and write agree).
PATCH /accounts/me/l1-categories — owner/admin only; drops unknown/hard-floored keys. PATCH /accounts/me/l1-categories — owner/admin only; drops unknown/hard-floored keys.
""" """
import uuid import uuid
@@ -65,12 +65,22 @@ async def test_get_categories_returns_enabled_available_hard_floor(client: Async
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_categories_readable_by_l1_tech(client: AsyncClient, test_db: AsyncSession): async def test_get_categories_readable_by_admin(client: AsyncClient, test_db: AsyncSession):
info = await _make_user(client, test_db, email="cat_l1_get@example.com", account_role="l1_tech") """Finding 7: account admins can READ (previously 403 on GET while they could PATCH)."""
info = await _make_user(client, test_db, email="cat_admin_get@example.com", account_role="admin")
r = await client.get("/api/v1/accounts/me/l1-categories", headers=info["headers"]) r = await client.get("/api/v1/accounts/me/l1-categories", headers=info["headers"])
assert r.status_code == 200, r.text assert r.status_code == 200, r.text
@pytest.mark.asyncio
async def test_get_categories_forbidden_for_l1_tech(client: AsyncClient, test_db: AsyncSession):
"""Finding 7: GET now matches PATCH (owner/admin only). The walker gates
server-side and never fetches this, so l1_tech read access was unused."""
info = await _make_user(client, test_db, email="cat_l1_get@example.com", account_role="l1_tech")
r = await client.get("/api/v1/accounts/me/l1-categories", headers=info["headers"])
assert r.status_code == 403, r.text
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_patch_categories_owner_can_set(client: AsyncClient, test_db: AsyncSession): async def test_patch_categories_owner_can_set(client: AsyncClient, test_db: AsyncSession):
info = await _make_user(client, test_db, email="cat_owner_patch@example.com", account_role="owner") info = await _make_user(client, test_db, email="cat_owner_patch@example.com", account_role="owner")

View File

@@ -124,8 +124,9 @@ async def _create_adhoc_session(db: AsyncSession, info: dict, *, problem: str =
async def test_intake_build_creates_ai_build_session(client: AsyncClient, test_db: AsyncSession): async def test_intake_build_creates_ai_build_session(client: AsyncClient, test_db: AsyncSession):
"""POST /l1/intake with a 'build' outcome creates an ai_build session. """POST /l1/intake with a 'build' outcome creates an ai_build session.
Phase 2A: intake dispatches via match_or_build; 'adhoc' is no longer a direct Phase 2A: intake dispatches via match_or_build. An explicit adhoc=True (the
intake outcome (it is offered from the out_of_scope prompt on the frontend). out_of_scope prompt's "Walk it ad-hoc") starts an ad-hoc session directly —
see test_l1_api_ai_build.test_intake_adhoc_starts_adhoc_session.
""" """
from unittest.mock import AsyncMock, patch from unittest.mock import AsyncMock, patch
info = await _make_l1_user(client, test_db, email="l1intake@example.com") info = await _make_l1_user(client, test_db, email="l1intake@example.com")

View File

@@ -11,6 +11,7 @@ from app.models.user import User
from app.models.tree import Tree from app.models.tree import Tree
from app.models.ai_session import AISession from app.models.ai_session import AISession
from app.models.flow_proposal import FlowProposal from app.models.flow_proposal import FlowProposal
from app.models.l1_walk_session import L1WalkSession
from app.services.l1_session_service import ( from app.services.l1_session_service import (
start_flow_session, start_flow_session,
start_proposal_session, start_proposal_session,
@@ -1073,3 +1074,138 @@ async def test_escalate_without_walk_writes_audit_log(test_db: AsyncSession):
) )
row = result.scalar_one() row = result.scalar_one()
assert row.account_id == account.id assert row.account_id == account.id
# Audit coverage: the reason category must be recorded (restored — a prior
# edit dropped this assertion, weakening the audit guarantee).
assert row.details["escalation_reason_category"] == "no_kb_content"
# ---------------------------------------------------------------------------
# Finding 1 (server-assigned node ids) + Finding 8 (pending-node replay)
# ---------------------------------------------------------------------------
class _FakeProvider:
def __init__(self, raw):
self._raw = raw
async def generate_json(self, *, system_prompt, messages, max_tokens):
return self._raw, None, None
@pytest.mark.asyncio
async def test_ai_build_first_node_carries_id_and_advance_grows_walk(
test_db: AsyncSession, monkeypatch,
):
"""Finding 1 contract: the SYSTEM_PROMPT never asks for an id, yet the first
generated node must carry one — and advancing with that id must grow walked_path
(the original showstopper: node_id was always None, so the walk never advanced)."""
from app.services import l1_session_service as svc
from app.services import ai_tree_builder
account = await _make_account(test_db)
l1_user = await _make_user(test_db, account_id=account.id)
s = await svc.start_ai_build_session(
test_db, account_id=account.id, user=l1_user,
ticket_id="t-contract", ticket_kind="internal",
category="printer", problem_text="printer offline")
# Real generator + a provider that omits id (the shape the model produces).
monkeypatch.setattr(
ai_tree_builder, "get_ai_provider",
lambda *a, **k: _FakeProvider('{"node_type":"question","text":"Plugged in?"}'))
first = await svc.advance_ai_build(
test_db, session_id=s.id, problem_text="printer offline",
category="printer", node_id=None)
assert first.get("id"), "first node must carry a server-assigned id"
# Answer it with the id we were handed; walked_path must grow by one.
await svc.advance_ai_build(
test_db, session_id=s.id, problem_text="printer offline", category="printer",
node_id=first["id"], node_text=first["text"], answer="no")
refreshed = await test_db.get(L1WalkSession, s.id)
assert len(refreshed.walked_path) == 1
assert refreshed.walked_path[0]["id"] == first["id"]
@pytest.mark.asyncio
async def test_advance_ai_build_replays_pending_node_without_regenerating(
test_db: AsyncSession, monkeypatch,
):
"""Finding 8: a re-mount (node_id=None) replays the served-but-unanswered node
instead of firing a fresh paid LLM call (which could also swap the question)."""
from app.services import l1_session_service as svc
from app.services import ai_tree_builder
account = await _make_account(test_db)
l1_user = await _make_user(test_db, account_id=account.id)
s = await svc.start_ai_build_session(
test_db, account_id=account.id, user=l1_user,
ticket_id="t-replay", ticket_kind="internal",
category="printer", problem_text="printer offline")
calls = {"n": 0}
async def fake_next(problem, category, walked):
calls["n"] += 1
return {"node_type": "question", "id": f"q{calls['n']}", "text": "?"}
monkeypatch.setattr(ai_tree_builder, "generate_next_node", fake_next)
first = await svc.advance_ai_build(
test_db, session_id=s.id, problem_text="p", category="printer", node_id=None)
# Re-mount without answering — must NOT regenerate.
replay = await svc.advance_ai_build(
test_db, session_id=s.id, problem_text="p", category="printer", node_id=None)
assert calls["n"] == 1
assert replay["id"] == first["id"]
# ---------------------------------------------------------------------------
# Finding 10: escalation recipient resolution
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_escalate_skips_soft_deleted_engineer(test_db: AsyncSession, monkeypatch):
"""A soft-deleted engineer must not be paged (is_active alone misses them)."""
from datetime import datetime, timezone
from app.services import l1_session_service as svc
calls = {}
async def fake_notify(event, account_id, payload, db, target_user_ids=None):
calls["target_user_ids"] = target_user_ids
monkeypatch.setattr(svc, "notify", fake_notify)
account = await _make_account(test_db)
l1_user = await _make_user(test_db, account_id=account.id)
live_eng = await _make_user(test_db, account_id=account.id, account_role="engineer")
dead_eng = await _make_user(test_db, account_id=account.id, account_role="engineer")
dead_eng.deleted_at = datetime.now(timezone.utc)
await test_db.flush()
ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1_user.id)
s = await svc.start_ai_build_session(
test_db, account_id=account.id, user=l1_user,
ticket_id=str(ticket.id), ticket_kind="internal")
await svc.escalate(test_db, session_id=s.id, reason="x", reason_category="exhausted_safe_steps")
assert live_eng.id in calls["target_user_ids"]
assert dead_eng.id not in calls["target_user_ids"]
@pytest.mark.asyncio
async def test_escalate_with_no_engineers_falls_back_to_default_recipients(
test_db: AsyncSession, monkeypatch,
):
"""Finding 10: when no eligible engineer exists, pass None (not []) so notify()
falls back to the default owner/admin set instead of silently dropping it."""
from app.services import l1_session_service as svc
calls = {}
async def fake_notify(event, account_id, payload, db, target_user_ids=None):
calls["target_user_ids"] = target_user_ids
monkeypatch.setattr(svc, "notify", fake_notify)
account = await _make_account(test_db)
# Only an l1_tech exists — not in the owner/admin/engineer recipient query.
l1_user = await _make_user(test_db, account_id=account.id)
ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1_user.id)
s = await svc.start_ai_build_session(
test_db, account_id=account.id, user=l1_user,
ticket_id=str(ticket.id), ticket_kind="internal")
await svc.escalate(test_db, session_id=s.id, reason="x", reason_category="exhausted_safe_steps")
assert calls["target_user_ids"] is None

View File

@@ -10,7 +10,7 @@ async def test_match_wins_before_category_gate():
with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock( with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock(
return_value=[{"tree_id": str(uuid.uuid4()), "tree_name": "VPN", "score": 0.9}])), \ return_value=[{"tree_id": str(uuid.uuid4()), "tree_name": "VPN", "score": 0.9}])), \
patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=[])): patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=[])):
res = await mob.match_or_build(uuid.uuid4(), "vpn down", None, "t1", db=AsyncMock(), force_build=False) res = await mob.match_or_build(uuid.uuid4(), "vpn down", None, db=AsyncMock(), force_build=False)
assert res["outcome"] == "matched" assert res["outcome"] == "matched"
assert res["session_kind"] == "flow" assert res["session_kind"] == "flow"
@@ -19,7 +19,7 @@ async def test_match_wins_before_category_gate():
async def test_suggest_band(): async def test_suggest_band():
with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock( with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock(
return_value=[{"tree_id": str(uuid.uuid4()), "tree_name": "X", "score": 0.66}])): return_value=[{"tree_id": str(uuid.uuid4()), "tree_name": "X", "score": 0.66}])):
res = await mob.match_or_build(uuid.uuid4(), "p", None, "t1", db=AsyncMock(), force_build=False) res = await mob.match_or_build(uuid.uuid4(), "p", None, db=AsyncMock(), force_build=False)
assert res["outcome"] == "suggest" assert res["outcome"] == "suggest"
assert res["near_miss"]["flow_name"] == "X" assert res["near_miss"]["flow_name"] == "X"
assert "flow_id" in res["near_miss"] and isinstance(res["near_miss"]["flow_id"], str) assert "flow_id" in res["near_miss"] and isinstance(res["near_miss"]["flow_id"], str)
@@ -32,7 +32,7 @@ async def test_out_of_scope_when_category_disabled_on_build_path():
with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock(return_value=[])), \ with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock(return_value=[])), \
patch.object(mob, "classify", new=AsyncMock(return_value="printer")), \ patch.object(mob, "classify", new=AsyncMock(return_value="printer")), \
patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=["vpn_connect"])): patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=["vpn_connect"])):
res = await mob.match_or_build(uuid.uuid4(), "printer jam", None, "t1", db=AsyncMock(), force_build=False) res = await mob.match_or_build(uuid.uuid4(), "printer jam", None, db=AsyncMock(), force_build=False)
assert res["outcome"] == "out_of_scope" assert res["outcome"] == "out_of_scope"
@@ -41,7 +41,7 @@ async def test_build_when_enabled_and_no_match():
with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock(return_value=[])), \ with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock(return_value=[])), \
patch.object(mob, "classify", new=AsyncMock(return_value="printer")), \ patch.object(mob, "classify", new=AsyncMock(return_value="printer")), \
patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=["printer"])): patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=["printer"])):
res = await mob.match_or_build(uuid.uuid4(), "printer jam", None, "t1", db=AsyncMock(), force_build=False) res = await mob.match_or_build(uuid.uuid4(), "printer jam", None, db=AsyncMock(), force_build=False)
assert res["outcome"] == "build" assert res["outcome"] == "build"
assert res["session_kind"] == "ai_build" assert res["session_kind"] == "ai_build"
@@ -52,7 +52,7 @@ async def test_force_build_skips_match_but_still_gates():
with patch.object(mob.flow_matching_engine, "find_matches", new=fm), \ with patch.object(mob.flow_matching_engine, "find_matches", new=fm), \
patch.object(mob, "classify", new=AsyncMock(return_value="printer")), \ patch.object(mob, "classify", new=AsyncMock(return_value="printer")), \
patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=["printer"])): patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=["printer"])):
res = await mob.match_or_build(uuid.uuid4(), "p", None, "t1", db=AsyncMock(), force_build=True) res = await mob.match_or_build(uuid.uuid4(), "p", None, db=AsyncMock(), force_build=True)
fm.assert_not_called() fm.assert_not_called()
assert res["outcome"] == "build" assert res["outcome"] == "build"
@@ -61,7 +61,7 @@ async def test_force_build_skips_match_but_still_gates():
async def test_score_exactly_match_threshold_is_matched(): async def test_score_exactly_match_threshold_is_matched():
with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock( with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock(
return_value=[{"tree_id": str(uuid.uuid4()), "tree_name": "X", "score": 0.75}])): return_value=[{"tree_id": str(uuid.uuid4()), "tree_name": "X", "score": 0.75}])):
res = await mob.match_or_build(uuid.uuid4(), "p", None, "t1", db=AsyncMock(), force_build=False) res = await mob.match_or_build(uuid.uuid4(), "p", None, db=AsyncMock(), force_build=False)
assert res["outcome"] == "matched" assert res["outcome"] == "matched"
@@ -69,7 +69,7 @@ async def test_score_exactly_match_threshold_is_matched():
async def test_score_exactly_suggest_threshold_is_suggest(): async def test_score_exactly_suggest_threshold_is_suggest():
with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock( with patch.object(mob.flow_matching_engine, "find_matches", new=AsyncMock(
return_value=[{"tree_id": str(uuid.uuid4()), "tree_name": "X", "score": 0.60}])): return_value=[{"tree_id": str(uuid.uuid4()), "tree_name": "X", "score": 0.60}])):
res = await mob.match_or_build(uuid.uuid4(), "p", None, "t1", db=AsyncMock(), force_build=False) res = await mob.match_or_build(uuid.uuid4(), "p", None, db=AsyncMock(), force_build=False)
assert res["outcome"] == "suggest" assert res["outcome"] == "suggest"
@@ -79,7 +79,7 @@ async def test_score_below_suggest_falls_through_to_build_path():
return_value=[{"tree_id": str(uuid.uuid4()), "tree_name": "X", "score": 0.4}])), \ return_value=[{"tree_id": str(uuid.uuid4()), "tree_name": "X", "score": 0.4}])), \
patch.object(mob, "classify", new=AsyncMock(return_value="printer")), \ patch.object(mob, "classify", new=AsyncMock(return_value="printer")), \
patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=["printer"])): patch.object(mob, "get_enabled_categories", new=AsyncMock(return_value=["printer"])):
res = await mob.match_or_build(uuid.uuid4(), "printer", None, "t1", db=AsyncMock(), force_build=False) res = await mob.match_or_build(uuid.uuid4(), "printer", None, db=AsyncMock(), force_build=False)
assert res["outcome"] == "build" assert res["outcome"] == "build"