Add tree organization system with categories, tags, and folders

Features:
- Categories: Global and team-specific tree categorization (admin-managed)
- Tags: Flexible tree tagging with autocomplete (author + admin)
- User folders: Personal tree collections with subfolder support
  - Hierarchical structure (max 3 levels deep)
  - Right-click context menu for folder management
  - Cascade delete for subfolders
- Filter trees by category, tags, and folder in library view

Backend:
- New models: Category, Tag, UserFolder with relationships
- New API endpoints for categories, tags, and folders
- Tree organization migrations (005, 006)

Frontend:
- FolderSidebar with hierarchical folder tree
- FolderEditModal for create/edit with color picker
- AddToFolderMenu for quick tree organization
- TagInput with autocomplete and TagBadges display
- Updated TreeMetadataForm and TreeLibraryPage

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
chihlasm
2026-02-02 01:31:13 -05:00
parent 2d99c52025
commit fafdaa50a5
41 changed files with 5006 additions and 221 deletions

View File

@@ -0,0 +1,193 @@
"""Add tree organization: tags, categories, folders, team admin
Revision ID: 005
Revises: 004
Create Date: 2026-02-01
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = '005'
down_revision: Union[str, None] = '004'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ===========================================
# 1. Add is_team_admin to users
# ===========================================
op.add_column('users', sa.Column('is_team_admin', sa.Boolean(), nullable=False, server_default='false'))
# ===========================================
# 2. Create tree_categories table
# ===========================================
op.create_table(
'tree_categories',
sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True, server_default=sa.text('gen_random_uuid()')),
sa.Column('name', sa.String(100), nullable=False),
sa.Column('slug', sa.String(100), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('team_id', postgresql.UUID(as_uuid=True), sa.ForeignKey('teams.id', ondelete='CASCADE'), nullable=True),
sa.Column('display_order', sa.Integer(), nullable=False, server_default='0'),
sa.Column('is_active', sa.Boolean(), nullable=False, server_default='true'),
sa.Column('created_by', postgresql.UUID(as_uuid=True), sa.ForeignKey('users.id', ondelete='SET NULL'), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.text('now()')),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.text('now()')),
sa.UniqueConstraint('slug', 'team_id', name='uq_tree_categories_slug_team')
)
op.create_index('ix_tree_categories_team_id', 'tree_categories', ['team_id'])
op.create_index('ix_tree_categories_slug', 'tree_categories', ['slug'])
op.create_index('ix_tree_categories_display_order', 'tree_categories', ['display_order'])
# ===========================================
# 3. Create tree_tags table
# ===========================================
op.create_table(
'tree_tags',
sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True, server_default=sa.text('gen_random_uuid()')),
sa.Column('name', sa.String(50), nullable=False),
sa.Column('slug', sa.String(50), nullable=False),
sa.Column('team_id', postgresql.UUID(as_uuid=True), sa.ForeignKey('teams.id', ondelete='CASCADE'), nullable=True),
sa.Column('usage_count', sa.Integer(), nullable=False, server_default='0'),
sa.Column('created_by', postgresql.UUID(as_uuid=True), sa.ForeignKey('users.id', ondelete='SET NULL'), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.text('now()')),
sa.UniqueConstraint('slug', 'team_id', name='uq_tree_tags_slug_team')
)
op.create_index('ix_tree_tags_slug', 'tree_tags', ['slug'])
op.create_index('ix_tree_tags_team_id', 'tree_tags', ['team_id'])
op.create_index('ix_tree_tags_usage_count', 'tree_tags', ['usage_count'])
# ===========================================
# 4. Create tree_tag_assignments junction table
# ===========================================
op.create_table(
'tree_tag_assignments',
sa.Column('tree_id', postgresql.UUID(as_uuid=True), sa.ForeignKey('trees.id', ondelete='CASCADE'), primary_key=True),
sa.Column('tag_id', postgresql.UUID(as_uuid=True), sa.ForeignKey('tree_tags.id', ondelete='CASCADE'), primary_key=True),
sa.Column('assigned_by', postgresql.UUID(as_uuid=True), sa.ForeignKey('users.id', ondelete='SET NULL'), nullable=True),
sa.Column('assigned_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.text('now()'))
)
op.create_index('ix_tree_tag_assignments_tree_id', 'tree_tag_assignments', ['tree_id'])
op.create_index('ix_tree_tag_assignments_tag_id', 'tree_tag_assignments', ['tag_id'])
# ===========================================
# 5. Create user_folders table
# ===========================================
op.create_table(
'user_folders',
sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True, server_default=sa.text('gen_random_uuid()')),
sa.Column('user_id', postgresql.UUID(as_uuid=True), sa.ForeignKey('users.id', ondelete='CASCADE'), nullable=False),
sa.Column('name', sa.String(100), nullable=False),
sa.Column('color', sa.String(7), nullable=False, server_default='#6366f1'),
sa.Column('icon', sa.String(50), nullable=False, server_default='folder'),
sa.Column('display_order', sa.Integer(), nullable=False, server_default='0'),
sa.Column('created_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.text('now()')),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.text('now()')),
sa.UniqueConstraint('user_id', 'name', name='uq_user_folders_user_name')
)
op.create_index('ix_user_folders_user_id', 'user_folders', ['user_id'])
# ===========================================
# 6. Create user_folder_trees junction table
# ===========================================
op.create_table(
'user_folder_trees',
sa.Column('folder_id', postgresql.UUID(as_uuid=True), sa.ForeignKey('user_folders.id', ondelete='CASCADE'), primary_key=True),
sa.Column('tree_id', postgresql.UUID(as_uuid=True), sa.ForeignKey('trees.id', ondelete='CASCADE'), primary_key=True),
sa.Column('added_at', sa.DateTime(timezone=True), nullable=False, server_default=sa.text('now()')),
sa.Column('display_order', sa.Integer(), nullable=False, server_default='0')
)
op.create_index('ix_user_folder_trees_folder_id', 'user_folder_trees', ['folder_id'])
op.create_index('ix_user_folder_trees_tree_id', 'user_folder_trees', ['tree_id'])
# ===========================================
# 7. Add category_id to trees table
# ===========================================
op.add_column('trees', sa.Column('category_id', postgresql.UUID(as_uuid=True), nullable=True))
op.create_foreign_key('fk_trees_category_id', 'trees', 'tree_categories', ['category_id'], ['id'], ondelete='SET NULL')
op.create_index('ix_trees_category_id', 'trees', ['category_id'])
# ===========================================
# 8. Seed default global categories
# ===========================================
op.execute("""
INSERT INTO tree_categories (name, slug, description, display_order, team_id) VALUES
('Help Desk', 'help-desk', 'Tier 1 support - password resets, basic troubleshooting', 1, NULL),
('Desktop Support', 'desktop-support', 'Tier 2 support - workstation issues, software problems', 2, NULL),
('Systems Administration', 'systems-administration', 'Tier 3 support - servers, Active Directory, infrastructure', 3, NULL),
('Networking', 'networking', 'Network connectivity, VPN, firewall, DNS/DHCP', 4, NULL),
('Security', 'security', 'Security incidents, access issues, compliance', 5, NULL),
('Cloud & Microsoft 365', 'cloud-m365', 'Microsoft 365, Azure, cloud services', 6, NULL),
('Backup & Recovery', 'backup-recovery', 'Backup systems, disaster recovery, data restoration', 7, NULL),
('Printing', 'printing', 'Print servers, printers, drivers, spooler issues', 8, NULL)
""")
# ===========================================
# 9. Migrate existing category strings to new system
# ===========================================
# First, create tags from existing categories
op.execute("""
INSERT INTO tree_tags (name, slug, team_id, usage_count)
SELECT DISTINCT
category as name,
LOWER(REGEXP_REPLACE(REGEXP_REPLACE(category, '[^a-zA-Z0-9 ]', '', 'g'), ' +', '-', 'g')) as slug,
NULL::uuid as team_id,
COUNT(*) OVER (PARTITION BY category) as usage_count
FROM trees
WHERE category IS NOT NULL AND category != ''
ON CONFLICT (slug, team_id) DO NOTHING
""")
# Assign tags to trees based on their old category
op.execute("""
INSERT INTO tree_tag_assignments (tree_id, tag_id)
SELECT t.id as tree_id, tt.id as tag_id
FROM trees t
JOIN tree_tags tt ON tt.slug = LOWER(REGEXP_REPLACE(REGEXP_REPLACE(t.category, '[^a-zA-Z0-9 ]', '', 'g'), ' +', '-', 'g'))
AND tt.team_id IS NULL
WHERE t.category IS NOT NULL AND t.category != ''
ON CONFLICT DO NOTHING
""")
def downgrade() -> None:
# Drop foreign key and column from trees
op.drop_constraint('fk_trees_category_id', 'trees', type_='foreignkey')
op.drop_index('ix_trees_category_id', table_name='trees')
op.drop_column('trees', 'category_id')
# Drop user_folder_trees
op.drop_index('ix_user_folder_trees_tree_id', table_name='user_folder_trees')
op.drop_index('ix_user_folder_trees_folder_id', table_name='user_folder_trees')
op.drop_table('user_folder_trees')
# Drop user_folders
op.drop_index('ix_user_folders_user_id', table_name='user_folders')
op.drop_table('user_folders')
# Drop tree_tag_assignments
op.drop_index('ix_tree_tag_assignments_tag_id', table_name='tree_tag_assignments')
op.drop_index('ix_tree_tag_assignments_tree_id', table_name='tree_tag_assignments')
op.drop_table('tree_tag_assignments')
# Drop tree_tags
op.drop_index('ix_tree_tags_usage_count', table_name='tree_tags')
op.drop_index('ix_tree_tags_team_id', table_name='tree_tags')
op.drop_index('ix_tree_tags_slug', table_name='tree_tags')
op.drop_table('tree_tags')
# Drop tree_categories
op.drop_index('ix_tree_categories_display_order', table_name='tree_categories')
op.drop_index('ix_tree_categories_slug', table_name='tree_categories')
op.drop_index('ix_tree_categories_team_id', table_name='tree_categories')
op.drop_table('tree_categories')
# Remove is_team_admin from users
op.drop_column('users', 'is_team_admin')

View File

@@ -0,0 +1,54 @@
"""Add folder hierarchy support
Revision ID: 006
Revises: 005
Create Date: 2026-02-02
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = '006'
down_revision: Union[str, None] = '005'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Add parent_id column for folder hierarchy
op.add_column(
'user_folders',
sa.Column(
'parent_id',
postgresql.UUID(as_uuid=True),
sa.ForeignKey('user_folders.id', ondelete='CASCADE'),
nullable=True
)
)
op.create_index('ix_user_folders_parent_id', 'user_folders', ['parent_id'])
# Update unique constraint to allow same name in different parent folders
# Old constraint: (user_id, name) must be unique
# New constraint: (user_id, name, parent_id) must be unique
# This allows folders with same name under different parents
op.drop_constraint('uq_user_folders_user_name', 'user_folders', type_='unique')
op.create_unique_constraint(
'uq_user_folders_user_name_parent',
'user_folders',
['user_id', 'name', 'parent_id']
)
def downgrade() -> None:
# Restore original unique constraint
op.drop_constraint('uq_user_folders_user_name_parent', 'user_folders', type_='unique')
op.create_unique_constraint('uq_user_folders_user_name', 'user_folders', ['user_id', 'name'])
# Remove parent_id column and index
op.drop_index('ix_user_folders_parent_id', table_name='user_folders')
op.drop_column('user_folders', 'parent_id')

