# backend/tests/test_rls_isolation.py """ RLS foundation tests. Connect directly as resolutionflow_app (not superuser) and verify: - Tenant A cannot read Tenant B's rows - No tenant context set → zero rows for private data (fail-closed) - Platform rows (PLATFORM_ACCOUNT_ID) are visible to all tenants Tests bypass FastAPI entirely — raw asyncpg connections only. MUST FAIL before Task 10 (RLS migration) and PASS after it. Run with: DB_APP_ROLE_PASSWORD=app_secret_change_me pytest tests/test_rls_isolation.py -v The test DB is patherly_test (matches conftest.py default). """ import os import subprocess import sys import uuid from pathlib import Path import asyncpg import pytest # All tests in this module use module-scoped async fixtures (admin_conn, # seed_rls_test_data) which run on the module event loop. Without this marker, # pytest-asyncio 0.23+ defaults tests to function-scoped loops, causing # "Future attached to a different loop" errors on the asyncpg connections. pytestmark = pytest.mark.asyncio(loop_scope="module") _DB_HOST = os.getenv("TEST_DB_HOST", "localhost") _DB_PORT = int(os.getenv("TEST_DB_PORT", "5432")) _DB_NAME = os.getenv("TEST_DB_NAME", "patherly_test") # matches conftest.py _APP_PASSWORD = os.getenv("DB_APP_ROLE_PASSWORD", "app_secret_change_me") _ADMIN_DSN = f"postgresql://postgres:postgres@{_DB_HOST}:{_DB_PORT}/{_DB_NAME}" PLATFORM_ACCOUNT_ID = "00000000-0000-0000-0000-000000000001" ACCOUNT_A_ID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa" ACCOUNT_B_ID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb" # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture(scope="module") def _ensure_rls_schema(): """Re-apply Alembic migrations before the module runs. Function-scoped test_db fixtures in other modules drop and recreate the public schema using Base.metadata.create_all, which does not enable RLS or create DB roles. This fixture re-runs 'alembic upgrade head' so that the full migration-managed schema (including RLS policies) is in place. """ backend_dir = Path(__file__).parent.parent subprocess.run( [sys.executable, "-m", "alembic", "upgrade", "head"], cwd=backend_dir, check=True, capture_output=True, ) @pytest.fixture(scope="module") async def admin_conn(_ensure_rls_schema): """Superuser asyncpg connection for fixture setup and teardown.""" conn = await asyncpg.connect(_ADMIN_DSN) yield conn await conn.close() @pytest.fixture(scope="module", autouse=True) async def seed_rls_test_data(admin_conn): """ Create two isolated test accounts, one user per account, and one private tree per account. Trees require a valid author_id FK to users, so users must be created first. accounts.display_code must be unique and 8 chars (NOT NULL constraint). """ # Insert accounts await admin_conn.execute(f""" INSERT INTO accounts (id, name, display_code, created_at, updated_at) VALUES ('{ACCOUNT_A_ID}', 'RLS Tenant A', 'RLSA0001', NOW(), NOW()), ('{ACCOUNT_B_ID}', 'RLS Tenant B', 'RLSB0001', NOW(), NOW()) ON CONFLICT (id) DO NOTHING """) # Insert one user per account (users.account_id NOT NULL, password_hash NOT NULL) user_a_id = str(uuid.uuid4()) user_b_id = str(uuid.uuid4()) await admin_conn.execute(f""" INSERT INTO users ( id, email, password_hash, name, role, is_active, account_id, account_role, created_at ) VALUES ('{user_a_id}', 'rls-user-a@example.com', 'placeholder', 'RLS User A', 'engineer', TRUE, '{ACCOUNT_A_ID}', 'engineer', NOW()), ('{user_b_id}', 'rls-user-b@example.com', 'placeholder', 'RLS User B', 'engineer', TRUE, '{ACCOUNT_B_ID}', 'engineer', NOW()) ON CONFLICT (email) DO NOTHING """) # Look up the user IDs we just inserted (ON CONFLICT may have skipped) row_a = await admin_conn.fetchrow( "SELECT id FROM users WHERE email = 'rls-user-a@example.com'" ) row_b = await admin_conn.fetchrow( "SELECT id FROM users WHERE email = 'rls-user-b@example.com'" ) actual_user_a = str(row_a["id"]) actual_user_b = str(row_b["id"]) # Insert one private tree per account with explicit author_id await admin_conn.execute(f""" INSERT INTO trees ( id, name, tree_structure, account_id, author_id, is_active, is_default, is_public, visibility, tree_type, created_at, updated_at ) VALUES (gen_random_uuid(), 'RLS Tree A', '[]'::jsonb, '{ACCOUNT_A_ID}', '{actual_user_a}', TRUE, FALSE, FALSE, 'private', 'troubleshooting', NOW(), NOW()), (gen_random_uuid(), 'RLS Tree B', '[]'::jsonb, '{ACCOUNT_B_ID}', '{actual_user_b}', TRUE, FALSE, FALSE, 'private', 'troubleshooting', NOW(), NOW()) """) # One platform-owned tree_tag (global, visible to all tenants) await admin_conn.execute(f""" INSERT INTO tree_tags ( id, name, slug, account_id, usage_count, created_at ) VALUES ( gen_random_uuid(), 'rls-global-tag', 'rls-global-tag', '{PLATFORM_ACCOUNT_ID}', 0, NOW() ) ON CONFLICT DO NOTHING """) yield # Cleanup await admin_conn.execute( f"DELETE FROM trees WHERE account_id IN ('{ACCOUNT_A_ID}', '{ACCOUNT_B_ID}')" ) await admin_conn.execute( "DELETE FROM users WHERE email IN " "('rls-user-a@example.com', 'rls-user-b@example.com')" ) await admin_conn.execute( f"DELETE FROM accounts WHERE id IN ('{ACCOUNT_A_ID}', '{ACCOUNT_B_ID}')" ) await admin_conn.execute("DELETE FROM tree_tags WHERE slug = 'rls-global-tag'") @pytest.fixture async def conn_a(): """App-role connection, tenant context = Account A.""" conn = await asyncpg.connect( host=_DB_HOST, port=_DB_PORT, database=_DB_NAME, user="resolutionflow_app", password=_APP_PASSWORD, ) await conn.execute( "SELECT set_config('app.current_account_id', $1, false)", ACCOUNT_A_ID ) yield conn await conn.close() @pytest.fixture async def conn_b(): """App-role connection, tenant context = Account B.""" conn = await asyncpg.connect( host=_DB_HOST, port=_DB_PORT, database=_DB_NAME, user="resolutionflow_app", password=_APP_PASSWORD, ) await conn.execute( "SELECT set_config('app.current_account_id', $1, false)", ACCOUNT_B_ID ) yield conn await conn.close() @pytest.fixture async def conn_no_context(): """App-role connection with NO tenant context set.""" conn = await asyncpg.connect( host=_DB_HOST, port=_DB_PORT, database=_DB_NAME, user="resolutionflow_app", password=_APP_PASSWORD, ) yield conn await conn.close() # --------------------------------------------------------------------------- # trees # --------------------------------------------------------------------------- async def test_trees_account_a_cannot_see_account_b_rows(conn_a): rows = await conn_a.fetch( f"SELECT id FROM trees WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0, "Account A should not see Account B trees" async def test_trees_account_a_can_see_own_rows(conn_a): rows = await conn_a.fetch( f"SELECT id FROM trees WHERE account_id = '{ACCOUNT_A_ID}'" ) assert len(rows) >= 1, "Account A should see its own trees" async def test_trees_no_context_sees_no_private_trees(conn_no_context): rows = await conn_no_context.fetch( "SELECT id FROM trees WHERE is_default = FALSE AND is_public = FALSE" ) assert len(rows) == 0, "No-context connection should see no private trees" # --------------------------------------------------------------------------- # tree_tags — platform visibility # --------------------------------------------------------------------------- async def test_tree_tags_account_a_cannot_see_account_b_tags(conn_a): rows = await conn_a.fetch( f"SELECT id FROM tree_tags WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0 async def test_tree_tags_both_tenants_see_platform_tags(conn_a, conn_b): rows_a = await conn_a.fetch( f"SELECT id FROM tree_tags WHERE account_id = '{PLATFORM_ACCOUNT_ID}'" ) rows_b = await conn_b.fetch( f"SELECT id FROM tree_tags WHERE account_id = '{PLATFORM_ACCOUNT_ID}'" ) assert len(rows_a) >= 1, "Account A should see platform tags" assert len(rows_b) >= 1, "Account B should see platform tags" # --------------------------------------------------------------------------- # tree_categories — platform visibility # --------------------------------------------------------------------------- async def test_tree_categories_account_a_cannot_see_account_b(conn_a): rows = await conn_a.fetch( f"SELECT id FROM tree_categories WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0 # --------------------------------------------------------------------------- # step_categories — platform visibility # --------------------------------------------------------------------------- async def test_step_categories_account_a_cannot_see_account_b(conn_a): rows = await conn_a.fetch( f"SELECT id FROM step_categories WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0 # --------------------------------------------------------------------------- # psa_connections — tenant-only # --------------------------------------------------------------------------- async def test_psa_connections_account_a_cannot_see_account_b(conn_a): rows = await conn_a.fetch( f"SELECT id FROM psa_connections WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0 # --------------------------------------------------------------------------- # flow_proposals — tenant-only # --------------------------------------------------------------------------- async def test_flow_proposals_account_a_cannot_see_account_b(conn_a): rows = await conn_a.fetch( f"SELECT id FROM flow_proposals WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0 # --------------------------------------------------------------------------- # Phase 2 fixtures # --------------------------------------------------------------------------- @pytest.fixture(scope="module") async def session_row_ids(admin_conn): """ Insert one `sessions` row and one `ai_sessions` row for each of ACCOUNT_A and ACCOUNT_B using the superuser connection (BYPASSRLS). Returns a dict with the inserted IDs for use in tests. Cleans up on exit. """ # Resolve a valid tree_id and user_id for each account tree_a = await admin_conn.fetchrow( f"SELECT id FROM trees WHERE account_id = '{ACCOUNT_A_ID}' LIMIT 1" ) tree_b = await admin_conn.fetchrow( f"SELECT id FROM trees WHERE account_id = '{ACCOUNT_B_ID}' LIMIT 1" ) user_a = await admin_conn.fetchrow( f"SELECT id FROM users WHERE account_id = '{ACCOUNT_A_ID}' LIMIT 1" ) user_b = await admin_conn.fetchrow( f"SELECT id FROM users WHERE account_id = '{ACCOUNT_B_ID}' LIMIT 1" ) assert tree_a is not None, f"No tree found for ACCOUNT_A ({ACCOUNT_A_ID}) — seed_rls_test_data must run first" assert tree_b is not None, f"No tree found for ACCOUNT_B ({ACCOUNT_B_ID}) — seed_rls_test_data must run first" assert user_a is not None, f"No user found for ACCOUNT_A ({ACCOUNT_A_ID}) — seed_rls_test_data must run first" assert user_b is not None, f"No user found for ACCOUNT_B ({ACCOUNT_B_ID}) — seed_rls_test_data must run first" tree_a_id = str(tree_a["id"]) tree_b_id = str(tree_b["id"]) user_a_id = str(user_a["id"]) user_b_id = str(user_b["id"]) session_a_id = str(uuid.uuid4()) session_b_id = str(uuid.uuid4()) ai_session_a_id = str(uuid.uuid4()) ai_session_b_id = str(uuid.uuid4()) # Insert sessions rows (sessions uses started_at not created_at) await admin_conn.execute(f""" INSERT INTO sessions ( id, tree_id, user_id, account_id, tree_snapshot, path_taken, decisions, custom_steps, started_at ) VALUES ('{session_a_id}', '{tree_a_id}', '{user_a_id}', '{ACCOUNT_A_ID}', '[]'::jsonb, '[]'::jsonb, '[]'::jsonb, '[]'::jsonb, NOW()), ('{session_b_id}', '{tree_b_id}', '{user_b_id}', '{ACCOUNT_B_ID}', '[]'::jsonb, '[]'::jsonb, '[]'::jsonb, '[]'::jsonb, NOW()) """) # Insert ai_sessions rows # confidence_tier valid values: 'guided' | 'exploring' | 'discovery' await admin_conn.execute(f""" INSERT INTO ai_sessions ( id, user_id, account_id, session_type, intake_type, intake_content, status, confidence_tier, confidence_score, created_at, updated_at ) VALUES ('{ai_session_a_id}', '{user_a_id}', '{ACCOUNT_A_ID}', 'guided', 'free_text', '{{}}'::jsonb, 'active', 'guided', 0.0, NOW(), NOW()), ('{ai_session_b_id}', '{user_b_id}', '{ACCOUNT_B_ID}', 'guided', 'free_text', '{{}}'::jsonb, 'active', 'guided', 0.0, NOW(), NOW()) """) # ------------------------------------------------------------------------- # Seed Account B rows for every "cannot-see" table that would otherwise be # empty. Without these, isolation tests pass vacuously even when RLS is off. # ------------------------------------------------------------------------- # session_branches (FK: ai_sessions.id) branch_b_row = await admin_conn.fetchrow(""" INSERT INTO session_branches ( id, session_id, account_id, branch_order, label, status, conversation_messages, created_at, updated_at ) VALUES ( gen_random_uuid(), $1::uuid, $2::uuid, 1, 'test-branch', 'active', '[]'::jsonb, NOW(), NOW() ) RETURNING id """, ai_session_b_id, ACCOUNT_B_ID) branch_b_id = str(branch_b_row["id"]) # session_supporting_data (FK: sessions.id) supporting_data_b_row = await admin_conn.fetchrow(""" INSERT INTO session_supporting_data ( id, session_id, account_id, label, data_type, content, sort_order, created_at, updated_at ) VALUES ( gen_random_uuid(), $1::uuid, $2::uuid, 'test-data', 'text_snippet', 'test content', 0, NOW(), NOW() ) RETURNING id """, session_b_id, ACCOUNT_B_ID) supporting_data_b_id = str(supporting_data_b_row["id"]) # session_resolution_outputs (FK: ai_sessions.id) resolution_output_b_row = await admin_conn.fetchrow(""" INSERT INTO session_resolution_outputs ( id, session_id, account_id, output_type, generated_content, status, generated_by_model, created_at, updated_at ) VALUES ( gen_random_uuid(), $1::uuid, $2::uuid, 'psa_ticket_notes', 'test content', 'draft', 'test-model', NOW(), NOW() ) RETURNING id """, ai_session_b_id, ACCOUNT_B_ID) resolution_output_b_id = str(resolution_output_b_row["id"]) # session_handoffs (FK: ai_sessions.id, users.id) handoff_b_row = await admin_conn.fetchrow(""" INSERT INTO session_handoffs ( id, session_id, account_id, handed_off_by, intent, snapshot, priority, psa_note_pushed, notification_sent, created_at ) VALUES ( gen_random_uuid(), $1::uuid, $2::uuid, $3::uuid, 'park', '{}'::jsonb, 'normal', false, false, NOW() ) RETURNING id """, ai_session_b_id, ACCOUNT_B_ID, user_b_id) handoff_b_id = str(handoff_b_row["id"]) # maintenance_schedules (FK: trees.id) maintenance_b_row = await admin_conn.fetchrow(""" INSERT INTO maintenance_schedules ( id, tree_id, account_id, cron_expression, timezone, created_at, updated_at ) VALUES ( gen_random_uuid(), $1::uuid, $2::uuid, '0 9 * * 1', 'UTC', NOW(), NOW() ) RETURNING id """, tree_b_id, ACCOUNT_B_ID) maintenance_b_id = str(maintenance_b_row["id"]) # psa_post_log (FK: ai_sessions.id, users.id) psa_log_b_row = await admin_conn.fetchrow(""" INSERT INTO psa_post_log ( id, ai_session_id, account_id, ticket_id, note_type, content_posted, status, posted_by, posted_at ) VALUES ( gen_random_uuid(), $1::uuid, $2::uuid, 'TEST-0001', 'internal', 'test note', 'success', $3::uuid, NOW() ) RETURNING id """, ai_session_b_id, ACCOUNT_B_ID, user_b_id) psa_log_b_id = str(psa_log_b_row["id"]) # script_templates requires a script_categories row — insert a temporary one script_category_b_id = str(uuid.uuid4()) await admin_conn.execute(f""" INSERT INTO script_categories (id, name, slug, sort_order, is_active, created_at, updated_at) VALUES ('{script_category_b_id}', 'RLS Test Category', 'rls-test-category-{script_category_b_id[:8]}', 0, true, NOW(), NOW()) """) script_template_b_row = await admin_conn.fetchrow(f""" INSERT INTO script_templates ( id, category_id, account_id, name, slug, script_body, complexity, is_active, created_at, updated_at ) VALUES ( gen_random_uuid(), '{script_category_b_id}'::uuid, $1::uuid, 'RLS Test Template', 'rls-test-template-b-' || gen_random_uuid()::text, 'Write-Host "test"', 'beginner', true, NOW(), NOW() ) RETURNING id """, ACCOUNT_B_ID) script_template_b_id = str(script_template_b_row["id"]) # script_generations (FK: script_templates.id, users.id) script_gen_b_row = await admin_conn.fetchrow(""" INSERT INTO script_generations ( id, template_id, user_id, account_id, parameters_used, generated_script, created_at ) VALUES ( gen_random_uuid(), $1::uuid, $2::uuid, $3::uuid, '{}'::jsonb, 'test script', NOW() ) RETURNING id """, script_template_b_id, user_b_id, ACCOUNT_B_ID) script_gen_b_id = str(script_gen_b_row["id"]) try: yield { "session_a": session_a_id, "session_b": session_b_id, "ai_session_a": ai_session_a_id, "ai_session_b": ai_session_b_id, } finally: # Cleanup in reverse FK order (children before parents) await admin_conn.execute( f"DELETE FROM script_generations WHERE id = '{script_gen_b_id}'" ) await admin_conn.execute( f"DELETE FROM session_branches WHERE id = '{branch_b_id}'" ) await admin_conn.execute( f"DELETE FROM session_supporting_data WHERE id = '{supporting_data_b_id}'" ) await admin_conn.execute( f"DELETE FROM session_resolution_outputs WHERE id = '{resolution_output_b_id}'" ) await admin_conn.execute( f"DELETE FROM session_handoffs WHERE id = '{handoff_b_id}'" ) await admin_conn.execute( f"DELETE FROM maintenance_schedules WHERE id = '{maintenance_b_id}'" ) await admin_conn.execute( f"DELETE FROM psa_post_log WHERE id = '{psa_log_b_id}'" ) await admin_conn.execute( f"DELETE FROM script_templates WHERE id = '{script_template_b_id}'" ) await admin_conn.execute( f"DELETE FROM script_categories WHERE id = '{script_category_b_id}'" ) await admin_conn.execute( f"DELETE FROM sessions WHERE id IN ('{session_a_id}', '{session_b_id}')" ) await admin_conn.execute( f"DELETE FROM ai_sessions WHERE id IN ('{ai_session_a_id}', '{ai_session_b_id}')" ) # --------------------------------------------------------------------------- # sessions # --------------------------------------------------------------------------- async def test_sessions_account_a_cannot_see_account_b_sessions(conn_a, session_row_ids): rows = await conn_a.fetch( f"SELECT id FROM sessions WHERE id = '{session_row_ids['session_b']}'" ) assert len(rows) == 0, "Account A should not see Account B sessions" async def test_sessions_account_a_can_see_own_sessions(conn_a, session_row_ids): rows = await conn_a.fetch( f"SELECT id FROM sessions WHERE id = '{session_row_ids['session_a']}'" ) assert len(rows) == 1, "Account A should see its own sessions" async def test_sessions_no_context_sees_nothing(conn_no_context, session_row_ids): rows = await conn_no_context.fetch( f"SELECT id FROM sessions WHERE id IN " f"('{session_row_ids['session_a']}', '{session_row_ids['session_b']}')" ) assert len(rows) == 0, "No-context connection should see no sessions" # --------------------------------------------------------------------------- # ai_sessions # --------------------------------------------------------------------------- async def test_ai_sessions_account_a_cannot_see_account_b(conn_a, session_row_ids): rows = await conn_a.fetch( f"SELECT id FROM ai_sessions WHERE id = '{session_row_ids['ai_session_b']}'" ) assert len(rows) == 0, "Account A should not see Account B ai_sessions" async def test_ai_sessions_account_a_can_see_own(conn_a, session_row_ids): rows = await conn_a.fetch( f"SELECT id FROM ai_sessions WHERE id = '{session_row_ids['ai_session_a']}'" ) assert len(rows) == 1, "Account A should see its own ai_sessions" # --------------------------------------------------------------------------- # session_branches # --------------------------------------------------------------------------- async def test_session_branches_account_a_cannot_see_account_b(conn_a, session_row_ids): rows = await conn_a.fetch( f"SELECT id FROM session_branches WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0, "Account A should not see Account B session_branches" # --------------------------------------------------------------------------- # session_supporting_data # --------------------------------------------------------------------------- async def test_session_supporting_data_account_a_cannot_see_account_b(conn_a, session_row_ids): rows = await conn_a.fetch( f"SELECT id FROM session_supporting_data WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0, "Account A should not see Account B session_supporting_data" # --------------------------------------------------------------------------- # session_resolution_outputs # --------------------------------------------------------------------------- async def test_session_resolution_outputs_account_a_cannot_see_account_b(conn_a, session_row_ids): rows = await conn_a.fetch( f"SELECT id FROM session_resolution_outputs WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0, "Account A should not see Account B session_resolution_outputs" # --------------------------------------------------------------------------- # session_handoffs # --------------------------------------------------------------------------- async def test_session_handoffs_account_a_cannot_see_account_b(conn_a, session_row_ids): rows = await conn_a.fetch( f"SELECT id FROM session_handoffs WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0, "Account A should not see Account B session_handoffs" # --------------------------------------------------------------------------- # script_templates # --------------------------------------------------------------------------- async def test_script_templates_account_a_cannot_see_account_b(conn_a, session_row_ids): rows = await conn_a.fetch( f"SELECT id FROM script_templates WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0, "Account A should not see Account B script_templates" # --------------------------------------------------------------------------- # script_generations # --------------------------------------------------------------------------- async def test_script_generations_account_a_cannot_see_account_b(conn_a, session_row_ids): rows = await conn_a.fetch( f"SELECT id FROM script_generations WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0, "Account A should not see Account B script_generations" # --------------------------------------------------------------------------- # maintenance_schedules # --------------------------------------------------------------------------- async def test_maintenance_schedules_account_a_cannot_see_account_b(conn_a, session_row_ids): rows = await conn_a.fetch( f"SELECT id FROM maintenance_schedules WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0, "Account A should not see Account B maintenance_schedules" # --------------------------------------------------------------------------- # psa_post_log # --------------------------------------------------------------------------- async def test_psa_post_log_account_a_cannot_see_account_b(conn_a, session_row_ids): rows = await conn_a.fetch( f"SELECT id FROM psa_post_log WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0, "Account A should not see Account B psa_post_log" # --------------------------------------------------------------------------- # step_library — visibility-aware policy # --------------------------------------------------------------------------- async def test_step_library_account_a_cannot_see_account_b_private_steps(admin_conn, conn_a): """Private/non-public steps owned by Account B must not be visible to Account A.""" private_step_id = str(uuid.uuid4()) await admin_conn.execute(f""" INSERT INTO step_library ( id, account_id, title, step_type, content, visibility, is_active, created_at, updated_at ) VALUES ( '{private_step_id}', '{ACCOUNT_B_ID}', 'RLS Private Step', 'action', '{{}}'::jsonb, 'private', TRUE, NOW(), NOW() ) """) try: rows = await conn_a.fetch( f"SELECT id FROM step_library " f"WHERE id = '{private_step_id}' AND visibility != 'public'" ) assert len(rows) == 0, "Account A should not see Account B's private step_library rows" finally: await admin_conn.execute( f"DELETE FROM step_library WHERE id = '{private_step_id}'" ) async def test_step_library_account_a_can_see_account_b_public_steps(admin_conn, conn_a): """Public steps owned by Account B MUST be visible to Account A (cross-tenant visibility).""" public_step_id = str(uuid.uuid4()) await admin_conn.execute(f""" INSERT INTO step_library ( id, account_id, title, step_type, content, visibility, is_active, created_at, updated_at ) VALUES ( '{public_step_id}', '{ACCOUNT_B_ID}', 'RLS Public Step', 'action', '{{}}'::jsonb, 'public', TRUE, NOW(), NOW() ) """) try: rows = await conn_a.fetch( f"SELECT id FROM step_library WHERE id = '{public_step_id}'" ) assert len(rows) == 1, ( "Account A should see public steps owned by Account B " "(cross-tenant public visibility policy)" ) finally: await admin_conn.execute( f"DELETE FROM step_library WHERE id = '{public_step_id}'" ) # =========================================================================== # Phase 3 RLS isolation tests # Tables: step_ratings, step_usage_log, target_lists, # session_shares, audit_logs, tree_shares # =========================================================================== # --------------------------------------------------------------------------- # Helpers shared by Phase 3 fixtures # --------------------------------------------------------------------------- async def _get_user_b_id(admin_conn) -> str: row = await admin_conn.fetchrow( "SELECT id FROM users WHERE email = 'rls-user-b@example.com'" ) return str(row["id"]) async def _get_tree_b_id(admin_conn) -> str: row = await admin_conn.fetchrow( f"SELECT id FROM trees WHERE account_id = '{ACCOUNT_B_ID}' LIMIT 1" ) return str(row["id"]) # --------------------------------------------------------------------------- # step_ratings # --------------------------------------------------------------------------- async def test_step_ratings_account_a_cannot_see_account_b(admin_conn, conn_a): """Account A must not see step ratings belonging to Account B.""" user_b_id = await _get_user_b_id(admin_conn) # Need a step_library row as FK target step_id = str(uuid.uuid4()) await admin_conn.execute(f""" INSERT INTO step_library ( id, account_id, title, step_type, content, visibility, is_active, created_at, updated_at ) VALUES ( '{step_id}', '{ACCOUNT_B_ID}', 'Phase3 RLS Step', 'action', '{{}}'::jsonb, 'private', TRUE, NOW(), NOW() ) """) rating_id = str(uuid.uuid4()) await admin_conn.execute(f""" INSERT INTO step_ratings ( id, step_id, user_id, account_id, is_verified_use, is_visible, created_at, updated_at ) VALUES ( '{rating_id}', '{step_id}', '{user_b_id}', '{ACCOUNT_B_ID}', FALSE, TRUE, NOW(), NOW() ) """) try: rows = await conn_a.fetch( f"SELECT id FROM step_ratings WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0, "Account A should not see Account B step_ratings" finally: await admin_conn.execute(f"DELETE FROM step_ratings WHERE id = '{rating_id}'") await admin_conn.execute(f"DELETE FROM step_library WHERE id = '{step_id}'") # --------------------------------------------------------------------------- # step_usage_log # --------------------------------------------------------------------------- async def test_step_usage_log_account_a_cannot_see_account_b(admin_conn, conn_a): """Account A must not see step usage logs belonging to Account B.""" user_b_id = await _get_user_b_id(admin_conn) tree_b_id = await _get_tree_b_id(admin_conn) step_id = str(uuid.uuid4()) await admin_conn.execute(f""" INSERT INTO step_library ( id, account_id, title, step_type, content, visibility, is_active, created_at, updated_at ) VALUES ( '{step_id}', '{ACCOUNT_B_ID}', 'Phase3 Usage Step', 'action', '{{}}'::jsonb, 'private', TRUE, NOW(), NOW() ) """) # Need a sessions row as FK for usage log session_id = str(uuid.uuid4()) await admin_conn.execute(f""" INSERT INTO sessions ( id, tree_id, user_id, account_id, tree_snapshot, path_taken, decisions, custom_steps, started_at ) VALUES ( '{session_id}', '{tree_b_id}', '{user_b_id}', '{ACCOUNT_B_ID}', '[]'::jsonb, '[]'::jsonb, '[]'::jsonb, '[]'::jsonb, NOW() ) """) log_id = str(uuid.uuid4()) await admin_conn.execute(f""" INSERT INTO step_usage_log ( id, step_id, user_id, account_id, session_id, used_at ) VALUES ( '{log_id}', '{step_id}', '{user_b_id}', '{ACCOUNT_B_ID}', '{session_id}', NOW() ) """) try: rows = await conn_a.fetch( f"SELECT id FROM step_usage_log WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0, "Account A should not see Account B step_usage_log" finally: await admin_conn.execute(f"DELETE FROM step_usage_log WHERE id = '{log_id}'") await admin_conn.execute(f"DELETE FROM sessions WHERE id = '{session_id}'") await admin_conn.execute(f"DELETE FROM step_library WHERE id = '{step_id}'") # --------------------------------------------------------------------------- # target_lists # --------------------------------------------------------------------------- async def test_target_lists_account_a_cannot_see_account_b(admin_conn, conn_a): """Account A must not see target lists belonging to Account B.""" user_b_id = await _get_user_b_id(admin_conn) tl_id = str(uuid.uuid4()) await admin_conn.execute(f""" INSERT INTO target_lists ( id, account_id, created_by, name, targets, created_at, updated_at ) VALUES ( '{tl_id}', '{ACCOUNT_B_ID}', '{user_b_id}', 'Phase3 RLS Target List', '[]'::jsonb, NOW(), NOW() ) """) try: rows = await conn_a.fetch( f"SELECT id FROM target_lists WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0, "Account A should not see Account B target_lists" finally: await admin_conn.execute(f"DELETE FROM target_lists WHERE id = '{tl_id}'") # --------------------------------------------------------------------------- # session_shares # --------------------------------------------------------------------------- async def test_session_shares_account_a_cannot_see_account_b(admin_conn, conn_a): """Account A must not see session shares belonging to Account B.""" user_b_id = await _get_user_b_id(admin_conn) tree_b_id = await _get_tree_b_id(admin_conn) # Need a sessions row as FK session_id = str(uuid.uuid4()) await admin_conn.execute(f""" INSERT INTO sessions ( id, tree_id, user_id, account_id, tree_snapshot, path_taken, decisions, custom_steps, started_at ) VALUES ( '{session_id}', '{tree_b_id}', '{user_b_id}', '{ACCOUNT_B_ID}', '[]'::jsonb, '[]'::jsonb, '[]'::jsonb, '[]'::jsonb, NOW() ) """) share_id = str(uuid.uuid4()) share_token = f"phase3-rls-test-{share_id[:8]}" await admin_conn.execute(f""" INSERT INTO session_shares ( id, session_id, account_id, share_token, visibility, created_by, view_count, is_active, created_at, updated_at ) VALUES ( '{share_id}', '{session_id}', '{ACCOUNT_B_ID}', '{share_token}', 'account', '{user_b_id}', 0, TRUE, NOW(), NOW() ) """) try: rows = await conn_a.fetch( f"SELECT id FROM session_shares WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0, "Account A should not see Account B session_shares" finally: await admin_conn.execute(f"DELETE FROM session_shares WHERE id = '{share_id}'") await admin_conn.execute(f"DELETE FROM sessions WHERE id = '{session_id}'") # --------------------------------------------------------------------------- # audit_logs # --------------------------------------------------------------------------- async def test_audit_logs_account_a_cannot_see_account_b(admin_conn, conn_a): """Account A must not see audit logs belonging to Account B.""" user_b_id = await _get_user_b_id(admin_conn) log_id = str(uuid.uuid4()) await admin_conn.execute(f""" INSERT INTO audit_logs ( id, user_id, account_id, action, resource_type, created_at ) VALUES ( '{log_id}', '{user_b_id}', '{ACCOUNT_B_ID}', 'test.action', 'test_resource', NOW() ) """) try: rows = await conn_a.fetch( f"SELECT id FROM audit_logs WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0, "Account A should not see Account B audit_logs" finally: await admin_conn.execute(f"DELETE FROM audit_logs WHERE id = '{log_id}'") # --------------------------------------------------------------------------- # tree_shares # --------------------------------------------------------------------------- async def test_tree_shares_account_a_cannot_see_account_b(admin_conn, conn_a): """Account A must not see tree shares belonging to Account B.""" user_b_id = await _get_user_b_id(admin_conn) tree_b_id = await _get_tree_b_id(admin_conn) share_id = str(uuid.uuid4()) share_token = f"phase3-tree-rls-{share_id[:8]}" await admin_conn.execute(f""" INSERT INTO tree_shares ( id, tree_id, account_id, share_token, created_by, allow_forking, created_at ) VALUES ( '{share_id}', '{tree_b_id}', '{ACCOUNT_B_ID}', '{share_token}', '{user_b_id}', TRUE, NOW() ) """) try: rows = await conn_a.fetch( f"SELECT id FROM tree_shares WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0, "Account A should not see Account B tree_shares" finally: await admin_conn.execute(f"DELETE FROM tree_shares WHERE id = '{share_id}'") # =========================================================================== # Phase 4 RLS isolation tests # Tables: users, script_builder_sessions, ai_session_steps, notifications # # Note: platform_steps and template_trees have no account_id column and no RLS — # they are globally readable by all authenticated users. # =========================================================================== # --------------------------------------------------------------------------- # users # --------------------------------------------------------------------------- async def test_users_account_a_cannot_see_account_b(admin_conn, conn_a): """Account A must not see users belonging to Account B.""" rows = await conn_a.fetch( f"SELECT id FROM users WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0, "Account A should not see Account B users" async def test_users_account_a_can_see_own(admin_conn, conn_a): """Account A must be able to see its own users.""" rows = await conn_a.fetch( f"SELECT id FROM users WHERE account_id = '{ACCOUNT_A_ID}'" ) assert len(rows) > 0, "Account A should see its own users" # --------------------------------------------------------------------------- # script_builder_sessions # --------------------------------------------------------------------------- async def test_script_builder_sessions_account_a_cannot_see_account_b(admin_conn, conn_a): """Account A must not see script builder sessions belonging to Account B.""" user_b_id = await _get_user_b_id(admin_conn) session_id = str(uuid.uuid4()) await admin_conn.execute(f""" INSERT INTO script_builder_sessions ( id, user_id, account_id, language, created_at, updated_at ) VALUES ( '{session_id}', '{user_b_id}', '{ACCOUNT_B_ID}', 'powershell', NOW(), NOW() ) """) try: rows = await conn_a.fetch( f"SELECT id FROM script_builder_sessions WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0, "Account A should not see Account B script_builder_sessions" finally: await admin_conn.execute( f"DELETE FROM script_builder_sessions WHERE id = '{session_id}'" ) # --------------------------------------------------------------------------- # ai_session_steps # --------------------------------------------------------------------------- async def test_ai_session_steps_account_a_cannot_see_account_b(admin_conn, conn_a): """Account A must not see ai_session_steps belonging to Account B.""" user_b_id = await _get_user_b_id(admin_conn) tree_b_id = await _get_tree_b_id(admin_conn) # Need an ai_sessions row as FK ai_session_id = str(uuid.uuid4()) await admin_conn.execute(f""" INSERT INTO ai_sessions ( id, user_id, account_id, flow_type, status, confidence_tier, created_at, updated_at ) VALUES ( '{ai_session_id}', '{user_b_id}', '{ACCOUNT_B_ID}', 'troubleshooting', 'active', 'guided', NOW(), NOW() ) """) step_id = str(uuid.uuid4()) await admin_conn.execute(f""" INSERT INTO ai_session_steps ( id, session_id, account_id, step_type, content, created_at ) VALUES ( '{step_id}', '{ai_session_id}', '{ACCOUNT_B_ID}', 'question', 'Phase4 RLS test step', NOW() ) """) try: rows = await conn_a.fetch( f"SELECT id FROM ai_session_steps WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0, "Account A should not see Account B ai_session_steps" finally: await admin_conn.execute(f"DELETE FROM ai_session_steps WHERE id = '{step_id}'") await admin_conn.execute(f"DELETE FROM ai_sessions WHERE id = '{ai_session_id}'") # --------------------------------------------------------------------------- # notifications # --------------------------------------------------------------------------- async def test_notifications_account_a_cannot_see_account_b(admin_conn, conn_a): """Account A must not see notifications belonging to Account B.""" user_b_id = await _get_user_b_id(admin_conn) notif_id = str(uuid.uuid4()) await admin_conn.execute(f""" INSERT INTO notifications ( id, user_id, account_id, type, title, message, is_read, created_at ) VALUES ( '{notif_id}', '{user_b_id}', '{ACCOUNT_B_ID}', 'info', 'Phase4 RLS Test', 'RLS isolation test notification', FALSE, NOW() ) """) try: rows = await conn_a.fetch( f"SELECT id FROM notifications WHERE account_id = '{ACCOUNT_B_ID}'" ) assert len(rows) == 0, "Account A should not see Account B notifications" finally: await admin_conn.execute(f"DELETE FROM notifications WHERE id = '{notif_id}'")