Files
resolutionflow/backend/app/api/endpoints/kb_accelerator.py
Michael Chihlas 91d2bc6df3 fix: KB Accelerator tree builder, approve all, canvas delete button
- Fix _build_troubleshooting_tree() to handle deep nesting, warning nodes,
  and DAG deduplication (placed set prevents duplicate IDs)
- Fix step_sync VARCHAR(255) overflow on publish by truncating title
- Add "Approve All" button to KB review screen
- Add delete button (hover-reveal) to flow canvas nodes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 01:59:03 -04:00

732 lines
26 KiB
Python

"""KB Accelerator endpoints.
Upload KB articles, convert to flows via AI, review, and commit.
POST /kb-accelerator/upload — Upload file or paste text
GET /kb-accelerator/{id} — Get import with nodes
GET /kb-accelerator — List imports for account
POST /kb-accelerator/{id}/convert — Re-trigger AI conversion
PATCH /kb-accelerator/{id}/nodes/{nid} — Edit a node
POST /kb-accelerator/{id}/commit — Commit to flow library
DELETE /kb-accelerator/{id} — Cancel/cleanup
GET /kb-accelerator/quota — Plan entitlements + usage
"""
import logging
import mimetypes
from datetime import datetime, timezone
from typing import Annotated, Optional
from uuid import UUID
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, UploadFile, File, Form, status
from sqlalchemy import select, func, delete
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.api.deps import get_current_active_user, get_db, require_engineer_or_admin
from app.core.config import settings
from app.core.rate_limit import limiter
from app.core.subscriptions import get_plan_limits
from app.core.ai_quota_service import get_user_plan
from app.core.kb_extraction_service import extract_text
from app.core.kb_conversion_service import convert_document
from app.models.kb_import import KBImport, KBImportNode
from app.models.plan_limits import PlanLimits
from app.models.tree import Tree
from app.models.user import User
from app.schemas.kb_accelerator import (
KBUploadTextRequest,
KBNodeEditRequest,
KBCommitRequest,
KBUploadResponse,
KBImportResponse,
KBImportNodeResponse,
KBImportSummary,
KBImportListResponse,
KBCommitResponse,
KBQuotaResponse,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router for every endpoint in this file; all paths are served under /kb-accelerator.
router = APIRouter(prefix="/kb-accelerator", tags=["kb-accelerator"])
# Max upload size: 10 MiB (10 * 1024 * 1024 bytes)
MAX_UPLOAD_SIZE = 10 * 1024 * 1024
# File extensions accepted for upload, each mapped to its MIME type(s).
# NOTE(review): in the code visible in this file only the keys are consulted;
# the MIME lists are not checked against the uploaded content type.
ALLOWED_EXTENSIONS = {
"txt": ["text/plain"],
"docx": ["application/vnd.openxmlformats-officedocument.wordprocessingml.document"],
}
# Phase 2 formats (not yet enabled)
# NOTE(review): _detect_format() also returns these extensions, so whether one
# is actually accepted is decided afterwards by the plan's allowed-format list.
PHASE2_EXTENSIONS = {
"pdf": ["application/pdf"],
"html": ["text/html"],
"md": ["text/markdown", "text/plain"],
}
def _detect_format(filename: str) -> str | None:
    """Map a filename to a known source-format key via its extension.

    The text after the last "." is lower-cased and returned when it is a key
    of ``ALLOWED_EXTENSIONS`` or ``PHASE2_EXTENSIONS``. Empty names, names
    without a dot and unknown extensions all yield ``None``.
    """
    if not filename or "." not in filename:
        return None
    suffix = filename.rpartition(".")[2].lower()
    recognised = suffix in ALLOWED_EXTENSIONS or suffix in PHASE2_EXTENSIONS
    return suffix if recognised else None
async def _get_kb_limits(user: User, db: AsyncSession) -> PlanLimits | None:
    """Resolve the plan of the user's account and return that plan's limits.

    Returns whatever ``get_plan_limits`` yields for the plan, which may be
    ``None``.
    """
    account_plan = await get_user_plan(user.account_id, db)
    plan_limits = await get_plan_limits(account_plan, db)
    return plan_limits
async def _check_kb_enabled(user: User, db: AsyncSession) -> PlanLimits:
    """Return the account's plan limits, refusing plans without KB Accelerator.

    Raises:
        HTTPException: 403 when no limits are found for the plan, or when the
            limits do not have ``kb_accelerator_enabled`` set.
    """
    plan_limits = await _get_kb_limits(user, db)
    if plan_limits and plan_limits.kb_accelerator_enabled:
        return plan_limits
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="KB Accelerator is not available on your plan.",
    )
