From 057c0abe16e2709d3d14d7b70e5069db9a4e34e7 Mon Sep 17 00:00:00 2001 From: chihlasm Date: Thu, 9 Apr 2026 03:53:57 +0000 Subject: [PATCH] fix: scope analytics/flows/{tree_id} to requesting account Any authenticated user could read flow analytics (session counts, completion rates, CSAT) for any tree UUID. Now returns 404 if the tree doesn't belong to the requesting account. Co-Authored-By: Claude Sonnet 4.6 --- backend/app/api/endpoints/analytics.py | 10 +- backend/tests/test_tenant_isolation_p0.py | 214 ++++++++++++++++++++++ 2 files changed, 222 insertions(+), 2 deletions(-) create mode 100644 backend/tests/test_tenant_isolation_p0.py diff --git a/backend/app/api/endpoints/analytics.py b/backend/app/api/endpoints/analytics.py index 41a3b015..d4591b81 100644 --- a/backend/app/api/endpoints/analytics.py +++ b/backend/app/api/endpoints/analytics.py @@ -7,6 +7,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.core.database import get_db from app.api.deps import get_current_active_user +from app.core.filters import tenant_filter from app.models import User, Session, Tree, SessionRating from app.schemas.analytics import ( TeamAnalyticsResponse, PersonalAnalyticsResponse, FlowAnalyticsResponse, @@ -290,8 +291,13 @@ async def get_flow_analytics( current_user: User = Depends(get_current_active_user), ): """Analytics for a specific flow.""" - # Verify tree exists - result = await db.execute(select(Tree).where(Tree.id == tree_id)) + # Verify tree exists and belongs to the requesting user's account. + result = await db.execute( + select(Tree).where( + Tree.id == tree_id, + tenant_filter(Tree, current_user.account_id), + ) + ) tree = result.scalar_one_or_none() if not tree: raise HTTPException(status_code=404, detail="Flow not found") diff --git a/backend/tests/test_tenant_isolation_p0.py b/backend/tests/test_tenant_isolation_p0.py new file mode 100644 index 00000000..79018dfe --- /dev/null +++ b/backend/tests/test_tenant_isolation_p0.py @@ -0,0 +1,214 @@ +"""Phase 0 tenant-isolation tests. + +Verifies that endpoints respect account boundaries and don't leak data +across tenants. Each task group tests a specific endpoint fix. +""" +import uuid +import pytest +from httpx import AsyncClient +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.account import Account +from app.models.user import User +from app.models.tree import Tree +from app.core.security import get_password_hash + + +# ── Helpers ────────────────────────────────────────────────────────────────── + +async def _create_account_and_user(db: AsyncSession, prefix: str): + """Create a fresh account + engineer user. Returns (account, user, plain_password).""" + password = "TestPass123!" + account = Account( + name=f"{prefix}-corp", + display_code=uuid.uuid4().hex[:8], + ) + db.add(account) + await db.flush() + + user = User( + email=f"{prefix}-{uuid.uuid4().hex[:6]}@example.com", + name=f"{prefix} user", + password_hash=get_password_hash(password), + is_active=True, + account_id=account.id, + account_role="engineer", + ) + db.add(user) + await db.flush() + return account, user, password + + +async def _login(client: AsyncClient, email: str, password: str) -> dict: + """Log in and return Authorization headers.""" + resp = await client.post( + "/api/v1/auth/login", + json={"email": email, "password": password}, + ) + assert resp.status_code == 200, f"Login failed: {resp.text}" + token = resp.json()["access_token"] + return {"Authorization": f"Bearer {token}"} + + +async def _create_private_tree(db: AsyncSession, account: Account, user: User) -> Tree: + """Create a private tree owned by the given account/user.""" + tree = Tree( + name=f"Private Tree {uuid.uuid4().hex[:6]}", + account_id=account.id, + author_id=user.id, + visibility="private", + tree_type="troubleshooting", + tree_structure={"id": "root", "type": "start", "children": []}, + is_active=True, + status="published", + ) + db.add(tree) + await db.flush() + return tree + + +# ── Task 3: Analytics flow endpoint ────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_analytics_flow_cannot_read_other_account_tree( + client: AsyncClient, test_db: AsyncSession +): + """Account A cannot read flow analytics for Account B's private tree.""" + acct_a, user_a, pass_a = await _create_account_and_user(test_db, "anl-a") + acct_b, user_b, pass_b = await _create_account_and_user(test_db, "anl-b") + tree_b = await _create_private_tree(test_db, acct_b, user_b) + await test_db.commit() + + headers_a = await _login(client, user_a.email, pass_a) + + resp = await client.get( + f"/api/v1/analytics/flows/{tree_b.id}", + headers=headers_a, + ) + assert resp.status_code == 404, f"Expected 404, got {resp.status_code}: {resp.text}" + + +# ── Task 4: Category tree count ─────────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_category_tree_count_scoped_to_account( + client: AsyncClient, test_db: AsyncSession +): + """tree_count on a category must not include trees from other accounts.""" + from app.models.category import TreeCategory + + acct_a, user_a, pass_a = await _create_account_and_user(test_db, "cat-a") + acct_b, user_b, pass_b = await _create_account_and_user(test_db, "cat-b") + + # Shared category (account_id=None means global) + category = TreeCategory( + name="Shared Category", + slug=f"shared-cat-{uuid.uuid4().hex[:6]}", + account_id=None, + is_active=True, + ) + test_db.add(category) + await test_db.flush() + + # 3 trees for account_b under this category + for i in range(3): + tree = Tree( + name=f"B Tree {i}", + account_id=acct_b.id, + author_id=user_b.id, + category_id=category.id, + visibility="team", + tree_type="troubleshooting", + tree_structure={"id": "root", "type": "start", "children": []}, + is_active=True, + status="published", + ) + test_db.add(tree) + + # 1 tree for account_a under this category + tree_a = Tree( + name="A Tree", + account_id=acct_a.id, + author_id=user_a.id, + category_id=category.id, + visibility="team", + tree_type="troubleshooting", + tree_structure={"id": "root", "type": "start", "children": []}, + is_active=True, + status="published", + ) + test_db.add(tree_a) + await test_db.commit() + + headers_a = await _login(client, user_a.email, pass_a) + + resp = await client.get( + f"/api/v1/categories/{category.id}", + headers=headers_a, + ) + assert resp.status_code == 200, resp.text + # account_a should only see their 1 tree, not account_b's 3 + assert resp.json()["tree_count"] == 1, ( + f"Expected tree_count=1 (own trees only), got {resp.json()['tree_count']}" + ) + + +# ── Task 5: AI session search scope ────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_ai_session_search_cannot_see_other_users_sessions( + client: AsyncClient, test_db: AsyncSession +): + """User A cannot find User B's AI sessions via the search endpoint, + even when both users are in the same account.""" + from app.models.ai_session import AISession + + # Two users in the SAME account + account = Account(name="Shared Corp", display_code=uuid.uuid4().hex[:8]) + test_db.add(account) + await test_db.flush() + + password = "TestPass123!" + user_a = User( + email=f"user-a-{uuid.uuid4().hex[:6]}@shared.com", + name="User A", + password_hash=get_password_hash(password), + is_active=True, + account_id=account.id, + account_role="engineer", + ) + user_b = User( + email=f"user-b-{uuid.uuid4().hex[:6]}@shared.com", + name="User B", + password_hash=get_password_hash(password), + is_active=True, + account_id=account.id, + account_role="engineer", + ) + test_db.add_all([user_a, user_b]) + await test_db.flush() + + # Session belonging to user_b with distinctive problem_summary + session_b = AISession( + user_id=user_b.id, + account_id=account.id, + problem_summary="CONFIDENTIAL: user_b's session", + problem_domain="networking", + status="resolved", + ) + test_db.add(session_b) + await test_db.commit() + + headers_a = await _login(client, user_a.email, password) + + resp = await client.get( + "/api/v1/ai-sessions/search", + params={"q": "CONFIDENTIAL"}, + headers=headers_a, + ) + assert resp.status_code == 200, resp.text + results = resp.json() + ids = [r["id"] for r in results] + assert str(session_b.id) not in ids, ( + "User A can see User B's session via search — cross-user leak within account" + )