feat: tenant isolation Phase 4 — RLS on 31 remaining tables + script_builder fix

Enable RLS on all remaining tenant-scoped tables (31 tables):

Standard policy (tenant sees own rows):
  users, account_invites, account_limit_overrides, account_feature_overrides,
  subscriptions, ai_chat_sessions, ai_conversations, ai_session_steps,
  ai_session_embeddings, ai_suggestions, ai_usage, assistant_chats,
  attachments, copilot_conversations, feedback, file_uploads, fork_points,
  kb_imports, notifications, notification_configs, notification_logs,
  psa_activity_logs, psa_member_mappings, script_builder_sessions,
  script_categories, session_ratings, tree_embeddings, user_folders,
  user_pinned_trees

Platform-visibility policy (own rows OR PLATFORM_ACCOUNT_ID):
  platform_steps, template_trees

Intentionally skipped:
  accounts (IS the root table, no account_id column)
  plan_feature_defaults (platform config, no account_id column)

Also fixes script_builder_service.create_session() which was missing
account_id= on ScriptBuilderSession construction, causing 500s on all
script builder endpoints (pre-existing CI failure).

Adds Phase 4 RLS isolation tests covering: users, script_builder_sessions,
ai_session_steps, notifications, platform_steps, template_trees.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
chihlasm
2026-04-12 01:25:28 +00:00
parent ba36e37dab
commit 64f004a62c
4 changed files with 292 additions and 0 deletions

View File