View File

@@ -0,0 +1,314 @@
from typing import Annotated, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, or_
import re
from app.core.database import get_db
from app.models.category import TreeCategory
from app.models.tree import Tree
from app.models.user import User
from app.schemas.category import CategoryCreate, CategoryUpdate, CategoryResponse, CategoryListResponse
from app.api.deps import get_current_user
router = APIRouter(prefix="/categories", tags=["categories"])
def slugify(name: str) -> str:
"""Convert a name to a URL-safe slug."""
slug = re.sub(r'[^a-zA-Z0-9 ]', '', name.lower())
slug = re.sub(r' +', '-', slug.strip())
return slug
def can_manage_category(user: User, category: TreeCategory) -> bool:
"""Check if user can manage (edit/delete) a category."""
# Global admins can manage any category
if user.role == "admin":
return True
# Team admins can manage their team's categories
if user.is_team_admin and category.team_id == user.team_id:
return True
return False
def can_create_category(user: User, team_id: Optional[UUID]) -> bool:
"""Check if user can create a category for the given team."""
# Global admins can create global categories (team_id=None) or any team's categories
if user.role == "admin":
return True
# Team admins can only create categories for their own team
if user.is_team_admin and team_id == user.team_id:
return True
return False
@router.get("", response_model=list[CategoryListResponse])
async def list_categories(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
include_inactive: bool = Query(False, description="Include inactive categories"),
team_only: bool = Query(False, description="Only show team-specific categories")
):
"""List categories visible to the user.
Returns global categories plus team-specific categories for the user's team.
"""
# Build query for accessible categories
query = select(TreeCategory)
# Filter by active status
if not include_inactive:
query = query.where(TreeCategory.is_active == True)
# Filter by visibility: global OR user's team
if team_only and current_user.team_id:
query = query.where(TreeCategory.team_id == current_user.team_id)
elif current_user.team_id:
query = query.where(
or_(
TreeCategory.team_id.is_(None), # Global
TreeCategory.team_id == current_user.team_id # User's team
)
)
else:
# User has no team, only show global categories
query = query.where(TreeCategory.team_id.is_(None))
query = query.order_by(TreeCategory.display_order, TreeCategory.name)
result = await db.execute(query)
categories = result.scalars().all()
# Get tree counts for each category
response = []
for cat in categories:
# Count trees in this category
count_query = select(func.count(Tree.id)).where(
Tree.category_id == cat.id,
Tree.is_active == True
)
count_result = await db.execute(count_query)
tree_count = count_result.scalar() or 0
response.append(CategoryListResponse(
id=cat.id,
name=cat.name,
slug=cat.slug,
description=cat.description,
team_id=cat.team_id,
display_order=cat.display_order,
is_active=cat.is_active,
tree_count=tree_count
))
return response
@router.get("/{category_id}", response_model=CategoryResponse)
async def get_category(
category_id: UUID,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)]
):
"""Get a specific category by ID."""
result = await db.execute(select(TreeCategory).where(TreeCategory.id == category_id))
category = result.scalar_one_or_none()
if not category:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Category not found"
)
# Check access: global categories visible to all, team categories only to team members
if category.team_id and category.team_id != current_user.team_id and current_user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have access to this category"
)
# Get tree count
count_query = select(func.count(Tree.id)).where(
Tree.category_id == category.id,
Tree.is_active == True
)
count_result = await db.execute(count_query)
tree_count = count_result.scalar() or 0
return CategoryResponse(
id=category.id,
name=category.name,
slug=category.slug,
description=category.description,
team_id=category.team_id,
display_order=category.display_order,
is_active=category.is_active,
created_at=category.created_at,
updated_at=category.updated_at,
tree_count=tree_count
)
@router.post("", response_model=CategoryResponse, status_code=status.HTTP_201_CREATED)
async def create_category(
category_data: CategoryCreate,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)]
):
"""Create a new category.
- Global admins can create global categories (team_id=None)
- Team admins can create team-specific categories for their team
"""
if not can_create_category(current_user, category_data.team_id):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to create this category"
)
# Generate slug
slug = slugify(category_data.name)
# Check for duplicate slug within same scope (global or team)
existing_query = select(TreeCategory).where(
TreeCategory.slug == slug,
TreeCategory.team_id == category_data.team_id
)
existing = await db.execute(existing_query)
if existing.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"A category with slug '{slug}' already exists"
)
# Get next display order
order_query = select(func.max(TreeCategory.display_order)).where(
TreeCategory.team_id == category_data.team_id
)
order_result = await db.execute(order_query)
max_order = order_result.scalar() or 0
new_category = TreeCategory(
name=category_data.name,
slug=slug,
description=category_data.description,
team_id=category_data.team_id,
display_order=max_order + 1,
created_by=current_user.id
)
db.add(new_category)
await db.commit()
await db.refresh(new_category)
return CategoryResponse(
id=new_category.id,
name=new_category.name,
slug=new_category.slug,
description=new_category.description,
team_id=new_category.team_id,
display_order=new_category.display_order,
is_active=new_category.is_active,
created_at=new_category.created_at,
updated_at=new_category.updated_at,
tree_count=0
)
@router.put("/{category_id}", response_model=CategoryResponse)
async def update_category(
category_id: UUID,
category_data: CategoryUpdate,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)]
):
"""Update a category."""
result = await db.execute(select(TreeCategory).where(TreeCategory.id == category_id))
category = result.scalar_one_or_none()
if not category:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Category not found"
)
if not can_manage_category(current_user, category):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to update this category"
)
# Update fields
update_data = category_data.model_dump(exclude_unset=True)
# If name is being updated, regenerate slug
if "name" in update_data:
new_slug = slugify(update_data["name"])
# Check for duplicate slug
existing_query = select(TreeCategory).where(
TreeCategory.slug == new_slug,
TreeCategory.team_id == category.team_id,
TreeCategory.id != category_id
)
existing = await db.execute(existing_query)
if existing.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"A category with slug '{new_slug}' already exists"
)
category.slug = new_slug
for field, value in update_data.items():
setattr(category, field, value)
await db.commit()
await db.refresh(category)
# Get tree count
count_query = select(func.count(Tree.id)).where(
Tree.category_id == category.id,
Tree.is_active == True
)
count_result = await db.execute(count_query)
tree_count = count_result.scalar() or 0
return CategoryResponse(
id=category.id,
name=category.name,
slug=category.slug,
description=category.description,
team_id=category.team_id,
display_order=category.display_order,
is_active=category.is_active,
created_at=category.created_at,
updated_at=category.updated_at,
tree_count=tree_count
)
@router.delete("/{category_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_category(
category_id: UUID,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)]
):
"""Soft delete (archive) a category."""
result = await db.execute(select(TreeCategory).where(TreeCategory.id == category_id))
category = result.scalar_one_or_none()
if not category:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Category not found"
)
if not can_manage_category(current_user, category):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to delete this category"
)
category.is_active = False
await db.commit()
return None

View File

