Phase B addresses 7 high-severity gaps from the permissions audit: - B1: Enforce tree access check on session start via can_access_tree - B2: Replace all inline permission helpers with centralized permissions.py - B3: Fix require_engineer_or_admin to check is_team_admin before role - B4: Add is_active field on User with enforcement in get_current_active_user - B5: Add admin user management endpoints (list, get, role, team-admin, deactivate, activate) - B6: Add rate limiting on auth/invite endpoints via slowapi (disabled in DEBUG) - B7: Implement refresh token rotation with JTI-based revocation and meaningful logout Also reduces access token TTL from 15 to 5 minutes and updates CLAUDE.md with SaaS/MSP context for future planning sessions. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
104 lines
3.1 KiB
Python
104 lines
3.1 KiB
Python
from typing import Annotated
|
|
from uuid import UUID
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import OAuth2PasswordBearer
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select
|
|
|
|
from app.core.database import get_db
|
|
from app.core.security import decode_token
|
|
from app.models.user import User
|
|
|
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
|
|
|
|
|
|
async def get_current_user(
    db: Annotated[AsyncSession, Depends(get_db)],
    token: Annotated[str, Depends(oauth2_scheme)]
) -> User:
    """Resolve the bearer token to the ``User`` it identifies.

    The token is decoded with ``decode_token``. It must carry
    ``type == "access"`` and a string ``sub`` claim that parses as a UUID;
    the user is then loaded by ``User.id``. This dependency does not look at
    ``is_active``.

    Args:
        db: Async database session used for the user lookup.
        token: Raw bearer token extracted by ``oauth2_scheme``.

    Returns:
        The ``User`` whose ``id`` equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 "Could not validate credentials" when the token
            cannot be decoded, is not an access token, has a missing,
            non-string or malformed ``sub`` claim, or no such user exists.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    payload = decode_token(token)
    if payload is None:
        raise credentials_exception

    if payload.get("type") != "access":
        raise credentials_exception

    user_id = payload.get("sub")
    # Reject a missing or non-string subject up front: UUID() raises
    # AttributeError (not ValueError) for e.g. an integer, which would
    # otherwise escape as a 500 instead of a 401.
    if not isinstance(user_id, str):
        raise credentials_exception

    try:
        user_uuid = UUID(user_id)
    except ValueError:
        # The parse error carries no useful detail for the client.
        raise credentials_exception from None

    result = await db.execute(select(User).where(User.id == user_uuid))
    user = result.scalar_one_or_none()
    if user is None:
        raise credentials_exception

    return user
|
|
|
|
|
|
async def get_refresh_token_payload(
    token: Annotated[str, Depends(oauth2_scheme)]
) -> dict:
    """Decode the bearer token and return its payload if it is a refresh token.

    Args:
        token: Raw bearer token extracted by ``oauth2_scheme``.

    Returns:
        The decoded payload, whose ``type`` claim is ``"refresh"``.

    Raises:
        HTTPException: 401 "Invalid refresh token" when ``decode_token``
            returns ``None`` or the ``type`` claim is not ``"refresh"``.
    """
    claims = decode_token(token)

    # Success path first; anything else falls through to the 401 below.
    if claims is not None and claims.get("type") == "refresh":
        return claims

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid refresh token",
        headers={"WWW-Authenticate": "Bearer"},
    )
|
|
|
|
|
|
async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)]
) -> User:
    """Pass the authenticated user through only if the account is active.

    Args:
        current_user: User resolved by ``get_current_user``.

    Returns:
        The same ``current_user`` when ``is_active`` is truthy.

    Raises:
        HTTPException: 403 "Account has been deactivated" otherwise.
    """
    if current_user.is_active:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Account has been deactivated"
    )
|
|
|
|
|
|
async def require_admin(
    current_user: Annotated[User, Depends(get_current_active_user)]
) -> User:
    """Allow the request only for an active super admin.

    Args:
        current_user: User resolved by ``get_current_active_user``.

    Returns:
        The same ``current_user`` when ``is_super_admin`` is truthy.

    Raises:
        HTTPException: 403 "Super admin access required" otherwise.
    """
    if current_user.is_super_admin:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Super admin access required"
    )
|
|
|
|
|
|
async def require_engineer_or_admin(
    current_user: Annotated[User, Depends(get_current_active_user)]
) -> User:
    """Allow engineers, team admins and super admins; reject everyone else.

    Checks run in this order and stop at the first match:

    1. ``is_super_admin`` is truthy.
    2. ``is_team_admin`` is truthy and ``team_id`` is not ``None``.
    3. ``role`` is ``"engineer"``.

    Args:
        current_user: User resolved by ``get_current_active_user``.

    Returns:
        The same ``current_user`` when any of the checks passes.

    Raises:
        HTTPException: 403 "Engineer or admin access required" when none
            of the checks passes.
    """
    # Short-circuit evaluation keeps the attribute access order listed above.
    permitted = (
        current_user.is_super_admin
        or (current_user.is_team_admin and current_user.team_id is not None)
        or current_user.role in ("engineer",)
    )
    if not permitted:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Engineer or admin access required"
        )
    return current_user
|