Files
resolutionflow/backend/tests/test_tenant_isolation_p0.py
chihlasm 61402f62a2 fix: address Task 6 quality review — rename helper, restore 403 for intra-account, add docs test
- Rename _get_tree_or_403 → _get_tree_or_404 in maintenance_schedules.py
  (function now raises 404, old name was misleading)
- Restore HTTP 403 for intra-account permission failures in update_tree:
  same-account users who can see a tree but can't edit it got 404 (wrong);
  only cross-account lookups should return 404 to avoid confirming existence
- Apply same 403/404 distinction to update_tree_visibility
- Add test: get_documentation must return 404 for cross-user session access
- Add comment documenting owner-only design for documentation endpoints

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-09 04:19:28 +00:00

579 lines
20 KiB
Python

"""Phase 0 tenant-isolation tests.
Verifies that endpoints respect account boundaries and don't leak data
across tenants. Each task group tests a specific endpoint fix.
"""
import uuid
import datetime as dt
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.account import Account
from app.models.user import User
from app.models.tree import Tree
from app.core.security import get_password_hash
# ── Helpers ──────────────────────────────────────────────────────────────────
async def _create_account_and_user(db: AsyncSession, prefix: str):
    """Insert a new account plus one engineer user attached to it.

    The account is added and flushed first; the user row is then built from
    ``account.id``, added and flushed as well. Nothing is committed here.

    Args:
        db: Session used to add and flush both rows.
        prefix: Text embedded in the account name, user name and e-mail.

    Returns:
        A ``(account, user, plain_password)`` tuple.
    """
    plain_password = "TestPass123!"

    display_code = uuid.uuid4().hex[:8]
    new_account = Account(name=f"{prefix}-corp", display_code=display_code)
    db.add(new_account)
    await db.flush()

    # Random suffix on the local part of the e-mail address.
    email_suffix = uuid.uuid4().hex[:6]
    new_user = User(
        account_id=new_account.id,
        account_role="engineer",
        email=f"{prefix}-{email_suffix}@example.com",
        is_active=True,
        name=f"{prefix} user",
        password_hash=get_password_hash(plain_password),
    )
    db.add(new_user)
    await db.flush()

    return new_account, new_user, plain_password
async def _login(client: AsyncClient, email: str, password: str) -> dict:
    """Authenticate via the login endpoint and build bearer-token headers.

    Args:
        client: HTTP client used to send the login request.
        email: Address of the user to log in as.
        password: Plain-text password for that user.

    Returns:
        A dict holding a single ``Authorization`` header.

    Raises:
        AssertionError: If the login response status is not 200.
    """
    credentials = {"email": email, "password": password}
    login_response = await client.post("/api/v1/auth/login", json=credentials)
    assert login_response.status_code == 200, f"Login failed: {login_response.text}"

    access_token = login_response.json()["access_token"]
    return {"Authorization": f"Bearer {access_token}"}
async def _create_private_tree(db: AsyncSession, account: Account, user: User) -> Tree:
    """Add and flush a published troubleshooting tree with private visibility.

    Args:
        db: Session used to add and flush the tree.
        account: Account whose id is stored as the tree's ``account_id``.
        user: User whose id is stored as the tree's ``author_id``.

    Returns:
        The flushed ``Tree`` instance (not committed).
    """
    tree_name = f"Private Tree {uuid.uuid4().hex[:6]}"
    minimal_structure = {"id": "root", "type": "start", "children": []}

    private_tree = Tree(
        name=tree_name,
        account_id=account.id,
        author_id=user.id,
        visibility="private",
        tree_type="troubleshooting",
        tree_structure=minimal_structure,
        is_active=True,
        status="published",
    )
    db.add(private_tree)
    await db.flush()
    return private_tree
