Files
resolutionflow/backend/app/api/endpoints/scripts.py
chihlasm 9bad49d568 feat(knowledge-flywheel): add Phase 3 Knowledge Flywheel — AI analysis, review queue, analytics
Phase 3 implementation:
- AI session analysis service that generates flow proposals from resolved sessions
- APScheduler job for batch processing pending analyses (max_instances=1)
- Knowledge gap detection (weak options, high escalation signals)
- Flow proposals CRUD with team admin review workflow (approve/edit/dismiss/reject)
- FlowPilot analytics dashboard with confidence tiers, PSA metrics, knowledge gaps
- In-session script generator component
- Review queue page with filtering and proposal detail panel

Bug fixes from review (12 total):
- Fix "Edit & Publish" navigating to non-existent /editor/new route
- Hide Approve button for enhancement proposals (require Edit & Publish)
- Add max_instances=1 to scheduler to prevent TOCTOU race
- Fix eventual_success case() double-counting failed retries
- Add tree_structure validation before creating tree from proposal
- Simplify script generator rendering condition
- Add severity style fallback, toFixed on rates, Link instead of <a href>
- Add toast.warning on dismiss failure, fix dedup for domain-less sessions
- Cast Decimal to int in knowledge gap evidence dicts

Also updates CLAUDE.md with lessons 67-71 and Phase 3 project structure.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 05:12:10 +00:00

415 lines
14 KiB
Python