@@ -0,0 +1,549 @@
from typing import Annotated
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from sqlalchemy.orm import selectinload
from app.core.database import get_db
from app.models.folder import UserFolder, user_folder_trees
from app.models.tree import Tree
from app.models.user import User
from app.schemas.folder import (
FolderCreate,
FolderUpdate,
FolderResponse,
FolderListResponse,
FolderReorderRequest,
FolderTreeRequest
)
from app.api.deps import get_current_user
router = APIRouter(prefix="/folders", tags=["folders"])
# Maximum nesting depth for folders (root -> child -> grandchild = 3 levels)
MAX_FOLDER_DEPTH = 3
async def get_folder_depth(db: AsyncSession, folder_id: UUID, current_depth: int = 1) -> int:
"""Calculate the depth of a folder in the hierarchy.
A root folder has depth 1, its child has depth 2, etc.
"""
result = await db.execute(
select(UserFolder.parent_id).where(UserFolder.id == folder_id)
)
parent_id = result.scalar_one_or_none()
if parent_id is None:
return current_depth
return await get_folder_depth(db, parent_id, current_depth + 1)
async def is_descendant(db: AsyncSession, potential_descendant_id: UUID, ancestor_id: UUID) -> bool:
"""Check if potential_descendant_id is a descendant of ancestor_id.
Used to prevent cycles when moving folders.
"""
current_id = potential_descendant_id
visited = set()
while current_id:
if current_id in visited:
return False # Cycle detected, shouldn't happen but be safe
if current_id == ancestor_id:
return True
visited.add(current_id)
result = await db.execute(
select(UserFolder.parent_id).where(UserFolder.id == current_id)
)
current_id = result.scalar_one_or_none()
return False
def can_access_tree(user: User, tree: Tree) -> bool:
"""Check if user can access a tree (to add to folder).
User can access tree if:
- Tree is public
- User is the author
- Tree belongs to user's team
- User is a global admin
"""
if tree.is_public:
return True
if user.id == tree.author_id:
return True
if tree.team_id == user.team_id and user.team_id is not None:
return True
if user.role == "admin":
return True
return False
@router.get("", response_model=list[FolderListResponse])
async def list_folders(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)]
):
"""List all folders for the current user.
Returns folders ordered by display_order.
"""
query = (
select(UserFolder)
.options(selectinload(UserFolder.trees))
.where(UserFolder.user_id == current_user.id)
.order_by(UserFolder.display_order, UserFolder.name)
)
result = await db.execute(query)
folders = result.scalars().all()
return [
FolderListResponse(
id=folder.id,
name=folder.name,
color=folder.color,
icon=folder.icon,
parent_id=folder.parent_id,
display_order=folder.display_order,
tree_count=folder.tree_count
)
for folder in folders
]
@router.get("/{folder_id}", response_model=FolderResponse)
async def get_folder(
folder_id: UUID,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)]
):
"""Get a specific folder by ID."""
result = await db.execute(
select(UserFolder)
.options(selectinload(UserFolder.trees))
.where(UserFolder.id == folder_id)
)
folder = result.scalar_one_or_none()
if not folder:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Folder not found"
)
# Folders are private to their owner
if folder.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have access to this folder"
)
return FolderResponse(
id=folder.id,
name=folder.name,
color=folder.color,
icon=folder.icon,
parent_id=folder.parent_id,
display_order=folder.display_order,
tree_count=folder.tree_count,
created_at=folder.created_at,
updated_at=folder.updated_at
)
@router.post("", response_model=FolderResponse, status_code=status.HTTP_201_CREATED)
async def create_folder(
folder_data: FolderCreate,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)]
):
"""Create a new folder for the current user.
Supports creating subfolders by specifying parent_id.
Maximum nesting depth is 3 levels.
"""
# Validate parent folder if specified
if folder_data.parent_id:
parent_result = await db.execute(
select(UserFolder).where(
UserFolder.id == folder_data.parent_id,
UserFolder.user_id == current_user.id
)
)
parent = parent_result.scalar_one_or_none()
if not parent:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Parent folder not found"
)
# Check nesting depth (parent depth + 1 for new folder)
parent_depth = await get_folder_depth(db, folder_data.parent_id)
if parent_depth >= MAX_FOLDER_DEPTH:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Maximum folder nesting depth ({MAX_FOLDER_DEPTH} levels) exceeded"
)
# Check for duplicate name within same parent
existing_query = select(UserFolder).where(
UserFolder.user_id == current_user.id,
UserFolder.name == folder_data.name,
UserFolder.parent_id == folder_data.parent_id
)
existing = await db.execute(existing_query)
if existing.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"A folder named '{folder_data.name}' already exists at this level"
)
# Get next display order for this parent level
order_query = select(func.max(UserFolder.display_order)).where(
UserFolder.user_id == current_user.id,
UserFolder.parent_id == folder_data.parent_id
)
order_result = await db.execute(order_query)
max_order = order_result.scalar() or 0
new_folder = UserFolder(
user_id=current_user.id,
name=folder_data.name,
color=folder_data.color,
icon=folder_data.icon,
parent_id=folder_data.parent_id,
display_order=max_order + 1
)
db.add(new_folder)
await db.commit()
await db.refresh(new_folder)
return FolderResponse(
id=new_folder.id,
name=new_folder.name,
color=new_folder.color,
icon=new_folder.icon,
parent_id=new_folder.parent_id,
display_order=new_folder.display_order,
tree_count=0,
created_at=new_folder.created_at,
updated_at=new_folder.updated_at
)
@router.put("/{folder_id}", response_model=FolderResponse)
async def update_folder(
folder_id: UUID,
folder_data: FolderUpdate,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)]
):
"""Update a folder.
Supports moving folders by changing parent_id.
Validates to prevent cycles and excessive nesting.
"""
result = await db.execute(
select(UserFolder)
.options(selectinload(UserFolder.trees))
.where(UserFolder.id == folder_id)
)
folder = result.scalar_one_or_none()
if not folder:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Folder not found"
)
if folder.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to update this folder"
)
update_data = folder_data.model_dump(exclude_unset=True)
# Handle parent_id change (moving folder)
if "parent_id" in update_data:
new_parent_id = update_data["parent_id"]
# Only validate if actually changing parent
if new_parent_id != folder.parent_id:
if new_parent_id is not None:
# Can't be its own parent
if new_parent_id == folder_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="A folder cannot be its own parent"
)
# Check parent exists and belongs to user
parent_result = await db.execute(
select(UserFolder).where(
UserFolder.id == new_parent_id,
UserFolder.user_id == current_user.id
)
)
if not parent_result.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Parent folder not found"
)
# Prevent cycles - new parent can't be a descendant of this folder
if await is_descendant(db, new_parent_id, folder_id):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot move folder into its own descendant"
)
# Check nesting depth after move
parent_depth = await get_folder_depth(db, new_parent_id)
if parent_depth >= MAX_FOLDER_DEPTH:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Maximum folder nesting depth ({MAX_FOLDER_DEPTH} levels) exceeded"
)
# Check for duplicate name if changing name or parent
new_name = update_data.get("name", folder.name)
new_parent_id = update_data.get("parent_id", folder.parent_id)
if new_name != folder.name or ("parent_id" in update_data and new_parent_id != folder.parent_id):
existing_query = select(UserFolder).where(
UserFolder.user_id == current_user.id,
UserFolder.name == new_name,
UserFolder.parent_id == new_parent_id,
UserFolder.id != folder_id
)
existing = await db.execute(existing_query)
if existing.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"A folder named '{new_name}' already exists at this level"
)
for field, value in update_data.items():
setattr(folder, field, value)
await db.commit()
await db.refresh(folder)
return FolderResponse(
id=folder.id,
name=folder.name,
color=folder.color,
icon=folder.icon,
parent_id=folder.parent_id,
display_order=folder.display_order,
tree_count=folder.tree_count,
created_at=folder.created_at,
updated_at=folder.updated_at
)
@router.delete("/{folder_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_folder(
folder_id: UUID,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)]
):
"""Delete a folder.
This only removes the folder, not the trees in it.
"""
result = await db.execute(
select(UserFolder).where(UserFolder.id == folder_id)
)
folder = result.scalar_one_or_none()
if not folder:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Folder not found"
)
if folder.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to delete this folder"
)
await db.delete(folder)
await db.commit()
return None
@router.post("/reorder", status_code=status.HTTP_204_NO_CONTENT)
async def reorder_folders(
reorder_data: FolderReorderRequest,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)]
):
"""Reorder folders by providing folder IDs in desired order."""
# Get all user's folders
result = await db.execute(
select(UserFolder).where(UserFolder.user_id == current_user.id)
)
folders = {f.id: f for f in result.scalars().all()}
# Verify all provided folder IDs belong to user
for folder_id in reorder_data.folder_ids:
if folder_id not in folders:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Folder {folder_id} not found or doesn't belong to you"
)
# Update display orders
for order, folder_id in enumerate(reorder_data.folder_ids):
folders[folder_id].display_order = order
await db.commit()
return None
@router.post("/{folder_id}/trees", status_code=status.HTTP_201_CREATED)
async def add_tree_to_folder(
folder_id: UUID,
request: FolderTreeRequest,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)]
):
"""Add a tree to a folder."""
# Get folder with trees
folder_result = await db.execute(
select(UserFolder)
.options(selectinload(UserFolder.trees))
.where(UserFolder.id == folder_id)
)
folder = folder_result.scalar_one_or_none()
if not folder:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Folder not found"
)
if folder.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to modify this folder"
)
# Get tree
tree_result = await db.execute(
select(Tree).where(Tree.id == request.tree_id)
)
tree = tree_result.scalar_one_or_none()
if not tree:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Tree not found"
)
if not can_access_tree(current_user, tree):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have access to this tree"
)
# Check if already in folder
if tree in folder.trees:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Tree is already in this folder"
)
# Add tree to folder
folder.trees.append(tree)
await db.commit()
return {"message": "Tree added to folder"}
@router.delete("/{folder_id}/trees/{tree_id}", status_code=status.HTTP_204_NO_CONTENT)
async def remove_tree_from_folder(
folder_id: UUID,
tree_id: UUID,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)]
):
"""Remove a tree from a folder."""
# Get folder with trees
folder_result = await db.execute(
select(UserFolder)
.options(selectinload(UserFolder.trees))
.where(UserFolder.id == folder_id)
)
folder = folder_result.scalar_one_or_none()
if not folder:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Folder not found"
)
if folder.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to modify this folder"
)
# Find tree in folder
tree_to_remove = None
for tree in folder.trees:
if tree.id == tree_id:
tree_to_remove = tree
break
if not tree_to_remove:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Tree not found in this folder"
)
folder.trees.remove(tree_to_remove)
await db.commit()
return None
@router.get("/{folder_id}/trees", response_model=list[UUID])
async def get_folder_tree_ids(
folder_id: UUID,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)]
):
"""Get all tree IDs in a folder.
Returns just the IDs for lightweight checking.
Use the trees endpoint with folder_id filter for full tree data.
"""
# Get folder with trees
folder_result = await db.execute(
select(UserFolder)
.options(selectinload(UserFolder.trees))
.where(UserFolder.id == folder_id)
)
folder = folder_result.scalar_one_or_none()
if not folder:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Folder not found"
)
if folder.user_id != current_user.id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have access to this folder"
)
return [tree.id for tree in folder.trees]