# ── Task 3: Analytics flow endpoint ──────────────────────────────────────────
@pytest.mark.asyncio
async def test_analytics_flow_cannot_read_other_account_tree(
    client: AsyncClient, test_db: AsyncSession
):
    """Flow analytics for another account's private tree must answer 404."""
    _outsider_account, outsider, outsider_password = await _create_account_and_user(
        test_db, "anl-a"
    )
    owner_account, owner, _owner_password = await _create_account_and_user(
        test_db, "anl-b"
    )
    foreign_tree = await _create_private_tree(test_db, owner_account, owner)
    await test_db.commit()

    auth_headers = await _login(client, outsider.email, outsider_password)
    analytics_url = f"/api/v1/analytics/flows/{foreign_tree.id}"
    resp = await client.get(analytics_url, headers=auth_headers)

    status = resp.status_code
    assert status == 404, f"Expected 404, got {status}: {resp.text}"
# ── Task 4: Category tree count ───────────────────────────────────────────────
@pytest.mark.asyncio
async def test_category_tree_count_scoped_to_account(
    client: AsyncClient, test_db: AsyncSession
):
    """A category's ``tree_count`` must only count the caller's own trees.

    One category holds three trees for account B and one tree for account A;
    account A's user must see a count of 1.
    """
    from app.models.category import TreeCategory

    viewer_account, viewer, viewer_password = await _create_account_and_user(
        test_db, "cat-a"
    )
    other_account, other_user, _other_password = await _create_account_and_user(
        test_db, "cat-b"
    )

    # account_id=None means the category is global (shared across accounts)
    shared_category = TreeCategory(
        name="Shared Category",
        slug=f"shared-cat-{uuid.uuid4().hex[:6]}",
        account_id=None,
        is_active=True,
    )
    test_db.add(shared_category)
    await test_db.flush()

    def build_tree(tree_name, owning_account, author):
        # Every tree in this test differs only by name and ownership.
        return Tree(
            name=tree_name,
            account_id=owning_account.id,
            author_id=author.id,
            category_id=shared_category.id,
            visibility="team",
            tree_type="troubleshooting",
            tree_structure={"id": "root", "type": "start", "children": []},
            is_active=True,
            status="published",
        )

    # Three trees for account B, then a single tree for account A.
    for index in range(3):
        test_db.add(build_tree(f"B Tree {index}", other_account, other_user))
    test_db.add(build_tree("A Tree", viewer_account, viewer))
    await test_db.commit()

    auth_headers = await _login(client, viewer.email, viewer_password)
    category_url = f"/api/v1/categories/{shared_category.id}"
    resp = await client.get(category_url, headers=auth_headers)
    assert resp.status_code == 200, resp.text

    # Only account A's own tree may be counted, never account B's three.
    reported_count = resp.json()["tree_count"]
    assert reported_count == 1, (
        f"Expected tree_count=1 (own trees only), got {reported_count}"
    )
# ── Task 5: AI session search scope ──────────────────────────────────────────
@pytest.mark.asyncio
async def test_ai_session_search_cannot_see_other_users_sessions(
    client: AsyncClient, test_db: AsyncSession
):
    """AI session search must not return a colleague's session.

    Both users belong to the same account, so this checks per-user scoping
    rather than per-account scoping.
    """
    from app.models.ai_session import AISession

    # One account shared by both users.
    shared_account = Account(name="Shared Corp", display_code=uuid.uuid4().hex[:8])
    test_db.add(shared_account)
    await test_db.flush()

    shared_password = "TestPass123!"

    def make_engineer(email_label, display_name):
        # Both users are identical apart from their e-mail and display name.
        return User(
            email=f"{email_label}-{uuid.uuid4().hex[:6]}@shared.com",
            name=display_name,
            password_hash=get_password_hash(shared_password),
            is_active=True,
            account_id=shared_account.id,
            account_role="engineer",
        )

    searcher = make_engineer("user-a", "User A")
    session_owner = make_engineer("user-b", "User B")
    test_db.add_all([searcher, session_owner])
    await test_db.flush()

    # The owner's session carries a distinctive summary to search for.
    owner_session = AISession(
        user_id=session_owner.id,
        account_id=shared_account.id,
        problem_summary="CONFIDENTIAL: user_b's session",
        problem_domain="networking",
        status="resolved",
    )
    test_db.add(owner_session)
    await test_db.commit()

    auth_headers = await _login(client, searcher.email, shared_password)
    resp = await client.get(
        "/api/v1/ai-sessions/search",
        params={"q": "CONFIDENTIAL"},
        headers=auth_headers,
    )
    assert resp.status_code == 200, resp.text

    returned_ids = [hit["id"] for hit in resp.json()]
    assert str(owner_session.id) not in returned_ids, (
        "User A can see User B's session via search — cross-user leak within account"
    )
