From ce4056c6b9e96d078a5d6a69e9a327f4c9f3428a Mon Sep 17 00:00:00 2001 From: chihlasm Date: Fri, 10 Apr 2026 03:54:42 +0000 Subject: [PATCH] test: add failing RLS isolation tests (green after Task 10 migration + Task 11 URL switch) Co-Authored-By: Claude Sonnet 4.6 --- backend/tests/test_rls_isolation.py | 266 ++++++++++++++++++++++++++++ 1 file changed, 266 insertions(+) create mode 100644 backend/tests/test_rls_isolation.py diff --git a/backend/tests/test_rls_isolation.py b/backend/tests/test_rls_isolation.py new file mode 100644 index 00000000..14ed7371 --- /dev/null +++ b/backend/tests/test_rls_isolation.py @@ -0,0 +1,266 @@ +# backend/tests/test_rls_isolation.py +""" +RLS foundation tests. + +Connect directly as resolutionflow_app (not superuser) and verify: + - Tenant A cannot read Tenant B's rows + - No tenant context set → zero rows for private data (fail-closed) + - Platform rows (PLATFORM_ACCOUNT_ID) are visible to all tenants + +Tests bypass FastAPI entirely — raw asyncpg connections only. +MUST FAIL before Task 10 (RLS migration) and PASS after it. + +Run with: + DB_APP_ROLE_PASSWORD=app_secret_change_me pytest tests/test_rls_isolation.py -v + +The test DB is patherly_test (matches conftest.py default). +""" +import os +import uuid + +import asyncpg +import pytest + +_DB_HOST = os.getenv("TEST_DB_HOST", "localhost") +_DB_PORT = int(os.getenv("TEST_DB_PORT", "5432")) +_DB_NAME = os.getenv("TEST_DB_NAME", "patherly_test") # matches conftest.py +_APP_PASSWORD = os.getenv("DB_APP_ROLE_PASSWORD", "app_secret_change_me") +_ADMIN_DSN = f"postgresql://postgres:postgres@{_DB_HOST}:{_DB_PORT}/{_DB_NAME}" + +PLATFORM_ACCOUNT_ID = "00000000-0000-0000-0000-000000000001" +ACCOUNT_A_ID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa" +ACCOUNT_B_ID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb" + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture(scope="module") +async def admin_conn(): + """Superuser asyncpg connection for fixture setup and teardown.""" + conn = await asyncpg.connect(_ADMIN_DSN) + yield conn + await conn.close() + + +@pytest.fixture(scope="module", autouse=True) +async def seed_rls_test_data(admin_conn): + """ + Create two isolated test accounts, one user per account, and one private + tree per account. Trees require a valid author_id FK to users, so users + must be created first. + + accounts.display_code must be unique and 8 chars (NOT NULL constraint). + """ + # Insert accounts + await admin_conn.execute(f""" + INSERT INTO accounts (id, name, display_code, created_at, updated_at) + VALUES + ('{ACCOUNT_A_ID}', 'RLS Tenant A', 'RLSA0001', NOW(), NOW()), + ('{ACCOUNT_B_ID}', 'RLS Tenant B', 'RLSB0001', NOW(), NOW()) + ON CONFLICT (id) DO NOTHING + """) + + # Insert one user per account (users.account_id NOT NULL, password_hash NOT NULL) + user_a_id = str(uuid.uuid4()) + user_b_id = str(uuid.uuid4()) + await admin_conn.execute(f""" + INSERT INTO users ( + id, email, password_hash, name, role, is_active, account_id, + account_role, created_at + ) VALUES + ('{user_a_id}', 'rls-user-a@example.com', + 'placeholder', 'RLS User A', 'engineer', TRUE, + '{ACCOUNT_A_ID}', 'engineer', NOW()), + ('{user_b_id}', 'rls-user-b@example.com', + 'placeholder', 'RLS User B', 'engineer', TRUE, + '{ACCOUNT_B_ID}', 'engineer', NOW()) + ON CONFLICT (email) DO NOTHING + """) + + # Look up the user IDs we just inserted (ON CONFLICT may have skipped) + row_a = await admin_conn.fetchrow( + "SELECT id FROM users WHERE email = 'rls-user-a@example.com'" + ) + row_b = await admin_conn.fetchrow( + "SELECT id FROM users WHERE email = 'rls-user-b@example.com'" + ) + actual_user_a = str(row_a["id"]) + actual_user_b = str(row_b["id"]) + + # Insert one private tree per account with explicit author_id + await admin_conn.execute(f""" + INSERT INTO trees ( + id, name, account_id, author_id, is_active, is_default, + is_public, visibility, tree_type, created_at, updated_at + ) VALUES + (gen_random_uuid(), 'RLS Tree A', '{ACCOUNT_A_ID}', '{actual_user_a}', + TRUE, FALSE, FALSE, 'private', 'troubleshooting', NOW(), NOW()), + (gen_random_uuid(), 'RLS Tree B', '{ACCOUNT_B_ID}', '{actual_user_b}', + TRUE, FALSE, FALSE, 'private', 'troubleshooting', NOW(), NOW()) + """) + + # One platform-owned tree_tag (global, visible to all tenants) + await admin_conn.execute(f""" + INSERT INTO tree_tags ( + id, name, slug, account_id, usage_count, created_at, updated_at + ) VALUES ( + gen_random_uuid(), 'rls-global-tag', 'rls-global-tag', + '{PLATFORM_ACCOUNT_ID}', 0, NOW(), NOW() + ) ON CONFLICT DO NOTHING + """) + + yield + + # Cleanup + await admin_conn.execute( + f"DELETE FROM trees WHERE account_id IN ('{ACCOUNT_A_ID}', '{ACCOUNT_B_ID}')" + ) + await admin_conn.execute( + "DELETE FROM users WHERE email IN " + "('rls-user-a@example.com', 'rls-user-b@example.com')" + ) + await admin_conn.execute( + f"DELETE FROM accounts WHERE id IN ('{ACCOUNT_A_ID}', '{ACCOUNT_B_ID}')" + ) + await admin_conn.execute("DELETE FROM tree_tags WHERE slug = 'rls-global-tag'") + + +@pytest.fixture +async def conn_a(): + """App-role connection, tenant context = Account A.""" + conn = await asyncpg.connect( + host=_DB_HOST, port=_DB_PORT, database=_DB_NAME, + user="resolutionflow_app", password=_APP_PASSWORD, + ) + await conn.execute( + "SELECT set_config('app.current_account_id', $1, true)", ACCOUNT_A_ID + ) + yield conn + await conn.close() + + +@pytest.fixture +async def conn_b(): + """App-role connection, tenant context = Account B.""" + conn = await asyncpg.connect( + host=_DB_HOST, port=_DB_PORT, database=_DB_NAME, + user="resolutionflow_app", password=_APP_PASSWORD, + ) + await conn.execute( + "SELECT set_config('app.current_account_id', $1, true)", ACCOUNT_B_ID + ) + yield conn + await conn.close() + + +@pytest.fixture +async def conn_no_context(): + """App-role connection with NO tenant context set.""" + conn = await asyncpg.connect( + host=_DB_HOST, port=_DB_PORT, database=_DB_NAME, + user="resolutionflow_app", password=_APP_PASSWORD, + ) + yield conn + await conn.close() + + +# --------------------------------------------------------------------------- +# trees +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_trees_account_a_cannot_see_account_b_rows(conn_a): + rows = await conn_a.fetch( + f"SELECT id FROM trees WHERE account_id = '{ACCOUNT_B_ID}'" + ) + assert len(rows) == 0, "Account A should not see Account B trees" + + +@pytest.mark.asyncio +async def test_trees_account_a_can_see_own_rows(conn_a): + rows = await conn_a.fetch( + f"SELECT id FROM trees WHERE account_id = '{ACCOUNT_A_ID}'" + ) + assert len(rows) >= 1, "Account A should see its own trees" + + +@pytest.mark.asyncio +async def test_trees_no_context_sees_no_private_trees(conn_no_context): + rows = await conn_no_context.fetch( + "SELECT id FROM trees WHERE is_default = FALSE AND is_public = FALSE" + ) + assert len(rows) == 0, "No-context connection should see no private trees" + + +# --------------------------------------------------------------------------- +# tree_tags — platform visibility +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_tree_tags_account_a_cannot_see_account_b_tags(conn_a): + rows = await conn_a.fetch( + f"SELECT id FROM tree_tags WHERE account_id = '{ACCOUNT_B_ID}'" + ) + assert len(rows) == 0 + + +@pytest.mark.asyncio +async def test_tree_tags_both_tenants_see_platform_tags(conn_a, conn_b): + rows_a = await conn_a.fetch( + f"SELECT id FROM tree_tags WHERE account_id = '{PLATFORM_ACCOUNT_ID}'" + ) + rows_b = await conn_b.fetch( + f"SELECT id FROM tree_tags WHERE account_id = '{PLATFORM_ACCOUNT_ID}'" + ) + assert len(rows_a) >= 1, "Account A should see platform tags" + assert len(rows_b) >= 1, "Account B should see platform tags" + + +# --------------------------------------------------------------------------- +# tree_categories — platform visibility +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_tree_categories_account_a_cannot_see_account_b(conn_a): + rows = await conn_a.fetch( + f"SELECT id FROM tree_categories WHERE account_id = '{ACCOUNT_B_ID}'" + ) + assert len(rows) == 0 + + +# --------------------------------------------------------------------------- +# step_categories — platform visibility +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_step_categories_account_a_cannot_see_account_b(conn_a): + rows = await conn_a.fetch( + f"SELECT id FROM step_categories WHERE account_id = '{ACCOUNT_B_ID}'" + ) + assert len(rows) == 0 + + +# --------------------------------------------------------------------------- +# psa_connections — tenant-only +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_psa_connections_account_a_cannot_see_account_b(conn_a): + rows = await conn_a.fetch( + f"SELECT id FROM psa_connections WHERE account_id = '{ACCOUNT_B_ID}'" + ) + assert len(rows) == 0 + + +# --------------------------------------------------------------------------- +# flow_proposals — tenant-only +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_flow_proposals_account_a_cannot_see_account_b(conn_a): + rows = await conn_a.fetch( + f"SELECT id FROM flow_proposals WHERE account_id = '{ACCOUNT_B_ID}'" + ) + assert len(rows) == 0