Full backend suite (1325/1325 passing, xdist) + L1-specific tests (57/57) + L1 RLS tests (8/8) + frontend build (tsc clean, vite clean) + migration roundtrip results. Per-line checklist against spec §15. Known Phase 2/3 items explicitly deferred per plan scope section. fix(test): RLS fixture users INSERT missing NOT NULL columns test_l1_rls.py and test_rls_isolation.py seeded users without the five NOT NULL columns added in prior migrations (is_super_admin, is_team_admin, is_service_account, must_change_password, timezone). Also adds DROP SCHEMA before alembic upgrade in _ensure_rls_schema to prevent DuplicateTable errors when create_all tables are present. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1122 lines
44 KiB
Python
1122 lines
44 KiB
Python
# backend/tests/test_rls_isolation.py
|
|
"""
|
|
RLS foundation tests.
|
|
|
|
Connect directly as resolutionflow_app (not superuser) and verify:
|
|
- Tenant A cannot read Tenant B's rows
|
|
- No tenant context set → zero rows for private data (fail-closed)
|
|
- Platform rows (PLATFORM_ACCOUNT_ID) are visible to all tenants
|
|
|
|
Tests bypass FastAPI entirely — raw asyncpg connections only.
|
|
MUST FAIL before Task 10 (RLS migration) and PASS after it.
|
|
|
|
Run with:
|
|
RUN_RLS_TESTS=1 DB_APP_ROLE_PASSWORD=app_secret_change_me pytest tests/test_rls_isolation.py -v
|
|
|
|
The test DB comes from DATABASE_TEST_URL, matching conftest.py.
|
|
"""
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import uuid
|
|
from pathlib import Path
|
|
from urllib.parse import unquote, urlsplit
|
|
|
|
import asyncpg
|
|
import psycopg2
|
|
import pytest
|
|
import pytest_asyncio
|
|
|
|
# All tests in this module use module-scoped async fixtures (admin_conn,
|
|
# seed_rls_test_data) which run on the module event loop. Without this marker,
|
|
# pytest-asyncio 0.23+ defaults tests to function-scoped loops, causing
|
|
# "Future attached to a different loop" errors on the asyncpg connections.
|
|
pytestmark = [
|
|
pytest.mark.asyncio(loop_scope="module"),
|
|
pytest.mark.rls,
|
|
]
|
|
|
|
_DATABASE_TEST_URL = os.getenv(
|
|
"DATABASE_TEST_URL",
|
|
"postgresql+asyncpg://postgres:postgres@localhost:5432/resolutionflow_test",
|
|
)
|
|
_DATABASE_TEST_URL_ASYNCPG = _DATABASE_TEST_URL.replace(
|
|
"postgresql+asyncpg://",
|
|
"postgresql://",
|
|
1,
|
|
)
|
|
_DATABASE_TEST_URL_SYNC = _DATABASE_TEST_URL_ASYNCPG
|
|
_TEST_DB_PARTS = urlsplit(_DATABASE_TEST_URL_ASYNCPG)
|
|
|
|
_DB_HOST = os.getenv("TEST_DB_HOST", _TEST_DB_PARTS.hostname or "localhost")
|
|
_DB_PORT = int(os.getenv("TEST_DB_PORT", str(_TEST_DB_PARTS.port or 5432)))
|
|
_DB_NAME = os.getenv(
|
|
"TEST_DB_NAME",
|
|
unquote(_TEST_DB_PARTS.path.lstrip("/") or "resolutionflow_test"),
|
|
)
|
|
_ADMIN_USER = os.getenv(
|
|
"TEST_DB_ADMIN_USER",
|
|
unquote(_TEST_DB_PARTS.username or "postgres"),
|
|
)
|
|
_ADMIN_PASSWORD = os.getenv(
|
|
"TEST_DB_ADMIN_PASSWORD",
|
|
unquote(_TEST_DB_PARTS.password or "postgres"),
|
|
)
|
|
_APP_PASSWORD = os.getenv("DB_APP_ROLE_PASSWORD", "app_secret_change_me")
|
|
|
|
PLATFORM_ACCOUNT_ID = "00000000-0000-0000-0000-000000000001"
|
|
ACCOUNT_A_ID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
|
|
ACCOUNT_B_ID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.fixture(scope="module")
|
|
def _ensure_rls_schema():
|
|
"""Re-apply Alembic migrations before the module runs.
|
|
|
|
Function-scoped test_db fixtures in other modules drop and recreate the
|
|
public schema using Base.metadata.create_all, which does not enable RLS
|
|
or create DB roles. This fixture re-runs 'alembic upgrade head' so that
|
|
the full migration-managed schema (including RLS policies) is in place.
|
|
|
|
We drop and recreate the public schema first so that any tables left behind
|
|
by a prior create_all-based test_db run don't conflict with alembic's
|
|
migration tracking.
|
|
"""
|
|
# Drop and recreate the schema to ensure a clean slate for alembic.
|
|
admin_dsn = dict(
|
|
host=_DB_HOST, port=_DB_PORT, dbname=_DB_NAME,
|
|
user=_ADMIN_USER, password=_ADMIN_PASSWORD,
|
|
)
|
|
with psycopg2.connect(**admin_dsn) as conn:
|
|
conn.autocommit = True
|
|
with conn.cursor() as cur:
|
|
cur.execute("DROP SCHEMA public CASCADE")
|
|
cur.execute("CREATE SCHEMA public")
|
|
|
|
backend_dir = Path(__file__).parent.parent
|
|
env = os.environ.copy()
|
|
env["DATABASE_URL"] = _DATABASE_TEST_URL
|
|
env["DATABASE_URL_SYNC"] = _DATABASE_TEST_URL_SYNC
|
|
subprocess.run(
|
|
[sys.executable, "-m", "alembic", "upgrade", "head"],
|
|
cwd=backend_dir,
|
|
env=env,
|
|
check=True,
|
|
capture_output=True,
|
|
)
|
|
|
|
|
|
@pytest_asyncio.fixture(scope="module", loop_scope="module")
|
|
async def admin_conn(_ensure_rls_schema):
|
|
"""Superuser asyncpg connection for fixture setup and teardown."""
|
|
conn = await asyncpg.connect(
|
|
host=_DB_HOST,
|
|
port=_DB_PORT,
|
|
database=_DB_NAME,
|
|
user=_ADMIN_USER,
|
|
password=_ADMIN_PASSWORD,
|
|
)
|
|
yield conn
|
|
await conn.close()
|
|
|
|
|
|
@pytest_asyncio.fixture(scope="module", loop_scope="module", autouse=True)
|
|
async def seed_rls_test_data(admin_conn):
|
|
"""
|
|
Create two isolated test accounts, one user per account, and one private
|
|
tree per account. Trees require a valid author_id FK to users, so users
|
|
must be created first.
|
|
|
|
accounts.display_code must be unique and 8 chars (NOT NULL constraint).
|
|
"""
|
|
# Insert accounts
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO accounts (id, name, display_code, created_at, updated_at)
|
|
VALUES
|
|
('{ACCOUNT_A_ID}', 'RLS Tenant A', 'RLSA0001', NOW(), NOW()),
|
|
('{ACCOUNT_B_ID}', 'RLS Tenant B', 'RLSB0001', NOW(), NOW())
|
|
ON CONFLICT (id) DO NOTHING
|
|
""")
|
|
|
|
# Insert one user per account (users.account_id NOT NULL, password_hash NOT NULL)
|
|
user_a_id = str(uuid.uuid4())
|
|
user_b_id = str(uuid.uuid4())
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO users (
|
|
id, email, password_hash, name, role,
|
|
is_super_admin, is_team_admin, is_service_account, must_change_password,
|
|
is_active, account_id, account_role, timezone, created_at
|
|
) VALUES
|
|
('{user_a_id}', 'rls-user-a@example.com',
|
|
'placeholder', 'RLS User A', 'engineer',
|
|
FALSE, FALSE, FALSE, FALSE,
|
|
TRUE, '{ACCOUNT_A_ID}', 'engineer', 'UTC', NOW()),
|
|
('{user_b_id}', 'rls-user-b@example.com',
|
|
'placeholder', 'RLS User B', 'engineer',
|
|
FALSE, FALSE, FALSE, FALSE,
|
|
TRUE, '{ACCOUNT_B_ID}', 'engineer', 'UTC', NOW())
|
|
ON CONFLICT (email) DO NOTHING
|
|
""")
|
|
|
|
# Look up the user IDs we just inserted (ON CONFLICT may have skipped)
|
|
row_a = await admin_conn.fetchrow(
|
|
"SELECT id FROM users WHERE email = 'rls-user-a@example.com'"
|
|
)
|
|
row_b = await admin_conn.fetchrow(
|
|
"SELECT id FROM users WHERE email = 'rls-user-b@example.com'"
|
|
)
|
|
actual_user_a = str(row_a["id"])
|
|
actual_user_b = str(row_b["id"])
|
|
|
|
# Insert one private tree per account with explicit author_id
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO trees (
|
|
id, name, tree_structure, account_id, author_id, is_active, is_default,
|
|
is_public, visibility, tree_type, created_at, updated_at
|
|
) VALUES
|
|
(gen_random_uuid(), 'RLS Tree A', '[]'::jsonb, '{ACCOUNT_A_ID}', '{actual_user_a}',
|
|
TRUE, FALSE, FALSE, 'private', 'troubleshooting', NOW(), NOW()),
|
|
(gen_random_uuid(), 'RLS Tree B', '[]'::jsonb, '{ACCOUNT_B_ID}', '{actual_user_b}',
|
|
TRUE, FALSE, FALSE, 'private', 'troubleshooting', NOW(), NOW())
|
|
""")
|
|
|
|
# One platform-owned tree_tag (global, visible to all tenants)
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO tree_tags (
|
|
id, name, slug, account_id, usage_count, created_at
|
|
) VALUES (
|
|
gen_random_uuid(), 'rls-global-tag', 'rls-global-tag',
|
|
'{PLATFORM_ACCOUNT_ID}', 0, NOW()
|
|
) ON CONFLICT DO NOTHING
|
|
""")
|
|
|
|
yield
|
|
|
|
# Cleanup
|
|
await admin_conn.execute(
|
|
f"DELETE FROM trees WHERE account_id IN ('{ACCOUNT_A_ID}', '{ACCOUNT_B_ID}')"
|
|
)
|
|
await admin_conn.execute(
|
|
"DELETE FROM users WHERE email IN "
|
|
"('rls-user-a@example.com', 'rls-user-b@example.com')"
|
|
)
|
|
await admin_conn.execute(
|
|
f"DELETE FROM accounts WHERE id IN ('{ACCOUNT_A_ID}', '{ACCOUNT_B_ID}')"
|
|
)
|
|
await admin_conn.execute("DELETE FROM tree_tags WHERE slug = 'rls-global-tag'")
|
|
|
|
|
|
@pytest_asyncio.fixture(loop_scope="module")
|
|
async def conn_a():
|
|
"""App-role connection, tenant context = Account A."""
|
|
conn = await asyncpg.connect(
|
|
host=_DB_HOST, port=_DB_PORT, database=_DB_NAME,
|
|
user="resolutionflow_app", password=_APP_PASSWORD,
|
|
)
|
|
await conn.execute(
|
|
"SELECT set_config('app.current_account_id', $1, false)", ACCOUNT_A_ID
|
|
)
|
|
yield conn
|
|
await conn.close()
|
|
|
|
|
|
@pytest_asyncio.fixture(loop_scope="module")
|
|
async def conn_b():
|
|
"""App-role connection, tenant context = Account B."""
|
|
conn = await asyncpg.connect(
|
|
host=_DB_HOST, port=_DB_PORT, database=_DB_NAME,
|
|
user="resolutionflow_app", password=_APP_PASSWORD,
|
|
)
|
|
await conn.execute(
|
|
"SELECT set_config('app.current_account_id', $1, false)", ACCOUNT_B_ID
|
|
)
|
|
yield conn
|
|
await conn.close()
|
|
|
|
|
|
@pytest_asyncio.fixture(loop_scope="module")
|
|
async def conn_no_context():
|
|
"""App-role connection with NO tenant context set."""
|
|
conn = await asyncpg.connect(
|
|
host=_DB_HOST, port=_DB_PORT, database=_DB_NAME,
|
|
user="resolutionflow_app", password=_APP_PASSWORD,
|
|
)
|
|
yield conn
|
|
await conn.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# trees
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_trees_account_a_cannot_see_account_b_rows(conn_a):
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM trees WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B trees"
|
|
|
|
|
|
async def test_trees_account_a_can_see_own_rows(conn_a):
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM trees WHERE account_id = '{ACCOUNT_A_ID}'"
|
|
)
|
|
assert len(rows) >= 1, "Account A should see its own trees"
|
|
|
|
|
|
async def test_trees_no_context_sees_no_private_trees(conn_no_context):
|
|
rows = await conn_no_context.fetch(
|
|
"SELECT id FROM trees WHERE is_default = FALSE AND is_public = FALSE"
|
|
)
|
|
assert len(rows) == 0, "No-context connection should see no private trees"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# tree_tags — platform visibility
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_tree_tags_account_a_cannot_see_account_b_tags(conn_a):
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM tree_tags WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0
|
|
|
|
|
|
async def test_tree_tags_both_tenants_see_platform_tags(conn_a, conn_b):
|
|
rows_a = await conn_a.fetch(
|
|
f"SELECT id FROM tree_tags WHERE account_id = '{PLATFORM_ACCOUNT_ID}'"
|
|
)
|
|
rows_b = await conn_b.fetch(
|
|
f"SELECT id FROM tree_tags WHERE account_id = '{PLATFORM_ACCOUNT_ID}'"
|
|
)
|
|
assert len(rows_a) >= 1, "Account A should see platform tags"
|
|
assert len(rows_b) >= 1, "Account B should see platform tags"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# tree_categories — platform visibility
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_tree_categories_account_a_cannot_see_account_b(conn_a):
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM tree_categories WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# step_categories — platform visibility
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_step_categories_account_a_cannot_see_account_b(conn_a):
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM step_categories WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# psa_connections — tenant-only
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_psa_connections_account_a_cannot_see_account_b(conn_a):
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM psa_connections WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# flow_proposals — tenant-only
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_flow_proposals_account_a_cannot_see_account_b(conn_a):
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM flow_proposals WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Phase 2 fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest_asyncio.fixture(scope="module", loop_scope="module")
|
|
async def session_row_ids(admin_conn):
|
|
"""
|
|
Insert one `sessions` row and one `ai_sessions` row for each of
|
|
ACCOUNT_A and ACCOUNT_B using the superuser connection (BYPASSRLS).
|
|
Returns a dict with the inserted IDs for use in tests.
|
|
Cleans up on exit.
|
|
"""
|
|
# Resolve a valid tree_id and user_id for each account
|
|
tree_a = await admin_conn.fetchrow(
|
|
f"SELECT id FROM trees WHERE account_id = '{ACCOUNT_A_ID}' LIMIT 1"
|
|
)
|
|
tree_b = await admin_conn.fetchrow(
|
|
f"SELECT id FROM trees WHERE account_id = '{ACCOUNT_B_ID}' LIMIT 1"
|
|
)
|
|
user_a = await admin_conn.fetchrow(
|
|
f"SELECT id FROM users WHERE account_id = '{ACCOUNT_A_ID}' LIMIT 1"
|
|
)
|
|
user_b = await admin_conn.fetchrow(
|
|
f"SELECT id FROM users WHERE account_id = '{ACCOUNT_B_ID}' LIMIT 1"
|
|
)
|
|
|
|
assert tree_a is not None, f"No tree found for ACCOUNT_A ({ACCOUNT_A_ID}) — seed_rls_test_data must run first"
|
|
assert tree_b is not None, f"No tree found for ACCOUNT_B ({ACCOUNT_B_ID}) — seed_rls_test_data must run first"
|
|
assert user_a is not None, f"No user found for ACCOUNT_A ({ACCOUNT_A_ID}) — seed_rls_test_data must run first"
|
|
assert user_b is not None, f"No user found for ACCOUNT_B ({ACCOUNT_B_ID}) — seed_rls_test_data must run first"
|
|
|
|
tree_a_id = str(tree_a["id"])
|
|
tree_b_id = str(tree_b["id"])
|
|
user_a_id = str(user_a["id"])
|
|
user_b_id = str(user_b["id"])
|
|
|
|
session_a_id = str(uuid.uuid4())
|
|
session_b_id = str(uuid.uuid4())
|
|
ai_session_a_id = str(uuid.uuid4())
|
|
ai_session_b_id = str(uuid.uuid4())
|
|
|
|
# Insert sessions rows (sessions uses started_at not created_at)
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO sessions (
|
|
id, tree_id, user_id, account_id, tree_snapshot,
|
|
path_taken, decisions, custom_steps, started_at
|
|
) VALUES
|
|
('{session_a_id}', '{tree_a_id}', '{user_a_id}', '{ACCOUNT_A_ID}',
|
|
'[]'::jsonb, '[]'::jsonb, '[]'::jsonb, '[]'::jsonb, NOW()),
|
|
('{session_b_id}', '{tree_b_id}', '{user_b_id}', '{ACCOUNT_B_ID}',
|
|
'[]'::jsonb, '[]'::jsonb, '[]'::jsonb, '[]'::jsonb, NOW())
|
|
""")
|
|
|
|
# Insert ai_sessions rows
|
|
# confidence_tier valid values: 'guided' | 'exploring' | 'discovery'
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO ai_sessions (
|
|
id, user_id, account_id, session_type, intake_type,
|
|
intake_content, status, confidence_tier, confidence_score,
|
|
created_at, updated_at
|
|
) VALUES
|
|
('{ai_session_a_id}', '{user_a_id}', '{ACCOUNT_A_ID}',
|
|
'guided', 'free_text', '{{}}'::jsonb, 'active', 'guided', 0.0,
|
|
NOW(), NOW()),
|
|
('{ai_session_b_id}', '{user_b_id}', '{ACCOUNT_B_ID}',
|
|
'guided', 'free_text', '{{}}'::jsonb, 'active', 'guided', 0.0,
|
|
NOW(), NOW())
|
|
""")
|
|
|
|
# -------------------------------------------------------------------------
|
|
# Seed Account B rows for every "cannot-see" table that would otherwise be
|
|
# empty. Without these, isolation tests pass vacuously even when RLS is off.
|
|
# -------------------------------------------------------------------------
|
|
|
|
# session_branches (FK: ai_sessions.id)
|
|
branch_b_row = await admin_conn.fetchrow("""
|
|
INSERT INTO session_branches (
|
|
id, session_id, account_id, branch_order, label, status,
|
|
conversation_messages, created_at, updated_at
|
|
) VALUES (
|
|
gen_random_uuid(), $1::uuid, $2::uuid, 1, 'test-branch', 'active',
|
|
'[]'::jsonb, NOW(), NOW()
|
|
) RETURNING id
|
|
""", ai_session_b_id, ACCOUNT_B_ID)
|
|
branch_b_id = str(branch_b_row["id"])
|
|
|
|
# session_supporting_data (FK: sessions.id)
|
|
supporting_data_b_row = await admin_conn.fetchrow("""
|
|
INSERT INTO session_supporting_data (
|
|
id, session_id, account_id, label, data_type, content,
|
|
sort_order, created_at, updated_at
|
|
) VALUES (
|
|
gen_random_uuid(), $1::uuid, $2::uuid, 'test-data', 'text_snippet',
|
|
'test content', 0, NOW(), NOW()
|
|
) RETURNING id
|
|
""", session_b_id, ACCOUNT_B_ID)
|
|
supporting_data_b_id = str(supporting_data_b_row["id"])
|
|
|
|
# session_resolution_outputs (FK: ai_sessions.id)
|
|
resolution_output_b_row = await admin_conn.fetchrow("""
|
|
INSERT INTO session_resolution_outputs (
|
|
id, session_id, account_id, output_type, generated_content,
|
|
status, generated_by_model, created_at, updated_at
|
|
) VALUES (
|
|
gen_random_uuid(), $1::uuid, $2::uuid, 'psa_ticket_notes',
|
|
'test content', 'draft', 'test-model', NOW(), NOW()
|
|
) RETURNING id
|
|
""", ai_session_b_id, ACCOUNT_B_ID)
|
|
resolution_output_b_id = str(resolution_output_b_row["id"])
|
|
|
|
# session_handoffs (FK: ai_sessions.id, users.id)
|
|
handoff_b_row = await admin_conn.fetchrow("""
|
|
INSERT INTO session_handoffs (
|
|
id, session_id, account_id, handed_off_by, intent, snapshot,
|
|
priority, psa_note_pushed, notification_sent, created_at
|
|
) VALUES (
|
|
gen_random_uuid(), $1::uuid, $2::uuid, $3::uuid, 'park',
|
|
'{}'::jsonb, 'normal', false, false, NOW()
|
|
) RETURNING id
|
|
""", ai_session_b_id, ACCOUNT_B_ID, user_b_id)
|
|
handoff_b_id = str(handoff_b_row["id"])
|
|
|
|
# maintenance_schedules (FK: trees.id)
|
|
maintenance_b_row = await admin_conn.fetchrow("""
|
|
INSERT INTO maintenance_schedules (
|
|
id, tree_id, account_id, cron_expression, timezone,
|
|
created_at, updated_at
|
|
) VALUES (
|
|
gen_random_uuid(), $1::uuid, $2::uuid, '0 9 * * 1', 'UTC',
|
|
NOW(), NOW()
|
|
) RETURNING id
|
|
""", tree_b_id, ACCOUNT_B_ID)
|
|
maintenance_b_id = str(maintenance_b_row["id"])
|
|
|
|
# psa_post_log (FK: ai_sessions.id, users.id)
|
|
psa_log_b_row = await admin_conn.fetchrow("""
|
|
INSERT INTO psa_post_log (
|
|
id, ai_session_id, account_id, ticket_id, note_type,
|
|
content_posted, status, posted_by, posted_at
|
|
) VALUES (
|
|
gen_random_uuid(), $1::uuid, $2::uuid, 'TEST-0001', 'internal',
|
|
'test note', 'success', $3::uuid, NOW()
|
|
) RETURNING id
|
|
""", ai_session_b_id, ACCOUNT_B_ID, user_b_id)
|
|
psa_log_b_id = str(psa_log_b_row["id"])
|
|
|
|
# script_templates requires a script_categories row — insert a temporary one
|
|
script_category_b_id = str(uuid.uuid4())
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO script_categories (id, name, slug, sort_order, is_active, created_at, updated_at)
|
|
VALUES ('{script_category_b_id}', 'RLS Test Category', 'rls-test-category-{script_category_b_id[:8]}',
|
|
0, true, NOW(), NOW())
|
|
""")
|
|
|
|
script_template_b_row = await admin_conn.fetchrow(f"""
|
|
INSERT INTO script_templates (
|
|
id, category_id, account_id, name, slug, script_body,
|
|
complexity, is_active, created_at, updated_at
|
|
) VALUES (
|
|
gen_random_uuid(), '{script_category_b_id}'::uuid, $1::uuid,
|
|
'RLS Test Template', 'rls-test-template-b-' || gen_random_uuid()::text,
|
|
'Write-Host "test"', 'beginner', true, NOW(), NOW()
|
|
) RETURNING id
|
|
""", ACCOUNT_B_ID)
|
|
script_template_b_id = str(script_template_b_row["id"])
|
|
|
|
# script_generations (FK: script_templates.id, users.id)
|
|
script_gen_b_row = await admin_conn.fetchrow("""
|
|
INSERT INTO script_generations (
|
|
id, template_id, user_id, account_id, parameters_used,
|
|
generated_script, created_at
|
|
) VALUES (
|
|
gen_random_uuid(), $1::uuid, $2::uuid, $3::uuid, '{}'::jsonb,
|
|
'test script', NOW()
|
|
) RETURNING id
|
|
""", script_template_b_id, user_b_id, ACCOUNT_B_ID)
|
|
script_gen_b_id = str(script_gen_b_row["id"])
|
|
|
|
try:
|
|
yield {
|
|
"session_a": session_a_id,
|
|
"session_b": session_b_id,
|
|
"ai_session_a": ai_session_a_id,
|
|
"ai_session_b": ai_session_b_id,
|
|
}
|
|
finally:
|
|
# Cleanup in reverse FK order (children before parents)
|
|
await admin_conn.execute(
|
|
f"DELETE FROM script_generations WHERE id = '{script_gen_b_id}'"
|
|
)
|
|
await admin_conn.execute(
|
|
f"DELETE FROM session_branches WHERE id = '{branch_b_id}'"
|
|
)
|
|
await admin_conn.execute(
|
|
f"DELETE FROM session_supporting_data WHERE id = '{supporting_data_b_id}'"
|
|
)
|
|
await admin_conn.execute(
|
|
f"DELETE FROM session_resolution_outputs WHERE id = '{resolution_output_b_id}'"
|
|
)
|
|
await admin_conn.execute(
|
|
f"DELETE FROM session_handoffs WHERE id = '{handoff_b_id}'"
|
|
)
|
|
await admin_conn.execute(
|
|
f"DELETE FROM maintenance_schedules WHERE id = '{maintenance_b_id}'"
|
|
)
|
|
await admin_conn.execute(
|
|
f"DELETE FROM psa_post_log WHERE id = '{psa_log_b_id}'"
|
|
)
|
|
await admin_conn.execute(
|
|
f"DELETE FROM script_templates WHERE id = '{script_template_b_id}'"
|
|
)
|
|
await admin_conn.execute(
|
|
f"DELETE FROM script_categories WHERE id = '{script_category_b_id}'"
|
|
)
|
|
await admin_conn.execute(
|
|
f"DELETE FROM sessions WHERE id IN ('{session_a_id}', '{session_b_id}')"
|
|
)
|
|
await admin_conn.execute(
|
|
f"DELETE FROM ai_sessions WHERE id IN ('{ai_session_a_id}', '{ai_session_b_id}')"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# sessions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_sessions_account_a_cannot_see_account_b_sessions(conn_a, session_row_ids):
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM sessions WHERE id = '{session_row_ids['session_b']}'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B sessions"
|
|
|
|
|
|
async def test_sessions_account_a_can_see_own_sessions(conn_a, session_row_ids):
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM sessions WHERE id = '{session_row_ids['session_a']}'"
|
|
)
|
|
assert len(rows) == 1, "Account A should see its own sessions"
|
|
|
|
|
|
async def test_sessions_no_context_sees_nothing(conn_no_context, session_row_ids):
|
|
rows = await conn_no_context.fetch(
|
|
f"SELECT id FROM sessions WHERE id IN "
|
|
f"('{session_row_ids['session_a']}', '{session_row_ids['session_b']}')"
|
|
)
|
|
assert len(rows) == 0, "No-context connection should see no sessions"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ai_sessions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_ai_sessions_account_a_cannot_see_account_b(conn_a, session_row_ids):
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM ai_sessions WHERE id = '{session_row_ids['ai_session_b']}'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B ai_sessions"
|
|
|
|
|
|
async def test_ai_sessions_account_a_can_see_own(conn_a, session_row_ids):
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM ai_sessions WHERE id = '{session_row_ids['ai_session_a']}'"
|
|
)
|
|
assert len(rows) == 1, "Account A should see its own ai_sessions"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# session_branches
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_session_branches_account_a_cannot_see_account_b(conn_a, session_row_ids):
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM session_branches WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B session_branches"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# session_supporting_data
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_session_supporting_data_account_a_cannot_see_account_b(conn_a, session_row_ids):
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM session_supporting_data WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B session_supporting_data"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# session_resolution_outputs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_session_resolution_outputs_account_a_cannot_see_account_b(conn_a, session_row_ids):
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM session_resolution_outputs WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B session_resolution_outputs"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# session_handoffs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_session_handoffs_account_a_cannot_see_account_b(conn_a, session_row_ids):
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM session_handoffs WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B session_handoffs"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# script_templates
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_script_templates_account_a_cannot_see_account_b(conn_a, session_row_ids):
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM script_templates WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B script_templates"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# script_generations
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_script_generations_account_a_cannot_see_account_b(conn_a, session_row_ids):
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM script_generations WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B script_generations"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# maintenance_schedules
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_maintenance_schedules_account_a_cannot_see_account_b(conn_a, session_row_ids):
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM maintenance_schedules WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B maintenance_schedules"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# psa_post_log
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_psa_post_log_account_a_cannot_see_account_b(conn_a, session_row_ids):
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM psa_post_log WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B psa_post_log"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# step_library — visibility-aware policy
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_step_library_account_a_cannot_see_account_b_private_steps(admin_conn, conn_a):
|
|
"""Private/non-public steps owned by Account B must not be visible to Account A."""
|
|
user_b_id = await _get_user_b_id(admin_conn)
|
|
private_step_id = str(uuid.uuid4())
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO step_library (
|
|
id, account_id, created_by, title, step_type, content,
|
|
visibility, is_active, created_at, updated_at
|
|
) VALUES (
|
|
'{private_step_id}', '{ACCOUNT_B_ID}', '{user_b_id}',
|
|
'RLS Private Step', 'action',
|
|
'{{}}'::jsonb, 'private', TRUE, NOW(), NOW()
|
|
)
|
|
""")
|
|
try:
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM step_library "
|
|
f"WHERE id = '{private_step_id}' AND visibility != 'public'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B's private step_library rows"
|
|
finally:
|
|
await admin_conn.execute(
|
|
f"DELETE FROM step_library WHERE id = '{private_step_id}'"
|
|
)
|
|
|
|
|
|
async def test_step_library_account_a_can_see_account_b_public_steps(admin_conn, conn_a):
|
|
"""Public steps owned by Account B MUST be visible to Account A (cross-tenant visibility)."""
|
|
user_b_id = await _get_user_b_id(admin_conn)
|
|
public_step_id = str(uuid.uuid4())
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO step_library (
|
|
id, account_id, created_by, title, step_type, content,
|
|
visibility, is_active, created_at, updated_at
|
|
) VALUES (
|
|
'{public_step_id}', '{ACCOUNT_B_ID}', '{user_b_id}',
|
|
'RLS Public Step', 'action',
|
|
'{{}}'::jsonb, 'public', TRUE, NOW(), NOW()
|
|
)
|
|
""")
|
|
try:
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM step_library WHERE id = '{public_step_id}'"
|
|
)
|
|
assert len(rows) == 1, (
|
|
"Account A should see public steps owned by Account B "
|
|
"(cross-tenant public visibility policy)"
|
|
)
|
|
finally:
|
|
await admin_conn.execute(
|
|
f"DELETE FROM step_library WHERE id = '{public_step_id}'"
|
|
)
|
|
|
|
|
|
# ===========================================================================
|
|
# Phase 3 RLS isolation tests
|
|
# Tables: step_ratings, step_usage_log, target_lists,
|
|
# session_shares, audit_logs, tree_shares
|
|
# ===========================================================================
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers shared by Phase 3 fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _get_user_b_id(admin_conn) -> str:
|
|
row = await admin_conn.fetchrow(
|
|
"SELECT id FROM users WHERE email = 'rls-user-b@example.com'"
|
|
)
|
|
return str(row["id"])
|
|
|
|
|
|
async def _get_tree_b_id(admin_conn) -> str:
|
|
row = await admin_conn.fetchrow(
|
|
f"SELECT id FROM trees WHERE account_id = '{ACCOUNT_B_ID}' LIMIT 1"
|
|
)
|
|
return str(row["id"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# step_ratings
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_step_ratings_account_a_cannot_see_account_b(admin_conn, conn_a):
|
|
"""Account A must not see step ratings belonging to Account B."""
|
|
user_b_id = await _get_user_b_id(admin_conn)
|
|
|
|
# Need a step_library row as FK target
|
|
step_id = str(uuid.uuid4())
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO step_library (
|
|
id, account_id, created_by, title, step_type, content,
|
|
visibility, is_active, created_at, updated_at
|
|
) VALUES (
|
|
'{step_id}', '{ACCOUNT_B_ID}', '{user_b_id}',
|
|
'Phase3 RLS Step', 'action',
|
|
'{{}}'::jsonb, 'private', TRUE, NOW(), NOW()
|
|
)
|
|
""")
|
|
|
|
rating_id = str(uuid.uuid4())
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO step_ratings (
|
|
id, step_id, user_id, account_id, is_verified_use, is_visible,
|
|
created_at, updated_at
|
|
) VALUES (
|
|
'{rating_id}', '{step_id}', '{user_b_id}', '{ACCOUNT_B_ID}',
|
|
FALSE, TRUE, NOW(), NOW()
|
|
)
|
|
""")
|
|
try:
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM step_ratings WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B step_ratings"
|
|
finally:
|
|
await admin_conn.execute(f"DELETE FROM step_ratings WHERE id = '{rating_id}'")
|
|
await admin_conn.execute(f"DELETE FROM step_library WHERE id = '{step_id}'")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# step_usage_log
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_step_usage_log_account_a_cannot_see_account_b(admin_conn, conn_a):
|
|
"""Account A must not see step usage logs belonging to Account B."""
|
|
user_b_id = await _get_user_b_id(admin_conn)
|
|
tree_b_id = await _get_tree_b_id(admin_conn)
|
|
|
|
step_id = str(uuid.uuid4())
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO step_library (
|
|
id, account_id, created_by, title, step_type, content,
|
|
visibility, is_active, created_at, updated_at
|
|
) VALUES (
|
|
'{step_id}', '{ACCOUNT_B_ID}', '{user_b_id}',
|
|
'Phase3 Usage Step', 'action',
|
|
'{{}}'::jsonb, 'private', TRUE, NOW(), NOW()
|
|
)
|
|
""")
|
|
|
|
# Need a sessions row as FK for usage log
|
|
session_id = str(uuid.uuid4())
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO sessions (
|
|
id, tree_id, user_id, account_id, tree_snapshot,
|
|
path_taken, decisions, custom_steps, started_at
|
|
) VALUES (
|
|
'{session_id}', '{tree_b_id}', '{user_b_id}', '{ACCOUNT_B_ID}',
|
|
'[]'::jsonb, '[]'::jsonb, '[]'::jsonb, '[]'::jsonb, NOW()
|
|
)
|
|
""")
|
|
|
|
log_id = str(uuid.uuid4())
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO step_usage_log (
|
|
id, step_id, user_id, account_id, session_id, used_at
|
|
) VALUES (
|
|
'{log_id}', '{step_id}', '{user_b_id}', '{ACCOUNT_B_ID}',
|
|
'{session_id}', NOW()
|
|
)
|
|
""")
|
|
try:
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM step_usage_log WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B step_usage_log"
|
|
finally:
|
|
await admin_conn.execute(f"DELETE FROM step_usage_log WHERE id = '{log_id}'")
|
|
await admin_conn.execute(f"DELETE FROM sessions WHERE id = '{session_id}'")
|
|
await admin_conn.execute(f"DELETE FROM step_library WHERE id = '{step_id}'")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# target_lists
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_target_lists_account_a_cannot_see_account_b(admin_conn, conn_a):
|
|
"""Account A must not see target lists belonging to Account B."""
|
|
user_b_id = await _get_user_b_id(admin_conn)
|
|
|
|
tl_id = str(uuid.uuid4())
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO target_lists (
|
|
id, account_id, created_by, name, targets, created_at, updated_at
|
|
) VALUES (
|
|
'{tl_id}', '{ACCOUNT_B_ID}', '{user_b_id}',
|
|
'Phase3 RLS Target List', '[]'::jsonb, NOW(), NOW()
|
|
)
|
|
""")
|
|
try:
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM target_lists WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B target_lists"
|
|
finally:
|
|
await admin_conn.execute(f"DELETE FROM target_lists WHERE id = '{tl_id}'")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# session_shares
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_session_shares_account_a_cannot_see_account_b(admin_conn, conn_a):
|
|
"""Account A must not see session shares belonging to Account B."""
|
|
user_b_id = await _get_user_b_id(admin_conn)
|
|
tree_b_id = await _get_tree_b_id(admin_conn)
|
|
|
|
# Need a sessions row as FK
|
|
session_id = str(uuid.uuid4())
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO sessions (
|
|
id, tree_id, user_id, account_id, tree_snapshot,
|
|
path_taken, decisions, custom_steps, started_at
|
|
) VALUES (
|
|
'{session_id}', '{tree_b_id}', '{user_b_id}', '{ACCOUNT_B_ID}',
|
|
'[]'::jsonb, '[]'::jsonb, '[]'::jsonb, '[]'::jsonb, NOW()
|
|
)
|
|
""")
|
|
|
|
share_id = str(uuid.uuid4())
|
|
share_token = f"phase3-rls-test-{share_id[:8]}"
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO session_shares (
|
|
id, session_id, account_id, share_token, visibility,
|
|
created_by, view_count, is_active, created_at, updated_at
|
|
) VALUES (
|
|
'{share_id}', '{session_id}', '{ACCOUNT_B_ID}',
|
|
'{share_token}', 'account', '{user_b_id}',
|
|
0, TRUE, NOW(), NOW()
|
|
)
|
|
""")
|
|
try:
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM session_shares WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B session_shares"
|
|
finally:
|
|
await admin_conn.execute(f"DELETE FROM session_shares WHERE id = '{share_id}'")
|
|
await admin_conn.execute(f"DELETE FROM sessions WHERE id = '{session_id}'")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# audit_logs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_audit_logs_account_a_cannot_see_account_b(admin_conn, conn_a):
|
|
"""Account A must not see audit logs belonging to Account B."""
|
|
user_b_id = await _get_user_b_id(admin_conn)
|
|
|
|
log_id = str(uuid.uuid4())
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO audit_logs (
|
|
id, user_id, account_id, action, resource_type, created_at
|
|
) VALUES (
|
|
'{log_id}', '{user_b_id}', '{ACCOUNT_B_ID}',
|
|
'test.action', 'test_resource', NOW()
|
|
)
|
|
""")
|
|
try:
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM audit_logs WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B audit_logs"
|
|
finally:
|
|
await admin_conn.execute(f"DELETE FROM audit_logs WHERE id = '{log_id}'")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# tree_shares
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_tree_shares_account_a_cannot_see_account_b(admin_conn, conn_a):
|
|
"""Account A must not see tree shares belonging to Account B."""
|
|
user_b_id = await _get_user_b_id(admin_conn)
|
|
tree_b_id = await _get_tree_b_id(admin_conn)
|
|
|
|
share_id = str(uuid.uuid4())
|
|
share_token = f"phase3-tree-rls-{share_id[:8]}"
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO tree_shares (
|
|
id, tree_id, account_id, share_token, created_by,
|
|
allow_forking, created_at
|
|
) VALUES (
|
|
'{share_id}', '{tree_b_id}', '{ACCOUNT_B_ID}',
|
|
'{share_token}', '{user_b_id}', TRUE, NOW()
|
|
)
|
|
""")
|
|
try:
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM tree_shares WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B tree_shares"
|
|
finally:
|
|
await admin_conn.execute(f"DELETE FROM tree_shares WHERE id = '{share_id}'")
|
|
|
|
|
|
# ===========================================================================
|
|
# Phase 4 RLS isolation tests
|
|
# Tables: users, script_builder_sessions, ai_session_steps, notifications
|
|
#
|
|
# Note: platform_steps and template_trees have no account_id column and no RLS —
|
|
# they are globally readable by all authenticated users.
|
|
# ===========================================================================
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# users
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_users_account_a_cannot_see_account_b(admin_conn, conn_a):
|
|
"""Account A must not see users belonging to Account B."""
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM users WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B users"
|
|
|
|
|
|
async def test_users_account_a_can_see_own(admin_conn, conn_a):
|
|
"""Account A must be able to see its own users."""
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM users WHERE account_id = '{ACCOUNT_A_ID}'"
|
|
)
|
|
assert len(rows) > 0, "Account A should see its own users"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# script_builder_sessions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_script_builder_sessions_account_a_cannot_see_account_b(admin_conn, conn_a):
|
|
"""Account A must not see script builder sessions belonging to Account B."""
|
|
user_b_id = await _get_user_b_id(admin_conn)
|
|
|
|
session_id = str(uuid.uuid4())
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO script_builder_sessions (
|
|
id, user_id, account_id, language, origin, created_at, updated_at
|
|
) VALUES (
|
|
'{session_id}', '{user_b_id}', '{ACCOUNT_B_ID}',
|
|
'powershell', 'standalone', NOW(), NOW()
|
|
)
|
|
""")
|
|
try:
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM script_builder_sessions WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B script_builder_sessions"
|
|
finally:
|
|
await admin_conn.execute(
|
|
f"DELETE FROM script_builder_sessions WHERE id = '{session_id}'"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ai_session_steps
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_ai_session_steps_account_a_cannot_see_account_b(admin_conn, conn_a):
|
|
"""Account A must not see ai_session_steps belonging to Account B."""
|
|
user_b_id = await _get_user_b_id(admin_conn)
|
|
tree_b_id = await _get_tree_b_id(admin_conn)
|
|
|
|
# Need an ai_sessions row as FK
|
|
ai_session_id = str(uuid.uuid4())
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO ai_sessions (
|
|
id, user_id, account_id, session_type, intake_type,
|
|
intake_content, status, confidence_tier, confidence_score,
|
|
created_at, updated_at
|
|
) VALUES (
|
|
'{ai_session_id}', '{user_b_id}', '{ACCOUNT_B_ID}',
|
|
'guided', 'free_text', '{{}}'::jsonb, 'active', 'guided', 0.0,
|
|
NOW(), NOW()
|
|
)
|
|
""")
|
|
|
|
step_id = str(uuid.uuid4())
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO ai_session_steps (
|
|
id, session_id, account_id, step_order, step_type, content,
|
|
created_at
|
|
) VALUES (
|
|
'{step_id}', '{ai_session_id}', '{ACCOUNT_B_ID}',
|
|
1, 'question', '{{"text": "Phase4 RLS test step"}}'::jsonb, NOW()
|
|
)
|
|
""")
|
|
try:
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM ai_session_steps WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B ai_session_steps"
|
|
finally:
|
|
await admin_conn.execute(f"DELETE FROM ai_session_steps WHERE id = '{step_id}'")
|
|
await admin_conn.execute(f"DELETE FROM ai_sessions WHERE id = '{ai_session_id}'")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# notifications
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def test_notifications_account_a_cannot_see_account_b(admin_conn, conn_a):
|
|
"""Account A must not see notifications belonging to Account B."""
|
|
user_b_id = await _get_user_b_id(admin_conn)
|
|
|
|
notif_id = str(uuid.uuid4())
|
|
await admin_conn.execute(f"""
|
|
INSERT INTO notifications (
|
|
id, user_id, account_id, event, title, body,
|
|
is_read, created_at
|
|
) VALUES (
|
|
'{notif_id}', '{user_b_id}', '{ACCOUNT_B_ID}',
|
|
'test_event', 'Phase4 RLS Test', 'RLS isolation test notification',
|
|
FALSE, NOW()
|
|
)
|
|
""")
|
|
try:
|
|
rows = await conn_a.fetch(
|
|
f"SELECT id FROM notifications WHERE account_id = '{ACCOUNT_B_ID}'"
|
|
)
|
|
assert len(rows) == 0, "Account A should not see Account B notifications"
|
|
finally:
|
|
await admin_conn.execute(f"DELETE FROM notifications WHERE id = '{notif_id}'")
|