Implements Issue #34 - Tree Library Full View System

Backend Changes:
- Add sort_by parameter to GET /api/v1/trees endpoint
- Support 6 sorting options: usage_count, updated_at, created_at, name, name_desc, version
- Maintain backward compatibility (defaults to usage_count)
- Add comprehensive test for sorting functionality
- All 104 backend tests passing

Frontend Changes:
- Create ViewToggle component for switching between Grid/List/Table views
- Create SortDropdown component for 6 sort options
- Create TreeGridView component (extracted from TreeLibraryPage)
- Create TreeListView component (compact row-based layout)
- Create TreeTableView component (sortable table with columns)
- Update userPreferencesStore with view and sort preferences
- Update TreeFilters type to include sort_by parameter
- Update TreeLibraryPage to integrate new components
- View and sort preferences persist to localStorage

Features:
- Grid view: Best for discovery (default)
- List view: Best for quick scanning
- Table view: Best for sorting and comparison
- Responsive design: Mobile/tablet/desktop optimized
- Table view hides columns responsively
- Sortable table headers with visual indicators
- Smooth transitions and hover effects
- No layout shift when switching views

Testing:
- Backend: 104/104 tests pass
- Frontend: Build successful, no TypeScript errors
- All existing functionality preserved

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
757 lines
25 KiB
Python
757 lines
25 KiB
Python
from datetime import datetime, timezone
|
|
from typing import Annotated, Optional
|
|
from uuid import UUID
|
|
from fastapi import APIRouter, Depends, HTTPException, status, Query
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select, func, or_, true as sa_true, update
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from app.core.database import get_db
|
|
from app.models.tree import Tree
|
|
from app.models.user import User
|
|
from app.models.category import TreeCategory
|
|
from app.models.tag import TreeTag, tree_tag_assignments
|
|
from app.models.folder import UserFolder, user_folder_trees
|
|
from app.schemas.tree import TreeCreate, TreeUpdate, TreeResponse, TreeListResponse, CategoryInfo, ForkCreate, ForkInfo
|
|
from app.api.deps import get_current_active_user, require_engineer_or_admin, require_admin
|
|
from app.core.permissions import can_edit_tree, can_access_tree
|
|
from app.core.subscriptions import check_tree_limit
|
|
from app.core.audit import log_audit
|
|
|
|
# Router that every endpoint in this module registers on; all routes are
# mounted under the "/trees" prefix and grouped under the "trees" tag.
router = APIRouter(prefix="/trees", tags=["trees"])
|
|
|
|
|
|
def build_tree_access_filter(current_user: User):
    """Return the SQL condition selecting the trees ``current_user`` may see.

    Super admins get an always-true clause. Everyone else gets an OR of:

    - default/system trees,
    - public trees,
    - trees the user authored,
    - trees owned by the user's account (only when the user has an account).
    """
    if current_user.is_super_admin:
        return sa_true()

    # `== True` is deliberate: these are SQLAlchemy column expressions,
    # not Python booleans.
    visible_to_everyone = (
        Tree.is_default == True,  # noqa: E712
        Tree.is_public == True,  # noqa: E712
    )
    authored_by_user = Tree.author_id == current_user.id

    if not current_user.account_id:
        return or_(*visible_to_everyone, authored_by_user)

    owned_by_account = Tree.account_id == current_user.account_id
    return or_(*visible_to_everyone, authored_by_user, owned_by_account)
|
|
|
|
|
|
def build_tree_response(tree: Tree) -> TreeListResponse:
    """Convert a ``Tree`` row into the list-view response schema.

    ``category_info`` is built from ``tree.category_rel`` when that
    relationship is set and is ``None`` otherwise; ``tags`` is taken from
    ``tree.tag_names``. All remaining fields are copied from the attribute of
    the same name on ``tree``.
    """
    category_rel = tree.category_rel
    if category_rel:
        category_info = CategoryInfo(
            id=category_rel.id,
            name=category_rel.name,
            slug=category_rel.slug,
        )
    else:
        category_info = None

    # Fields that map one-to-one from the ORM object onto the schema.
    copied_fields = (
        "id",
        "name",
        "description",
        "category",
        "category_id",
        "author_id",
        "account_id",
        "is_active",
        "is_public",
        "is_default",
        "version",
        "usage_count",
        "created_at",
        "updated_at",
    )
    payload = {field: getattr(tree, field) for field in copied_fields}

    return TreeListResponse(
        category_info=category_info,
        tags=tree.tag_names,
        **payload,
    )
