Files
resolutionflow/backend/app/api/deps.py
chihlasm 4ccb93ee31 feat: add account-based subscription model with migrations
Transition from team-based to account-based multi-tenancy (Free/Pro/Team).
Migrations 016-020 create accounts, subscriptions, plan_limits, and
account_invites tables, then migrate existing users and content FKs.

New models: Account, Subscription, PlanLimits, AccountInvite.
Updated models add account_id alongside existing team_id (coexistence
for safe two-PR deployment). Permissions and deps refactored for
account_role instead of is_team_admin.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-07 02:38:47 -05:00

126 lines
3.8 KiB
Python

from typing import Annotated, Optional
from uuid import UUID
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.core.database import get_db
from app.core.security import decode_token
from app.models.user import User
from app.models.plan_limits import PlanLimits
# Token scheme shared by the auth dependencies in this module (injected via
# Depends(oauth2_scheme)); tokenUrl is presumably the login endpoint path.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
async def get_current_user(
    db: Annotated[AsyncSession, Depends(get_db)],
    token: Annotated[str, Depends(oauth2_scheme)]
) -> User:
    """Resolve the authenticated user from a JWT access token.

    Args:
        db: Async database session used to look up the user row.
        token: Token string supplied by ``oauth2_scheme``.

    Returns:
        The ``User`` whose id equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``decode_token`` returns ``None``, the ``type`` claim is not
            ``"access"``, the ``sub`` claim is missing, is not a string, or
            is not a valid UUID, or when no user with that id exists.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    payload = decode_token(token)
    if payload is None:
        raise credentials_exception

    # Only access tokens may authenticate a request; other token types
    # (e.g. refresh) are rejected here.
    if payload.get("type") != "access":
        raise credentials_exception

    subject = payload.get("sub")
    # uuid.UUID raises AttributeError/TypeError (not ValueError) for
    # non-string input, so reject anything that is not a str up front to keep
    # a malformed claim a 401 rather than an unhandled server error.
    if not isinstance(subject, str):
        raise credentials_exception

    try:
        user_uuid = UUID(subject)
    except ValueError:
        # The parse failure is an internal detail; do not chain it onto the
        # 401 response exception.
        raise credentials_exception from None

    result = await db.execute(select(User).where(User.id == user_uuid))
    user = result.scalar_one_or_none()
    if user is None:
        raise credentials_exception
    return user
async def get_refresh_token_payload(
    token: Annotated[str, Depends(oauth2_scheme)]
) -> dict:
    """Decode the supplied token and return its payload if it is a refresh token.

    Args:
        token: Token string supplied by ``oauth2_scheme``.

    Returns:
        The decoded payload whose ``type`` entry is ``"refresh"``.

    Raises:
        HTTPException: 401 when ``decode_token`` returns ``None`` or the
            payload's ``type`` is anything other than ``"refresh"``.
    """
    claims = decode_token(token)
    is_refresh = claims is not None and claims.get("type") == "refresh"
    if is_refresh:
        return claims

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid refresh token",
        headers={"WWW-Authenticate": "Bearer"},
    )
async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)]
) -> User:
    """Pass the authenticated user through only if the account is active.

    Args:
        current_user: User resolved by ``get_current_user``.

    Returns:
        The same user when ``is_active`` is truthy.

    Raises:
        HTTPException: 403 when ``is_active`` is falsy.
    """
    if current_user.is_active:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Account has been deactivated"
    )
async def require_admin(
    current_user: Annotated[User, Depends(get_current_active_user)]
) -> User:
    """Allow the request only for super admins.

    Args:
        current_user: Active user resolved by ``get_current_active_user``.

    Returns:
        The same user when ``is_super_admin`` is truthy.

    Raises:
        HTTPException: 403 when the user is not a super admin.
    """
    if current_user.is_super_admin:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Super admin access required"
    )
async def require_engineer_or_admin(
    current_user: Annotated[User, Depends(get_current_active_user)]
) -> User:
    """Allow super admins and users whose account role is owner or engineer.

    Args:
        current_user: Active user resolved by ``get_current_active_user``.

    Returns:
        The same user when permitted.

    Raises:
        HTTPException: 403 for any other user (any account role other than
            ``"owner"`` or ``"engineer"``).
    """
    # Short-circuit keeps the original order: the super-admin flag is checked
    # first and the account role is only consulted when that flag is falsy.
    permitted = (
        current_user.is_super_admin
        or current_user.account_role in ("owner", "engineer")
    )
    if not permitted:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Engineer or admin access required"
        )
    return current_user
async def require_account_owner(
    current_user: Annotated[User, Depends(get_current_active_user)]
) -> User:
    """Allow super admins and users whose account role is owner.

    Args:
        current_user: Active user resolved by ``get_current_active_user``.

    Returns:
        The same user when permitted.

    Raises:
        HTTPException: 403 when the user is neither a super admin nor an
            account owner.
    """
    # The super-admin flag is evaluated first; the role comparison only runs
    # when that flag is falsy.
    permitted = (
        current_user.is_super_admin
        or current_user.account_role == "owner"
    )
    if not permitted:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Account owner access required"
        )
    return current_user
async def get_plan_limits_for_user(
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> Optional[PlanLimits]:
    """Look up the plan limits that apply to the current user's account.

    Args:
        current_user: Active user resolved by ``get_current_active_user``.
        db: Async database session passed through to the lookup.

    Returns:
        Whatever ``get_user_plan_limits`` returns for the user's
        ``account_id``; the annotation allows ``None``.
    """
    # Imported at call time rather than module load; presumably this avoids a
    # circular import with app.core.subscriptions (confirm before hoisting).
    from app.core.subscriptions import get_user_plan_limits

    account_id = current_user.account_id
    plan_limits = await get_user_plan_limits(account_id, db)
    return plan_limits