Files
resolutionflow/backend/app/api/endpoints/admin.py
chihlasm 71ba0b95a5 fix: high-severity security hardening (Phase B permissions audit)
Phase B addresses 7 high-severity gaps from the permissions audit:

- B1: Enforce tree access check on session start via can_access_tree
- B2: Replace all inline permission helpers with centralized permissions.py
- B3: Fix require_engineer_or_admin to check is_team_admin before role
- B4: Add is_active field on User with enforcement in get_current_active_user
- B5: Add admin user management endpoints (list, get, role, team-admin, deactivate, activate)
- B6: Add rate limiting on auth/invite endpoints via slowapi (disabled in DEBUG)
- B7: Implement refresh token rotation with JTI-based revocation and meaningful logout

Also reduces access token TTL from 15 to 5 minutes and updates CLAUDE.md
with SaaS/MSP context for future planning sessions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-05 22:44:05 -05:00

167 lines
5.0 KiB
Python

from typing import Annotated, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from app.core.database import get_db
from app.models.user import User
from app.schemas.user import UserResponse, RoleUpdate, TeamAdminUpdate
from app.api.deps import require_admin
# Router that collects the admin user-management endpoints defined in this
# module; every route registered on it is served under the "/admin" prefix
# and tagged "admin" in the generated API docs.
router = APIRouter(prefix="/admin", tags=["admin"])
@router.get("/users", response_model=list[UserResponse])
async def list_users(
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)],
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=100),
    is_active: Optional[bool] = Query(None, description="Filter by active status"),
    role: Optional[str] = Query(None, description="Filter by role"),
    team_id: Optional[UUID] = Query(None, description="Filter by team"),
):
    """List users, newest first, with optional filters and pagination.

    Access is gated by the ``require_admin`` dependency.

    Args:
        db: Async database session.
        current_user: The caller, as resolved by ``require_admin``.
        skip: Number of rows to skip (>= 0).
        limit: Maximum number of rows to return (1-100).
        is_active: When given, keep only users with this active status.
        role: When non-empty, keep only users with this role.
        team_id: When given, keep only users belonging to this team.

    Returns:
        The matching users ordered by ``created_at`` descending, with ``id``
        as a tie-breaker.
    """
    query = select(User)
    if is_active is not None:
        query = query.where(User.is_active == is_active)
    # Truthiness check is deliberate: an empty ``?role=`` means "no filter".
    if role:
        query = query.where(User.role == role)
    if team_id is not None:
        query = query.where(User.team_id == team_id)

    # ``created_at`` alone is not unique; without a tie-breaker, rows sharing a
    # timestamp may be returned in a different order on each request, so
    # offset/limit paging could repeat or skip users between pages.
    query = (
        query.order_by(User.created_at.desc(), User.id)
        .offset(skip)
        .limit(limit)
    )

    result = await db.execute(query)
    return result.scalars().all()
@router.get("/users/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)],
):
    """Return the user whose id equals ``user_id``.

    Access is gated by the ``require_admin`` dependency.

    Raises:
        HTTPException: 404 when no user with that id exists.
    """
    lookup = select(User).where(User.id == user_id)
    found = (await db.execute(lookup)).scalar_one_or_none()
    if found:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="User not found",
    )
@router.put("/users/{user_id}/role", response_model=UserResponse)
async def update_user_role(
    user_id: UUID,
    role_data: RoleUpdate,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)],
):
    """Set a user's role to the value carried in ``role_data``.

    Access is gated by the ``require_admin`` dependency. The caller may not
    change their own role through this endpoint.

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the target
            is the calling user.
    """
    stmt = select(User).where(User.id == user_id)
    target = (await db.execute(stmt)).scalar_one_or_none()
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    # Guard against the caller altering their own role.
    editing_self = target.id == current_user.id
    if editing_self:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot change your own role",
        )

    target.role = role_data.role
    await db.commit()
    # Reload the row so the response reflects the committed state.
    await db.refresh(target)
    return target
@router.put("/users/{user_id}/team-admin", response_model=UserResponse)
async def toggle_team_admin(
    user_id: UUID,
    data: TeamAdminUpdate,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)],
):
    """Set the ``is_team_admin`` flag on a user to the requested value.

    Access is gated by the ``require_admin`` dependency. Granting the flag
    requires the user to have a ``team_id``; clearing it has no such
    requirement.

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the flag is
            being granted to a user without a team.
    """
    stmt = select(User).where(User.id == user_id)
    target = (await db.execute(stmt)).scalar_one_or_none()
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    # Only the "grant" direction needs a team; revoking is always allowed.
    if data.is_team_admin:
        if target.team_id is None:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="User must belong to a team to be a team admin",
            )

    target.is_team_admin = data.is_team_admin
    await db.commit()
    # Reload the row so the response reflects the committed state.
    await db.refresh(target)
    return target
@router.put("/users/{user_id}/deactivate", response_model=UserResponse)
async def deactivate_user(
    user_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)],
):
    """Mark a user account as inactive by setting ``is_active`` to False.

    Access is gated by the ``require_admin`` dependency. The caller may not
    deactivate their own account.

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the target
            is the calling user.
    """
    stmt = select(User).where(User.id == user_id)
    target = (await db.execute(stmt)).scalar_one_or_none()
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    # Guard against the caller deactivating their own account.
    targeting_self = target.id == current_user.id
    if targeting_self:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot deactivate your own account",
        )

    target.is_active = False
    await db.commit()
    # Reload the row so the response reflects the committed state.
    await db.refresh(target)
    return target
@router.put("/users/{user_id}/activate", response_model=UserResponse)
async def activate_user(
    user_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)],
):
    """Mark a user account as active by setting ``is_active`` to True.

    Access is gated by the ``require_admin`` dependency.

    Raises:
        HTTPException: 404 when no user with that id exists.
    """
    stmt = select(User).where(User.id == user_id)
    target = (await db.execute(stmt)).scalar_one_or_none()
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    target.is_active = True
    await db.commit()
    # Reload the row so the response reflects the committed state.
    await db.refresh(target)
    return target