"""Phase 0 tenant-isolation tests. Verifies that endpoints respect account boundaries and don't leak data across tenants. Each task group tests a specific endpoint fix. """ import uuid import pytest from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession from app.models.account import Account from app.models.user import User from app.models.tree import Tree from app.core.security import get_password_hash # ── Helpers ────────────────────────────────────────────────────────────────── async def _create_account_and_user(db: AsyncSession, prefix: str): """Create a fresh account + engineer user. Returns (account, user, plain_password).""" password = "TestPass123!" account = Account( name=f"{prefix}-corp", display_code=uuid.uuid4().hex[:8], ) db.add(account) await db.flush() user = User( email=f"{prefix}-{uuid.uuid4().hex[:6]}@example.com", name=f"{prefix} user", password_hash=get_password_hash(password), is_active=True, account_id=account.id, account_role="engineer", ) db.add(user) await db.flush() return account, user, password async def _login(client: AsyncClient, email: str, password: str) -> dict: """Log in and return Authorization headers.""" resp = await client.post( "/api/v1/auth/login", json={"email": email, "password": password}, ) assert resp.status_code == 200, f"Login failed: {resp.text}" token = resp.json()["access_token"] return {"Authorization": f"Bearer {token}"} async def _create_private_tree(db: AsyncSession, account: Account, user: User) -> Tree: """Create a private tree owned by the given account/user.""" tree = Tree( name=f"Private Tree {uuid.uuid4().hex[:6]}", account_id=account.id, author_id=user.id, visibility="private", tree_type="troubleshooting", tree_structure={"id": "root", "type": "start", "children": []}, is_active=True, status="published", ) db.add(tree) await db.flush() return tree # ── Task 3: Analytics flow endpoint ────────────────────────────────────────── @pytest.mark.asyncio async def test_analytics_flow_cannot_read_other_account_tree( client: AsyncClient, test_db: AsyncSession ): """Account A cannot read flow analytics for Account B's private tree.""" acct_a, user_a, pass_a = await _create_account_and_user(test_db, "anl-a") acct_b, user_b, pass_b = await _create_account_and_user(test_db, "anl-b") tree_b = await _create_private_tree(test_db, acct_b, user_b) await test_db.commit() headers_a = await _login(client, user_a.email, pass_a) resp = await client.get( f"/api/v1/analytics/flows/{tree_b.id}", headers=headers_a, ) assert resp.status_code == 404, f"Expected 404, got {resp.status_code}: {resp.text}" # ── Task 4: Category tree count ─────────────────────────────────────────────── @pytest.mark.asyncio async def test_category_tree_count_scoped_to_account( client: AsyncClient, test_db: AsyncSession ): """tree_count on a category must not include trees from other accounts.""" from app.models.category import TreeCategory acct_a, user_a, pass_a = await _create_account_and_user(test_db, "cat-a") acct_b, user_b, pass_b = await _create_account_and_user(test_db, "cat-b") # Shared category (account_id=None means global) category = TreeCategory( name="Shared Category", slug=f"shared-cat-{uuid.uuid4().hex[:6]}", account_id=None, is_active=True, ) test_db.add(category) await test_db.flush() # 3 trees for account_b under this category for i in range(3): tree = Tree( name=f"B Tree {i}", account_id=acct_b.id, author_id=user_b.id, category_id=category.id, visibility="team", tree_type="troubleshooting", tree_structure={"id": "root", "type": "start", "children": []}, is_active=True, status="published", ) test_db.add(tree) # 1 tree for account_a under this category tree_a = Tree( name="A Tree", account_id=acct_a.id, author_id=user_a.id, category_id=category.id, visibility="team", tree_type="troubleshooting", tree_structure={"id": "root", "type": "start", "children": []}, is_active=True, status="published", ) test_db.add(tree_a) await test_db.commit() headers_a = await _login(client, user_a.email, pass_a) resp = await client.get( f"/api/v1/categories/{category.id}", headers=headers_a, ) assert resp.status_code == 200, resp.text # account_a should only see their 1 tree, not account_b's 3 assert resp.json()["tree_count"] == 1, ( f"Expected tree_count=1 (own trees only), got {resp.json()['tree_count']}" ) # ── Task 5: AI session search scope ────────────────────────────────────────── @pytest.mark.asyncio async def test_ai_session_search_cannot_see_other_users_sessions( client: AsyncClient, test_db: AsyncSession ): """User A cannot find User B's AI sessions via the search endpoint, even when both users are in the same account.""" from app.models.ai_session import AISession # Two users in the SAME account account = Account(name="Shared Corp", display_code=uuid.uuid4().hex[:8]) test_db.add(account) await test_db.flush() password = "TestPass123!" user_a = User( email=f"user-a-{uuid.uuid4().hex[:6]}@shared.com", name="User A", password_hash=get_password_hash(password), is_active=True, account_id=account.id, account_role="engineer", ) user_b = User( email=f"user-b-{uuid.uuid4().hex[:6]}@shared.com", name="User B", password_hash=get_password_hash(password), is_active=True, account_id=account.id, account_role="engineer", ) test_db.add_all([user_a, user_b]) await test_db.flush() # Session belonging to user_b with distinctive problem_summary session_b = AISession( user_id=user_b.id, account_id=account.id, problem_summary="CONFIDENTIAL: user_b's session", problem_domain="networking", status="resolved", ) test_db.add(session_b) await test_db.commit() headers_a = await _login(client, user_a.email, password) resp = await client.get( "/api/v1/ai-sessions/search", params={"q": "CONFIDENTIAL"}, headers=headers_a, ) assert resp.status_code == 200, resp.text results = resp.json() ids = [r["id"] for r in results] assert str(session_b.id) not in ids, ( "User A can see User B's session via search — cross-user leak within account" ) # ── Task 6: Cross-tenant UUID audit ───────────────────────────────────────── @pytest.mark.asyncio async def test_get_tree_returns_404_not_403_for_other_account( client: AsyncClient, test_db: AsyncSession ): """Account A gets 404 (not 403) when accessing Account B's private tree.""" acct_a, user_a, pass_a = await _create_account_and_user(test_db, "t6-tree-a") acct_b, user_b, pass_b = await _create_account_and_user(test_db, "t6-tree-b") tree_b = await _create_private_tree(test_db, acct_b, user_b) await test_db.commit() headers_a = await _login(client, user_a.email, pass_a) resp = await client.get(f"/api/v1/trees/{tree_b.id}", headers=headers_a) assert resp.status_code == 404, ( f"Expected 404 for cross-tenant tree access, got {resp.status_code}" ) @pytest.mark.asyncio async def test_update_tree_returns_404_not_403_for_other_account( client: AsyncClient, test_db: AsyncSession ): """Account A gets 404 (not 403) when trying to update Account B's tree.""" acct_a, user_a, pass_a = await _create_account_and_user(test_db, "t6-upd-a") acct_b, user_b, pass_b = await _create_account_and_user(test_db, "t6-upd-b") tree_b = await _create_private_tree(test_db, acct_b, user_b) await test_db.commit() headers_a = await _login(client, user_a.email, pass_a) resp = await client.put( f"/api/v1/trees/{tree_b.id}", json={"name": "Hacked"}, headers=headers_a, ) assert resp.status_code == 404, ( f"Expected 404 for cross-tenant tree update, got {resp.status_code}" ) @pytest.mark.asyncio async def test_get_session_returns_404_not_403_for_other_user( client: AsyncClient, test_db: AsyncSession ): """User A gets 404 (not 403) when accessing User B's session.""" from app.models.session import Session acct_a, user_a, pass_a = await _create_account_and_user(test_db, "t6-sess-a") acct_b, user_b, pass_b = await _create_account_and_user(test_db, "t6-sess-b") tree_b = await _create_private_tree(test_db, acct_b, user_b) session_b = Session( tree_id=tree_b.id, user_id=user_b.id, tree_snapshot={"id": "root", "type": "start", "children": []}, path_taken=[], decisions=[], ) test_db.add(session_b) await test_db.commit() headers_a = await _login(client, user_a.email, pass_a) resp = await client.get(f"/api/v1/sessions/{session_b.id}", headers=headers_a) assert resp.status_code == 404, ( f"Expected 404 for cross-user session access, got {resp.status_code}" ) @pytest.mark.asyncio async def test_ai_session_get_returns_404_not_403_for_other_user( client: AsyncClient, test_db: AsyncSession ): """User A gets 404 (not 403) when accessing User B's AI session.""" from app.models.ai_session import AISession acct_a, user_a, pass_a = await _create_account_and_user(test_db, "t6-ais-a") acct_b, user_b, pass_b = await _create_account_and_user(test_db, "t6-ais-b") ai_session_b = AISession( user_id=user_b.id, account_id=acct_b.id, problem_summary="Test session", problem_domain="networking", status="active", ) test_db.add(ai_session_b) await test_db.commit() headers_a = await _login(client, user_a.email, pass_a) resp = await client.get(f"/api/v1/ai-sessions/{ai_session_b.id}", headers=headers_a) assert resp.status_code == 404, ( f"Expected 404 for cross-user AI session access, got {resp.status_code}" ) @pytest.mark.asyncio async def test_ai_session_retry_psa_push_requires_ownership( client: AsyncClient, test_db: AsyncSession ): """User A cannot retry PSA push for User B's AI session.""" from app.models.ai_session import AISession acct_a, user_a, pass_a = await _create_account_and_user(test_db, "t6-psa-a") acct_b, user_b, pass_b = await _create_account_and_user(test_db, "t6-psa-b") ai_session_b = AISession( user_id=user_b.id, account_id=acct_b.id, problem_summary="PSA test", problem_domain="networking", status="resolved", ) test_db.add(ai_session_b) await test_db.commit() headers_a = await _login(client, user_a.email, pass_a) resp = await client.post( f"/api/v1/ai-sessions/{ai_session_b.id}/retry-psa-push", headers=headers_a, ) assert resp.status_code == 404, ( f"Expected 404 for cross-user retry-psa-push, got {resp.status_code}" ) @pytest.mark.asyncio async def test_upload_url_returns_404_not_403_for_other_account( client: AsyncClient, test_db: AsyncSession ): """User A gets 404 (not 403) when accessing User B's upload URL.""" from app.models.file_upload import FileUpload acct_a, user_a, pass_a = await _create_account_and_user(test_db, "t6-upl-a") acct_b, user_b, pass_b = await _create_account_and_user(test_db, "t6-upl-b") upload_b = FileUpload( account_id=acct_b.id, uploaded_by=user_b.id, filename="secret.png", content_type="image/png", size_bytes=1024, storage_key="test/secret.png", ) test_db.add(upload_b) await test_db.commit() headers_a = await _login(client, user_a.email, pass_a) resp = await client.get(f"/api/v1/uploads/{upload_b.id}/url", headers=headers_a) assert resp.status_code in (404, 503), ( f"Expected 404 (or 503 if storage not configured) for cross-account upload, got {resp.status_code}" ) @pytest.mark.asyncio async def test_share_revoke_returns_404_not_403_for_other_user( client: AsyncClient, test_db: AsyncSession ): """User A gets 404 (not 403) when revoking User B's share.""" from app.models.session import Session from app.models.session_share import SessionShare acct_a, user_a, pass_a = await _create_account_and_user(test_db, "t6-shr-a") acct_b, user_b, pass_b = await _create_account_and_user(test_db, "t6-shr-b") tree_b = await _create_private_tree(test_db, acct_b, user_b) session_b = Session( tree_id=tree_b.id, user_id=user_b.id, tree_snapshot={"id": "root", "type": "start", "children": []}, path_taken=[], decisions=[], ) test_db.add(session_b) await test_db.flush() share_b = SessionShare( session_id=session_b.id, account_id=acct_b.id, share_token="test-token-unique-" + uuid.uuid4().hex[:8], share_name="Test", visibility="public", created_by=user_b.id, ) test_db.add(share_b) await test_db.commit() headers_a = await _login(client, user_a.email, pass_a) resp = await client.delete(f"/api/v1/shares/{share_b.id}", headers=headers_a) assert resp.status_code == 404, ( f"Expected 404 for cross-user share revoke, got {resp.status_code}" )