test: add failing RLS isolation tests (green after Task 10 migration + Task 11 URL switch)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
266
backend/tests/test_rls_isolation.py
Normal file
266
backend/tests/test_rls_isolation.py
Normal file
@@ -0,0 +1,266 @@
|
||||
# backend/tests/test_rls_isolation.py
|
||||
"""
|
||||
RLS foundation tests.
|
||||
|
||||
Connect directly as resolutionflow_app (not superuser) and verify:
|
||||
- Tenant A cannot read Tenant B's rows
|
||||
- No tenant context set → zero rows for private data (fail-closed)
|
||||
- Platform rows (PLATFORM_ACCOUNT_ID) are visible to all tenants
|
||||
|
||||
Tests bypass FastAPI entirely — raw asyncpg connections only.
|
||||
MUST FAIL before Task 10 (RLS migration) and PASS after it.
|
||||
|
||||
Run with:
|
||||
DB_APP_ROLE_PASSWORD=app_secret_change_me pytest tests/test_rls_isolation.py -v
|
||||
|
||||
The test DB is patherly_test (matches conftest.py default).
|
||||
"""
|
||||
import os
|
||||
import uuid
|
||||
|
||||
import asyncpg
|
||||
import pytest
|
||||
|
||||
_DB_HOST = os.getenv("TEST_DB_HOST", "localhost")
|
||||
_DB_PORT = int(os.getenv("TEST_DB_PORT", "5432"))
|
||||
_DB_NAME = os.getenv("TEST_DB_NAME", "patherly_test") # matches conftest.py
|
||||
_APP_PASSWORD = os.getenv("DB_APP_ROLE_PASSWORD", "app_secret_change_me")
|
||||
_ADMIN_DSN = f"postgresql://postgres:postgres@{_DB_HOST}:{_DB_PORT}/{_DB_NAME}"
|
||||
|
||||
PLATFORM_ACCOUNT_ID = "00000000-0000-0000-0000-000000000001"
|
||||
ACCOUNT_A_ID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
|
||||
ACCOUNT_B_ID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
async def admin_conn():
|
||||
"""Superuser asyncpg connection for fixture setup and teardown."""
|
||||
conn = await asyncpg.connect(_ADMIN_DSN)
|
||||
yield conn
|
||||
await conn.close()
|
||||
|
||||
|
||||
@pytest.fixture(scope="module", autouse=True)
|
||||
async def seed_rls_test_data(admin_conn):
|
||||
"""
|
||||
Create two isolated test accounts, one user per account, and one private
|
||||
tree per account. Trees require a valid author_id FK to users, so users
|
||||
must be created first.
|
||||
|
||||
accounts.display_code must be unique and 8 chars (NOT NULL constraint).
|
||||
"""
|
||||
# Insert accounts
|
||||
await admin_conn.execute(f"""
|
||||
INSERT INTO accounts (id, name, display_code, created_at, updated_at)
|
||||
VALUES
|
||||
('{ACCOUNT_A_ID}', 'RLS Tenant A', 'RLSA0001', NOW(), NOW()),
|
||||
('{ACCOUNT_B_ID}', 'RLS Tenant B', 'RLSB0001', NOW(), NOW())
|
||||
ON CONFLICT (id) DO NOTHING
|
||||
""")
|
||||
|
||||
# Insert one user per account (users.account_id NOT NULL, password_hash NOT NULL)
|
||||
user_a_id = str(uuid.uuid4())
|
||||
user_b_id = str(uuid.uuid4())
|
||||
await admin_conn.execute(f"""
|
||||
INSERT INTO users (
|
||||
id, email, password_hash, name, role, is_active, account_id,
|
||||
account_role, created_at
|
||||
) VALUES
|
||||
('{user_a_id}', 'rls-user-a@example.com',
|
||||
'placeholder', 'RLS User A', 'engineer', TRUE,
|
||||
'{ACCOUNT_A_ID}', 'engineer', NOW()),
|
||||
('{user_b_id}', 'rls-user-b@example.com',
|
||||
'placeholder', 'RLS User B', 'engineer', TRUE,
|
||||
'{ACCOUNT_B_ID}', 'engineer', NOW())
|
||||
ON CONFLICT (email) DO NOTHING
|
||||
""")
|
||||
|
||||
# Look up the user IDs we just inserted (ON CONFLICT may have skipped)
|
||||
row_a = await admin_conn.fetchrow(
|
||||
"SELECT id FROM users WHERE email = 'rls-user-a@example.com'"
|
||||
)
|
||||
row_b = await admin_conn.fetchrow(
|
||||
"SELECT id FROM users WHERE email = 'rls-user-b@example.com'"
|
||||
)
|
||||
actual_user_a = str(row_a["id"])
|
||||
actual_user_b = str(row_b["id"])
|
||||
|
||||
# Insert one private tree per account with explicit author_id
|
||||
await admin_conn.execute(f"""
|
||||
INSERT INTO trees (
|
||||
id, name, account_id, author_id, is_active, is_default,
|
||||
is_public, visibility, tree_type, created_at, updated_at
|
||||
) VALUES
|
||||
(gen_random_uuid(), 'RLS Tree A', '{ACCOUNT_A_ID}', '{actual_user_a}',
|
||||
TRUE, FALSE, FALSE, 'private', 'troubleshooting', NOW(), NOW()),
|
||||
(gen_random_uuid(), 'RLS Tree B', '{ACCOUNT_B_ID}', '{actual_user_b}',
|
||||
TRUE, FALSE, FALSE, 'private', 'troubleshooting', NOW(), NOW())
|
||||
""")
|
||||
|
||||
# One platform-owned tree_tag (global, visible to all tenants)
|
||||
await admin_conn.execute(f"""
|
||||
INSERT INTO tree_tags (
|
||||
id, name, slug, account_id, usage_count, created_at, updated_at
|
||||
) VALUES (
|
||||
gen_random_uuid(), 'rls-global-tag', 'rls-global-tag',
|
||||
'{PLATFORM_ACCOUNT_ID}', 0, NOW(), NOW()
|
||||
) ON CONFLICT DO NOTHING
|
||||
""")
|
||||
|
||||
yield
|
||||
|
||||
# Cleanup
|
||||
await admin_conn.execute(
|
||||
f"DELETE FROM trees WHERE account_id IN ('{ACCOUNT_A_ID}', '{ACCOUNT_B_ID}')"
|
||||
)
|
||||
await admin_conn.execute(
|
||||
"DELETE FROM users WHERE email IN "
|
||||
"('rls-user-a@example.com', 'rls-user-b@example.com')"
|
||||
)
|
||||
await admin_conn.execute(
|
||||
f"DELETE FROM accounts WHERE id IN ('{ACCOUNT_A_ID}', '{ACCOUNT_B_ID}')"
|
||||
)
|
||||
await admin_conn.execute("DELETE FROM tree_tags WHERE slug = 'rls-global-tag'")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def conn_a():
|
||||
"""App-role connection, tenant context = Account A."""
|
||||
conn = await asyncpg.connect(
|
||||
host=_DB_HOST, port=_DB_PORT, database=_DB_NAME,
|
||||
user="resolutionflow_app", password=_APP_PASSWORD,
|
||||
)
|
||||
await conn.execute(
|
||||
"SELECT set_config('app.current_account_id', $1, true)", ACCOUNT_A_ID
|
||||
)
|
||||
yield conn
|
||||
await conn.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def conn_b():
|
||||
"""App-role connection, tenant context = Account B."""
|
||||
conn = await asyncpg.connect(
|
||||
host=_DB_HOST, port=_DB_PORT, database=_DB_NAME,
|
||||
user="resolutionflow_app", password=_APP_PASSWORD,
|
||||
)
|
||||
await conn.execute(
|
||||
"SELECT set_config('app.current_account_id', $1, true)", ACCOUNT_B_ID
|
||||
)
|
||||
yield conn
|
||||
await conn.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def conn_no_context():
|
||||
"""App-role connection with NO tenant context set."""
|
||||
conn = await asyncpg.connect(
|
||||
host=_DB_HOST, port=_DB_PORT, database=_DB_NAME,
|
||||
user="resolutionflow_app", password=_APP_PASSWORD,
|
||||
)
|
||||
yield conn
|
||||
await conn.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# trees
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_trees_account_a_cannot_see_account_b_rows(conn_a):
|
||||
rows = await conn_a.fetch(
|
||||
f"SELECT id FROM trees WHERE account_id = '{ACCOUNT_B_ID}'"
|
||||
)
|
||||
assert len(rows) == 0, "Account A should not see Account B trees"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_trees_account_a_can_see_own_rows(conn_a):
|
||||
rows = await conn_a.fetch(
|
||||
f"SELECT id FROM trees WHERE account_id = '{ACCOUNT_A_ID}'"
|
||||
)
|
||||
assert len(rows) >= 1, "Account A should see its own trees"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_trees_no_context_sees_no_private_trees(conn_no_context):
|
||||
rows = await conn_no_context.fetch(
|
||||
"SELECT id FROM trees WHERE is_default = FALSE AND is_public = FALSE"
|
||||
)
|
||||
assert len(rows) == 0, "No-context connection should see no private trees"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# tree_tags — platform visibility
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tree_tags_account_a_cannot_see_account_b_tags(conn_a):
|
||||
rows = await conn_a.fetch(
|
||||
f"SELECT id FROM tree_tags WHERE account_id = '{ACCOUNT_B_ID}'"
|
||||
)
|
||||
assert len(rows) == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tree_tags_both_tenants_see_platform_tags(conn_a, conn_b):
|
||||
rows_a = await conn_a.fetch(
|
||||
f"SELECT id FROM tree_tags WHERE account_id = '{PLATFORM_ACCOUNT_ID}'"
|
||||
)
|
||||
rows_b = await conn_b.fetch(
|
||||
f"SELECT id FROM tree_tags WHERE account_id = '{PLATFORM_ACCOUNT_ID}'"
|
||||
)
|
||||
assert len(rows_a) >= 1, "Account A should see platform tags"
|
||||
assert len(rows_b) >= 1, "Account B should see platform tags"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# tree_categories — platform visibility
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tree_categories_account_a_cannot_see_account_b(conn_a):
|
||||
rows = await conn_a.fetch(
|
||||
f"SELECT id FROM tree_categories WHERE account_id = '{ACCOUNT_B_ID}'"
|
||||
)
|
||||
assert len(rows) == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# step_categories — platform visibility
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_step_categories_account_a_cannot_see_account_b(conn_a):
|
||||
rows = await conn_a.fetch(
|
||||
f"SELECT id FROM step_categories WHERE account_id = '{ACCOUNT_B_ID}'"
|
||||
)
|
||||
assert len(rows) == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# psa_connections — tenant-only
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_psa_connections_account_a_cannot_see_account_b(conn_a):
|
||||
rows = await conn_a.fetch(
|
||||
f"SELECT id FROM psa_connections WHERE account_id = '{ACCOUNT_B_ID}'"
|
||||
)
|
||||
assert len(rows) == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# flow_proposals — tenant-only
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_flow_proposals_account_a_cannot_see_account_b(conn_a):
|
||||
rows = await conn_a.fetch(
|
||||
f"SELECT id FROM flow_proposals WHERE account_id = '{ACCOUNT_B_ID}'"
|
||||
)
|
||||
assert len(rows) == 0
|
||||
Reference in New Issue
Block a user