"""Admin API routes for inspecting and managing users, accounts and subscriptions.

Every endpoint depends on ``require_admin``; the per-endpoint docstrings
describe the intended audience as "super admin only".
"""

from datetime import datetime, timezone, timedelta
from typing import Annotated, Optional
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from sqlalchemy.orm import selectinload

from app.core.database import get_db
from app.core.audit import log_audit
from app.models.user import User
from app.models.account import Account
from app.models.subscription import Subscription
from app.models.session import Session
from app.models.audit_log import AuditLog
from app.models.invite_code import InviteCode
from app.schemas.user import UserResponse, RoleUpdate, AccountRoleUpdate
from app.schemas.admin import MoveUserAccount
from app.schemas.subscription import SubscriptionPlanUpdate, ExtendTrialRequest
from app.schemas.user_detail import (
    UserDetailResponse,
    AccountSummary,
    SubscriptionSummary,
    SessionSummary,
    AuditLogSummary,
    InviteCodeUsedSummary,
)
from app.api.deps import require_admin

router = APIRouter(prefix="/admin", tags=["admin"])


async def _get_user_or_404(user_id: UUID, db: AsyncSession) -> User:
    """Load a user by primary key.

    Raises:
        HTTPException: 404 with detail "User not found" when no row matches.
    """
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    return user


@router.get("/users", response_model=list[UserResponse])
async def list_users(
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)],
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=100),
    is_active: Optional[bool] = Query(None, description="Filter by active status"),
    role: Optional[str] = Query(None, description="Filter by role"),
    account_id: Optional[UUID] = Query(None, description="Filter by account")
):
    """List all users (super admin only).

    Optional filters narrow the result by active flag, role and account.
    Results are ordered newest first (``created_at`` descending) and
    paginated with ``skip``/``limit``.
    """
    query = select(User)

    # ``is_active`` is tri-state: None means "do not filter", so test identity
    # rather than truthiness (False is a valid filter value).
    if is_active is not None:
        query = query.where(User.is_active == is_active)
    if role:
        query = query.where(User.role == role)
    if account_id:
        query = query.where(User.account_id == account_id)

    query = query.order_by(User.created_at.desc()).offset(skip).limit(limit)
    result = await db.execute(query)
    users = result.scalars().all()
    return users