|
|
|
|
|
|
def build_full_tree_response(tree: Tree, parent_tree: Optional[Tree] = None) -> TreeResponse:
    """Build TreeResponse with all details including category_info, tags, and fork_info.

    Args:
        tree: The tree to serialize.
        parent_tree: The tree this one was forked from, if the caller loaded
            it. Only used to compute ``has_parent_updates``; when omitted
            that flag is reported as ``False``.

    Returns:
        A ``TreeResponse``. ``fork_info`` is populated only when the tree has
        a ``parent_tree_id`` or a positive ``fork_depth``; otherwise it is
        ``None``.
    """
    category_info = None
    if tree.category_rel:
        category_info = CategoryInfo(
            id=tree.category_rel.id,
            name=tree.category_rel.name,
            slug=tree.category_rel.slug
        )

    fork_info = None
    # A missing (None) fork_depth is treated as 0 so that a non-fork tree
    # does not blow up on `None > 0`.
    if tree.parent_tree_id or (tree.fork_depth or 0) > 0:
        has_updates = False
        # Only compare when both timestamps are actually present; comparing
        # a datetime against None raises TypeError.
        if (
            parent_tree is not None
            and tree.parent_updated_at is not None
            and parent_tree.updated_at is not None
        ):
            # The parent changed after the snapshot taken at fork time.
            has_updates = parent_tree.updated_at > tree.parent_updated_at
        fork_info = ForkInfo(
            parent_tree_id=tree.parent_tree_id,
            root_tree_id=tree.root_tree_id,
            fork_reason=tree.fork_reason,
            fork_depth=tree.fork_depth,
            parent_updated_at=tree.parent_updated_at,
            has_parent_updates=has_updates
        )

    return TreeResponse(
        id=tree.id,
        name=tree.name,
        description=tree.description,
        category=tree.category,
        category_id=tree.category_id,
        category_info=category_info,
        tags=tree.tag_names,
        fork_info=fork_info,
        tree_structure=tree.tree_structure,
        author_id=tree.author_id,
        account_id=tree.account_id,
        is_active=tree.is_active,
        is_public=tree.is_public,
        is_default=tree.is_default,
        version=tree.version,
        usage_count=tree.usage_count,
        created_at=tree.created_at,
        updated_at=tree.updated_at
    )