# ── Task 6: Cross-tenant UUID audit ─────────────────────────────────────────
@pytest.mark.asyncio
async def test_get_tree_returns_404_not_403_for_other_account(
    client: AsyncClient, test_db: AsyncSession
):
    """Reading another account's private tree must answer 404 rather than 403."""
    _outsider_account, outsider, outsider_password = await _create_account_and_user(
        test_db, "t6-tree-a"
    )
    owner_account, owner, _owner_password = await _create_account_and_user(
        test_db, "t6-tree-b"
    )
    foreign_tree = await _create_private_tree(test_db, owner_account, owner)
    await test_db.commit()

    auth_headers = await _login(client, outsider.email, outsider_password)
    tree_url = f"/api/v1/trees/{foreign_tree.id}"
    resp = await client.get(tree_url, headers=auth_headers)

    status = resp.status_code
    assert status == 404, (
        f"Expected 404 for cross-tenant tree access, got {status}"
    )
@pytest.mark.asyncio
async def test_update_tree_returns_404_not_403_for_other_account(
    client: AsyncClient, test_db: AsyncSession
):
    """A PUT against another account's tree must answer 404 rather than 403."""
    _outsider_account, outsider, outsider_password = await _create_account_and_user(
        test_db, "t6-upd-a"
    )
    owner_account, owner, _owner_password = await _create_account_and_user(
        test_db, "t6-upd-b"
    )
    foreign_tree = await _create_private_tree(test_db, owner_account, owner)
    await test_db.commit()

    auth_headers = await _login(client, outsider.email, outsider_password)
    tree_url = f"/api/v1/trees/{foreign_tree.id}"
    rename_payload = {"name": "Hacked"}
    resp = await client.put(tree_url, json=rename_payload, headers=auth_headers)

    status = resp.status_code
    assert status == 404, (
        f"Expected 404 for cross-tenant tree update, got {status}"
    )
@pytest.mark.asyncio
async def test_get_session_returns_404_not_403_for_other_user(
    client: AsyncClient, test_db: AsyncSession
):
    """Fetching a session owned by a user of another account must answer 404."""
    from app.models.session import Session

    _outsider_account, outsider, outsider_password = await _create_account_and_user(
        test_db, "t6-sess-a"
    )
    owner_account, owner, _owner_password = await _create_account_and_user(
        test_db, "t6-sess-b"
    )
    foreign_tree = await _create_private_tree(test_db, owner_account, owner)

    empty_snapshot = {"id": "root", "type": "start", "children": []}
    foreign_session = Session(
        tree_id=foreign_tree.id,
        user_id=owner.id,
        tree_snapshot=empty_snapshot,
        path_taken=[],
        decisions=[],
    )
    test_db.add(foreign_session)
    await test_db.commit()

    auth_headers = await _login(client, outsider.email, outsider_password)
    session_url = f"/api/v1/sessions/{foreign_session.id}"
    resp = await client.get(session_url, headers=auth_headers)

    status = resp.status_code
    assert status == 404, (
        f"Expected 404 for cross-user session access, got {status}"
    )
