feat: KB Accelerator — convert KB articles into interactive flows

Full-stack implementation of the KB Accelerator feature that converts
static MSP knowledge base articles into interactive troubleshooting
and procedural flows using AI.

Backend:
- Migrations 054/055: kb_imports, kb_import_nodes tables + plan_limits KB columns
- SQLAlchemy models with relationships and self-referential node hierarchy
- Text extraction service (txt, paste, docx with structural metadata)
- AI conversion service with MSP-specialist prompts for both flow types
- 8 API endpoints: upload, get, list, convert, edit node, commit, delete, quota
- Tier-gated access via plan_limits (free: 3 lifetime, pro/team: unlimited)
- 8 integration tests covering upload, get/list, quota, commit, delete

Frontend:
- TypeScript types and API client for all KB Accelerator endpoints
- Multi-step wizard page: upload → processing → review → success
- Upload screen with paste/file tabs, drag-drop, target type selector
- Two-panel review screen with source highlighting and node cards
- Per-node actions: approve, edit, regenerate, insert, delete
- Confidence color indicators (green/amber/red)
- Sidebar navigation with Sparkles icon
- Code-split lazy-loaded route at /kb-accelerator

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Michael Chihlas
2026-03-10 20:56:28 -04:00
parent c65aa4f0b7
commit 71ff4a8c35
27 changed files with 4426 additions and 2 deletions

View File

@@ -98,6 +98,7 @@ class Settings(BaseSettings):
"quick_action": "fast",
"open_chat": "standard",
"variable_inference": "fast",
"kb_convert": "standard",
}
def get_model_for_action(self, action_type: str) -> str:

View File

