Full backend suite (1325/1325 passing, xdist) + L1-specific tests (57/57) + L1 RLS tests (8/8) + frontend build (tsc clean, vite clean) + migration roundtrip results. Per-line checklist against spec §15. Known Phase 2/3 items are explicitly deferred per the plan's scope section. fix(test): RLS fixture users INSERT missing NOT NULL columns. test_l1_rls.py and test_rls_isolation.py seeded users without the five NOT NULL columns added in prior migrations (is_super_admin, is_team_admin, is_service_account, must_change_password, timezone). Also adds DROP SCHEMA before alembic upgrade in _ensure_rls_schema to prevent DuplicateTable errors when create_all tables are present. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
451 lines
15 KiB
Python
451 lines
15 KiB
Python
# backend/tests/test_l1_rls.py
|
|
"""
|
|
RLS regression tests for L1 Phase 1 tables.
|
|
|
|
Verifies that `internal_tickets` and `l1_walk_sessions` — both with
|
|
FORCE ROW LEVEL SECURITY + `tenant_isolation` policy on `account_id` —
|
|
block cross-tenant reads AND reject WITH CHECK violations on INSERT.
|
|
|
|
Uses synchronous psycopg2 (not asyncpg) to avoid the conftest
|
|
teardown hook that closes the asyncio event loop after every test,
|
|
which is incompatible with module-scoped asyncpg fixtures.
|
|
|
|
Run with:
|
|
RUN_RLS_TESTS=1 DB_APP_ROLE_PASSWORD=app_secret_change_me \
|
|
pytest tests/test_l1_rls.py -v --override-ini="addopts="
|
|
"""
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import uuid
|
|
from pathlib import Path
|
|
from urllib.parse import unquote, urlsplit
|
|
|
|
import psycopg2
|
|
import psycopg2.errors
|
|
import pytest
|
|
|
|
# Module-level marker: applies the `rls` mark to every test in this file.
pytestmark = pytest.mark.rls
|
|
|
|
# Async (asyncpg) URL of the test database; DATABASE_TEST_URL overrides it.
_DATABASE_TEST_URL = os.environ.get(
    "DATABASE_TEST_URL",
    "postgresql+asyncpg://postgres:postgres@localhost:5432/resolutionflow_test",
)
# Same URL with the "+asyncpg" driver suffix removed from the scheme prefix.
_DATABASE_TEST_URL_SYNC = _DATABASE_TEST_URL.replace(
    "postgresql+asyncpg://", "postgresql://", 1
)
_TEST_DB_PARTS = urlsplit(_DATABASE_TEST_URL_SYNC)

# Each connection parameter comes from its own TEST_DB_* variable when set,
# otherwise from the matching component of the URL, otherwise from a literal.
_DB_HOST = os.environ.get("TEST_DB_HOST", _TEST_DB_PARTS.hostname or "localhost")
_DB_PORT = int(os.environ.get("TEST_DB_PORT", str(_TEST_DB_PARTS.port or 5432)))
_DB_NAME = os.environ.get(
    "TEST_DB_NAME",
    unquote(_TEST_DB_PARTS.path.lstrip("/") or "resolutionflow_test"),
)
_ADMIN_USER = os.environ.get(
    "TEST_DB_ADMIN_USER", unquote(_TEST_DB_PARTS.username or "postgres")
)
_ADMIN_PASSWORD = os.environ.get(
    "TEST_DB_ADMIN_PASSWORD", unquote(_TEST_DB_PARTS.password or "postgres")
)
# Password used when connecting as the `resolutionflow_app` role.
_APP_PASSWORD = os.environ.get("DB_APP_ROLE_PASSWORD", "app_secret_change_me")

# Fixed account IDs for the two tenants seeded by this module.
ACCOUNT_A_ID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
ACCOUNT_B_ID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
|
|
|
|
|
|
def _admin_dsn() -> dict:
    """Return psycopg2 connect kwargs for the admin user on the test DB."""
    return {
        "host": _DB_HOST,
        "port": _DB_PORT,
        "dbname": _DB_NAME,
        "user": _ADMIN_USER,
        "password": _ADMIN_PASSWORD,
    }