|
|
|
|
|
|
@router.get("", response_model=list[TreeListResponse])
async def list_trees(
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)],
    category: Optional[str] = Query(None, description="Filter by legacy category string"),
    category_id: Optional[UUID] = Query(None, description="Filter by category ID"),
    tags: Optional[str] = Query(None, description="Comma-separated tag slugs to filter by"),
    folder_id: Optional[UUID] = Query(None, description="Filter by folder ID (user's folders only)"),
    is_active: Optional[bool] = Query(None, description="Filter by active status"),
    author_id: Optional[UUID] = Query(None, description="Filter by author ID"),
    is_public: Optional[bool] = Query(None, description="Filter by public status"),
    sort_by: Optional[str] = Query("usage_count", description="Sort order: usage_count, updated_at, created_at, name, name_desc, version"),
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=100)
):
    """List all trees with optional filters.

    New filters:
    - category_id: Filter by category (from tree_categories table)
    - tags: Comma-separated tag slugs (e.g., "citrix,networking")
    - folder_id: Show only trees in a specific folder
    - sort_by: Sort order (usage_count [default], updated_at, created_at, name, name_desc, version)
    """
    # Raises 404 when folder_id does not name a folder owned by the caller.
    # Results are always restricted by build_tree_access_filter().
    query = select(Tree).options(
        selectinload(Tree.category_rel),
        selectinload(Tree.tags)
    )

    # Apply filters
    if category:
        query = query.where(Tree.category == category)
    if category_id:
        query = query.where(Tree.category_id == category_id)
    if is_active is not None:
        query = query.where(Tree.is_active == is_active)
    else:
        # Default to only showing active trees
        query = query.where(Tree.is_active == True)  # noqa: E712
    if author_id:
        query = query.where(Tree.author_id == author_id)
    if is_public is not None:
        query = query.where(Tree.is_public == is_public)

    # Filter by tags (all specified tags must be present)
    if tags:
        tag_slugs = [t.strip() for t in tags.split(",") if t.strip()]
        for tag_slug in tag_slugs:
            query = query.where(Tree.tags.any(TreeTag.slug == tag_slug))

    # Filter by folder
    if folder_id:
        # Verify folder belongs to user
        folder_result = await db.execute(
            select(UserFolder).where(
                UserFolder.id == folder_id,
                UserFolder.user_id == current_user.id
            )
        )
        folder = folder_result.scalar_one_or_none()
        if not folder:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Folder not found"
            )
        query = query.where(Tree.folders.any(UserFolder.id == folder_id))

    # Apply access filter
    query = query.where(build_tree_access_filter(current_user))

    # Apply sorting. Unknown or missing sort_by values fall back to the
    # usage_count ordering, as before.
    sort_orders = {
        "usage_count": (Tree.usage_count.desc(), Tree.updated_at.desc()),
        "updated_at": (Tree.updated_at.desc(),),
        "created_at": (Tree.created_at.desc(),),
        "name": (Tree.name.asc(),),
        "name_desc": (Tree.name.desc(),),
        "version": (Tree.version.desc(), Tree.updated_at.desc()),
    }
    ordering = sort_orders.get(sort_by, sort_orders["usage_count"])
    # Tree.id is appended as a final tiebreaker: without a unique ordering,
    # OFFSET/LIMIT paging may repeat or skip rows whose sort keys are equal.
    query = query.order_by(*ordering, Tree.id)

    query = query.offset(skip).limit(limit)

    result = await db.execute(query)
    trees = result.scalars().unique().all()

    return [build_tree_response(tree) for tree in trees]
|
|
|
|
|
|
@router.get("/categories", response_model=list[str])
async def list_categories(
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """List all unique categories from trees the user can access.

    Note: This returns legacy string categories. For the new category system,
    use the /categories endpoint.
    """
    # Distinct, non-NULL category strings of active trees visible to the caller.
    visible_categories = (
        select(Tree.category)
        .where(
            Tree.category.isnot(None),
            Tree.is_active == True,  # noqa: E712
            build_tree_access_filter(current_user),
        )
        .distinct()
    )
    rows = await db.execute(visible_categories)

    # Empty strings are dropped as well; the result is sorted in Python.
    return sorted(name for name in rows.scalars() if name)
|
|
|
|
|
|
@router.get("/search", response_model=list[TreeListResponse])
async def search_trees(
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)],
    q: str = Query(..., min_length=2, description="Search query"),
    limit: int = Query(20, ge=1, le=50)
):
    """Full-text search trees by name and description."""
    # PostgreSQL full-text search: the document is "<name> <description>"
    # with NULLs treated as empty strings; the user text becomes a plain tsquery.
    name_text = func.coalesce(Tree.name, '')
    description_text = func.coalesce(Tree.description, '')
    document = func.to_tsvector('english', name_text + ' ' + description_text)
    ts_query = func.plainto_tsquery('english', q)

    matches = document.op('@@')(ts_query)
    relevance = func.ts_rank(document, ts_query)

    statement = (
        select(Tree)
        .options(selectinload(Tree.category_rel), selectinload(Tree.tags))
        .where(
            Tree.is_active == True,  # noqa: E712
            build_tree_access_filter(current_user),
            matches,
        )
        .order_by(relevance.desc())  # best match first
        .limit(limit)
    )

    found = (await db.execute(statement)).scalars().unique().all()
    return [build_tree_response(tree) for tree in found]
