diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index cefd8beb..cd4aa10b 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -105,7 +105,7 @@ assert "test" in _test_db_name, ( ) _RUN_RLS_TESTS = os.environ.get("RUN_RLS_TESTS") == "1" -_RLS_ISOLATION_FILE = "test_rls_isolation.py" +_RLS_TEST_FILES = {"test_rls_isolation.py", "test_l1_rls.py"} def pytest_collection_modifyitems(config, items): @@ -117,7 +117,9 @@ def pytest_collection_modifyitems(config, items): deselected = [] for item in items: item_path = getattr(item, "path", None) or getattr(item, "fspath", None) - if item_path and str(item_path).endswith(_RLS_ISOLATION_FILE): + if item_path and any( + str(item_path).endswith(f) for f in _RLS_TEST_FILES + ): deselected.append(item) else: selected.append(item) diff --git a/backend/tests/test_l1_rls.py b/backend/tests/test_l1_rls.py new file mode 100644 index 00000000..e5bdf043 --- /dev/null +++ b/backend/tests/test_l1_rls.py @@ -0,0 +1,435 @@ +# backend/tests/test_l1_rls.py +""" +RLS regression tests for L1 Phase 1 tables. + +Verifies that `internal_tickets` and `l1_walk_sessions` — both with +FORCE ROW LEVEL SECURITY + `tenant_isolation` policy on `account_id` — +block cross-tenant reads AND reject WITH CHECK violations on INSERT. + +Uses synchronous psycopg2 (not asyncpg) to avoid the conftest +teardown hook that closes the asyncio event loop after every test, +which is incompatible with module-scoped asyncpg fixtures. + +Run with: + RUN_RLS_TESTS=1 DB_APP_ROLE_PASSWORD=app_secret_change_me \ + pytest tests/test_l1_rls.py -v --override-ini="addopts=" +""" +import os +import subprocess +import sys +import uuid +from pathlib import Path +from urllib.parse import unquote, urlsplit + +import psycopg2 +import psycopg2.errors +import pytest + +pytestmark = pytest.mark.rls + +_DATABASE_TEST_URL = os.getenv( + "DATABASE_TEST_URL", + "postgresql+asyncpg://postgres:postgres@localhost:5432/resolutionflow_test", +) +_DATABASE_TEST_URL_SYNC = _DATABASE_TEST_URL.replace( + "postgresql+asyncpg://", + "postgresql://", + 1, +) +_TEST_DB_PARTS = urlsplit(_DATABASE_TEST_URL_SYNC) + +_DB_HOST = os.getenv( + "TEST_DB_HOST", _TEST_DB_PARTS.hostname or "localhost" +) +_DB_PORT = int(os.getenv( + "TEST_DB_PORT", str(_TEST_DB_PARTS.port or 5432) +)) +_DB_NAME = os.getenv( + "TEST_DB_NAME", + unquote(_TEST_DB_PARTS.path.lstrip("/") or "resolutionflow_test"), +) +_ADMIN_USER = os.getenv( + "TEST_DB_ADMIN_USER", + unquote(_TEST_DB_PARTS.username or "postgres"), +) +_ADMIN_PASSWORD = os.getenv( + "TEST_DB_ADMIN_PASSWORD", + unquote(_TEST_DB_PARTS.password or "postgres"), +) +_APP_PASSWORD = os.getenv("DB_APP_ROLE_PASSWORD", "app_secret_change_me") + +ACCOUNT_A_ID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa" +ACCOUNT_B_ID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb" + + +def _admin_dsn() -> dict: + return dict( + host=_DB_HOST, port=_DB_PORT, dbname=_DB_NAME, + user=_ADMIN_USER, password=_ADMIN_PASSWORD, + ) + + +def _app_dsn() -> dict: + return dict( + host=_DB_HOST, port=_DB_PORT, dbname=_DB_NAME, + user="resolutionflow_app", password=_APP_PASSWORD, + ) + + +# --------------------------------------------------------------------------- +# Schema bootstrap +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="module") +def _ensure_rls_schema(): + """Re-apply Alembic migrations so that RLS policies are present. + + The standard test_db fixture uses Base.metadata.create_all which skips + RLS setup. Running 'alembic upgrade head' against the test DB ensures + the FORCE ROW LEVEL SECURITY + tenant_isolation policies created in the + L1 migrations (T5/T6) are active. + """ + backend_dir = Path(__file__).parent.parent + env = os.environ.copy() + env["DATABASE_URL"] = _DATABASE_TEST_URL + env["DATABASE_URL_SYNC"] = _DATABASE_TEST_URL_SYNC + subprocess.run( + [sys.executable, "-m", "alembic", "upgrade", "head"], + cwd=backend_dir, + env=env, + check=True, + capture_output=True, + ) + + +# --------------------------------------------------------------------------- +# Seed fixture (module-scoped, synchronous psycopg2) +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="module") +def l1_rls_seed(_ensure_rls_schema): + """Insert two accounts, two users, one internal_ticket and one + l1_walk_session per account using a superuser (BYPASSRLS) connection. + + Returns a dict with the seeded IDs so tests can reference them. + Cleans up on module teardown. + """ + conn = psycopg2.connect(**_admin_dsn()) + conn.autocommit = True + cur = conn.cursor() + + # Accounts (idempotent — shared with test_rls_isolation.py) + cur.execute( + "INSERT INTO accounts (id, name, display_code, created_at, updated_at)" + " VALUES (%s, %s, %s, NOW(), NOW())," + " (%s, %s, %s, NOW(), NOW())" + " ON CONFLICT (id) DO NOTHING", + ( + ACCOUNT_A_ID, "L1 RLS Tenant A", "RLSA0001", + ACCOUNT_B_ID, "L1 RLS Tenant B", "RLSB0001", + ), + ) + + user_a_tmp = str(uuid.uuid4()) + user_b_tmp = str(uuid.uuid4()) + cur.execute( + "INSERT INTO users" + " (id, email, password_hash, name, role, is_active," + " account_id, account_role, created_at)" + " VALUES" + " (%s, %s, %s, %s, %s, %s, %s, %s, NOW())," + " (%s, %s, %s, %s, %s, %s, %s, %s, NOW())" + " ON CONFLICT (email) DO NOTHING", + ( + user_a_tmp, "l1-rls-a@example.com", "placeholder", + "L1 RLS User A", "engineer", True, + ACCOUNT_A_ID, "engineer", + user_b_tmp, "l1-rls-b@example.com", "placeholder", + "L1 RLS User B", "engineer", True, + ACCOUNT_B_ID, "engineer", + ), + ) + + cur.execute( + "SELECT id FROM users WHERE email = 'l1-rls-a@example.com'" + ) + user_a_id = str(cur.fetchone()[0]) + cur.execute( + "SELECT id FROM users WHERE email = 'l1-rls-b@example.com'" + ) + user_b_id = str(cur.fetchone()[0]) + + ticket_a_id = str(uuid.uuid4()) + ticket_b_id = str(uuid.uuid4()) + walk_a_id = str(uuid.uuid4()) + walk_b_id = str(uuid.uuid4()) + + cur.execute( + "INSERT INTO internal_tickets" + " (id, account_id, created_by_user_id, problem_statement," + " status, created_at, updated_at)" + " VALUES" + " (%s, %s, %s, %s, %s, NOW(), NOW())," + " (%s, %s, %s, %s, %s, NOW(), NOW())", + ( + ticket_a_id, ACCOUNT_A_ID, user_a_id, + "L1 RLS test ticket A", "open", + ticket_b_id, ACCOUNT_B_ID, user_b_id, + "L1 RLS test ticket B", "open", + ), + ) + + cur.execute( + "INSERT INTO l1_walk_sessions" + " (id, account_id, created_by_user_id, ticket_id, ticket_kind," + " session_kind, status, started_at, last_step_at)" + " VALUES" + " (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())," + " (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())", + ( + walk_a_id, ACCOUNT_A_ID, user_a_id, + "INT-A", "internal", "adhoc", "active", + walk_b_id, ACCOUNT_B_ID, user_b_id, + "INT-B", "internal", "adhoc", "active", + ), + ) + + seed = { + "ticket_a": ticket_a_id, + "ticket_b": ticket_b_id, + "walk_a": walk_a_id, + "walk_b": walk_b_id, + "user_a": user_a_id, + "user_b": user_b_id, + } + + yield seed + + # Cleanup in reverse FK order. + # Delete all child rows for both test accounts before removing users — + # other test modules (test_rls_isolation.py) may have seeded rows for + # these same accounts, so we clean by account_id rather than by row ID. + cur.execute( + "DELETE FROM l1_walk_sessions WHERE account_id IN (%s, %s)", + (ACCOUNT_A_ID, ACCOUNT_B_ID), + ) + cur.execute( + "DELETE FROM internal_tickets WHERE account_id IN (%s, %s)", + (ACCOUNT_A_ID, ACCOUNT_B_ID), + ) + cur.execute( + "DELETE FROM users WHERE email IN (%s, %s)", + ("l1-rls-a@example.com", "l1-rls-b@example.com"), + ) + cur.execute( + "DELETE FROM accounts WHERE id IN (%s, %s)" + " AND display_code IN ('RLSA0001', 'RLSB0001')", + (ACCOUNT_A_ID, ACCOUNT_B_ID), + ) + cur.close() + conn.close() + + +# --------------------------------------------------------------------------- +# Per-test helper: open an app-role connection with a given tenant context +# --------------------------------------------------------------------------- + + +def _app_conn(account_id: str | None = None) -> psycopg2.extensions.connection: + """Open a psycopg2 connection as resolutionflow_app. + + If account_id is given, SET LOCAL app.current_account_id so RLS applies + to the given tenant. Callers must begin a transaction first. + """ + conn = psycopg2.connect(**_app_dsn()) + conn.autocommit = False + cur = conn.cursor() + if account_id: + cur.execute( + "SELECT set_config('app.current_account_id', %s, false)", + (account_id,), + ) + cur.close() + return conn + + +# --------------------------------------------------------------------------- +# internal_tickets — read isolation +# --------------------------------------------------------------------------- + + +def test_l1_user_cannot_read_other_accounts_internal_tickets(l1_rls_seed): + """RLS USING: Account A context must not see Account B's tickets.""" + conn = _app_conn(ACCOUNT_A_ID) + try: + cur = conn.cursor() + cur.execute( + "SELECT id FROM internal_tickets WHERE id = %s", + (l1_rls_seed["ticket_b"],), + ) + rows = cur.fetchall() + finally: + conn.rollback() + conn.close() + assert len(rows) == 0, ( + "Account A must not read Account B's internal_tickets" + ) + + +def test_internal_tickets_account_a_can_see_own_rows(l1_rls_seed): + """Positive check: Account A can read its own internal_tickets.""" + conn = _app_conn(ACCOUNT_A_ID) + try: + cur = conn.cursor() + cur.execute( + "SELECT id FROM internal_tickets WHERE id = %s", + (l1_rls_seed["ticket_a"],), + ) + rows = cur.fetchall() + finally: + conn.rollback() + conn.close() + assert len(rows) == 1, ( + "Account A must be able to read its own internal_tickets" + ) + + +def test_internal_tickets_no_context_sees_nothing(l1_rls_seed): + """Fail-closed: no tenant context → zero internal_tickets rows visible.""" + conn = _app_conn() # no account_id + try: + cur = conn.cursor() + cur.execute( + "SELECT id FROM internal_tickets WHERE id IN (%s, %s)", + (l1_rls_seed["ticket_a"], l1_rls_seed["ticket_b"]), + ) + rows = cur.fetchall() + finally: + conn.rollback() + conn.close() + assert len(rows) == 0, ( + "No-context connection must not see any internal_tickets" + ) + + +# --------------------------------------------------------------------------- +# l1_walk_sessions — read isolation +# --------------------------------------------------------------------------- + + +def test_l1_user_cannot_read_other_accounts_walk_sessions(l1_rls_seed): + """RLS USING: Account A context must not see Account B's walk sessions.""" + conn = _app_conn(ACCOUNT_A_ID) + try: + cur = conn.cursor() + cur.execute( + "SELECT id FROM l1_walk_sessions WHERE id = %s", + (l1_rls_seed["walk_b"],), + ) + rows = cur.fetchall() + finally: + conn.rollback() + conn.close() + assert len(rows) == 0, ( + "Account A must not read Account B's l1_walk_sessions" + ) + + +def test_l1_walk_sessions_account_a_can_see_own_rows(l1_rls_seed): + """Positive check: Account A can read its own l1_walk_sessions.""" + conn = _app_conn(ACCOUNT_A_ID) + try: + cur = conn.cursor() + cur.execute( + "SELECT id FROM l1_walk_sessions WHERE id = %s", + (l1_rls_seed["walk_a"],), + ) + rows = cur.fetchall() + finally: + conn.rollback() + conn.close() + assert len(rows) == 1, ( + "Account A must be able to read its own l1_walk_sessions" + ) + + +def test_l1_walk_sessions_no_context_sees_nothing(l1_rls_seed): + """Fail-closed: no tenant context → zero l1_walk_sessions rows visible.""" + conn = _app_conn() # no account_id + try: + cur = conn.cursor() + cur.execute( + "SELECT id FROM l1_walk_sessions WHERE id IN (%s, %s)", + (l1_rls_seed["walk_a"], l1_rls_seed["walk_b"]), + ) + rows = cur.fetchall() + finally: + conn.rollback() + conn.close() + assert len(rows) == 0, ( + "No-context connection must not see any l1_walk_sessions" + ) + + +# --------------------------------------------------------------------------- +# internal_tickets — WITH CHECK (cross-tenant INSERT rejection) +# --------------------------------------------------------------------------- + + +def test_with_check_blocks_cross_tenant_insert_internal_tickets(l1_rls_seed): + """RLS WITH CHECK: INSERT with account_id = A under context B is rejected. + + psycopg2 raises InsufficientPrivilege (pgcode '42501') when a row + violates FORCE ROW LEVEL SECURITY WITH CHECK. + """ + new_id = str(uuid.uuid4()) + user_b_id = l1_rls_seed["user_b"] + + conn = _app_conn(ACCOUNT_B_ID) + try: + cur = conn.cursor() + with pytest.raises(psycopg2.errors.InsufficientPrivilege): + cur.execute( + "INSERT INTO internal_tickets" + " (id, account_id, created_by_user_id, problem_statement," + " status, created_at, updated_at)" + " VALUES (%s, %s, %s, %s, %s, NOW(), NOW())", + ( + new_id, ACCOUNT_A_ID, user_b_id, + "Cross-tenant injection attempt", "open", + ), + ) + finally: + conn.rollback() + conn.close() + + +# --------------------------------------------------------------------------- +# l1_walk_sessions — WITH CHECK (cross-tenant INSERT rejection) +# --------------------------------------------------------------------------- + + +def test_with_check_blocks_cross_tenant_insert_l1_walk_sessions(l1_rls_seed): + """RLS WITH CHECK: INSERT with account_id = A under context B is rejected.""" + new_id = str(uuid.uuid4()) + user_b_id = l1_rls_seed["user_b"] + + conn = _app_conn(ACCOUNT_B_ID) + try: + cur = conn.cursor() + with pytest.raises(psycopg2.errors.InsufficientPrivilege): + cur.execute( + "INSERT INTO l1_walk_sessions" + " (id, account_id, created_by_user_id, ticket_id," + " ticket_kind, session_kind, status, started_at, last_step_at)" + " VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())", + ( + new_id, ACCOUNT_A_ID, user_b_id, + "INT-cross", "internal", "adhoc", "active", + ), + ) + finally: + conn.rollback() + conn.close()