|
|
|
|
|
|
def _app_dsn() -> dict:
    """Return psycopg2 connect kwargs for the `resolutionflow_app` role."""
    return {
        "host": _DB_HOST,
        "port": _DB_PORT,
        "dbname": _DB_NAME,
        "user": "resolutionflow_app",
        "password": _APP_PASSWORD,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Schema bootstrap
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture(scope="module")
def _ensure_rls_schema():
    """Rebuild the test schema through Alembic so that RLS policies exist.

    The standard test_db fixture uses Base.metadata.create_all, which skips
    RLS setup. Running 'alembic upgrade head' against the test DB ensures
    the FORCE ROW LEVEL SECURITY + tenant_isolation policies created in the
    L1 migrations (T5/T6) are active.

    The public schema is dropped and recreated first so that tables left
    behind by a prior create_all-based run don't conflict with Alembic
    (existing tables without an alembic_version row lead to DuplicateTable
    errors).

    Raises:
        subprocess.CalledProcessError: if 'alembic upgrade head' exits
            non-zero. Its captured output is written to stderr first so the
            failure reason appears in the pytest report.
    """
    # psycopg2's `with connection:` only ends the transaction and leaves the
    # connection open, so the connection is closed explicitly instead.
    conn = psycopg2.connect(**_admin_dsn())
    try:
        # Autocommit: each DDL statement takes effect immediately.
        conn.autocommit = True
        with conn.cursor() as cur:
            # IF EXISTS keeps the fixture usable after a run that was
            # interrupted between the DROP and the CREATE.
            cur.execute("DROP SCHEMA IF EXISTS public CASCADE")
            cur.execute("CREATE SCHEMA public")
    finally:
        conn.close()

    # Alembic is run from the directory two levels above this file.
    backend_dir = Path(__file__).parent.parent
    env = os.environ.copy()
    env["DATABASE_URL"] = _DATABASE_TEST_URL
    env["DATABASE_URL_SYNC"] = _DATABASE_TEST_URL_SYNC
    try:
        subprocess.run(
            [sys.executable, "-m", "alembic", "upgrade", "head"],
            cwd=backend_dir,
            env=env,
            check=True,
            capture_output=True,
        )
    except subprocess.CalledProcessError as exc:
        # The exception message carries only the exit code; surface the
        # captured output before re-raising the same exception.
        for stream in (exc.stdout, exc.stderr):
            if stream:
                sys.stderr.write(stream.decode(errors="replace"))
        raise
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Seed fixture (module-scoped, synchronous psycopg2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _insert_l1_seed_rows(cur) -> dict:
    """Insert two accounts, two users, two tickets and two walk sessions.

    Args:
        cur: open psycopg2 cursor on the admin connection.

    Returns:
        Dict with keys ticket_a, ticket_b, walk_a, walk_b, user_a, user_b
        mapping to the IDs (as strings) of the seeded rows.
    """
    # Accounts (idempotent — shared with test_rls_isolation.py)
    cur.execute(
        "INSERT INTO accounts (id, name, display_code, created_at, updated_at)"
        " VALUES (%s, %s, %s, NOW(), NOW()),"
        " (%s, %s, %s, NOW(), NOW())"
        " ON CONFLICT (id) DO NOTHING",
        (
            ACCOUNT_A_ID, "L1 RLS Tenant A", "RLSA0001",
            ACCOUNT_B_ID, "L1 RLS Tenant B", "RLSB0001",
        ),
    )

    # Candidate IDs only: if a user with the same email already exists the
    # INSERT is skipped, so the real IDs are read back by email below.
    user_a_tmp = str(uuid.uuid4())
    user_b_tmp = str(uuid.uuid4())
    cur.execute(
        "INSERT INTO users"
        " (id, email, password_hash, name, role,"
        " is_super_admin, is_team_admin, is_service_account, must_change_password,"
        " is_active, account_id, account_role, timezone, created_at)"
        " VALUES"
        " (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()),"
        " (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())"
        " ON CONFLICT (email) DO NOTHING",
        (
            user_a_tmp, "l1-rls-a@example.com", "placeholder",
            "L1 RLS User A", "engineer",
            False, False, False, False,
            True, ACCOUNT_A_ID, "engineer", "UTC",
            user_b_tmp, "l1-rls-b@example.com", "placeholder",
            "L1 RLS User B", "engineer",
            False, False, False, False,
            True, ACCOUNT_B_ID, "engineer", "UTC",
        ),
    )

    cur.execute(
        "SELECT id FROM users WHERE email = 'l1-rls-a@example.com'"
    )
    user_a_id = str(cur.fetchone()[0])
    cur.execute(
        "SELECT id FROM users WHERE email = 'l1-rls-b@example.com'"
    )
    user_b_id = str(cur.fetchone()[0])

    ticket_a_id = str(uuid.uuid4())
    ticket_b_id = str(uuid.uuid4())
    walk_a_id = str(uuid.uuid4())
    walk_b_id = str(uuid.uuid4())

    cur.execute(
        "INSERT INTO internal_tickets"
        " (id, account_id, created_by_user_id, problem_statement,"
        " status, created_at, updated_at)"
        " VALUES"
        " (%s, %s, %s, %s, %s, NOW(), NOW()),"
        " (%s, %s, %s, %s, %s, NOW(), NOW())",
        (
            ticket_a_id, ACCOUNT_A_ID, user_a_id,
            "L1 RLS test ticket A", "open",
            ticket_b_id, ACCOUNT_B_ID, user_b_id,
            "L1 RLS test ticket B", "open",
        ),
    )

    cur.execute(
        "INSERT INTO l1_walk_sessions"
        " (id, account_id, created_by_user_id, ticket_id, ticket_kind,"
        " session_kind, status, started_at, last_step_at)"
        " VALUES"
        " (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW()),"
        " (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())",
        (
            walk_a_id, ACCOUNT_A_ID, user_a_id,
            "INT-A", "internal", "adhoc", "active",
            walk_b_id, ACCOUNT_B_ID, user_b_id,
            "INT-B", "internal", "adhoc", "active",
        ),
    )

    return {
        "ticket_a": ticket_a_id,
        "ticket_b": ticket_b_id,
        "walk_a": walk_a_id,
        "walk_b": walk_b_id,
        "user_a": user_a_id,
        "user_b": user_b_id,
    }


def _delete_l1_seed_rows(cur) -> None:
    """Delete the seeded rows, children first (reverse FK order).

    Child rows are removed by account_id rather than by row ID because other
    test modules (test_rls_isolation.py) may have seeded rows for these same
    accounts.
    """
    cur.execute(
        "DELETE FROM l1_walk_sessions WHERE account_id IN (%s, %s)",
        (ACCOUNT_A_ID, ACCOUNT_B_ID),
    )
    cur.execute(
        "DELETE FROM internal_tickets WHERE account_id IN (%s, %s)",
        (ACCOUNT_A_ID, ACCOUNT_B_ID),
    )
    cur.execute(
        "DELETE FROM users WHERE email IN (%s, %s)",
        ("l1-rls-a@example.com", "l1-rls-b@example.com"),
    )
    cur.execute(
        "DELETE FROM accounts WHERE id IN (%s, %s)"
        " AND display_code IN ('RLSA0001', 'RLSB0001')",
        (ACCOUNT_A_ID, ACCOUNT_B_ID),
    )


@pytest.fixture(scope="module")
def l1_rls_seed(_ensure_rls_schema):
    """Seed two tenants' worth of L1 rows over the admin connection.

    Inserts two accounts, two users, one internal_ticket and one
    l1_walk_session per account using the admin DSN (expected to be a role
    that bypasses RLS — confirm against the test DB setup).

    Yields:
        Dict with the seeded IDs (ticket_a, ticket_b, walk_a, walk_b,
        user_a, user_b) so tests can reference them.

    The rows are deleted on module teardown. The connection is closed even
    when seeding or cleanup raises.
    """
    conn = psycopg2.connect(**_admin_dsn())
    try:
        # Autocommit so the seeded rows are visible to the separate
        # app-role connections opened by the tests.
        conn.autocommit = True
        with conn.cursor() as cur:
            seed = _insert_l1_seed_rows(cur)
            yield seed
            _delete_l1_seed_rows(cur)
    finally:
        conn.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Per-test helper: open an app-role connection with a given tenant context
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _app_conn(account_id: str | None = None) -> psycopg2.extensions.connection:
    """Open a psycopg2 connection as resolutionflow_app.

    Args:
        account_id: tenant to put into ``app.current_account_id``. When
            omitted (or empty) no tenant context is set at all.

    Returns:
        An open connection with autocommit disabled. When ``account_id`` is
        given, the setting has been applied with
        ``set_config(..., false)`` (not transaction-local), executed inside
        the transaction psycopg2 opens implicitly on the first statement.
        That transaction is left open; callers run their queries on it and
        finish with ``rollback()`` and ``close()``.

    Raises:
        psycopg2.Error: if connecting or applying the setting fails. The
            connection is closed before the error propagates.
    """
    conn = psycopg2.connect(**_app_dsn())
    try:
        conn.autocommit = False
        if account_id:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT set_config('app.current_account_id', %s, false)",
                    (account_id,),
                )
    except Exception:
        # Don't leak the connection when the tenant context can't be set.
        conn.close()
        raise
    return conn
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# internal_tickets — read isolation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_l1_user_cannot_read_other_accounts_internal_tickets(l1_rls_seed):
    """Under Account A's context, selecting Account B's ticket yields no rows."""
    foreign_ticket = l1_rls_seed["ticket_b"]
    tenant_conn = _app_conn(ACCOUNT_A_ID)
    try:
        cursor = tenant_conn.cursor()
        query = "SELECT id FROM internal_tickets WHERE id = %s"
        cursor.execute(query, (foreign_ticket,))
        visible = cursor.fetchall()
    finally:
        # Always end the transaction and release the connection.
        tenant_conn.rollback()
        tenant_conn.close()

    assert not visible, "Account A must not read Account B's internal_tickets"
|
|
|
|
|
|
def test_internal_tickets_account_a_can_see_own_rows(l1_rls_seed):
    """Control case: Account A's context returns Account A's own ticket."""
    own_ticket = l1_rls_seed["ticket_a"]
    tenant_conn = _app_conn(ACCOUNT_A_ID)
    try:
        cursor = tenant_conn.cursor()
        query = "SELECT id FROM internal_tickets WHERE id = %s"
        cursor.execute(query, (own_ticket,))
        visible = cursor.fetchall()
    finally:
        # Always end the transaction and release the connection.
        tenant_conn.rollback()
        tenant_conn.close()

    assert len(visible) == 1, (
        "Account A must be able to read its own internal_tickets"
    )
|
|
|
|
|
|
def test_internal_tickets_no_context_sees_nothing(l1_rls_seed):
    """Without any tenant context, neither seeded ticket is visible."""
    both_tickets = (l1_rls_seed["ticket_a"], l1_rls_seed["ticket_b"])
    anonymous_conn = _app_conn()  # deliberately no account_id
    try:
        cursor = anonymous_conn.cursor()
        query = "SELECT id FROM internal_tickets WHERE id IN (%s, %s)"
        cursor.execute(query, both_tickets)
        visible = cursor.fetchall()
    finally:
        # Always end the transaction and release the connection.
        anonymous_conn.rollback()
        anonymous_conn.close()

    assert not visible, "No-context connection must not see any internal_tickets"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# l1_walk_sessions — read isolation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_l1_user_cannot_read_other_accounts_walk_sessions(l1_rls_seed):
    """Under Account A's context, selecting Account B's walk session yields no rows."""
    foreign_walk = l1_rls_seed["walk_b"]
    tenant_conn = _app_conn(ACCOUNT_A_ID)
    try:
        cursor = tenant_conn.cursor()
        query = "SELECT id FROM l1_walk_sessions WHERE id = %s"
        cursor.execute(query, (foreign_walk,))
        visible = cursor.fetchall()
    finally:
        # Always end the transaction and release the connection.
        tenant_conn.rollback()
        tenant_conn.close()

    assert not visible, "Account A must not read Account B's l1_walk_sessions"
|
|
|
|
|
|
def test_l1_walk_sessions_account_a_can_see_own_rows(l1_rls_seed):
    """Control case: Account A's context returns Account A's own walk session."""
    own_walk = l1_rls_seed["walk_a"]
    tenant_conn = _app_conn(ACCOUNT_A_ID)
    try:
        cursor = tenant_conn.cursor()
        query = "SELECT id FROM l1_walk_sessions WHERE id = %s"
        cursor.execute(query, (own_walk,))
        visible = cursor.fetchall()
    finally:
        # Always end the transaction and release the connection.
        tenant_conn.rollback()
        tenant_conn.close()

    assert len(visible) == 1, (
        "Account A must be able to read its own l1_walk_sessions"
    )
|
|
|
|
|
|
def test_l1_walk_sessions_no_context_sees_nothing(l1_rls_seed):
    """Without any tenant context, neither seeded walk session is visible."""
    both_walks = (l1_rls_seed["walk_a"], l1_rls_seed["walk_b"])
    anonymous_conn = _app_conn()  # deliberately no account_id
    try:
        cursor = anonymous_conn.cursor()
        query = "SELECT id FROM l1_walk_sessions WHERE id IN (%s, %s)"
        cursor.execute(query, both_walks)
        visible = cursor.fetchall()
    finally:
        # Always end the transaction and release the connection.
        anonymous_conn.rollback()
        anonymous_conn.close()

    assert not visible, "No-context connection must not see any l1_walk_sessions"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# internal_tickets — WITH CHECK (cross-tenant INSERT rejection)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_with_check_blocks_cross_tenant_insert_internal_tickets(l1_rls_seed):
    """An INSERT carrying account_id = A must fail under Account B's context.

    The test expects psycopg2.errors.InsufficientPrivilege from the INSERT,
    which is how the row-level-security WITH CHECK rejection is reported.
    """
    insert_sql = (
        "INSERT INTO internal_tickets"
        " (id, account_id, created_by_user_id, problem_statement,"
        " status, created_at, updated_at)"
        " VALUES (%s, %s, %s, %s, %s, NOW(), NOW())"
    )
    # Row claims Account A while the session context is Account B.
    row = (
        str(uuid.uuid4()),
        ACCOUNT_A_ID,
        l1_rls_seed["user_b"],
        "Cross-tenant injection attempt",
        "open",
    )

    tenant_conn = _app_conn(ACCOUNT_B_ID)
    try:
        cursor = tenant_conn.cursor()
        with pytest.raises(psycopg2.errors.InsufficientPrivilege):
            cursor.execute(insert_sql, row)
    finally:
        # Always end the transaction and release the connection.
        tenant_conn.rollback()
        tenant_conn.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# l1_walk_sessions — WITH CHECK (cross-tenant INSERT rejection)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_with_check_blocks_cross_tenant_insert_l1_walk_sessions(l1_rls_seed):
    """An INSERT carrying account_id = A must fail under Account B's context."""
    insert_sql = (
        "INSERT INTO l1_walk_sessions"
        " (id, account_id, created_by_user_id, ticket_id,"
        " ticket_kind, session_kind, status, started_at, last_step_at)"
        " VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())"
    )
    # Row claims Account A while the session context is Account B.
    row = (
        str(uuid.uuid4()),
        ACCOUNT_A_ID,
        l1_rls_seed["user_b"],
        "INT-cross",
        "internal",
        "adhoc",
        "active",
    )

    tenant_conn = _app_conn(ACCOUNT_B_ID)
    try:
        cursor = tenant_conn.cursor()
        with pytest.raises(psycopg2.errors.InsufficientPrivilege):
            cursor.execute(insert_sql, row)
    finally:
        # Always end the transaction and release the connection.
        tenant_conn.rollback()
        tenant_conn.close()
|