Creates AuditLog model with JSONB details column for tracking admin actions. Integrates log_audit() helper into admin endpoints (role change, team admin toggle, deactivate, activate) and tree delete. IP address column reserved for future Railway proxy header support. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
175 lines
5.5 KiB
Python
175 lines
5.5 KiB
Python
from typing import Annotated, Optional
|
|
from uuid import UUID
|
|
from fastapi import APIRouter, Depends, HTTPException, status, Query
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select, func
|
|
|
|
from app.core.database import get_db
|
|
from app.core.audit import log_audit
|
|
from app.models.user import User
|
|
from app.schemas.user import UserResponse, RoleUpdate, TeamAdminUpdate
|
|
from app.api.deps import require_admin
|
|
|
|
router = APIRouter(prefix="/admin", tags=["admin"])
|
|
|
|
|
|
@router.get("/users", response_model=list[UserResponse])
|
|
async def list_users(
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
current_user: Annotated[User, Depends(require_admin)],
|
|
skip: int = Query(0, ge=0),
|
|
limit: int = Query(100, ge=1, le=100),
|
|
is_active: Optional[bool] = Query(None, description="Filter by active status"),
|
|
role: Optional[str] = Query(None, description="Filter by role"),
|
|
team_id: Optional[UUID] = Query(None, description="Filter by team")
|
|
):
|
|
"""List all users (super admin only)."""
|
|
query = select(User)
|
|
|
|
if is_active is not None:
|
|
query = query.where(User.is_active == is_active)
|
|
if role:
|
|
query = query.where(User.role == role)
|
|
if team_id:
|
|
query = query.where(User.team_id == team_id)
|
|
|
|
query = query.order_by(User.created_at.desc()).offset(skip).limit(limit)
|
|
|
|
result = await db.execute(query)
|
|
users = result.scalars().all()
|
|
return users
|
|
|
|
|
|
@router.get("/users/{user_id}", response_model=UserResponse)
|
|
async def get_user(
|
|
user_id: UUID,
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
current_user: Annotated[User, Depends(require_admin)]
|
|
):
|
|
"""Get user details (super admin only)."""
|
|
result = await db.execute(select(User).where(User.id == user_id))
|
|
user = result.scalar_one_or_none()
|
|
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found"
|
|
)
|
|
|
|
return user
|
|
|
|
|
|
@router.put("/users/{user_id}/role", response_model=UserResponse)
|
|
async def update_user_role(
|
|
user_id: UUID,
|
|
role_data: RoleUpdate,
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
current_user: Annotated[User, Depends(require_admin)]
|
|
):
|
|
"""Change user role (super admin only)."""
|
|
result = await db.execute(select(User).where(User.id == user_id))
|
|
user = result.scalar_one_or_none()
|
|
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found"
|
|
)
|
|
|
|
if user.id == current_user.id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Cannot change your own role"
|
|
)
|
|
|
|
old_role = user.role
|
|
user.role = role_data.role
|
|
await log_audit(db, current_user.id, "user.role_change", "user", user.id,
|
|
{"old_role": old_role, "new_role": role_data.role})
|
|
await db.commit()
|
|
await db.refresh(user)
|
|
return user
|
|
|
|
|
|
@router.put("/users/{user_id}/team-admin", response_model=UserResponse)
|
|
async def toggle_team_admin(
|
|
user_id: UUID,
|
|
data: TeamAdminUpdate,
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
current_user: Annotated[User, Depends(require_admin)]
|
|
):
|
|
"""Toggle is_team_admin for a user (super admin only)."""
|
|
result = await db.execute(select(User).where(User.id == user_id))
|
|
user = result.scalar_one_or_none()
|
|
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found"
|
|
)
|
|
|
|
if data.is_team_admin and user.team_id is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="User must belong to a team to be a team admin"
|
|
)
|
|
|
|
user.is_team_admin = data.is_team_admin
|
|
await log_audit(db, current_user.id, "user.team_admin_toggle", "user", user.id,
|
|
{"is_team_admin": data.is_team_admin})
|
|
await db.commit()
|
|
await db.refresh(user)
|
|
return user
|
|
|
|
|
|
@router.put("/users/{user_id}/deactivate", response_model=UserResponse)
|
|
async def deactivate_user(
|
|
user_id: UUID,
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
current_user: Annotated[User, Depends(require_admin)]
|
|
):
|
|
"""Deactivate a user account (super admin only)."""
|
|
result = await db.execute(select(User).where(User.id == user_id))
|
|
user = result.scalar_one_or_none()
|
|
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found"
|
|
)
|
|
|
|
if user.id == current_user.id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Cannot deactivate your own account"
|
|
)
|
|
|
|
user.is_active = False
|
|
await log_audit(db, current_user.id, "user.deactivate", "user", user.id)
|
|
await db.commit()
|
|
await db.refresh(user)
|
|
return user
|
|
|
|
|
|
@router.put("/users/{user_id}/activate", response_model=UserResponse)
|
|
async def activate_user(
|
|
user_id: UUID,
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
current_user: Annotated[User, Depends(require_admin)]
|
|
):
|
|
"""Reactivate a user account (super admin only)."""
|
|
result = await db.execute(select(User).where(User.id == user_id))
|
|
user = result.scalar_one_or_none()
|
|
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found"
|
|
)
|
|
|
|
user.is_active = True
|
|
await log_audit(db, current_user.id, "user.activate", "user", user.id)
|
|
await db.commit()
|
|
await db.refresh(user)
|
|
return user
|