# Listing metadata from the code viewer: 532 lines, 17 KiB, Python
from typing import Annotated
|
|
from uuid import UUID
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select, func
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from app.core.database import get_db
|
|
from app.models.folder import UserFolder, user_folder_trees
|
|
from app.models.tree import Tree
|
|
from app.models.user import User
|
|
from app.schemas.folder import (
|
|
FolderCreate,
|
|
FolderUpdate,
|
|
FolderResponse,
|
|
FolderListResponse,
|
|
FolderReorderRequest,
|
|
FolderTreeRequest
|
|
)
|
|
from app.api.deps import get_current_active_user
|
|
from app.core.permissions import can_access_tree
|
|
|
|
router = APIRouter(
    prefix="/folders",
    tags=["folders"],
)

# Deepest level a folder may occupy, counting a root folder as level 1
# (root -> child -> grandchild = 3 levels).
MAX_FOLDER_DEPTH = 3
|
|
|
|
|
|
async def get_folder_depth(db: AsyncSession, folder_id: UUID, current_depth: int = 1) -> int:
    """Calculate the depth of a folder in the hierarchy.

    A root folder has depth 1, its child has depth 2, etc. The parent chain
    is walked iteratively, one lookup per level, starting at ``folder_id``.

    Args:
        db: Session used to look up each folder's ``parent_id``.
        folder_id: Folder whose depth is wanted.
        current_depth: Depth to assign to ``folder_id`` if it turns out to be
            a root; every ancestor found adds one to it.

    Returns:
        ``current_depth`` plus the number of ancestors found. A ``folder_id``
        with no matching row yields ``current_depth``, because the lookup
        returns ``None`` exactly as it does for a root folder.
    """
    depth = current_depth
    node_id = folder_id
    seen: set[UUID] = set()

    while True:
        seen.add(node_id)
        result = await db.execute(
            select(UserFolder.parent_id).where(UserFolder.id == node_id)
        )
        parent_id = result.scalar_one_or_none()

        # Stop at the root. Also stop if the chain loops back on itself:
        # that shouldn't happen, but a corrupted hierarchy must not turn
        # into an endless stream of queries.
        if parent_id is None or parent_id in seen:
            return depth

        depth += 1
        node_id = parent_id
|
|
|
|
|
|
async def is_descendant(db: AsyncSession, potential_descendant_id: UUID, ancestor_id: UUID) -> bool:
    """Report whether ancestor_id appears on the parent chain of potential_descendant_id.

    The walk starts at potential_descendant_id itself, so passing the same id
    for both arguments yields True. Used to prevent cycles when moving folders.
    """
    seen = set()
    cursor = potential_descendant_id

    # A repeated id means the stored hierarchy already contains a loop; that
    # shouldn't happen, but the walk stops and answers False to be safe.
    while cursor and cursor not in seen:
        if cursor == ancestor_id:
            return True
        seen.add(cursor)

        parent_lookup = select(UserFolder.parent_id).where(UserFolder.id == cursor)
        cursor = (await db.execute(parent_lookup)).scalar_one_or_none()

    return False
|
|
|
|
|
|
@router.get("", response_model=list[FolderListResponse])
async def list_folders(
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Return every folder owned by the current user.

    Results are sorted by display_order, with name as the tie-breaker, and
    each folder's trees are eagerly loaded with it.
    """
    def summarize(entry):
        # Project one folder row onto the list-view schema.
        return FolderListResponse(
            id=entry.id,
            name=entry.name,
            color=entry.color,
            icon=entry.icon,
            parent_id=entry.parent_id,
            display_order=entry.display_order,
            tree_count=entry.tree_count
        )

    owned_folders = select(UserFolder).options(selectinload(UserFolder.trees))
    owned_folders = owned_folders.where(UserFolder.user_id == current_user.id)
    owned_folders = owned_folders.order_by(UserFolder.display_order, UserFolder.name)

    rows = (await db.execute(owned_folders)).scalars().all()
    return [summarize(entry) for entry in rows]
|
|
|
|
|
|
@router.get("/{folder_id}", response_model=FolderResponse)
async def get_folder(
    folder_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Fetch one folder by ID.

    Responds 404 when no folder has that ID and 403 when the folder belongs
    to a different user.
    """
    lookup = (
        select(UserFolder)
        .where(UserFolder.id == folder_id)
        .options(selectinload(UserFolder.trees))
    )
    folder = (await db.execute(lookup)).scalar_one_or_none()

    if not folder:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Folder not found"
        )

    # Only the owner may read a folder.
    owned_by_caller = folder.user_id == current_user.id
    if not owned_by_caller:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to this folder"
        )

    payload = dict(
        id=folder.id,
        name=folder.name,
        color=folder.color,
        icon=folder.icon,
        parent_id=folder.parent_id,
        display_order=folder.display_order,
        tree_count=folder.tree_count,
        created_at=folder.created_at,
        updated_at=folder.updated_at,
    )
    return FolderResponse(**payload)