@@ -0,0 +1,498 @@
"""KB Accelerator AI conversion service.
Converts extracted KB article text into ResolutionFlow tree structures
using the Anthropic API (via the shared AI provider layer).
"""
import json
import logging
import re
import time
from typing import Any
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.ai_provider import get_ai_provider
from app.core.ai_quota_service import record_ai_usage, get_user_plan
from app.core.config import settings
from app.models.kb_import import KBImport, KBImportNode
logger = logging.getLogger(__name__)
# Cost estimation (Sonnet pricing)
COST_PER_INPUT_TOKEN = 3.0 / 1_000_000
COST_PER_OUTPUT_TOKEN = 15.0 / 1_000_000
def _strip_markdown_fences(text: str) -> str:
"""Strip markdown code fences if the model wrapped its JSON response."""
text = text.strip()
match = re.match(r"^```(?:json)?\s*([\s\S]*?)```$", text)
if match:
return match.group(1).strip()
return text
def _estimate_cost(input_tokens: int, output_tokens: int) -> float:
return (input_tokens * COST_PER_INPUT_TOKEN) + (output_tokens * COST_PER_OUTPUT_TOKEN)
# ── System Prompts ──
TROUBLESHOOTING_SYSTEM_PROMPT = """You are an MSP documentation specialist for ResolutionFlow. Your task is to convert a knowledge base article into an interactive troubleshooting decision tree.
Analyze the article and produce a JSON array of nodes that form a troubleshooting flow. Each node represents either a diagnostic question (decision point) or a resolution (solution).
## Node Types
- **question**: A diagnostic question with multiple answer options. Each option leads to another node.
- **resolution**: A terminal node with the solution/fix text.
- **action**: An instruction step that leads to the next node via next_node_id.
- **warning**: A caution or important note.
## Output Format
Return a JSON object with this structure:
```json
{
"title": "Flow title derived from the article",
"description": "Brief description of what this flow troubleshoots",
"nodes": [
{
"id": "unique-node-id",
"type": "question",
"question": "What symptom is the user experiencing?",
"options": [
{"label": "Cannot connect", "next_node_id": "check-network"},
{"label": "Slow performance", "next_node_id": "check-resources"}
],
"confidence": 0.95,
"source_excerpt": "The exact text from the article this node was derived from"
},
{
"id": "check-network",
"type": "action",
"question": "Check the network connection and ping the server",
"next_node_id": "network-result",
"confidence": 0.88,
"source_excerpt": "Step 1: Verify network connectivity..."
},
{
"id": "solution-restart",
"type": "resolution",
"question": "Restart the service. The issue should now be resolved.",
"confidence": 0.92,
"source_excerpt": "Restarting the service resolves the connectivity issue."
}
]
}
```
## Rules
1. Every node MUST have a unique `id` (descriptive kebab-case).
2. Every node MUST have a `confidence` score between 0.0 and 1.0.
3. Every node MUST have a `source_excerpt` — the exact text from the source article it was derived from.
4. The first node is the root of the decision tree.
5. All `next_node_id` and option `next_node_id` references must point to existing node IDs.
6. Detect implicit branching logic (e.g., "If X, do Y; otherwise Z") and create decision nodes.
7. Produce at least 3 nodes. Maximum 50 nodes.
8. Use high confidence (0.9+) for directly stated steps, medium (0.7-0.89) for reasonable inferences, low (<0.7) for significant interpretation.
9. Return ONLY valid JSON — no markdown fences, no explanation text."""
PROCEDURAL_SYSTEM_PROMPT = """You are an MSP documentation specialist for ResolutionFlow. Your task is to convert a knowledge base article into a procedural (step-by-step) flow.
Analyze the article and produce a JSON object with sequential steps and detected variables.
## Step Types
- **step**: A regular instruction step.
- **section_header**: A section divider/title (no action, just organizational).
- **warning**: A caution or important note that should be highlighted.
## Variable Detection
Identify values that would change between executions (server names, IPs, usernames, domains, etc.) and replace them with `[VAR:variable_name]` tokens. Also produce an intake_form that captures these variables before execution.
## Output Format
Return a JSON object:
```json
{
"title": "Procedure title derived from the article",
"description": "Brief description of what this procedure accomplishes",
"steps": [
{
"id": "unique-step-id",
"type": "step",
"content": "Open Server Manager and navigate to Add Roles on [VAR:server_name]",
"confidence": 0.95,
"source_excerpt": "Step 1: Open Server Manager on DC01..."
},
{
"id": "warning-dns",
"type": "warning",
"content": "WARNING: This will restart DNS and cause brief connectivity loss",
"confidence": 0.90,
"source_excerpt": "Note: Restarting DNS will cause a brief outage"
},
{
"id": "section-verification",
"type": "section_header",
"content": "Verification Steps",
"confidence": 1.0,
"source_excerpt": "Verification"
}
],
"intake_form": [
{
"variable_name": "server_name",
"label": "Server Name",
"field_type": "text",
"required": true,
"display_order": 1
},
{
"variable_name": "ip_address",
"label": "IP Address",
"field_type": "text",
"required": true,
"display_order": 2
}
]
}
```
## Variable Type Mapping
- IP addresses → field_type: "text", variable like `ip_address`
- Server/computer names → field_type: "text", variable like `server_name`
- Domain names → field_type: "text", variable like `domain_name`
- Usernames/email → field_type: "text", variable like `username`
- Port numbers → field_type: "number", variable like `port`
## Rules
1. Every step MUST have a unique `id` (descriptive kebab-case).
2. Every step MUST have a `confidence` score between 0.0 and 1.0.
3. Every step MUST have a `source_excerpt` — the exact text from the source article.
4. Preserve the original step ordering from the article.
5. Detect ALL instance-specific values and replace with `[VAR:name]` tokens.
6. Generate an intake_form entry for each unique variable detected.
7. Produce at least 2 steps. Maximum 100 steps.
8. Use high confidence (0.9+) for directly stated steps, medium (0.7-0.89) for inferences, low (<0.7) for significant interpretation.
9. Return ONLY valid JSON — no markdown fences, no explanation text."""
def _build_user_message(
source_text: str,
source_metadata: dict[str, Any] | None,
source_filename: str | None,
) -> str:
"""Build the user message containing the extracted text and metadata."""
parts = []
if source_filename:
parts.append(f"Source file: {source_filename}")
if source_metadata:
headings = source_metadata.get("headings", [])
if headings:
heading_text = ", ".join(
f"H{h['level']}: {h['text']}" for h in headings[:20]
)
parts.append(f"Detected headings: {heading_text}")
lists = source_metadata.get("lists", [])
if lists:
parts.append(f"Detected {len(lists)} list(s) in the document.")
tables = source_metadata.get("tables", [])
if tables:
parts.append(f"Detected {len(tables)} table(s) in the document.")
parts.append(f"\n--- ARTICLE CONTENT ---\n\n{source_text}")
return "\n".join(parts)
def _parse_troubleshooting_response(
data: dict[str, Any],
kb_import_id: UUID,
) -> tuple[list[KBImportNode], str, str | None]:
"""Parse AI response into KBImportNode records for troubleshooting flows.
Returns (nodes, title, description).
"""
title = data.get("title", "Imported Troubleshooting Flow")
description = data.get("description")
raw_nodes = data.get("nodes", [])
if not raw_nodes:
raise ValueError("AI returned no nodes")
# Build parent mapping from the tree structure
# First node is root (no parent). For others, trace via options/next_node_id.
node_id_to_parent: dict[str, str | None] = {}
node_id_to_data: dict[str, dict[str, Any]] = {}
for node in raw_nodes:
nid = node.get("id", "")
node_id_to_data[nid] = node
if nid not in node_id_to_parent:
node_id_to_parent[nid] = None # default: no parent
# Trace parent relationships
for node in raw_nodes:
nid = node.get("id", "")
# Options point to children
for opt in node.get("options", []):
child_id = opt.get("next_node_id")
if child_id and child_id in node_id_to_data:
node_id_to_parent[child_id] = nid
# next_node_id points to child
next_id = node.get("next_node_id")
if next_id and next_id in node_id_to_data:
node_id_to_parent[next_id] = nid
# Create import node records preserving order
import uuid as uuid_mod
node_id_map: dict[str, uuid_mod.UUID] = {}
nodes: list[KBImportNode] = []
for order, raw_node in enumerate(raw_nodes):
node_uuid = uuid_mod.uuid4()
nid = raw_node.get("id", f"node-{order}")
node_id_map[nid] = node_uuid
for order, raw_node in enumerate(raw_nodes):
nid = raw_node.get("id", f"node-{order}")
node_type = raw_node.get("type", "question")
if node_type == "decision":
node_type = "question"
parent_str_id = node_id_to_parent.get(nid)
parent_uuid = node_id_map.get(parent_str_id) if parent_str_id else None
# Build content JSONB
content: dict[str, Any] = {
"original_id": nid,
"question": raw_node.get("question", ""),
}
if raw_node.get("options"):
content["options"] = raw_node["options"]
if raw_node.get("next_node_id"):
content["next_node_id"] = raw_node["next_node_id"]
import_node = KBImportNode(
id=node_id_map[nid],
kb_import_id=kb_import_id,
node_order=order,
node_type=node_type,
content=content,
parent_node_id=parent_uuid,
source_excerpt=raw_node.get("source_excerpt"),
confidence_score=float(raw_node.get("confidence", 0.5)),
user_edited=False,
user_approved=False,
)
nodes.append(import_node)
return nodes, title, description
def _parse_procedural_response(
data: dict[str, Any],
kb_import_id: UUID,
) -> tuple[list[KBImportNode], str, str | None, list[dict[str, Any]] | None]:
"""Parse AI response into KBImportNode records for procedural flows.
Returns (nodes, title, description, intake_form).
"""
title = data.get("title", "Imported Procedure")
description = data.get("description")
raw_steps = data.get("steps", [])
intake_form = data.get("intake_form")
if not raw_steps:
raise ValueError("AI returned no steps")
import uuid as uuid_mod
nodes: list[KBImportNode] = []
for order, raw_step in enumerate(raw_steps):
content: dict[str, Any] = {
"original_id": raw_step.get("id", f"step-{order}"),
"content": raw_step.get("content", ""),
}
node_type = raw_step.get("type", "step")
if node_type not in ("step", "section_header", "warning"):
node_type = "step"
import_node = KBImportNode(
id=uuid_mod.uuid4(),
kb_import_id=kb_import_id,
node_order=order,
node_type=node_type,
content=content,
parent_node_id=None, # Procedural flows are linear
source_excerpt=raw_step.get("source_excerpt"),
confidence_score=float(raw_step.get("confidence", 0.5)),
user_edited=False,
user_approved=False,
)
nodes.append(import_node)
return nodes, title, description, intake_form
async def convert_document(
kb_import: KBImport,
db: AsyncSession,
) -> list[KBImportNode]:
"""Run AI conversion on an extracted KB article.
Creates KBImportNode records and updates the kb_import status.
Returns the created nodes.
"""
start_time = time.monotonic()
# Select system prompt based on target type
if kb_import.target_type == "troubleshooting":
system_prompt = TROUBLESHOOTING_SYSTEM_PROMPT
else:
system_prompt = PROCEDURAL_SYSTEM_PROMPT
user_message = _build_user_message(
source_text=kb_import.source_text,
source_metadata=kb_import.source_metadata,
source_filename=kb_import.source_filename,
)
# Get AI provider with model routing
model = settings.get_model_for_action("kb_convert")
provider = get_ai_provider(model=model)
try:
raw_text, input_tokens, output_tokens = await provider.generate_json(
system_prompt=system_prompt,
messages=[{"role": "user", "content": user_message}],
max_tokens=8192,
)
except Exception as e:
logger.error("AI conversion failed for kb_import=%s: %s", kb_import.id, e)
kb_import.status = "failed"
kb_import.error_message = f"AI processing error: {str(e)}"
kb_import.processing_time_ms = int((time.monotonic() - start_time) * 1000)
await db.flush()
# Record failed usage
plan = await get_user_plan(kb_import.account_id, db)
await record_ai_usage(
user_id=kb_import.created_by,
account_id=kb_import.account_id,
conversation_id=None,
generation_type="kb_convert",
tier=plan,
input_tokens=0,
output_tokens=0,
estimated_cost=0.0,
succeeded=False,
counts_toward_quota=False,
error_code="ai_error",
extra_data={"kb_import_id": str(kb_import.id)},
db=db,
)
return []
# Parse JSON response
raw_text = _strip_markdown_fences(raw_text)
try:
data = json.loads(raw_text)
except json.JSONDecodeError as e:
logger.error(
"KB conversion JSON parse failed for kb_import=%s (%d chars): %s",
kb_import.id, len(raw_text), raw_text[:500],
)
kb_import.status = "failed"
kb_import.error_message = f"AI returned invalid JSON: {e}"
kb_import.processing_time_ms = int((time.monotonic() - start_time) * 1000)
kb_import.ai_tokens_input = input_tokens
kb_import.ai_tokens_output = output_tokens
await db.flush()
return []
# Parse into nodes based on target type
try:
intake_form = None
if kb_import.target_type == "troubleshooting":
nodes, title, description = _parse_troubleshooting_response(
data, kb_import.id
)
else:
nodes, title, description, intake_form = _parse_procedural_response(
data, kb_import.id
)
except (ValueError, KeyError, TypeError) as e:
logger.error("KB node parsing failed for kb_import=%s: %s", kb_import.id, e)
kb_import.status = "failed"
kb_import.error_message = f"Failed to parse AI response: {e}"
kb_import.processing_time_ms = int((time.monotonic() - start_time) * 1000)
kb_import.ai_tokens_input = input_tokens
kb_import.ai_tokens_output = output_tokens
await db.flush()
return []
# Persist nodes
for node in nodes:
db.add(node)
# Update import record
elapsed_ms = int((time.monotonic() - start_time) * 1000)
confidence_scores = [n.confidence_score for n in nodes]
avg_confidence = sum(confidence_scores) / len(confidence_scores) if confidence_scores else 0.0
kb_import.status = "ready"
kb_import.confidence_avg = avg_confidence
kb_import.processing_time_ms = elapsed_ms
kb_import.ai_tokens_input = input_tokens
kb_import.ai_tokens_output = output_tokens
# Store parsed metadata for commit phase
if not kb_import.source_metadata:
kb_import.source_metadata = {}
kb_import.source_metadata["_conversion"] = {
"title": title,
"description": description,
"node_count": len(nodes),
}
if intake_form:
kb_import.source_metadata["_intake_form"] = intake_form
await db.flush()
# Record successful usage
plan = await get_user_plan(kb_import.account_id, db)
cost = _estimate_cost(input_tokens, output_tokens)
await record_ai_usage(
user_id=kb_import.created_by,
account_id=kb_import.account_id,
conversation_id=None,
generation_type="kb_convert",
tier=plan,
input_tokens=input_tokens,
output_tokens=output_tokens,
estimated_cost=cost,
succeeded=True,
counts_toward_quota=True,
error_code=None,
extra_data={"kb_import_id": str(kb_import.id), "node_count": len(nodes)},
db=db,
)
logger.info(
"KB conversion complete: import=%s, nodes=%d, confidence=%.2f, time=%dms, tokens=%d/%d",
kb_import.id, len(nodes), avg_confidence, elapsed_ms, input_tokens, output_tokens,
)
return nodes