async def _check_lifetime_limit(user: User, limits: PlanLimits, db: AsyncSession) -> None:
    """Reject the request once the account has used up its lifetime conversions.

    Usage is the number of the account's imports with status "committed".
    A ``kb_max_lifetime_conversions`` of ``None`` means unlimited and skips
    the database lookup entirely.

    Raises:
        HTTPException: 403 when the committed count has reached the cap.
    """
    cap = limits.kb_max_lifetime_conversions
    if cap is None:
        return  # Unlimited

    committed_stmt = select(func.count(KBImport.id)).where(
        KBImport.account_id == user.account_id,
        KBImport.status == "committed",
    )
    used = await db.scalar(committed_stmt) or 0
    if used < cap:
        return
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=f"You have reached your lifetime limit of {limits.kb_max_lifetime_conversions} KB conversions. Upgrade your plan for unlimited conversions.",
    )
async def _check_format_allowed(source_format: str, limits: PlanLimits) -> None:
    """Ensure the plan permits ``source_format``.

    When the plan does not list any formats, "txt" and "paste" are allowed.

    Raises:
        HTTPException: 403 naming the rejected format and the allowed ones.
    """
    allowed = limits.kb_allowed_formats or ["txt", "paste"]
    if source_format in allowed:
        return
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=f"Format '{source_format}' is not available on your plan. Allowed: {', '.join(allowed)}",
    )
async def _get_import_or_404(
    import_id: UUID, user: User, db: AsyncSession, *, load_nodes: bool = True
) -> KBImport:
    """Fetch a KB import that belongs to the user's account.

    The lookup is scoped by ``account_id``, so an import owned by another
    account is reported as missing. With ``load_nodes`` the import's nodes
    are eagerly loaded in the same call.

    Raises:
        HTTPException: 404 when no matching import exists.
    """
    stmt = select(KBImport).where(
        KBImport.id == import_id,
        KBImport.account_id == user.account_id,
    )
    if load_nodes:
        stmt = stmt.options(selectinload(KBImport.nodes))
    found = (await db.execute(stmt)).scalar_one_or_none()
    if found:
        return found
    raise HTTPException(status_code=404, detail="KB import not found")
async def _run_conversion(import_id: UUID, db_url: str) -> None:
    """Background task: run AI conversion on a KB import.

    Opens its own database session, loads the import and, if it is still in
    "processing" status, hands it to ``convert_document``. On failure the
    import is marked "failed" with the error text.

    Args:
        import_id: Primary key of the KBImport to convert.
        db_url: Not used by this function; kept so existing ``add_task``
            call sites keep working.
    """
    from app.core.database import async_session_maker

    async with async_session_maker() as db:
        result = await db.execute(
            select(KBImport).where(KBImport.id == import_id)
        )
        kb_import = result.scalar_one_or_none()
        if not kb_import or kb_import.status != "processing":
            return
        try:
            await convert_document(kb_import, db)
            await db.commit()
        except Exception as e:
            # logger.exception keeps the traceback, which the plain message lost.
            logger.exception("Background KB conversion failed: %s", e)
            # The failed attempt may have left the transaction unusable (e.g.
            # after a flush error) or holding half-written rows. Roll back so
            # only the failure status is persisted.
            await db.rollback()
            # Rollback expires loaded objects; re-fetch before mutating.
            result = await db.execute(
                select(KBImport).where(KBImport.id == import_id)
            )
            kb_import = result.scalar_one_or_none()
            if kb_import is None:
                # The import was removed while the conversion was running.
                return
            kb_import.status = "failed"
            kb_import.error_message = f"Conversion error: {str(e)}"
            await db.commit()