|
|
|
|
|
|
@router.post("", response_model=FolderResponse, status_code=status.HTTP_201_CREATED)
async def create_folder(
    folder_data: FolderCreate,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Create a new folder for the current user.

    Supports creating subfolders by specifying parent_id.
    Maximum nesting depth is MAX_FOLDER_DEPTH levels.

    Args:
        folder_data: Name, color, icon and optional parent_id of the folder.
        db: Database session.
        current_user: Authenticated user who will own the folder.

    Returns:
        The created folder, with tree_count reported as 0.

    Raises:
        HTTPException: 404 if parent_id does not name a folder owned by the
            current user; 400 if the parent already sits at the maximum
            depth; 409 if the user already has a folder with the same name
            under the same parent.
    """
    parent_id = folder_data.parent_id

    # Validate parent folder if specified
    if parent_id:
        parent_result = await db.execute(
            select(UserFolder.id).where(
                UserFolder.id == parent_id,
                UserFolder.user_id == current_user.id
            )
        )
        if parent_result.scalar_one_or_none() is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Parent folder not found"
            )

        # Check nesting depth (parent depth + 1 for new folder)
        parent_depth = await get_folder_depth(db, parent_id)
        if parent_depth >= MAX_FOLDER_DEPTH:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Maximum folder nesting depth ({MAX_FOLDER_DEPTH} levels) exceeded"
            )

    # Check for duplicate name within same parent. LIMIT 1 + first() is used
    # instead of scalar_one_or_none(): if duplicates already exist (e.g. two
    # concurrent creates both passed this check), scalar_one_or_none() would
    # raise MultipleResultsFound and surface as a 500 instead of this 409.
    duplicate_query = (
        select(UserFolder.id)
        .where(
            UserFolder.user_id == current_user.id,
            UserFolder.name == folder_data.name,
            UserFolder.parent_id == parent_id
        )
        .limit(1)
    )
    duplicate = await db.execute(duplicate_query)
    if duplicate.scalars().first() is not None:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"A folder named '{folder_data.name}' already exists at this level"
        )

    # Get next display order for this parent level; max() yields NULL when
    # the level is empty, in which case numbering starts after 0.
    order_query = select(func.max(UserFolder.display_order)).where(
        UserFolder.user_id == current_user.id,
        UserFolder.parent_id == parent_id
    )
    order_result = await db.execute(order_query)
    max_order = order_result.scalar() or 0

    new_folder = UserFolder(
        user_id=current_user.id,
        account_id=current_user.account_id,
        name=folder_data.name,
        color=folder_data.color,
        icon=folder_data.icon,
        parent_id=parent_id,
        display_order=max_order + 1
    )
    db.add(new_folder)
    await db.commit()
    # Reload the row so database-populated columns can be read below.
    await db.refresh(new_folder)

    return FolderResponse(
        id=new_folder.id,
        name=new_folder.name,
        color=new_folder.color,
        icon=new_folder.icon,
        parent_id=new_folder.parent_id,
        display_order=new_folder.display_order,
        tree_count=0,
        created_at=new_folder.created_at,
        updated_at=new_folder.updated_at
    )
|
|
|
|
|
|
async def _get_subtree_height(db: AsyncSession, folder_id: UUID) -> int:
    """Count the levels in the subtree rooted at ``folder_id``.

    A folder without children has height 1; each further generation of
    descendants adds one level. Children are fetched one generation per query.
    """
    height = 0
    seen: set[UUID] = set()
    level = [folder_id]

    while level:
        height += 1
        seen.update(level)
        result = await db.execute(
            select(UserFolder.id).where(UserFolder.parent_id.in_(level))
        )
        # Drop ids already visited so a corrupted (cyclic) hierarchy cannot
        # keep the loop running forever.
        level = [child_id for child_id in result.scalars().all() if child_id not in seen]

    return height


@router.put("/{folder_id}", response_model=FolderResponse)
async def update_folder(
    folder_id: UUID,
    folder_data: FolderUpdate,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Update a folder.

    Only fields explicitly present in the request are applied. Supports
    moving folders by changing parent_id; a move is validated to prevent
    cycles and excessive nesting, taking the moved folder's own descendants
    into account.

    Args:
        folder_id: ID of the folder to update.
        folder_data: Partial update; unset fields are left untouched.
        db: Database session.
        current_user: Authenticated user; must own the folder.

    Returns:
        The folder as stored after the update.

    Raises:
        HTTPException: 404 if the folder, or the requested new parent, is not
            found; 403 if the folder belongs to another user; 400 if the
            folder would become its own parent, would move into its own
            descendant, or the move would exceed MAX_FOLDER_DEPTH; 409 if
            another folder with the resulting name already exists under the
            resulting parent.
    """
    result = await db.execute(
        select(UserFolder)
        .options(selectinload(UserFolder.trees))
        .where(UserFolder.id == folder_id)
    )
    folder = result.scalar_one_or_none()

    if not folder:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Folder not found"
        )

    if folder.user_id != current_user.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to update this folder"
        )

    update_data = folder_data.model_dump(exclude_unset=True)

    # Resulting name/parent after the update; fields absent from the request
    # fall back to the stored values, so "changed" is False for them.
    new_name = update_data.get("name", folder.name)
    new_parent_id = update_data.get("parent_id", folder.parent_id)
    parent_changed = new_parent_id != folder.parent_id

    # Handle parent_id change (moving folder). Moving to the root
    # (parent_id=None) needs none of these checks.
    if parent_changed and new_parent_id is not None:
        # Can't be its own parent
        if new_parent_id == folder_id:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="A folder cannot be its own parent"
            )

        # Check parent exists and belongs to user
        parent_result = await db.execute(
            select(UserFolder.id).where(
                UserFolder.id == new_parent_id,
                UserFolder.user_id == current_user.id
            )
        )
        if parent_result.scalar_one_or_none() is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Parent folder not found"
            )

        # Prevent cycles - new parent can't be a descendant of this folder
        if await is_descendant(db, new_parent_id, folder_id):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Cannot move folder into its own descendant"
            )

        # Check nesting depth after move. The folder brings its whole subtree
        # along, so the deepest descendant ends up at
        # parent_depth + subtree_height. For a folder without children
        # (height 1) this is the same as parent_depth >= MAX_FOLDER_DEPTH.
        parent_depth = await get_folder_depth(db, new_parent_id)
        subtree_height = await _get_subtree_height(db, folder_id)
        if parent_depth + subtree_height > MAX_FOLDER_DEPTH:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Maximum folder nesting depth ({MAX_FOLDER_DEPTH} levels) exceeded"
            )

    # Check for duplicate name if changing name or parent. LIMIT 1 + first()
    # keeps this a 409 even when several conflicting rows already exist;
    # scalar_one_or_none() would raise MultipleResultsFound in that case.
    if new_name != folder.name or parent_changed:
        duplicate_query = (
            select(UserFolder.id)
            .where(
                UserFolder.user_id == current_user.id,
                UserFolder.name == new_name,
                UserFolder.parent_id == new_parent_id,
                UserFolder.id != folder_id
            )
            .limit(1)
        )
        duplicate = await db.execute(duplicate_query)
        if duplicate.scalars().first() is not None:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"A folder named '{new_name}' already exists at this level"
            )

    for field, value in update_data.items():
        setattr(folder, field, value)

    await db.commit()
    await db.refresh(folder)

    return FolderResponse(
        id=folder.id,
        name=folder.name,
        color=folder.color,
        icon=folder.icon,
        parent_id=folder.parent_id,
        display_order=folder.display_order,
        tree_count=folder.tree_count,
        created_at=folder.created_at,
        updated_at=folder.updated_at
    )
