test: add Phase 2 RLS isolation tests for 11 session tables (incl. step_library visibility regression)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
chihlasm
2026-04-10 07:00:09 +00:00
parent 5bd331ca92
commit ed8de92c52

View File

@@ -264,3 +264,268 @@ async def test_flow_proposals_account_a_cannot_see_account_b(conn_a):
f"SELECT id FROM flow_proposals WHERE account_id = '{ACCOUNT_B_ID}'" f"SELECT id FROM flow_proposals WHERE account_id = '{ACCOUNT_B_ID}'"
) )
assert len(rows) == 0 assert len(rows) == 0
# ---------------------------------------------------------------------------
# Phase 2 fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
async def session_row_ids(admin_conn):
"""
Insert one `sessions` row and one `ai_sessions` row for each of
ACCOUNT_A and ACCOUNT_B using the superuser connection (BYPASSRLS).
Returns a dict with the inserted IDs for use in tests.
Cleans up on exit.
"""
# Resolve a valid tree_id and user_id for each account
tree_a = await admin_conn.fetchrow(
f"SELECT id FROM trees WHERE account_id = '{ACCOUNT_A_ID}' LIMIT 1"
)
tree_b = await admin_conn.fetchrow(
f"SELECT id FROM trees WHERE account_id = '{ACCOUNT_B_ID}' LIMIT 1"
)
user_a = await admin_conn.fetchrow(
f"SELECT id FROM users WHERE account_id = '{ACCOUNT_A_ID}' LIMIT 1"
)
user_b = await admin_conn.fetchrow(
f"SELECT id FROM users WHERE account_id = '{ACCOUNT_B_ID}' LIMIT 1"
)
tree_a_id = str(tree_a["id"])
tree_b_id = str(tree_b["id"])
user_a_id = str(user_a["id"])
user_b_id = str(user_b["id"])
session_a_id = str(uuid.uuid4())
session_b_id = str(uuid.uuid4())
ai_session_a_id = str(uuid.uuid4())
ai_session_b_id = str(uuid.uuid4())
# Insert sessions rows
await admin_conn.execute(f"""
INSERT INTO sessions (
id, tree_id, user_id, account_id, tree_snapshot,
path_taken, decisions, custom_steps, created_at, updated_at
) VALUES
('{session_a_id}', '{tree_a_id}', '{user_a_id}', '{ACCOUNT_A_ID}',
'[]'::jsonb, '[]'::jsonb, '[]'::jsonb, '[]'::jsonb, NOW(), NOW()),
('{session_b_id}', '{tree_b_id}', '{user_b_id}', '{ACCOUNT_B_ID}',
'[]'::jsonb, '[]'::jsonb, '[]'::jsonb, '[]'::jsonb, NOW(), NOW())
""")
# Insert ai_sessions rows
await admin_conn.execute(f"""
INSERT INTO ai_sessions (
id, user_id, account_id, session_type, intake_type,
intake_content, status, confidence_tier, confidence_score,
created_at, updated_at
) VALUES
('{ai_session_a_id}', '{user_a_id}', '{ACCOUNT_A_ID}',
'guided', 'free_text', '{{}}'::jsonb, 'active', 'medium', 0.0,
NOW(), NOW()),
('{ai_session_b_id}', '{user_b_id}', '{ACCOUNT_B_ID}',
'guided', 'free_text', '{{}}'::jsonb, 'active', 'medium', 0.0,
NOW(), NOW())
""")
yield {
"session_a": session_a_id,
"session_b": session_b_id,
"ai_session_a": ai_session_a_id,
"ai_session_b": ai_session_b_id,
}
# Cleanup
await admin_conn.execute(
f"DELETE FROM sessions WHERE id IN ('{session_a_id}', '{session_b_id}')"
)
await admin_conn.execute(
f"DELETE FROM ai_sessions WHERE id IN ('{ai_session_a_id}', '{ai_session_b_id}')"
)
# ---------------------------------------------------------------------------
# sessions
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_sessions_account_a_cannot_see_account_b_sessions(conn_a, session_row_ids):
rows = await conn_a.fetch(
f"SELECT id FROM sessions WHERE id = '{session_row_ids['session_b']}'"
)
assert len(rows) == 0, "Account A should not see Account B sessions"
@pytest.mark.asyncio
async def test_sessions_account_a_can_see_own_sessions(conn_a, session_row_ids):
rows = await conn_a.fetch(
f"SELECT id FROM sessions WHERE id = '{session_row_ids['session_a']}'"
)
assert len(rows) == 1, "Account A should see its own sessions"
@pytest.mark.asyncio
async def test_sessions_no_context_sees_nothing(conn_no_context, session_row_ids):
rows = await conn_no_context.fetch(
f"SELECT id FROM sessions WHERE id IN "
f"('{session_row_ids['session_a']}', '{session_row_ids['session_b']}')"
)
assert len(rows) == 0, "No-context connection should see no sessions"
# ---------------------------------------------------------------------------
# ai_sessions
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ai_sessions_account_a_cannot_see_account_b(conn_a, session_row_ids):
rows = await conn_a.fetch(
f"SELECT id FROM ai_sessions WHERE id = '{session_row_ids['ai_session_b']}'"
)
assert len(rows) == 0, "Account A should not see Account B ai_sessions"
@pytest.mark.asyncio
async def test_ai_sessions_account_a_can_see_own(conn_a, session_row_ids):
rows = await conn_a.fetch(
f"SELECT id FROM ai_sessions WHERE id = '{session_row_ids['ai_session_a']}'"
)
assert len(rows) == 1, "Account A should see its own ai_sessions"
# ---------------------------------------------------------------------------
# session_branches
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_session_branches_account_a_cannot_see_account_b(conn_a):
rows = await conn_a.fetch(
f"SELECT id FROM session_branches WHERE account_id = '{ACCOUNT_B_ID}'"
)
assert len(rows) == 0, "Account A should not see Account B session_branches"
# ---------------------------------------------------------------------------
# session_supporting_data
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_session_supporting_data_account_a_cannot_see_account_b(conn_a):
rows = await conn_a.fetch(
f"SELECT id FROM session_supporting_data WHERE account_id = '{ACCOUNT_B_ID}'"
)
assert len(rows) == 0, "Account A should not see Account B session_supporting_data"
# ---------------------------------------------------------------------------
# session_resolution_outputs
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_session_resolution_outputs_account_a_cannot_see_account_b(conn_a):
rows = await conn_a.fetch(
f"SELECT id FROM session_resolution_outputs WHERE account_id = '{ACCOUNT_B_ID}'"
)
assert len(rows) == 0, "Account A should not see Account B session_resolution_outputs"
# ---------------------------------------------------------------------------
# session_handoffs
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_session_handoffs_account_a_cannot_see_account_b(conn_a):
rows = await conn_a.fetch(
f"SELECT id FROM session_handoffs WHERE account_id = '{ACCOUNT_B_ID}'"
)
assert len(rows) == 0, "Account A should not see Account B session_handoffs"
# ---------------------------------------------------------------------------
# script_templates
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_script_templates_account_a_cannot_see_account_b(conn_a):
rows = await conn_a.fetch(
f"SELECT id FROM script_templates WHERE account_id = '{ACCOUNT_B_ID}'"
)
assert len(rows) == 0, "Account A should not see Account B script_templates"
# ---------------------------------------------------------------------------
# script_generations
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_script_generations_account_a_cannot_see_account_b(conn_a):
rows = await conn_a.fetch(
f"SELECT id FROM script_generations WHERE account_id = '{ACCOUNT_B_ID}'"
)
assert len(rows) == 0, "Account A should not see Account B script_generations"
# ---------------------------------------------------------------------------
# maintenance_schedules
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_maintenance_schedules_account_a_cannot_see_account_b(conn_a):
rows = await conn_a.fetch(
f"SELECT id FROM maintenance_schedules WHERE account_id = '{ACCOUNT_B_ID}'"
)
assert len(rows) == 0, "Account A should not see Account B maintenance_schedules"
# ---------------------------------------------------------------------------
# psa_post_log
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_psa_post_log_account_a_cannot_see_account_b(conn_a):
rows = await conn_a.fetch(
f"SELECT id FROM psa_post_log WHERE account_id = '{ACCOUNT_B_ID}'"
)
assert len(rows) == 0, "Account A should not see Account B psa_post_log"
# ---------------------------------------------------------------------------
# step_library — visibility-aware policy
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_step_library_account_a_cannot_see_account_b_private_steps(conn_a):
"""Private/non-public steps owned by Account B must not be visible to Account A."""
rows = await conn_a.fetch(
f"SELECT id FROM step_library "
f"WHERE account_id = '{ACCOUNT_B_ID}' AND visibility != 'public'"
)
assert len(rows) == 0, "Account A should not see Account B's private step_library rows"
@pytest.mark.asyncio
async def test_step_library_account_a_can_see_account_b_public_steps(admin_conn, conn_a):
"""Public steps owned by Account B MUST be visible to Account A (cross-tenant visibility)."""
public_step_id = str(uuid.uuid4())
await admin_conn.execute(f"""
INSERT INTO step_library (
id, account_id, title, step_type, content,
visibility, is_active, created_at, updated_at
) VALUES (
'{public_step_id}', '{ACCOUNT_B_ID}', 'RLS Public Step', 'action',
'{{}}'::jsonb, 'public', TRUE, NOW(), NOW()
)
""")
try:
rows = await conn_a.fetch(
f"SELECT id FROM step_library WHERE id = '{public_step_id}'"
)
assert len(rows) == 1, (
"Account A should see public steps owned by Account B "
"(cross-tenant public visibility policy)"
)
finally:
await admin_conn.execute(
f"DELETE FROM step_library WHERE id = '{public_step_id}'"
)