Files
resolutionflow/backend/app/api/deps.py
chihlasm 3c0374378c feat: add tenant_filter() helper and get_tenant_context dependency
tenant_filter(model, account_id) is the canonical app-layer tenant
scoping expression. Every query on a tenant table must use it.
build_tree_access_filter and build_step_visibility_filter updated
to call tenant_filter() internally for the account_id match.

get_tenant_context is a FastAPI dependency that returns account_id
or raises 403 if the user has no account — prevents raw access to
current_user.account_id and centralises the null check.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-09 03:49:50 +00:00

210 lines
6.8 KiB
Python

from typing import Annotated, Optional
from uuid import UUID
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
import sentry_sdk
from app.core.database import get_db
from app.core.security import decode_token
from app.models.user import User
from app.models.plan_limits import PlanLimits
# Routes that are allowed even when must_change_password is True.
# get_current_active_user compares request.url.path against this set with an
# exact membership test, so each entry must be the full request path.
_PASSWORD_CHANGE_ALLOWLIST = {
"/api/v1/auth/password/change",
"/api/v1/auth/logout",
"/api/v1/auth/me",
}
# Bearer-token extractor used as a dependency by the token-consuming functions
# in this module; tokenUrl names the login endpoint.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
async def get_current_user(
    db: Annotated[AsyncSession, Depends(get_db)],
    token: Annotated[str, Depends(oauth2_scheme)],
) -> User:
    """Resolve the authenticated user from a bearer access token.

    The token is decoded with ``decode_token``. It must carry
    ``type == "access"`` and a ``sub`` claim containing the user's UUID as a
    string; the matching ``User`` row is then loaded from the database.

    Args:
        db: Async database session used to look the user up.
        token: Raw bearer token supplied by ``oauth2_scheme``.

    Returns:
        The ``User`` whose id equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``decode_token`` returns ``None``, the token type is not
            ``"access"``, the ``sub`` claim is missing or is not a valid UUID
            string, or no user with that id exists.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    payload = decode_token(token)
    if payload is None:
        raise credentials_exception
    if payload.get("type") != "access":
        raise credentials_exception

    user_id = payload.get("sub")
    # Reject a missing or non-string ``sub`` up front: passing e.g. a number to
    # UUID() fails with an exception other than ValueError, which would surface
    # as an unhandled server error instead of a 401.
    if not isinstance(user_id, str):
        raise credentials_exception
    try:
        user_uuid = UUID(user_id)
    except ValueError:
        # Malformed UUID text; the parsing traceback adds nothing for callers.
        raise credentials_exception from None

    result = await db.execute(select(User).where(User.id == user_uuid))
    user = result.scalar_one_or_none()
    if user is None:
        raise credentials_exception
    return user
async def get_refresh_token_payload(
    token: Annotated[str, Depends(oauth2_scheme)],
) -> dict:
    """Decode the bearer token and return its payload if it is a refresh token.

    Args:
        token: Raw bearer token supplied by ``oauth2_scheme``.

    Returns:
        The decoded payload whose ``type`` claim equals ``"refresh"``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``decode_token`` returns ``None`` or the ``type`` claim is anything
            other than ``"refresh"``.
    """
    claims = decode_token(token)
    is_refresh = claims is not None and claims.get("type") == "refresh"
    if is_refresh:
        return claims

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid refresh token",
        headers={"WWW-Authenticate": "Bearer"},
    )
async def get_current_active_user(
    request: Request,
    current_user: Annotated[User, Depends(get_current_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> User:
    """Return the authenticated user after the active and password-change gates.

    Steps, in order:
      1. Reject users whose ``is_active`` flag is false.
      2. If ``must_change_password`` is set, reject every request whose path
         is not in ``_PASSWORD_CHANGE_ALLOWLIST``.
      3. Attach the user's id and email to the Sentry scope.
      4. If the user has an account, load its subscription; a ``"trialing"``
         subscription whose ``current_period_end`` is in the past is switched
         to the ``"free"`` plan with status ``"active"``, its period fields
         are cleared, and the session is committed.

    Args:
        request: Incoming request; only ``request.url.path`` is read.
        current_user: User resolved by ``get_current_user``.
        db: Async database session used for the subscription lookup/commit.

    Returns:
        The same ``current_user`` object that was passed in.

    Raises:
        HTTPException: 403 ``"Account has been deactivated"`` for inactive
            users, or 403 ``"password_change_required"`` when a password
            change is pending and the route is not allowlisted.
    """
    if not current_user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Account has been deactivated"
        )

    # Backend hard lock: a pending password change blocks everything except
    # the allowlisted routes.
    password_locked = (
        current_user.must_change_password
        and request.url.path not in _PASSWORD_CHANGE_ALLOWLIST
    )
    if password_locked:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="password_change_required"
        )

    # Attribute subsequent errors to this user in Sentry.
    sentry_sdk.set_user({"id": str(current_user.id), "email": current_user.email})

    # No account means there is no subscription to check.
    if not current_user.account_id:
        return current_user

    # Imported at call time, as in the original (presumably to avoid an import
    # cycle with the models package — confirm before hoisting).
    from app.models.subscription import Subscription
    from datetime import datetime, timezone

    subscription_query = select(Subscription).where(
        Subscription.account_id == current_user.account_id
    )
    subscription = (await db.execute(subscription_query)).scalar_one_or_none()

    # Lightweight trial-expiry check: only a trialing subscription with a
    # period end in the past is downgraded.
    if subscription and subscription.status == "trialing":
        period_end = subscription.current_period_end
        if period_end and period_end < datetime.now(timezone.utc):
            subscription.plan = "free"
            subscription.status = "active"
            subscription.current_period_end = None
            subscription.current_period_start = None
            await db.commit()

    return current_user
async def require_admin(
    current_user: Annotated[User, Depends(get_current_active_user)],
) -> User:
    """Allow the request only for super admins.

    Args:
        current_user: User resolved by ``get_current_active_user``.

    Returns:
        ``current_user`` when its ``is_super_admin`` flag is truthy.

    Raises:
        HTTPException: 403 ``"Super admin access required"`` otherwise.
    """
    if current_user.is_super_admin:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Super admin access required"
    )
async def require_engineer_or_admin(
    current_user: Annotated[User, Depends(get_current_active_user)],
) -> User:
    """Allow super admins and users with an owner, admin, or engineer role.

    Any other ``account_role`` value is rejected.

    Args:
        current_user: User resolved by ``get_current_active_user``.

    Returns:
        ``current_user`` when it is a super admin or its ``account_role`` is
        one of ``"owner"``, ``"admin"``, ``"engineer"``.

    Raises:
        HTTPException: 403 ``"Engineer or admin access required"`` otherwise.
    """
    permitted_roles = ("owner", "admin", "engineer")
    has_access = (
        current_user.is_super_admin
        or current_user.account_role in permitted_roles
    )
    if not has_access:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Engineer or admin access required"
        )
    return current_user
async def require_team_admin(
    current_user: Annotated[User, Depends(get_current_active_user)],
) -> User:
    """Allow super admins, team admins, and account owners.

    Args:
        current_user: User resolved by ``get_current_active_user``.

    Returns:
        ``current_user`` when ``is_super_admin`` or ``is_team_admin`` is
        truthy, or when ``account_role`` equals ``"owner"``.

    Raises:
        HTTPException: 403 ``"Team admin access required"`` otherwise.
    """
    has_access = (
        current_user.is_super_admin
        or current_user.is_team_admin
        or current_user.account_role == "owner"
    )
    if not has_access:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Team admin access required"
        )
    return current_user
async def require_account_owner(
    current_user: Annotated[User, Depends(get_current_active_user)],
) -> User:
    """Allow super admins and account owners.

    Args:
        current_user: User resolved by ``get_current_active_user``.

    Returns:
        ``current_user`` when ``is_super_admin`` is truthy or ``account_role``
        equals ``"owner"``.

    Raises:
        HTTPException: 403 ``"Account owner access required"`` otherwise.
    """
    is_owner_or_super = (
        current_user.is_super_admin or current_user.account_role == "owner"
    )
    if not is_owner_or_super:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Account owner access required"
        )
    return current_user
def get_service_account_id(request: Request) -> Optional[UUID]:
    """Look up the service account UUID stored on ``request.app.state``.

    Args:
        request: Incoming request; only ``request.app.state`` is read.

    Returns:
        The value of ``app.state.service_account_id``, or ``None`` when that
        attribute has not been set (e.g. test environments where lifespan
        startup did not run).
    """
    app_state = request.app.state
    return getattr(app_state, "service_account_id", None)
async def get_plan_limits_for_user(
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> Optional[PlanLimits]:
    """Fetch the plan limits for the current user's account.

    Delegates to ``app.core.subscriptions.get_user_plan_limits`` with the
    user's ``account_id`` and the database session.

    Args:
        current_user: User resolved by ``get_current_active_user``.
        db: Async database session passed through to the lookup.

    Returns:
        Whatever ``get_user_plan_limits`` returns for this account; the
        annotation allows ``None``.
    """
    # Imported at call time, as in the original (presumably to avoid an import
    # cycle — confirm before moving to module level).
    from app.core.subscriptions import get_user_plan_limits

    plan_limits = await get_user_plan_limits(current_user.account_id, db)
    return plan_limits
async def get_tenant_context(
    current_user: Annotated[User, Depends(get_current_active_user)],
) -> UUID:
    """Return the account id that scopes the current request.

    Depend on this instead of reading ``current_user.account_id`` directly so
    the null check lives in one place.

    Args:
        current_user: User resolved by ``get_current_active_user``.

    Returns:
        The user's ``account_id``.

    Raises:
        HTTPException: 403 ``"User not associated with any account"`` when
            ``account_id`` is ``None``. Not expected in normal flows, where
            users are associated with an account on registration.
    """
    account_id = current_user.account_id
    if account_id is not None:
        return account_id

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="User not associated with any account",
    )