from typing import Annotated, Optional from uuid import UUID from fastapi import Depends, HTTPException, Request, status from fastapi.security import OAuth2PasswordBearer from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.core.database import get_db from app.core.security import decode_token from app.models.user import User from app.models.plan_limits import PlanLimits # Routes that are allowed even when must_change_password is True _PASSWORD_CHANGE_ALLOWLIST = { "/api/v1/auth/password/change", "/api/v1/auth/logout", "/api/v1/auth/me", } oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login") async def get_current_user( db: Annotated[AsyncSession, Depends(get_db)], token: Annotated[str, Depends(oauth2_scheme)] ) -> User: """Get current authenticated user from JWT token.""" credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) payload = decode_token(token) if payload is None: raise credentials_exception token_type = payload.get("type") if token_type != "access": raise credentials_exception user_id: str = payload.get("sub") if user_id is None: raise credentials_exception try: user_uuid = UUID(user_id) except ValueError: raise credentials_exception result = await db.execute(select(User).where(User.id == user_uuid)) user = result.scalar_one_or_none() if user is None: raise credentials_exception return user async def get_refresh_token_payload( token: Annotated[str, Depends(oauth2_scheme)] ) -> dict: """Extract and validate a refresh token from the Authorization header.""" payload = decode_token(token) if payload is None or payload.get("type") != "refresh": raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token", headers={"WWW-Authenticate": "Bearer"}, ) return payload async def get_current_active_user( request: Request, current_user: Annotated[User, Depends(get_current_user)], db: Annotated[AsyncSession, Depends(get_db)], ) -> User: """Ensure user is active (not disabled). Auto-downgrades expired trials. Enforces must_change_password — blocks all routes except allowlist.""" if not current_user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Account has been deactivated" ) # Enforce must_change_password (backend hard lock) if current_user.must_change_password: if request.url.path not in _PASSWORD_CHANGE_ALLOWLIST: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="password_change_required" ) # Lightweight trial expiry check if current_user.account_id: from app.models.subscription import Subscription from datetime import datetime, timezone result = await db.execute( select(Subscription).where(Subscription.account_id == current_user.account_id) ) subscription = result.scalar_one_or_none() if ( subscription and subscription.status == "trialing" and subscription.current_period_end and subscription.current_period_end < datetime.now(timezone.utc) ): subscription.plan = "free" subscription.status = "active" subscription.current_period_end = None subscription.current_period_start = None await db.commit() return current_user async def require_admin( current_user: Annotated[User, Depends(get_current_active_user)] ) -> User: """Require super admin access.""" if not current_user.is_super_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Super admin access required" ) return current_user async def require_engineer_or_admin( current_user: Annotated[User, Depends(get_current_active_user)] ) -> User: """Require engineer, account owner, or super admin role (blocks viewers).""" if current_user.is_super_admin: return current_user if current_user.account_role in ("owner", "admin", "engineer"): return current_user raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Engineer or admin access required" ) async def require_account_owner( current_user: Annotated[User, Depends(get_current_active_user)] ) -> User: """Require account owner or super admin access.""" if current_user.is_super_admin: return current_user if current_user.account_role == "owner": return current_user raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Account owner access required" ) def get_service_account_id(request: Request) -> Optional[UUID]: """Return the cached ResolutionFlow service account UUID from app.state. Returns None in test environments where lifespan startup did not run. """ return getattr(request.app.state, "service_account_id", None) async def get_plan_limits_for_user( current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], ) -> Optional[PlanLimits]: """Get plan limits for the current user's account.""" from app.core.subscriptions import get_user_plan_limits return await get_user_plan_limits(current_user.account_id, db)