def _serialize_import(kb_import: KBImport) -> dict:
    """Turn a KBImport into the plain dict used as the response payload.

    Scalar columns are copied as-is, nodes are validated into
    ``KBImportNodeResponse`` objects (an empty list when there are none), and
    the two timestamps are rendered with ``isoformat()``.
    """
    node_payloads = []
    if kb_import.nodes:
        node_payloads = [
            KBImportNodeResponse.model_validate(item) for item in kb_import.nodes
        ]

    # Attributes copied verbatim, in the order they appear in the payload.
    copied_fields = (
        "id",
        "account_id",
        "created_by",
        "source_filename",
        "source_format",
        "source_text",
        "source_metadata",
        "target_type",
        "status",
        "confidence_avg",
        "error_message",
        "processing_time_ms",
        "ai_tokens_input",
        "ai_tokens_output",
        "tree_id",
    )
    payload = {field: getattr(kb_import, field) for field in copied_fields}
    payload["nodes"] = node_payloads
    payload["created_at"] = kb_import.created_at.isoformat()
    payload["updated_at"] = kb_import.updated_at.isoformat()
    return payload
# ── Endpoints ──
@router.get("/quota", response_model=KBQuotaResponse)
async def get_quota(
    user: Annotated[User, Depends(require_engineer_or_admin)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Report the account's KB Accelerator entitlements alongside its usage.

    Usage is the number of the account's imports with status "committed".
    When no limits are found for the plan, a disabled quota with fixed
    fallback values is returned instead.
    """
    plan = await get_user_plan(user.account_id, db)
    limits = await get_plan_limits(plan, db)

    committed_stmt = select(func.count(KBImport.id)).where(
        KBImport.account_id == user.account_id,
        KBImport.status == "committed",
    )
    used = await db.scalar(committed_stmt) or 0

    if not limits:
        # No limits for this plan: report the feature as unavailable.
        return KBQuotaResponse(
            plan=plan,
            kb_accelerator_enabled=False,
            lifetime_conversions_used=used,
            lifetime_conversions_limit=0,
            allowed_formats=["txt", "paste"],
            detailed_analysis=False,
            conversational_refinement=False,
            step_library_matching=False,
            history_limit=3,
            can_convert=False,
        )

    cap = limits.kb_max_lifetime_conversions
    # A cap of None means unlimited conversions.
    within_cap = cap is None or used < cap
    return KBQuotaResponse(
        plan=plan,
        kb_accelerator_enabled=limits.kb_accelerator_enabled,
        lifetime_conversions_used=used,
        lifetime_conversions_limit=cap,
        allowed_formats=limits.kb_allowed_formats or ["txt", "paste"],
        detailed_analysis=limits.kb_detailed_analysis,
        conversational_refinement=limits.kb_conversational_refinement,
        step_library_matching=limits.kb_step_library_matching,
        history_limit=limits.kb_history_limit,
        can_convert=limits.kb_accelerator_enabled and within_cap,
    )
@router.post("/upload", response_model=KBUploadResponse, status_code=201)
@limiter.limit("10/minute")
async def upload_kb_article(
    request: Request,
    background_tasks: BackgroundTasks,
    user: Annotated[User, Depends(require_engineer_or_admin)],
    db: Annotated[AsyncSession, Depends(get_db)],
    file: Optional[UploadFile] = File(None),
    content: Optional[str] = Form(None),
    title: Optional[str] = Form(None),
    target_type: Optional[str] = Form(None),
):
    """Upload a KB article file or paste text for conversion.

    An uploaded ``file`` takes precedence over pasted ``content``. The import
    is stored with status "processing" and the AI conversion is scheduled as
    a background task.

    Raises:
        HTTPException: 503 if AI is disabled; 403 on plan, quota or format
            restrictions; 400 for unsupported, empty or too-short input or an
            invalid ``target_type``; 413 for files over ``MAX_UPLOAD_SIZE``;
            500 if text extraction fails with a runtime error.
    """
    if not settings.ai_enabled:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="AI is not configured.",
        )
    limits = await _check_kb_enabled(user, db)
    await _check_lifetime_limit(user, limits, db)

    # Determine source format and extract text
    if file and file.filename:
        source_format = _detect_format(file.filename)
        if not source_format:
            raise HTTPException(
                status_code=400,
                detail=f"Unsupported file format. Supported: {', '.join(ALLOWED_EXTENSIONS.keys())}",
            )
        await _check_format_allowed(source_format, limits)
        # Read at most one byte past the cap: enough to detect an oversized
        # upload without pulling an arbitrarily large file into memory.
        file_bytes = await file.read(MAX_UPLOAD_SIZE + 1)
        if len(file_bytes) > MAX_UPLOAD_SIZE:
            raise HTTPException(status_code=413, detail="File exceeds 10MB limit.")
        if not file_bytes:
            raise HTTPException(status_code=400, detail="Uploaded file is empty.")
        source_filename = file.filename
        try:
            source_text, source_metadata = extract_text(file_bytes, source_format)
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e)) from e
        except RuntimeError as e:
            raise HTTPException(status_code=500, detail=str(e)) from e
    elif content:
        source_format = "paste"
        await _check_format_allowed(source_format, limits)
        source_filename = title
        source_text = content.strip()
        source_metadata = None
        if len(source_text) < 10:
            raise HTTPException(status_code=400, detail="Content must be at least 10 characters.")
    else:
        raise HTTPException(status_code=400, detail="Provide either a file or content text.")

    # Validate target_type
    if target_type and target_type not in ("troubleshooting", "procedural"):
        raise HTTPException(status_code=400, detail="target_type must be 'troubleshooting' or 'procedural'.")
    if not target_type:
        target_type = "troubleshooting"  # Default; Phase 2 adds "let AI decide"

    # Create KB import record
    kb_import = KBImport(
        account_id=user.account_id,
        created_by=user.id,
        source_filename=source_filename,
        source_format=source_format,
        source_text=source_text,
        source_metadata=source_metadata,
        target_type=target_type,
        status="processing",
    )
    db.add(kb_import)
    # Flush so the import has its primary key before the task is scheduled.
    await db.flush()

    # Trigger AI conversion in background
    background_tasks.add_task(_run_conversion, kb_import.id, settings.DATABASE_URL)
    await db.commit()
    return KBUploadResponse(
        id=kb_import.id,
        status=kb_import.status,
        source_format=kb_import.source_format,
    )
@router.get("/{import_id}", response_model=KBImportResponse)
async def get_kb_import(
    import_id: UUID,
    user: Annotated[User, Depends(require_engineer_or_admin)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Return a single KB import of the caller's account, nodes included.

    Responds with 404 when the import does not exist for that account.
    """
    return _serialize_import(await _get_import_or_404(import_id, user, db))
@router.get("", response_model=KBImportListResponse)
async def list_kb_imports(
    user: Annotated[User, Depends(require_engineer_or_admin)],
    db: Annotated[AsyncSession, Depends(get_db)],
    skip: int = 0,
    limit: int = 20,
    status_filter: Optional[str] = None,
):
    """List KB imports for the current account, newest first.

    Args:
        skip: Number of imports to skip; negative values are treated as 0.
        limit: Maximum number of imports to return; a non-positive value
            yields an empty page.
        status_filter: When given, only imports with this status are listed
            and counted.

    ``total`` counts every matching import of the account, while the page is
    additionally capped by the plan's ``kb_history_limit`` (3 when the plan
    has no limits; ``None`` means no cap).
    """
    # A negative offset would be sent to the database as-is and would also
    # widen the history window computed below (history_limit - skip).
    skip = max(skip, 0)

    limits = await _get_kb_limits(user, db)
    history_limit = limits.kb_history_limit if limits else 3

    # Same filters for the page query and the count query.
    scope = [KBImport.account_id == user.account_id]
    if status_filter:
        scope.append(KBImport.status == status_filter)

    total = await db.scalar(select(func.count(KBImport.id)).where(*scope)) or 0

    # Apply history limit for free tier: only the newest `history_limit`
    # imports are visible, so the page may not reach past that window.
    effective_limit = limit
    if history_limit is not None:
        effective_limit = min(limit, history_limit - skip)
    if effective_limit <= 0:
        return KBImportListResponse(items=[], total=total, skip=skip, limit=limit)

    query = (
        select(KBImport)
        .where(*scope)
        .order_by(KBImport.created_at.desc())
        .offset(skip)
        .limit(effective_limit)
        .options(selectinload(KBImport.nodes))
    )
    imports = (await db.execute(query)).scalars().all()

    items = [
        KBImportSummary(
            id=imp.id,
            source_filename=imp.source_filename,
            source_format=imp.source_format,
            target_type=imp.target_type,
            status=imp.status,
            confidence_avg=imp.confidence_avg,
            node_count=len(imp.nodes) if imp.nodes else 0,
            created_at=imp.created_at.isoformat(),
        )
        for imp in imports
    ]
    return KBImportListResponse(items=items, total=total, skip=skip, limit=limit)
@router.post("/{import_id}/convert", response_model=KBUploadResponse)
@limiter.limit("30/minute")
async def reconvert(
    request: Request,
    import_id: UUID,
    background_tasks: BackgroundTasks,
    user: Annotated[User, Depends(require_engineer_or_admin)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Re-trigger AI conversion on an existing import (retry/regenerate).

    Existing nodes are deleted, the import is reset to "processing" and a new
    background conversion is scheduled.

    Raises:
        HTTPException: 503 if AI is disabled; 403 if the plan does not include
            KB Accelerator; 404 if the import is not found for this account;
            400 if the import is already committed.
    """
    if not settings.ai_enabled:
        raise HTTPException(status_code=503, detail="AI is not configured.")
    # Re-conversion spends AI capacity just like an upload does, so it is
    # gated by the same plan entitlement.
    await _check_kb_enabled(user, db)

    kb_import = await _get_import_or_404(import_id, user, db, load_nodes=False)
    if kb_import.status == "committed":
        raise HTTPException(status_code=400, detail="Cannot reconvert a committed import.")

    # Delete existing nodes
    await db.execute(
        delete(KBImportNode).where(KBImportNode.kb_import_id == kb_import.id)
    )
    kb_import.status = "processing"
    kb_import.error_message = None
    kb_import.confidence_avg = None
    await db.flush()

    background_tasks.add_task(_run_conversion, kb_import.id, settings.DATABASE_URL)
    await db.commit()
    return KBUploadResponse(
        id=kb_import.id, status="processing", source_format=kb_import.source_format
    )
@router.patch("/{import_id}/nodes/{node_id}", response_model=KBImportNodeResponse)
async def edit_node(
    import_id: UUID,
    node_id: UUID,
    data: KBNodeEditRequest,
    user: Annotated[User, Depends(require_engineer_or_admin)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Edit a specific node in a KB import during review.

    Supported ``data.operation`` values:
        approve / reject: set ``user_approved``.
        edit: replace the node content and mark it user-edited.
        delete: remove the node and renumber the remaining ones from 0.
        insert_after: add a new, pre-approved node right after this one.
        regenerate: reset the review flags (placeholder, no AI call yet).

    Returns:
        The affected node; for ``insert_after`` the new node; for ``delete``
        a placeholder with ``node_order=-1`` and ``content={"deleted": True}``.

    Raises:
        HTTPException: 404 if the import or node is not found; 400 if the
            import is not "ready", required content is missing, or the
            operation is not one of the above.
    """
    kb_import = await _get_import_or_404(import_id, user, db, load_nodes=False)
    if kb_import.status != "ready":
        raise HTTPException(status_code=400, detail="Import must be in 'ready' status to edit nodes.")

    result = await db.execute(
        select(KBImportNode).where(
            KBImportNode.id == node_id,
            KBImportNode.kb_import_id == import_id,
        )
    )
    node = result.scalar_one_or_none()
    if not node:
        raise HTTPException(status_code=404, detail="Node not found")

    op = data.operation
    if op == "approve":
        node.user_approved = True
    elif op == "reject":
        node.user_approved = False
    elif op == "edit":
        if not data.content:
            raise HTTPException(status_code=400, detail="Content required for edit operation.")
        node.content = data.content
        node.user_edited = True
    elif op == "delete":
        await db.delete(node)
        # Reorder remaining nodes. The deleted node is excluded explicitly so
        # the renumbering does not depend on the session autoflushing the
        # pending delete before this query runs.
        remaining = await db.execute(
            select(KBImportNode)
            .where(
                KBImportNode.kb_import_id == import_id,
                KBImportNode.id != node_id,
            )
            .order_by(KBImportNode.node_order)
        )
        for idx, n in enumerate(remaining.scalars().all()):
            n.node_order = idx
        await db.flush()
        await db.commit()
        # Return a placeholder response for deleted node
        return KBImportNodeResponse(
            id=node_id,
            kb_import_id=import_id,
            node_order=-1,
            node_type="step",
            content={"deleted": True},
            confidence_score=0,
            user_edited=False,
            user_approved=False,
        )
    elif op == "insert_after":
        if not data.content:
            raise HTTPException(status_code=400, detail="Content required for insert_after operation.")
        # Shift subsequent nodes
        subsequent = await db.execute(
            select(KBImportNode)
            .where(
                KBImportNode.kb_import_id == import_id,
                KBImportNode.node_order > node.node_order,
            )
            .order_by(KBImportNode.node_order)
        )
        for n in subsequent.scalars().all():
            n.node_order += 1
        new_node = KBImportNode(
            kb_import_id=import_id,
            node_order=node.node_order + 1,
            node_type=data.content.get("type", "step"),
            content=data.content,
            confidence_score=1.0,  # User-created nodes are fully trusted
            user_edited=True,
            user_approved=True,
        )
        db.add(new_node)
        await db.flush()
        await db.commit()
        return KBImportNodeResponse.model_validate(new_node)
    elif op == "regenerate":
        # Re-run AI for just this node (simplified: update placeholder)
        # Full implementation would call AI with node context + guidance
        node.user_edited = False
        node.user_approved = False
    else:
        # Previously an unrecognised operation fell through and reported
        # success without changing anything.
        raise HTTPException(status_code=400, detail=f"Unsupported operation '{op}'.")

    node.updated_at = datetime.now(timezone.utc)
    await db.flush()
    await db.commit()
    return KBImportNodeResponse.model_validate(node)
@router.post("/{import_id}/commit", response_model=KBCommitResponse)
async def commit_import(
    import_id: UUID,
    user: Annotated[User, Depends(require_engineer_or_admin)],
    db: Annotated[AsyncSession, Depends(get_db)],
    data: Optional[KBCommitRequest] = None,
):
    """Commit a reviewed KB import to the flow library as a Tree.

    The tree is created in "draft" status. Its name and description come
    from ``data`` when given, otherwise from the ``_conversion`` entry of the
    import's source metadata, with "Imported Flow" as the final name fallback.

    Raises:
        HTTPException: 404 if the import is not found for this account; 400
            if it is not "ready" or has no nodes; 403 if the plan does not
            include KB Accelerator or the lifetime conversion cap is reached.
    """
    kb_import = await _get_import_or_404(import_id, user, db)
    if kb_import.status != "ready":
        raise HTTPException(status_code=400, detail="Import must be in 'ready' status to commit.")
    if not kb_import.nodes:
        raise HTTPException(status_code=400, detail="No nodes to commit.")

    # The lifetime cap counts committed imports, so it has to be enforced
    # here as well: checking only at upload lets several pending imports all
    # be committed past the cap.
    limits = await _check_kb_enabled(user, db)
    await _check_lifetime_limit(user, limits, db)

    # Extract title/description from conversion metadata. `or {}` also covers
    # a "_conversion" key that is present but null.
    conversion_meta = (kb_import.source_metadata or {}).get("_conversion") or {}
    requested_name = data.name if data and data.name else None
    # Fall back to the default for a missing *or* blank generated title.
    tree_name = requested_name or conversion_meta.get("title") or "Imported Flow"
    tree_description = (data.description if data else None) or conversion_meta.get("description")

    # Build tree_structure from nodes
    if kb_import.target_type == "troubleshooting":
        tree_structure = _build_troubleshooting_tree(kb_import.nodes)
    else:
        tree_structure = _build_procedural_tree(kb_import.nodes)

    # Build intake_form for procedural flows
    intake_form = None
    if kb_import.target_type == "procedural":
        intake_form = (kb_import.source_metadata or {}).get("_intake_form")

    # Create the Tree record
    tree = Tree(
        name=tree_name,
        description=tree_description,
        tree_type=kb_import.target_type,
        tree_structure=tree_structure,
        intake_form=intake_form,
        author_id=user.id,
        account_id=user.account_id,
        status="draft",
        import_metadata={
            "source": "kb_accelerator",
            "kb_import_id": str(kb_import.id),
            "source_filename": kb_import.source_filename,
            "source_format": kb_import.source_format,
            "confidence_avg": kb_import.confidence_avg,
            "node_count": len(kb_import.nodes),
            "converted_at": datetime.now(timezone.utc).isoformat(),
        },
    )
    if data and data.category_id:
        tree.category_id = data.category_id
    db.add(tree)
    # Flush so the tree has its primary key before it is linked to the import.
    await db.flush()

    kb_import.status = "committed"
    kb_import.tree_id = tree.id
    await db.commit()
    return KBCommitResponse(
        tree_id=tree.id,
        import_id=kb_import.id,
        tree_type=kb_import.target_type,
    )
@router.delete("/{import_id}", status_code=204)
async def delete_import(
    import_id: UUID,
    user: Annotated[User, Depends(require_engineer_or_admin)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Remove a KB import of the caller's account together with its nodes.

    Committed imports are refused with 400; a missing import yields 404.
    """
    target = await _get_import_or_404(import_id, user, db, load_nodes=False)
    if target.status == "committed":
        raise HTTPException(status_code=400, detail="Cannot delete a committed import.")

    # Nodes are removed first with one bulk statement, then the import itself.
    purge_nodes = delete(KBImportNode).where(KBImportNode.kb_import_id == import_id)
    await db.execute(purge_nodes)
    await db.delete(target)
    await db.commit()
# ── Tree Structure Builders ──
def _build_troubleshooting_tree(nodes: list[KBImportNode]) -> dict:
    """Build a troubleshooting tree_structure from import nodes.

    The tree editor expects a deeply nested structure where each decision
    node's `children` array contains all reachable descendant nodes.
    Action/solution nodes use `title`/`description` (not `question`).

    The AI generates a DAG (shared nodes reachable from multiple paths),
    but the tree editor requires unique IDs — each node can only appear
    once. We embed each node the first time it's encountered; subsequent
    references just use next_node_id / options[].next_node_id to point
    back to the already-embedded node.

    The root is the node with the lowest ``node_order`` (the first such node
    on ties), so the result does not depend on the order in which ``nodes``
    was loaded. Nodes that cannot be reached from the root are not included.

    Args:
        nodes: Import nodes; each needs ``id``, ``node_order``, ``node_type``
            and a ``content`` dict.

    Returns:
        The nested dict for the root node, or an "Empty" placeholder decision
        node when ``nodes`` is empty.
    """
    if not nodes:
        return {"id": "root", "type": "decision", "question": "Empty", "children": []}

    # Map original IDs to import nodes
    original_id_map: dict[str, KBImportNode] = {}
    for node in nodes:
        orig_id = node.content.get("original_id", str(node.id))
        original_id_map[orig_id] = node

    # Track which nodes have been placed in the tree to avoid duplicates
    placed: set[str] = set()

    def _build_node(import_node: KBImportNode) -> dict | None:
        """Return the tree dict for a node, or None if it was already embedded."""
        content = import_node.content
        node_type = import_node.node_type
        node_id = content.get("original_id", str(import_node.id))
        # Already placed in the tree — don't create a duplicate.
        # The reference (next_node_id / options) is sufficient.
        if node_id in placed:
            return None
        placed.add(node_id)

        question_text = content.get("question", "")
        if node_type == "resolution":
            return {
                "id": node_id,
                "type": "solution",
                "title": question_text,
                "description": content.get("description", ""),
            }
        if node_type in ("action", "warning"):
            result: dict = {
                "id": node_id,
                "type": "action",
                "title": question_text,
                "description": content.get("description", ""),
            }
            next_id = content.get("next_node_id")
            # Only keep the link when its target actually exists.
            if next_id and next_id in original_id_map:
                result["next_node_id"] = next_id
            return result

        # question/decision type — recursively build children
        options = content.get("options", [])
        children: list[dict] = []
        for opt in options:
            next_id = opt.get("next_node_id")
            if next_id and next_id in original_id_map:
                child_node = _build_node(original_id_map[next_id])
                if child_node is not None:
                    children.append(child_node)
                    # If the child is an action with a next_node_id, also
                    # build that target as a sibling (the tree editor
                    # expects reachable nodes nested under the decision)
                    _collect_action_chain(child_node, children)
        return {
            "id": node_id,
            "type": "decision",
            "question": question_text,
            "options": [
                {"label": opt.get("label", ""), "next_node_id": opt.get("next_node_id", "")}
                for opt in options
            ],
            "children": children,
        }

    def _collect_action_chain(node: dict, siblings: list[dict]) -> None:
        """Follow action node next_node_id chains and add targets as siblings."""
        if node.get("type") != "action":
            return
        next_id = node.get("next_node_id")
        if not next_id or next_id not in original_id_map:
            return
        # _build_node returns None for anything already embedded, which also
        # covers targets that are already in this siblings list.
        target = _build_node(original_id_map[next_id])
        if target is None:
            return
        siblings.append(target)
        # Continue chain if the target is also an action
        _collect_action_chain(target, siblings)

    # Pick the root by node_order rather than by list position, matching how
    # the procedural builder orders its steps.
    root_node = min(nodes, key=lambda n: n.node_order)
    result = _build_node(root_node)
    return result or {"id": "root", "type": "decision", "question": "Empty", "children": []}
def _build_procedural_tree(nodes: list[KBImportNode]) -> dict:
    """Assemble the tree_structure of a procedural flow.

    Nodes are ordered by ``node_order`` and each becomes one step carrying
    its id (``original_id`` from the content, else the node's own id), its
    node type and the ``content`` text (empty string when absent). The steps
    are wrapped in a fixed "root" container of type "procedural".
    """
    ordered = sorted(nodes, key=lambda item: item.node_order)
    return {
        "id": "root",
        "type": "procedural",
        "steps": [
            {
                "id": entry.content.get("original_id", str(entry.id)),
                "type": entry.node_type,
                "content": entry.content.get("content", ""),
            }
            for entry in ordered
        ],
    }