feat: tenant isolation Phase 3 — audit_logs, tree_shares, remaining RLS

P3-A: Add account_id to audit_logs model + migration (backfill via user_id →
  users.account_id). log_audit() gains optional account_id param with fallback
  SELECT to avoid churn across 40 call sites.

P3-B: Add account_id to tree_shares model + migration (backfill via created_by
  → users.account_id). TreeShare constructor updated in trees.py.

P3-C: Enable RLS on 6 remaining tables: step_ratings, step_usage_log,
  target_lists, session_shares, audit_logs, tree_shares.

P3-D: Drop team_id from target_lists — endpoint, schema, and model now use
  account_id as the sole isolation key.

P3-E: Append Phase 3 RLS isolation tests for all 6 tables.

test_target_lists.py: fix cross-account test to use Account model (not Team)
and set account_id on new User.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
chihlasm
2026-04-11 05:02:43 +00:00
parent 00fdd663bc
commit e05472615b
13 changed files with 485 additions and 55 deletions

View File

@@ -3,37 +3,10 @@ import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.team import Team
from app.models.user import User
from sqlalchemy import select
@pytest.fixture
async def auth_headers(client: AsyncClient, test_db: AsyncSession, test_user: dict):
"""Override auth_headers to ensure the test user has a team_id assigned."""
# Fetch the user from DB and assign a team
result = await test_db.execute(select(User).where(User.email == test_user["email"]))
user = result.scalar_one()
# Create a team and assign the user to it
team = Team(name="Test Team")
test_db.add(team)
await test_db.flush()
user.team_id = team.id
await test_db.commit()
# Re-login to get a fresh token
login_data = {
"email": test_user["email"],
"password": test_user["password"],
}
resp = await client.post("/api/v1/auth/login/json", json=login_data)
assert resp.status_code == 200
token_data = resp.json()
return {"Authorization": f"Bearer {token_data['access_token']}"}
@pytest.mark.asyncio
async def test_create_target_list(client: AsyncClient, auth_headers: dict):
resp = await client.post(
@@ -107,25 +80,28 @@ async def test_delete_target_list(client: AsyncClient, auth_headers: dict):
assert get.status_code == 404
@pytest.mark.asyncio
async def test_cannot_access_other_teams_list(client: AsyncClient, auth_headers: dict, test_db):
"""User from team B cannot access team A's list."""
async def test_cannot_access_other_accounts_list(client: AsyncClient, auth_headers: dict, test_db):
"""User from account B cannot access account A's target list."""
import uuid
from app.models.team import Team
from app.models.account import Account
from app.models.user import User
from app.core.security import get_password_hash
# Create team A list using existing auth_headers
# Create account A list using existing auth_headers
create = await client.post(
"/api/v1/target-lists/",
json={"name": "Team A List", "targets": [{"label": "SRV-A"}]},
json={"name": "Account A List", "targets": [{"label": "SRV-A"}]},
headers=auth_headers,
)
assert create.status_code == 201
list_id = create.json()["id"]
# Create a separate team B with its own user
team_b = Team(name=f"Team B {uuid.uuid4()}")
test_db.add(team_b)
# Create a separate account B with its own user
account_b = Account(
name=f"Account B {uuid.uuid4()}",
display_code=f"AB{str(uuid.uuid4())[:6].upper()}",
)
test_db.add(account_b)
await test_db.flush()
user_b = User(
@@ -133,11 +109,13 @@ async def test_cannot_access_other_teams_list(client: AsyncClient, auth_headers:
password_hash=get_password_hash("password123"),
name="User B",
is_active=True,
team_id=team_b.id,
account_id=account_b.id,
account_role="engineer",
role="engineer",
)
test_db.add(user_b)
await test_db.flush()
await test_db.commit()
# Get auth token for user B
login = await client.post(
@@ -148,6 +126,6 @@ async def test_cannot_access_other_teams_list(client: AsyncClient, auth_headers:
token_b = login.json()["access_token"]
headers_b = {"Authorization": f"Bearer {token_b}"}
# Team B cannot access Team A's list
# Account B cannot access Account A's list
resp = await client.get(f"/api/v1/target-lists/{list_id}", headers=headers_b)
assert resp.status_code == 404