from typing import Annotated, Optional from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, status, Query from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func from app.core.database import get_db from app.core.audit import log_audit from app.models.user import User from app.models.account import Account from app.schemas.user import UserResponse, RoleUpdate, AccountRoleUpdate from app.schemas.admin import MoveUserAccount from app.api.deps import require_admin router = APIRouter(prefix="/admin", tags=["admin"]) @router.get("/users", response_model=list[UserResponse]) async def list_users( db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(require_admin)], skip: int = Query(0, ge=0), limit: int = Query(100, ge=1, le=100), is_active: Optional[bool] = Query(None, description="Filter by active status"), role: Optional[str] = Query(None, description="Filter by role"), account_id: Optional[UUID] = Query(None, description="Filter by account") ): """List all users (super admin only).""" query = select(User) if is_active is not None: query = query.where(User.is_active == is_active) if role: query = query.where(User.role == role) if account_id: query = query.where(User.account_id == account_id) query = query.order_by(User.created_at.desc()).offset(skip).limit(limit) result = await db.execute(query) users = result.scalars().all() return users @router.get("/users/{user_id}", response_model=UserResponse) async def get_user( user_id: UUID, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(require_admin)] ): """Get user details (super admin only).""" result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) return user @router.put("/users/{user_id}/role", response_model=UserResponse) async def update_user_role( user_id: UUID, role_data: RoleUpdate, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(require_admin)] ): """Change user role (super admin only).""" result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) if user.id == current_user.id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot change your own role" ) old_role = user.role user.role = role_data.role await log_audit(db, current_user.id, "user.role_change", "user", user.id, {"old_role": old_role, "new_role": role_data.role}) await db.commit() await db.refresh(user) return user @router.put("/users/{user_id}/account-role", response_model=UserResponse) async def update_account_role( user_id: UUID, data: AccountRoleUpdate, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(require_admin)] ): """Change a user's account role (super admin only).""" result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) old_role = user.account_role user.account_role = data.account_role await log_audit(db, current_user.id, "user.account_role_change", "user", user.id, {"old_account_role": old_role, "new_account_role": data.account_role}) await db.commit() await db.refresh(user) return user @router.put("/users/{user_id}/deactivate", response_model=UserResponse) async def deactivate_user( user_id: UUID, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(require_admin)] ): """Deactivate a user account (super admin only).""" result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) if user.id == current_user.id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot deactivate your own account" ) user.is_active = False await log_audit(db, current_user.id, "user.deactivate", "user", user.id) await db.commit() await db.refresh(user) return user @router.put("/users/{user_id}/activate", response_model=UserResponse) async def activate_user( user_id: UUID, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(require_admin)] ): """Reactivate a user account (super admin only).""" result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) user.is_active = True await log_audit(db, current_user.id, "user.activate", "user", user.id) await db.commit() await db.refresh(user) return user @router.put("/users/{user_id}/move-account", response_model=UserResponse) async def move_user_account( user_id: UUID, data: MoveUserAccount, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(require_admin)], ): """Move a user to a different account (super admin only).""" result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found") result = await db.execute(select(Account).where(Account.display_code == data.display_code)) target_account = result.scalar_one_or_none() if not target_account: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Target account not found") old_account_id = user.account_id user.account_id = target_account.id user.account_role = "engineer" # Reset to engineer on move await log_audit(db, current_user.id, "user.move_account", "user", user.id, {"old_account_id": str(old_account_id), "new_account_id": str(target_account.id)}) await db.commit() await db.refresh(user) return user