View File

@@ -0,0 +1,437 @@
from typing import Annotated, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, or_, delete
from sqlalchemy.orm import selectinload
from app.core.database import get_db
from app.models.tag import TreeTag, tree_tag_assignments
from app.models.tree import Tree
from app.models.user import User
from app.schemas.tag import TagCreate, TagResponse, TagListResponse, TagAssignment
from app.api.deps import get_current_user
router = APIRouter(prefix="/tags", tags=["tags"])
def can_manage_tree_tags(user: User, tree: Tree) -> bool:
"""Check if user can manage tags on a tree.
Allowed:
- Tree author
- Global admins
- Team admins for their team's trees
"""
if user.id == tree.author_id:
return True
if user.role == "admin":
return True
if user.is_team_admin and tree.team_id == user.team_id:
return True
return False
def can_create_tag(user: User, team_id: Optional[UUID]) -> bool:
"""Check if user can create a tag for the given scope.
- Global admins can create global tags (team_id=None)
- Team admins and global admins can create team-specific tags
- Regular users can create team tags for their own team
"""
if user.role == "admin":
return True
# For team-specific tags, user must belong to that team
if team_id is not None and team_id == user.team_id:
return True
return False
@router.get("", response_model=list[TagListResponse])
async def list_tags(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
include_team: bool = Query(True, description="Include team-specific tags")
):
"""List tags visible to the user.
Returns global tags plus team-specific tags for the user's team.
Tags are ordered by usage count (most used first).
"""
query = select(TreeTag)
# Filter by visibility: global OR user's team
if include_team and current_user.team_id:
query = query.where(
or_(
TreeTag.team_id.is_(None), # Global
TreeTag.team_id == current_user.team_id # User's team
)
)
else:
# Only show global tags
query = query.where(TreeTag.team_id.is_(None))
query = query.order_by(TreeTag.usage_count.desc(), TreeTag.name)
result = await db.execute(query)
tags = result.scalars().all()
return [TagListResponse.model_validate(tag) for tag in tags]
@router.get("/search", response_model=list[TagListResponse])
async def search_tags(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
q: str = Query(..., min_length=1, description="Search query"),
limit: int = Query(10, ge=1, le=50),
include_team: bool = Query(True, description="Include team-specific tags")
):
"""Search/autocomplete tags.
Searches tag names for the query string.
Returns matching tags ordered by usage count.
"""
query = select(TreeTag).where(
TreeTag.name.ilike(f"%{q}%")
)
# Filter by visibility
if include_team and current_user.team_id:
query = query.where(
or_(
TreeTag.team_id.is_(None),
TreeTag.team_id == current_user.team_id
)
)
else:
query = query.where(TreeTag.team_id.is_(None))
query = query.order_by(TreeTag.usage_count.desc(), TreeTag.name).limit(limit)
result = await db.execute(query)
tags = result.scalars().all()
return [TagListResponse.model_validate(tag) for tag in tags]
@router.get("/{tag_id}", response_model=TagResponse)
async def get_tag(
tag_id: UUID,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)]
):
"""Get a specific tag by ID."""
result = await db.execute(select(TreeTag).where(TreeTag.id == tag_id))
tag = result.scalar_one_or_none()
if not tag:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Tag not found"
)
# Check access: global tags visible to all, team tags only to team members
if tag.team_id and tag.team_id != current_user.team_id and current_user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have access to this tag"
)
return TagResponse.model_validate(tag)
@router.post("", response_model=TagResponse, status_code=status.HTTP_201_CREATED)
async def create_tag(
tag_data: TagCreate,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)]
):
"""Create a new tag.
- Global admins can create global tags (team_id=None)
- Team members can create team-specific tags for their team
"""
if not can_create_tag(current_user, tag_data.team_id):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to create this tag"
)
# Generate slug
slug = TreeTag.slugify(tag_data.name)
# Check for duplicate slug within same scope (global or team)
existing_query = select(TreeTag).where(
TreeTag.slug == slug,
TreeTag.team_id == tag_data.team_id
)
existing = await db.execute(existing_query)
if existing.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"A tag with slug '{slug}' already exists"
)
new_tag = TreeTag(
name=tag_data.name,
slug=slug,
team_id=tag_data.team_id,
created_by=current_user.id
)
db.add(new_tag)
await db.commit()
await db.refresh(new_tag)
return TagResponse.model_validate(new_tag)
@router.post("/trees/{tree_id}", response_model=list[TagListResponse])
async def add_tags_to_tree(
tree_id: UUID,
tag_data: TagAssignment,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)]
):
"""Add tags to a tree.
If a tag doesn't exist, it will be created as a team tag (or global for admins).
Returns the updated list of tags on the tree.
"""
# Get tree with tags
result = await db.execute(
select(Tree)
.options(selectinload(Tree.tags))
.where(Tree.id == tree_id)
)
tree = result.scalar_one_or_none()
if not tree:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Tree not found"
)
if not can_manage_tree_tags(current_user, tree):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to manage tags on this tree"
)
# Process each tag name
existing_tag_slugs = {tag.slug for tag in tree.tags}
for tag_name in tag_data.tags:
slug = TreeTag.slugify(tag_name)
# Skip if already assigned
if slug in existing_tag_slugs:
continue
# Try to find existing tag
# Determine scope: use tree's team, or global for admin-owned trees
tag_team_id = tree.team_id or (current_user.team_id if current_user.role != "admin" else None)
tag_query = select(TreeTag).where(
TreeTag.slug == slug,
or_(
TreeTag.team_id.is_(None), # Global tag
TreeTag.team_id == tag_team_id # Team tag
)
)
tag_result = await db.execute(tag_query)
tag = tag_result.scalar_one_or_none()
if not tag:
# Create new tag - prefer team scope unless admin creating on public tree
new_team_id = tag_team_id
if not can_create_tag(current_user, new_team_id):
# Fall back to user's team if they can't create in tree's scope
new_team_id = current_user.team_id
tag = TreeTag(
name=tag_name,
slug=slug,
team_id=new_team_id,
created_by=current_user.id
)
db.add(tag)
await db.flush() # Get the ID
# Add tag to tree
tree.tags.append(tag)
tag.usage_count += 1
existing_tag_slugs.add(slug)
await db.commit()
await db.refresh(tree)
# Return updated tags
return [TagListResponse.model_validate(tag) for tag in tree.tags]
@router.delete("/trees/{tree_id}/{tag_slug}", status_code=status.HTTP_204_NO_CONTENT)
async def remove_tag_from_tree(
tree_id: UUID,
tag_slug: str,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)]
):
"""Remove a tag from a tree."""
# Get tree with tags
result = await db.execute(
select(Tree)
.options(selectinload(Tree.tags))
.where(Tree.id == tree_id)
)
tree = result.scalar_one_or_none()
if not tree:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Tree not found"
)
if not can_manage_tree_tags(current_user, tree):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to manage tags on this tree"
)
# Find the tag to remove
tag_to_remove = None
for tag in tree.tags:
if tag.slug == tag_slug:
tag_to_remove = tag
break
if not tag_to_remove:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Tag not found on this tree"
)
# Remove the tag from tree
tree.tags.remove(tag_to_remove)
tag_to_remove.usage_count = max(0, tag_to_remove.usage_count - 1)
await db.commit()
return None
@router.put("/trees/{tree_id}", response_model=list[TagListResponse])
async def replace_tree_tags(
tree_id: UUID,
tag_data: TagAssignment,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)]
):
"""Replace all tags on a tree.
Removes all existing tags and assigns the new list.
If a tag doesn't exist, it will be created.
Returns the updated list of tags on the tree.
"""
# Get tree with tags
result = await db.execute(
select(Tree)
.options(selectinload(Tree.tags))
.where(Tree.id == tree_id)
)
tree = result.scalar_one_or_none()
if not tree:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Tree not found"
)
if not can_manage_tree_tags(current_user, tree):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to manage tags on this tree"
)
# Decrement usage count for all existing tags
for tag in tree.tags:
tag.usage_count = max(0, tag.usage_count - 1)
# Clear all existing tags
tree.tags.clear()
# Add new tags
tag_team_id = tree.team_id or (current_user.team_id if current_user.role != "admin" else None)
for tag_name in tag_data.tags:
slug = TreeTag.slugify(tag_name)
# Try to find existing tag
tag_query = select(TreeTag).where(
TreeTag.slug == slug,
or_(
TreeTag.team_id.is_(None),
TreeTag.team_id == tag_team_id
)
)
tag_result = await db.execute(tag_query)
tag = tag_result.scalar_one_or_none()
if not tag:
# Create new tag
new_team_id = tag_team_id
if not can_create_tag(current_user, new_team_id):
new_team_id = current_user.team_id
tag = TreeTag(
name=tag_name,
slug=slug,
team_id=new_team_id,
created_by=current_user.id
)
db.add(tag)
await db.flush()
# Add tag to tree
if tag not in tree.tags: # Avoid duplicates from input
tree.tags.append(tag)
tag.usage_count += 1
await db.commit()
await db.refresh(tree)
return [TagListResponse.model_validate(tag) for tag in tree.tags]
@router.get("/trees/{tree_id}", response_model=list[TagListResponse])
async def get_tree_tags(
tree_id: UUID,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)]
):
"""Get all tags assigned to a tree."""
# Get tree with tags
result = await db.execute(
select(Tree)
.options(selectinload(Tree.tags))
.where(Tree.id == tree_id)
)
tree = result.scalar_one_or_none()
if not tree:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Tree not found"
)
# Check if user can view the tree
if not tree.is_public:
if tree.author_id != current_user.id:
if tree.team_id != current_user.team_id:
if current_user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have access to this tree"
)
return [TagListResponse.model_validate(tag) for tag in tree.tags]

