feat: tenant isolation Phase 0 — app-layer filters, UUID audit, CI gate #132
@@ -7,6 +7,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.core.database import get_db
|
||||
from app.api.deps import get_current_active_user
|
||||
from app.core.filters import tenant_filter
|
||||
from app.models import User, Session, Tree, SessionRating
|
||||
from app.schemas.analytics import (
|
||||
TeamAnalyticsResponse, PersonalAnalyticsResponse, FlowAnalyticsResponse,
|
||||
@@ -290,8 +291,13 @@ async def get_flow_analytics(
|
||||
current_user: User = Depends(get_current_active_user),
|
||||
):
|
||||
"""Analytics for a specific flow."""
|
||||
# Verify tree exists
|
||||
result = await db.execute(select(Tree).where(Tree.id == tree_id))
|
||||
# Verify tree exists and belongs to the requesting user's account.
|
||||
result = await db.execute(
|
||||
select(Tree).where(
|
||||
Tree.id == tree_id,
|
||||
tenant_filter(Tree, current_user.account_id),
|
||||
)
|
||||
)
|
||||
tree = result.scalar_one_or_none()
|
||||
if not tree:
|
||||
raise HTTPException(status_code=404, detail="Flow not found")
|
||||
|
||||
214
backend/tests/test_tenant_isolation_p0.py
Normal file
214
backend/tests/test_tenant_isolation_p0.py
Normal file
@@ -0,0 +1,214 @@
|
||||
"""Phase 0 tenant-isolation tests.
|
||||
|
||||
Verifies that endpoints respect account boundaries and don't leak data
|
||||
across tenants. Each task group tests a specific endpoint fix.
|
||||
"""
|
||||
import uuid
|
||||
import pytest
|
||||
from httpx import AsyncClient
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.models.account import Account
|
||||
from app.models.user import User
|
||||
from app.models.tree import Tree
|
||||
from app.core.security import get_password_hash
|
||||
|
||||
|
||||
# ── Helpers ──────────────────────────────────────────────────────────────────
|
||||
|
||||
async def _create_account_and_user(db: AsyncSession, prefix: str):
|
||||
"""Create a fresh account + engineer user. Returns (account, user, plain_password)."""
|
||||
password = "TestPass123!"
|
||||
account = Account(
|
||||
name=f"{prefix}-corp",
|
||||
display_code=uuid.uuid4().hex[:8],
|
||||
)
|
||||
db.add(account)
|
||||
await db.flush()
|
||||
|
||||
user = User(
|
||||
email=f"{prefix}-{uuid.uuid4().hex[:6]}@example.com",
|
||||
name=f"{prefix} user",
|
||||
password_hash=get_password_hash(password),
|
||||
is_active=True,
|
||||
account_id=account.id,
|
||||
account_role="engineer",
|
||||
)
|
||||
db.add(user)
|
||||
await db.flush()
|
||||
return account, user, password
|
||||
|
||||
|
||||
async def _login(client: AsyncClient, email: str, password: str) -> dict:
|
||||
"""Log in and return Authorization headers."""
|
||||
resp = await client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"email": email, "password": password},
|
||||
)
|
||||
assert resp.status_code == 200, f"Login failed: {resp.text}"
|
||||
token = resp.json()["access_token"]
|
||||
return {"Authorization": f"Bearer {token}"}
|
||||
|
||||
|
||||
async def _create_private_tree(db: AsyncSession, account: Account, user: User) -> Tree:
|
||||
"""Create a private tree owned by the given account/user."""
|
||||
tree = Tree(
|
||||
name=f"Private Tree {uuid.uuid4().hex[:6]}",
|
||||
account_id=account.id,
|
||||
author_id=user.id,
|
||||
visibility="private",
|
||||
tree_type="troubleshooting",
|
||||
tree_structure={"id": "root", "type": "start", "children": []},
|
||||
is_active=True,
|
||||
status="published",
|
||||
)
|
||||
db.add(tree)
|
||||
await db.flush()
|
||||
return tree
|
||||
|
||||
|
||||
# ── Task 3: Analytics flow endpoint ──────────────────────────────────────────
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_analytics_flow_cannot_read_other_account_tree(
|
||||
client: AsyncClient, test_db: AsyncSession
|
||||
):
|
||||
"""Account A cannot read flow analytics for Account B's private tree."""
|
||||
acct_a, user_a, pass_a = await _create_account_and_user(test_db, "anl-a")
|
||||
acct_b, user_b, pass_b = await _create_account_and_user(test_db, "anl-b")
|
||||
tree_b = await _create_private_tree(test_db, acct_b, user_b)
|
||||
await test_db.commit()
|
||||
|
||||
headers_a = await _login(client, user_a.email, pass_a)
|
||||
|
||||
resp = await client.get(
|
||||
f"/api/v1/analytics/flows/{tree_b.id}",
|
||||
headers=headers_a,
|
||||
)
|
||||
assert resp.status_code == 404, f"Expected 404, got {resp.status_code}: {resp.text}"
|
||||
|
||||
|
||||
# ── Task 4: Category tree count ───────────────────────────────────────────────
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_category_tree_count_scoped_to_account(
|
||||
client: AsyncClient, test_db: AsyncSession
|
||||
):
|
||||
"""tree_count on a category must not include trees from other accounts."""
|
||||
from app.models.category import TreeCategory
|
||||
|
||||
acct_a, user_a, pass_a = await _create_account_and_user(test_db, "cat-a")
|
||||
acct_b, user_b, pass_b = await _create_account_and_user(test_db, "cat-b")
|
||||
|
||||
# Shared category (account_id=None means global)
|
||||
category = TreeCategory(
|
||||
name="Shared Category",
|
||||
slug=f"shared-cat-{uuid.uuid4().hex[:6]}",
|
||||
account_id=None,
|
||||
is_active=True,
|
||||
)
|
||||
test_db.add(category)
|
||||
await test_db.flush()
|
||||
|
||||
# 3 trees for account_b under this category
|
||||
for i in range(3):
|
||||
tree = Tree(
|
||||
name=f"B Tree {i}",
|
||||
account_id=acct_b.id,
|
||||
author_id=user_b.id,
|
||||
category_id=category.id,
|
||||
visibility="team",
|
||||
tree_type="troubleshooting",
|
||||
tree_structure={"id": "root", "type": "start", "children": []},
|
||||
is_active=True,
|
||||
status="published",
|
||||
)
|
||||
test_db.add(tree)
|
||||
|
||||
# 1 tree for account_a under this category
|
||||
tree_a = Tree(
|
||||
name="A Tree",
|
||||
account_id=acct_a.id,
|
||||
author_id=user_a.id,
|
||||
category_id=category.id,
|
||||
visibility="team",
|
||||
tree_type="troubleshooting",
|
||||
tree_structure={"id": "root", "type": "start", "children": []},
|
||||
is_active=True,
|
||||
status="published",
|
||||
)
|
||||
test_db.add(tree_a)
|
||||
await test_db.commit()
|
||||
|
||||
headers_a = await _login(client, user_a.email, pass_a)
|
||||
|
||||
resp = await client.get(
|
||||
f"/api/v1/categories/{category.id}",
|
||||
headers=headers_a,
|
||||
)
|
||||
assert resp.status_code == 200, resp.text
|
||||
# account_a should only see their 1 tree, not account_b's 3
|
||||
assert resp.json()["tree_count"] == 1, (
|
||||
f"Expected tree_count=1 (own trees only), got {resp.json()['tree_count']}"
|
||||
)
|
||||
|
||||
|
||||
# ── Task 5: AI session search scope ──────────────────────────────────────────
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ai_session_search_cannot_see_other_users_sessions(
|
||||
client: AsyncClient, test_db: AsyncSession
|
||||
):
|
||||
"""User A cannot find User B's AI sessions via the search endpoint,
|
||||
even when both users are in the same account."""
|
||||
from app.models.ai_session import AISession
|
||||
|
||||
# Two users in the SAME account
|
||||
account = Account(name="Shared Corp", display_code=uuid.uuid4().hex[:8])
|
||||
test_db.add(account)
|
||||
await test_db.flush()
|
||||
|
||||
password = "TestPass123!"
|
||||
user_a = User(
|
||||
email=f"user-a-{uuid.uuid4().hex[:6]}@shared.com",
|
||||
name="User A",
|
||||
password_hash=get_password_hash(password),
|
||||
is_active=True,
|
||||
account_id=account.id,
|
||||
account_role="engineer",
|
||||
)
|
||||
user_b = User(
|
||||
email=f"user-b-{uuid.uuid4().hex[:6]}@shared.com",
|
||||
name="User B",
|
||||
password_hash=get_password_hash(password),
|
||||
is_active=True,
|
||||
account_id=account.id,
|
||||
account_role="engineer",
|
||||
)
|
||||
test_db.add_all([user_a, user_b])
|
||||
await test_db.flush()
|
||||
|
||||
# Session belonging to user_b with distinctive problem_summary
|
||||
session_b = AISession(
|
||||
user_id=user_b.id,
|
||||
account_id=account.id,
|
||||
problem_summary="CONFIDENTIAL: user_b's session",
|
||||
problem_domain="networking",
|
||||
status="resolved",
|
||||
)
|
||||
test_db.add(session_b)
|
||||
await test_db.commit()
|
||||
|
||||
headers_a = await _login(client, user_a.email, password)
|
||||
|
||||
resp = await client.get(
|
||||
"/api/v1/ai-sessions/search",
|
||||
params={"q": "CONFIDENTIAL"},
|
||||
headers=headers_a,
|
||||
)
|
||||
assert resp.status_code == 200, resp.text
|
||||
results = resp.json()
|
||||
ids = [r["id"] for r in results]
|
||||
assert str(session_b.id) not in ids, (
|
||||
"User A can see User B's session via search — cross-user leak within account"
|
||||
)
|
||||
Reference in New Issue
Block a user