* docs: add tenant data isolation design spec Complete architecture plan for multi-tenant data isolation across all layers (PostgreSQL RLS, application-layer filtering, schema migration, testing strategy, and phased rollout checklist). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * docs: add background job isolation policy to tenant isolation spec Documents policy for all 5 existing background jobs: - Knowledge Flywheel and PSA Retry flagged for account_id threading - Chat Retention already follows correct pattern (model for others) - Maintenance Schedule Firing needs account_id in queries + Session creation - AI Conversation Expiry approved as cross-tenant with justification Adds approved cross-tenant query registry and Phase 2 checklist items. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * docs: add tenant isolation Phase 0 implementation plan 8 tasks covering: CRITICAL copilot hotfix, tenant_filter() helper, get_tenant_context dependency, analytics/category/AI session gap fixes, full UUID endpoint audit, TargetList dead code audit, teams orphan check, and CI grep check for missing tenant filters. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: CRITICAL — scope copilot tree query to current account A user who knew another account's tree UUID could start a copilot conversation, causing the tree's full node structure, names, and descriptions to be sent to the AI as part of the system prompt. Fix: add account_id (or is_default / visibility='public') filter to the tree SELECT in copilot_service.start_conversation(). Returns 404 for inaccessible trees. Test added in test_tenant_isolation_p0.py. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Tenant Isolation — Hotfix + Phase 0 Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Fix all known cross-tenant data leaks and establish the application-layer patterns and CI gating that all future tenant-data work must comply with.
Architecture: Application code is the primary enforcement layer. Every query on a tenant table must include an explicit account_id filter via tenant_filter(). PostgreSQL RLS (Phase 2) is the safety net — these Phase 0 changes must be complete and correct regardless of RLS. Each task is independently committable.
Tech Stack: Python 3.11 · FastAPI · SQLAlchemy 2.0 async · pytest-asyncio · GitHub Actions (existing CI in .github/workflows/ci.yml)
Spec: docs/superpowers/specs/2026-04-09-tenant-data-isolation-design.md
Phase gate: All 8 tasks complete + CI grep check active before Phase 1 (schema migration) begins.
File Map
| File | Action | Why |
|---|---|---|
| backend/app/services/copilot_service.py | Modify | CRITICAL: add account scoping to tree query (lines 107–112) |
| backend/app/core/filters.py | Modify | Add tenant_filter() helper; update existing filters to call it |
| backend/app/api/deps.py | Modify | Add get_tenant_context dependency |
| backend/app/api/endpoints/analytics.py | Modify | Add tenant_filter to flow analytics tree fetch (line 294) |
| backend/app/api/endpoints/categories.py | Modify | Scope category tree count to current account (line 112) |
| backend/app/api/endpoints/ai_sessions.py | Modify | Restrict search to user_id only (line 765) |
| backend/tests/test_tenant_isolation_p0.py | Create | Cross-tenant tests for every fix in this plan |
| backend/scripts/check_tenant_filters.py | Create | Grep-based CI check script |
| .github/workflows/ci.yml | Modify | Add tenant-filter check step to backend CI job |
Task 1: HOTFIX — Copilot tree access bypass (CRITICAL)
Ship this task immediately as its own PR before starting any other task.
Files:
- Modify: backend/app/services/copilot_service.py:107-112
- Test: backend/tests/test_tenant_isolation_p0.py
Background: start_conversation() loads a tree by UUID with no account filter. An attacker who knows another account's tree UUID can extract its full node structure, names, and descriptions via the AI system prompt. This is the highest priority fix.
- Step 1.1: Write the failing test first
Create backend/tests/test_tenant_isolation_p0.py:
"""Cross-tenant isolation tests for Phase 0 gap fixes."""
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
import uuid

from app.models.account import Account
from app.models.user import User
from app.models.tree import Tree
from app.core.security import get_password_hash


# ── Helpers ──────────────────────────────────────────────────────────────────

async def _create_account_and_user(db: AsyncSession, suffix: str) -> tuple[Account, User, str]:
    """Create an account + owner user. Returns (account, user, plain_password)."""
    account = Account(
        name=f"Test Corp {suffix}",
        slug=f"test-corp-{suffix}-{uuid.uuid4().hex[:6]}",
    )
    db.add(account)
    await db.flush()
    password = "TestPass123!"
    user = User(
        email=f"user-{suffix}-{uuid.uuid4().hex[:6]}@example.com",
        name=f"User {suffix}",
        hashed_password=get_password_hash(password),
        is_active=True,
        account_id=account.id,
        account_role="owner",
    )
    db.add(user)
    await db.flush()
    return account, user, password


async def _login(client: AsyncClient, email: str, password: str) -> dict:
    """Return auth headers for a user."""
    resp = await client.post(
        "/api/v1/auth/login/json",
        json={"email": email, "password": password},
    )
    assert resp.status_code == 200, resp.text
    return {"Authorization": f"Bearer {resp.json()['access_token']}"}


async def _create_private_tree(db: AsyncSession, account: Account, user: User) -> Tree:
    """Create a private tree owned by account."""
    tree = Tree(
        name=f"Private Tree {uuid.uuid4().hex[:6]}",
        account_id=account.id,
        author_id=user.id,
        visibility="private",
        tree_type="troubleshooting",
        tree_structure={"id": "root", "type": "start", "children": []},
        is_active=True,
        status="published",
    )
    db.add(tree)
    await db.flush()
    return tree


# ── Task 1: Copilot bypass ────────────────────────────────────────────────────

@pytest.mark.asyncio
async def test_copilot_cannot_start_conversation_with_other_account_tree(
    client: AsyncClient, test_db: AsyncSession
):
    """Account A cannot start a copilot conversation using Account B's private tree UUID."""
    acct_a, user_a, pass_a = await _create_account_and_user(test_db, "a")
    acct_b, user_b, pass_b = await _create_account_and_user(test_db, "b")
    tree_b = await _create_private_tree(test_db, acct_b, user_b)
    await test_db.commit()
    headers_a = await _login(client, user_a.email, pass_a)
    resp = await client.post(
        "/api/v1/copilot/conversations",
        json={"tree_id": str(tree_b.id), "session_id": None, "current_node_id": None},
        headers=headers_a,
    )
    # Must be 404 (not 200, not 403): never confirm existence
    assert resp.status_code == 404, f"Expected 404, got {resp.status_code}: {resp.text}"
- Step 1.2: Run test to confirm it currently fails (i.e., returns 200 instead of 404)
cd backend && python -m pytest tests/test_tenant_isolation_p0.py::test_copilot_cannot_start_conversation_with_other_account_tree -v --override-ini="addopts="
Expected: FAIL (test gets 200 back — confirms the vulnerability is real).
Note: If AI is not configured in the test environment, the endpoint may return 503 before reaching the tree lookup. Check for this first: a 503 means the AI gate fired before the tree check and the request errored out at the AI gate (AI quota), which is not the same vulnerability surface. Force settings.ai_enabled = True in the test, or mock the _require_ai_enabled call, to confirm the 200 path. The vulnerability is in the service layer, not the endpoint gate.
- Step 1.3: Fix copilot_service.py (add account filter to tree query)
In backend/app/services/copilot_service.py, replace lines 107–112:
# BEFORE:
# Load tree
result = await db.execute(
    select(Tree).options(selectinload(Tree.tags)).where(Tree.id == tree_id)
)
tree = result.scalar_one_or_none()
if not tree:
    raise ValueError(f"Tree {tree_id} not found")

# AFTER:
# Load tree; it must be accessible to this account.
# Allows own account's trees, default trees, and public trees.
# Raises ValueError (caught by endpoint as 404) if not found or not accessible.
from sqlalchemy import or_

result = await db.execute(
    select(Tree).options(selectinload(Tree.tags)).where(
        Tree.id == tree_id,
        or_(
            Tree.account_id == account_id,
            Tree.is_default == True,
            Tree.visibility == "public",
        ),
    )
)
tree = result.scalar_one_or_none()
if not tree:
    raise ValueError(f"Tree {tree_id} not found or not accessible")
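The effect of the added predicate can be checked in isolation. The following is a minimal sketch using the standard-library sqlite3 module rather than the project's SQLAlchemy models; the table layout, the sample rows, and the helper names load_tree_unscoped and load_tree_scoped are illustrative assumptions, not code from the repository.

```python
import sqlite3

# Toy schema: column names mirror the plan, but this is an illustrative
# stand-in, not the project's real Tree model.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE trees (id TEXT PRIMARY KEY, account_id TEXT, "
    "is_default INTEGER, visibility TEXT)"
)
conn.executemany(
    "INSERT INTO trees VALUES (?, ?, ?, ?)",
    [
        ("tree-a", "acct-a", 0, "private"),
        ("tree-b", "acct-b", 0, "private"),
        ("tree-default", None, 1, "private"),
        ("tree-public", "acct-b", 0, "public"),
    ],
)


def load_tree_unscoped(tree_id):
    """BEFORE: lookup by id only, so any known id resolves."""
    return conn.execute(
        "SELECT id FROM trees WHERE id = ?", (tree_id,)
    ).fetchone()


def load_tree_scoped(tree_id, account_id):
    """AFTER: the id must match AND the tree must be accessible to the account."""
    return conn.execute(
        "SELECT id FROM trees WHERE id = ? "
        "AND (account_id = ? OR is_default = 1 OR visibility = 'public')",
        (tree_id, account_id),
    ).fetchone()
```

With this data, account acct-a can still load its own tree, the default tree, and the public tree, while a lookup of tree-b returns no row, which the service turns into the ValueError that the endpoint reports as 404.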
- Step 1.4: Run test — must pass
cd backend && python -m pytest tests/test_tenant_isolation_p0.py::test_copilot_cannot_start_conversation_with_other_account_tree -v --override-ini="addopts="
Expected: PASS
- Step 1.5: Run full test suite — must stay green
cd backend && python -m pytest --override-ini="addopts="
Expected: all tests pass (or same failures as before this change — do not regress).
- Step 1.6: Commit as standalone hotfix
git add backend/app/services/copilot_service.py backend/tests/test_tenant_isolation_p0.py
git commit -m "fix: CRITICAL — scope copilot tree query to current account
A user who knew another account's tree UUID could start a copilot
conversation, causing the tree's full node structure, names, and
descriptions to be sent to the AI as part of the system prompt.
Fix: add account_id (or is_default / visibility='public') filter to
the tree SELECT in copilot_service.start_conversation(). Returns 404
for inaccessible trees. Test added in test_tenant_isolation_p0.py.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>"
Task 2: Add tenant_filter() and get_tenant_context
Files:
- Modify: backend/app/core/filters.py
- Modify: backend/app/api/deps.py
These are the canonical patterns every subsequent fix and future feature uses. No new tests needed — the patterns are exercised by Tasks 3–5 tests.
- Step 2.1: Add tenant_filter() to filters.py
In backend/app/core/filters.py, replace the entire file:
"""
Centralized query filters for ResolutionFlow.

Provides reusable SQLAlchemy filter builders for tree access control,
step visibility, and the canonical tenant_filter used by all queries
on tenant-scoped tables.
"""
from __future__ import annotations

import uuid
from typing import TYPE_CHECKING

from sqlalchemy import or_, and_, true as sa_true

if TYPE_CHECKING:
    from app.models.user import User


def tenant_filter(model, account_id: uuid.UUID):
    """Primary app-layer tenant filter.

    MUST be used in every SELECT/UPDATE/DELETE on tenant tables.
    RLS (Phase 2) is the safety net; this is the primary enforcement.

    Usage:
        stmt = select(Tree).where(tenant_filter(Tree, current_user.account_id), ...)
    """
    return model.account_id == account_id


def build_tree_access_filter(current_user: User):
    """Build the access filter for trees based on user permissions.

    Visibility rules:
    - super_admin: sees everything
    - is_default: visible to all authenticated users
    - visibility='public': visible to all authenticated users
    - author_id == me: always visible (regardless of visibility setting)
    - visibility='team' AND account_id == mine: visible to account members
    - visibility='private': only visible to author (covered by author_id check above)
    - visibility='link': only visible to author (share token access is handled separately)
    """
    from app.models.tree import Tree

    if current_user.is_super_admin:
        return sa_true()

    conditions = [
        Tree.is_default == True,
        Tree.visibility == 'public',
        Tree.author_id == current_user.id,
    ]
    if current_user.account_id:
        # Team-visible trees: use tenant_filter as the account match
        conditions.append(
            and_(
                Tree.visibility == 'team',
                tenant_filter(Tree, current_user.account_id),
            )
        )
    return or_(*conditions)


def build_step_visibility_filter(current_user: User):
    """Build SQLAlchemy filter for step visibility based on user.

    Returns steps that are:
    - Public steps (visible to all)
    - Team steps (visible to same account members)
    - User's own private steps
    """
    from app.models.step_library import StepLibrary

    if current_user.account_id:
        return or_(
            StepLibrary.visibility == 'public',
            and_(
                StepLibrary.visibility == 'team',
                tenant_filter(StepLibrary, current_user.account_id),
            ),
            StepLibrary.created_by == current_user.id,
        )
    else:
        return or_(
            StepLibrary.visibility == 'public',
            StepLibrary.created_by == current_user.id,
        )
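One edge case is worth keeping in mind when calling tenant_filter(): SQLAlchemy compiles a comparison of a column with None to IS NULL, so passing a missing account_id would match rows whose account_id is NULL (the plan's global TreeCategory rows use account_id=None, for instance) instead of matching nothing. The get_tenant_context dependency in this task already rejects users without an account at the endpoint boundary; the sketch below shows one possible defensive check for call sites that do not go through that dependency. The helper name require_account_id is hypothetical and not part of the plan.

```python
import uuid
from typing import Optional


def require_account_id(account_id: Optional[uuid.UUID]) -> uuid.UUID:
    """Hypothetical guard: refuse to build a tenant filter from a missing account_id."""
    if account_id is None:
        # Failing loudly avoids silently producing an "account_id IS NULL" filter.
        raise ValueError("tenant filter requires a non-null account_id")
    return account_id
```

A caller could then write tenant_filter(Tree, require_account_id(user.account_id)), assuming that raising is the desired behaviour for account-less users in that code path.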
- Step 2.2: Add get_tenant_context to deps.py
In backend/app/api/deps.py, append at the end of the file (after line 193):
async def get_tenant_context(
    current_user: Annotated[User, Depends(get_current_active_user)],
) -> UUID:
    """Return the current user's account_id.

    Use this dependency instead of reading current_user.account_id directly.
    Raises 403 if the user has no account association (should not happen in
    normal flows; users are always associated with an account on registration).
    """
    if current_user.account_id is None:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User not associated with any account",
        )
    return current_user.account_id
- Step 2.3: Run full test suite to confirm no regressions
cd backend && python -m pytest --override-ini="addopts="
Expected: all tests pass.
- Step 2.4: Commit
git add backend/app/core/filters.py backend/app/api/deps.py
git commit -m "feat: add tenant_filter() helper and get_tenant_context dependency
tenant_filter(model, account_id) is the canonical app-layer tenant
scoping expression. Every query on a tenant table must use it.
build_tree_access_filter and build_step_visibility_filter updated
to call tenant_filter() internally for the account_id match.
get_tenant_context is a FastAPI dependency that returns account_id
or raises 403 if the user has no account — prevents raw access to
current_user.account_id and centralises the null check.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>"
Task 3: Fix analytics flow endpoint
Files:
- Modify: backend/app/api/endpoints/analytics.py:294
- Test: backend/tests/test_tenant_isolation_p0.py
- Step 3.1: Write the failing test
Add to backend/tests/test_tenant_isolation_p0.py:
# ── Task 3: Analytics flow endpoint ──────────────────────────────────────────
@pytest.mark.asyncio
async def test_analytics_flow_cannot_read_other_account_tree(
    client: AsyncClient, test_db: AsyncSession
):
    """Account A cannot read flow analytics for Account B's private tree."""
    acct_a, user_a, pass_a = await _create_account_and_user(test_db, "anl-a")
    acct_b, user_b, pass_b = await _create_account_and_user(test_db, "anl-b")
    tree_b = await _create_private_tree(test_db, acct_b, user_b)
    await test_db.commit()
    headers_a = await _login(client, user_a.email, pass_a)
    resp = await client.get(
        f"/api/v1/analytics/flows/{tree_b.id}",
        headers=headers_a,
    )
    assert resp.status_code == 404, f"Expected 404, got {resp.status_code}: {resp.text}"
- Step 3.2: Run test — confirm it currently fails (returns 200)
cd backend && python -m pytest tests/test_tenant_isolation_p0.py::test_analytics_flow_cannot_read_other_account_tree -v --override-ini="addopts="
Expected: FAIL (returns 200 — confirms gap is real).
- Step 3.3: Fix analytics.py (add tenant_filter to tree fetch)
In backend/app/api/endpoints/analytics.py:
Add the import at the top (after existing imports on line 9):
from app.core.filters import tenant_filter
Replace lines 293–297:
# BEFORE:
# Verify tree exists
result = await db.execute(select(Tree).where(Tree.id == tree_id))
tree = result.scalar_one_or_none()
if not tree:
    raise HTTPException(status_code=404, detail="Flow not found")

# AFTER:
# Verify tree exists and belongs to the requesting user's account.
result = await db.execute(
    select(Tree).where(
        Tree.id == tree_id,
        tenant_filter(Tree, current_user.account_id),
    )
)
tree = result.scalar_one_or_none()
if not tree:
    raise HTTPException(status_code=404, detail="Flow not found")
- Step 3.4: Run test — must pass
cd backend && python -m pytest tests/test_tenant_isolation_p0.py::test_analytics_flow_cannot_read_other_account_tree -v --override-ini="addopts="
Expected: PASS
- Step 3.5: Commit
git add backend/app/api/endpoints/analytics.py backend/tests/test_tenant_isolation_p0.py
git commit -m "fix: scope analytics/flows/{tree_id} to requesting account
Any authenticated user could read flow analytics (session counts,
completion rates, CSAT) for any tree UUID. Now returns 404 if the
tree doesn't belong to the requesting account.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>"
Task 4: Fix category tree count scope
Files:
- Modify: backend/app/api/endpoints/categories.py:112
- Test: backend/tests/test_tenant_isolation_p0.py
- Step 4.1: Write the failing test
Add to backend/tests/test_tenant_isolation_p0.py:
# ── Task 4: Category tree count ───────────────────────────────────────────────
@pytest.mark.asyncio
async def test_category_tree_count_scoped_to_account(
    client: AsyncClient, test_db: AsyncSession
):
    """tree_count on a category must not include trees from other accounts."""
    from app.models.category import TreeCategory

    acct_a, user_a, pass_a = await _create_account_and_user(test_db, "cat-a")
    acct_b, user_b, pass_b = await _create_account_and_user(test_db, "cat-b")

    # Shared category (account_id=None means global)
    category = TreeCategory(
        name="Shared Category",
        slug=f"shared-cat-{uuid.uuid4().hex[:6]}",
        account_id=None,
        is_active=True,
    )
    test_db.add(category)
    await test_db.flush()

    # 3 trees for account_b under this category
    for i in range(3):
        tree = Tree(
            name=f"B Tree {i}",
            account_id=acct_b.id,
            author_id=user_b.id,
            category_id=category.id,
            visibility="team",
            tree_type="troubleshooting",
            tree_structure={"id": "root", "type": "start", "children": []},
            is_active=True,
            status="published",
        )
        test_db.add(tree)

    # 1 tree for account_a under this category
    tree_a = Tree(
        name="A Tree",
        account_id=acct_a.id,
        author_id=user_a.id,
        category_id=category.id,
        visibility="team",
        tree_type="troubleshooting",
        tree_structure={"id": "root", "type": "start", "children": []},
        is_active=True,
        status="published",
    )
    test_db.add(tree_a)
    await test_db.commit()

    headers_a = await _login(client, user_a.email, pass_a)
    resp = await client.get(
        f"/api/v1/categories/{category.id}",
        headers=headers_a,
    )
    assert resp.status_code == 200, resp.text
    # account_a should only see their 1 tree, not account_b's 3
    assert resp.json()["tree_count"] == 1, (
        f"Expected tree_count=1 (own trees only), got {resp.json()['tree_count']}"
    )
- Step 4.2: Run test — confirm it currently fails (count=4)
cd backend && python -m pytest tests/test_tenant_isolation_p0.py::test_category_tree_count_scoped_to_account -v --override-ini="addopts="
Expected: FAIL (tree_count=4 — includes account_b's trees).
- Step 4.3: Fix categories.py (scope tree count query)
In backend/app/api/endpoints/categories.py:
Add import at top (alongside existing imports):
from app.core.filters import tenant_filter
Replace lines 111–117 (the count query block):
# BEFORE:
# Get tree count
count_query = select(func.count(Tree.id)).where(
    Tree.category_id == category.id,
    Tree.is_active == True
)
count_result = await db.execute(count_query)
tree_count = count_result.scalar() or 0

# AFTER:
# Get tree count, scoped to the requesting account so cross-account
# trees in shared categories are not counted.
count_query = select(func.count(Tree.id)).where(
    Tree.category_id == category.id,
    Tree.is_active == True,
    tenant_filter(Tree, current_user.account_id),
)
count_result = await db.execute(count_query)
tree_count = count_result.scalar() or 0
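The numbers expected by the test (4 before the fix, 1 after) can be reproduced with a small standalone sketch. It uses sqlite3 with a toy trees table instead of the real models; the schema and the function names count_unscoped and count_scoped are assumptions made for illustration only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE trees (id INTEGER PRIMARY KEY, category_id TEXT, "
    "account_id TEXT, is_active INTEGER)"
)
# Same shape as the test data: three trees for account B, one for account A,
# all in one shared category.
rows = [("cat-1", "acct-b", 1)] * 3 + [("cat-1", "acct-a", 1)]
conn.executemany(
    "INSERT INTO trees (category_id, account_id, is_active) VALUES (?, ?, ?)",
    rows,
)


def count_unscoped(category_id):
    """BEFORE: counts every active tree in the category, across all accounts."""
    return conn.execute(
        "SELECT COUNT(id) FROM trees WHERE category_id = ? AND is_active = 1",
        (category_id,),
    ).fetchone()[0]


def count_scoped(category_id, account_id):
    """AFTER: counts only the requesting account's active trees."""
    return conn.execute(
        "SELECT COUNT(id) FROM trees "
        "WHERE category_id = ? AND is_active = 1 AND account_id = ?",
        (category_id, account_id),
    ).fetchone()[0]
```

Even a bare count is tenant data: the unscoped version tells account A that three other trees exist in the shared category.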
- Step 4.4: Run test — must pass
cd backend && python -m pytest tests/test_tenant_isolation_p0.py::test_category_tree_count_scoped_to_account -v --override-ini="addopts="
Expected: PASS
- Step 4.5: Commit
git add backend/app/api/endpoints/categories.py backend/tests/test_tenant_isolation_p0.py
git commit -m "fix: scope category tree_count to requesting account
tree_count on GET /categories/{id} was including trees from all
accounts, leaking cross-tenant row counts.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>"
Task 5: Fix AI session search scope
Files:
- Modify: backend/app/api/endpoints/ai_sessions.py:765-777
- Test: backend/tests/test_tenant_isolation_p0.py
Context: The search endpoint used OR(user_id == me, account_id == mine), exposing problem_summary, problem_domain, and status of other users' sessions within the same account. Sessions are user-scoped only. The list endpoint (GET /ai-sessions) already restricts to user_id. Both must behave consistently.
- Step 5.1: Write the failing test
Add to backend/tests/test_tenant_isolation_p0.py:
# ── Task 5: AI session search scope ──────────────────────────────────────────
@pytest.mark.asyncio
async def test_ai_session_search_cannot_see_other_users_sessions(
    client: AsyncClient, test_db: AsyncSession
):
    """User A cannot find User B's AI sessions via the search endpoint,
    even when both users are in the same account."""
    from app.models.ai_session import AISession

    # Two users in the SAME account
    account = Account(name="Shared Corp", slug=f"shared-{uuid.uuid4().hex[:6]}")
    test_db.add(account)
    await test_db.flush()

    password = "TestPass123!"
    user_a = User(
        email=f"user-a-{uuid.uuid4().hex[:6]}@shared.com",
        name="User A",
        hashed_password=get_password_hash(password),
        is_active=True,
        account_id=account.id,
        account_role="engineer",
    )
    user_b = User(
        email=f"user-b-{uuid.uuid4().hex[:6]}@shared.com",
        name="User B",
        hashed_password=get_password_hash(password),
        is_active=True,
        account_id=account.id,
        account_role="engineer",
    )
    test_db.add_all([user_a, user_b])
    await test_db.flush()

    # Session belonging to user_b with distinctive problem_summary
    session_b = AISession(
        user_id=user_b.id,
        account_id=account.id,
        problem_summary="CONFIDENTIAL: user_b's session",
        problem_domain="networking",
        status="resolved",
    )
    test_db.add(session_b)
    await test_db.commit()

    headers_a = await _login(client, user_a.email, password)
    resp = await client.get(
        "/api/v1/ai-sessions/search",
        params={"q": "CONFIDENTIAL"},
        headers=headers_a,
    )
    assert resp.status_code == 200, resp.text
    results = resp.json()
    ids = [r["id"] for r in results]
    assert str(session_b.id) not in ids, (
        "User A can see User B's session via search: cross-user leak within account"
    )
- Step 5.2: Run test — confirm it currently fails
cd backend && python -m pytest tests/test_tenant_isolation_p0.py::test_ai_session_search_cannot_see_other_users_sessions -v --override-ini="addopts="
Expected: FAIL (user_b's session appears in results).
- Step 5.3: Fix ai_sessions.py (restrict search to user_id)
In backend/app/api/endpoints/ai_sessions.py, find the search_sessions function (around line 755) and replace the .where() clause:
# BEFORE:
result = await db.execute(
    select(AISession)
    .where(
        or_(
            AISession.user_id == current_user.id,
            AISession.account_id == current_user.account_id,
        ),
        text("ai_sessions.search_vector @@ plainto_tsquery('english', :q)"),
    )
    .params(q=q)
    .order_by(AISession.created_at.desc())
    .limit(limit)
)

# AFTER:
# Sessions are user-scoped. The list endpoint uses user_id only;
# search must be consistent. Cross-user access requires explicit
# escalation or session sharing, not ambient account membership.
result = await db.execute(
    select(AISession)
    .where(
        AISession.user_id == current_user.id,
        text("ai_sessions.search_vector @@ plainto_tsquery('english', :q)"),
    )
    .params(q=q)
    .order_by(AISession.created_at.desc())
    .limit(limit)
)
Also remove the now-unused or_ import from the search_sessions function if it was only used there. (Do not remove or_ if it appears elsewhere in the file.)
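To see why the OR clause leaks within an account, here is a small standalone sketch. It uses sqlite3 and a LIKE match as a stand-in for the PostgreSQL full-text predicate, so it only illustrates the scoping logic, not the real search; the table layout and the function names search_before and search_after are assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ai_sessions (id TEXT PRIMARY KEY, user_id TEXT, "
    "account_id TEXT, problem_summary TEXT)"
)
conn.executemany(
    "INSERT INTO ai_sessions VALUES (?, ?, ?, ?)",
    [
        ("s-a", "user-a", "acct-1", "CONFIDENTIAL: user_a's notes"),
        ("s-b", "user-b", "acct-1", "CONFIDENTIAL: user_b's session"),
    ],
)


def search_before(user_id, account_id, q):
    """BEFORE: OR(user_id, account_id) matches every session in the account."""
    rows = conn.execute(
        "SELECT id FROM ai_sessions "
        "WHERE (user_id = ? OR account_id = ?) AND problem_summary LIKE ? "
        "ORDER BY id",
        (user_id, account_id, f"%{q}%"),
    ).fetchall()
    return [row[0] for row in rows]


def search_after(user_id, q):
    """AFTER: only the caller's own sessions are searchable."""
    rows = conn.execute(
        "SELECT id FROM ai_sessions "
        "WHERE user_id = ? AND problem_summary LIKE ? ORDER BY id",
        (user_id, f"%{q}%"),
    ).fetchall()
    return [row[0] for row in rows]
```

Because every session row carries the account's id, the account_id branch of the OR is true for all colleagues' sessions, which makes the user_id branch irrelevant.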
- Step 5.4: Run test — must pass
cd backend && python -m pytest tests/test_tenant_isolation_p0.py::test_ai_session_search_cannot_see_other_users_sessions -v --override-ini="addopts="
Expected: PASS
- Step 5.5: Commit
git add backend/app/api/endpoints/ai_sessions.py backend/tests/test_tenant_isolation_p0.py
git commit -m "fix: restrict AI session search to current user only
Search endpoint used OR(user_id, account_id), exposing other users'
problem_summary and problem_domain within the same account. Sessions
are user-scoped only — cross-user access requires explicit escalation
or sharing. List and search endpoints now behave consistently.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>"
Task 6: UUID endpoint audit and gap fixes
Files:
- Read: all files in backend/app/api/endpoints/
- Modify: whichever files the audit finds gaps in
- Test: backend/tests/test_tenant_isolation_p0.py
Goal: Systematically check every route with a {resource_id} URL parameter. Verify that each one either (a) filters by id AND account_id in the query, or (b) calls a permission function on the fetched object that checks account ownership.
- Step 6.1: Run the audit
For each file listed below, scan every function decorated with @router.get, @router.put, @router.patch, @router.delete that has a path like /{something_id}. For each such function, answer:
- Does the query filter by both id AND account_id (or tenant_filter)?
- If not, does it call a permission check (can_access_*, can_edit_*, etc.) on the fetched object?
- If neither, it's a gap.
Endpoint files to audit:
backend/app/api/endpoints/trees.py
backend/app/api/endpoints/sessions.py
backend/app/api/endpoints/steps.py
backend/app/api/endpoints/categories.py ← already fixed (Task 4)
backend/app/api/endpoints/analytics.py ← already fixed (Task 3)
backend/app/api/endpoints/copilot.py
backend/app/api/endpoints/ai_sessions.py ← search fixed (Task 5)
backend/app/api/endpoints/assistant_chat.py
backend/app/api/endpoints/integrations.py
backend/app/api/endpoints/flow_proposals.py
backend/app/api/endpoints/maintenance_schedules.py
backend/app/api/endpoints/kb_accelerator.py
backend/app/api/endpoints/flowpilot_analytics.py
backend/app/api/endpoints/shares.py
backend/app/api/endpoints/uploads.py
backend/app/api/endpoints/tags.py
backend/app/api/endpoints/step_categories.py
backend/app/api/endpoints/notifications.py
backend/app/api/endpoints/survey.py
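A first pass over these files can be partly mechanised. The sketch below lists route decorators whose path contains an id-style parameter, so that each hit can then be reviewed by hand against the questions above. It is a rough aid under stated assumptions: decorators are written as @router.<method>("<path>") on a single line with a string-literal path, and the HTTP methods match the four named in this step. The function name find_id_routes is hypothetical.

```python
import re

# Captures the HTTP method and the path literal from a route decorator.
ROUTE_RE = re.compile(r"@router\.(get|put|patch|delete)\(\s*[\"']([^\"']*)[\"']")
# Matches path parameters such as {id}, {tree_id}, {resource_id}.
ID_PARAM_RE = re.compile(r"\{\w*id\}")


def find_id_routes(source: str) -> list[tuple[int, str, str]]:
    """Return (line_number, method, path) for routes with an id path parameter."""
    hits = []
    for lineno, line in enumerate(source.splitlines(), start=1):
        match = ROUTE_RE.search(line)
        if match and ID_PARAM_RE.search(match.group(2)):
            hits.append((lineno, match.group(1), match.group(2)))
    return hits
```

Running it over Path(file).read_text() for each file in the list gives a checklist of routes to inspect; it does not judge whether a route is safe, and routes that take an id in the request body are not detected.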
For each gap found: classify severity (CRITICAL / HIGH / MEDIUM / LOW), document file + line number, fix using the fetch-and-verify pattern:
# Standard fix pattern: fetch with account_id filter, return 404 if not found
from app.core.filters import tenant_filter

stmt = select(Model).where(
    Model.id == resource_id,
    tenant_filter(Model, current_user.account_id),
)
resource = (await db.execute(stmt)).scalar_one_or_none()
if not resource:
    raise HTTPException(status_code=404)  # Not 403: never reveal existence
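The reason for preferring 404 over 403 is that the caller should not be able to tell "exists but belongs to someone else" apart from "does not exist". A minimal sketch of that property, using sqlite3 and a stand-in exception instead of FastAPI's HTTPException (the names NotFound, fetch_owned, and outcome are illustrative assumptions):

```python
import sqlite3


class NotFound(Exception):
    """Stand-in for HTTPException(status_code=404)."""


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE resources (id TEXT PRIMARY KEY, account_id TEXT)")
conn.execute("INSERT INTO resources VALUES ('res-b', 'acct-b')")


def fetch_owned(resource_id, account_id):
    """Fetch by id AND account in one query; a miss for either reason looks the same."""
    row = conn.execute(
        "SELECT id, account_id FROM resources WHERE id = ? AND account_id = ?",
        (resource_id, account_id),
    ).fetchone()
    if row is None:
        raise NotFound("Not found")
    return row


def outcome(resource_id, account_id):
    """Summarise what a caller would observe for a given request."""
    try:
        fetch_owned(resource_id, account_id)
        return "ok"
    except NotFound as exc:
        return f"404: {exc}"
```

Because ownership is part of the WHERE clause rather than a separate check after the fetch, there is no code path that could return a different status or message for another account's resource.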
Known findings from prior audit (do not re-verify, just fix):
| File | Route | Severity | Status |
|---|---|---|---|
| copilot.py | POST /copilot/conversations | CRITICAL | Fixed (Task 1) |
| analytics.py | GET /analytics/flows/{tree_id} | LOW | Fixed (Task 3) |
| categories.py | tree_count in GET /categories/{id} | LOW | Fixed (Task 4) |
| ai_sessions.py | GET /ai-sessions/search | LOW | Fixed (Task 5) |
| steps.py | get_step_or_404: 403 vs 404 on non-existent UUID | Very Low | Audit only; fix if severity warrants |
| flowpilot_analytics.py | Needs manual review | Unknown | Audit now |
- Step 6.2: For each gap found, write a test, then apply the fix
Use the same TDD pattern as Tasks 1–5:
# Test template for a gap in MyEndpoint
@pytest.mark.asyncio
async def test_cannot_access_other_account_<resource>(
    client: AsyncClient, test_db: AsyncSession
):
    """Account A cannot access Account B's <resource> by UUID."""
    acct_a, user_a, pass_a = await _create_account_and_user(test_db, "<resource>-a")
    acct_b, user_b, pass_b = await _create_account_and_user(test_db, "<resource>-b")
    resource_b = <create resource for acct_b>
    await test_db.commit()
    headers_a = await _login(client, user_a.email, pass_a)
    resp = await client.get(f"/api/v1/<resource>/{resource_b.id}", headers=headers_a)
    assert resp.status_code == 404, f"Expected 404, got {resp.status_code}: {resp.text}"
- Step 6.3: After all gaps fixed, run full test suite
cd backend && python -m pytest --override-ini="addopts="
Expected: all tests pass.
- Step 6.4: Commit all audit fixes together
git add backend/app/api/endpoints/ backend/tests/test_tenant_isolation_p0.py
git commit -m "fix: UUID endpoint audit — add missing ownership checks
Audit of all {resource_id} endpoints. Gaps found and fixed:
<list each file:line fixed from the audit>
All fixed endpoints now return 404 (not 403) for cross-tenant IDs.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>"
Task 7: TargetList dead code audit + teams orphan check
Files:
- No code changes (audit and report only)
- Report findings documented in a comment on this plan PR or in docs/
- Step 7.1: TargetList dead code audit
Run from the repo root:
grep -rn "TargetList\|target_list\|target-list\|target_lists" \
backend/app/api \
backend/app/services \
backend/app/schemas \
frontend/src \
--include="*.py" --include="*.ts" --include="*.tsx" \
| grep -v "__pycache__" \
| grep -v "test_target"
Record the output. Then query production/staging DB row count:
# Run via docker exec or psql connection
docker exec -it resolutionflow_postgres psql -U postgres -d patherly \
-c "SELECT COUNT(*) FROM target_lists;"
Decision:
- Zero code references AND zero rows → drop the table in Phase 1 migration
- Zero rows but code references exist → add TargetList deprecation cleanup to Phase 1 scope
- Rows exist → add TargetList migration to Phase 1 scope (backfill account_id from team_id → teams → users WHERE is_team_admin)
- Step 7.2: Teams orphan check
docker exec -it resolutionflow_postgres psql -U postgres -d patherly -c "
SELECT COUNT(*) AS orphaned_teams
FROM teams t
LEFT JOIN users u ON u.team_id = t.id AND u.account_id IS NOT NULL
WHERE u.id IS NULL;
"
Decision:
- Count = 0 → proceed to Phase 1 schema migration without concern
- Count > 0 → document which teams are orphaned, resolve before any Phase 1 backfill involving team_id → account_id chains
- Step 7.3: Document results
In docs/superpowers/specs/2026-04-09-tenant-data-isolation-design.md, Section 9 Open Questions, replace the TargetList row:
# BEFORE:
| TargetList: zero references + zero rows? Or does data exist? | Determines whether table is dropped or migrated. | Phase 0 audit |
# AFTER (fill in actual result, e.g.):
| TargetList audit complete: zero rows, zero non-test code references. Decision: drop table in Phase 1. | Resolved — drop in Phase 1. | ✓ Done |
Also add a row for the teams orphan result:
| Teams orphan check: N orphaned teams found. | Phase 1 backfills using team→account chain safe to proceed (or: N teams need resolution before Phase 1). | ✓ Done |
- Step 7.4: Commit the updated spec
git add docs/superpowers/specs/2026-04-09-tenant-data-isolation-design.md
git commit -m "docs: record Phase 0 audit results — TargetList and teams orphan check
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>"
Task 8: CI tenant-filter grep check
Files:
- Create: backend/scripts/check_tenant_filters.py
- Modify: .github/workflows/ci.yml
Goal: Warn (not yet block) when endpoint or service files contain select(TenantModel) patterns without tenant_filter or account_id in the surrounding context. Active from Phase 1 forward, so all Phase 1 work is gated by it. Switch to block (exit code 1) after 2 weeks of false-positive calibration.
- Step 8.1: Create the check script
Create backend/scripts/check_tenant_filters.py:
"""
Tenant filter enforcement check.
Scans endpoint and service files for SQLAlchemy select() calls on known
tenant tables and warns when account_id or tenant_filter is not present
in the surrounding 15 lines (the typical extent of a single query).
Usage:
python scripts/check_tenant_filters.py # warn mode (exits 0)
python scripts/check_tenant_filters.py --fail # block mode (exits 1 on findings)
"""
import re
import sys
from pathlib import Path
# Tables that must always be filtered by account_id or tenant_filter.
# Extend this list as new tenant tables are added.
TENANT_MODELS = [
    "Tree", "AISession", "Session", "StepLibrary", "FlowProposal",
    "CopilotConversation", "AssistantChat", "FileUpload", "KBImport",
    "PsaConnection", "PsaPostLog", "PsaMemberMapping", "AIChatSession",
    "AIConversation", "AIUsage", "Subscription", "AccountInvite",
    "Notification", "NotificationConfig", "SessionShare", "UserFolder",
    "UserPinnedTree", "SessionBranch", "SessionHandoff",
    "SessionResolutionOutput", "ForkPoint", "AISessionStep",
    "AISuggestion", "StepCategory", "TreeCategory", "TreeTag",
    "Attachment", "SessionSupportingData", "MaintenanceSchedule",
    "AuditLog", "ScriptBuilderSession", "ScriptTemplate",
    "StepRating", "StepUsageLog",
]
# Directories to scan
SCAN_DIRS = [
    Path("app/api/endpoints"),
    Path("app/services"),
]
# Patterns that indicate the query is correctly scoped
SAFE_PATTERNS = [
    r"tenant_filter",
    r"account_id",
    r"is_super_admin",  # Super admin queries intentionally bypass tenant filter
    r"# cross-tenant: approved",  # Explicit approval comment
]
SKIP_FILES = {
    "admin.py",  # Super admin endpoints intentionally bypass tenant filter
}
findings = []
for scan_dir in SCAN_DIRS:
    if not scan_dir.exists():
        continue
    for path in sorted(scan_dir.glob("*.py")):
        if path.name in SKIP_FILES:
            continue
        lines = path.read_text(encoding="utf-8").splitlines()
        for i, line in enumerate(lines):
            for model in TENANT_MODELS:
                if re.search(rf"\bselect\s*\(\s*{model}\b", line):
                    # Check the context window (2 lines before through 14 after) for a safe pattern
                    start = max(0, i - 2)
                    end = min(len(lines), i + 15)
                    context = "\n".join(lines[start:end])
                    if not any(re.search(p, context) for p in SAFE_PATTERNS):
                        findings.append(
                            f"{path}:{i + 1}: select({model}) — no tenant_filter or account_id found in context"
                        )
if findings:
    print(f"\n⚠ Tenant filter check — {len(findings)} warning(s):\n")
    for f in findings:
        print(f"  {f}")
    print()
    if "--fail" in sys.argv:
        print("Run with --fail: exiting 1")
        sys.exit(1)
    else:
        print("Run in warn mode — not blocking. Pass --fail to block.")
        sys.exit(0)
else:
    print("✓ Tenant filter check passed — no unscoped tenant table queries found.")
    sys.exit(0)
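The detection pattern works one line at a time, so it helps to know what it does and does not catch before calibrating. The sketch below isolates the same regular expression in a hypothetical helper, matches_select (not part of the script), and runs it on a few made-up query lines. It suggests that word boundaries keep Session from matching SessionShare, and that a select( call whose model name sits on the next line goes undetected.

```python
import re


def matches_select(model: str, line: str) -> bool:
    """Apply the check script's pattern to a single source line."""
    return re.search(rf"\bselect\s*\(\s*{model}\b", line) is not None


# The trailing \b stops "Session" from matching longer model names.
print(matches_select("Session", "stmt = select(Session).where(...)"))  # True
print(matches_select("Session", "stmt = select(SessionShare)"))        # False

# Whitespace inside the parentheses and attribute access still match.
print(matches_select("Session", "stmt = select( Session.id )"))        # True

# Line-based matching misses a model placed on the following line.
print(matches_select("Tree", "stmt = select("))                        # False
```

Queries formatted with the model on its own line would need a manual review or a multi-line pattern; treat that as a known gap of the grep approach rather than something the script handles.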
- Step 8.2: Test the script locally — it should find zero issues after Tasks 1–6
cd backend && python scripts/check_tenant_filters.py
Expected output:
✓ Tenant filter check passed — no unscoped tenant table queries found.
If it reports false positives, either add the file to SKIP_FILES or add a # cross-tenant: approved comment next to the query, within the lines the script inspects. Do not suppress real gaps; fix them as Task 6 additions.
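For the approval comment to suppress a finding, it has to fall inside the window the script inspects. A minimal sketch of that window logic, assuming the same SAFE_PATTERNS as the script; is_scoped is a hypothetical helper and the query lines (including the expires_at column) are invented for illustration:

```python
import re

SAFE_PATTERNS = [
    r"tenant_filter",
    r"account_id",
    r"is_super_admin",
    r"# cross-tenant: approved",
]


def is_scoped(lines: list[str], i: int) -> bool:
    """Mirror the script's window: 2 lines before line i through 14 after."""
    context = "\n".join(lines[max(0, i - 2):min(len(lines), i + 15)])
    return any(re.search(p, context) for p in SAFE_PATTERNS)


# No safe pattern anywhere near the query: reported as a finding.
unscoped = ["stmt = select(Tree).where(Tree.id == tree_id)"]

# Approval comment directly above the query: suppressed.
approved = [
    "# cross-tenant: approved (expiry job runs across all accounts)",
    "stmt = select(AIConversation).where(AIConversation.expires_at < now)",
]

# Approval comment more than 2 lines above the query: still a finding.
far_away = ["# cross-tenant: approved"] + ["pass"] * 5 + ["stmt = select(Tree)"]

print(is_scoped(unscoped, 0))  # False
print(is_scoped(approved, 1))  # True
print(is_scoped(far_away, 6))  # False
```

Because only two lines above the match are inspected, the comment is safest on the line immediately before the select() call or on the same line.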
- Step 8.3: Add the check to CI
In .github/workflows/ci.yml, add this step to the backend job, immediately after the Install dependencies step and before Run tests with coverage:
- name: Check tenant filter enforcement
  run: cd backend && python scripts/check_tenant_filters.py
  # Warn mode only. Switch to --fail after 2 weeks of calibration.
  # See: docs/superpowers/specs/2026-04-09-tenant-data-isolation-design.md Section 3f
- Step 8.4: Run the full CI pipeline locally to confirm the step passes
cd backend && python scripts/check_tenant_filters.py && python -m pytest --override-ini="addopts="
Expected: both commands exit 0.
- Step 8.5: Commit
git add backend/scripts/check_tenant_filters.py .github/workflows/ci.yml
git commit -m "chore: add CI tenant-filter grep check (warn mode)

Scans endpoint and service files for select() calls on tenant tables
without tenant_filter or account_id in the surrounding context.
Running in warn mode (exit 0) — switch to --fail after 2-week
calibration period to block violating PRs.
See spec Section 3f for background.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>"
Phase 0 Gate Verification
Before declaring Phase 0 complete and starting Phase 1, verify every item:
- All tests in test_tenant_isolation_p0.py pass
- python scripts/check_tenant_filters.py exits 0 with no findings
- Full test suite passes: cd backend && python -m pytest --override-ini="addopts="
- Frontend builds: cd frontend && npm run build
- TargetList audit result documented in spec (Task 7.3)
- Teams orphan count documented in spec (Task 7.3)
- CI check_tenant_filters step added to .github/workflows/ci.yml
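The command-based items in this checklist could be run in one pass. The following is a rough sketch, not part of the plan: run_gate and PHASE0_CHECKS are hypothetical names, and the commands are the ones listed above. It passes --fail to the filter check because warn mode exits 0 even when there are findings.

```python
import subprocess
from typing import Optional

# (label, command, working directory); paths assume the repository root.
Check = tuple[str, list[str], Optional[str]]

PHASE0_CHECKS: list[Check] = [
    ("tenant filter check", ["python", "scripts/check_tenant_filters.py", "--fail"], "backend"),
    ("backend test suite", ["python", "-m", "pytest", "--override-ini=addopts="], "backend"),
    ("frontend build", ["npm", "run", "build"], "frontend"),
]


def run_gate(checks: list[Check]) -> list[str]:
    """Run every check and return the labels of those that exited non-zero."""
    failed = []
    for label, cmd, cwd in checks:
        result = subprocess.run(cmd, cwd=cwd)
        if result.returncode != 0:
            failed.append(label)
    return failed
```

Calling run_gate(PHASE0_CHECKS) from the repository root returns an empty list when all three commands succeed. The documentation items (TargetList audit, teams orphan count) and the CI step still need a manual look.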
What comes next
Phase 1 (schema migration) requires its own plan. Write it once Phase 0 gate is green.
Phase 1 scope (from spec Section 7):
- Add account_id NOT NULL to ~20 tables that currently lack it (backfill sequences)
- Make nullable account_id NOT NULL on existing models (Users, Trees, etc.)
- Migrate ScriptBuilderSession, ScriptTemplate, ScriptGeneration from team_id
- TargetList: drop or migrate per Phase 0 audit result
- Create template_trees and platform_steps tables
Spec: docs/superpowers/specs/2026-04-09-tenant-data-isolation-design.md Sections 1 and 5.