View File

@@ -0,0 +1,199 @@
"""KB Accelerator text extraction service.
Extracts plain text and structural metadata from uploaded KB articles.
Phase 1: txt, paste, docx. Phase 2 will add pdf, html, md.
"""
import io
import logging
from typing import Any, Callable
logger = logging.getLogger(__name__)
# Type alias for extraction handlers
ExtractResult = tuple[str, dict[str, Any] | None]
ExtractHandler = Callable[[bytes], ExtractResult]
def _extract_txt(content_bytes: bytes) -> ExtractResult:
"""Extract from plain text — pass through with no metadata."""
text = content_bytes.decode("utf-8", errors="replace")
return text.strip(), None
def _extract_paste(content_bytes: bytes) -> ExtractResult:
"""Extract from pasted text — identical to txt."""
return _extract_txt(content_bytes)
def _extract_docx(content_bytes: bytes) -> ExtractResult:
"""Extract text and structural metadata from a DOCX file.
Preserves heading levels, list structures, table content,
and bold/italic emphasis markers.
"""
try:
from docx import Document
from docx.enum.text import WD_ALIGN_PARAGRAPH
except ImportError:
raise RuntimeError(
"python-docx is required for DOCX extraction. "
"Install it with: pip install python-docx"
)
doc = Document(io.BytesIO(content_bytes))
text_parts: list[str] = []
metadata: dict[str, Any] = {
"headings": [],
"lists": [],
"tables": [],
"emphasis": [],
}
list_items: list[dict[str, Any]] = []
current_list_type: str | None = None
for i, para in enumerate(doc.paragraphs):
style_name = para.style.name if para.style else ""
text = para.text.strip()
if not text:
# Flush any accumulated list
if list_items:
metadata["lists"].append({
"type": current_list_type or "unordered",
"items": list_items,
})
list_items = []
current_list_type = None
text_parts.append("")
continue
# Detect headings
if style_name.startswith("Heading"):
try:
level = int(style_name.split()[-1])
except (ValueError, IndexError):
level = 1
metadata["headings"].append({
"level": level,
"text": text,
"paragraph_index": i,
})
text_parts.append(text)
continue
# Detect list items
if style_name.startswith("List"):
is_ordered = "Number" in style_name or "Ordered" in style_name
list_type = "ordered" if is_ordered else "unordered"
if current_list_type is not None and current_list_type != list_type:
# Flush previous list
metadata["lists"].append({
"type": current_list_type,
"items": list_items,
})
list_items = []
current_list_type = list_type
list_items.append({"text": text, "paragraph_index": i})
text_parts.append(text)
continue
# Flush any accumulated list before a non-list paragraph
if list_items:
metadata["lists"].append({
"type": current_list_type or "unordered",
"items": list_items,
})
list_items = []
current_list_type = None
# Detect emphasis (bold/italic runs)
for run in para.runs:
run_text = run.text.strip()
if not run_text:
continue
if run.bold:
metadata["emphasis"].append({
"type": "bold",
"text": run_text,
"paragraph_index": i,
})
if run.italic:
metadata["emphasis"].append({
"type": "italic",
"text": run_text,
"paragraph_index": i,
})
text_parts.append(text)
# Flush trailing list
if list_items:
metadata["lists"].append({
"type": current_list_type or "unordered",
"items": list_items,
})
# Extract tables
for t_idx, table in enumerate(doc.tables):
table_data: list[list[str]] = []
for row in table.rows:
table_data.append([cell.text.strip() for cell in row.cells])
if table_data:
metadata["tables"].append({
"table_index": t_idx,
"rows": table_data,
})
# Also add table content to text
for row in table_data:
text_parts.append(" | ".join(row))
full_text = "\n".join(text_parts).strip()
# Clean up empty metadata sections
metadata = {k: v for k, v in metadata.items() if v}
return full_text, metadata if metadata else None
# Registry of format handlers — extend for Phase 2
FORMAT_HANDLERS: dict[str, ExtractHandler] = {
"txt": _extract_txt,
"paste": _extract_paste,
"docx": _extract_docx,
}
def extract_text(
content_bytes: bytes,
source_format: str,
) -> ExtractResult:
"""Extract plain text and structural metadata from uploaded content.
Args:
content_bytes: Raw bytes of the uploaded content.
source_format: Format identifier ('txt', 'paste', 'docx', etc.)
Returns:
Tuple of (plain_text, structural_metadata_or_none).
Raises:
ValueError: If the format is not supported.
RuntimeError: If a required extraction library is not installed.
"""
handler = FORMAT_HANDLERS.get(source_format)
if handler is None:
raise ValueError(f"Unsupported format: {source_format}")
logger.info("Extracting text from format=%s", source_format)
text, metadata = handler(content_bytes)
if not text.strip():
raise ValueError("Extracted text is empty — the document may be blank or contain only images.")
logger.info(
"Extraction complete: %d chars, metadata=%s",
len(text),
"yes" if metadata else "no",
)
return text, metadata