"""Script Generator API endpoints."""
from typing import Annotated, Optional
from uuid import UUID
import re
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, or_
from app.core.database import get_db
from app.api.deps import get_current_active_user
from app.core.permissions import can_manage_script_template, can_create_content
from app.models.user import User
from app.models.script_template import ScriptCategory, ScriptTemplate, ScriptGeneration
from app.schemas.script_template import (
ScriptCategoryResponse,
ScriptTemplateCreate,
ScriptTemplateUpdate,
ScriptTemplateListItem,
ScriptTemplateDetail,
ScriptGenerateRequest,
ScriptGenerateResponse,
ScriptGenerationRecord,
)
from app.services.script_template_engine import ScriptTemplateEngine, ScriptRenderError
router = APIRouter(prefix="/scripts", tags=["scripts"])
_engine = ScriptTemplateEngine()
# ── Categories ────────────────────────────────────────────────────────────
@router.get("/categories", response_model=list[ScriptCategoryResponse])
async def list_categories(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
) -> list[ScriptCategoryResponse]:
result = await db.execute(
select(ScriptCategory)
.where(ScriptCategory.is_active == True) # noqa: E712
.order_by(ScriptCategory.sort_order)
)
categories = result.scalars().all()
count_result = await db.execute(
select(ScriptTemplate.category_id, func.count(ScriptTemplate.id))
.where(ScriptTemplate.is_active == True) # noqa: E712
.group_by(ScriptTemplate.category_id)
)
counts = dict(count_result.all())
return [
ScriptCategoryResponse(
id=cat.id,
name=cat.name,
slug=cat.slug,
description=cat.description,
icon=cat.icon,
sort_order=cat.sort_order,
template_count=counts.get(cat.id, 0),
)
for cat in categories
]
# ── Templates ─────────────────────────────────────────────────────────────
@router.get("/templates", response_model=list[ScriptTemplateListItem])
async def list_templates(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
category_slug: Optional[str] = Query(None),
search: Optional[str] = Query(None),
tags: Optional[str] = Query(None, description="Comma-separated tags"),
managed: Optional[bool] = Query(None, description="If true, return only templates this user can edit"),
) -> list[ScriptTemplateListItem]:
query = (
select(ScriptTemplate)
.join(ScriptCategory, ScriptTemplate.category_id == ScriptCategory.id)
.where(ScriptTemplate.is_active == True) # noqa: E712
.where(
or_(
ScriptTemplate.team_id == None, # noqa: E711
ScriptTemplate.team_id == current_user.team_id,
)
)
)
if category_slug:
query = query.where(ScriptCategory.slug == category_slug)
if search:
term = f"%{search.lower()}%"
query = query.where(
or_(
func.lower(ScriptTemplate.name).like(term),
func.lower(ScriptTemplate.description).like(term),
func.lower(ScriptTemplate.slug).like(term),
)
)
if managed:
if current_user.is_super_admin:
pass # super admin can edit all
elif current_user.account_role == "owner":
query = query.where(
or_(
ScriptTemplate.created_by == current_user.id,
ScriptTemplate.team_id != None, # noqa: E711
)
)
else:
# engineers see only their own
query = query.where(ScriptTemplate.created_by == current_user.id)
result = await db.execute(query.order_by(ScriptTemplate.name))
templates = result.scalars().all()
if tags:
tag_list = [t.strip().lower() for t in tags.split(",")]
templates = [
t
for t in templates
if any(tag in [tg.lower() for tg in (t.tags or [])] for tag in tag_list)
]
return [ScriptTemplateListItem.model_validate(t) for t in templates]
@router.get("/templates/{template_id}", response_model=ScriptTemplateDetail)
async def get_template(
template_id: UUID,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
) -> ScriptTemplateDetail:
result = await db.execute(
select(ScriptTemplate).where(
ScriptTemplate.id == template_id,
ScriptTemplate.is_active == True, # noqa: E712
or_(
ScriptTemplate.team_id == None, # noqa: E711
ScriptTemplate.team_id == current_user.team_id,
),
)
)
template = result.scalar_one_or_none()
if not template:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Template not found"
)
return ScriptTemplateDetail.model_validate(template)
@router.post(
"/templates",
response_model=ScriptTemplateDetail,
status_code=status.HTTP_201_CREATED,
)
async def create_template(
data: ScriptTemplateCreate,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
) -> ScriptTemplateDetail:
if not can_create_content(current_user):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Engineer access required to create templates",
)
cat_result = await db.execute(
select(ScriptCategory).where(
ScriptCategory.id == data.category_id,
ScriptCategory.is_active == True, # noqa: E712
)
)
if not cat_result.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Category not found"
)
slug = re.sub(r"[^a-z0-9]+", "-", data.name.lower()).strip("-")
template = ScriptTemplate(
category_id=data.category_id,
team_id=current_user.team_id,
created_by=current_user.id,
name=data.name,
slug=slug,
description=data.description,
use_case=data.use_case,
script_body=data.script_body,
parameters_schema=data.parameters_schema,
default_values=data.default_values,
validation_rules=data.validation_rules,
tags=data.tags,
complexity=data.complexity,
estimated_runtime=data.estimated_runtime,
requires_elevation=data.requires_elevation,
requires_modules=data.requires_modules,
)
db.add(template)
await db.commit()
await db.refresh(template)
return ScriptTemplateDetail.model_validate(template)
@router.put("/templates/{template_id}", response_model=ScriptTemplateDetail)
async def update_template(
template_id: UUID,
data: ScriptTemplateUpdate,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
) -> ScriptTemplateDetail:
result = await db.execute(
select(ScriptTemplate).where(
ScriptTemplate.id == template_id,
ScriptTemplate.is_active == True, # noqa: E712
)
)
template = result.scalar_one_or_none()
if not template:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Template not found",
)
if not can_manage_script_template(current_user, template.created_by, template.team_id):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to edit this template",
)
update_data = data.model_dump(exclude_unset=True)
if "script_body" in update_data or "parameters_schema" in update_data:
template.version += 1
for field, value in update_data.items():
setattr(template, field, value)
await db.commit()
await db.refresh(template)
return ScriptTemplateDetail.model_validate(template)
@router.delete("/templates/{template_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_template(
template_id: UUID,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
) -> None:
result = await db.execute(
select(ScriptTemplate).where(
ScriptTemplate.id == template_id,
ScriptTemplate.is_active == True, # noqa: E712
)
)
template = result.scalar_one_or_none()
if not template:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Template not found",
)
if not can_manage_script_template(current_user, template.created_by, template.team_id):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to delete this template",
)
template.is_active = False
await db.commit()
@router.patch("/templates/{template_id}/share", response_model=ScriptTemplateDetail)
async def share_template(
template_id: UUID,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
shared: bool = Query(..., description="true to share with team, false to make personal"),
) -> ScriptTemplateDetail:
"""Toggle team sharing for a template. Owner/admin/super_admin only."""
if not (current_user.is_super_admin or current_user.account_role == "owner"):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only account owners and admins can share templates",
)
result = await db.execute(
select(ScriptTemplate).where(
ScriptTemplate.id == template_id,
ScriptTemplate.is_active == True, # noqa: E712
)
)
template = result.scalar_one_or_none()
if not template:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Template not found",
)
if shared:
template.team_id = current_user.team_id
else:
template.team_id = None
await db.commit()
await db.refresh(template)
return ScriptTemplateDetail.model_validate(template)
# ── Generate ──────────────────────────────────────────────────────────────
@router.post("/generate", response_model=ScriptGenerateResponse)
async def generate_script(
data: ScriptGenerateRequest,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
) -> ScriptGenerateResponse:
result = await db.execute(
select(ScriptTemplate).where(
ScriptTemplate.id == data.template_id,
ScriptTemplate.is_active == True, # noqa: E712
or_(
ScriptTemplate.team_id == None, # noqa: E711
ScriptTemplate.team_id == current_user.team_id,
),
)
)
template = result.scalar_one_or_none()
if not template:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Template not found"
)
try:
rendered_script = _engine.render(template.script_body, data.parameters)
except ScriptRenderError as e:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(e)
)
params_schema = template.parameters_schema or {}
sensitive_keys = {
p["key"]
for p in params_schema.get("parameters", [])
if p.get("sensitive", False)
}
redacted_params = _engine.redact_sensitive(data.parameters, sensitive_keys)
generation = ScriptGeneration(
template_id=template.id,
user_id=current_user.id,
team_id=current_user.team_id,
session_id=data.session_id,
ai_session_id=data.ai_session_id,
parameters_used=redacted_params,
generated_script=rendered_script,
)
db.add(generation)
template.usage_count += 1
await db.commit()
await db.refresh(generation)
warnings: list[str] = []
if template.requires_elevation:
warnings.append("This script requires 'Run as Administrator'")
return ScriptGenerateResponse(
id=generation.id,
script=rendered_script,
warnings=warnings,
metadata={
"template_name": template.name,
"template_version": template.version,
"requires_elevation": template.requires_elevation,
"requires_modules": template.requires_modules,
"generated_at": generation.created_at.isoformat(),
"estimated_runtime": template.estimated_runtime,
},
)
# ── Generations history ───────────────────────────────────────────────────
@router.get("/generations", response_model=list[ScriptGenerationRecord])
async def list_generations(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
) -> list[ScriptGenerationRecord]:
result = await db.execute(
select(ScriptGeneration, ScriptTemplate.name)
.join(ScriptTemplate, ScriptGeneration.template_id == ScriptTemplate.id)
.where(ScriptGeneration.user_id == current_user.id)
.order_by(ScriptGeneration.created_at.desc())
.limit(limit)
.offset(offset)
)
rows = result.all()
return [
ScriptGenerationRecord(
id=gen.id,
template_id=gen.template_id,
template_name=name,
parameters_used=gen.parameters_used,
created_at=gen.created_at,
)
for gen, name in rows
]