"""Constrained, node-by-node L1 decision-tree generation (spec §4/§5/§6.1). Each call produces ONE node given the problem, category, and full walked path. Generation is constrained to safe/reversible L1 steps and biased to escalate early. normalize_walked_path() turns a resolved walk into a valid tree object for flywheel capture. """ import logging from typing import Any, Optional from app.core.ai_provider import get_ai_provider from app.core.config import settings from app.services.l1_category_service import HARD_FLOOR_TEXT_PATTERNS from app.services.llm_utils import parse_llm_json logger = logging.getLogger(__name__) MAX_DEPTH = 12 VALID_NODE_TYPES = {"question", "instruction", "resolved", "escalate"} class UnsafeNodeError(ValueError): """Raised when a generated node violates the hard floor or is malformed.""" SYSTEM_PROMPT = """\ You are an L1 helpdesk troubleshooting guide builder. Given a problem and the steps already tried, produce the SINGLE next node of a yes/no decision tree. HARD RULES: - Only safe, reversible, observe-or-restart-class steps: checking status, toggling, restarting, reconnecting, re-entering credentials the USER already knows. - NEVER produce steps that: edit the registry/system files/boot config; delete or format data/disks; change credentials/MFA/security/firewall/AV; run elevated or admin scripts; touch domain controllers/DNS/DHCP or production servers; or have billing/license impact. These are out of L1 scope. - When you run out of safe in-scope steps, DO NOT GUESS. Emit an "escalate" node. Return ONLY a JSON object for ONE node, one of: {"node_type":"question","text":""} {"node_type":"instruction","text":""} {"node_type":"resolved","text":""} {"node_type":"escalate","reason_category":"exhausted_safe_steps","text":""} No prose, no markdown fences. """ def _strip_meta(walked_path: list[dict]) -> list[dict]: """Drop the hidden ``meta`` entry (category carrier) the intake endpoint seeds. The first walked_path entry on an ai_build session may be a ``{"node_type": "meta", "category": ...}`` marker used to persist the classified category; it is not a real walk step and must be excluded from both model context and tree normalization. """ return [s for s in walked_path if s.get("node_type") != "meta"] def _build_context(problem_text: str, category: str, walked_path: list[dict]) -> str: walked_path = _strip_meta(walked_path) lines = [f"PROBLEM: {problem_text}", f"CATEGORY: {category}", "STEPS SO FAR:"] if not walked_path: lines.append("(none yet — produce the first diagnostic question)") for i, step in enumerate(walked_path, 1): ans = step.get("answer") suffix = f" -> {ans}" if ans else "" lines.append(f"{i}. [{step.get('node_type','?')}] {step.get('text','')}{suffix}") return "\n".join(lines) def validate_node(node: dict[str, Any]) -> dict[str, Any]: """Shape + hard-floor validation. Raises UnsafeNodeError on violation.""" if not isinstance(node, dict) or node.get("node_type") not in VALID_NODE_TYPES: raise UnsafeNodeError(f"invalid node_type: {node!r}") text = (node.get("text") or "").lower() for pat in HARD_FLOOR_TEXT_PATTERNS: if pat in text: raise UnsafeNodeError(f"hard-floor pattern '{pat}' in node text") return node def escalate_if_depth_exceeded(walked_path: list[dict]) -> Optional[dict[str, Any]]: if len(walked_path) >= MAX_DEPTH: return { "node_type": "escalate", "reason_category": "depth_cap", "text": "Reached the L1 troubleshooting depth limit — escalating to engineering.", } return None async def generate_next_node( problem_text: str, category: str, walked_path: list[dict] ) -> dict[str, Any]: """Generate + validate the next node. Regenerate once on failure, then escalate.""" capped = escalate_if_depth_exceeded(walked_path) if capped: return capped provider = get_ai_provider(settings.get_model_for_action("l1_realtime_build")) context = _build_context(problem_text, category, walked_path) for attempt in range(2): try: raw, _, _ = await provider.generate_json( system_prompt=SYSTEM_PROMPT, messages=[{"role": "user", "content": context}], max_tokens=1024, ) node = parse_llm_json(raw) return validate_node(node) except Exception as e: logger.warning("ai_tree_builder node attempt %d failed: %s", attempt + 1, e) continue return { "node_type": "escalate", "reason_category": "generation_failed", "text": "Could not generate a safe next step — escalating to engineering.", } def normalize_walked_path(walked_path: list[dict]) -> dict[str, Any]: """Turn a resolved walk into a valid troubleshooting tree (spec §6.1). Root = first node's id; question nodes' traversed branch points to the next node, the untraversed branch to a needs_review stub; terminal node ends it. Returns {id, nodes: {id: node}} — a dict with an id (passes the proposal approval guard). """ walked_path = _strip_meta(walked_path) nodes: dict[str, Any] = {} if not walked_path: root_id = "root" nodes[root_id] = {"id": root_id, "node_type": "needs_review", "text": "Empty walk — needs authoring."} return {"id": root_id, "nodes": nodes} stub_seq = 0 for i, step in enumerate(walked_path): nid = step.get("id") or f"n{i+1}" ntype = step.get("node_type", "question") nxt = walked_path[i + 1].get("id", f"n{i+2}") if i + 1 < len(walked_path) else None node: dict[str, Any] = {"id": nid, "node_type": ntype, "text": step.get("text", "")} if step.get("reason_category"): node["reason_category"] = step["reason_category"] if ntype == "question": answer = (step.get("answer") or "").lower() stub_seq += 1 stub_id = f"review-{stub_seq}" nodes[stub_id] = {"id": stub_id, "node_type": "needs_review", "text": "Branch not explored during the originating call."} traversed_next = nxt if traversed_next is None: # Walk ended on this question (no terminal recorded) — stub the # branch the tech actually took so the tree has no dangling edge. stub_seq += 1 traversed_next = f"review-{stub_seq}" nodes[traversed_next] = {"id": traversed_next, "node_type": "needs_review", "text": "Walk ended here before a terminal step was reached."} node["yes_next"] = traversed_next if answer == "yes" else stub_id node["no_next"] = traversed_next if answer == "no" else stub_id elif ntype == "instruction": node["next"] = nxt nodes[nid] = node return {"id": walked_path[0].get("id", "n1"), "nodes": nodes}