Files
resolutionflow/backend/tests/test_l1_rls.py
Michael Chihlas 10b5d4e9b0 docs(l1): Phase 1 acceptance validation report
Full backend suite (1325/1325 passing, xdist) + L1-specific tests
(57/57) + L1 RLS tests (8/8) + frontend build (tsc clean, vite clean)
+ migration roundtrip results. Per-line checklist against spec §15.
Known Phase 2/3 items explicitly deferred per plan scope section.

fix(test): RLS fixture users INSERT missing NOT NULL columns
  test_l1_rls.py and test_rls_isolation.py seeded users without the
  five NOT NULL columns added in prior migrations (is_super_admin,
  is_team_admin, is_service_account, must_change_password, timezone).
  Also adds DROP SCHEMA before alembic upgrade in _ensure_rls_schema
  to prevent DuplicateTable errors when create_all tables are present.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-28 16:07:23 -04:00

451 lines
15 KiB
Python

# backend/tests/test_l1_rls.py
"""
RLS regression tests for L1 Phase 1 tables.
Verifies that `internal_tickets` and `l1_walk_sessions` — both with
FORCE ROW LEVEL SECURITY + `tenant_isolation` policy on `account_id` —
block cross-tenant reads AND reject WITH CHECK violations on INSERT.
Uses synchronous psycopg2 (not asyncpg) to avoid the conftest
teardown hook that closes the asyncio event loop after every test,
which is incompatible with module-scoped asyncpg fixtures.
Run with:
RUN_RLS_TESTS=1 DB_APP_ROLE_PASSWORD=app_secret_change_me \
pytest tests/test_l1_rls.py -v --override-ini="addopts="
"""
import os
import subprocess
import sys
import uuid
from pathlib import Path
from urllib.parse import unquote, urlsplit
import psycopg2
import psycopg2.errors
import pytest
pytestmark = pytest.mark.rls
_DATABASE_TEST_URL = os.getenv(
"DATABASE_TEST_URL",
"postgresql+asyncpg://postgres:postgres@localhost:5432/resolutionflow_test",
)
_DATABASE_TEST_URL_SYNC = _DATABASE_TEST_URL.replace(
"postgresql+asyncpg://",
"postgresql://",
1,
)
_TEST_DB_PARTS = urlsplit(_DATABASE_TEST_URL_SYNC)
_DB_HOST = os.getenv(
"TEST_DB_HOST", _TEST_DB_PARTS.hostname or "localhost"
)
_DB_PORT = int(os.getenv(
"TEST_DB_PORT", str(_TEST_DB_PARTS.port or 5432)
))
_DB_NAME = os.getenv(
"TEST_DB_NAME",
unquote(_TEST_DB_PARTS.path.lstrip("/") or "resolutionflow_test"),
)
_ADMIN_USER = os.getenv(
"TEST_DB_ADMIN_USER",
unquote(_TEST_DB_PARTS.username or "postgres"),
)
_ADMIN_PASSWORD = os.getenv(
"TEST_DB_ADMIN_PASSWORD",
unquote(_TEST_DB_PARTS.password or "postgres"),
)
_APP_PASSWORD = os.getenv("DB_APP_ROLE_PASSWORD", "app_secret_change_me")
ACCOUNT_A_ID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
ACCOUNT_B_ID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
def _admin_dsn() -> dict:
return dict(
host=_DB_HOST, port=_DB_PORT, dbname=_DB_NAME,
user=_ADMIN_USER, password=_ADMIN_PASSWORD,
)
def _app_dsn() -> dict:
return dict(
host=_DB_HOST, port=_DB_PORT, dbname=_DB_NAME,
user="resolutionflow_app", password=_APP_PASSWORD,
)
# ---------------------------------------------------------------------------
# Schema bootstrap
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def _ensure_rls_schema():
"""Re-apply Alembic migrations so that RLS policies are present.
The standard test_db fixture uses Base.metadata.create_all which skips
RLS setup. Running 'alembic upgrade head' against the test DB ensures
the FORCE ROW LEVEL SECURITY + tenant_isolation policies created in the
L1 migrations (T5/T6) are active.
We drop and recreate the public schema first so that any tables left behind
by a prior create_all-based test_db run don't conflict with alembic's
migration tracking (alembic would see existing tables without alembic_version
and fail with DuplicateTable errors).
"""
# Drop and recreate the schema to ensure a clean slate for alembic.
with psycopg2.connect(**_admin_dsn()) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("DROP SCHEMA public CASCADE")
cur.execute("CREATE SCHEMA public")
backend_dir = Path(__file__).parent.parent
env = os.environ.copy()
env["DATABASE_URL"] = _DATABASE_TEST_URL
env["DATABASE_URL_SYNC"] = _DATABASE_TEST_URL_SYNC
subprocess.run(
[sys.executable, "-m", "alembic", "upgrade", "head"],
cwd=backend_dir,
env=env,
check=True,
capture_output=True,
)
# ---------------------------------------------------------------------------
# Seed fixture (module-scoped, synchronous psycopg2)
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def l1_rls_seed(_ensure_rls_schema):
"""Insert two accounts, two users, one internal_ticket and one
l1_walk_session per account using a superuser (BYPASSRLS) connection.
Returns a dict with the seeded IDs so tests can reference them.
Cleans up on module teardown.
"""
conn = psycopg2.connect(**_admin_dsn())
conn.autocommit = True
cur = conn.cursor()
# Accounts (idempotent — shared with test_rls_isolation.py)
cur.execute(
"INSERT INTO accounts (id, name, display_code, created_at, updated_at)"
" VALUES (%s, %s, %s, NOW(), NOW()),"
" (%s, %s, %s, NOW(), NOW())"
" ON CONFLICT (id) DO NOTHING",
(
ACCOUNT_A_ID, "L1 RLS Tenant A", "RLSA0001",
ACCOUNT_B_ID, "L1 RLS Tenant B", "RLSB0001",
),
)
user_a_tmp = str(uuid.uuid4())
user_b_tmp = str(uuid.uuid4())
cur.execute(
"INSERT INTO users"
" (id, email, password_hash, name, role,"
" is_super_admin, is_team_admin, is_service_account, must_change_password,"
" is_active, account_id, account_role, timezone, created_at)"
" VALUES"
" (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()),"
" (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())"
" ON CONFLICT (email) DO NOTHING",
(
user_a_tmp, "l1-rls-a@example.com", "placeholder",
"L1 RLS User A", "engineer",
False, False, False, False,
True, ACCOUNT_A_ID, "engineer", "UTC",
user_b_tmp, "l1-rls-b@example.com", "placeholder",
"L1 RLS User B", "engineer",
False, False, False, False,
True, ACCOUNT_B_ID, "engineer", "UTC",
),
)
cur.execute(
"SELECT id FROM users WHERE email = 'l1-rls-a@example.com'"
)
user_a_id = str(cur.fetchone()[0])
cur.execute(
"SELECT id FROM users WHERE email = 'l1-rls-b@example.com'"
)
user_b_id = str(cur.fetchone()[0])
ticket_a_id = str(uuid.uuid4())
ticket_b_id = str(uuid.uuid4())
walk_a_id = str(uuid.uuid4())
walk_b_id = str(uuid.uuid4())
cur.execute(
"INSERT INTO internal_tickets"
" (id, account_id, created_by_user_id, problem_statement,"
" status, created_at, updated_at)"
" VALUES"
" (%s, %s, %s, %s, %s, NOW(), NOW()),"
" (%s, %s, %s, %s, %s, NOW(), NOW())",
(
ticket_a_id, ACCOUNT_A_ID, user_a_id,
"L1 RLS test ticket A", "open",
ticket_b_id, ACCOUNT_B_ID, user_b_id,
"L1 RLS test ticket B", "open",
),
)
cur.execute(
"INSERT INTO l1_walk_sessions"
" (id, account_id, created_by_user_id, ticket_id, ticket_kind,"
" session_kind, status, started_at, last_step_at)"
" VALUES"
" (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW()),"
" (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())",
(
walk_a_id, ACCOUNT_A_ID, user_a_id,
"INT-A", "internal", "adhoc", "active",
walk_b_id, ACCOUNT_B_ID, user_b_id,
"INT-B", "internal", "adhoc", "active",
),
)
seed = {
"ticket_a": ticket_a_id,
"ticket_b": ticket_b_id,
"walk_a": walk_a_id,
"walk_b": walk_b_id,
"user_a": user_a_id,
"user_b": user_b_id,
}
yield seed
# Cleanup in reverse FK order.
# Delete all child rows for both test accounts before removing users —
# other test modules (test_rls_isolation.py) may have seeded rows for
# these same accounts, so we clean by account_id rather than by row ID.
cur.execute(
"DELETE FROM l1_walk_sessions WHERE account_id IN (%s, %s)",
(ACCOUNT_A_ID, ACCOUNT_B_ID),
)
cur.execute(
"DELETE FROM internal_tickets WHERE account_id IN (%s, %s)",
(ACCOUNT_A_ID, ACCOUNT_B_ID),
)
cur.execute(
"DELETE FROM users WHERE email IN (%s, %s)",
("l1-rls-a@example.com", "l1-rls-b@example.com"),
)
cur.execute(
"DELETE FROM accounts WHERE id IN (%s, %s)"
" AND display_code IN ('RLSA0001', 'RLSB0001')",
(ACCOUNT_A_ID, ACCOUNT_B_ID),
)
cur.close()
conn.close()
# ---------------------------------------------------------------------------
# Per-test helper: open an app-role connection with a given tenant context
# ---------------------------------------------------------------------------
def _app_conn(account_id: str | None = None) -> psycopg2.extensions.connection:
"""Open a psycopg2 connection as resolutionflow_app.
If account_id is given, SET LOCAL app.current_account_id so RLS applies
to the given tenant. Callers must begin a transaction first.
"""
conn = psycopg2.connect(**_app_dsn())
conn.autocommit = False
cur = conn.cursor()
if account_id:
cur.execute(
"SELECT set_config('app.current_account_id', %s, false)",
(account_id,),
)
cur.close()
return conn
# ---------------------------------------------------------------------------
# internal_tickets — read isolation
# ---------------------------------------------------------------------------
def test_l1_user_cannot_read_other_accounts_internal_tickets(l1_rls_seed):
"""RLS USING: Account A context must not see Account B's tickets."""
conn = _app_conn(ACCOUNT_A_ID)
try:
cur = conn.cursor()
cur.execute(
"SELECT id FROM internal_tickets WHERE id = %s",
(l1_rls_seed["ticket_b"],),
)
rows = cur.fetchall()
finally:
conn.rollback()
conn.close()
assert len(rows) == 0, (
"Account A must not read Account B's internal_tickets"
)
def test_internal_tickets_account_a_can_see_own_rows(l1_rls_seed):
"""Positive check: Account A can read its own internal_tickets."""
conn = _app_conn(ACCOUNT_A_ID)
try:
cur = conn.cursor()
cur.execute(
"SELECT id FROM internal_tickets WHERE id = %s",
(l1_rls_seed["ticket_a"],),
)
rows = cur.fetchall()
finally:
conn.rollback()
conn.close()
assert len(rows) == 1, (
"Account A must be able to read its own internal_tickets"
)
def test_internal_tickets_no_context_sees_nothing(l1_rls_seed):
"""Fail-closed: no tenant context → zero internal_tickets rows visible."""
conn = _app_conn() # no account_id
try:
cur = conn.cursor()
cur.execute(
"SELECT id FROM internal_tickets WHERE id IN (%s, %s)",
(l1_rls_seed["ticket_a"], l1_rls_seed["ticket_b"]),
)
rows = cur.fetchall()
finally:
conn.rollback()
conn.close()
assert len(rows) == 0, (
"No-context connection must not see any internal_tickets"
)
# ---------------------------------------------------------------------------
# l1_walk_sessions — read isolation
# ---------------------------------------------------------------------------
def test_l1_user_cannot_read_other_accounts_walk_sessions(l1_rls_seed):
"""RLS USING: Account A context must not see Account B's walk sessions."""
conn = _app_conn(ACCOUNT_A_ID)
try:
cur = conn.cursor()
cur.execute(
"SELECT id FROM l1_walk_sessions WHERE id = %s",
(l1_rls_seed["walk_b"],),
)
rows = cur.fetchall()
finally:
conn.rollback()
conn.close()
assert len(rows) == 0, (
"Account A must not read Account B's l1_walk_sessions"
)
def test_l1_walk_sessions_account_a_can_see_own_rows(l1_rls_seed):
"""Positive check: Account A can read its own l1_walk_sessions."""
conn = _app_conn(ACCOUNT_A_ID)
try:
cur = conn.cursor()
cur.execute(
"SELECT id FROM l1_walk_sessions WHERE id = %s",
(l1_rls_seed["walk_a"],),
)
rows = cur.fetchall()
finally:
conn.rollback()
conn.close()
assert len(rows) == 1, (
"Account A must be able to read its own l1_walk_sessions"
)
def test_l1_walk_sessions_no_context_sees_nothing(l1_rls_seed):
"""Fail-closed: no tenant context → zero l1_walk_sessions rows visible."""
conn = _app_conn() # no account_id
try:
cur = conn.cursor()
cur.execute(
"SELECT id FROM l1_walk_sessions WHERE id IN (%s, %s)",
(l1_rls_seed["walk_a"], l1_rls_seed["walk_b"]),
)
rows = cur.fetchall()
finally:
conn.rollback()
conn.close()
assert len(rows) == 0, (
"No-context connection must not see any l1_walk_sessions"
)
# ---------------------------------------------------------------------------
# internal_tickets — WITH CHECK (cross-tenant INSERT rejection)
# ---------------------------------------------------------------------------
def test_with_check_blocks_cross_tenant_insert_internal_tickets(l1_rls_seed):
"""RLS WITH CHECK: INSERT with account_id = A under context B is rejected.
psycopg2 raises InsufficientPrivilege (pgcode '42501') when a row
violates FORCE ROW LEVEL SECURITY WITH CHECK.
"""
new_id = str(uuid.uuid4())
user_b_id = l1_rls_seed["user_b"]
conn = _app_conn(ACCOUNT_B_ID)
try:
cur = conn.cursor()
with pytest.raises(psycopg2.errors.InsufficientPrivilege):
cur.execute(
"INSERT INTO internal_tickets"
" (id, account_id, created_by_user_id, problem_statement,"
" status, created_at, updated_at)"
" VALUES (%s, %s, %s, %s, %s, NOW(), NOW())",
(
new_id, ACCOUNT_A_ID, user_b_id,
"Cross-tenant injection attempt", "open",
),
)
finally:
conn.rollback()
conn.close()
# ---------------------------------------------------------------------------
# l1_walk_sessions — WITH CHECK (cross-tenant INSERT rejection)
# ---------------------------------------------------------------------------
def test_with_check_blocks_cross_tenant_insert_l1_walk_sessions(l1_rls_seed):
"""RLS WITH CHECK: INSERT with account_id = A under context B is rejected."""
new_id = str(uuid.uuid4())
user_b_id = l1_rls_seed["user_b"]
conn = _app_conn(ACCOUNT_B_ID)
try:
cur = conn.cursor()
with pytest.raises(psycopg2.errors.InsufficientPrivilege):
cur.execute(
"INSERT INTO l1_walk_sessions"
" (id, account_id, created_by_user_id, ticket_id,"
" ticket_kind, session_kind, status, started_at, last_step_at)"
" VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())",
(
new_id, ACCOUNT_A_ID, user_b_id,
"INT-cross", "internal", "adhoc", "active",
),
)
finally:
conn.rollback()
conn.close()