@pytest.mark.asyncio
async def test_ai_session_get_returns_404_not_403_for_other_user(
    client: AsyncClient, test_db: AsyncSession
):
    """Fetching an AI session owned by a user of another account must answer 404."""
    from app.models.ai_session import AISession

    _outsider_account, outsider, outsider_password = await _create_account_and_user(
        test_db, "t6-ais-a"
    )
    owner_account, owner, _owner_password = await _create_account_and_user(
        test_db, "t6-ais-b"
    )

    foreign_ai_session = AISession(
        user_id=owner.id,
        account_id=owner_account.id,
        problem_summary="Test session",
        problem_domain="networking",
        status="active",
    )
    test_db.add(foreign_ai_session)
    await test_db.commit()

    auth_headers = await _login(client, outsider.email, outsider_password)
    ai_session_url = f"/api/v1/ai-sessions/{foreign_ai_session.id}"
    resp = await client.get(ai_session_url, headers=auth_headers)

    status = resp.status_code
    assert status == 404, (
        f"Expected 404 for cross-user AI session access, got {status}"
    )
@pytest.mark.asyncio
async def test_ai_session_retry_psa_push_requires_ownership(
    client: AsyncClient, test_db: AsyncSession
):
    """Retrying the PSA push on someone else's AI session must answer 404."""
    from app.models.ai_session import AISession

    _outsider_account, outsider, outsider_password = await _create_account_and_user(
        test_db, "t6-psa-a"
    )
    owner_account, owner, _owner_password = await _create_account_and_user(
        test_db, "t6-psa-b"
    )

    foreign_ai_session = AISession(
        user_id=owner.id,
        account_id=owner_account.id,
        problem_summary="PSA test",
        problem_domain="networking",
        status="resolved",
    )
    test_db.add(foreign_ai_session)
    await test_db.commit()

    auth_headers = await _login(client, outsider.email, outsider_password)
    retry_url = f"/api/v1/ai-sessions/{foreign_ai_session.id}/retry-psa-push"
    resp = await client.post(retry_url, headers=auth_headers)

    status = resp.status_code
    assert status == 404, (
        f"Expected 404 for cross-user retry-psa-push, got {status}"
    )
@pytest.mark.asyncio
async def test_upload_url_returns_404_not_403_for_other_account(
    client: AsyncClient, test_db: AsyncSession
):
    """Requesting the URL of another account's upload must not succeed.

    The expected answer is 404; 503 is also accepted, which the failure
    message attributes to storage not being configured.
    """
    from app.models.file_upload import FileUpload

    _outsider_account, outsider, outsider_password = await _create_account_and_user(
        test_db, "t6-upl-a"
    )
    owner_account, owner, _owner_password = await _create_account_and_user(
        test_db, "t6-upl-b"
    )

    foreign_upload = FileUpload(
        account_id=owner_account.id,
        uploaded_by=owner.id,
        filename="secret.png",
        content_type="image/png",
        size_bytes=1024,
        storage_key="test/secret.png",
    )
    test_db.add(foreign_upload)
    await test_db.commit()

    auth_headers = await _login(client, outsider.email, outsider_password)
    upload_url = f"/api/v1/uploads/{foreign_upload.id}/url"
    resp = await client.get(upload_url, headers=auth_headers)

    accepted_statuses = (404, 503)
    status = resp.status_code
    assert status in accepted_statuses, (
        f"Expected 404 (or 503 if storage not configured) for cross-account upload, got {status}"
    )