|
|
|
|
|
|
@router.get("/{tree_id}", response_model=TreeResponse)
async def get_tree(
    tree_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Get a specific tree by ID."""
    # Raises 404 when no tree has this ID, and 403 when the tree is inactive
    # or can_access_tree() rejects the caller.
    result = await db.execute(
        select(Tree)
        .options(
            selectinload(Tree.category_rel),
            selectinload(Tree.tags)
        )
        .where(Tree.id == tree_id)
    )
    tree = result.scalar_one_or_none()

    if not tree:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Tree not found"
        )

    if not tree.is_active or not can_access_tree(current_user, tree):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to this tree"
        )

    # For forks, load the parent so build_full_tree_response() can compare
    # its updated_at against the snapshot stored on the fork. Without the
    # parent, fork_info.has_parent_updates is always reported as False.
    # Only the parent's timestamp is used; none of its content is returned.
    parent_tree = None
    if tree.parent_tree_id:
        parent_result = await db.execute(
            select(Tree).where(Tree.id == tree.parent_tree_id)
        )
        parent_tree = parent_result.scalar_one_or_none()

    return build_full_tree_response(tree, parent_tree=parent_tree)
|
|
|
|
|
|
@router.post("", response_model=TreeResponse, status_code=status.HTTP_201_CREATED)
async def create_tree(
    tree_data: TreeCreate,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_engineer_or_admin)]
):
    """Create a new tree (engineers and admins only).

    Supports:
    - category_id: Assign to a category from tree_categories
    - tags: List of tag names to assign (creates new tags if needed)
    """
    # Raises 404/403 for an unknown or foreign category and 402 when the
    # account's tree limit is reached.

    # Only super admins can create default/system trees. bool() keeps the
    # flag a real boolean even if the payload value is None.
    is_default = bool(tree_data.is_default and current_user.is_super_admin)

    # Verify category exists if provided
    if tree_data.category_id:
        cat_result = await db.execute(
            select(TreeCategory).where(TreeCategory.id == tree_data.category_id)
        )
        category = cat_result.scalar_one_or_none()
        if not category:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Category not found"
            )
        # Check category access
        if category.account_id and category.account_id != current_user.account_id and not current_user.is_super_admin:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't have access to this category"
            )

    # Check subscription tree limit (default trees are exempt)
    if not is_default and current_user.account_id:
        can_create, limit, count = await check_tree_limit(current_user.account_id, db)
        if not can_create:
            raise HTTPException(
                status_code=status.HTTP_402_PAYMENT_REQUIRED,
                detail=f"Tree limit reached ({count}/{limit}). Upgrade your plan to create more trees."
            )

    new_tree = Tree(
        name=tree_data.name,
        description=tree_data.description,
        category=tree_data.category,
        category_id=tree_data.category_id,
        tree_structure=tree_data.tree_structure,
        author_id=None if is_default else current_user.id,  # Default trees have no author
        account_id=None if is_default else current_user.account_id,
        is_public=True if is_default else tree_data.is_public,  # Default trees are always public
        is_default=is_default
    )

    db.add(new_tree)
    await db.flush()  # Get the ID

    # Handle tags
    if tree_data.tags:
        tree_account_id = new_tree.account_id or (current_user.account_id if not current_user.is_super_admin else None)

        # Names such as "Citrix" and "citrix" can resolve to the same tag.
        # Track what has been assigned so the junction row is inserted once
        # and usage_count is bumped once, matching update_tree().
        assigned_tag_ids = set()

        for tag_name in tree_data.tags:
            slug = TreeTag.slugify(tag_name)

            # Try to find an existing global tag or one owned by the tree's account
            tag_result = await db.execute(
                select(TreeTag).where(
                    TreeTag.slug == slug,
                    or_(
                        TreeTag.account_id.is_(None),
                        TreeTag.account_id == tree_account_id
                    )
                )
            )
            tag = tag_result.scalar_one_or_none()

            if not tag:
                # Create new tag
                tag = TreeTag(
                    name=tag_name,
                    slug=slug,
                    account_id=tree_account_id,
                    created_by=current_user.id
                )
                db.add(tag)
                await db.flush()

            if tag.id in assigned_tag_ids:
                continue

            # Use direct SQL insert for the junction table to avoid lazy load issues
            await db.execute(
                tree_tag_assignments.insert().values(
                    tree_id=new_tree.id,
                    tag_id=tag.id,
                    assigned_by=current_user.id
                )
            )
            assigned_tag_ids.add(tag.id)
            tag.usage_count += 1

    await db.commit()

    # Reload with relationships
    result = await db.execute(
        select(Tree)
        .options(
            selectinload(Tree.category_rel),
            selectinload(Tree.tags)
        )
        .where(Tree.id == new_tree.id)
    )
    tree = result.scalar_one()

    return build_full_tree_response(tree)
|
|
|
|
|
|
@router.put("/{tree_id}", response_model=TreeResponse)
async def update_tree(
    tree_id: UUID,
    tree_data: TreeUpdate,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_engineer_or_admin)]
):
    """Update an existing tree (engineers and admins only).

    Supports:
    - category_id: Change category assignment
    - tags: Replace all tags on the tree
    """
    # Raises 404 for an unknown tree or category and 403 when can_edit_tree()
    # rejects the caller or the category belongs to another account.
    tree_query = (
        select(Tree)
        .options(
            selectinload(Tree.category_rel),
            selectinload(Tree.tags)
        )
        .where(Tree.id == tree_id)
    )
    result = await db.execute(tree_query)
    tree = result.scalar_one_or_none()

    if not tree:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Tree not found"
        )

    if not can_edit_tree(current_user, tree):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You can only edit your own trees"
        )

    # Only fields the client actually sent; tags are handled separately
    update_data = tree_data.model_dump(exclude_unset=True)
    tags_data = update_data.pop("tags", None)

    # Verify new category if provided
    if "category_id" in update_data and update_data["category_id"]:
        cat_result = await db.execute(
            select(TreeCategory).where(TreeCategory.id == update_data["category_id"])
        )
        category = cat_result.scalar_one_or_none()
        if not category:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Category not found"
            )
        if category.account_id and category.account_id != current_user.account_id and not current_user.is_super_admin:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't have access to this category"
            )

    # Update basic fields
    for field, value in update_data.items():
        setattr(tree, field, value)

    # Increment version if tree structure changed
    if "tree_structure" in update_data:
        tree.version += 1

    # Handle tags replacement (an explicit empty list clears all tags)
    if tags_data is not None:
        # Decrement usage count for old tags (already eagerly loaded)
        for tag in tree.tags:
            tag.usage_count = max(0, tag.usage_count - 1)

        # Delete existing tag assignments using direct SQL
        await db.execute(
            tree_tag_assignments.delete().where(
                tree_tag_assignments.c.tree_id == tree.id
            )
        )

        # Add new tags
        tree_account_id = tree.account_id or (current_user.account_id if not current_user.is_super_admin else None)
        added_tag_ids = set()

        for tag_name in tags_data:
            slug = TreeTag.slugify(tag_name)

            tag_query = select(TreeTag).where(
                TreeTag.slug == slug,
                or_(
                    TreeTag.account_id.is_(None),
                    TreeTag.account_id == tree_account_id
                )
            )
            tag_result = await db.execute(tag_query)
            tag = tag_result.scalar_one_or_none()

            if not tag:
                tag = TreeTag(
                    name=tag_name,
                    slug=slug,
                    account_id=tree_account_id,
                    created_by=current_user.id
                )
                db.add(tag)
                await db.flush()

            # Several names may resolve to the same tag; assign it only once
            if tag.id not in added_tag_ids:
                await db.execute(
                    tree_tag_assignments.insert().values(
                        tree_id=tree.id,
                        tag_id=tag.id,
                        assigned_by=current_user.id
                    )
                )
                added_tag_ids.add(tag.id)
                tag.usage_count += 1

    await db.commit()

    # Reload with relationships. The junction table was changed with direct
    # SQL, so the `tags` collection already loaded on this object does not
    # know about it. populate_existing forces the ORM to overwrite loaded
    # attributes and collections instead of keeping the stale copies, which
    # matters when the session does not expire objects on commit
    # (NOTE(review): session configuration is not visible from this module).
    result = await db.execute(
        tree_query.execution_options(populate_existing=True)
    )
    tree = result.scalar_one()

    return build_full_tree_response(tree)
|
|
|
|
|
|
@router.delete("/{tree_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_tree(
    tree_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)]
):
    """Soft delete a tree (admin only). Sets deleted_at timestamp and is_active=False."""
    # NOTE(review): apart from the require_admin dependency, no ownership or
    # account check is performed on the target tree here - confirm that
    # require_admin's scope makes that safe.
    lookup = await db.execute(select(Tree).where(Tree.id == tree_id))
    tree = lookup.scalar_one_or_none()
    if tree is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Tree not found"
        )

    # Mark the row as deleted and record who did it and when (UTC).
    tree.is_active = False
    tree.deleted_at = datetime.now(timezone.utc)
    tree.deleted_by = current_user.id

    # Remove the tree from every user folder.
    in_folders = user_folder_trees.c.tree_id == tree.id
    await db.execute(user_folder_trees.delete().where(in_folders))

    # Collect the tags on this tree before the assignments are removed, then
    # lower each tag's usage_count by one without letting it go below zero.
    tagged_with = tree_tag_assignments.c.tree_id == tree.id
    assigned = await db.execute(
        select(tree_tag_assignments.c.tag_id).where(tagged_with)
    )
    tag_ids = assigned.scalars().all()
    if tag_ids:
        lowered_count = func.greatest(TreeTag.usage_count - 1, 0)
        await db.execute(
            update(TreeTag)
            .where(TreeTag.id.in_(tag_ids))
            .values(usage_count=lowered_count)
        )

    # Drop the tag assignments themselves.
    await db.execute(tree_tag_assignments.delete().where(tagged_with))

    audit_details = {"tree_name": tree.name}
    await log_audit(db, current_user.id, "tree.delete", "tree", tree.id,
                    audit_details)
    await db.commit()
    return None
|
|
|
|
|
|
# --- Fork Endpoints ---
|
|
|
|
|
|
@router.post("/{tree_id}/fork", response_model=TreeResponse, status_code=status.HTTP_201_CREATED)
async def fork_tree(
    tree_id: UUID,
    fork_data: ForkCreate,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_engineer_or_admin)]
):
    """Fork a tree to create a personal copy.

    Engineers can fork any tree they can access (public, account, or default).
    Fork inherits tree_structure but gets new ownership.
    """
    # Raises 404 for an unknown tree, 403 when it is inactive or not
    # accessible, and 402 when the account's tree limit is reached.

    # Load parent tree
    result = await db.execute(
        select(Tree)
        .options(selectinload(Tree.category_rel), selectinload(Tree.tags))
        .where(Tree.id == tree_id)
    )
    parent = result.scalar_one_or_none()

    if not parent:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Tree not found"
        )

    # Same rule as get_tree(): a soft-deleted (inactive) tree is treated as
    # inaccessible, so forking cannot be used to recover deleted content.
    if not parent.is_active or not can_access_tree(current_user, parent):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to this tree"
        )

    # Check subscription tree limit
    if current_user.account_id:
        can_create, limit, count = await check_tree_limit(current_user.account_id, db)
        if not can_create:
            raise HTTPException(
                status_code=status.HTTP_402_PAYMENT_REQUIRED,
                detail=f"Tree limit reached ({count}/{limit}). Upgrade your plan to create more trees."
            )

    # Build fork: content is copied from the parent, ownership goes to the
    # caller, and the fork always starts private at version 1.
    fork_name = fork_data.name or f"Fork of {parent.name}"
    fork = Tree(
        name=fork_name,
        description=parent.description,
        category=parent.category,
        category_id=parent.category_id,
        tree_structure=parent.tree_structure,
        author_id=current_user.id,
        account_id=current_user.account_id,
        is_public=False,
        is_default=False,
        version=1,
        # Fork tracking: snapshot of the parent's timestamp at fork time
        parent_tree_id=parent.id,
        fork_reason=fork_data.fork_reason,
        parent_updated_at=parent.updated_at,
        # Lineage tracking: the root is inherited, or is the parent itself
        # when the parent is not a fork
        root_tree_id=parent.root_tree_id if parent.root_tree_id else parent.id,
        # A parent with no recorded depth (None) counts as depth 0
        fork_depth=(parent.fork_depth or 0) + 1,
    )

    db.add(fork)
    await db.flush()  # assigns fork.id for the audit entry

    await log_audit(db, current_user.id, "tree.fork", "tree", fork.id,
                    {"parent_tree_id": str(parent.id), "parent_name": parent.name,
                     "fork_reason": fork_data.fork_reason})
    await db.commit()

    # Reload with relationships
    result = await db.execute(
        select(Tree)
        .options(selectinload(Tree.category_rel), selectinload(Tree.tags))
        .where(Tree.id == fork.id)
    )
    fork = result.scalar_one()

    return build_full_tree_response(fork, parent_tree=parent)
|
|
|
|
|
|
@router.get("/{tree_id}/forks", response_model=list[TreeListResponse])
async def list_forks(
    tree_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)],
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=100)
):
    """List all direct forks of a tree."""
    # The parent must exist (404) and be accessible to the caller (403)
    # before any of its forks are listed.
    parent = (
        await db.execute(select(Tree).where(Tree.id == tree_id))
    ).scalar_one_or_none()

    if parent is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Tree not found"
        )
    if not can_access_tree(current_user, parent):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to this tree"
        )

    # Active direct children only, limited to what the caller may see,
    # newest first, one page at a time.
    is_visible_fork = (
        Tree.parent_tree_id == tree_id,
        Tree.is_active == True,  # noqa: E712
        build_tree_access_filter(current_user),
    )
    statement = (
        select(Tree)
        .options(selectinload(Tree.category_rel), selectinload(Tree.tags))
        .where(*is_visible_fork)
        .order_by(Tree.created_at.desc())
        .offset(skip)
        .limit(limit)
    )

    rows = await db.execute(statement)
    return [build_tree_response(fork) for fork in rows.scalars().unique().all()]
|
|
|
|
|
|
@router.get("/{tree_id}/lineage", response_model=list[TreeListResponse])
async def get_tree_lineage(
    tree_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Get the fork lineage chain from current tree back to root.

    Returns ordered list: [current tree, parent, grandparent, ..., root]
    Limited to 10 levels to prevent infinite loops.

    Returns 403 if you cannot access the requested tree. The chain stops at
    the first ancestor you cannot access.
    """
    lineage = []
    current_id = tree_id
    visited = set()  # guards against a cycle in parent_tree_id links
    max_depth = 10

    for _ in range(max_depth):
        if current_id is None or current_id in visited:
            break
        visited.add(current_id)

        result = await db.execute(
            select(Tree)
            .options(selectinload(Tree.category_rel), selectinload(Tree.tags))
            .where(Tree.id == current_id)
        )
        tree = result.scalar_one_or_none()

        # An unknown ID ends the walk; for the requested tree itself this
        # yields an empty list, which is the endpoint's existing behavior.
        if not tree:
            break

        # Enforce the same access rule as the other read endpoints so that
        # tree metadata cannot be read just by knowing (or guessing) an ID.
        if not can_access_tree(current_user, tree):
            if not lineage:
                # The requested tree itself is off limits.
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="You don't have access to this tree"
                )
            # An ancestor is off limits: return the visible part of the
            # chain rather than leaking the ancestor or leaving a gap.
            break

        lineage.append(build_tree_response(tree))
        current_id = tree.parent_tree_id

    return lineage
|