feat: add tenant_filter() helper and get_tenant_context dependency

tenant_filter(model, account_id) is the canonical app-layer tenant
scoping expression. Every query on a tenant table must use it.
build_tree_access_filter and build_step_visibility_filter updated
to call tenant_filter() internally for the account_id match.

get_tenant_context is a FastAPI dependency that returns account_id
or raises 403 if the user has no account — prevents raw access to
current_user.account_id and centralises the null check.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
chihlasm
2026-04-09 03:49:50 +00:00
parent 56775eca04
commit 3c0374378c
2 changed files with 41 additions and 6 deletions

View File

@@ -190,3 +190,20 @@ async def get_plan_limits_for_user(
"""Get plan limits for the current user's account."""
from app.core.subscriptions import get_user_plan_limits
return await get_user_plan_limits(current_user.account_id, db)
async def get_tenant_context(
current_user: Annotated[User, Depends(get_current_active_user)],
) -> UUID:
"""Return the current user's account_id.
Use this dependency instead of reading current_user.account_id directly.
Raises 403 if the user has no account association (should not happen in
normal flows — users are always associated with an account on registration).
"""
if current_user.account_id is None:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="User not associated with any account",
)
return current_user.account_id

View File

@@ -1,10 +1,12 @@
"""
Centralized query filters for ResolutionFlow.
Provides reusable SQLAlchemy filter builders for tree access control
and step visibility, used across multiple endpoint modules.
Provides reusable SQLAlchemy filter builders for tree access control,
step visibility, and the canonical tenant_filter used by all queries
on tenant-scoped tables.
"""
from __future__ import annotations
import uuid
from typing import TYPE_CHECKING
from sqlalchemy import or_, and_, true as sa_true
@@ -13,6 +15,18 @@ if TYPE_CHECKING:
from app.models.user import User
def tenant_filter(model, account_id: uuid.UUID):
"""Primary app-layer tenant filter.
MUST be used in every SELECT/UPDATE/DELETE on tenant tables.
RLS (Phase 2) is the safety net — this is the primary enforcement.
Usage:
stmt = select(Tree).where(tenant_filter(Tree, current_user.account_id), ...)
"""
return model.account_id == account_id
def build_tree_access_filter(current_user: User):
"""Build the access filter for trees based on user permissions.
@@ -36,10 +50,11 @@ def build_tree_access_filter(current_user: User):
Tree.author_id == current_user.id,
]
if current_user.account_id:
# Team-visible trees: use tenant_filter as the account match
conditions.append(
and_(
Tree.visibility == 'team',
Tree.account_id == current_user.account_id
tenant_filter(Tree, current_user.account_id),
)
)
return or_(*conditions)
@@ -58,11 +73,14 @@ def build_step_visibility_filter(current_user: User):
if current_user.account_id:
return or_(
StepLibrary.visibility == 'public',
and_(StepLibrary.visibility == 'team', StepLibrary.account_id == current_user.account_id),
StepLibrary.created_by == current_user.id # Own private steps
and_(
StepLibrary.visibility == 'team',
tenant_filter(StepLibrary, current_user.account_id),
),
StepLibrary.created_by == current_user.id,
)
else:
return or_(
StepLibrary.visibility == 'public',
StepLibrary.created_by == current_user.id
StepLibrary.created_by == current_user.id,
)