Files
resolutionflow/backend/app/api/endpoints/public_templates.py
chihlasm bacdb9d466 feat(public-gallery): add public templates gallery API (Tasks 2 & 3)
Implements a no-auth read-only API for the public templates gallery page:
- Schemas in schemas/public_templates.py (PublicFlowTemplate, PublicScriptTemplate, PublicGalleryResponse, PublicFlowDetail, PublicScriptDetail)
- Five endpoints under /api/v1/public/templates: listing, flow detail, script detail, categories with counts, full-text search
- Tree preview truncated to 3 levels max; script_body never exposed
- Rate limited at 30/minute; paginated with category/type/sort filters
- 25 passing integration tests covering feature flags, truncation, script body protection, search, categories, and 404 behavior

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 19:12:34 +00:00

459 lines
17 KiB
Python

"""Public templates gallery endpoints. No authentication required."""
import logging
from typing import Annotated, Any
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from sqlalchemy import func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.core.database import get_db
from app.core.rate_limit import limiter
from app.models.category import TreeCategory
from app.models.script_template import ScriptCategory, ScriptTemplate
from app.models.tag import TreeTag
from app.models.tree import Tree
from app.schemas.public_templates import (
PublicFlowDetail,
PublicFlowTemplate,
PublicGalleryResponse,
PublicScriptDetail,
PublicScriptTemplate,
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router that the gallery endpoints in this module register on; every route
# path declared on it is relative to the "/public/templates" prefix.
router = APIRouter(prefix="/public/templates", tags=["public-gallery"])
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _count_tree_steps(structure: dict[str, Any] | None) -> int:
"""Count nodes in a tree structure via BFS."""
if not structure:
return 0
count = 0
queue = [structure]
while queue:
node = queue.pop()
count += 1
for child in node.get("children", []):
queue.append(child)
for option in node.get("options", []):
if isinstance(option, dict) and "children" in option:
for child in option["children"]:
queue.append(child)
return count
def _truncate_structure(structure: dict[str, Any] | None, max_depth: int = 3) -> dict[str, Any] | None:
"""Return a copy of the tree structure truncated to max_depth levels."""
if not structure:
return None
def _truncate_node(node: dict[str, Any], depth: int) -> dict[str, Any]:
if depth >= max_depth:
# Return node skeleton without children
return {k: v for k, v in node.items() if k not in ("children", "options")}
result = {k: v for k, v in node.items() if k not in ("children", "options")}
if "children" in node:
result["children"] = [_truncate_node(c, depth + 1) for c in node["children"]]
if "options" in node:
truncated_options = []
for opt in node["options"]:
if isinstance(opt, dict):
opt_copy = {k: v for k, v in opt.items() if k != "children"}
if "children" in opt:
opt_copy["children"] = [_truncate_node(c, depth + 1) for c in opt["children"]]
truncated_options.append(opt_copy)
else:
truncated_options.append(opt)
result["options"] = truncated_options
return result
return _truncate_node(structure, 0)
def _build_flow_template(tree: Tree) -> PublicFlowTemplate:
    """Map a ``Tree`` row onto the public flow-template schema.

    The category label prefers the name of the related category
    (``tree.category_rel``) and falls back to the tree's own ``category``
    value; it is ``None`` when neither is set. ``step_count`` and
    ``preview_structure`` are both derived from ``tree.tree_structure``.

    Args:
        tree: The tree to expose; ``category_rel`` and ``tags`` are read.

    Returns:
        The populated ``PublicFlowTemplate``.
    """
    if tree.category_rel:
        label = tree.category_rel.name
    else:
        # An empty/unset plain-string category collapses to None.
        label = tree.category or None

    structure = tree.tree_structure
    return PublicFlowTemplate(
        id=tree.id,
        name=tree.name,
        description=tree.description,
        category=label,
        tree_type=tree.tree_type,
        step_count=_count_tree_steps(structure),
        usage_count=tree.usage_count,
        success_rate=tree.success_rate,
        tags=[tag.name for tag in (tree.tags or [])],
        preview_structure=_truncate_structure(structure),
        created_at=tree.created_at,
    )
def _build_script_template(script: ScriptTemplate) -> PublicScriptTemplate:
    """Map a ``ScriptTemplate`` row onto the public script listing schema.

    ``parameter_count`` is the size of the schema's ``"parameters"`` entry,
    or of its ``"properties"`` entry when ``"parameters"`` is absent. It is 0
    when the schema is not a dict or the entry is neither a list nor a dict.
    No field carrying the script body is passed to the schema.

    Args:
        script: The script template to expose; ``category`` is read.

    Returns:
        The populated ``PublicScriptTemplate``.
    """
    schema = script.parameters_schema
    parameter_total = 0
    if isinstance(schema, dict):
        if "parameters" in schema:
            declared = schema["parameters"]
        else:
            declared = schema.get("properties", {})
        if isinstance(declared, (list, dict)):
            parameter_total = len(declared)

    category = script.category
    tags = list(script.tags) if script.tags else []
    modules = list(script.requires_modules) if script.requires_modules else []
    return PublicScriptTemplate(
        id=script.id,
        name=script.name,
        description=script.description,
        category_name=category.name if category else None,
        category_icon=category.icon if category else None,
        complexity=script.complexity,
        tags=tags,
        parameter_count=parameter_total,
        requires_elevation=script.requires_elevation,
        requires_modules=modules,
        usage_count=script.usage_count,
        is_verified=script.is_verified,
        created_at=script.created_at,
    )
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@router.get("", response_model=PublicGalleryResponse)
@limiter.limit("30/minute")
async def list_gallery(
    request: Request,
    db: Annotated[AsyncSession, Depends(get_db)],
    category: str | None = Query(None, description="Filter by category name"),
    type: str = Query("all", description="Filter type: flows, scripts, or all"),
    sort: str = Query("usage", description="Sort order: usage, newest, success_rate"),
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
):
    """Public gallery listing. Returns featured flows and scripts.

    No authentication required. Rate limited to 30/minute.

    Flows and scripts are paginated independently with the same
    ``page``/``per_page`` window. Every ordering ends with the primary key so
    that rows with equal sort values keep a stable position across pages.

    Args:
        request: Incoming request; not read in the body (presumably consumed
            by the rate-limit decorator - confirm).
        db: Async database session.
        category: Exact category name to filter on. Flows match on the related
            category's name or on the tree's own ``category`` column; scripts
            match on the related category's name only.
        type: ``"flows"``, ``"scripts"`` or ``"all"``. Any other value selects
            neither kind, yielding empty lists and zero totals.
        sort: ``"newest"``, ``"success_rate"`` or ``"usage"``. Unrecognised
            values fall back to the usage ordering. Scripts are ordered by
            usage count for ``"success_rate"`` as well.
        page: 1-based page number.
        per_page: Page size (1-100).

    Returns:
        A ``PublicGalleryResponse``. ``total_flows``/``total_scripts`` count
        all matching rows; ``categories`` lists only the category names seen
        on the returned page; ``domains`` is always empty.
    """
    offset = (page - 1) * per_page
    flow_templates: list[PublicFlowTemplate] = []
    script_templates: list[PublicScriptTemplate] = []
    total_flows = 0
    total_scripts = 0
    category_names: set[str] = set()
    domains: list[str] = []

    # --- Flow templates ---
    if type in ("all", "flows"):
        q = (
            select(Tree)
            .options(selectinload(Tree.category_rel), selectinload(Tree.tags))
            .where(Tree.is_gallery_featured == True)  # noqa: E712
            .where(Tree.is_active == True)  # noqa: E712
            .where(Tree.deleted_at == None)  # noqa: E711
        )
        if category:
            # Outer join so trees that only carry the plain-string category
            # (no category row) can still match.
            q = q.join(TreeCategory, Tree.category_id == TreeCategory.id, isouter=True).where(
                or_(TreeCategory.name == category, Tree.category == category)
            )

        # Count before ordering/paging so the total covers every match.
        count_q = select(func.count()).select_from(q.subquery())
        total_flows = (await db.execute(count_q)).scalar_one()

        # Tree.id is appended as a unique tiebreaker: OFFSET/LIMIT over a
        # non-unique ordering may repeat or skip rows between pages.
        if sort == "newest":
            q = q.order_by(Tree.created_at.desc(), Tree.id.asc())
        elif sort == "success_rate":
            q = q.order_by(
                Tree.success_rate.desc().nulls_last(),
                Tree.usage_count.desc(),
                Tree.id.asc(),
            )
        else:  # usage (default)
            q = q.order_by(
                Tree.usage_count.desc(),
                Tree.gallery_sort_order.asc(),
                Tree.id.asc(),
            )

        q = q.offset(offset).limit(per_page)
        result = await db.execute(q)
        trees = result.scalars().all()
        for tree in trees:
            flow_templates.append(_build_flow_template(tree))
            cat = None
            if tree.category_rel:
                cat = tree.category_rel.name
            elif tree.category:
                cat = tree.category
            if cat:
                category_names.add(cat)

    # --- Script templates ---
    if type in ("all", "scripts"):
        sq = (
            select(ScriptTemplate)
            .options(selectinload(ScriptTemplate.category))
            .where(ScriptTemplate.is_gallery_featured == True)  # noqa: E712
            .where(ScriptTemplate.is_active == True)  # noqa: E712
        )
        if category:
            sq = sq.join(ScriptCategory, ScriptTemplate.category_id == ScriptCategory.id).where(
                ScriptCategory.name == category
            )

        count_sq = select(func.count()).select_from(sq.subquery())
        total_scripts = (await db.execute(count_sq)).scalar_one()

        # Same unique-tiebreaker rule as for flows.
        if sort == "newest":
            sq = sq.order_by(ScriptTemplate.created_at.desc(), ScriptTemplate.id.asc())
        elif sort == "success_rate":
            # No success-rate column is used for scripts; usage count stands in.
            sq = sq.order_by(ScriptTemplate.usage_count.desc(), ScriptTemplate.id.asc())
        else:
            sq = sq.order_by(
                ScriptTemplate.usage_count.desc(),
                ScriptTemplate.gallery_sort_order.asc(),
                ScriptTemplate.id.asc(),
            )

        sq = sq.offset(offset).limit(per_page)
        result = await db.execute(sq)
        scripts = result.scalars().all()
        for script in scripts:
            script_templates.append(_build_script_template(script))
            if script.category:
                category_names.add(script.category.name)

    return PublicGalleryResponse(
        flow_templates=flow_templates,
        script_templates=script_templates,
        total_flows=total_flows,
        total_scripts=total_scripts,
        categories=sorted(category_names),
        domains=domains,
    )
@router.get("/flows/{flow_id}", response_model=PublicFlowDetail)
@limiter.limit("30/minute")
async def get_flow_detail(
    request: Request,
    flow_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Return the public preview of one gallery-featured flow template.

    No authentication required. The tree is looked up by id and must be
    gallery-featured, active and not soft-deleted. The response is built by
    ``_build_flow_template``, so only the truncated preview structure is
    included.

    Raises:
        HTTPException: 404 when no tree satisfies all of the conditions.
    """
    lookup = (
        select(Tree)
        .options(selectinload(Tree.category_rel), selectinload(Tree.tags))
        .where(
            Tree.id == flow_id,
            Tree.is_gallery_featured == True,  # noqa: E712
            Tree.is_active == True,  # noqa: E712
            Tree.deleted_at == None,  # noqa: E711
        )
    )
    rows = await db.execute(lookup)
    tree = rows.scalar_one_or_none()
    if tree:
        return _build_flow_template(tree)
    raise HTTPException(status_code=404, detail="Flow template not found")
def _extract_safe_params(schema: Any) -> list[dict[str, Any]]:
    """Reduce a stored parameters schema to the public fields of each parameter.

    The parameter collection is read from the schema's ``"parameters"`` key,
    or from ``"properties"`` when ``"parameters"`` is absent. A list is read
    as parameter dicts carrying their own ``"name"``; a dict is read as a
    name -> definition mapping. Entries that are not dicts are skipped, and a
    schema that is not a dict yields an empty list instead of raising.

    Returns:
        One dict per parameter with exactly the keys ``name``,
        ``description``, ``type``, ``required`` and ``default``.
    """
    if not isinstance(schema, dict):
        return []

    raw_params = schema.get("parameters", schema.get("properties", {}))
    if isinstance(raw_params, list):
        named = [(p.get("name", ""), p) for p in raw_params if isinstance(p, dict)]
    elif isinstance(raw_params, dict):
        named = [(name, spec) for name, spec in raw_params.items() if isinstance(spec, dict)]
    else:
        named = []

    # Copy an explicit allow-list of keys so nothing else stored in the
    # schema leaks into the public response.
    return [
        {
            "name": name,
            "description": spec.get("description", ""),
            "type": spec.get("type", "string"),
            "required": spec.get("required", False),
            "default": spec.get("default"),
        }
        for name, spec in named
    ]


@router.get("/scripts/{script_id}", response_model=PublicScriptDetail)
@limiter.limit("30/minute")
async def get_script_detail(
    request: Request,
    script_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Get a single featured script template detail.

    NOTE: script_body is NEVER returned — it is behind the signup wall.
    Only parameter names, descriptions, types, required flags and defaults
    are included.

    Args:
        request: Incoming request; not read in the body (presumably consumed
            by the rate-limit decorator - confirm).
        script_id: Id of the script template to fetch.
        db: Async database session.

    Returns:
        A ``PublicScriptDetail`` for the script.

    Raises:
        HTTPException: 404 when no script with that id is both
            gallery-featured and active.
    """
    result = await db.execute(
        select(ScriptTemplate)
        .options(selectinload(ScriptTemplate.category))
        .where(ScriptTemplate.id == script_id)
        .where(ScriptTemplate.is_gallery_featured == True)  # noqa: E712
        .where(ScriptTemplate.is_active == True)  # noqa: E712
    )
    script = result.scalar_one_or_none()
    if not script:
        raise HTTPException(status_code=404, detail="Script template not found")

    # Build safe parameter list (no script_body — that stays locked behind auth)
    safe_params = _extract_safe_params(script.parameters_schema)

    return PublicScriptDetail(
        id=script.id,
        name=script.name,
        description=script.description,
        category_name=script.category.name if script.category else None,
        complexity=script.complexity,
        tags=list(script.tags) if script.tags else [],
        parameters=safe_params,
        requires_elevation=script.requires_elevation,
        requires_modules=list(script.requires_modules) if script.requires_modules else [],
        usage_count=script.usage_count,
        is_verified=script.is_verified,
        created_at=script.created_at,
    )
@router.get("/categories")
@limiter.limit("30/minute")
async def list_categories(
    request: Request,
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """List category names with their featured flow and script counts.

    No authentication required. Flow counts cover trees that are
    gallery-featured, active and not soft-deleted; script counts cover script
    templates that are gallery-featured and active. Both queries use an inner
    join on ``category_id``, so items without a category row are not counted.

    Returns:
        ``{"categories": [...]}`` where each entry has ``name``,
        ``flow_count`` and ``script_count``. Flow categories come first in
        name order, followed by script-only categories in name order; a name
        present in both is reported once with both counts.
    """
    flow_stmt = (
        select(TreeCategory.name, func.count(Tree.id).label("flow_count"))
        .join(Tree, Tree.category_id == TreeCategory.id)
        .where(Tree.is_gallery_featured == True)  # noqa: E712
        .where(Tree.is_active == True)  # noqa: E712
        .where(Tree.deleted_at == None)  # noqa: E711
        .group_by(TreeCategory.name)
        .order_by(TreeCategory.name)
    )
    flow_rows = (await db.execute(flow_stmt)).all()

    script_stmt = (
        select(ScriptCategory.name, func.count(ScriptTemplate.id).label("script_count"))
        .join(ScriptTemplate, ScriptTemplate.category_id == ScriptCategory.id)
        .where(ScriptTemplate.is_gallery_featured == True)  # noqa: E712
        .where(ScriptTemplate.is_active == True)  # noqa: E712
        .group_by(ScriptCategory.name)
        .order_by(ScriptCategory.name)
    )
    script_rows = (await db.execute(script_stmt)).all()

    # Insertion-ordered mapping: name -> entry, merged across both queries.
    merged: dict[Any, dict[str, Any]] = {}
    for name, flow_count in flow_rows:
        if name not in merged:
            merged[name] = {"name": name, "flow_count": flow_count, "script_count": 0}
    for name, script_count in script_rows:
        if name in merged:
            merged[name]["script_count"] = script_count
        else:
            merged[name] = {"name": name, "flow_count": 0, "script_count": script_count}

    return {"categories": list(merged.values())}
@router.get("/search")
@limiter.limit("30/minute")
async def search_gallery(
    request: Request,
    db: Annotated[AsyncSession, Depends(get_db)],
    q: str = Query(..., min_length=1, max_length=200, description="Search query"),
    type: str = Query("all", description="Filter type: flows, scripts, or all"),
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
):
    """Case-insensitive substring search across featured gallery items.

    No authentication required. Flows match when the query occurs in the
    tree's name, description or ``category`` column; scripts match on name or
    description. Tags are not searched. The query is matched literally:
    ``%`` and ``_`` typed by the user are escaped rather than treated as LIKE
    wildcards.

    Args:
        request: Incoming request; not read in the body (presumably consumed
            by the rate-limit decorator - confirm).
        db: Async database session.
        q: Text to look for (1-200 characters).
        type: ``"flows"``, ``"scripts"`` or ``"all"``. Any other value selects
            neither kind, yielding empty lists and zero totals.
        page: 1-based page number, applied to flows and scripts independently.
        per_page: Page size (1-100).

    Returns:
        A dict with ``flow_templates``, ``script_templates``, ``total_flows``,
        ``total_scripts`` (totals over all matches, not just this page) and
        the original ``query``.
    """
    offset = (page - 1) * per_page
    needle = q.lower()
    flow_templates: list[PublicFlowTemplate] = []
    script_templates: list[PublicScriptTemplate] = []
    total_flows = 0
    total_scripts = 0

    if type in ("all", "flows"):
        flow_filter = (
            select(Tree)
            .options(selectinload(Tree.category_rel), selectinload(Tree.tags))
            .where(Tree.is_gallery_featured == True)  # noqa: E712
            .where(Tree.is_active == True)  # noqa: E712
            .where(Tree.deleted_at == None)  # noqa: E711
            .where(
                or_(
                    # autoescape keeps user-supplied % and _ literal.
                    func.lower(Tree.name).contains(needle, autoescape=True),
                    func.lower(Tree.description).contains(needle, autoescape=True),
                    func.lower(Tree.category).contains(needle, autoescape=True),
                )
            )
        )
        count_q = select(func.count()).select_from(flow_filter.subquery())
        total_flows = (await db.execute(count_q)).scalar_one()

        # Tree.id breaks usage_count ties so OFFSET/LIMIT pages neither
        # repeat nor skip rows.
        flow_page = (
            flow_filter.order_by(Tree.usage_count.desc(), Tree.id.asc())
            .offset(offset)
            .limit(per_page)
        )
        result = await db.execute(flow_page)
        flow_templates = [_build_flow_template(tree) for tree in result.scalars().all()]

    if type in ("all", "scripts"):
        script_filter = (
            select(ScriptTemplate)
            .options(selectinload(ScriptTemplate.category))
            .where(ScriptTemplate.is_gallery_featured == True)  # noqa: E712
            .where(ScriptTemplate.is_active == True)  # noqa: E712
            .where(
                or_(
                    func.lower(ScriptTemplate.name).contains(needle, autoescape=True),
                    func.lower(ScriptTemplate.description).contains(needle, autoescape=True),
                )
            )
        )
        count_sq = select(func.count()).select_from(script_filter.subquery())
        total_scripts = (await db.execute(count_sq)).scalar_one()

        script_page = (
            script_filter.order_by(ScriptTemplate.usage_count.desc(), ScriptTemplate.id.asc())
            .offset(offset)
            .limit(per_page)
        )
        result = await db.execute(script_page)
        script_templates = [_build_script_template(script) for script in result.scalars().all()]

    return {
        "flow_templates": flow_templates,
        "script_templates": script_templates,
        "total_flows": total_flows,
        "total_scripts": total_scripts,
        "query": q,
    }