Moved md from Phase 2 extensions to allowed formats, added extraction handler (reuses txt handler), and updated plan_limits defaults to include md for all plans. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
959 lines
34 KiB
Python
959 lines
34 KiB
Python
"""KB Accelerator endpoints.
|
|
|
|
Upload KB articles, convert to flows via AI, review, and commit.
|
|
|
|
POST /kb-accelerator/upload — Upload file or paste text
|
|
GET /kb-accelerator/{id} — Get import with nodes
|
|
GET /kb-accelerator — List imports for account
|
|
POST /kb-accelerator/{id}/convert — Re-trigger AI conversion
|
|
PATCH /kb-accelerator/{id}/nodes/{nid} — Edit a node
|
|
POST /kb-accelerator/{id}/commit — Commit to flow library
|
|
DELETE /kb-accelerator/{id} — Cancel/cleanup
|
|
GET /kb-accelerator/quota — Plan entitlements + usage
|
|
"""
|
|
import logging
|
|
import mimetypes
|
|
from datetime import datetime, timezone
|
|
from typing import Annotated, Optional
|
|
from uuid import UUID
|
|
|
|
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request, UploadFile, File, Form, status
|
|
from sqlalchemy import select, func, delete
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from app.api.deps import get_current_active_user, get_db, require_engineer_or_admin
|
|
from app.core.config import settings
|
|
from app.core.rate_limit import limiter
|
|
from app.core.subscriptions import get_plan_limits
|
|
from app.core.ai_quota_service import get_user_plan
|
|
from app.core.ai_tree_validator import validate_generated_tree
|
|
from app.core.tree_validation import validate_procedural_structure
|
|
from app.core.kb_extraction_service import extract_text
|
|
from app.core.kb_conversion_service import convert_document
|
|
from app.models.kb_import import KBImport, KBImportNode
|
|
from app.models.plan_limits import PlanLimits
|
|
from app.models.tree import Tree
|
|
from app.models.user import User
|
|
from app.schemas.kb_accelerator import (
|
|
KBUploadTextRequest,
|
|
KBNodeEditRequest,
|
|
KBCommitRequest,
|
|
KBUploadResponse,
|
|
KBImportResponse,
|
|
KBImportNodeResponse,
|
|
KBImportSummary,
|
|
KBImportListResponse,
|
|
KBCommitResponse,
|
|
KBQuotaResponse,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(prefix="/kb-accelerator", tags=["kb-accelerator"])
|
|
|
|
# Max upload size: 10MB
|
|
MAX_UPLOAD_SIZE = 10 * 1024 * 1024
|
|
|
|
ALLOWED_EXTENSIONS = {
|
|
"txt": ["text/plain"],
|
|
"md": ["text/markdown", "text/plain"],
|
|
"docx": ["application/vnd.openxmlformats-officedocument.wordprocessingml.document"],
|
|
}
|
|
|
|
# Phase 2 formats (not yet enabled)
|
|
PHASE2_EXTENSIONS = {
|
|
"pdf": ["application/pdf"],
|
|
"html": ["text/html"],
|
|
}
|
|
|
|
|
|
def _detect_format(filename: str) -> str | None:
|
|
"""Detect source format from filename extension."""
|
|
if not filename:
|
|
return None
|
|
ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else None
|
|
if ext in ALLOWED_EXTENSIONS or ext in PHASE2_EXTENSIONS:
|
|
return ext
|
|
return None
|
|
|
|
|
|
async def _get_kb_limits(user: User, db: AsyncSession) -> PlanLimits | None:
|
|
plan = await get_user_plan(user.account_id, db)
|
|
return await get_plan_limits(plan, db)
|
|
|
|
|
|
async def _check_kb_enabled(user: User, db: AsyncSession) -> PlanLimits:
|
|
limits = await _get_kb_limits(user, db)
|
|
if not limits or not limits.kb_accelerator_enabled:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="KB Accelerator is not available on your plan.",
|
|
)
|
|
return limits
|
|
|
|
|
|
async def _check_lifetime_limit(user: User, limits: PlanLimits, db: AsyncSession) -> None:
|
|
if limits.kb_max_lifetime_conversions is None:
|
|
return # Unlimited
|
|
count = await db.scalar(
|
|
select(func.count(KBImport.id)).where(
|
|
KBImport.account_id == user.account_id,
|
|
KBImport.status == "committed",
|
|
)
|
|
) or 0
|
|
if count >= limits.kb_max_lifetime_conversions:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"You have reached your lifetime limit of {limits.kb_max_lifetime_conversions} KB conversions. Upgrade your plan for unlimited conversions.",
|
|
)
|
|
|
|
|
|
async def _check_format_allowed(source_format: str, limits: PlanLimits) -> None:
|
|
allowed = limits.kb_allowed_formats or ["txt", "paste", "md"]
|
|
if source_format not in allowed:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail=f"Format '{source_format}' is not available on your plan. Allowed: {', '.join(allowed)}",
|
|
)
|
|
|
|
|
|
async def _get_import_or_404(
|
|
import_id: UUID, user: User, db: AsyncSession, *, load_nodes: bool = True
|
|
) -> KBImport:
|
|
query = select(KBImport).where(
|
|
KBImport.id == import_id,
|
|
KBImport.account_id == user.account_id,
|
|
)
|
|
if load_nodes:
|
|
query = query.options(selectinload(KBImport.nodes))
|
|
result = await db.execute(query)
|
|
kb_import = result.scalar_one_or_none()
|
|
if not kb_import:
|
|
raise HTTPException(status_code=404, detail="KB import not found")
|
|
return kb_import
|
|
|
|
|
|
async def _run_conversion(import_id: UUID, db_url: str) -> None:
|
|
"""Background task: run AI conversion on a KB import."""
|
|
from app.core.database import async_session_maker
|
|
async with async_session_maker() as db:
|
|
result = await db.execute(
|
|
select(KBImport).where(KBImport.id == import_id)
|
|
)
|
|
kb_import = result.scalar_one_or_none()
|
|
if not kb_import or kb_import.status != "processing":
|
|
return
|
|
try:
|
|
await convert_document(kb_import, db)
|
|
await db.commit()
|
|
except Exception as e:
|
|
logger.error("Background KB conversion failed: %s", e)
|
|
kb_import.status = "failed"
|
|
kb_import.error_message = f"Conversion error: {str(e)}"
|
|
await db.commit()
|
|
|
|
|
|
def _serialize_import(kb_import: KBImport) -> dict:
|
|
"""Serialize a KBImport to dict for response."""
|
|
return {
|
|
"id": kb_import.id,
|
|
"account_id": kb_import.account_id,
|
|
"created_by": kb_import.created_by,
|
|
"source_filename": kb_import.source_filename,
|
|
"source_format": kb_import.source_format,
|
|
"source_text": kb_import.source_text,
|
|
"source_metadata": kb_import.source_metadata,
|
|
"target_type": kb_import.target_type,
|
|
"status": kb_import.status,
|
|
"confidence_avg": kb_import.confidence_avg,
|
|
"error_message": kb_import.error_message,
|
|
"processing_time_ms": kb_import.processing_time_ms,
|
|
"ai_tokens_input": kb_import.ai_tokens_input,
|
|
"ai_tokens_output": kb_import.ai_tokens_output,
|
|
"tree_id": kb_import.tree_id,
|
|
"nodes": [
|
|
KBImportNodeResponse.model_validate(n) for n in kb_import.nodes
|
|
] if kb_import.nodes else [],
|
|
"created_at": kb_import.created_at.isoformat(),
|
|
"updated_at": kb_import.updated_at.isoformat(),
|
|
}
|
|
|
|
|
|
# ── Endpoints ──
|
|
|
|
|
|
@router.get("/quota", response_model=KBQuotaResponse)
|
|
async def get_quota(
|
|
user: Annotated[User, Depends(require_engineer_or_admin)],
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
):
|
|
"""Get KB Accelerator entitlements and usage for the current account."""
|
|
plan = await get_user_plan(user.account_id, db)
|
|
limits = await get_plan_limits(plan, db)
|
|
|
|
committed_count = await db.scalar(
|
|
select(func.count(KBImport.id)).where(
|
|
KBImport.account_id == user.account_id,
|
|
KBImport.status == "committed",
|
|
)
|
|
) or 0
|
|
|
|
if not limits:
|
|
return KBQuotaResponse(
|
|
plan=plan,
|
|
kb_accelerator_enabled=False,
|
|
lifetime_conversions_used=committed_count,
|
|
lifetime_conversions_limit=0,
|
|
allowed_formats=["txt", "paste", "md"],
|
|
detailed_analysis=False,
|
|
conversational_refinement=False,
|
|
step_library_matching=False,
|
|
history_limit=3,
|
|
can_convert=False,
|
|
)
|
|
|
|
can_convert = limits.kb_accelerator_enabled
|
|
if limits.kb_max_lifetime_conversions is not None:
|
|
can_convert = can_convert and committed_count < limits.kb_max_lifetime_conversions
|
|
|
|
return KBQuotaResponse(
|
|
plan=plan,
|
|
kb_accelerator_enabled=limits.kb_accelerator_enabled,
|
|
lifetime_conversions_used=committed_count,
|
|
lifetime_conversions_limit=limits.kb_max_lifetime_conversions,
|
|
allowed_formats=limits.kb_allowed_formats or ["txt", "paste"],
|
|
detailed_analysis=limits.kb_detailed_analysis,
|
|
conversational_refinement=limits.kb_conversational_refinement,
|
|
step_library_matching=limits.kb_step_library_matching,
|
|
history_limit=limits.kb_history_limit,
|
|
can_convert=can_convert,
|
|
)
|
|
|
|
|
|
@router.post("/upload", response_model=KBUploadResponse, status_code=201)
|
|
@limiter.limit("10/minute")
|
|
async def upload_kb_article(
|
|
request: Request,
|
|
background_tasks: BackgroundTasks,
|
|
user: Annotated[User, Depends(require_engineer_or_admin)],
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
file: Optional[UploadFile] = File(None),
|
|
content: Optional[str] = Form(None),
|
|
title: Optional[str] = Form(None),
|
|
target_type: Optional[str] = Form(None),
|
|
):
|
|
"""Upload a KB article file or paste text for conversion."""
|
|
if not settings.ai_enabled:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
|
|
detail="AI is not configured.",
|
|
)
|
|
|
|
limits = await _check_kb_enabled(user, db)
|
|
await _check_lifetime_limit(user, limits, db)
|
|
|
|
# Determine source format and extract text
|
|
if file and file.filename:
|
|
source_format = _detect_format(file.filename)
|
|
if not source_format:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Unsupported file format. Supported: {', '.join(ALLOWED_EXTENSIONS.keys())}",
|
|
)
|
|
await _check_format_allowed(source_format, limits)
|
|
|
|
file_bytes = await file.read()
|
|
if len(file_bytes) > MAX_UPLOAD_SIZE:
|
|
raise HTTPException(status_code=413, detail="File exceeds 10MB limit.")
|
|
if len(file_bytes) == 0:
|
|
raise HTTPException(status_code=400, detail="Uploaded file is empty.")
|
|
|
|
source_filename = file.filename
|
|
try:
|
|
source_text, source_metadata = extract_text(file_bytes, source_format)
|
|
except ValueError as e:
|
|
raise HTTPException(status_code=400, detail=str(e))
|
|
except RuntimeError as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
elif content:
|
|
source_format = "paste"
|
|
await _check_format_allowed(source_format, limits)
|
|
source_filename = title
|
|
source_text = content.strip()
|
|
source_metadata = None
|
|
if len(source_text) < 10:
|
|
raise HTTPException(status_code=400, detail="Content must be at least 10 characters.")
|
|
else:
|
|
raise HTTPException(status_code=400, detail="Provide either a file or content text.")
|
|
|
|
# Validate target_type (required — frontend must specify)
|
|
if not target_type:
|
|
target_type = "troubleshooting"
|
|
if target_type not in ("troubleshooting", "procedural"):
|
|
raise HTTPException(status_code=400, detail="target_type must be 'troubleshooting' or 'procedural'.")
|
|
|
|
# Create KB import record
|
|
kb_import = KBImport(
|
|
account_id=user.account_id,
|
|
created_by=user.id,
|
|
source_filename=source_filename,
|
|
source_format=source_format,
|
|
source_text=source_text,
|
|
source_metadata=source_metadata,
|
|
target_type=target_type,
|
|
status="processing",
|
|
)
|
|
db.add(kb_import)
|
|
await db.flush()
|
|
|
|
# Trigger AI conversion in background
|
|
background_tasks.add_task(_run_conversion, kb_import.id, settings.DATABASE_URL)
|
|
await db.commit()
|
|
|
|
return KBUploadResponse(
|
|
id=kb_import.id,
|
|
status=kb_import.status,
|
|
source_format=kb_import.source_format,
|
|
)
|
|
|
|
|
|
@router.get("/{import_id}", response_model=KBImportResponse)
|
|
async def get_kb_import(
|
|
import_id: UUID,
|
|
user: Annotated[User, Depends(require_engineer_or_admin)],
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
):
|
|
"""Get a KB import with its generated nodes."""
|
|
kb_import = await _get_import_or_404(import_id, user, db)
|
|
return _serialize_import(kb_import)
|
|
|
|
|
|
@router.get("", response_model=KBImportListResponse)
|
|
async def list_kb_imports(
|
|
user: Annotated[User, Depends(require_engineer_or_admin)],
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
skip: int = 0,
|
|
limit: int = 20,
|
|
status_filter: Optional[str] = None,
|
|
):
|
|
"""List KB imports for the current account."""
|
|
limits = await _get_kb_limits(user, db)
|
|
history_limit = limits.kb_history_limit if limits else 3
|
|
|
|
query = select(KBImport).where(KBImport.account_id == user.account_id)
|
|
count_query = select(func.count(KBImport.id)).where(KBImport.account_id == user.account_id)
|
|
|
|
if status_filter:
|
|
query = query.where(KBImport.status == status_filter)
|
|
count_query = count_query.where(KBImport.status == status_filter)
|
|
|
|
total = await db.scalar(count_query) or 0
|
|
|
|
query = query.order_by(KBImport.created_at.desc())
|
|
|
|
# Apply history limit for free tier
|
|
effective_limit = limit
|
|
if history_limit is not None:
|
|
effective_limit = min(limit, history_limit - skip) if skip < history_limit else 0
|
|
|
|
if effective_limit <= 0:
|
|
return KBImportListResponse(items=[], total=total, skip=skip, limit=limit)
|
|
|
|
query = query.offset(skip).limit(effective_limit)
|
|
query = query.options(selectinload(KBImport.nodes))
|
|
result = await db.execute(query)
|
|
imports = result.scalars().all()
|
|
|
|
items = []
|
|
for imp in imports:
|
|
items.append(KBImportSummary(
|
|
id=imp.id,
|
|
source_filename=imp.source_filename,
|
|
source_format=imp.source_format,
|
|
target_type=imp.target_type,
|
|
status=imp.status,
|
|
confidence_avg=imp.confidence_avg,
|
|
node_count=len(imp.nodes) if imp.nodes else 0,
|
|
created_at=imp.created_at.isoformat(),
|
|
))
|
|
|
|
return KBImportListResponse(items=items, total=total, skip=skip, limit=limit)
|
|
|
|
|
|
@router.post("/{import_id}/convert", response_model=KBUploadResponse)
|
|
@limiter.limit("30/minute")
|
|
async def reconvert(
|
|
request: Request,
|
|
import_id: UUID,
|
|
background_tasks: BackgroundTasks,
|
|
user: Annotated[User, Depends(require_engineer_or_admin)],
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
):
|
|
"""Re-trigger AI conversion on an existing import (retry/regenerate)."""
|
|
if not settings.ai_enabled:
|
|
raise HTTPException(status_code=503, detail="AI is not configured.")
|
|
|
|
kb_import = await _get_import_or_404(import_id, user, db, load_nodes=False)
|
|
|
|
if kb_import.status == "committed":
|
|
raise HTTPException(status_code=400, detail="Cannot reconvert a committed import.")
|
|
|
|
# Delete existing nodes
|
|
await db.execute(
|
|
delete(KBImportNode).where(KBImportNode.kb_import_id == kb_import.id)
|
|
)
|
|
|
|
kb_import.status = "processing"
|
|
kb_import.error_message = None
|
|
kb_import.confidence_avg = None
|
|
await db.flush()
|
|
|
|
background_tasks.add_task(_run_conversion, kb_import.id, settings.DATABASE_URL)
|
|
await db.commit()
|
|
|
|
return KBUploadResponse(
|
|
id=kb_import.id, status="processing", source_format=kb_import.source_format
|
|
)
|
|
|
|
|
|
@router.patch("/{import_id}/nodes/{node_id}", response_model=KBImportNodeResponse)
|
|
async def edit_node(
|
|
import_id: UUID,
|
|
node_id: UUID,
|
|
data: KBNodeEditRequest,
|
|
user: Annotated[User, Depends(require_engineer_or_admin)],
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
):
|
|
"""Edit a specific node in a KB import during review."""
|
|
kb_import = await _get_import_or_404(import_id, user, db, load_nodes=False)
|
|
if kb_import.status != "ready":
|
|
raise HTTPException(status_code=400, detail="Import must be in 'ready' status to edit nodes.")
|
|
|
|
result = await db.execute(
|
|
select(KBImportNode).where(
|
|
KBImportNode.id == node_id,
|
|
KBImportNode.kb_import_id == import_id,
|
|
)
|
|
)
|
|
node = result.scalar_one_or_none()
|
|
if not node:
|
|
raise HTTPException(status_code=404, detail="Node not found")
|
|
|
|
op = data.operation
|
|
|
|
if op == "approve":
|
|
node.user_approved = True
|
|
|
|
elif op == "reject":
|
|
node.user_approved = False
|
|
|
|
elif op == "edit":
|
|
if not data.content:
|
|
raise HTTPException(status_code=400, detail="Content required for edit operation.")
|
|
node.content = data.content
|
|
node.user_edited = True
|
|
|
|
elif op == "delete":
|
|
await db.delete(node)
|
|
# Reorder remaining nodes
|
|
remaining = await db.execute(
|
|
select(KBImportNode)
|
|
.where(KBImportNode.kb_import_id == import_id)
|
|
.order_by(KBImportNode.node_order)
|
|
)
|
|
for idx, n in enumerate(remaining.scalars().all()):
|
|
n.node_order = idx
|
|
await db.flush()
|
|
await db.commit()
|
|
# Return a placeholder response for deleted node
|
|
return KBImportNodeResponse(
|
|
id=node_id,
|
|
kb_import_id=import_id,
|
|
node_order=-1,
|
|
node_type="step",
|
|
content={"deleted": True},
|
|
confidence_score=0,
|
|
user_edited=False,
|
|
user_approved=False,
|
|
)
|
|
|
|
elif op == "insert_after":
|
|
if not data.content:
|
|
raise HTTPException(status_code=400, detail="Content required for insert_after operation.")
|
|
# Shift subsequent nodes
|
|
subsequent = await db.execute(
|
|
select(KBImportNode)
|
|
.where(
|
|
KBImportNode.kb_import_id == import_id,
|
|
KBImportNode.node_order > node.node_order,
|
|
)
|
|
.order_by(KBImportNode.node_order)
|
|
)
|
|
for n in subsequent.scalars().all():
|
|
n.node_order += 1
|
|
|
|
new_node = KBImportNode(
|
|
kb_import_id=import_id,
|
|
node_order=node.node_order + 1,
|
|
node_type=data.content.get("type", "step"),
|
|
content=data.content,
|
|
confidence_score=1.0, # User-created nodes are fully trusted
|
|
user_edited=True,
|
|
user_approved=True,
|
|
)
|
|
db.add(new_node)
|
|
await db.flush()
|
|
await db.commit()
|
|
return KBImportNodeResponse.model_validate(new_node)
|
|
|
|
elif op == "regenerate":
|
|
# Re-run AI for just this node (simplified: update placeholder)
|
|
# Full implementation would call AI with node context + guidance
|
|
node.user_edited = False
|
|
node.user_approved = False
|
|
|
|
node.updated_at = datetime.now(timezone.utc)
|
|
await db.flush()
|
|
await db.commit()
|
|
return KBImportNodeResponse.model_validate(node)
|
|
|
|
|
|
@router.post("/{import_id}/commit", response_model=KBCommitResponse)
|
|
async def commit_import(
|
|
import_id: UUID,
|
|
user: Annotated[User, Depends(require_engineer_or_admin)],
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
data: Optional[KBCommitRequest] = None,
|
|
):
|
|
"""Commit a reviewed KB import to the flow library as a Tree."""
|
|
kb_import = await _get_import_or_404(import_id, user, db)
|
|
|
|
if kb_import.status != "ready":
|
|
raise HTTPException(status_code=400, detail="Import must be in 'ready' status to commit.")
|
|
|
|
if not kb_import.nodes:
|
|
raise HTTPException(status_code=400, detail="No nodes to commit.")
|
|
|
|
# Extract title/description from conversion metadata
|
|
conversion_meta = (kb_import.source_metadata or {}).get("_conversion", {})
|
|
tree_name = (data.name if data and data.name else None) or conversion_meta.get("title", "Imported Flow")
|
|
tree_description = (data.description if data else None) or conversion_meta.get("description")
|
|
|
|
# Build tree_structure from nodes
|
|
if kb_import.target_type == "troubleshooting":
|
|
tree_structure = _build_troubleshooting_tree(kb_import.nodes)
|
|
else:
|
|
tree_structure = _build_procedural_tree(kb_import.nodes)
|
|
|
|
# Validate the built tree before committing
|
|
if kb_import.target_type == "troubleshooting":
|
|
validation_errors = validate_generated_tree(tree_structure)
|
|
if validation_errors:
|
|
logger.warning(
|
|
"KB commit blocked: tree failed validation with %d errors: %s",
|
|
len(validation_errors), "; ".join(validation_errors[:5]),
|
|
)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
|
detail={
|
|
"message": "The converted flow has structural issues that need to be fixed before committing.",
|
|
"validation_errors": validation_errors,
|
|
},
|
|
)
|
|
else:
|
|
# Procedural/maintenance validation
|
|
is_valid, proc_errors = validate_procedural_structure(tree_structure)
|
|
if not is_valid:
|
|
error_messages = [e.get("message", str(e)) for e in proc_errors]
|
|
logger.warning(
|
|
"KB commit blocked: procedural flow failed validation with %d errors: %s",
|
|
len(proc_errors), "; ".join(error_messages[:5]),
|
|
)
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
|
detail={
|
|
"message": "The converted flow has structural issues that need to be fixed before committing.",
|
|
"validation_errors": error_messages,
|
|
},
|
|
)
|
|
|
|
# Build intake_form for procedural flows
|
|
intake_form = None
|
|
if kb_import.target_type == "procedural":
|
|
intake_form = (kb_import.source_metadata or {}).get("_intake_form")
|
|
|
|
# Create the Tree record
|
|
tree = Tree(
|
|
name=tree_name,
|
|
description=tree_description,
|
|
tree_type=kb_import.target_type,
|
|
tree_structure=tree_structure,
|
|
intake_form=intake_form,
|
|
author_id=user.id,
|
|
account_id=user.account_id,
|
|
status="draft",
|
|
import_metadata={
|
|
"source": "kb_accelerator",
|
|
"kb_import_id": str(kb_import.id),
|
|
"source_filename": kb_import.source_filename,
|
|
"source_format": kb_import.source_format,
|
|
"confidence_avg": kb_import.confidence_avg,
|
|
"node_count": len(kb_import.nodes),
|
|
"converted_at": datetime.now(timezone.utc).isoformat(),
|
|
},
|
|
)
|
|
if data and data.category_id:
|
|
tree.category_id = data.category_id
|
|
|
|
db.add(tree)
|
|
await db.flush()
|
|
|
|
kb_import.status = "committed"
|
|
kb_import.tree_id = tree.id
|
|
await db.commit()
|
|
|
|
return KBCommitResponse(
|
|
tree_id=tree.id,
|
|
import_id=kb_import.id,
|
|
tree_type=kb_import.target_type,
|
|
)
|
|
|
|
|
|
@router.delete("/{import_id}", status_code=204)
|
|
async def delete_import(
|
|
import_id: UUID,
|
|
user: Annotated[User, Depends(require_engineer_or_admin)],
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
):
|
|
"""Cancel and clean up a KB import."""
|
|
kb_import = await _get_import_or_404(import_id, user, db, load_nodes=False)
|
|
if kb_import.status == "committed":
|
|
raise HTTPException(status_code=400, detail="Cannot delete a committed import.")
|
|
await db.execute(
|
|
delete(KBImportNode).where(KBImportNode.kb_import_id == import_id)
|
|
)
|
|
await db.delete(kb_import)
|
|
await db.commit()
|
|
|
|
|
|
# ── Tree Structure Builders ──
|
|
|
|
|
|
def _build_troubleshooting_tree(nodes: list[KBImportNode]) -> dict:
|
|
"""Build a troubleshooting tree_structure from import nodes.
|
|
|
|
The tree editor expects a deeply nested structure where each decision
|
|
node's `children` array contains all reachable descendant nodes.
|
|
Action/solution nodes use `title`/`description` (not `question`).
|
|
|
|
The AI generates a DAG (shared nodes reachable from multiple paths),
|
|
but the tree editor requires unique IDs — each node can only appear
|
|
once. We embed each node the first time it's encountered; subsequent
|
|
references just use next_node_id / options[].next_node_id to point
|
|
back to the already-embedded node.
|
|
"""
|
|
if not nodes:
|
|
return {"id": "root", "type": "decision", "question": "Empty", "children": []}
|
|
|
|
# Map original IDs to import nodes
|
|
original_id_map: dict[str, KBImportNode] = {}
|
|
for node in nodes:
|
|
orig_id = node.content.get("original_id", str(node.id))
|
|
original_id_map[orig_id] = node
|
|
|
|
# Track which nodes have been placed in the tree to avoid duplicates
|
|
placed: set[str] = set()
|
|
|
|
def _build_node(import_node: KBImportNode) -> dict | None:
|
|
content = import_node.content
|
|
node_type = import_node.node_type
|
|
node_id = content.get("original_id", str(import_node.id))
|
|
|
|
# Already placed in the tree — don't create a duplicate.
|
|
# The reference (next_node_id / options) is sufficient.
|
|
if node_id in placed:
|
|
return None
|
|
placed.add(node_id)
|
|
|
|
question_text = content.get("question", "")
|
|
|
|
if node_type == "resolution":
|
|
return {
|
|
"id": node_id,
|
|
"type": "solution",
|
|
"title": question_text,
|
|
"description": content.get("description", ""),
|
|
}
|
|
|
|
if node_type in ("action", "warning"):
|
|
result: dict = {
|
|
"id": node_id,
|
|
"type": "action",
|
|
"title": question_text,
|
|
"description": content.get("description", ""),
|
|
}
|
|
next_id = content.get("next_node_id")
|
|
if next_id and next_id in original_id_map:
|
|
result["next_node_id"] = next_id
|
|
return result
|
|
|
|
# question/decision type — recursively build children
|
|
options = content.get("options", [])
|
|
|
|
# Count how many options point to buildable (not-yet-placed) targets
|
|
buildable_targets = []
|
|
for opt in options:
|
|
next_id = opt.get("next_node_id")
|
|
if next_id and next_id in original_id_map and next_id not in placed:
|
|
buildable_targets.append(next_id)
|
|
|
|
# Decision nodes MUST have at least 2 branches to pass validation.
|
|
# If fewer than 2 buildable targets, demote to action node.
|
|
if len(buildable_targets) < 2:
|
|
demoted: dict = {
|
|
"id": node_id,
|
|
"type": "action",
|
|
"title": question_text,
|
|
"description": content.get("description", ""),
|
|
}
|
|
if buildable_targets:
|
|
demoted["next_node_id"] = buildable_targets[0]
|
|
elif options:
|
|
# All targets already placed; reference first option's target
|
|
first_next = options[0].get("next_node_id")
|
|
if first_next:
|
|
demoted["next_node_id"] = first_next
|
|
return demoted
|
|
|
|
# Build children for decision node
|
|
children = []
|
|
built_options = []
|
|
for opt in options:
|
|
next_id = opt.get("next_node_id")
|
|
opt_id = opt.get("id", f"opt-{node_id}-{len(built_options)}")
|
|
if next_id and next_id in original_id_map:
|
|
child_node = _build_node(original_id_map[next_id])
|
|
if child_node is not None:
|
|
children.append(child_node)
|
|
_collect_action_chain(child_node, children)
|
|
built_options.append({
|
|
"id": opt_id,
|
|
"label": opt.get("label", ""),
|
|
"next_node_id": next_id,
|
|
})
|
|
else:
|
|
built_options.append({
|
|
"id": opt_id,
|
|
"label": opt.get("label", ""),
|
|
"next_node_id": next_id or "",
|
|
})
|
|
|
|
return {
|
|
"id": node_id,
|
|
"type": "decision",
|
|
"question": question_text,
|
|
"options": built_options,
|
|
"children": children,
|
|
}
|
|
|
|
def _collect_action_chain(node: dict, siblings: list[dict]) -> None:
|
|
"""Follow action node next_node_id chains and add targets as siblings."""
|
|
if node.get("type") != "action":
|
|
return
|
|
next_id = node.get("next_node_id")
|
|
if not next_id or next_id not in original_id_map:
|
|
return
|
|
# Don't add if already in this siblings list or already placed
|
|
if any(s["id"] == next_id for s in siblings):
|
|
return
|
|
target = _build_node(original_id_map[next_id])
|
|
if target is None:
|
|
return
|
|
siblings.append(target)
|
|
# Continue chain if the target is also an action
|
|
_collect_action_chain(target, siblings)
|
|
|
|
root_node = nodes[0]
|
|
result = _build_node(root_node)
|
|
if not result:
|
|
return {"id": "root", "type": "decision", "question": "Empty", "children": []}
|
|
|
|
# Post-build repair: fix structural issues caused by placed-set race conditions
|
|
_repair_tree(result)
|
|
|
|
# Ensure root is a valid decision node (validator requires this)
|
|
if result.get("type") == "decision":
|
|
children = result.get("children", [])
|
|
options = result.get("options", [])
|
|
# Root must have >= 2 children and >= 2 options
|
|
if len(children) < 2 or len(options) < 2:
|
|
logger.warning(
|
|
"KB tree root has %d children and %d options after repair; "
|
|
"tree may fail validation",
|
|
len(children), len(options),
|
|
)
|
|
|
|
return result
|
|
|
|
|
|
def _repair_tree(node: dict) -> None:
|
|
"""Walk the built tree and fix structural issues.
|
|
|
|
Fixes (applied bottom-up so child repairs happen before parent checks):
|
|
- Decision nodes with < 2 children → demote to action, hoist children to parent
|
|
- Decision nodes with < 2 options → rebuild options from children
|
|
- Action nodes missing next_node_id → convert to solution
|
|
"""
|
|
# Repair children first, then handle this node's children list
|
|
# We process the children list in-place, potentially expanding it
|
|
# when demoted decisions hoist their children up.
|
|
i = 0
|
|
children = node.get("children", [])
|
|
while i < len(children):
|
|
child = children[i]
|
|
if not isinstance(child, dict):
|
|
i += 1
|
|
continue
|
|
|
|
# Recurse into child first
|
|
_repair_tree(child)
|
|
|
|
# After recursion, check if this child is a decision that needs demotion
|
|
if child.get("type") == "decision":
|
|
child_children = child.get("children", [])
|
|
if len(child_children) < 2:
|
|
_demote_decision_to_action(child, children, i)
|
|
|
|
i += 1
|
|
|
|
# Now fix this node itself
|
|
node_type = node.get("type")
|
|
node_id = node.get("id", "unknown")
|
|
|
|
if node_type == "decision":
|
|
children = node.get("children", [])
|
|
options = node.get("options", [])
|
|
|
|
if len(options) < 2 and len(children) >= 2:
|
|
# Rebuild options from children
|
|
node["options"] = [
|
|
{
|
|
"id": f"opt-{node_id}-{i}",
|
|
"label": c.get("question") or c.get("title", f"Option {i+1}"),
|
|
"next_node_id": c.get("id", ""),
|
|
}
|
|
for i, c in enumerate(children)
|
|
]
|
|
elif not options:
|
|
node["options"] = []
|
|
|
|
elif node_type == "action":
|
|
if not node.get("next_node_id"):
|
|
# Action with no next_node_id → convert to solution
|
|
node["type"] = "solution"
|
|
if not node.get("title"):
|
|
node["title"] = node.get("question", "Resolution")
|
|
if not node.get("description"):
|
|
node["description"] = ""
|
|
|
|
|
|
def _demote_decision_to_action(node: dict, siblings: list[dict], index: int) -> None:
|
|
"""Demote a decision node to action and hoist its children as siblings.
|
|
|
|
Args:
|
|
node: The decision node to demote (modified in-place).
|
|
siblings: The parent's children list (may be expanded).
|
|
index: Position of this node in siblings.
|
|
"""
|
|
child_children = node.get("children", [])
|
|
question = node.get("question", "")
|
|
|
|
# Pick next_node_id from first child
|
|
next_id = None
|
|
if child_children:
|
|
next_id = child_children[0].get("id")
|
|
else:
|
|
options = node.get("options", [])
|
|
if options:
|
|
next_id = options[0].get("next_node_id")
|
|
|
|
# Convert in-place to action
|
|
node["type"] = "action"
|
|
node["title"] = question
|
|
node["description"] = ""
|
|
if next_id:
|
|
node["next_node_id"] = next_id
|
|
node.pop("question", None)
|
|
node.pop("options", None)
|
|
|
|
# Hoist children as siblings after this node
|
|
if child_children:
|
|
hoisted = node.pop("children", [])
|
|
for j, hoisted_child in enumerate(hoisted):
|
|
siblings.insert(index + 1 + j, hoisted_child)
|
|
|
|
|
|
# Delete the broken _repair_tree and replace with the working version
|
|
# by removing the first broken attempt
|
|
def _build_procedural_tree(nodes: list[KBImportNode]) -> dict:
|
|
"""Build a procedural tree_structure from import nodes.
|
|
|
|
Maps AI node types to valid procedural step types:
|
|
- step/action/warning → procedure_step
|
|
- section_header → section_header
|
|
Adds a procedure_end step at the end if missing.
|
|
Each step requires 'title' (from content text) and 'content' fields.
|
|
"""
|
|
# Type mapping from AI output to valid step types
|
|
TYPE_MAP = {
|
|
"step": "procedure_step",
|
|
"action": "procedure_step",
|
|
"warning": "procedure_step",
|
|
"question": "procedure_step",
|
|
"resolution": "procedure_step",
|
|
"section_header": "section_header",
|
|
"procedure_step": "procedure_step",
|
|
"procedure_end": "procedure_end",
|
|
}
|
|
|
|
steps = []
|
|
for node in sorted(nodes, key=lambda n: n.node_order):
|
|
content = node.content
|
|
raw_type = node.node_type
|
|
step_type = TYPE_MAP.get(raw_type, "procedure_step")
|
|
|
|
step_content = content.get("content", "")
|
|
step_title = content.get("title") or content.get("question") or step_content[:80] or "Step"
|
|
|
|
step: dict = {
|
|
"id": content.get("original_id", str(node.id)),
|
|
"type": step_type,
|
|
"title": step_title,
|
|
"description": step_content,
|
|
}
|
|
|
|
# Preserve content_type if present
|
|
content_type = content.get("content_type")
|
|
if content_type:
|
|
step["content_type"] = content_type
|
|
|
|
steps.append(step)
|
|
|
|
# Ensure a procedure_end exists at the end
|
|
has_end = any(s["type"] == "procedure_end" for s in steps)
|
|
if not has_end and steps:
|
|
steps.append({
|
|
"id": "procedure-end",
|
|
"type": "procedure_end",
|
|
"title": "Procedure Complete",
|
|
"description": "All steps have been completed.",
|
|
})
|
|
|
|
return {
|
|
"id": "root",
|
|
"type": "procedural",
|
|
"steps": steps,
|
|
}
|