View File

@@ -3,53 +3,173 @@ from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, or_
from sqlalchemy.orm import selectinload
from app.core.database import get_db
from app.models.tree import Tree
from app.models.user import User
from app.schemas.tree import TreeCreate, TreeUpdate, TreeResponse, TreeListResponse
from app.models.category import TreeCategory
from app.models.tag import TreeTag
from app.models.folder import UserFolder
from app.schemas.tree import TreeCreate, TreeUpdate, TreeResponse, TreeListResponse, CategoryInfo
from app.api.deps import get_current_user, require_engineer_or_admin, require_admin
router = APIRouter(prefix="/trees", tags=["trees"])
def build_tree_access_filter(current_user: User):
"""Build the access filter for trees based on user permissions.
Returns trees that are:
- Default/system trees (visible to all)
- Public trees
- User's own trees
- Trees from user's team
"""
return or_(
Tree.is_default == True,
Tree.is_public == True,
Tree.author_id == current_user.id,
Tree.team_id == current_user.team_id if current_user.team_id else False
)
def build_tree_response(tree: Tree) -> TreeListResponse:
"""Build TreeListResponse with category_info and tags."""
category_info = None
if tree.category_rel:
category_info = CategoryInfo(
id=tree.category_rel.id,
name=tree.category_rel.name,
slug=tree.category_rel.slug
)
return TreeListResponse(
id=tree.id,
name=tree.name,
description=tree.description,
category=tree.category,
category_id=tree.category_id,
category_info=category_info,
tags=tree.tag_names,
author_id=tree.author_id,
is_active=tree.is_active,
is_public=tree.is_public,
is_default=tree.is_default,
version=tree.version,
usage_count=tree.usage_count,
created_at=tree.created_at,
updated_at=tree.updated_at
)
def build_full_tree_response(tree: Tree) -> TreeResponse:
"""Build TreeResponse with all details including category_info and tags."""
category_info = None
if tree.category_rel:
category_info = CategoryInfo(
id=tree.category_rel.id,
name=tree.category_rel.name,
slug=tree.category_rel.slug
)
return TreeResponse(
id=tree.id,
name=tree.name,
description=tree.description,
category=tree.category,
category_id=tree.category_id,
category_info=category_info,
tags=tree.tag_names,
tree_structure=tree.tree_structure,
author_id=tree.author_id,
team_id=tree.team_id,
is_active=tree.is_active,
is_public=tree.is_public,
is_default=tree.is_default,
version=tree.version,
usage_count=tree.usage_count,
created_at=tree.created_at,
updated_at=tree.updated_at
)
@router.get("", response_model=list[TreeListResponse])
async def list_trees(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)],
category: Optional[str] = Query(None, description="Filter by category"),
category: Optional[str] = Query(None, description="Filter by legacy category string"),
category_id: Optional[UUID] = Query(None, description="Filter by category ID"),
tags: Optional[str] = Query(None, description="Comma-separated tag slugs to filter by"),
folder_id: Optional[UUID] = Query(None, description="Filter by folder ID (user's folders only)"),
is_active: Optional[bool] = Query(None, description="Filter by active status"),
author_id: Optional[UUID] = Query(None, description="Filter by author ID"),
is_public: Optional[bool] = Query(None, description="Filter by public status"),
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=100)
):
"""List all trees with optional filters."""
query = select(Tree)
"""List all trees with optional filters.
New filters:
- category_id: Filter by category (from tree_categories table)
- tags: Comma-separated tag slugs (e.g., "citrix,networking")
- folder_id: Show only trees in a specific folder
"""
query = select(Tree).options(
selectinload(Tree.category_rel),
selectinload(Tree.tags)
)
# Apply filters
if category:
query = query.where(Tree.category == category)
if category_id:
query = query.where(Tree.category_id == category_id)
if is_active is not None:
query = query.where(Tree.is_active == is_active)
else:
# Default to only showing active trees
query = query.where(Tree.is_active == True)
if author_id:
query = query.where(Tree.author_id == author_id)
if is_public is not None:
query = query.where(Tree.is_public == is_public)
# Only show trees user has access to:
# - Default/system trees (visible to all)
# - Public trees
# - User's own trees (public or private)
query = query.where(
Tree.is_active == True,
or_(
Tree.is_default == True,
Tree.is_public == True,
Tree.author_id == current_user.id
# Filter by tags (all specified tags must be present)
if tags:
tag_slugs = [t.strip() for t in tags.split(",") if t.strip()]
for tag_slug in tag_slugs:
query = query.where(
Tree.tags.any(TreeTag.slug == tag_slug)
)
# Filter by folder
if folder_id:
# Verify folder belongs to user
folder_result = await db.execute(
select(UserFolder).where(
UserFolder.id == folder_id,
UserFolder.user_id == current_user.id
)
)
)
folder = folder_result.scalar_one_or_none()
if not folder:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Folder not found"
)
query = query.where(Tree.folders.any(UserFolder.id == folder_id))
# Apply access filter
query = query.where(build_tree_access_filter(current_user))
query = query.order_by(Tree.usage_count.desc(), Tree.updated_at.desc())
query = query.offset(skip).limit(limit)
result = await db.execute(query)
trees = result.scalars().all()
return trees
trees = result.scalars().unique().all()
return [build_tree_response(tree) for tree in trees]
@router.get("/categories", response_model=list[str])
@@ -57,15 +177,15 @@ async def list_categories(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_user)]
):
"""List all unique categories from trees the user can access."""
"""List all unique categories from trees the user can access.
Note: This returns legacy string categories. For the new category system,
use the /categories endpoint.
"""
query = select(Tree.category).where(
Tree.category.isnot(None),
Tree.is_active == True,
or_(
Tree.is_default == True,
Tree.is_public == True,
Tree.author_id == current_user.id
)
build_tree_access_filter(current_user)
).distinct()
result = await db.execute(query)
categories = [row[0] for row in result.all() if row[0]]
@@ -84,21 +204,21 @@ async def search_trees(
search_vector = func.to_tsvector('english', func.coalesce(Tree.name, '') + ' ' + func.coalesce(Tree.description, ''))
search_query = func.plainto_tsquery('english', q)
query = select(Tree).where(
query = select(Tree).options(
selectinload(Tree.category_rel),
selectinload(Tree.tags)
).where(
Tree.is_active == True,
or_(
Tree.is_default == True,
Tree.is_public == True,
Tree.author_id == current_user.id
),
build_tree_access_filter(current_user),
search_vector.op('@@')(search_query)
).order_by(
func.ts_rank(search_vector, search_query).desc()
).limit(limit)
result = await db.execute(query)
trees = result.scalars().all()
return trees
trees = result.scalars().unique().all()
return [build_tree_response(tree) for tree in trees]
@router.get("/{tree_id}", response_model=TreeResponse)
@@ -108,7 +228,14 @@ async def get_tree(
current_user: Annotated[User, Depends(get_current_user)]
):
"""Get a specific tree by ID."""
result = await db.execute(select(Tree).where(Tree.id == tree_id))
result = await db.execute(
select(Tree)
.options(
selectinload(Tree.category_rel),
selectinload(Tree.tags)
)
.where(Tree.id == tree_id)
)
tree = result.scalar_one_or_none()
if not tree:
@@ -117,15 +244,21 @@ async def get_tree(
detail="Tree not found"
)
# Check access: tree must be active AND (default OR public OR author)
can_access = tree.is_default or tree.is_public or tree.author_id == current_user.id
# Check access: tree must be active AND (default OR public OR author OR same team)
can_access = (
tree.is_default or
tree.is_public or
tree.author_id == current_user.id or
(tree.team_id == current_user.team_id and current_user.team_id is not None) or
current_user.role == "admin"
)
if not tree.is_active or not can_access:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have access to this tree"
)
return tree
return build_full_tree_response(tree)
@router.post("", response_model=TreeResponse, status_code=status.HTTP_201_CREATED)
@@ -134,14 +267,38 @@ async def create_tree(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(require_engineer_or_admin)]
):
"""Create a new tree (engineers and admins only)."""
"""Create a new tree (engineers and admins only).
Supports:
- category_id: Assign to a category from tree_categories
- tags: List of tag names to assign (creates new tags if needed)
"""
# Only admins can create default/system trees
is_default = tree_data.is_default and current_user.role == "admin"
# Verify category exists if provided
if tree_data.category_id:
cat_result = await db.execute(
select(TreeCategory).where(TreeCategory.id == tree_data.category_id)
)
category = cat_result.scalar_one_or_none()
if not category:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Category not found"
)
# Check category access
if category.team_id and category.team_id != current_user.team_id and current_user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have access to this category"
)
new_tree = Tree(
name=tree_data.name,
description=tree_data.description,
category=tree_data.category,
category_id=tree_data.category_id,
tree_structure=tree_data.tree_structure,
author_id=None if is_default else current_user.id, # Default trees have no author
team_id=None if is_default else current_user.team_id,
@@ -149,9 +306,67 @@ async def create_tree(
is_default=is_default
)
db.add(new_tree)
await db.flush() # Get the ID
# Handle tags
if tree_data.tags:
tree_team_id = new_tree.team_id or (current_user.team_id if current_user.role != "admin" else None)
# Collect tags to add
tags_to_add = []
for tag_name in tree_data.tags:
slug = TreeTag.slugify(tag_name)
# Try to find existing tag
tag_query = select(TreeTag).where(
TreeTag.slug == slug,
or_(
TreeTag.team_id.is_(None),
TreeTag.team_id == tree_team_id
)
)
tag_result = await db.execute(tag_query)
tag = tag_result.scalar_one_or_none()
if not tag:
# Create new tag
tag = TreeTag(
name=tag_name,
slug=slug,
team_id=tree_team_id,
created_by=current_user.id
)
db.add(tag)
await db.flush()
tags_to_add.append(tag)
tag.usage_count += 1
# Use direct SQL insert for the junction table to avoid lazy load issues
from app.models.tag import tree_tag_assignments
for tag in tags_to_add:
await db.execute(
tree_tag_assignments.insert().values(
tree_id=new_tree.id,
tag_id=tag.id,
assigned_by=current_user.id
)
)
await db.commit()
await db.refresh(new_tree)
return new_tree
# Reload with relationships
result = await db.execute(
select(Tree)
.options(
selectinload(Tree.category_rel),
selectinload(Tree.tags)
)
.where(Tree.id == new_tree.id)
)
tree = result.scalar_one()
return build_full_tree_response(tree)
@router.put("/{tree_id}", response_model=TreeResponse)
@@ -161,8 +376,20 @@ async def update_tree(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(require_engineer_or_admin)]
):
"""Update an existing tree (engineers and admins only)."""
result = await db.execute(select(Tree).where(Tree.id == tree_id))
"""Update an existing tree (engineers and admins only).
Supports:
- category_id: Change category assignment
- tags: Replace all tags on the tree
"""
result = await db.execute(
select(Tree)
.options(
selectinload(Tree.category_rel),
selectinload(Tree.tags)
)
.where(Tree.id == tree_id)
)
tree = result.scalar_one_or_none()
if not tree:
@@ -171,15 +398,40 @@ async def update_tree(
detail="Tree not found"
)
# Check if user can edit: must be author or admin
if tree.author_id != current_user.id and current_user.role != "admin":
# Check if user can edit: must be author, team admin for team trees, or global admin
can_edit = (
tree.author_id == current_user.id or
current_user.role == "admin" or
(current_user.is_team_admin and tree.team_id == current_user.team_id)
)
if not can_edit:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You can only edit your own trees"
)
# Update fields
# Extract tags for separate handling
update_data = tree_data.model_dump(exclude_unset=True)
tags_data = update_data.pop("tags", None)
# Verify new category if provided
if "category_id" in update_data and update_data["category_id"]:
cat_result = await db.execute(
select(TreeCategory).where(TreeCategory.id == update_data["category_id"])
)
category = cat_result.scalar_one_or_none()
if not category:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Category not found"
)
if category.team_id and category.team_id != current_user.team_id and current_user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have access to this category"
)
# Update basic fields
for field, value in update_data.items():
setattr(tree, field, value)
@@ -187,9 +439,73 @@ async def update_tree(
if "tree_structure" in update_data:
tree.version += 1
# Handle tags replacement
if tags_data is not None:
from app.models.tag import tree_tag_assignments
# Decrement usage count for old tags (already eagerly loaded)
for tag in tree.tags:
tag.usage_count = max(0, tag.usage_count - 1)
# Delete existing tag assignments using direct SQL
await db.execute(
tree_tag_assignments.delete().where(
tree_tag_assignments.c.tree_id == tree.id
)
)
# Add new tags
tree_team_id = tree.team_id or (current_user.team_id if current_user.role != "admin" else None)
added_tag_ids = set()
for tag_name in tags_data:
slug = TreeTag.slugify(tag_name)
tag_query = select(TreeTag).where(
TreeTag.slug == slug,
or_(
TreeTag.team_id.is_(None),
TreeTag.team_id == tree_team_id
)
)
tag_result = await db.execute(tag_query)
tag = tag_result.scalar_one_or_none()
if not tag:
tag = TreeTag(
name=tag_name,
slug=slug,
team_id=tree_team_id,
created_by=current_user.id
)
db.add(tag)
await db.flush()
if tag.id not in added_tag_ids:
await db.execute(
tree_tag_assignments.insert().values(
tree_id=tree.id,
tag_id=tag.id,
assigned_by=current_user.id
)
)
added_tag_ids.add(tag.id)
tag.usage_count += 1
await db.commit()
await db.refresh(tree)
return tree
# Reload with relationships
result = await db.execute(
select(Tree)
.options(
selectinload(Tree.category_rel),
selectinload(Tree.tags)
)
.where(Tree.id == tree_id)
)
tree = result.scalar_one()
return build_full_tree_response(tree)
@router.delete("/{tree_id}", status_code=status.HTTP_204_NO_CONTENT)

View File

@@ -1,5 +1,5 @@
from fastapi import APIRouter
from app.api.endpoints import auth, trees, sessions, invite
from app.api.endpoints import auth, trees, sessions, invite, categories, tags, folders
api_router = APIRouter()
@@ -7,3 +7,6 @@ api_router.include_router(auth.router)
api_router.include_router(trees.router)
api_router.include_router(sessions.router)
api_router.include_router(invite.router)
api_router.include_router(categories.router)
api_router.include_router(tags.router)
api_router.include_router(folders.router)

View File

@@ -4,5 +4,20 @@ from .tree import Tree
from .session import Session
from .attachment import Attachment
from .invite_code import InviteCode
from .category import TreeCategory
from .tag import TreeTag, tree_tag_assignments
from .folder import UserFolder, user_folder_trees
__all__ = ["User", "Team", "Tree", "Session", "Attachment", "InviteCode"]
__all__ = [
"User",
"Team",
"Tree",
"Session",
"Attachment",
"InviteCode",
"TreeCategory",
"TreeTag",
"tree_tag_assignments",
"UserFolder",
"user_folder_trees",
]

View File

@@ -0,0 +1,66 @@
import uuid
from datetime import datetime, timezone
from typing import Optional, TYPE_CHECKING
from sqlalchemy import String, Text, DateTime, ForeignKey, Boolean, Integer, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID
from app.core.database import Base
if TYPE_CHECKING:
from app.models.tree import Tree
from app.models.team import Team
from app.models.user import User
class TreeCategory(Base):
"""Admin-managed categories for organizing trees.
Categories can be:
- Global (team_id=NULL): Created by Patherly admins, visible to all
- Team-specific (team_id set): Created by team admins, visible to team members
"""
__tablename__ = "tree_categories"
__table_args__ = (
UniqueConstraint('slug', 'team_id', name='uq_tree_categories_slug_team'),
)
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
primary_key=True,
default=uuid.uuid4
)
name: Mapped[str] = mapped_column(String(100), nullable=False)
slug: Mapped[str] = mapped_column(String(100), nullable=False, index=True)
description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
team_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True),
ForeignKey("teams.id", ondelete="CASCADE"),
nullable=True,
index=True
)
display_order: Mapped[int] = mapped_column(Integer, nullable=False, default=0, index=True)
is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
created_by: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True),
ForeignKey("users.id", ondelete="SET NULL"),
nullable=True
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc)
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc)
)
# Relationships
team: Mapped[Optional["Team"]] = relationship("Team", back_populates="categories")
creator: Mapped[Optional["User"]] = relationship("User", foreign_keys=[created_by])
trees: Mapped[list["Tree"]] = relationship("Tree", back_populates="category_rel")
@property
def is_global(self) -> bool:
"""Returns True if this is a global category (not team-specific)."""
return self.team_id is None