@pytest.mark.asyncio
async def test_share_revoke_returns_404_not_403_for_other_user(
    client: AsyncClient, test_db: AsyncSession
):
    """Deleting a share created by a user of another account must answer 404."""
    from app.models.session import Session
    from app.models.session_share import SessionShare

    _outsider_account, outsider, outsider_password = await _create_account_and_user(
        test_db, "t6-shr-a"
    )
    owner_account, owner, _owner_password = await _create_account_and_user(
        test_db, "t6-shr-b"
    )
    foreign_tree = await _create_private_tree(test_db, owner_account, owner)

    empty_snapshot = {"id": "root", "type": "start", "children": []}
    foreign_session = Session(
        tree_id=foreign_tree.id,
        user_id=owner.id,
        tree_snapshot=empty_snapshot,
        path_taken=[],
        decisions=[],
    )
    test_db.add(foreign_session)
    # Flush before building the share, which reads foreign_session.id.
    await test_db.flush()

    foreign_share = SessionShare(
        session_id=foreign_session.id,
        account_id=owner_account.id,
        share_token=f"test-token-unique-{uuid.uuid4().hex[:8]}",
        share_name="Test",
        visibility="public",
        created_by=owner.id,
    )
    test_db.add(foreign_share)
    await test_db.commit()

    auth_headers = await _login(client, outsider.email, outsider_password)
    share_url = f"/api/v1/shares/{foreign_share.id}"
    resp = await client.delete(share_url, headers=auth_headers)

    status = resp.status_code
    assert status == 404, (
        f"Expected 404 for cross-user share revoke, got {status}"
    )
# ── Task 6 (continued): steps, tags, step_categories, maintenance_schedules ──
@pytest.mark.asyncio
async def test_cannot_access_other_account_step(
    client: AsyncClient, test_db: AsyncSession
):
    """A team-visibility step owned by another account must answer 404."""
    from app.models.step_library import StepLibrary

    _outsider_account, outsider, outsider_password = await _create_account_and_user(
        test_db, "t6-step-a"
    )
    owner_account, owner, _owner_password = await _create_account_and_user(
        test_db, "t6-step-b"
    )

    # Team-visibility step that belongs to the owning account.
    foreign_step = StepLibrary(
        title="Account B Confidential Step",
        step_type="action",
        content={"description": "secret step"},
        created_by=owner.id,
        account_id=owner_account.id,
        visibility="team",
        is_active=True,
    )
    test_db.add(foreign_step)
    await test_db.commit()

    auth_headers = await _login(client, outsider.email, outsider_password)
    step_url = f"/api/v1/steps/{foreign_step.id}"
    resp = await client.get(step_url, headers=auth_headers)

    status = resp.status_code
    assert status == 404, (
        f"Expected 404 for cross-account step access, got {status}: {resp.text}"
    )
@pytest.mark.asyncio
async def test_cannot_access_other_account_tag(
    client: AsyncClient, test_db: AsyncSession
):
    """A tag scoped to another account must answer 404."""
    from app.models.tag import TreeTag

    _outsider_account, outsider, outsider_password = await _create_account_and_user(
        test_db, "t6-tag-a"
    )
    owner_account, _owner, _owner_password = await _create_account_and_user(
        test_db, "t6-tag-b"
    )

    # Name and slug each receive their own random suffix.
    tag_name = f"account-b-tag-{uuid.uuid4().hex[:6]}"
    tag_slug = f"account-b-tag-{uuid.uuid4().hex[:6]}"
    foreign_tag = TreeTag(name=tag_name, slug=tag_slug, account_id=owner_account.id)
    test_db.add(foreign_tag)
    await test_db.commit()

    auth_headers = await _login(client, outsider.email, outsider_password)
    tag_url = f"/api/v1/tags/{foreign_tag.id}"
    resp = await client.get(tag_url, headers=auth_headers)

    status = resp.status_code
    assert status == 404, (
        f"Expected 404 for cross-account tag access, got {status}: {resp.text}"
    )