@@ -954,3 +954,187 @@ async def test_tree_shares_account_a_cannot_see_account_b(admin_conn, conn_a):
assert len(rows) == 0, "Account A should not see Account B tree_shares"
finally:
await admin_conn.execute(f"DELETE FROM tree_shares WHERE id = '{share_id}'")
# ===========================================================================
# Phase 4 RLS isolation tests
# Tables: users, script_builder_sessions, ai_session_steps,
# notifications, platform_steps, template_trees
# ===========================================================================
# ---------------------------------------------------------------------------
# users
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_users_account_a_cannot_see_account_b(admin_conn, conn_a):
"""Account A must not see users belonging to Account B."""
rows = await conn_a.fetch(
f"SELECT id FROM users WHERE account_id = '{ACCOUNT_B_ID}'"
)
assert len(rows) == 0, "Account A should not see Account B users"
@pytest.mark.asyncio
async def test_users_account_a_can_see_own(admin_conn, conn_a):
"""Account A must be able to see its own users."""
rows = await conn_a.fetch(
f"SELECT id FROM users WHERE account_id = '{ACCOUNT_A_ID}'"
)
assert len(rows) > 0, "Account A should see its own users"
# ---------------------------------------------------------------------------
# script_builder_sessions
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_script_builder_sessions_account_a_cannot_see_account_b(admin_conn, conn_a):
"""Account A must not see script builder sessions belonging to Account B."""
user_b_id = await _get_user_b_id(admin_conn)
session_id = str(uuid.uuid4())
await admin_conn.execute(f"""
INSERT INTO script_builder_sessions (
id, user_id, account_id, language, created_at, updated_at
) VALUES (
'{session_id}', '{user_b_id}', '{ACCOUNT_B_ID}',
'powershell', NOW(), NOW()
)
""")
try:
rows = await conn_a.fetch(
f"SELECT id FROM script_builder_sessions WHERE account_id = '{ACCOUNT_B_ID}'"
)
assert len(rows) == 0, "Account A should not see Account B script_builder_sessions"
finally:
await admin_conn.execute(
f"DELETE FROM script_builder_sessions WHERE id = '{session_id}'"
)
# ---------------------------------------------------------------------------
# ai_session_steps
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ai_session_steps_account_a_cannot_see_account_b(admin_conn, conn_a):
"""Account A must not see ai_session_steps belonging to Account B."""
user_b_id = await _get_user_b_id(admin_conn)
tree_b_id = await _get_tree_b_id(admin_conn)
# Need an ai_sessions row as FK
ai_session_id = str(uuid.uuid4())
await admin_conn.execute(f"""
INSERT INTO ai_sessions (
id, user_id, account_id, flow_type, status, confidence_tier,
created_at, updated_at
) VALUES (
'{ai_session_id}', '{user_b_id}', '{ACCOUNT_B_ID}',
'troubleshooting', 'active', 'guided', NOW(), NOW()
)
""")
step_id = str(uuid.uuid4())
await admin_conn.execute(f"""
INSERT INTO ai_session_steps (
id, session_id, account_id, step_type, content,
created_at
) VALUES (
'{step_id}', '{ai_session_id}', '{ACCOUNT_B_ID}',
'question', 'Phase4 RLS test step', NOW()
)
""")
try:
rows = await conn_a.fetch(
f"SELECT id FROM ai_session_steps WHERE account_id = '{ACCOUNT_B_ID}'"
)
assert len(rows) == 0, "Account A should not see Account B ai_session_steps"
finally:
await admin_conn.execute(f"DELETE FROM ai_session_steps WHERE id = '{step_id}'")
await admin_conn.execute(f"DELETE FROM ai_sessions WHERE id = '{ai_session_id}'")
# ---------------------------------------------------------------------------
# notifications
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_notifications_account_a_cannot_see_account_b(admin_conn, conn_a):
"""Account A must not see notifications belonging to Account B."""
user_b_id = await _get_user_b_id(admin_conn)
notif_id = str(uuid.uuid4())
await admin_conn.execute(f"""
INSERT INTO notifications (
id, user_id, account_id, type, title, message,
is_read, created_at
) VALUES (
'{notif_id}', '{user_b_id}', '{ACCOUNT_B_ID}',
'info', 'Phase4 RLS Test', 'RLS isolation test notification',
FALSE, NOW()
)
""")
try:
rows = await conn_a.fetch(
f"SELECT id FROM notifications WHERE account_id = '{ACCOUNT_B_ID}'"
)
assert len(rows) == 0, "Account A should not see Account B notifications"
finally:
await admin_conn.execute(f"DELETE FROM notifications WHERE id = '{notif_id}'")
# ---------------------------------------------------------------------------
# platform_steps — platform content visible to all tenants
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_platform_steps_visible_to_all_tenants(admin_conn, conn_a):
"""Platform steps (PLATFORM_ACCOUNT_ID) must be visible to any tenant."""
step_id = str(uuid.uuid4())
await admin_conn.execute(f"""
INSERT INTO platform_steps (
id, account_id, title, step_type, content,
is_active, created_at, updated_at
) VALUES (
'{step_id}', '{PLATFORM_ACCOUNT_ID}', 'Phase4 RLS Platform Step',
'action', '{{}}'::jsonb, TRUE, NOW(), NOW()
)
""")
try:
rows = await conn_a.fetch(
f"SELECT id FROM platform_steps WHERE id = '{step_id}'"
)
assert len(rows) == 1, (
"Platform steps (PLATFORM_ACCOUNT_ID) must be visible to all tenants"
)
finally:
await admin_conn.execute(f"DELETE FROM platform_steps WHERE id = '{step_id}'")
# ---------------------------------------------------------------------------
# template_trees — platform content visible to all tenants
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_template_trees_visible_to_all_tenants(admin_conn, conn_a):
"""Template trees (PLATFORM_ACCOUNT_ID) must be visible to any tenant."""
tmpl_id = str(uuid.uuid4())
await admin_conn.execute(f"""
INSERT INTO template_trees (
id, account_id, name, tree_structure, is_active,
created_at, updated_at
) VALUES (
'{tmpl_id}', '{PLATFORM_ACCOUNT_ID}', 'Phase4 RLS Template',
'{{}}'::jsonb, TRUE, NOW(), NOW()
)
""")
try:
rows = await conn_a.fetch(
f"SELECT id FROM template_trees WHERE id = '{tmpl_id}'"
)
assert len(rows) == 1, (
"Template trees (PLATFORM_ACCOUNT_ID) must be visible to all tenants"
)
finally:
await admin_conn.execute(f"DELETE FROM template_trees WHERE id = '{tmpl_id}'")