fix: high-severity security hardening (Phase B permissions audit)
Phase B addresses 7 high-severity gaps from the permissions audit: - B1: Enforce tree access check on session start via can_access_tree - B2: Replace all inline permission helpers with centralized permissions.py - B3: Fix require_engineer_or_admin to check is_team_admin before role - B4: Add is_active field on User with enforcement in get_current_active_user - B5: Add admin user management endpoints (list, get, role, team-admin, deactivate, activate) - B6: Add rate limiting on auth/invite endpoints via slowapi (disabled in DEBUG) - B7: Implement refresh token rotation with JTI-based revocation and meaningful logout Also reduces access token TTL from 15 to 5 minutes and updates CLAUDE.md with SaaS/MSP context for future planning sessions. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -43,7 +43,7 @@ class Settings(BaseSettings):
|
||||
)
|
||||
return v
|
||||
ALGORITHM: str = "HS256"
|
||||
ACCESS_TOKEN_EXPIRE_MINUTES: int = 15
|
||||
ACCESS_TOKEN_EXPIRE_MINUTES: int = 5
|
||||
REFRESH_TOKEN_EXPIRE_DAYS: int = 7
|
||||
|
||||
# Security
|
||||
|
||||
@@ -9,13 +9,15 @@ Role hierarchy: super_admin > team_admin > engineer > viewer
|
||||
- viewer: role='viewer', read-only (can browse, run sessions, rate steps)
|
||||
"""
|
||||
from __future__ import annotations
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import Optional, TYPE_CHECKING
|
||||
from uuid import UUID
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from app.models.user import User
|
||||
from app.models.tree import Tree
|
||||
from app.models.step_library import StepLibrary
|
||||
from app.models.category import TreeCategory
|
||||
from app.models.step_category import StepCategory
|
||||
|
||||
ROLE_HIERARCHY = {
|
||||
"super_admin": 4,
|
||||
@@ -92,3 +94,78 @@ def can_manage_tree_tags(user: User, tree: Tree) -> bool:
|
||||
if user.is_team_admin and tree.team_id == user.team_id and user.team_id is not None:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def can_access_tree(user: User, tree: Tree) -> bool:
|
||||
"""Can the user access (view) this tree?"""
|
||||
if tree.is_default or tree.is_public:
|
||||
return True
|
||||
if tree.author_id == user.id:
|
||||
return True
|
||||
if tree.team_id == user.team_id and user.team_id is not None:
|
||||
return True
|
||||
if user.is_super_admin:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def can_view_step(user: User, step: StepLibrary) -> bool:
|
||||
"""Can the user view this step based on its visibility?"""
|
||||
if step.visibility == "public":
|
||||
return True
|
||||
if step.visibility == "private":
|
||||
return step.created_by == user.id
|
||||
if step.visibility == "team":
|
||||
return (step.team_id == user.team_id and user.team_id is not None) or user.is_super_admin
|
||||
return False
|
||||
|
||||
|
||||
def can_create_tag(user: User, team_id: Optional[UUID]) -> bool:
|
||||
"""Can the user create a tag for the given scope?
|
||||
|
||||
- Super admins can create global tags (team_id=None) or any team's tags
|
||||
- Engineers can create team tags for their own team
|
||||
- Viewers cannot create tags
|
||||
"""
|
||||
if user.is_super_admin:
|
||||
return True
|
||||
if not can_create_content(user):
|
||||
return False
|
||||
if team_id is not None and team_id == user.team_id:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def can_create_category(user: User, team_id: Optional[UUID]) -> bool:
|
||||
"""Can the user create a category for the given team?
|
||||
|
||||
- Super admins can create global or any team's categories
|
||||
- Team admins can create categories for their own team
|
||||
"""
|
||||
if user.is_super_admin:
|
||||
return True
|
||||
if user.is_team_admin and team_id == user.team_id and user.team_id is not None:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def can_manage_step_category(user: User, category: StepCategory) -> bool:
|
||||
"""Can the user edit/delete this step category?"""
|
||||
if user.is_super_admin:
|
||||
return True
|
||||
if user.is_team_admin and category.team_id == user.team_id and user.team_id is not None:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def can_create_step_category(user: User, team_id: Optional[UUID]) -> bool:
|
||||
"""Can the user create a step category for the given team?
|
||||
|
||||
- Super admins can create global or any team's step categories
|
||||
- Team admins can create step categories for their own team
|
||||
"""
|
||||
if user.is_super_admin:
|
||||
return True
|
||||
if user.is_team_admin and team_id == user.team_id and user.team_id is not None:
|
||||
return True
|
||||
return False
|
||||
|
||||
6
backend/app/core/rate_limit.py
Normal file
6
backend/app/core/rate_limit.py
Normal file
@@ -0,0 +1,6 @@
|
||||
from slowapi import Limiter
|
||||
from slowapi.util import get_remote_address
|
||||
|
||||
from app.core.config import settings
|
||||
|
||||
limiter = Limiter(key_func=get_remote_address, enabled=not settings.DEBUG)
|
||||
@@ -1,3 +1,5 @@
|
||||
import hashlib
|
||||
import uuid
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Optional
|
||||
from jose import JWTError, jwt
|
||||
@@ -30,14 +32,20 @@ def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -
|
||||
|
||||
|
||||
def create_refresh_token(data: dict) -> str:
|
||||
"""Create a JWT refresh token."""
|
||||
"""Create a JWT refresh token with a unique jti for revocation tracking."""
|
||||
to_encode = data.copy()
|
||||
expire = datetime.now(timezone.utc) + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
|
||||
to_encode.update({"exp": expire, "type": "refresh"})
|
||||
jti = str(uuid.uuid4())
|
||||
to_encode.update({"exp": expire, "type": "refresh", "jti": jti})
|
||||
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
|
||||
return encoded_jwt
|
||||
|
||||
|
||||
def hash_token(jti: str) -> str:
|
||||
"""Hash a token JTI for secure storage."""
|
||||
return hashlib.sha256(jti.encode()).hexdigest()
|
||||
|
||||
|
||||
def decode_token(token: str) -> Optional[dict]:
|
||||
"""Decode and validate a JWT token."""
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user