View File

@@ -0,0 +1,93 @@
import uuid
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Optional
from sqlalchemy import String, DateTime, ForeignKey, Integer, UniqueConstraint, Table, Column
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID
from app.core.database import Base
if TYPE_CHECKING:
from app.models.tree import Tree
from app.models.user import User
# Junction table for folder-tree many-to-many relationship
user_folder_trees = Table(
'user_folder_trees',
Base.metadata,
Column('folder_id', UUID(as_uuid=True), ForeignKey('user_folders.id', ondelete='CASCADE'), primary_key=True),
Column('tree_id', UUID(as_uuid=True), ForeignKey('trees.id', ondelete='CASCADE'), primary_key=True),
Column('added_at', DateTime(timezone=True), nullable=False, default=lambda: datetime.now(timezone.utc)),
Column('display_order', Integer, nullable=False, default=0)
)
class UserFolder(Base):
"""User-specific folders for organizing trees.
- Each folder belongs to a single user
- Trees can be in multiple folders (like labels/tags)
- Folders are purely organizational - they don't affect tree visibility or permissions
"""
__tablename__ = "user_folders"
__table_args__ = (
# Allow same name under different parents
UniqueConstraint('user_id', 'name', 'parent_id', name='uq_user_folders_user_name_parent'),
)
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
primary_key=True,
default=uuid.uuid4
)
user_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("users.id", ondelete="CASCADE"),
nullable=False,
index=True
)
name: Mapped[str] = mapped_column(String(100), nullable=False)
color: Mapped[str] = mapped_column(String(7), nullable=False, default="#6366f1")
icon: Mapped[str] = mapped_column(String(50), nullable=False, default="folder")
display_order: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
parent_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True),
ForeignKey("user_folders.id", ondelete="CASCADE"),
nullable=True,
index=True
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc)
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc)
)
# Relationships
user: Mapped["User"] = relationship("User", back_populates="folders")
trees: Mapped[list["Tree"]] = relationship(
"Tree",
secondary=user_folder_trees,
back_populates="folders"
)
# Self-referential relationships for folder hierarchy
parent: Mapped[Optional["UserFolder"]] = relationship(
"UserFolder",
remote_side="UserFolder.id",
back_populates="children",
foreign_keys=[parent_id]
)
children: Mapped[list["UserFolder"]] = relationship(
"UserFolder",
back_populates="parent",
cascade="all, delete-orphan",
foreign_keys="UserFolder.parent_id"
)
@property
def tree_count(self) -> int:
"""Returns the number of trees in this folder."""
return len(self.trees) if self.trees else 0

