diff --git a/docs/superpowers/plans/2026-04-09-tenant-isolation-phase-0.md b/docs/superpowers/plans/2026-04-09-tenant-isolation-phase-0.md new file mode 100644 index 00000000..f11236ba --- /dev/null +++ b/docs/superpowers/plans/2026-04-09-tenant-isolation-phase-0.md @@ -0,0 +1,1136 @@ +# Tenant Isolation — Hotfix + Phase 0 Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Fix all known cross-tenant data leaks and establish the application-layer patterns and CI gating that all future tenant-data work must comply with. + +**Architecture:** Application code is the primary enforcement layer. Every query on a tenant table must include an explicit `account_id` filter via `tenant_filter()`. PostgreSQL RLS (Phase 2) is the safety net — these Phase 0 changes must be complete and correct regardless of RLS. Each task is independently committable. + +**Tech Stack:** Python 3.11 · FastAPI · SQLAlchemy 2.0 async · pytest-asyncio · GitHub Actions (existing CI in `.github/workflows/ci.yml`) + +**Spec:** `docs/superpowers/specs/2026-04-09-tenant-data-isolation-design.md` + +**Phase gate:** All 8 tasks complete + CI grep check active before Phase 1 (schema migration) begins. + +--- + +## File Map + +| File | Action | Why | +|---|---|---| +| `backend/app/services/copilot_service.py` | Modify | CRITICAL: add account scoping to tree query (lines 107–112) | +| `backend/app/core/filters.py` | Modify | Add `tenant_filter()` helper; update existing filters to call it | +| `backend/app/api/deps.py` | Modify | Add `get_tenant_context` dependency | +| `backend/app/api/endpoints/analytics.py` | Modify | Add `tenant_filter` to flow analytics tree fetch (line 294) | +| `backend/app/api/endpoints/categories.py` | Modify | Scope category tree count to current account (line 112) | +| `backend/app/api/endpoints/ai_sessions.py` | Modify | Restrict search to `user_id` only (line 765) | +| `backend/tests/test_tenant_isolation_p0.py` | Create | Cross-tenant tests for every fix in this plan | +| `backend/scripts/check_tenant_filters.py` | Create | Grep-based CI check script | +| `.github/workflows/ci.yml` | Modify | Add tenant-filter check step to backend CI job | + +--- + +## Task 1: HOTFIX — Copilot tree access bypass (CRITICAL) + +**Ship this task immediately as its own PR before starting any other task.** + +**Files:** +- Modify: `backend/app/services/copilot_service.py:107-112` +- Test: `backend/tests/test_tenant_isolation_p0.py` + +**Background:** `start_conversation()` loads a tree by UUID with no account filter. An attacker who knows another account's tree UUID can extract its full node structure, names, and descriptions via the AI system prompt. This is the highest priority fix. + +--- + +- [ ] **Step 1.1: Write the failing test first** + +Create `backend/tests/test_tenant_isolation_p0.py`: + +```python +"""Cross-tenant isolation tests for Phase 0 gap fixes.""" +import pytest +from httpx import AsyncClient +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select +import uuid + +from app.models.account import Account +from app.models.user import User +from app.models.tree import Tree +from app.core.security import get_password_hash + + +# ── Helpers ────────────────────────────────────────────────────────────────── + +async def _create_account_and_user(db: AsyncSession, suffix: str) -> tuple[Account, User, str]: + """Create an account + owner user. Returns (account, user, plain_password).""" + account = Account( + name=f"Test Corp {suffix}", + slug=f"test-corp-{suffix}-{uuid.uuid4().hex[:6]}", + ) + db.add(account) + await db.flush() + + password = "TestPass123!" + user = User( + email=f"user-{suffix}-{uuid.uuid4().hex[:6]}@example.com", + name=f"User {suffix}", + hashed_password=get_password_hash(password), + is_active=True, + account_id=account.id, + account_role="owner", + ) + db.add(user) + await db.flush() + return account, user, password + + +async def _login(client: AsyncClient, email: str, password: str) -> dict: + """Return auth headers for a user.""" + resp = await client.post( + "/api/v1/auth/login/json", + json={"email": email, "password": password}, + ) + assert resp.status_code == 200, resp.text + return {"Authorization": f"Bearer {resp.json()['access_token']}"} + + +async def _create_private_tree(db: AsyncSession, account: Account, user: User) -> Tree: + """Create a private tree owned by account.""" + tree = Tree( + name=f"Private Tree {uuid.uuid4().hex[:6]}", + account_id=account.id, + author_id=user.id, + visibility="private", + tree_type="troubleshooting", + tree_structure={"id": "root", "type": "start", "children": []}, + is_active=True, + status="published", + ) + db.add(tree) + await db.flush() + return tree + + +# ── Task 1: Copilot bypass ──────────────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_copilot_cannot_start_conversation_with_other_account_tree( + client: AsyncClient, test_db: AsyncSession +): + """Account A cannot start a copilot conversation using Account B's private tree UUID.""" + acct_a, user_a, pass_a = await _create_account_and_user(test_db, "a") + acct_b, user_b, pass_b = await _create_account_and_user(test_db, "b") + tree_b = await _create_private_tree(test_db, acct_b, user_b) + await test_db.commit() + + headers_a = await _login(client, user_a.email, pass_a) + + resp = await client.post( + "/api/v1/copilot/conversations", + json={"tree_id": str(tree_b.id), "session_id": None, "current_node_id": None}, + headers=headers_a, + ) + # Must be 404 (not 200, not 403 — never confirm existence) + assert resp.status_code == 404, f"Expected 404, got {resp.status_code}: {resp.text}" +``` + +- [ ] **Step 1.2: Run test to confirm it currently fails (i.e., returns 200 instead of 404)** + +```bash +cd backend && python -m pytest tests/test_tenant_isolation_p0.py::test_copilot_cannot_start_conversation_with_other_account_tree -v --override-ini="addopts=" +``` + +Expected: FAIL (test gets 200 back — confirms the vulnerability is real). + +> **Note:** If AI is not configured in test env, the endpoint may return 503 before reaching the tree lookup. In that case, temporarily add `settings.ai_enabled = False` check first: if you get 503, the AI gate fires before the tree check, which means the tree is still loaded but the endpoint errors out at AI quota — not the same vulnerability surface. Force `settings.ai_enabled = True` in the test or mock the `_require_ai_enabled` call to confirm the 200 path. The vulnerability is in the service layer, not the endpoint gate. + +- [ ] **Step 1.3: Fix `copilot_service.py` — add account filter to tree query** + +In `backend/app/services/copilot_service.py`, replace lines 107–112: + +```python +# BEFORE: + # Load tree + result = await db.execute( + select(Tree).options(selectinload(Tree.tags)).where(Tree.id == tree_id) + ) + tree = result.scalar_one_or_none() + if not tree: + raise ValueError(f"Tree {tree_id} not found") +``` + +```python +# AFTER: + # Load tree — must be accessible to this account. + # Allows own account's trees, default trees, and public trees. + # Raises ValueError (caught by endpoint as 404) if not found or not accessible. + from sqlalchemy import or_ + result = await db.execute( + select(Tree).options(selectinload(Tree.tags)).where( + Tree.id == tree_id, + or_( + Tree.account_id == account_id, + Tree.is_default == True, + Tree.visibility == "public", + ), + ) + ) + tree = result.scalar_one_or_none() + if not tree: + raise ValueError(f"Tree {tree_id} not found or not accessible") +``` + +- [ ] **Step 1.4: Run test — must pass** + +```bash +cd backend && python -m pytest tests/test_tenant_isolation_p0.py::test_copilot_cannot_start_conversation_with_other_account_tree -v --override-ini="addopts=" +``` + +Expected: PASS + +- [ ] **Step 1.5: Run full test suite — must stay green** + +```bash +cd backend && python -m pytest --override-ini="addopts=" +``` + +Expected: all tests pass (or same failures as before this change — do not regress). + +- [ ] **Step 1.6: Commit as standalone hotfix** + +```bash +git add backend/app/services/copilot_service.py backend/tests/test_tenant_isolation_p0.py +git commit -m "fix: CRITICAL — scope copilot tree query to current account + +A user who knew another account's tree UUID could start a copilot +conversation, causing the tree's full node structure, names, and +descriptions to be sent to the AI as part of the system prompt. + +Fix: add account_id (or is_default / visibility='public') filter to +the tree SELECT in copilot_service.start_conversation(). Returns 404 +for inaccessible trees. Test added in test_tenant_isolation_p0.py. + +Co-Authored-By: Claude Sonnet 4.6 " +``` + +--- + +## Task 2: Add `tenant_filter()` and `get_tenant_context` + +**Files:** +- Modify: `backend/app/core/filters.py` +- Modify: `backend/app/api/deps.py` + +These are the canonical patterns every subsequent fix and future feature uses. No new tests needed — the patterns are exercised by Tasks 3–5 tests. + +--- + +- [ ] **Step 2.1: Add `tenant_filter()` to `filters.py`** + +In `backend/app/core/filters.py`, replace the entire file: + +```python +""" +Centralized query filters for ResolutionFlow. + +Provides reusable SQLAlchemy filter builders for tree access control, +step visibility, and the canonical tenant_filter used by all queries +on tenant-scoped tables. +""" +from __future__ import annotations +import uuid +from typing import TYPE_CHECKING + +from sqlalchemy import or_, and_, true as sa_true + +if TYPE_CHECKING: + from app.models.user import User + + +def tenant_filter(model, account_id: uuid.UUID): + """Primary app-layer tenant filter. + + MUST be used in every SELECT/UPDATE/DELETE on tenant tables. + RLS (Phase 2) is the safety net — this is the primary enforcement. + + Usage: + stmt = select(Tree).where(tenant_filter(Tree, current_user.account_id), ...) + """ + return model.account_id == account_id + + +def build_tree_access_filter(current_user: User): + """Build the access filter for trees based on user permissions. + + Visibility rules: + - super_admin: sees everything + - is_default: visible to all authenticated users + - visibility='public': visible to all authenticated users + - author_id == me: always visible (regardless of visibility setting) + - visibility='team' AND account_id == mine: visible to account members + - visibility='private': only visible to author (covered by author_id check above) + - visibility='link': only visible to author (share token access is handled separately) + """ + from app.models.tree import Tree + + if current_user.is_super_admin: + return sa_true() + + conditions = [ + Tree.is_default == True, + Tree.visibility == 'public', + Tree.author_id == current_user.id, + ] + if current_user.account_id: + # Team-visible trees: use tenant_filter as the account match + conditions.append( + and_( + Tree.visibility == 'team', + tenant_filter(Tree, current_user.account_id), + ) + ) + return or_(*conditions) + + +def build_step_visibility_filter(current_user: User): + """Build SQLAlchemy filter for step visibility based on user. + + Returns steps that are: + - Public steps (visible to all) + - Team steps (visible to same account members) + - User's own private steps + """ + from app.models.step_library import StepLibrary + + if current_user.account_id: + return or_( + StepLibrary.visibility == 'public', + and_( + StepLibrary.visibility == 'team', + tenant_filter(StepLibrary, current_user.account_id), + ), + StepLibrary.created_by == current_user.id, + ) + else: + return or_( + StepLibrary.visibility == 'public', + StepLibrary.created_by == current_user.id, + ) +``` + +- [ ] **Step 2.2: Add `get_tenant_context` to `deps.py`** + +In `backend/app/api/deps.py`, append at the end of the file (after line 193): + +```python + + +async def get_tenant_context( + current_user: Annotated[User, Depends(get_current_active_user)], +) -> UUID: + """Return the current user's account_id. + + Use this dependency instead of reading current_user.account_id directly. + Raises 403 if the user has no account association (should not happen in + normal flows — users are always associated with an account on registration). + """ + if current_user.account_id is None: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="User not associated with any account", + ) + return current_user.account_id +``` + +- [ ] **Step 2.3: Run full test suite to confirm no regressions** + +```bash +cd backend && python -m pytest --override-ini="addopts=" +``` + +Expected: all tests pass. + +- [ ] **Step 2.4: Commit** + +```bash +git add backend/app/core/filters.py backend/app/api/deps.py +git commit -m "feat: add tenant_filter() helper and get_tenant_context dependency + +tenant_filter(model, account_id) is the canonical app-layer tenant +scoping expression. Every query on a tenant table must use it. +build_tree_access_filter and build_step_visibility_filter updated +to call tenant_filter() internally for the account_id match. + +get_tenant_context is a FastAPI dependency that returns account_id +or raises 403 if the user has no account — prevents raw access to +current_user.account_id and centralises the null check. + +Co-Authored-By: Claude Sonnet 4.6 " +``` + +--- + +## Task 3: Fix analytics flow endpoint + +**Files:** +- Modify: `backend/app/api/endpoints/analytics.py:294` +- Test: `backend/tests/test_tenant_isolation_p0.py` + +--- + +- [ ] **Step 3.1: Write the failing test** + +Add to `backend/tests/test_tenant_isolation_p0.py`: + +```python +# ── Task 3: Analytics flow endpoint ────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_analytics_flow_cannot_read_other_account_tree( + client: AsyncClient, test_db: AsyncSession +): + """Account A cannot read flow analytics for Account B's private tree.""" + acct_a, user_a, pass_a = await _create_account_and_user(test_db, "anl-a") + acct_b, user_b, pass_b = await _create_account_and_user(test_db, "anl-b") + tree_b = await _create_private_tree(test_db, acct_b, user_b) + await test_db.commit() + + headers_a = await _login(client, user_a.email, pass_a) + + resp = await client.get( + f"/api/v1/analytics/flows/{tree_b.id}", + headers=headers_a, + ) + assert resp.status_code == 404, f"Expected 404, got {resp.status_code}: {resp.text}" +``` + +- [ ] **Step 3.2: Run test — confirm it currently fails (returns 200)** + +```bash +cd backend && python -m pytest tests/test_tenant_isolation_p0.py::test_analytics_flow_cannot_read_other_account_tree -v --override-ini="addopts=" +``` + +Expected: FAIL (returns 200 — confirms gap is real). + +- [ ] **Step 3.3: Fix `analytics.py` — add tenant_filter to tree fetch** + +In `backend/app/api/endpoints/analytics.py`: + +Add the import at the top (after existing imports on line 9): + +```python +from app.core.filters import tenant_filter +``` + +Replace lines 293–297: + +```python +# BEFORE: + # Verify tree exists + result = await db.execute(select(Tree).where(Tree.id == tree_id)) + tree = result.scalar_one_or_none() + if not tree: + raise HTTPException(status_code=404, detail="Flow not found") +``` + +```python +# AFTER: + # Verify tree exists and belongs to the requesting user's account. + result = await db.execute( + select(Tree).where( + Tree.id == tree_id, + tenant_filter(Tree, current_user.account_id), + ) + ) + tree = result.scalar_one_or_none() + if not tree: + raise HTTPException(status_code=404, detail="Flow not found") +``` + +- [ ] **Step 3.4: Run test — must pass** + +```bash +cd backend && python -m pytest tests/test_tenant_isolation_p0.py::test_analytics_flow_cannot_read_other_account_tree -v --override-ini="addopts=" +``` + +Expected: PASS + +- [ ] **Step 3.5: Commit** + +```bash +git add backend/app/api/endpoints/analytics.py backend/tests/test_tenant_isolation_p0.py +git commit -m "fix: scope analytics/flows/{tree_id} to requesting account + +Any authenticated user could read flow analytics (session counts, +completion rates, CSAT) for any tree UUID. Now returns 404 if the +tree doesn't belong to the requesting account. + +Co-Authored-By: Claude Sonnet 4.6 " +``` + +--- + +## Task 4: Fix category tree count scope + +**Files:** +- Modify: `backend/app/api/endpoints/categories.py:112` +- Test: `backend/tests/test_tenant_isolation_p0.py` + +--- + +- [ ] **Step 4.1: Write the failing test** + +Add to `backend/tests/test_tenant_isolation_p0.py`: + +```python +# ── Task 4: Category tree count ─────────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_category_tree_count_scoped_to_account( + client: AsyncClient, test_db: AsyncSession +): + """tree_count on a category must not include trees from other accounts.""" + from app.models.category import TreeCategory + + acct_a, user_a, pass_a = await _create_account_and_user(test_db, "cat-a") + acct_b, user_b, pass_b = await _create_account_and_user(test_db, "cat-b") + + # Shared category (account_id=None means global) + category = TreeCategory( + name="Shared Category", + slug=f"shared-cat-{uuid.uuid4().hex[:6]}", + account_id=None, + is_active=True, + ) + test_db.add(category) + await test_db.flush() + + # 3 trees for account_b under this category + for i in range(3): + tree = Tree( + name=f"B Tree {i}", + account_id=acct_b.id, + author_id=user_b.id, + category_id=category.id, + visibility="team", + tree_type="troubleshooting", + tree_structure={"id": "root", "type": "start", "children": []}, + is_active=True, + status="published", + ) + test_db.add(tree) + + # 1 tree for account_a under this category + tree_a = Tree( + name="A Tree", + account_id=acct_a.id, + author_id=user_a.id, + category_id=category.id, + visibility="team", + tree_type="troubleshooting", + tree_structure={"id": "root", "type": "start", "children": []}, + is_active=True, + status="published", + ) + test_db.add(tree_a) + await test_db.commit() + + headers_a = await _login(client, user_a.email, pass_a) + + resp = await client.get( + f"/api/v1/categories/{category.id}", + headers=headers_a, + ) + assert resp.status_code == 200, resp.text + # account_a should only see their 1 tree, not account_b's 3 + assert resp.json()["tree_count"] == 1, ( + f"Expected tree_count=1 (own trees only), got {resp.json()['tree_count']}" + ) +``` + +- [ ] **Step 4.2: Run test — confirm it currently fails (count=4)** + +```bash +cd backend && python -m pytest tests/test_tenant_isolation_p0.py::test_category_tree_count_scoped_to_account -v --override-ini="addopts=" +``` + +Expected: FAIL (tree_count=4 — includes account_b's trees). + +- [ ] **Step 4.3: Fix `categories.py` — scope tree count query** + +In `backend/app/api/endpoints/categories.py`: + +Add import at top (alongside existing imports): + +```python +from app.core.filters import tenant_filter +``` + +Replace lines 111–117 (the count query block): + +```python +# BEFORE: + # Get tree count + count_query = select(func.count(Tree.id)).where( + Tree.category_id == category.id, + Tree.is_active == True + ) + count_result = await db.execute(count_query) + tree_count = count_result.scalar() or 0 +``` + +```python +# AFTER: + # Get tree count — scoped to the requesting account so cross-account + # trees in shared categories are not counted. + count_query = select(func.count(Tree.id)).where( + Tree.category_id == category.id, + Tree.is_active == True, + tenant_filter(Tree, current_user.account_id), + ) + count_result = await db.execute(count_query) + tree_count = count_result.scalar() or 0 +``` + +- [ ] **Step 4.4: Run test — must pass** + +```bash +cd backend && python -m pytest tests/test_tenant_isolation_p0.py::test_category_tree_count_scoped_to_account -v --override-ini="addopts=" +``` + +Expected: PASS + +- [ ] **Step 4.5: Commit** + +```bash +git add backend/app/api/endpoints/categories.py backend/tests/test_tenant_isolation_p0.py +git commit -m "fix: scope category tree_count to requesting account + +tree_count on GET /categories/{id} was including trees from all +accounts, leaking cross-tenant row counts. + +Co-Authored-By: Claude Sonnet 4.6 " +``` + +--- + +## Task 5: Fix AI session search scope + +**Files:** +- Modify: `backend/app/api/endpoints/ai_sessions.py:765-777` +- Test: `backend/tests/test_tenant_isolation_p0.py` + +**Context:** The search endpoint used `OR(user_id == me, account_id == mine)`, exposing `problem_summary`, `problem_domain`, and `status` of other users' sessions within the same account. Sessions are user-scoped only. The list endpoint (`GET /ai-sessions`) already restricts to `user_id`. Both must behave consistently. + +--- + +- [ ] **Step 5.1: Write the failing test** + +Add to `backend/tests/test_tenant_isolation_p0.py`: + +```python +# ── Task 5: AI session search scope ────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_ai_session_search_cannot_see_other_users_sessions( + client: AsyncClient, test_db: AsyncSession +): + """User A cannot find User B's AI sessions via the search endpoint, + even when both users are in the same account.""" + from app.models.ai_session import AISession + + # Two users in the SAME account + account = Account(name="Shared Corp", slug=f"shared-{uuid.uuid4().hex[:6]}") + test_db.add(account) + await test_db.flush() + + password = "TestPass123!" + user_a = User( + email=f"user-a-{uuid.uuid4().hex[:6]}@shared.com", + name="User A", + hashed_password=get_password_hash(password), + is_active=True, + account_id=account.id, + account_role="engineer", + ) + user_b = User( + email=f"user-b-{uuid.uuid4().hex[:6]}@shared.com", + name="User B", + hashed_password=get_password_hash(password), + is_active=True, + account_id=account.id, + account_role="engineer", + ) + test_db.add_all([user_a, user_b]) + await test_db.flush() + + # Session belonging to user_b with distinctive problem_summary + session_b = AISession( + user_id=user_b.id, + account_id=account.id, + problem_summary="CONFIDENTIAL: user_b's session", + problem_domain="networking", + status="resolved", + ) + test_db.add(session_b) + await test_db.commit() + + headers_a = await _login(client, user_a.email, password) + + resp = await client.get( + "/api/v1/ai-sessions/search", + params={"q": "CONFIDENTIAL"}, + headers=headers_a, + ) + assert resp.status_code == 200, resp.text + results = resp.json() + ids = [r["id"] for r in results] + assert str(session_b.id) not in ids, ( + "User A can see User B's session via search — cross-user leak within account" + ) +``` + +- [ ] **Step 5.2: Run test — confirm it currently fails** + +```bash +cd backend && python -m pytest tests/test_tenant_isolation_p0.py::test_ai_session_search_cannot_see_other_users_sessions -v --override-ini="addopts=" +``` + +Expected: FAIL (user_b's session appears in results). + +- [ ] **Step 5.3: Fix `ai_sessions.py` — restrict search to `user_id`** + +In `backend/app/api/endpoints/ai_sessions.py`, find the `search_sessions` function (around line 755) and replace the `.where()` clause: + +```python +# BEFORE: + result = await db.execute( + select(AISession) + .where( + or_( + AISession.user_id == current_user.id, + AISession.account_id == current_user.account_id, + ), + text("ai_sessions.search_vector @@ plainto_tsquery('english', :q)"), + ) + .params(q=q) + .order_by(AISession.created_at.desc()) + .limit(limit) + ) +``` + +```python +# AFTER: + # Sessions are user-scoped. The list endpoint uses user_id only; + # search must be consistent. Cross-user access requires explicit + # escalation or session sharing — not ambient account membership. + result = await db.execute( + select(AISession) + .where( + AISession.user_id == current_user.id, + text("ai_sessions.search_vector @@ plainto_tsquery('english', :q)"), + ) + .params(q=q) + .order_by(AISession.created_at.desc()) + .limit(limit) + ) +``` + +Also remove the now-unused `or_` import from the `search_sessions` function if it was only used there. (Do not remove `or_` if it appears elsewhere in the file.) + +- [ ] **Step 5.4: Run test — must pass** + +```bash +cd backend && python -m pytest tests/test_tenant_isolation_p0.py::test_ai_session_search_cannot_see_other_users_sessions -v --override-ini="addopts=" +``` + +Expected: PASS + +- [ ] **Step 5.5: Commit** + +```bash +git add backend/app/api/endpoints/ai_sessions.py backend/tests/test_tenant_isolation_p0.py +git commit -m "fix: restrict AI session search to current user only + +Search endpoint used OR(user_id, account_id), exposing other users' +problem_summary and problem_domain within the same account. Sessions +are user-scoped only — cross-user access requires explicit escalation +or sharing. List and search endpoints now behave consistently. + +Co-Authored-By: Claude Sonnet 4.6 " +``` + +--- + +## Task 6: UUID endpoint audit and gap fixes + +**Files:** +- Read: all files in `backend/app/api/endpoints/` +- Modify: whichever files the audit finds gaps in +- Test: `backend/tests/test_tenant_isolation_p0.py` + +**Goal:** Systematically check every route with a `{resource_id}` URL parameter. Verify that each one either (a) filters by `id AND account_id` in the query, or (b) calls a permission function on the fetched object that checks account ownership. + +--- + +- [ ] **Step 6.1: Run the audit** + +For each file listed below, scan every function decorated with `@router.get`, `@router.put`, `@router.patch`, `@router.delete` that has a path like `/{something_id}`. For each such function, answer: + +1. Does the query filter by both `id` AND `account_id` (or `tenant_filter`)? +2. If not, does it call a permission check (`can_access_*`, `can_edit_*`, etc.) on the fetched object? +3. If neither, it's a gap. + +**Endpoint files to audit:** + +``` +backend/app/api/endpoints/trees.py +backend/app/api/endpoints/sessions.py +backend/app/api/endpoints/steps.py +backend/app/api/endpoints/categories.py ← already fixed (Task 4) +backend/app/api/endpoints/analytics.py ← already fixed (Task 3) +backend/app/api/endpoints/copilot.py +backend/app/api/endpoints/ai_sessions.py ← search fixed (Task 5) +backend/app/api/endpoints/assistant_chat.py +backend/app/api/endpoints/integrations.py +backend/app/api/endpoints/flow_proposals.py +backend/app/api/endpoints/maintenance_schedules.py +backend/app/api/endpoints/kb_accelerator.py +backend/app/api/endpoints/flowpilot_analytics.py +backend/app/api/endpoints/shares.py +backend/app/api/endpoints/uploads.py +backend/app/api/endpoints/tags.py +backend/app/api/endpoints/step_categories.py +backend/app/api/endpoints/notifications.py +backend/app/api/endpoints/survey.py +``` + +For each gap found: classify severity (CRITICAL / HIGH / MEDIUM / LOW), document file + line number, fix using the fetch-and-verify pattern: + +```python +# Standard fix pattern — fetch with account_id filter, return 404 if not found +from app.core.filters import tenant_filter + +stmt = select(Model).where( + Model.id == resource_id, + tenant_filter(Model, current_user.account_id), +) +resource = (await db.execute(stmt)).scalar_one_or_none() +if not resource: + raise HTTPException(status_code=404) # Not 403 — never reveal existence +``` + +**Known findings from prior audit (do not re-verify, just fix):** + +| File | Route | Severity | Status | +|---|---|---|---| +| `copilot.py` | `POST /copilot/conversations` | CRITICAL | Fixed (Task 1) | +| `analytics.py` | `GET /analytics/flows/{tree_id}` | LOW | Fixed (Task 3) | +| `categories.py` | tree_count in `GET /categories/{id}` | LOW | Fixed (Task 4) | +| `ai_sessions.py` | `GET /ai-sessions/search` | LOW | Fixed (Task 5) | +| `steps.py` | `get_step_or_404` — 403 vs 404 on non-existent UUID | Very Low | Audit only — fix if severity warrants | +| `flowpilot_analytics.py` | Needs manual review | Unknown | Audit now | + +- [ ] **Step 6.2: For each gap found, write a test, then apply the fix** + +Use the same TDD pattern as Tasks 1–5: + +```python +# Test template for a gap in MyEndpoint +@pytest.mark.asyncio +async def test_cannot_access_other_account_( + client: AsyncClient, test_db: AsyncSession +): + """Account A cannot access Account B's by UUID.""" + acct_a, user_a, pass_a = await _create_account_and_user(test_db, "-a") + acct_b, user_b, pass_b = await _create_account_and_user(test_db, "-b") + resource_b = + await test_db.commit() + + headers_a = await _login(client, user_a.email, pass_a) + resp = await client.get(f"/api/v1//{resource_b.id}", headers=headers_a) + assert resp.status_code == 404, f"Expected 404, got {resp.status_code}: {resp.text}" +``` + +- [ ] **Step 6.3: After all gaps fixed, run full test suite** + +```bash +cd backend && python -m pytest --override-ini="addopts=" +``` + +Expected: all tests pass. + +- [ ] **Step 6.4: Commit all audit fixes together** + +```bash +git add backend/app/api/endpoints/ backend/tests/test_tenant_isolation_p0.py +git commit -m "fix: UUID endpoint audit — add missing ownership checks + +Audit of all {resource_id} endpoints. Gaps found and fixed: + + +All fixed endpoints now return 404 (not 403) for cross-tenant IDs. + +Co-Authored-By: Claude Sonnet 4.6 " +``` + +--- + +## Task 7: TargetList dead code audit + teams orphan check + +**Files:** +- No code changes (audit and report only) +- Report findings documented in a comment on this plan PR or in `docs/` + +--- + +- [ ] **Step 7.1: TargetList dead code audit** + +Run from the repo root: + +```bash +grep -rn "TargetList\|target_list\|target-list\|target_lists" \ + backend/app/api \ + backend/app/services \ + backend/app/schemas \ + frontend/src \ + --include="*.py" --include="*.ts" --include="*.tsx" \ + | grep -v "__pycache__" \ + | grep -v "test_target" +``` + +Record the output. Then query production/staging DB row count: + +```bash +# Run via docker exec or psql connection +docker exec -it resolutionflow_postgres psql -U postgres -d patherly \ + -c "SELECT COUNT(*) FROM target_lists;" +``` + +**Decision:** +- Zero code references AND zero rows → drop the table in Phase 1 migration +- Zero rows but code references exist → add TargetList deprecation cleanup to Phase 1 scope +- Rows exist → add TargetList migration to Phase 1 scope (backfill `account_id` from `team_id → teams → users WHERE is_team_admin`) + +- [ ] **Step 7.2: Teams orphan check** + +```bash +docker exec -it resolutionflow_postgres psql -U postgres -d patherly -c " +SELECT COUNT(*) AS orphaned_teams +FROM teams t +LEFT JOIN users u ON u.team_id = t.id AND u.account_id IS NOT NULL +WHERE u.id IS NULL; +" +``` + +**Decision:** +- Count = 0 → proceed to Phase 1 schema migration without concern +- Count > 0 → document which teams are orphaned, resolve before any Phase 1 backfill involving `team_id → account_id` chains + +- [ ] **Step 7.3: Document results** + +In `docs/superpowers/specs/2026-04-09-tenant-data-isolation-design.md`, Section 9 Open Questions, replace the TargetList row: + +```markdown +# BEFORE: +| TargetList: zero references + zero rows? Or does data exist? | Determines whether table is dropped or migrated. | Phase 0 audit | + +# AFTER (fill in actual result, e.g.): +| TargetList audit complete: zero rows, zero non-test code references. Decision: drop table in Phase 1. | Resolved — drop in Phase 1. | ✓ Done | +``` + +Also add a row for the teams orphan result: + +```markdown +| Teams orphan check: N orphaned teams found. | Phase 1 backfills using team→account chain safe to proceed (or: N teams need resolution before Phase 1). | ✓ Done | +``` + +- [ ] **Step 7.4: Commit the updated spec** + +```bash +git add docs/superpowers/specs/2026-04-09-tenant-data-isolation-design.md +git commit -m "docs: record Phase 0 audit results — TargetList and teams orphan check + +Co-Authored-By: Claude Sonnet 4.6 " +``` + +--- + +## Task 8: CI tenant-filter grep check + +**Files:** +- Create: `backend/scripts/check_tenant_filters.py` +- Modify: `.github/workflows/ci.yml` + +**Goal:** Warn (not yet block) when endpoint or service files contain `select(TenantModel)` patterns without `tenant_filter` or `account_id` in the surrounding context. Active from Phase 1 forward, so all Phase 1 work is gated by it. Switch to block (exit code 1) after 2 weeks of false-positive calibration. + +--- + +- [ ] **Step 8.1: Create the check script** + +Create `backend/scripts/check_tenant_filters.py`: + +```python +""" +Tenant filter enforcement check. + +Scans endpoint and service files for SQLAlchemy select() calls on known +tenant tables and warns when account_id or tenant_filter is not present +in the surrounding 15 lines (the typical extent of a single query). + +Usage: + python scripts/check_tenant_filters.py # warn mode (exits 0) + python scripts/check_tenant_filters.py --fail # block mode (exits 1 on findings) +""" +import re +import sys +from pathlib import Path + +# Tables that must always be filtered by account_id or tenant_filter. +# Extend this list as new tenant tables are added. +TENANT_MODELS = [ + "Tree", "AISession", "Session", "StepLibrary", "FlowProposal", + "CopilotConversation", "AssistantChat", "FileUpload", "KBImport", + "PsaConnection", "PsaPostLog", "PsaMemberMapping", "AIChatSession", + "AIConversation", "AIUsage", "Subscription", "AccountInvite", + "Notification", "NotificationConfig", "SessionShare", "UserFolder", + "UserPinnedTree", "SessionBranch", "SessionHandoff", + "SessionResolutionOutput", "ForkPoint", "AISessionStep", + "AISuggestion", "StepCategory", "TreeCategory", "TreeTag", + "Attachment", "SessionSupportingData", "MaintenanceSchedule", + "AuditLog", "ScriptBuilderSession", "ScriptTemplate", + "StepRating", "StepUsageLog", "AISession", +] + +# Directories to scan +SCAN_DIRS = [ + Path("app/api/endpoints"), + Path("app/services"), +] + +# Patterns that indicate the query is correctly scoped +SAFE_PATTERNS = [ + r"tenant_filter", + r"account_id", + r"is_super_admin", # Super admin queries intentionally bypass tenant filter + r"# cross-tenant: approved", # Explicit approval comment +] + +SKIP_FILES = { + "admin.py", # Super admin endpoints intentionally bypass tenant filter +} + +findings = [] + +for scan_dir in SCAN_DIRS: + if not scan_dir.exists(): + continue + for path in sorted(scan_dir.glob("*.py")): + if path.name in SKIP_FILES: + continue + lines = path.read_text().splitlines() + for i, line in enumerate(lines): + for model in TENANT_MODELS: + if re.search(rf"\bselect\s*\(\s*{model}\b", line): + # Check surrounding 15 lines for a safe pattern + start = max(0, i - 2) + end = min(len(lines), i + 15) + context = "\n".join(lines[start:end]) + if not any(re.search(p, context) for p in SAFE_PATTERNS): + findings.append( + f"{path}:{i + 1}: select({model}) — no tenant_filter or account_id found in context" + ) + +if findings: + print(f"\n⚠ Tenant filter check — {len(findings)} warning(s):\n") + for f in findings: + print(f" {f}") + print() + if "--fail" in sys.argv: + print("Run with --fail: exiting 1") + sys.exit(1) + else: + print("Run in warn mode — not blocking. Pass --fail to block.") + sys.exit(0) +else: + print("✓ Tenant filter check passed — no unscoped tenant table queries found.") + sys.exit(0) +``` + +- [ ] **Step 8.2: Test the script locally — it should find zero issues after Tasks 1–6** + +```bash +cd backend && python scripts/check_tenant_filters.py +``` + +Expected output: +``` +✓ Tenant filter check passed — no unscoped tenant table queries found. +``` + +If it reports false positives, add the file to `SKIP_FILES` or add `# cross-tenant: approved` comment to the query. **Do not suppress real gaps** — fix them as Task 6 additions. + +- [ ] **Step 8.3: Add the check to CI** + +In `.github/workflows/ci.yml`, add this step to the `backend` job, immediately after the `Install dependencies` step and before `Run tests with coverage`: + +```yaml + - name: Check tenant filter enforcement + run: cd backend && python scripts/check_tenant_filters.py + # Warn mode only. Switch to --fail after 2 weeks of calibration. + # See: docs/superpowers/specs/2026-04-09-tenant-data-isolation-design.md Section 3f +``` + +- [ ] **Step 8.4: Run the full CI pipeline locally to confirm the step passes** + +```bash +cd backend && python scripts/check_tenant_filters.py && python -m pytest --override-ini="addopts=" +``` + +Expected: both commands exit 0. + +- [ ] **Step 8.5: Commit** + +```bash +git add backend/scripts/check_tenant_filters.py .github/workflows/ci.yml +git commit -m "chore: add CI tenant-filter grep check (warn mode) + +Scans endpoint and service files for select() calls on tenant tables +without tenant_filter or account_id in the surrounding context. +Running in warn mode (exit 0) — switch to --fail after 2-week +calibration period to block violating PRs. + +See spec Section 3f for background. + +Co-Authored-By: Claude Sonnet 4.6 " +``` + +--- + +## Phase 0 Gate Verification + +Before declaring Phase 0 complete and starting Phase 1, verify every item: + +- [ ] All tests in `test_tenant_isolation_p0.py` pass +- [ ] `python scripts/check_tenant_filters.py` exits 0 with no findings +- [ ] Full test suite passes: `cd backend && python -m pytest --override-ini="addopts="` +- [ ] Frontend builds: `cd frontend && npm run build` +- [ ] TargetList audit result documented in spec (Task 7.3) +- [ ] Teams orphan count documented in spec (Task 7.3) +- [ ] CI `check_tenant_filters` step added to `.github/workflows/ci.yml` + +--- + +## What comes next + +Phase 1 (schema migration) requires its own plan. Write it once Phase 0 gate is green. + +Phase 1 scope (from spec Section 7): +- Add `account_id NOT NULL` to ~20 tables that currently lack it (backfill sequences) +- Make nullable `account_id` NOT NULL on existing models (Users, Trees, etc.) +- Migrate ScriptBuilderSession, ScriptTemplate, ScriptGeneration from `team_id` +- TargetList: drop or migrate per Phase 0 audit result +- Create `template_trees` and `platform_steps` tables + +Spec: `docs/superpowers/specs/2026-04-09-tenant-data-isolation-design.md` Sections 1 and 5.