Add role-based access control with hierarchy: super_admin > team_admin > engineer > viewer. Adds is_super_admin boolean to User model (migration 010), centralized backend permissions module, frontend usePermissions hook, and UI enforcement (conditional Create/Edit buttons, editor redirect for viewers, role badge in header). All endpoint admin checks updated from role=="admin" to is_super_admin. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
531 lines
17 KiB
Python
531 lines
17 KiB
Python
from typing import Annotated, Optional
|
|
from uuid import UUID
|
|
from fastapi import APIRouter, Depends, HTTPException, status, Query
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select, func, or_
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from app.core.database import get_db
|
|
from app.models.tree import Tree
|
|
from app.models.user import User
|
|
from app.models.category import TreeCategory
|
|
from app.models.tag import TreeTag
|
|
from app.models.folder import UserFolder
|
|
from app.schemas.tree import TreeCreate, TreeUpdate, TreeResponse, TreeListResponse, CategoryInfo
|
|
from app.api.deps import get_current_user, require_engineer_or_admin, require_admin
|
|
|
|
router = APIRouter(prefix="/trees", tags=["trees"])
|
|
|
|
|
|
def build_tree_access_filter(current_user: User):
|
|
"""Build the access filter for trees based on user permissions.
|
|
|
|
Returns trees that are:
|
|
- Default/system trees (visible to all)
|
|
- Public trees
|
|
- User's own trees
|
|
- Trees from user's team
|
|
"""
|
|
return or_(
|
|
Tree.is_default == True,
|
|
Tree.is_public == True,
|
|
Tree.author_id == current_user.id,
|
|
Tree.team_id == current_user.team_id if current_user.team_id else False
|
|
)
|
|
|
|
|
|
def build_tree_response(tree: Tree) -> TreeListResponse:
|
|
"""Build TreeListResponse with category_info and tags."""
|
|
category_info = None
|
|
if tree.category_rel:
|
|
category_info = CategoryInfo(
|
|
id=tree.category_rel.id,
|
|
name=tree.category_rel.name,
|
|
slug=tree.category_rel.slug
|
|
)
|
|
|
|
return TreeListResponse(
|
|
id=tree.id,
|
|
name=tree.name,
|
|
description=tree.description,
|
|
category=tree.category,
|
|
category_id=tree.category_id,
|
|
category_info=category_info,
|
|
tags=tree.tag_names,
|
|
author_id=tree.author_id,
|
|
team_id=tree.team_id,
|
|
is_active=tree.is_active,
|
|
is_public=tree.is_public,
|
|
is_default=tree.is_default,
|
|
version=tree.version,
|
|
usage_count=tree.usage_count,
|
|
created_at=tree.created_at,
|
|
updated_at=tree.updated_at
|
|
)
|
|
|
|
|
|
def build_full_tree_response(tree: Tree) -> TreeResponse:
|
|
"""Build TreeResponse with all details including category_info and tags."""
|
|
category_info = None
|
|
if tree.category_rel:
|
|
category_info = CategoryInfo(
|
|
id=tree.category_rel.id,
|
|
name=tree.category_rel.name,
|
|
slug=tree.category_rel.slug
|
|
)
|
|
|
|
return TreeResponse(
|
|
id=tree.id,
|
|
name=tree.name,
|
|
description=tree.description,
|
|
category=tree.category,
|
|
category_id=tree.category_id,
|
|
category_info=category_info,
|
|
tags=tree.tag_names,
|
|
tree_structure=tree.tree_structure,
|
|
author_id=tree.author_id,
|
|
team_id=tree.team_id,
|
|
is_active=tree.is_active,
|
|
is_public=tree.is_public,
|
|
is_default=tree.is_default,
|
|
version=tree.version,
|
|
usage_count=tree.usage_count,
|
|
created_at=tree.created_at,
|
|
updated_at=tree.updated_at
|
|
)
|
|
|
|
|
|
@router.get("", response_model=list[TreeListResponse])
|
|
async def list_trees(
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
current_user: Annotated[User, Depends(get_current_user)],
|
|
category: Optional[str] = Query(None, description="Filter by legacy category string"),
|
|
category_id: Optional[UUID] = Query(None, description="Filter by category ID"),
|
|
tags: Optional[str] = Query(None, description="Comma-separated tag slugs to filter by"),
|
|
folder_id: Optional[UUID] = Query(None, description="Filter by folder ID (user's folders only)"),
|
|
is_active: Optional[bool] = Query(None, description="Filter by active status"),
|
|
author_id: Optional[UUID] = Query(None, description="Filter by author ID"),
|
|
is_public: Optional[bool] = Query(None, description="Filter by public status"),
|
|
skip: int = Query(0, ge=0),
|
|
limit: int = Query(100, ge=1, le=100)
|
|
):
|
|
"""List all trees with optional filters.
|
|
|
|
New filters:
|
|
- category_id: Filter by category (from tree_categories table)
|
|
- tags: Comma-separated tag slugs (e.g., "citrix,networking")
|
|
- folder_id: Show only trees in a specific folder
|
|
"""
|
|
query = select(Tree).options(
|
|
selectinload(Tree.category_rel),
|
|
selectinload(Tree.tags)
|
|
)
|
|
|
|
# Apply filters
|
|
if category:
|
|
query = query.where(Tree.category == category)
|
|
if category_id:
|
|
query = query.where(Tree.category_id == category_id)
|
|
if is_active is not None:
|
|
query = query.where(Tree.is_active == is_active)
|
|
else:
|
|
# Default to only showing active trees
|
|
query = query.where(Tree.is_active == True)
|
|
if author_id:
|
|
query = query.where(Tree.author_id == author_id)
|
|
if is_public is not None:
|
|
query = query.where(Tree.is_public == is_public)
|
|
|
|
# Filter by tags (all specified tags must be present)
|
|
if tags:
|
|
tag_slugs = [t.strip() for t in tags.split(",") if t.strip()]
|
|
for tag_slug in tag_slugs:
|
|
query = query.where(
|
|
Tree.tags.any(TreeTag.slug == tag_slug)
|
|
)
|
|
|
|
# Filter by folder
|
|
if folder_id:
|
|
# Verify folder belongs to user
|
|
folder_result = await db.execute(
|
|
select(UserFolder).where(
|
|
UserFolder.id == folder_id,
|
|
UserFolder.user_id == current_user.id
|
|
)
|
|
)
|
|
folder = folder_result.scalar_one_or_none()
|
|
if not folder:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Folder not found"
|
|
)
|
|
query = query.where(Tree.folders.any(UserFolder.id == folder_id))
|
|
|
|
# Apply access filter
|
|
query = query.where(build_tree_access_filter(current_user))
|
|
|
|
query = query.order_by(Tree.usage_count.desc(), Tree.updated_at.desc())
|
|
query = query.offset(skip).limit(limit)
|
|
|
|
result = await db.execute(query)
|
|
trees = result.scalars().unique().all()
|
|
|
|
return [build_tree_response(tree) for tree in trees]
|
|
|
|
|
|
@router.get("/categories", response_model=list[str])
|
|
async def list_categories(
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
current_user: Annotated[User, Depends(get_current_user)]
|
|
):
|
|
"""List all unique categories from trees the user can access.
|
|
|
|
Note: This returns legacy string categories. For the new category system,
|
|
use the /categories endpoint.
|
|
"""
|
|
query = select(Tree.category).where(
|
|
Tree.category.isnot(None),
|
|
Tree.is_active == True,
|
|
build_tree_access_filter(current_user)
|
|
).distinct()
|
|
result = await db.execute(query)
|
|
categories = [row[0] for row in result.all() if row[0]]
|
|
return sorted(categories)
|
|
|
|
|
|
@router.get("/search", response_model=list[TreeListResponse])
|
|
async def search_trees(
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
current_user: Annotated[User, Depends(get_current_user)],
|
|
q: str = Query(..., min_length=2, description="Search query"),
|
|
limit: int = Query(20, ge=1, le=50)
|
|
):
|
|
"""Full-text search trees by name and description."""
|
|
# Using PostgreSQL full-text search
|
|
search_vector = func.to_tsvector('english', func.coalesce(Tree.name, '') + ' ' + func.coalesce(Tree.description, ''))
|
|
search_query = func.plainto_tsquery('english', q)
|
|
|
|
query = select(Tree).options(
|
|
selectinload(Tree.category_rel),
|
|
selectinload(Tree.tags)
|
|
).where(
|
|
Tree.is_active == True,
|
|
build_tree_access_filter(current_user),
|
|
search_vector.op('@@')(search_query)
|
|
).order_by(
|
|
func.ts_rank(search_vector, search_query).desc()
|
|
).limit(limit)
|
|
|
|
result = await db.execute(query)
|
|
trees = result.scalars().unique().all()
|
|
|
|
return [build_tree_response(tree) for tree in trees]
|
|
|
|
|
|
@router.get("/{tree_id}", response_model=TreeResponse)
|
|
async def get_tree(
|
|
tree_id: UUID,
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
current_user: Annotated[User, Depends(get_current_user)]
|
|
):
|
|
"""Get a specific tree by ID."""
|
|
result = await db.execute(
|
|
select(Tree)
|
|
.options(
|
|
selectinload(Tree.category_rel),
|
|
selectinload(Tree.tags)
|
|
)
|
|
.where(Tree.id == tree_id)
|
|
)
|
|
tree = result.scalar_one_or_none()
|
|
|
|
if not tree:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Tree not found"
|
|
)
|
|
|
|
# Check access: tree must be active AND (default OR public OR author OR same team)
|
|
can_access = (
|
|
tree.is_default or
|
|
tree.is_public or
|
|
tree.author_id == current_user.id or
|
|
(tree.team_id == current_user.team_id and current_user.team_id is not None) or
|
|
current_user.is_super_admin
|
|
)
|
|
if not tree.is_active or not can_access:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="You don't have access to this tree"
|
|
)
|
|
|
|
return build_full_tree_response(tree)
|
|
|
|
|
|
@router.post("", response_model=TreeResponse, status_code=status.HTTP_201_CREATED)
|
|
async def create_tree(
|
|
tree_data: TreeCreate,
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
current_user: Annotated[User, Depends(require_engineer_or_admin)]
|
|
):
|
|
"""Create a new tree (engineers and admins only).
|
|
|
|
Supports:
|
|
- category_id: Assign to a category from tree_categories
|
|
- tags: List of tag names to assign (creates new tags if needed)
|
|
"""
|
|
# Only admins can create default/system trees
|
|
is_default = tree_data.is_default and current_user.is_super_admin
|
|
|
|
# Verify category exists if provided
|
|
if tree_data.category_id:
|
|
cat_result = await db.execute(
|
|
select(TreeCategory).where(TreeCategory.id == tree_data.category_id)
|
|
)
|
|
category = cat_result.scalar_one_or_none()
|
|
if not category:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Category not found"
|
|
)
|
|
# Check category access
|
|
if category.team_id and category.team_id != current_user.team_id and not current_user.is_super_admin:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="You don't have access to this category"
|
|
)
|
|
|
|
new_tree = Tree(
|
|
name=tree_data.name,
|
|
description=tree_data.description,
|
|
category=tree_data.category,
|
|
category_id=tree_data.category_id,
|
|
tree_structure=tree_data.tree_structure,
|
|
author_id=None if is_default else current_user.id, # Default trees have no author
|
|
team_id=None if is_default else current_user.team_id,
|
|
is_public=True if is_default else tree_data.is_public, # Default trees are always public
|
|
is_default=is_default
|
|
)
|
|
db.add(new_tree)
|
|
await db.flush() # Get the ID
|
|
|
|
# Handle tags
|
|
if tree_data.tags:
|
|
tree_team_id = new_tree.team_id or (current_user.team_id if not current_user.is_super_admin else None)
|
|
|
|
# Collect tags to add
|
|
tags_to_add = []
|
|
for tag_name in tree_data.tags:
|
|
slug = TreeTag.slugify(tag_name)
|
|
|
|
# Try to find existing tag
|
|
tag_query = select(TreeTag).where(
|
|
TreeTag.slug == slug,
|
|
or_(
|
|
TreeTag.team_id.is_(None),
|
|
TreeTag.team_id == tree_team_id
|
|
)
|
|
)
|
|
tag_result = await db.execute(tag_query)
|
|
tag = tag_result.scalar_one_or_none()
|
|
|
|
if not tag:
|
|
# Create new tag
|
|
tag = TreeTag(
|
|
name=tag_name,
|
|
slug=slug,
|
|
team_id=tree_team_id,
|
|
created_by=current_user.id
|
|
)
|
|
db.add(tag)
|
|
await db.flush()
|
|
|
|
tags_to_add.append(tag)
|
|
tag.usage_count += 1
|
|
|
|
# Use direct SQL insert for the junction table to avoid lazy load issues
|
|
from app.models.tag import tree_tag_assignments
|
|
for tag in tags_to_add:
|
|
await db.execute(
|
|
tree_tag_assignments.insert().values(
|
|
tree_id=new_tree.id,
|
|
tag_id=tag.id,
|
|
assigned_by=current_user.id
|
|
)
|
|
)
|
|
|
|
await db.commit()
|
|
|
|
# Reload with relationships
|
|
result = await db.execute(
|
|
select(Tree)
|
|
.options(
|
|
selectinload(Tree.category_rel),
|
|
selectinload(Tree.tags)
|
|
)
|
|
.where(Tree.id == new_tree.id)
|
|
)
|
|
tree = result.scalar_one()
|
|
|
|
return build_full_tree_response(tree)
|
|
|
|
|
|
@router.put("/{tree_id}", response_model=TreeResponse)
|
|
async def update_tree(
|
|
tree_id: UUID,
|
|
tree_data: TreeUpdate,
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
current_user: Annotated[User, Depends(require_engineer_or_admin)]
|
|
):
|
|
"""Update an existing tree (engineers and admins only).
|
|
|
|
Supports:
|
|
- category_id: Change category assignment
|
|
- tags: Replace all tags on the tree
|
|
"""
|
|
result = await db.execute(
|
|
select(Tree)
|
|
.options(
|
|
selectinload(Tree.category_rel),
|
|
selectinload(Tree.tags)
|
|
)
|
|
.where(Tree.id == tree_id)
|
|
)
|
|
tree = result.scalar_one_or_none()
|
|
|
|
if not tree:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Tree not found"
|
|
)
|
|
|
|
# Check if user can edit: must be author, team admin for team trees, or global admin
|
|
can_edit = (
|
|
tree.author_id == current_user.id or
|
|
current_user.is_super_admin or
|
|
(current_user.is_team_admin and tree.team_id == current_user.team_id)
|
|
)
|
|
if not can_edit:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="You can only edit your own trees"
|
|
)
|
|
|
|
# Extract tags for separate handling
|
|
update_data = tree_data.model_dump(exclude_unset=True)
|
|
tags_data = update_data.pop("tags", None)
|
|
|
|
# Verify new category if provided
|
|
if "category_id" in update_data and update_data["category_id"]:
|
|
cat_result = await db.execute(
|
|
select(TreeCategory).where(TreeCategory.id == update_data["category_id"])
|
|
)
|
|
category = cat_result.scalar_one_or_none()
|
|
if not category:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Category not found"
|
|
)
|
|
if category.team_id and category.team_id != current_user.team_id and not current_user.is_super_admin:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="You don't have access to this category"
|
|
)
|
|
|
|
# Update basic fields
|
|
for field, value in update_data.items():
|
|
setattr(tree, field, value)
|
|
|
|
# Increment version if tree structure changed
|
|
if "tree_structure" in update_data:
|
|
tree.version += 1
|
|
|
|
# Handle tags replacement
|
|
if tags_data is not None:
|
|
from app.models.tag import tree_tag_assignments
|
|
|
|
# Decrement usage count for old tags (already eagerly loaded)
|
|
for tag in tree.tags:
|
|
tag.usage_count = max(0, tag.usage_count - 1)
|
|
|
|
# Delete existing tag assignments using direct SQL
|
|
await db.execute(
|
|
tree_tag_assignments.delete().where(
|
|
tree_tag_assignments.c.tree_id == tree.id
|
|
)
|
|
)
|
|
|
|
# Add new tags
|
|
tree_team_id = tree.team_id or (current_user.team_id if not current_user.is_super_admin else None)
|
|
added_tag_ids = set()
|
|
|
|
for tag_name in tags_data:
|
|
slug = TreeTag.slugify(tag_name)
|
|
|
|
tag_query = select(TreeTag).where(
|
|
TreeTag.slug == slug,
|
|
or_(
|
|
TreeTag.team_id.is_(None),
|
|
TreeTag.team_id == tree_team_id
|
|
)
|
|
)
|
|
tag_result = await db.execute(tag_query)
|
|
tag = tag_result.scalar_one_or_none()
|
|
|
|
if not tag:
|
|
tag = TreeTag(
|
|
name=tag_name,
|
|
slug=slug,
|
|
team_id=tree_team_id,
|
|
created_by=current_user.id
|
|
)
|
|
db.add(tag)
|
|
await db.flush()
|
|
|
|
if tag.id not in added_tag_ids:
|
|
await db.execute(
|
|
tree_tag_assignments.insert().values(
|
|
tree_id=tree.id,
|
|
tag_id=tag.id,
|
|
assigned_by=current_user.id
|
|
)
|
|
)
|
|
added_tag_ids.add(tag.id)
|
|
tag.usage_count += 1
|
|
|
|
await db.commit()
|
|
|
|
# Reload with relationships
|
|
result = await db.execute(
|
|
select(Tree)
|
|
.options(
|
|
selectinload(Tree.category_rel),
|
|
selectinload(Tree.tags)
|
|
)
|
|
.where(Tree.id == tree_id)
|
|
)
|
|
tree = result.scalar_one()
|
|
|
|
return build_full_tree_response(tree)
|
|
|
|
|
|
@router.delete("/{tree_id}", status_code=status.HTTP_204_NO_CONTENT)
|
|
async def delete_tree(
|
|
tree_id: UUID,
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
current_user: Annotated[User, Depends(require_admin)]
|
|
):
|
|
"""Soft delete a tree (admin only). Sets is_active to False."""
|
|
result = await db.execute(select(Tree).where(Tree.id == tree_id))
|
|
tree = result.scalar_one_or_none()
|
|
|
|
if not tree:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Tree not found"
|
|
)
|
|
|
|
tree.is_active = False
|
|
await db.commit()
|
|
return None
|