86
backend/app/models/tag.py Normal file
View File

@@ -0,0 +1,86 @@
import uuid
from datetime import datetime, timezone
from typing import Optional, TYPE_CHECKING
from sqlalchemy import String, DateTime, ForeignKey, Integer, UniqueConstraint, Table, Column
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID
from app.core.database import Base
if TYPE_CHECKING:
from app.models.tree import Tree
from app.models.team import Team
from app.models.user import User
# Junction table for tree-tag many-to-many relationship
tree_tag_assignments = Table(
'tree_tag_assignments',
Base.metadata,
Column('tree_id', UUID(as_uuid=True), ForeignKey('trees.id', ondelete='CASCADE'), primary_key=True),
Column('tag_id', UUID(as_uuid=True), ForeignKey('tree_tags.id', ondelete='CASCADE'), primary_key=True),
Column('assigned_by', UUID(as_uuid=True), ForeignKey('users.id', ondelete='SET NULL'), nullable=True),
Column('assigned_at', DateTime(timezone=True), nullable=False, default=lambda: datetime.now(timezone.utc))
)
class TreeTag(Base):
"""Tags for categorizing and filtering trees.
Tags can be:
- Global (team_id=NULL): Available to all users
- Team-specific (team_id set): Only visible to team members
Tags are managed by tree authors and admins (global or team).
"""
__tablename__ = "tree_tags"
__table_args__ = (
UniqueConstraint('slug', 'team_id', name='uq_tree_tags_slug_team'),
)
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
primary_key=True,
default=uuid.uuid4
)
name: Mapped[str] = mapped_column(String(50), nullable=False)
slug: Mapped[str] = mapped_column(String(50), nullable=False, index=True)
team_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True),
ForeignKey("teams.id", ondelete="CASCADE"),
nullable=True,
index=True
)
usage_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0, index=True)
created_by: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True),
ForeignKey("users.id", ondelete="SET NULL"),
nullable=True
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc)
)
# Relationships
team: Mapped[Optional["Team"]] = relationship("Team", back_populates="tags")
creator: Mapped[Optional["User"]] = relationship("User", foreign_keys=[created_by])
trees: Mapped[list["Tree"]] = relationship(
"Tree",
secondary=tree_tag_assignments,
back_populates="tags"
)
@property
def is_global(self) -> bool:
"""Returns True if this is a global tag (not team-specific)."""
return self.team_id is None
@classmethod
def slugify(cls, name: str) -> str:
"""Convert a tag name to a URL-safe slug."""
import re
# Remove non-alphanumeric chars except spaces, convert to lowercase
slug = re.sub(r'[^a-zA-Z0-9 ]', '', name.lower())
# Replace spaces with hyphens
slug = re.sub(r' +', '-', slug.strip())
return slug

View File

@@ -1,10 +1,17 @@
import uuid
from datetime import datetime, timezone
from typing import TYPE_CHECKING
from sqlalchemy import String, DateTime
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID
from app.core.database import Base
if TYPE_CHECKING:
from app.models.user import User
from app.models.tree import Tree
from app.models.category import TreeCategory
from app.models.tag import TreeTag
class Team(Base):
__tablename__ = "teams"
@@ -23,3 +30,5 @@ class Team(Base):
# Relationships
users: Mapped[list["User"]] = relationship("User", back_populates="team")
trees: Mapped[list["Tree"]] = relationship("Tree", back_populates="team")
categories: Mapped[list["TreeCategory"]] = relationship("TreeCategory", back_populates="team")
tags: Mapped[list["TreeTag"]] = relationship("TreeTag", back_populates="team")

View File

@@ -1,11 +1,19 @@
import uuid
from datetime import datetime, timezone
from typing import Optional, Any
from typing import Optional, Any, TYPE_CHECKING
from sqlalchemy import String, Text, DateTime, ForeignKey, Boolean, Integer, Index
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.core.database import Base
if TYPE_CHECKING:
from app.models.user import User
from app.models.team import Team
from app.models.session import Session
from app.models.category import TreeCategory
from app.models.tag import TreeTag
from app.models.folder import UserFolder
class Tree(Base):
__tablename__ = "trees"
@@ -17,7 +25,16 @@ class Tree(Base):
)
name: Mapped[str] = mapped_column(String(255), nullable=False)
description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
# Legacy category field - kept for backward compatibility
# New code should use category_id instead
category: Mapped[Optional[str]] = mapped_column(String(100), nullable=True, index=True)
# New category relationship
category_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True),
ForeignKey("tree_categories.id", ondelete="SET NULL"),
nullable=True,
index=True
)
tree_structure: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False)
author_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True),
@@ -50,4 +67,22 @@ class Tree(Base):
team: Mapped[Optional["Team"]] = relationship("Team", back_populates="trees")
sessions: Mapped[list["Session"]] = relationship("Session", back_populates="tree")
# New organization relationships
category_rel: Mapped[Optional["TreeCategory"]] = relationship("TreeCategory", back_populates="trees")
tags: Mapped[list["TreeTag"]] = relationship(
"TreeTag",
secondary="tree_tag_assignments",
back_populates="trees"
)
folders: Mapped[list["UserFolder"]] = relationship(
"UserFolder",
secondary="user_folder_trees",
back_populates="trees"
)
# Full-text search index will be created in migration
@property
def tag_names(self) -> list[str]:
"""Returns list of tag names for this tree."""
return [tag.name for tag in self.tags] if self.tags else []

View File