|
|
|
|
|
|
@router.delete("/{folder_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_folder(
    folder_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Remove a folder owned by the current user.

    Only the folder row is deleted here; the trees it contained are not
    deleted by this handler. Responds 404 for an unknown ID and 403 when the
    folder belongs to someone else.
    """
    lookup = select(UserFolder).where(UserFolder.id == folder_id)
    target = (await db.execute(lookup)).scalar_one_or_none()

    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Folder not found"
        )

    is_owner = target.user_id == current_user.id
    if not is_owner:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to delete this folder"
        )

    await db.delete(target)
    await db.commit()
    return None
|
|
|
|
|
|
@router.post("/reorder", status_code=status.HTTP_204_NO_CONTENT)
async def reorder_folders(
    reorder_data: FolderReorderRequest,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Apply a new display order to the current user's folders.

    Each folder listed in the request receives its zero-based position in
    that list as display_order. The request is rejected with 400, before
    anything is changed, if any listed ID is not one of the user's folders.
    """
    requested_ids = reorder_data.folder_ids

    # Load everything the user owns once and index it by ID.
    owned = await db.execute(
        select(UserFolder).where(UserFolder.user_id == current_user.id)
    )
    by_id = {}
    for owned_folder in owned.scalars().all():
        by_id[owned_folder.id] = owned_folder

    # Reject the whole request on the first ID that is not the user's.
    for folder_id in requested_ids:
        if folder_id in by_id:
            continue
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Folder {folder_id} not found or doesn't belong to you"
        )

    # Position in the request becomes the stored order.
    for position, listed_id in enumerate(requested_ids):
        by_id[listed_id].display_order = position

    await db.commit()
    return None
