chore(tests): gate RLS tests behind RUN_RLS_TESTS flag

Continues the test-isolation work from dab740d. RLS migration tests run
against a policy-installed database and fail in the default create_all
suite, so they need to be opt-in:

- pytest.ini: register `rls` marker.
- conftest.py: auto-deselect test_rls_isolation.py unless
  RUN_RLS_TESTS=1. Drops the deprecated session-scoped event_loop
  fixture (not needed since pytest-asyncio 0.23+).
- test_rls_isolation.py: tag module with `rls` marker. Replace
  hardcoded `patherly_test` DB reference with parsed DATABASE_TEST_URL
  (matches conftest.py default `resolutionflow_test`). Updated docstring
  command to show RUN_RLS_TESTS=1.
- requirements-dev.txt: bump pytest-asyncio 0.23.0 → 0.24.0 (loop-scope
  marker behavior required by the RLS module fixture).

Run the RLS suite with:
  RUN_RLS_TESTS=1 DB_APP_ROLE_PASSWORD=... pytest tests/test_rls_isolation.py

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-04-24 16:09:13 -04:00
parent 9c8ba296a8
commit b14a16a1ab
4 changed files with 99 additions and 41 deletions

View File

@@ -11,30 +11,57 @@ Tests bypass FastAPI entirely — raw asyncpg connections only.
MUST FAIL before Task 10 (RLS migration) and PASS after it.
Run with:
DB_APP_ROLE_PASSWORD=app_secret_change_me pytest tests/test_rls_isolation.py -v
RUN_RLS_TESTS=1 DB_APP_ROLE_PASSWORD=app_secret_change_me pytest tests/test_rls_isolation.py -v
The test DB is patherly_test (matches conftest.py default).
The test DB comes from DATABASE_TEST_URL, matching conftest.py.
"""
import os
import subprocess
import sys
import uuid
from pathlib import Path
from urllib.parse import unquote, urlsplit
import asyncpg
import pytest
import pytest_asyncio
# All tests in this module use module-scoped async fixtures (admin_conn,
# seed_rls_test_data) which run on the module event loop. Without this marker,
# pytest-asyncio 0.23+ defaults tests to function-scoped loops, causing
# "Future attached to a different loop" errors on the asyncpg connections.
pytestmark = pytest.mark.asyncio(loop_scope="module")
pytestmark = [
pytest.mark.asyncio(loop_scope="module"),
pytest.mark.rls,
]
_DB_HOST = os.getenv("TEST_DB_HOST", "localhost")
_DB_PORT = int(os.getenv("TEST_DB_PORT", "5432"))
_DB_NAME = os.getenv("TEST_DB_NAME", "patherly_test") # matches conftest.py
_DATABASE_TEST_URL = os.getenv(
"DATABASE_TEST_URL",
"postgresql+asyncpg://postgres:postgres@localhost:5432/resolutionflow_test",
)
_DATABASE_TEST_URL_ASYNCPG = _DATABASE_TEST_URL.replace(
"postgresql+asyncpg://",
"postgresql://",
1,
)
_DATABASE_TEST_URL_SYNC = _DATABASE_TEST_URL_ASYNCPG
_TEST_DB_PARTS = urlsplit(_DATABASE_TEST_URL_ASYNCPG)
_DB_HOST = os.getenv("TEST_DB_HOST", _TEST_DB_PARTS.hostname or "localhost")
_DB_PORT = int(os.getenv("TEST_DB_PORT", str(_TEST_DB_PARTS.port or 5432)))
_DB_NAME = os.getenv(
"TEST_DB_NAME",
unquote(_TEST_DB_PARTS.path.lstrip("/") or "resolutionflow_test"),
)
_ADMIN_USER = os.getenv(
"TEST_DB_ADMIN_USER",
unquote(_TEST_DB_PARTS.username or "postgres"),
)
_ADMIN_PASSWORD = os.getenv(
"TEST_DB_ADMIN_PASSWORD",
unquote(_TEST_DB_PARTS.password or "postgres"),
)
_APP_PASSWORD = os.getenv("DB_APP_ROLE_PASSWORD", "app_secret_change_me")
_ADMIN_DSN = f"postgresql://postgres:postgres@{_DB_HOST}:{_DB_PORT}/{_DB_NAME}"
PLATFORM_ACCOUNT_ID = "00000000-0000-0000-0000-000000000001"
ACCOUNT_A_ID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
@@ -55,23 +82,33 @@ def _ensure_rls_schema():
the full migration-managed schema (including RLS policies) is in place.
"""
backend_dir = Path(__file__).parent.parent
env = os.environ.copy()
env["DATABASE_URL"] = _DATABASE_TEST_URL
env["DATABASE_URL_SYNC"] = _DATABASE_TEST_URL_SYNC
subprocess.run(
[sys.executable, "-m", "alembic", "upgrade", "head"],
cwd=backend_dir,
env=env,
check=True,
capture_output=True,
)
@pytest.fixture(scope="module")
@pytest_asyncio.fixture(scope="module", loop_scope="module")
async def admin_conn(_ensure_rls_schema):
"""Superuser asyncpg connection for fixture setup and teardown."""
conn = await asyncpg.connect(_ADMIN_DSN)
conn = await asyncpg.connect(
host=_DB_HOST,
port=_DB_PORT,
database=_DB_NAME,
user=_ADMIN_USER,
password=_ADMIN_PASSWORD,
)
yield conn
await conn.close()
@pytest.fixture(scope="module", autouse=True)
@pytest_asyncio.fixture(scope="module", loop_scope="module", autouse=True)
async def seed_rls_test_data(admin_conn):
"""
Create two isolated test accounts, one user per account, and one private
@@ -154,7 +191,7 @@ async def seed_rls_test_data(admin_conn):
await admin_conn.execute("DELETE FROM tree_tags WHERE slug = 'rls-global-tag'")
@pytest.fixture
@pytest_asyncio.fixture(loop_scope="module")
async def conn_a():
"""App-role connection, tenant context = Account A."""
conn = await asyncpg.connect(
@@ -168,7 +205,7 @@ async def conn_a():
await conn.close()
@pytest.fixture
@pytest_asyncio.fixture(loop_scope="module")
async def conn_b():
"""App-role connection, tenant context = Account B."""
conn = await asyncpg.connect(
@@ -182,7 +219,7 @@ async def conn_b():
await conn.close()
@pytest.fixture
@pytest_asyncio.fixture(loop_scope="module")
async def conn_no_context():
"""App-role connection with NO tenant context set."""
conn = await asyncpg.connect(
@@ -288,7 +325,7 @@ async def test_flow_proposals_account_a_cannot_see_account_b(conn_a):
# Phase 2 fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
@pytest_asyncio.fixture(scope="module", loop_scope="module")
async def session_row_ids(admin_conn):
"""
Insert one `sessions` row and one `ai_sessions` row for each of
@@ -644,13 +681,15 @@ async def test_psa_post_log_account_a_cannot_see_account_b(conn_a, session_row_i
async def test_step_library_account_a_cannot_see_account_b_private_steps(admin_conn, conn_a):
"""Private/non-public steps owned by Account B must not be visible to Account A."""
user_b_id = await _get_user_b_id(admin_conn)
private_step_id = str(uuid.uuid4())
await admin_conn.execute(f"""
INSERT INTO step_library (
id, account_id, title, step_type, content,
id, account_id, created_by, title, step_type, content,
visibility, is_active, created_at, updated_at
) VALUES (
'{private_step_id}', '{ACCOUNT_B_ID}', 'RLS Private Step', 'action',
'{private_step_id}', '{ACCOUNT_B_ID}', '{user_b_id}',
'RLS Private Step', 'action',
'{{}}'::jsonb, 'private', TRUE, NOW(), NOW()
)
""")
@@ -668,13 +707,15 @@ async def test_step_library_account_a_cannot_see_account_b_private_steps(admin_c
async def test_step_library_account_a_can_see_account_b_public_steps(admin_conn, conn_a):
"""Public steps owned by Account B MUST be visible to Account A (cross-tenant visibility)."""
user_b_id = await _get_user_b_id(admin_conn)
public_step_id = str(uuid.uuid4())
await admin_conn.execute(f"""
INSERT INTO step_library (
id, account_id, title, step_type, content,
id, account_id, created_by, title, step_type, content,
visibility, is_active, created_at, updated_at
) VALUES (
'{public_step_id}', '{ACCOUNT_B_ID}', 'RLS Public Step', 'action',
'{public_step_id}', '{ACCOUNT_B_ID}', '{user_b_id}',
'RLS Public Step', 'action',
'{{}}'::jsonb, 'public', TRUE, NOW(), NOW()
)
""")
@@ -728,10 +769,11 @@ async def test_step_ratings_account_a_cannot_see_account_b(admin_conn, conn_a):
step_id = str(uuid.uuid4())
await admin_conn.execute(f"""
INSERT INTO step_library (
id, account_id, title, step_type, content,
id, account_id, created_by, title, step_type, content,
visibility, is_active, created_at, updated_at
) VALUES (
'{step_id}', '{ACCOUNT_B_ID}', 'Phase3 RLS Step', 'action',
'{step_id}', '{ACCOUNT_B_ID}', '{user_b_id}',
'Phase3 RLS Step', 'action',
'{{}}'::jsonb, 'private', TRUE, NOW(), NOW()
)
""")
@@ -768,10 +810,11 @@ async def test_step_usage_log_account_a_cannot_see_account_b(admin_conn, conn_a)
step_id = str(uuid.uuid4())
await admin_conn.execute(f"""
INSERT INTO step_library (
id, account_id, title, step_type, content,
id, account_id, created_by, title, step_type, content,
visibility, is_active, created_at, updated_at
) VALUES (
'{step_id}', '{ACCOUNT_B_ID}', 'Phase3 Usage Step', 'action',
'{step_id}', '{ACCOUNT_B_ID}', '{user_b_id}',
'Phase3 Usage Step', 'action',
'{{}}'::jsonb, 'private', TRUE, NOW(), NOW()
)
""")
@@ -971,10 +1014,10 @@ async def test_script_builder_sessions_account_a_cannot_see_account_b(admin_conn
session_id = str(uuid.uuid4())
await admin_conn.execute(f"""
INSERT INTO script_builder_sessions (
id, user_id, account_id, language, created_at, updated_at
id, user_id, account_id, language, origin, created_at, updated_at
) VALUES (
'{session_id}', '{user_b_id}', '{ACCOUNT_B_ID}',
'powershell', NOW(), NOW()
'powershell', 'standalone', NOW(), NOW()
)
""")
try:
@@ -1001,22 +1044,24 @@ async def test_ai_session_steps_account_a_cannot_see_account_b(admin_conn, conn_
ai_session_id = str(uuid.uuid4())
await admin_conn.execute(f"""
INSERT INTO ai_sessions (
id, user_id, account_id, flow_type, status, confidence_tier,
id, user_id, account_id, session_type, intake_type,
intake_content, status, confidence_tier, confidence_score,
created_at, updated_at
) VALUES (
'{ai_session_id}', '{user_b_id}', '{ACCOUNT_B_ID}',
'troubleshooting', 'active', 'guided', NOW(), NOW()
'guided', 'free_text', '{{}}'::jsonb, 'active', 'guided', 0.0,
NOW(), NOW()
)
""")
step_id = str(uuid.uuid4())
await admin_conn.execute(f"""
INSERT INTO ai_session_steps (
id, session_id, account_id, step_type, content,
id, session_id, account_id, step_order, step_type, content,
created_at
) VALUES (
'{step_id}', '{ai_session_id}', '{ACCOUNT_B_ID}',
'question', 'Phase4 RLS test step', NOW()
1, 'question', '{{"text": "Phase4 RLS test step"}}'::jsonb, NOW()
)
""")
try:
@@ -1040,11 +1085,11 @@ async def test_notifications_account_a_cannot_see_account_b(admin_conn, conn_a):
notif_id = str(uuid.uuid4())
await admin_conn.execute(f"""
INSERT INTO notifications (
id, user_id, account_id, type, title, message,
id, user_id, account_id, event, title, body,
is_read, created_at
) VALUES (
'{notif_id}', '{user_b_id}', '{ACCOUNT_B_ID}',
'info', 'Phase4 RLS Test', 'RLS isolation test notification',
'test_event', 'Phase4 RLS Test', 'RLS isolation test notification',
FALSE, NOW()
)
""")
@@ -1055,4 +1100,3 @@ async def test_notifications_account_a_cannot_see_account_b(admin_conn, conn_a):
assert len(rows) == 0, "Account A should not see Account B notifications"
finally:
await admin_conn.execute(f"DELETE FROM notifications WHERE id = '{notif_id}'")