@router.get("/users/{user_id}", response_model=UserDetailResponse)
async def get_user(
    user_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)]
):
    """Get enriched user details (super admin only).

    Besides the user's own fields, the response carries the user's account
    and that account's subscription (when present), the 10 most recent
    sessions and audit log entries together with their total counts, and
    the invite code the user signed up with (when recorded).

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    user = await _get_user_or_404(user_id, db)

    # Account + subscription
    account_summary = None
    subscription_summary = None
    if user.account_id:
        acc_result = await db.execute(
            select(Account).where(Account.id == user.account_id)
        )
        account = acc_result.scalar_one_or_none()
        if account:
            account_summary = AccountSummary(
                id=account.id,
                name=account.name,
                display_code=getattr(account, "display_code", None),
            )

        sub_result = await db.execute(
            select(Subscription).where(Subscription.account_id == user.account_id)
        )
        subscription = sub_result.scalar_one_or_none()
        if subscription:
            subscription_summary = SubscriptionSummary(
                id=subscription.id,
                plan=subscription.plan,
                status=subscription.status,
                current_period_start=subscription.current_period_start,
                current_period_end=subscription.current_period_end,
            )

    # Recent sessions (latest 10 + total)
    total_sessions_result = await db.execute(
        select(func.count()).select_from(Session).where(Session.user_id == user_id)
    )
    total_sessions = total_sessions_result.scalar() or 0

    # Eager-load the related tree so reading ``s.tree`` below does not
    # lazy-load inside the async session.
    sessions_result = await db.execute(
        select(Session).options(selectinload(Session.tree))
        .where(Session.user_id == user_id)
        .order_by(Session.started_at.desc())
        .limit(10)
    )
    sessions = sessions_result.scalars().all()
    recent_sessions = [
        SessionSummary(
            id=s.id,
            tree_name=s.tree.name if s.tree else None,
            started_at=s.started_at,
            completed_at=s.completed_at,
            outcome=s.outcome,
        )
        for s in sessions
    ]

    # Recent audit logs (latest 10 + total)
    total_audits_result = await db.execute(
        select(func.count()).select_from(AuditLog).where(AuditLog.user_id == user_id)
    )
    total_audit_logs = total_audits_result.scalar() or 0

    audits_result = await db.execute(
        select(AuditLog).where(AuditLog.user_id == user_id)
        .order_by(AuditLog.created_at.desc())
        .limit(10)
    )
    audits = audits_result.scalars().all()
    recent_audit_logs = [
        AuditLogSummary(
            id=a.id,
            action=a.action,
            resource_type=a.resource_type,
            resource_id=str(a.resource_id) if a.resource_id else None,
            created_at=a.created_at,
            details=a.details,
        )
        for a in audits
    ]

    # Invite code used
    invite_code_used = None
    if user.invite_code_id:
        ic_result = await db.execute(
            select(InviteCode).where(InviteCode.id == user.invite_code_id)
        )
        ic = ic_result.scalar_one_or_none()
        if ic:
            creator_email = None
            if ic.created_by_id:
                creator_result = await db.execute(
                    select(User.email).where(User.id == ic.created_by_id)
                )
                creator_email = creator_result.scalar_one_or_none()
            invite_code_used = InviteCodeUsedSummary(
                code=ic.code,
                assigned_plan=ic.assigned_plan,
                trial_duration_days=ic.trial_duration_days,
                created_by_email=creator_email,
            )

    return UserDetailResponse(
        id=user.id,
        email=user.email,
        full_name=user.name,
        role=user.role,
        is_active=user.is_active,
        is_super_admin=user.is_super_admin,
        is_team_admin=getattr(user, "is_team_admin", False),
        created_at=user.created_at,
        account=account_summary,
        subscription=subscription_summary,
        invite_code_used=invite_code_used,
        recent_sessions=recent_sessions,
        total_sessions=total_sessions,
        recent_audit_logs=recent_audit_logs,
        total_audit_logs=total_audit_logs,
    )


@router.put("/users/{user_id}/role", response_model=UserResponse)
async def update_user_role(
    user_id: UUID,
    role_data: RoleUpdate,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)]
):
    """Change user role (super admin only).

    The change is recorded in the audit log with the old and new role.

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the
            caller targets their own user.
    """
    user = await _get_user_or_404(user_id, db)

    if user.id == current_user.id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot change your own role"
        )

    old_role = user.role
    user.role = role_data.role
    await log_audit(db, current_user.id, "user.role_change", "user", user.id,
                    {"old_role": old_role, "new_role": role_data.role})
    await db.commit()
    await db.refresh(user)
    return user


@router.put("/users/{user_id}/account-role", response_model=UserResponse)
async def update_account_role(
    user_id: UUID,
    data: AccountRoleUpdate,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)]
):
    """Change a user's account role (super admin only).

    The change is recorded in the audit log with the old and new account role.

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    user = await _get_user_or_404(user_id, db)

    old_role = user.account_role
    user.account_role = data.account_role
    await log_audit(db, current_user.id, "user.account_role_change", "user", user.id,
                    {"old_account_role": old_role, "new_account_role": data.account_role})
    await db.commit()
    await db.refresh(user)
    return user


@router.put("/users/{user_id}/deactivate", response_model=UserResponse)
async def deactivate_user(
    user_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)]
):
    """Deactivate a user account (super admin only).

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the
            caller targets their own user.
    """
    user = await _get_user_or_404(user_id, db)

    if user.id == current_user.id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot deactivate your own account"
        )

    user.is_active = False
    await log_audit(db, current_user.id, "user.deactivate", "user", user.id)
    await db.commit()
    await db.refresh(user)
    return user