@pytest.mark.asyncio
async def test_cannot_access_other_account_step_category(
    client: AsyncClient, test_db: AsyncSession
):
    """A step category scoped to another account must answer 404."""
    from app.models.step_category import StepCategory

    _outsider_account, outsider, outsider_password = await _create_account_and_user(
        test_db, "t6-scat-a"
    )
    owner_account, _owner, _owner_password = await _create_account_and_user(
        test_db, "t6-scat-b"
    )

    # Step category scoped to the owning account.
    category_name = f"Account B Category {uuid.uuid4().hex[:6]}"
    category_slug = f"account-b-cat-{uuid.uuid4().hex[:6]}"
    foreign_category = StepCategory(
        name=category_name,
        slug=category_slug,
        account_id=owner_account.id,
        is_active=True,
    )
    test_db.add(foreign_category)
    await test_db.commit()

    auth_headers = await _login(client, outsider.email, outsider_password)
    category_url = f"/api/v1/step-categories/{foreign_category.id}"
    resp = await client.get(category_url, headers=auth_headers)

    status = resp.status_code
    assert status == 404, (
        f"Expected 404 for cross-account step category access, got {status}: {resp.text}"
    )
@pytest.mark.asyncio
async def test_maintenance_schedule_returns_404_for_other_team(
    client: AsyncClient, test_db: AsyncSession
):
    """Maintenance schedules of a tree owned by another team must answer 404."""
    from app.models.team import Team
    from app.models.maintenance_schedule import MaintenanceSchedule

    # Two independent teams, flushed before their ids are read below.
    outsider_team = Team(name="Team A Corp")
    owner_team = Team(name="Team B Corp")
    test_db.add_all([outsider_team, owner_team])
    await test_db.flush()

    # One account + user per team.
    _outsider_account, outsider, outsider_password = await _create_account_and_user(
        test_db, "t6-ms-a"
    )
    owner_account, owner, _owner_password = await _create_account_and_user(
        test_db, "t6-ms-b"
    )
    outsider.team_id = outsider_team.id
    owner.team_id = owner_team.id
    await test_db.flush()

    # Maintenance tree that belongs to the owner's team.
    maintenance_tree = Tree(
        name="Team B Maintenance Flow",
        account_id=owner_account.id,
        author_id=owner.id,
        team_id=owner_team.id,
        visibility="team",
        tree_type="maintenance",
        tree_structure={"id": "root", "type": "start", "children": []},
        is_active=True,
        status="published",
    )
    test_db.add(maintenance_tree)
    await test_db.flush()

    # Schedule attached to that tree.
    fixed_next_run = dt.datetime(2026, 12, 31, tzinfo=dt.timezone.utc)
    foreign_schedule = MaintenanceSchedule(
        tree_id=maintenance_tree.id,
        created_by=owner.id,
        cron_expression="0 2 * * 0",
        timezone="UTC",
        is_active=True,
        next_run_at=fixed_next_run,
    )
    test_db.add(foreign_schedule)
    await test_db.commit()

    auth_headers = await _login(client, outsider.email, outsider_password)
    schedule_url = f"/api/v1/maintenance-schedules/tree/{maintenance_tree.id}"
    resp = await client.get(schedule_url, headers=auth_headers)

    status = resp.status_code
    assert status == 404, (
        f"Expected 404 for cross-team maintenance schedule access, got {status}: {resp.text}"
    )
@pytest.mark.asyncio
async def test_get_documentation_returns_404_for_other_user_session(
    client: AsyncClient, test_db: AsyncSession
):
    """GET /ai-sessions/{id}/documentation must answer 404 for a session owned by someone else."""
    from app.models.ai_session import AISession

    _outsider_account, outsider, outsider_password = await _create_account_and_user(
        test_db, "doc-a"
    )
    owner_account, owner, _owner_password = await _create_account_and_user(
        test_db, "doc-b"
    )

    foreign_ai_session = AISession(
        user_id=owner.id,
        account_id=owner_account.id,
        problem_summary="B's confidential session",
        problem_domain="networking",
        status="resolved",
    )
    test_db.add(foreign_ai_session)
    await test_db.commit()

    auth_headers = await _login(client, outsider.email, outsider_password)
    documentation_url = f"/api/v1/ai-sessions/{foreign_ai_session.id}/documentation"
    resp = await client.get(documentation_url, headers=auth_headers)

    status = resp.status_code
    assert status == 404, f"Expected 404, got {status}: {resp.text}"