Any authenticated user could read flow analytics (session counts, completion rates, CSAT) for any tree UUID. Now returns 404 if the tree doesn't belong to the requesting account. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
215 lines
7.2 KiB
Python
215 lines
7.2 KiB
Python
"""Phase 0 tenant-isolation tests.
|
|
|
|
Verifies that endpoints respect account boundaries and don't leak data
|
|
across tenants. Each task group tests a specific endpoint fix.
|
|
"""
|
|
import uuid
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.models.account import Account
|
|
from app.models.user import User
|
|
from app.models.tree import Tree
|
|
from app.core.security import get_password_hash
|
|
|
|
|
|
# ── Helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
async def _create_account_and_user(db: AsyncSession, prefix: str):
|
|
"""Create a fresh account + engineer user. Returns (account, user, plain_password)."""
|
|
password = "TestPass123!"
|
|
account = Account(
|
|
name=f"{prefix}-corp",
|
|
display_code=uuid.uuid4().hex[:8],
|
|
)
|
|
db.add(account)
|
|
await db.flush()
|
|
|
|
user = User(
|
|
email=f"{prefix}-{uuid.uuid4().hex[:6]}@example.com",
|
|
name=f"{prefix} user",
|
|
password_hash=get_password_hash(password),
|
|
is_active=True,
|
|
account_id=account.id,
|
|
account_role="engineer",
|
|
)
|
|
db.add(user)
|
|
await db.flush()
|
|
return account, user, password
|
|
|
|
|
|
async def _login(client: AsyncClient, email: str, password: str) -> dict:
|
|
"""Log in and return Authorization headers."""
|
|
resp = await client.post(
|
|
"/api/v1/auth/login",
|
|
json={"email": email, "password": password},
|
|
)
|
|
assert resp.status_code == 200, f"Login failed: {resp.text}"
|
|
token = resp.json()["access_token"]
|
|
return {"Authorization": f"Bearer {token}"}
|
|
|
|
|
|
async def _create_private_tree(db: AsyncSession, account: Account, user: User) -> Tree:
|
|
"""Create a private tree owned by the given account/user."""
|
|
tree = Tree(
|
|
name=f"Private Tree {uuid.uuid4().hex[:6]}",
|
|
account_id=account.id,
|
|
author_id=user.id,
|
|
visibility="private",
|
|
tree_type="troubleshooting",
|
|
tree_structure={"id": "root", "type": "start", "children": []},
|
|
is_active=True,
|
|
status="published",
|
|
)
|
|
db.add(tree)
|
|
await db.flush()
|
|
return tree
|
|
|
|
|
|
# ── Task 3: Analytics flow endpoint ──────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_analytics_flow_cannot_read_other_account_tree(
|
|
client: AsyncClient, test_db: AsyncSession
|
|
):
|
|
"""Account A cannot read flow analytics for Account B's private tree."""
|
|
acct_a, user_a, pass_a = await _create_account_and_user(test_db, "anl-a")
|
|
acct_b, user_b, pass_b = await _create_account_and_user(test_db, "anl-b")
|
|
tree_b = await _create_private_tree(test_db, acct_b, user_b)
|
|
await test_db.commit()
|
|
|
|
headers_a = await _login(client, user_a.email, pass_a)
|
|
|
|
resp = await client.get(
|
|
f"/api/v1/analytics/flows/{tree_b.id}",
|
|
headers=headers_a,
|
|
)
|
|
assert resp.status_code == 404, f"Expected 404, got {resp.status_code}: {resp.text}"
|
|
|
|
|
|
# ── Task 4: Category tree count ───────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_category_tree_count_scoped_to_account(
|
|
client: AsyncClient, test_db: AsyncSession
|
|
):
|
|
"""tree_count on a category must not include trees from other accounts."""
|
|
from app.models.category import TreeCategory
|
|
|
|
acct_a, user_a, pass_a = await _create_account_and_user(test_db, "cat-a")
|
|
acct_b, user_b, pass_b = await _create_account_and_user(test_db, "cat-b")
|
|
|
|
# Shared category (account_id=None means global)
|
|
category = TreeCategory(
|
|
name="Shared Category",
|
|
slug=f"shared-cat-{uuid.uuid4().hex[:6]}",
|
|
account_id=None,
|
|
is_active=True,
|
|
)
|
|
test_db.add(category)
|
|
await test_db.flush()
|
|
|
|
# 3 trees for account_b under this category
|
|
for i in range(3):
|
|
tree = Tree(
|
|
name=f"B Tree {i}",
|
|
account_id=acct_b.id,
|
|
author_id=user_b.id,
|
|
category_id=category.id,
|
|
visibility="team",
|
|
tree_type="troubleshooting",
|
|
tree_structure={"id": "root", "type": "start", "children": []},
|
|
is_active=True,
|
|
status="published",
|
|
)
|
|
test_db.add(tree)
|
|
|
|
# 1 tree for account_a under this category
|
|
tree_a = Tree(
|
|
name="A Tree",
|
|
account_id=acct_a.id,
|
|
author_id=user_a.id,
|
|
category_id=category.id,
|
|
visibility="team",
|
|
tree_type="troubleshooting",
|
|
tree_structure={"id": "root", "type": "start", "children": []},
|
|
is_active=True,
|
|
status="published",
|
|
)
|
|
test_db.add(tree_a)
|
|
await test_db.commit()
|
|
|
|
headers_a = await _login(client, user_a.email, pass_a)
|
|
|
|
resp = await client.get(
|
|
f"/api/v1/categories/{category.id}",
|
|
headers=headers_a,
|
|
)
|
|
assert resp.status_code == 200, resp.text
|
|
# account_a should only see their 1 tree, not account_b's 3
|
|
assert resp.json()["tree_count"] == 1, (
|
|
f"Expected tree_count=1 (own trees only), got {resp.json()['tree_count']}"
|
|
)
|
|
|
|
|
|
# ── Task 5: AI session search scope ──────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_ai_session_search_cannot_see_other_users_sessions(
|
|
client: AsyncClient, test_db: AsyncSession
|
|
):
|
|
"""User A cannot find User B's AI sessions via the search endpoint,
|
|
even when both users are in the same account."""
|
|
from app.models.ai_session import AISession
|
|
|
|
# Two users in the SAME account
|
|
account = Account(name="Shared Corp", display_code=uuid.uuid4().hex[:8])
|
|
test_db.add(account)
|
|
await test_db.flush()
|
|
|
|
password = "TestPass123!"
|
|
user_a = User(
|
|
email=f"user-a-{uuid.uuid4().hex[:6]}@shared.com",
|
|
name="User A",
|
|
password_hash=get_password_hash(password),
|
|
is_active=True,
|
|
account_id=account.id,
|
|
account_role="engineer",
|
|
)
|
|
user_b = User(
|
|
email=f"user-b-{uuid.uuid4().hex[:6]}@shared.com",
|
|
name="User B",
|
|
password_hash=get_password_hash(password),
|
|
is_active=True,
|
|
account_id=account.id,
|
|
account_role="engineer",
|
|
)
|
|
test_db.add_all([user_a, user_b])
|
|
await test_db.flush()
|
|
|
|
# Session belonging to user_b with distinctive problem_summary
|
|
session_b = AISession(
|
|
user_id=user_b.id,
|
|
account_id=account.id,
|
|
problem_summary="CONFIDENTIAL: user_b's session",
|
|
problem_domain="networking",
|
|
status="resolved",
|
|
)
|
|
test_db.add(session_b)
|
|
await test_db.commit()
|
|
|
|
headers_a = await _login(client, user_a.email, password)
|
|
|
|
resp = await client.get(
|
|
"/api/v1/ai-sessions/search",
|
|
params={"q": "CONFIDENTIAL"},
|
|
headers=headers_a,
|
|
)
|
|
assert resp.status_code == 200, resp.text
|
|
results = resp.json()
|
|
ids = [r["id"] for r in results]
|
|
assert str(session_b.id) not in ids, (
|
|
"User A can see User B's session via search — cross-user leak within account"
|
|
)
|