@router.put("/users/{user_id}/activate", response_model=UserResponse)
async def activate_user(
    user_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)]
):
    """Reactivate a user account (super admin only).

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    user = await _get_user_or_404(user_id, db)

    user.is_active = True
    await log_audit(db, current_user.id, "user.activate", "user", user.id)
    await db.commit()
    await db.refresh(user)
    return user


@router.put("/users/{user_id}/move-account", response_model=UserResponse)
async def move_user_account(
    user_id: UUID,
    data: MoveUserAccount,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)],
):
    """Move a user to a different account (super admin only).

    The target account is looked up by its ``display_code``. The user's
    account role is reset to "engineer" as part of the move.

    Raises:
        HTTPException: 404 when the user or the target account does not exist.
    """
    user = await _get_user_or_404(user_id, db)

    result = await db.execute(
        select(Account).where(Account.display_code == data.display_code)
    )
    target_account = result.scalar_one_or_none()
    if not target_account:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Target account not found",
        )

    old_account_id = user.account_id
    user.account_id = target_account.id
    user.account_role = "engineer"  # Reset to engineer on move

    await log_audit(
        db, current_user.id, "user.move_account", "user", user.id,
        {
            # A user without a previous account is logged as None rather than
            # the misleading literal string "None".
            "old_account_id": str(old_account_id) if old_account_id else None,
            "new_account_id": str(target_account.id),
        },
    )
    await db.commit()
    await db.refresh(user)
    return user


async def _get_user_subscription(user_id: UUID, db: AsyncSession) -> tuple[User, Subscription]:
    """Helper to load user and their subscription.

    The subscription is the one attached to the user's account.

    Raises:
        HTTPException: 404 when the user or the subscription does not exist;
            400 when the user has no account.
    """
    user = await _get_user_or_404(user_id, db)
    if not user.account_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="User has no account",
        )

    sub_result = await db.execute(
        select(Subscription).where(Subscription.account_id == user.account_id)
    )
    subscription = sub_result.scalar_one_or_none()
    if not subscription:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Subscription not found",
        )

    return user, subscription


@router.put("/users/{user_id}/subscription/plan")
async def update_user_plan(
    user_id: UUID,
    data: SubscriptionPlanUpdate,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)],
):
    """Change a user's subscription plan (super admin only).

    Returns a dict with the subscription's ``plan`` and ``status``.

    Raises:
        HTTPException: 400 when the plan is not one of "free", "pro",
            "team" or the user has no account; 404 when the user or
            subscription does not exist.
    """
    if data.plan not in ("free", "pro", "team"):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid plan")

    _, subscription = await _get_user_subscription(user_id, db)
    old_plan = subscription.plan
    subscription.plan = data.plan

    await log_audit(db, current_user.id, "subscription.plan_change", "subscription", subscription.id,
                    {"old_plan": old_plan, "new_plan": data.plan, "user_id": str(user_id)})
    await db.commit()
    # commit() expires loaded attributes unless the session factory sets
    # expire_on_commit=False; reload explicitly (as the user endpoints do) so
    # the reads below never trigger implicit lazy IO on the AsyncSession.
    await db.refresh(subscription)

    return {"plan": subscription.plan, "status": subscription.status}


@router.put("/users/{user_id}/subscription/extend-trial")
async def extend_user_trial(
    user_id: UUID,
    data: ExtendTrialRequest,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)],
):
    """Extend or start a trial for a user's subscription (super admin only).

    When the subscription is already "trialing" and has a period end, the
    requested days are added to that end. Otherwise the subscription is
    switched to "trialing" with a period that starts now (UTC).

    Returns a dict with the subscription's ``plan``, ``status`` and
    ``current_period_end``.

    Raises:
        HTTPException: 400 when ``days`` is outside 1-90 or the user has no
            account; 404 when the user or subscription does not exist.
    """
    if data.days < 1 or data.days > 90:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Days must be 1-90")

    _, subscription = await _get_user_subscription(user_id, db)

    now = datetime.now(timezone.utc)
    if subscription.status == "trialing" and subscription.current_period_end:
        # Extend existing trial
        new_end = subscription.current_period_end + timedelta(days=data.days)
    else:
        # Start new trial
        subscription.status = "trialing"
        subscription.current_period_start = now
        new_end = now + timedelta(days=data.days)

    subscription.current_period_end = new_end

    await log_audit(db, current_user.id, "subscription.extend_trial", "subscription", subscription.id,
                    {"days": data.days, "new_end": new_end.isoformat(), "user_id": str(user_id)})
    await db.commit()
    # commit() expires loaded attributes unless the session factory sets
    # expire_on_commit=False; reload explicitly (as the user endpoints do) so
    # the reads below never trigger implicit lazy IO on the AsyncSession.
    await db.refresh(subscription)

    return {
        "plan": subscription.plan,
        "status": subscription.status,
        "current_period_end": subscription.current_period_end,
    }