|
|
|
|
|
|
@router.post("/{folder_id}/trees", status_code=status.HTTP_201_CREATED)
async def add_tree_to_folder(
    folder_id: UUID,
    request: FolderTreeRequest,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Put a tree into one of the current user's folders.

    Responds 404 when the folder or the tree does not exist, 403 when the
    folder is not the caller's or can_access_tree denies the tree, and 409
    when the tree is already in the folder.
    """
    # The folder is loaded together with its trees for the membership check.
    folder_lookup = (
        select(UserFolder)
        .where(UserFolder.id == folder_id)
        .options(selectinload(UserFolder.trees))
    )
    folder = (await db.execute(folder_lookup)).scalar_one_or_none()

    if not folder:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Folder not found"
        )
    if folder.user_id != current_user.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to modify this folder"
        )

    tree_lookup = select(Tree).where(Tree.id == request.tree_id)
    tree = (await db.execute(tree_lookup)).scalar_one_or_none()

    if not tree:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Tree not found"
        )
    if not can_access_tree(current_user, tree):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to this tree"
        )

    already_present = tree in folder.trees
    if already_present:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Tree is already in this folder"
        )

    folder.trees.append(tree)
    await db.commit()

    return {"message": "Tree added to folder"}
|
|
|
|
|
|
@router.delete("/{folder_id}/trees/{tree_id}", status_code=status.HTTP_204_NO_CONTENT)
async def remove_tree_from_folder(
    folder_id: UUID,
    tree_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Take a tree out of one of the current user's folders.

    Responds 404 when the folder does not exist or does not contain the
    tree, and 403 when the folder belongs to another user.
    """
    folder_lookup = (
        select(UserFolder)
        .where(UserFolder.id == folder_id)
        .options(selectinload(UserFolder.trees))
    )
    folder = (await db.execute(folder_lookup)).scalar_one_or_none()

    if not folder:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Folder not found"
        )
    if folder.user_id != current_user.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to modify this folder"
        )

    # First tree in the folder whose ID matches, or None when absent.
    tree_to_remove = next(
        (member for member in folder.trees if member.id == tree_id),
        None,
    )
    if not tree_to_remove:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Tree not found in this folder"
        )

    folder.trees.remove(tree_to_remove)
    await db.commit()
    return None
|
|
|
|
|
|
@router.get("/{folder_id}/trees", response_model=list[UUID])
async def get_folder_tree_ids(
    folder_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """List the IDs of the trees in a folder.

    Only IDs are returned, for lightweight checking.
    Use the trees endpoint with folder_id filter for full tree data.
    Responds 404 for an unknown folder and 403 when it belongs to another user.
    """
    folder_lookup = (
        select(UserFolder)
        .where(UserFolder.id == folder_id)
        .options(selectinload(UserFolder.trees))
    )
    folder = (await db.execute(folder_lookup)).scalar_one_or_none()

    if not folder:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Folder not found"
        )

    caller_owns_folder = folder.user_id == current_user.id
    if not caller_owns_folder:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to this folder"
        )

    tree_ids = []
    for member in folder.trees:
        tree_ids.append(member.id)
    return tree_ids
|