Files
resolutionflow/backend/app/api/deps.py
chihlasm 71ba0b95a5 fix: high-severity security hardening (Phase B permissions audit)
Phase B addresses 7 high-severity gaps from the permissions audit:

- B1: Enforce tree access check on session start via can_access_tree
- B2: Replace all inline permission helpers with centralized permissions.py
- B3: Fix require_engineer_or_admin to check is_team_admin before role
- B4: Add is_active field on User with enforcement in get_current_active_user
- B5: Add admin user management endpoints (list, get, role, team-admin, deactivate, activate)
- B6: Add rate limiting on auth/invite endpoints via slowapi (disabled in DEBUG)
- B7: Implement refresh token rotation with JTI-based revocation and meaningful logout

Also reduces access token TTL from 15 to 5 minutes and updates CLAUDE.md
with SaaS/MSP context for future planning sessions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-05 22:44:05 -05:00

104 lines
3.1 KiB
Python

from typing import Annotated
from uuid import UUID
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.core.database import get_db
from app.core.security import decode_token
from app.models.user import User
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
async def get_current_user(
db: Annotated[AsyncSession, Depends(get_db)],
token: Annotated[str, Depends(oauth2_scheme)]
) -> User:
"""Get current authenticated user from JWT token."""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
payload = decode_token(token)
if payload is None:
raise credentials_exception
token_type = payload.get("type")
if token_type != "access":
raise credentials_exception
user_id: str = payload.get("sub")
if user_id is None:
raise credentials_exception
try:
user_uuid = UUID(user_id)
except ValueError:
raise credentials_exception
result = await db.execute(select(User).where(User.id == user_uuid))
user = result.scalar_one_or_none()
if user is None:
raise credentials_exception
return user
async def get_refresh_token_payload(
token: Annotated[str, Depends(oauth2_scheme)]
) -> dict:
"""Extract and validate a refresh token from the Authorization header."""
payload = decode_token(token)
if payload is None or payload.get("type") != "refresh":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token",
headers={"WWW-Authenticate": "Bearer"},
)
return payload
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)]
) -> User:
"""Ensure user is active (not disabled)."""
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Account has been deactivated"
)
return current_user
async def require_admin(
current_user: Annotated[User, Depends(get_current_active_user)]
) -> User:
"""Require super admin access."""
if not current_user.is_super_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Super admin access required"
)
return current_user
async def require_engineer_or_admin(
current_user: Annotated[User, Depends(get_current_active_user)]
) -> User:
"""Require engineer, team admin, or super admin role (blocks viewers)."""
if current_user.is_super_admin:
return current_user
if current_user.is_team_admin and current_user.team_id is not None:
return current_user
if current_user.role not in ("engineer",):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Engineer or admin access required"
)
return current_user