@@ -1,11 +1,17 @@
import uuid
from datetime import datetime, timezone
from typing import Optional
from sqlalchemy import String, DateTime, ForeignKey
from typing import Optional, TYPE_CHECKING
from sqlalchemy import String, DateTime, ForeignKey, Boolean
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID
from app.core.database import Base
if TYPE_CHECKING:
from app.models.team import Team
from app.models.tree import Tree
from app.models.session import Session
from app.models.folder import UserFolder
class User(Base):
__tablename__ = "users"
@@ -19,6 +25,7 @@ class User(Base):
password_hash: Mapped[str] = mapped_column(String(255), nullable=False)
name: Mapped[str] = mapped_column(String(255), nullable=False)
role: Mapped[str] = mapped_column(String(50), nullable=False, default="engineer")
is_team_admin: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
team_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True),
ForeignKey("teams.id"),
@@ -38,4 +45,15 @@ class User(Base):
# Relationships
team: Mapped[Optional["Team"]] = relationship("Team", back_populates="users")
trees: Mapped[list["Tree"]] = relationship("Tree", back_populates="author")
sessions: Mapped[list["Session"]] = relationship("Session", back_populates="user")
sessions: Mapped[list["Session"]] = relationship("Session", back_populates="user")
folders: Mapped[list["UserFolder"]] = relationship("UserFolder", back_populates="user")
@property
def is_admin(self) -> bool:
"""Returns True if user is a global (Patherly) admin."""
return self.role == "admin"
@property
def can_manage_team(self) -> bool:
"""Returns True if user can manage their team (team admin or global admin)."""
return self.is_admin or (self.is_team_admin and self.team_id is not None)

View File

@@ -2,10 +2,23 @@ from .user import UserCreate, UserUpdate, UserResponse, UserLogin
from .token import Token, TokenPayload
from .tree import TreeCreate, TreeUpdate, TreeResponse, TreeListResponse
from .session import SessionCreate, SessionUpdate, SessionResponse, SessionExport, DecisionRecord
from .category import CategoryCreate, CategoryUpdate, CategoryResponse, CategoryListResponse
from .tag import TagCreate, TagResponse, TagListResponse, TagAssignment
from .folder import FolderCreate, FolderUpdate, FolderResponse, FolderListResponse, FolderReorderRequest, FolderTreeRequest
__all__ = [
# User
"UserCreate", "UserUpdate", "UserResponse", "UserLogin",
# Token
"Token", "TokenPayload",
# Tree
"TreeCreate", "TreeUpdate", "TreeResponse", "TreeListResponse",
"SessionCreate", "SessionUpdate", "SessionResponse", "SessionExport", "DecisionRecord"
# Session
"SessionCreate", "SessionUpdate", "SessionResponse", "SessionExport", "DecisionRecord",
# Category
"CategoryCreate", "CategoryUpdate", "CategoryResponse", "CategoryListResponse",
# Tag
"TagCreate", "TagResponse", "TagListResponse", "TagAssignment",
# Folder
"FolderCreate", "FolderUpdate", "FolderResponse", "FolderListResponse", "FolderReorderRequest", "FolderTreeRequest",
]

View File

@@ -0,0 +1,58 @@
from datetime import datetime
from typing import Optional
from uuid import UUID
from pydantic import BaseModel, Field
import re
def slugify(name: str) -> str:
"""Convert a name to a URL-safe slug."""
# Remove non-alphanumeric chars except spaces, convert to lowercase
slug = re.sub(r'[^a-zA-Z0-9 ]', '', name.lower())
# Replace spaces with hyphens
slug = re.sub(r' +', '-', slug.strip())
return slug
class CategoryBase(BaseModel):
name: str = Field(..., min_length=1, max_length=100)
description: Optional[str] = None
class CategoryCreate(CategoryBase):
team_id: Optional[UUID] = Field(None, description="Team ID for team-specific category. NULL for global.")
class CategoryUpdate(BaseModel):
name: Optional[str] = Field(None, min_length=1, max_length=100)
description: Optional[str] = None
display_order: Optional[int] = None
is_active: Optional[bool] = None
class CategoryResponse(CategoryBase):
id: UUID
slug: str
team_id: Optional[UUID] = None
display_order: int
is_active: bool
created_at: datetime
updated_at: datetime
tree_count: int = 0 # Computed field
class Config:
from_attributes = True
class CategoryListResponse(BaseModel):
id: UUID
name: str
slug: str
description: Optional[str] = None
team_id: Optional[UUID] = None
display_order: int
is_active: bool
tree_count: int = 0
class Config:
from_attributes = True

View File

@@ -0,0 +1,62 @@
from datetime import datetime
from typing import Optional
from uuid import UUID
from pydantic import BaseModel, Field
import re
# Valid hex color pattern
HEX_COLOR_PATTERN = r'^#[0-9A-Fa-f]{6}$'
class FolderBase(BaseModel):
name: str = Field(..., min_length=1, max_length=100)
color: str = Field("#6366f1", pattern=HEX_COLOR_PATTERN)
icon: str = Field("folder", max_length=50)
class FolderCreate(FolderBase):
parent_id: Optional[UUID] = Field(None, description="Parent folder ID for creating subfolders")
class FolderUpdate(BaseModel):
name: Optional[str] = Field(None, min_length=1, max_length=100)
color: Optional[str] = Field(None, pattern=HEX_COLOR_PATTERN)
icon: Optional[str] = Field(None, max_length=50)
display_order: Optional[int] = None
parent_id: Optional[UUID] = Field(None, description="Parent folder ID to move folder")
class FolderResponse(FolderBase):
id: UUID
parent_id: Optional[UUID] = None
display_order: int
tree_count: int = 0 # Computed field
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class FolderListResponse(BaseModel):
id: UUID
name: str
color: str
icon: str
parent_id: Optional[UUID] = None
display_order: int
tree_count: int = 0
class Config:
from_attributes = True
class FolderReorderRequest(BaseModel):
"""Request body for reordering folders."""
folder_ids: list[UUID] = Field(..., min_length=1, description="Folder IDs in desired order")
class FolderTreeRequest(BaseModel):
"""Request body for adding a tree to a folder."""
tree_id: UUID

View File

@@ -0,0 +1,56 @@
from datetime import datetime
from typing import Optional
from uuid import UUID
from pydantic import BaseModel, Field
import re
def slugify(name: str) -> str:
"""Convert a tag name to a URL-safe slug."""
# Remove non-alphanumeric chars except spaces, convert to lowercase
slug = re.sub(r'[^a-zA-Z0-9 ]', '', name.lower())
# Replace spaces with hyphens
slug = re.sub(r' +', '-', slug.strip())
return slug
class TagBase(BaseModel):
name: str = Field(..., min_length=1, max_length=50)
class TagCreate(TagBase):
team_id: Optional[UUID] = Field(None, description="Team ID for team-specific tag. NULL for global.")
class TagResponse(TagBase):
id: UUID
slug: str
team_id: Optional[UUID] = None
usage_count: int
created_at: datetime
class Config:
from_attributes = True
class TagListResponse(BaseModel):
id: UUID
name: str
slug: str
team_id: Optional[UUID] = None
usage_count: int
class Config:
from_attributes = True
class TagAssignment(BaseModel):
"""Request body for adding/removing tags from a tree."""
tags: list[str] = Field(..., min_length=1, max_length=10, description="List of tag names to assign")
class TagSearchParams(BaseModel):
"""Query parameters for tag search/autocomplete."""
q: str = Field(..., min_length=1, description="Search query")
limit: int = Field(10, ge=1, le=50)
include_team: bool = Field(True, description="Include team-specific tags")

View File

@@ -4,9 +4,20 @@ from uuid import UUID
from pydantic import BaseModel, Field
class CategoryInfo(BaseModel):
"""Embedded category info for tree responses."""
id: UUID
name: str
slug: str
class Config:
from_attributes = True
class TreeBase(BaseModel):
name: str = Field(..., min_length=1, max_length=255)
description: Optional[str] = None
# Legacy category field - kept for backward compatibility
category: Optional[str] = Field(None, max_length=100)
@@ -14,15 +25,19 @@ class TreeCreate(TreeBase):
tree_structure: dict[str, Any] = Field(..., description="The decision tree structure in JSON format")
is_public: bool = Field(False, description="Make tree visible to all users")
is_default: bool = Field(False, description="Mark as a default/system tree (admin only)")
category_id: Optional[UUID] = Field(None, description="Category ID from tree_categories table")
tags: Optional[list[str]] = Field(None, max_length=10, description="List of tag names to assign")
class TreeUpdate(BaseModel):
name: Optional[str] = Field(None, min_length=1, max_length=255)
description: Optional[str] = None
category: Optional[str] = Field(None, max_length=100)
category_id: Optional[UUID] = None
tree_structure: Optional[dict[str, Any]] = None
is_public: Optional[bool] = None
is_active: Optional[bool] = None
tags: Optional[list[str]] = Field(None, max_length=10, description="List of tag names to assign (replaces existing)")
class TreeResponse(TreeBase):
@@ -30,6 +45,9 @@ class TreeResponse(TreeBase):
tree_structure: dict[str, Any]
author_id: Optional[UUID] = None
team_id: Optional[UUID] = None
category_id: Optional[UUID] = None
category_info: Optional[CategoryInfo] = None
tags: list[str] = [] # List of tag names
is_active: bool
is_public: bool
is_default: bool
@@ -47,6 +65,10 @@ class TreeListResponse(BaseModel):
name: str
description: Optional[str] = None
category: Optional[str] = None
category_id: Optional[UUID] = None
category_info: Optional[CategoryInfo] = None
tags: list[str] = [] # List of tag names
author_id: Optional[UUID] = None
is_active: bool
is_public: bool
is_default: bool

View File

@@ -28,6 +28,7 @@ class UserLogin(BaseModel):
class UserResponse(UserBase):
id: UUID
role: str
is_team_admin: bool = False
team_id: Optional[UUID] = None
created_at: datetime
last_login: Optional[datetime] = None