Merge feat/l1-workspace into integration branch
Some checks failed
Mirror to GitHub / mirror (push) Successful in 5s
CI / frontend (pull_request) Failing after 1m29s
CI / e2e (pull_request) Failing after 6m23s
CI / backend (pull_request) Successful in 11m30s

# Conflicts:
#	frontend/src/router.tsx
This commit is contained in:
2026-05-28 23:51:50 -04:00
74 changed files with 13239 additions and 98 deletions

View File

@@ -105,7 +105,7 @@ assert "test" in _test_db_name, (
)
_RUN_RLS_TESTS = os.environ.get("RUN_RLS_TESTS") == "1"
_RLS_ISOLATION_FILE = "test_rls_isolation.py"
_RLS_TEST_FILES = {"test_rls_isolation.py", "test_l1_rls.py"}
def pytest_collection_modifyitems(config, items):
@@ -117,7 +117,9 @@ def pytest_collection_modifyitems(config, items):
deselected = []
for item in items:
item_path = getattr(item, "path", None) or getattr(item, "fspath", None)
if item_path and str(item_path).endswith(_RLS_ISOLATION_FILE):
if item_path and any(
str(item_path).endswith(f) for f in _RLS_TEST_FILES
):
deselected.append(item)
else:
selected.append(item)

View File

@@ -0,0 +1,99 @@
"""Unit tests for L1-related dependency guards.
Uses MagicMock user objects — no database required.
"""
from unittest.mock import MagicMock
from uuid import uuid4
import pytest
from fastapi import HTTPException
from app.api.deps import require_l1, require_l1_or_coverage, require_l1_or_above
def _make_user(account_role="engineer", is_super_admin=False, can_cover_l1=False):
user = MagicMock()
user.id = uuid4()
user.account_role = account_role
user.is_super_admin = is_super_admin
user.can_cover_l1 = can_cover_l1
return user
# ---------------------------------------------------------------------------
# require_l1
# ---------------------------------------------------------------------------
async def test_require_l1_passes_for_l1_tech():
user = _make_user(account_role="l1_tech")
result = await require_l1(current_user=user)
assert result is user
async def test_require_l1_passes_for_super_admin():
user = _make_user(account_role="owner", is_super_admin=True)
result = await require_l1(current_user=user)
assert result is user
async def test_require_l1_blocks_engineer():
user = _make_user(account_role="engineer")
with pytest.raises(HTTPException) as exc:
await require_l1(current_user=user)
assert exc.value.status_code == 403
# ---------------------------------------------------------------------------
# require_l1_or_coverage
# ---------------------------------------------------------------------------
async def test_require_l1_or_coverage_passes_l1_tech():
user = _make_user(account_role="l1_tech")
result = await require_l1_or_coverage(current_user=user)
assert result is user
async def test_require_l1_or_coverage_passes_engineer_with_flag():
user = _make_user(account_role="engineer", can_cover_l1=True)
result = await require_l1_or_coverage(current_user=user)
assert result is user
async def test_require_l1_or_coverage_blocks_engineer_without_flag():
user = _make_user(account_role="engineer", can_cover_l1=False)
with pytest.raises(HTTPException) as exc:
await require_l1_or_coverage(current_user=user)
assert exc.value.status_code == 403
async def test_require_l1_or_coverage_passes_owner_always():
user = _make_user(account_role="owner")
result = await require_l1_or_coverage(current_user=user)
assert result is user
# ---------------------------------------------------------------------------
# require_l1_or_above
# ---------------------------------------------------------------------------
async def test_require_l1_or_above_passes_engineer():
user = _make_user(account_role="engineer")
result = await require_l1_or_above(current_user=user)
assert result is user
async def test_require_l1_or_above_passes_l1_tech():
user = _make_user(account_role="l1_tech")
result = await require_l1_or_above(current_user=user)
assert result is user
async def test_require_l1_or_above_blocks_viewer():
user = _make_user(account_role="viewer")
with pytest.raises(HTTPException) as exc:
await require_l1_or_above(current_user=user)
assert exc.value.status_code == 403

View File

@@ -0,0 +1,182 @@
"""Unit + integration tests for internal_ticket_service."""
import uuid
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.account import Account
from app.models.user import User
from app.services.internal_ticket_service import (
create_ticket, update_status, get_ticket,
list_tickets_for_account, promote_to_psa,
)
# ---------------------------------------------------------------------------
# Test helpers
# ---------------------------------------------------------------------------
async def _make_account(db: AsyncSession) -> Account:
s = str(uuid.uuid4())[:8]
account = Account(
id=uuid.uuid4(),
name=f"Test Account {s}",
display_code=s[:8],
)
db.add(account)
await db.flush()
return account
async def _make_user(
db: AsyncSession,
*,
account_id: uuid.UUID,
role: str = "l1_tech",
) -> User:
s = str(uuid.uuid4())[:8]
user = User(
id=uuid.uuid4(),
email=f"user-{s}@example.com",
name=f"User {s}",
account_id=account_id,
account_role=role,
role="engineer",
is_active=True,
)
db.add(user)
await db.flush()
return user
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_create_ticket_sets_status_open(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
ticket = await create_ticket(
test_db,
account_id=account.id,
created_by_user_id=l1.id,
problem_statement="Outlook can't connect",
customer_name="Alice",
)
assert ticket.status == 'open'
assert ticket.account_id == account.id
assert ticket.customer_name == "Alice"
assert ticket.created_by_user_id == l1.id
@pytest.mark.asyncio
async def test_update_status_to_resolved_sets_resolved_at(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
ticket = await create_ticket(
test_db,
account_id=account.id,
created_by_user_id=l1.id,
problem_statement="Test",
)
assert ticket.resolved_at is None
updated = await update_status(
test_db,
ticket_id=ticket.id,
status='resolved',
resolution_notes="Fixed via reboot",
)
assert updated.status == 'resolved'
assert updated.resolved_at is not None
assert updated.resolution_notes == "Fixed via reboot"
@pytest.mark.asyncio
async def test_update_status_to_escalated_does_not_set_resolved_at(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
ticket = await create_ticket(
test_db, account_id=account.id, created_by_user_id=l1.id,
problem_statement="x",
)
updated = await update_status(test_db, ticket_id=ticket.id, status='escalated')
assert updated.status == 'escalated'
assert updated.resolved_at is None
@pytest.mark.asyncio
async def test_update_status_assigns_user(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
engineer = await _make_user(test_db, account_id=account.id, role="engineer")
ticket = await create_ticket(
test_db, account_id=account.id, created_by_user_id=l1.id,
problem_statement="x",
)
updated = await update_status(
test_db, ticket_id=ticket.id, status='escalated',
assigned_user_id=engineer.id,
)
assert updated.assigned_user_id == engineer.id
@pytest.mark.asyncio
async def test_get_ticket_returns_none_for_missing_id(test_db: AsyncSession):
result = await get_ticket(test_db, ticket_id=uuid.uuid4())
assert result is None
@pytest.mark.asyncio
async def test_list_tickets_filters_by_account(test_db: AsyncSession):
account_a = await _make_account(test_db)
account_b = await _make_account(test_db)
l1_a = await _make_user(test_db, account_id=account_a.id)
l1_b = await _make_user(test_db, account_id=account_b.id)
ticket_a = await create_ticket(
test_db, account_id=account_a.id, created_by_user_id=l1_a.id,
problem_statement="A",
)
ticket_b = await create_ticket(
test_db, account_id=account_b.id, created_by_user_id=l1_b.id,
problem_statement="B",
)
rows = await list_tickets_for_account(test_db, account_id=account_a.id)
ids = [r.id for r in rows]
assert ticket_a.id in ids
assert ticket_b.id not in ids
@pytest.mark.asyncio
async def test_list_tickets_filters_by_status(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
open_t = await create_ticket(
test_db, account_id=account.id, created_by_user_id=l1.id,
problem_statement="open",
)
resolved_t = await create_ticket(
test_db, account_id=account.id, created_by_user_id=l1.id,
problem_statement="r",
)
await update_status(test_db, ticket_id=resolved_t.id, status='resolved')
open_rows = await list_tickets_for_account(test_db, account_id=account.id, status='open')
assert open_t.id in [r.id for r in open_rows]
assert resolved_t.id not in [r.id for r in open_rows]
@pytest.mark.asyncio
async def test_promote_to_psa_sets_external_id(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
ticket = await create_ticket(
test_db, account_id=account.id, created_by_user_id=l1.id,
problem_statement="x",
)
updated = await promote_to_psa(test_db, ticket_id=ticket.id, psa_ticket_id="CW-12345")
assert updated.psa_promoted_ticket_id == "CW-12345"
@pytest.mark.asyncio
async def test_update_status_raises_for_missing_ticket(test_db: AsyncSession):
with pytest.raises(ValueError, match="not found"):
await update_status(test_db, ticket_id=uuid.uuid4(), status='resolved')

View File

@@ -0,0 +1,564 @@
"""Integration tests for seat enforcement at invite create, accept-invite, and
role-change endpoints.
All tests use the `client` + `test_db` fixtures from conftest, which spin up
a fresh schema per test and wire the ASGI app to the test DB.
"""
import uuid
import pytest
from httpx import AsyncClient
from sqlalchemy import delete
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.account import Account
from app.models.account_invite import AccountInvite
from app.models.subscription import Subscription
from app.models.user import User
# ---------------------------------------------------------------------------
# Test-local helpers
# ---------------------------------------------------------------------------
async def _register(client: AsyncClient, *, email: str, password: str = "TestPassword123!", name: str = "Test User") -> dict:
resp = await client.post("/api/v1/auth/register", json={"email": email, "password": password, "name": name})
assert resp.status_code in (200, 201), resp.text
return resp.json()
async def _login(client: AsyncClient, *, email: str, password: str = "TestPassword123!") -> dict:
resp = await client.post("/api/v1/auth/login/json", json={"email": email, "password": password})
assert resp.status_code == 200, resp.text
return {"Authorization": f"Bearer {resp.json()['access_token']}"}
async def _set_sub(db: AsyncSession, account_id: uuid.UUID, *, seat_limit: int | None, l1_seat_limit: int | None = None) -> None:
"""Replace the account's subscription with specified limits."""
await db.execute(delete(Subscription).where(Subscription.account_id == account_id))
db.add(Subscription(
account_id=account_id,
plan="pro",
status="active",
seat_limit=seat_limit,
l1_seat_limit=l1_seat_limit,
))
await db.commit()
async def _add_member(db: AsyncSession, account_id: uuid.UUID, *, role: str, suffix: str | None = None) -> User:
"""Directly insert an active user with the given role into the account."""
s = suffix or str(uuid.uuid4())[:8]
user = User(
id=uuid.uuid4(),
email=f"member-{s}@example.com",
name=f"Member {s}",
account_id=account_id,
account_role=role,
role="engineer",
is_active=True,
)
db.add(user)
await db.commit()
return user
# ---------------------------------------------------------------------------
# Invite create — single invite endpoint
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_invite_engineer_blocked_when_seats_full(client: AsyncClient, test_db: AsyncSession):
"""POST /me/invites → 402 when engineer seat limit is exhausted."""
owner = await _register(client, email="owner1@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner1@example.com")
# seat_limit=1, already 1 engineer → full
await _set_sub(test_db, account_id, seat_limit=1)
# The owner registers as engineer, but is actually 'owner' role — add a separate engineer
await _add_member(test_db, account_id, role="engineer")
resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "new-eng@example.com", "role": "engineer"},
headers=headers,
)
assert resp.status_code == 402, resp.text
body = resp.json()
assert body["detail"]["code"] == "seat_limit_exceeded"
assert body["detail"]["role"] == "engineer"
assert body["detail"]["current"] == 1
assert body["detail"]["limit"] == 1
assert "upgrade_url" in body["detail"]
@pytest.mark.asyncio
async def test_invite_l1_blocked_when_seats_full(client: AsyncClient, test_db: AsyncSession):
"""POST /me/invites → 402 when l1_tech seat limit is exhausted."""
owner = await _register(client, email="owner2@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner2@example.com")
await _set_sub(test_db, account_id, seat_limit=10, l1_seat_limit=1)
await _add_member(test_db, account_id, role="l1_tech")
resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "new-l1@example.com", "role": "l1_tech"},
headers=headers,
)
assert resp.status_code == 402, resp.text
body = resp.json()
assert body["detail"]["code"] == "seat_limit_exceeded"
assert body["detail"]["role"] == "l1_tech"
assert body["detail"]["current"] == 1
assert body["detail"]["limit"] == 1
@pytest.mark.asyncio
async def test_invite_succeeds_when_seats_available(client: AsyncClient, test_db: AsyncSession):
"""POST /me/invites → 201 when engineer seats have room."""
owner = await _register(client, email="owner3@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner3@example.com")
# seat_limit=5, 0 engineers → plenty of room
await _set_sub(test_db, account_id, seat_limit=5)
resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "new-eng2@example.com", "role": "engineer"},
headers=headers,
)
assert resp.status_code == 201, resp.text
@pytest.mark.asyncio
async def test_invite_viewer_bypasses_seat_check(client: AsyncClient, test_db: AsyncSession):
"""POST /me/invites → 201 for viewer role even when engineer seats full."""
owner = await _register(client, email="owner4@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner4@example.com")
# engineer seats exhausted — should not affect viewer invites
await _set_sub(test_db, account_id, seat_limit=1)
await _add_member(test_db, account_id, role="engineer")
resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "viewer@example.com", "role": "viewer"},
headers=headers,
)
assert resp.status_code == 201, resp.text
@pytest.mark.asyncio
async def test_invite_unlimited_seat_limit_always_succeeds(client: AsyncClient, test_db: AsyncSession):
"""POST /me/invites → 201 when seat_limit is None (unlimited)."""
owner = await _register(client, email="owner5@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner5@example.com")
# seat_limit=None = unlimited
await _set_sub(test_db, account_id, seat_limit=None)
# add many engineers
for i in range(5):
await _add_member(test_db, account_id, role="engineer", suffix=f"bulk{i}")
resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "new-unlimited@example.com", "role": "engineer"},
headers=headers,
)
assert resp.status_code == 201, resp.text
@pytest.mark.asyncio
async def test_bulk_invite_per_row_402_preserves_structured_detail(client: AsyncClient, test_db: AsyncSession):
"""Bulk invite returns 200 overall; rows that hit the seat limit appear in the
`failed` list with structured detail (not a stringified repr)."""
owner = await _register(client, email="owner_bulk@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner_bulk@example.com")
# seat_limit=1, already 1 engineer → next engineer invite fails
await _set_sub(test_db, account_id, seat_limit=1)
await _add_member(test_db, account_id, role="engineer")
resp = await client.post(
"/api/v1/accounts/me/invites/bulk",
json={"invites": [
{"email": "viewer-ok@example.com", "role": "viewer"},
{"email": "eng-blocked@example.com", "role": "engineer"},
]},
headers=headers,
)
assert resp.status_code in (200, 201), resp.text
body = resp.json()
assert len(body["created"]) == 1
assert body["created"][0]["email"] == "viewer-ok@example.com"
assert len(body["failed"]) == 1
failed_row = body["failed"][0]
assert failed_row["email"] == "eng-blocked@example.com"
# Structured detail preserved (dict, not repr string)
assert isinstance(failed_row["error"], dict)
assert failed_row["error"]["code"] == "seat_limit_exceeded"
assert failed_row["error"]["role"] == "engineer"
@pytest.mark.asyncio
async def test_invite_grandfathered_account_blocks_new_invites(client: AsyncClient, test_db: AsyncSession):
"""Grandfathering: existing over-seated account keeps existing users but
new engineer invites are still blocked (current > limit → blocked)."""
owner = await _register(client, email="owner6@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner6@example.com")
# current=3 engineers > seat_limit=2 (over-seated / grandfathered)
await _set_sub(test_db, account_id, seat_limit=2)
for i in range(3):
await _add_member(test_db, account_id, role="engineer", suffix=f"gf{i}")
# New invite must be blocked
resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "one-more@example.com", "role": "engineer"},
headers=headers,
)
assert resp.status_code == 402, resp.text
body = resp.json()
assert body["detail"]["code"] == "seat_limit_exceeded"
# current (3) > limit (2) — forward enforcement fires, existing users unaffected
assert body["detail"]["current"] == 3
assert body["detail"]["limit"] == 2
# ---------------------------------------------------------------------------
# Accept-invite race condition — auth.py register path
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_accept_invite_blocked_when_seats_full_at_accept_time(client: AsyncClient, test_db: AsyncSession):
"""Race-condition guard: invite created when seats available, but by
accept time someone else consumed the last seat → 402."""
# Step 1: create an owner and send an invite
owner = await _register(client, email="owner7@example.com")
account_id = uuid.UUID(owner["account_id"])
owner_headers = await _login(client, email="owner7@example.com")
await _set_sub(test_db, account_id, seat_limit=2)
invite_resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "race@example.com", "role": "engineer"},
headers=owner_headers,
)
assert invite_resp.status_code == 201, invite_resp.text
invite_code = invite_resp.json()["code"]
# Step 2: fill the seats after the invite was created (race condition)
await _add_member(test_db, account_id, role="engineer", suffix="race1")
await _add_member(test_db, account_id, role="engineer", suffix="race2")
# Step 3: invitee tries to register — should get 402
resp = await client.post(
"/api/v1/auth/register",
json={
"email": "race@example.com",
"password": "TestPassword123!",
"name": "Race User",
"account_invite_code": invite_code,
},
)
assert resp.status_code == 402, resp.text
body = resp.json()
assert body["detail"]["code"] == "seat_limit_exceeded"
@pytest.mark.asyncio
async def test_accept_invite_succeeds_when_seats_available(client: AsyncClient, test_db: AsyncSession):
"""Normal accept-invite path works when seats have room."""
owner = await _register(client, email="owner8@example.com")
account_id = uuid.UUID(owner["account_id"])
owner_headers = await _login(client, email="owner8@example.com")
await _set_sub(test_db, account_id, seat_limit=5)
invite_resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "acceptme@example.com", "role": "engineer"},
headers=owner_headers,
)
assert invite_resp.status_code == 201, invite_resp.text
invite_code = invite_resp.json()["code"]
resp = await client.post(
"/api/v1/auth/register",
json={
"email": "acceptme@example.com",
"password": "TestPassword123!",
"name": "Accept User",
"account_invite_code": invite_code,
},
)
assert resp.status_code in (200, 201), resp.text
assert resp.json()["account_id"] == str(account_id)
# ---------------------------------------------------------------------------
# Role-change endpoint — PATCH /me/members/{user_id}/role
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_role_change_viewer_to_engineer_blocked_when_seats_full(client: AsyncClient, test_db: AsyncSession):
"""PATCH /me/members/{id}/role → 402 when promoting viewer → engineer and seats full."""
owner = await _register(client, email="owner9@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner9@example.com")
await _set_sub(test_db, account_id, seat_limit=1)
# Fill the engineer seat
await _add_member(test_db, account_id, role="engineer")
# Add a viewer to promote
viewer = await _add_member(test_db, account_id, role="viewer")
resp = await client.patch(
f"/api/v1/accounts/me/members/{viewer.id}/role",
json={"account_role": "engineer"},
headers=headers,
)
assert resp.status_code == 402, resp.text
body = resp.json()
assert body["detail"]["code"] == "seat_limit_exceeded"
assert body["detail"]["role"] == "engineer"
@pytest.mark.asyncio
async def test_role_change_viewer_to_l1_blocked_when_seats_full(client: AsyncClient, test_db: AsyncSession):
"""PATCH /me/members/{id}/role → 402 when promoting viewer → l1_tech and l1 seats full."""
owner = await _register(client, email="owner10@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner10@example.com")
await _set_sub(test_db, account_id, seat_limit=10, l1_seat_limit=1)
await _add_member(test_db, account_id, role="l1_tech")
viewer = await _add_member(test_db, account_id, role="viewer")
resp = await client.patch(
f"/api/v1/accounts/me/members/{viewer.id}/role",
json={"account_role": "l1_tech"},
headers=headers,
)
assert resp.status_code == 402, resp.text
body = resp.json()
assert body["detail"]["code"] == "seat_limit_exceeded"
assert body["detail"]["role"] == "l1_tech"
@pytest.mark.asyncio
async def test_role_change_promotion_succeeds_when_seats_available(client: AsyncClient, test_db: AsyncSession):
"""PATCH /me/members/{id}/role → 200 when seats are available."""
owner = await _register(client, email="owner11@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner11@example.com")
await _set_sub(test_db, account_id, seat_limit=5)
viewer = await _add_member(test_db, account_id, role="viewer")
resp = await client.patch(
f"/api/v1/accounts/me/members/{viewer.id}/role",
json={"account_role": "engineer"},
headers=headers,
)
assert resp.status_code == 200, resp.text
assert resp.json()["account_role"] == "engineer"
@pytest.mark.asyncio
async def test_role_change_demotion_bypasses_seat_check(client: AsyncClient, test_db: AsyncSession):
"""PATCH /me/members/{id}/role → 200 for demotions even when seats full."""
owner = await _register(client, email="owner12@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner12@example.com")
# Seats full — but demotion should still succeed
await _set_sub(test_db, account_id, seat_limit=1)
engineer = await _add_member(test_db, account_id, role="engineer")
resp = await client.patch(
f"/api/v1/accounts/me/members/{engineer.id}/role",
json={"account_role": "viewer"},
headers=headers,
)
assert resp.status_code == 200, resp.text
assert resp.json()["account_role"] == "viewer"
# ---------------------------------------------------------------------------
# GET /me/seats — seat counter widget endpoint
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_seats_returns_both_role_counts(client: AsyncClient, test_db: AsyncSession):
"""GET /accounts/me/seats returns engineer + l1_tech seat usage."""
owner = await _register(client, email="owner_seats@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner_seats@example.com")
await _set_sub(test_db, account_id, seat_limit=5, l1_seat_limit=3)
# Add 2 engineers and 1 l1_tech as members
for i in range(2):
await _add_member(test_db, account_id, role="engineer", suffix=f"e{i}")
await _add_member(test_db, account_id, role="l1_tech", suffix="l1")
resp = await client.get("/api/v1/accounts/me/seats", headers=headers)
assert resp.status_code == 200, resp.text
body = resp.json()
assert body["engineer"]["role"] == "engineer"
assert body["engineer"]["current"] == 2
assert body["engineer"]["limit"] == 5
assert body["engineer"]["available"] is True
assert body["l1_tech"]["role"] == "l1_tech"
assert body["l1_tech"]["current"] == 1
assert body["l1_tech"]["limit"] == 3
assert body["l1_tech"]["available"] is True
@pytest.mark.asyncio
async def test_get_seats_blocked_for_viewer(client: AsyncClient, test_db: AsyncSession):
"""GET /accounts/me/seats → 403 for viewer role (engineer+ required)."""
from app.core.security import get_password_hash
# Register an owner for the account
owner = await _register(client, email="owner_seats2@example.com")
account_id = uuid.UUID(owner["account_id"])
await _set_sub(test_db, account_id, seat_limit=5, l1_seat_limit=3)
# Create a viewer user with a known password directly in the DB
viewer_password = "ViewerPass123!"
viewer = User(
id=uuid.uuid4(),
email="viewer_seats@example.com",
name="Viewer Seats",
account_id=account_id,
account_role="viewer",
role="engineer", # system role field (default)
is_active=True,
password_hash=get_password_hash(viewer_password),
)
test_db.add(viewer)
await test_db.commit()
# Log in as the viewer
viewer_headers = await _login(client, email="viewer_seats@example.com", password=viewer_password)
resp = await client.get("/api/v1/accounts/me/seats", headers=viewer_headers)
assert resp.status_code == 403, resp.text
# ---------------------------------------------------------------------------
# PATCH /me/members/{user_id}/coverage — engineer L1-coverage flag
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_coverage_owner_can_toggle_engineer(client: AsyncClient, test_db: AsyncSession):
"""Owner can set can_cover_l1=True on an engineer; response reflects new value."""
owner = await _register(client, email="owner_cov1@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner_cov1@example.com")
engineer = await _add_member(test_db, account_id, role="engineer", suffix="cov1")
resp = await client.patch(
f"/api/v1/accounts/me/members/{engineer.id}/coverage",
json={"can_cover_l1": True},
headers=headers,
)
assert resp.status_code == 200, resp.text
body = resp.json()
assert body["can_cover_l1"] is True
# Toggle back to False
resp2 = await client.patch(
f"/api/v1/accounts/me/members/{engineer.id}/coverage",
json={"can_cover_l1": False},
headers=headers,
)
assert resp2.status_code == 200, resp2.text
assert resp2.json()["can_cover_l1"] is False
@pytest.mark.asyncio
async def test_coverage_non_owner_is_forbidden(client: AsyncClient, test_db: AsyncSession):
"""A non-owner engineer cannot toggle coverage on themselves or others."""
from app.core.security import get_password_hash
owner = await _register(client, email="owner_cov2@example.com")
account_id = uuid.UUID(owner["account_id"])
# Create an engineer with a known password
eng_password = "EngPass123!"
engineer = User(
id=uuid.uuid4(),
email="eng_cov2@example.com",
name="Eng Cov2",
account_id=account_id,
account_role="engineer",
role="engineer",
is_active=True,
password_hash=get_password_hash(eng_password),
)
test_db.add(engineer)
await test_db.commit()
eng_headers = await _login(client, email="eng_cov2@example.com", password=eng_password)
resp = await client.patch(
f"/api/v1/accounts/me/members/{engineer.id}/coverage",
json={"can_cover_l1": True},
headers=eng_headers,
)
assert resp.status_code == 403, resp.text
@pytest.mark.asyncio
async def test_coverage_viewer_role_returns_422(client: AsyncClient, test_db: AsyncSession):
"""PATCH coverage on a viewer → 422 (coverage flag only applies to engineers)."""
owner = await _register(client, email="owner_cov3@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner_cov3@example.com")
viewer = await _add_member(test_db, account_id, role="viewer", suffix="cov3")
resp = await client.patch(
f"/api/v1/accounts/me/members/{viewer.id}/coverage",
json={"can_cover_l1": True},
headers=headers,
)
assert resp.status_code == 422, resp.text
assert "engineer" in resp.json()["detail"].lower()
@pytest.mark.asyncio
async def test_coverage_cross_account_returns_404(client: AsyncClient, test_db: AsyncSession):
"""PATCH coverage on a user from a different account → 404 (tenancy isolation)."""
# Account A
owner_a = await _register(client, email="owner_cov_a@example.com")
account_a_id = uuid.UUID(owner_a["account_id"])
headers_a = await _login(client, email="owner_cov_a@example.com")
# Account B — a separate registration creates a new account
owner_b = await _register(client, email="owner_cov_b@example.com")
account_b_id = uuid.UUID(owner_b["account_id"])
# Add an engineer to account B
engineer_b = await _add_member(test_db, account_b_id, role="engineer", suffix="covb")
# Owner of account A tries to patch account B's engineer — must 404
resp = await client.patch(
f"/api/v1/accounts/me/members/{engineer_b.id}/coverage",
json={"can_cover_l1": True},
headers=headers_a,
)
assert resp.status_code == 404, resp.text

View File

@@ -0,0 +1,362 @@
"""Integration tests for the /l1/* endpoint surface (Task 15).
All tests use the `client` + `test_db` fixtures from conftest.
"""
import uuid
from datetime import datetime, timezone, timedelta
import pytest
from httpx import AsyncClient
from sqlalchemy import delete
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.subscription import Subscription
from app.models.user import User
from app.models.l1_walk_session import L1WalkSession
# ---------------------------------------------------------------------------
# Test-local helpers
# ---------------------------------------------------------------------------
async def _register(client: AsyncClient, *, email: str, password: str = "TestPassword123!", name: str = "Test User") -> dict:
resp = await client.post("/api/v1/auth/register", json={"email": email, "password": password, "name": name})
assert resp.status_code in (200, 201), resp.text
return resp.json()
async def _login(client: AsyncClient, *, email: str, password: str = "TestPassword123!") -> dict:
resp = await client.post("/api/v1/auth/login/json", json={"email": email, "password": password})
assert resp.status_code == 200, resp.text
return {"Authorization": f"Bearer {resp.json()['access_token']}"}
async def _ensure_subscription(db: AsyncSession, account_id: uuid.UUID) -> None:
"""Ensure account has an active Pro subscription."""
await db.execute(delete(Subscription).where(Subscription.account_id == account_id))
db.add(Subscription(account_id=account_id, plan="pro", status="active"))
await db.commit()
async def _make_l1_user(
client: AsyncClient,
db: AsyncSession,
*,
email: str,
account_id: uuid.UUID | None = None,
) -> dict:
"""Register a user, set role=l1_tech, ensure subscription.
If account_id is given, inserts a second user directly into that account.
Otherwise registers a fresh user via the API (new account) and returns
both user data and login headers.
"""
if account_id is None:
user_data = await _register(client, email=email)
uid = uuid.UUID(user_data["id"])
acct_id = uuid.UUID(user_data["account_id"])
# Promote to l1_tech
from sqlalchemy import select as sa_select
result = await db.execute(sa_select(User).where(User.id == uid))
user = result.scalar_one()
user.account_role = "l1_tech"
await db.commit()
await _ensure_subscription(db, acct_id)
headers = await _login(client, email=email)
return {"user_data": user_data, "headers": headers, "account_id": acct_id}
else:
# Insert directly into an existing account
s = str(uuid.uuid4())[:8]
user = User(
id=uuid.uuid4(),
email=email,
name=f"L1 Tech {s}",
account_id=account_id,
account_role="l1_tech",
role="engineer",
is_active=True,
hashed_password="$2b$12$placeholder.placeholder.placeholder.placeholder.plac",
)
db.add(user)
await db.commit()
return {"user_data": {"id": str(user.id), "account_id": str(account_id)}, "headers": None}
# ---------------------------------------------------------------------------
# 1. Intake without flow_id → 200 + session_kind='adhoc'
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_intake_adhoc(client: AsyncClient, test_db: AsyncSession):
"""POST /l1/intake without flow_id creates adhoc session."""
info = await _make_l1_user(client, test_db, email="l1intake@example.com")
headers = info["headers"]
resp = await client.post(
"/api/v1/l1/intake",
json={"problem_statement": "Printer won't turn on", "customer_name": "Alice"},
headers=headers,
)
assert resp.status_code == 200, resp.text
body = resp.json()
assert body["session_kind"] == "adhoc"
assert body["ticket_kind"] == "internal"
assert "session_id" in body
assert "ticket_id" in body
# ---------------------------------------------------------------------------
# 2. Intake without auth → 401
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_intake_no_auth(client: AsyncClient, test_db: AsyncSession):
"""POST /l1/intake without token → 401."""
resp = await client.post(
"/api/v1/l1/intake",
json={"problem_statement": "Test"},
)
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# 3. Intake as viewer → 403
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_intake_viewer_forbidden(client: AsyncClient, test_db: AsyncSession):
"""POST /l1/intake as viewer role → 403."""
user_data = await _register(client, email="viewer_l1@example.com")
uid = uuid.UUID(user_data["id"])
acct_id = uuid.UUID(user_data["account_id"])
from sqlalchemy import select as sa_select
result = await test_db.execute(sa_select(User).where(User.id == uid))
user = result.scalar_one()
user.account_role = "viewer"
await test_db.commit()
await _ensure_subscription(test_db, acct_id)
headers = await _login(client, email="viewer_l1@example.com")
resp = await client.post(
"/api/v1/l1/intake",
json={"problem_statement": "Test"},
headers=headers,
)
assert resp.status_code == 403
# ---------------------------------------------------------------------------
# 4. Step on adhoc session → 400 (cannot step an adhoc)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_step_on_adhoc_returns_400(client: AsyncClient, test_db: AsyncSession):
"""POST /l1/sessions/{id}/step on adhoc session → 400."""
info = await _make_l1_user(client, test_db, email="l1step@example.com")
headers = info["headers"]
# Create adhoc session via intake
resp = await client.post(
"/api/v1/l1/intake",
json={"problem_statement": "Adhoc issue"},
headers=headers,
)
assert resp.status_code == 200, resp.text
session_id = resp.json()["session_id"]
resp = await client.post(
f"/api/v1/l1/sessions/{session_id}/step",
json={"node_id": "node1", "question": "Q?", "answer": "A"},
headers=headers,
)
assert resp.status_code == 400
assert "adhoc" in resp.json()["detail"]
# ---------------------------------------------------------------------------
# 5. Notes on adhoc session → 200, walk_notes updated
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_notes_on_adhoc_session(client: AsyncClient, test_db: AsyncSession):
"""POST /l1/sessions/{id}/notes → 200 and walk_notes is updated."""
info = await _make_l1_user(client, test_db, email="l1notes@example.com")
headers = info["headers"]
resp = await client.post(
"/api/v1/l1/intake",
json={"problem_statement": "Notes test"},
headers=headers,
)
assert resp.status_code == 200, resp.text
session_id = resp.json()["session_id"]
notes_payload = [{"text": "Customer called about printer", "ts": "2026-05-28T10:00:00Z"}]
resp = await client.post(
f"/api/v1/l1/sessions/{session_id}/notes",
json={"notes": notes_payload},
headers=headers,
)
assert resp.status_code == 200, resp.text
body = resp.json()
assert body["walk_notes"] == notes_payload
# ---------------------------------------------------------------------------
# 6. Resolve with helpful=True → 200; GET shows status=resolved
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_resolve_session(client: AsyncClient, test_db: AsyncSession):
"""POST /l1/sessions/{id}/resolve → 200; subsequent GET shows resolved."""
info = await _make_l1_user(client, test_db, email="l1resolve@example.com")
headers = info["headers"]
resp = await client.post(
"/api/v1/l1/intake",
json={"problem_statement": "Resolve test"},
headers=headers,
)
assert resp.status_code == 200, resp.text
session_id = resp.json()["session_id"]
resp = await client.post(
f"/api/v1/l1/sessions/{session_id}/resolve",
json={"helpful": True, "resolution_notes": "Restarted the printer."},
headers=headers,
)
assert resp.status_code == 200, resp.text
assert resp.json()["status"] == "resolved"
# GET should also show resolved
resp = await client.get(f"/api/v1/l1/sessions/{session_id}", headers=headers)
assert resp.status_code == 200
assert resp.json()["status"] == "resolved"
# ---------------------------------------------------------------------------
# 7. Escalate session → 200; status=escalated
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_escalate_session(client: AsyncClient, test_db: AsyncSession):
"""POST /l1/sessions/{id}/escalate → 200; status becomes escalated."""
info = await _make_l1_user(client, test_db, email="l1escalate@example.com")
headers = info["headers"]
resp = await client.post(
"/api/v1/l1/intake",
json={"problem_statement": "Escalation test"},
headers=headers,
)
assert resp.status_code == 200, resp.text
session_id = resp.json()["session_id"]
resp = await client.post(
f"/api/v1/l1/sessions/{session_id}/escalate",
json={"reason_category": "needs_l2", "reason": "Beyond L1 scope"},
headers=headers,
)
assert resp.status_code == 200, resp.text
body = resp.json()
assert body["status"] == "escalated"
# ---------------------------------------------------------------------------
# 8. escalate-without-walk → 200 + session in escalated status
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_escalate_without_walk(client: AsyncClient, test_db: AsyncSession):
"""POST /l1/escalate-without-walk → 200 + session.status=escalated."""
info = await _make_l1_user(client, test_db, email="l1eww@example.com")
headers = info["headers"]
resp = await client.post(
"/api/v1/l1/escalate-without-walk",
json={
"problem_statement": "No KB available",
"reason_category": "no_kb",
"reason": "No knowledge base content matched",
},
headers=headers,
)
assert resp.status_code == 200, resp.text
body = resp.json()
assert body["status"] == "escalated"
assert body["session_kind"] == "adhoc"
# ---------------------------------------------------------------------------
# 9. List active sessions returns L1's active sessions ordered by last_step_at DESC
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_list_active_sessions_ordered(client: AsyncClient, test_db: AsyncSession):
"""GET /l1/sessions/active returns active sessions ordered by last_step_at DESC."""
info = await _make_l1_user(client, test_db, email="l1active@example.com")
headers = info["headers"]
user_id = uuid.UUID(info["user_data"]["id"])
account_id = info["account_id"]
# Create two sessions with controlled timestamps directly in DB
now = datetime.now(timezone.utc)
s1 = L1WalkSession(
id=uuid.uuid4(),
account_id=account_id,
created_by_user_id=user_id,
ticket_id=str(uuid.uuid4()),
ticket_kind="internal",
session_kind="adhoc",
status="active",
started_at=now - timedelta(minutes=10),
last_step_at=now - timedelta(minutes=5),
)
s2 = L1WalkSession(
id=uuid.uuid4(),
account_id=account_id,
created_by_user_id=user_id,
ticket_id=str(uuid.uuid4()),
ticket_kind="internal",
session_kind="adhoc",
status="active",
started_at=now - timedelta(minutes=20),
last_step_at=now - timedelta(minutes=1),
)
test_db.add_all([s1, s2])
await test_db.commit()
resp = await client.get("/api/v1/l1/sessions/active", headers=headers)
assert resp.status_code == 200, resp.text
bodies = resp.json()
ids = [b["id"] for b in bodies]
# s2 has the more recent last_step_at → should come first
assert ids.index(str(s2.id)) < ids.index(str(s1.id))
# ---------------------------------------------------------------------------
# 10. GET session from different account → 404 (tenancy)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_session_cross_account_returns_404(client: AsyncClient, test_db: AsyncSession):
"""GET /l1/sessions/{id} from a different account → 404."""
# Account A: creates a session
info_a = await _make_l1_user(client, test_db, email="l1tenanta@example.com")
headers_a = info_a["headers"]
resp = await client.post(
"/api/v1/l1/intake",
json={"problem_statement": "Account A issue"},
headers=headers_a,
)
assert resp.status_code == 200, resp.text
session_id = resp.json()["session_id"]
# Account B: different user in a different account
info_b = await _make_l1_user(client, test_db, email="l1tenantb@example.com")
headers_b = info_b["headers"]
resp = await client.get(f"/api/v1/l1/sessions/{session_id}", headers=headers_b)
assert resp.status_code == 404

View File

@@ -0,0 +1,450 @@
# backend/tests/test_l1_rls.py
"""
RLS regression tests for L1 Phase 1 tables.
Verifies that `internal_tickets` and `l1_walk_sessions` — both with
FORCE ROW LEVEL SECURITY + `tenant_isolation` policy on `account_id` —
block cross-tenant reads AND reject WITH CHECK violations on INSERT.
Uses synchronous psycopg2 (not asyncpg) to avoid the conftest
teardown hook that closes the asyncio event loop after every test,
which is incompatible with module-scoped asyncpg fixtures.
Run with:
RUN_RLS_TESTS=1 DB_APP_ROLE_PASSWORD=app_secret_change_me \
pytest tests/test_l1_rls.py -v --override-ini="addopts="
"""
import os
import subprocess
import sys
import uuid
from pathlib import Path
from urllib.parse import unquote, urlsplit
import psycopg2
import psycopg2.errors
import pytest
pytestmark = pytest.mark.rls
_DATABASE_TEST_URL = os.getenv(
"DATABASE_TEST_URL",
"postgresql+asyncpg://postgres:postgres@localhost:5432/resolutionflow_test",
)
_DATABASE_TEST_URL_SYNC = _DATABASE_TEST_URL.replace(
"postgresql+asyncpg://",
"postgresql://",
1,
)
_TEST_DB_PARTS = urlsplit(_DATABASE_TEST_URL_SYNC)
_DB_HOST = os.getenv(
"TEST_DB_HOST", _TEST_DB_PARTS.hostname or "localhost"
)
_DB_PORT = int(os.getenv(
"TEST_DB_PORT", str(_TEST_DB_PARTS.port or 5432)
))
_DB_NAME = os.getenv(
"TEST_DB_NAME",
unquote(_TEST_DB_PARTS.path.lstrip("/") or "resolutionflow_test"),
)
_ADMIN_USER = os.getenv(
"TEST_DB_ADMIN_USER",
unquote(_TEST_DB_PARTS.username or "postgres"),
)
_ADMIN_PASSWORD = os.getenv(
"TEST_DB_ADMIN_PASSWORD",
unquote(_TEST_DB_PARTS.password or "postgres"),
)
_APP_PASSWORD = os.getenv("DB_APP_ROLE_PASSWORD", "app_secret_change_me")
ACCOUNT_A_ID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
ACCOUNT_B_ID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
def _admin_dsn() -> dict:
return dict(
host=_DB_HOST, port=_DB_PORT, dbname=_DB_NAME,
user=_ADMIN_USER, password=_ADMIN_PASSWORD,
)
def _app_dsn() -> dict:
return dict(
host=_DB_HOST, port=_DB_PORT, dbname=_DB_NAME,
user="resolutionflow_app", password=_APP_PASSWORD,
)
# ---------------------------------------------------------------------------
# Schema bootstrap
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def _ensure_rls_schema():
"""Re-apply Alembic migrations so that RLS policies are present.
The standard test_db fixture uses Base.metadata.create_all which skips
RLS setup. Running 'alembic upgrade head' against the test DB ensures
the FORCE ROW LEVEL SECURITY + tenant_isolation policies created in the
L1 migrations (T5/T6) are active.
We drop and recreate the public schema first so that any tables left behind
by a prior create_all-based test_db run don't conflict with alembic's
migration tracking (alembic would see existing tables without alembic_version
and fail with DuplicateTable errors).
"""
# Drop and recreate the schema to ensure a clean slate for alembic.
with psycopg2.connect(**_admin_dsn()) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("DROP SCHEMA public CASCADE")
cur.execute("CREATE SCHEMA public")
backend_dir = Path(__file__).parent.parent
env = os.environ.copy()
env["DATABASE_URL"] = _DATABASE_TEST_URL
env["DATABASE_URL_SYNC"] = _DATABASE_TEST_URL_SYNC
subprocess.run(
[sys.executable, "-m", "alembic", "upgrade", "head"],
cwd=backend_dir,
env=env,
check=True,
capture_output=True,
)
# ---------------------------------------------------------------------------
# Seed fixture (module-scoped, synchronous psycopg2)
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def l1_rls_seed(_ensure_rls_schema):
"""Insert two accounts, two users, one internal_ticket and one
l1_walk_session per account using a superuser (BYPASSRLS) connection.
Returns a dict with the seeded IDs so tests can reference them.
Cleans up on module teardown.
"""
conn = psycopg2.connect(**_admin_dsn())
conn.autocommit = True
cur = conn.cursor()
# Accounts (idempotent — shared with test_rls_isolation.py)
cur.execute(
"INSERT INTO accounts (id, name, display_code, created_at, updated_at)"
" VALUES (%s, %s, %s, NOW(), NOW()),"
" (%s, %s, %s, NOW(), NOW())"
" ON CONFLICT (id) DO NOTHING",
(
ACCOUNT_A_ID, "L1 RLS Tenant A", "RLSA0001",
ACCOUNT_B_ID, "L1 RLS Tenant B", "RLSB0001",
),
)
user_a_tmp = str(uuid.uuid4())
user_b_tmp = str(uuid.uuid4())
cur.execute(
"INSERT INTO users"
" (id, email, password_hash, name, role,"
" is_super_admin, is_team_admin, is_service_account, must_change_password,"
" is_active, account_id, account_role, timezone, created_at)"
" VALUES"
" (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()),"
" (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())"
" ON CONFLICT (email) DO NOTHING",
(
user_a_tmp, "l1-rls-a@example.com", "placeholder",
"L1 RLS User A", "engineer",
False, False, False, False,
True, ACCOUNT_A_ID, "engineer", "UTC",
user_b_tmp, "l1-rls-b@example.com", "placeholder",
"L1 RLS User B", "engineer",
False, False, False, False,
True, ACCOUNT_B_ID, "engineer", "UTC",
),
)
cur.execute(
"SELECT id FROM users WHERE email = 'l1-rls-a@example.com'"
)
user_a_id = str(cur.fetchone()[0])
cur.execute(
"SELECT id FROM users WHERE email = 'l1-rls-b@example.com'"
)
user_b_id = str(cur.fetchone()[0])
ticket_a_id = str(uuid.uuid4())
ticket_b_id = str(uuid.uuid4())
walk_a_id = str(uuid.uuid4())
walk_b_id = str(uuid.uuid4())
cur.execute(
"INSERT INTO internal_tickets"
" (id, account_id, created_by_user_id, problem_statement,"
" status, created_at, updated_at)"
" VALUES"
" (%s, %s, %s, %s, %s, NOW(), NOW()),"
" (%s, %s, %s, %s, %s, NOW(), NOW())",
(
ticket_a_id, ACCOUNT_A_ID, user_a_id,
"L1 RLS test ticket A", "open",
ticket_b_id, ACCOUNT_B_ID, user_b_id,
"L1 RLS test ticket B", "open",
),
)
cur.execute(
"INSERT INTO l1_walk_sessions"
" (id, account_id, created_by_user_id, ticket_id, ticket_kind,"
" session_kind, status, started_at, last_step_at)"
" VALUES"
" (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW()),"
" (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())",
(
walk_a_id, ACCOUNT_A_ID, user_a_id,
"INT-A", "internal", "adhoc", "active",
walk_b_id, ACCOUNT_B_ID, user_b_id,
"INT-B", "internal", "adhoc", "active",
),
)
seed = {
"ticket_a": ticket_a_id,
"ticket_b": ticket_b_id,
"walk_a": walk_a_id,
"walk_b": walk_b_id,
"user_a": user_a_id,
"user_b": user_b_id,
}
yield seed
# Cleanup in reverse FK order.
# Delete all child rows for both test accounts before removing users —
# other test modules (test_rls_isolation.py) may have seeded rows for
# these same accounts, so we clean by account_id rather than by row ID.
cur.execute(
"DELETE FROM l1_walk_sessions WHERE account_id IN (%s, %s)",
(ACCOUNT_A_ID, ACCOUNT_B_ID),
)
cur.execute(
"DELETE FROM internal_tickets WHERE account_id IN (%s, %s)",
(ACCOUNT_A_ID, ACCOUNT_B_ID),
)
cur.execute(
"DELETE FROM users WHERE email IN (%s, %s)",
("l1-rls-a@example.com", "l1-rls-b@example.com"),
)
cur.execute(
"DELETE FROM accounts WHERE id IN (%s, %s)"
" AND display_code IN ('RLSA0001', 'RLSB0001')",
(ACCOUNT_A_ID, ACCOUNT_B_ID),
)
cur.close()
conn.close()
# ---------------------------------------------------------------------------
# Per-test helper: open an app-role connection with a given tenant context
# ---------------------------------------------------------------------------
def _app_conn(account_id: str | None = None) -> psycopg2.extensions.connection:
"""Open a psycopg2 connection as resolutionflow_app.
If account_id is given, SET LOCAL app.current_account_id so RLS applies
to the given tenant. Callers must begin a transaction first.
"""
conn = psycopg2.connect(**_app_dsn())
conn.autocommit = False
cur = conn.cursor()
if account_id:
cur.execute(
"SELECT set_config('app.current_account_id', %s, false)",
(account_id,),
)
cur.close()
return conn
# ---------------------------------------------------------------------------
# internal_tickets — read isolation
# ---------------------------------------------------------------------------
def test_l1_user_cannot_read_other_accounts_internal_tickets(l1_rls_seed):
"""RLS USING: Account A context must not see Account B's tickets."""
conn = _app_conn(ACCOUNT_A_ID)
try:
cur = conn.cursor()
cur.execute(
"SELECT id FROM internal_tickets WHERE id = %s",
(l1_rls_seed["ticket_b"],),
)
rows = cur.fetchall()
finally:
conn.rollback()
conn.close()
assert len(rows) == 0, (
"Account A must not read Account B's internal_tickets"
)
def test_internal_tickets_account_a_can_see_own_rows(l1_rls_seed):
"""Positive check: Account A can read its own internal_tickets."""
conn = _app_conn(ACCOUNT_A_ID)
try:
cur = conn.cursor()
cur.execute(
"SELECT id FROM internal_tickets WHERE id = %s",
(l1_rls_seed["ticket_a"],),
)
rows = cur.fetchall()
finally:
conn.rollback()
conn.close()
assert len(rows) == 1, (
"Account A must be able to read its own internal_tickets"
)
def test_internal_tickets_no_context_sees_nothing(l1_rls_seed):
"""Fail-closed: no tenant context → zero internal_tickets rows visible."""
conn = _app_conn() # no account_id
try:
cur = conn.cursor()
cur.execute(
"SELECT id FROM internal_tickets WHERE id IN (%s, %s)",
(l1_rls_seed["ticket_a"], l1_rls_seed["ticket_b"]),
)
rows = cur.fetchall()
finally:
conn.rollback()
conn.close()
assert len(rows) == 0, (
"No-context connection must not see any internal_tickets"
)
# ---------------------------------------------------------------------------
# l1_walk_sessions — read isolation
# ---------------------------------------------------------------------------
def test_l1_user_cannot_read_other_accounts_walk_sessions(l1_rls_seed):
"""RLS USING: Account A context must not see Account B's walk sessions."""
conn = _app_conn(ACCOUNT_A_ID)
try:
cur = conn.cursor()
cur.execute(
"SELECT id FROM l1_walk_sessions WHERE id = %s",
(l1_rls_seed["walk_b"],),
)
rows = cur.fetchall()
finally:
conn.rollback()
conn.close()
assert len(rows) == 0, (
"Account A must not read Account B's l1_walk_sessions"
)
def test_l1_walk_sessions_account_a_can_see_own_rows(l1_rls_seed):
"""Positive check: Account A can read its own l1_walk_sessions."""
conn = _app_conn(ACCOUNT_A_ID)
try:
cur = conn.cursor()
cur.execute(
"SELECT id FROM l1_walk_sessions WHERE id = %s",
(l1_rls_seed["walk_a"],),
)
rows = cur.fetchall()
finally:
conn.rollback()
conn.close()
assert len(rows) == 1, (
"Account A must be able to read its own l1_walk_sessions"
)
def test_l1_walk_sessions_no_context_sees_nothing(l1_rls_seed):
"""Fail-closed: no tenant context → zero l1_walk_sessions rows visible."""
conn = _app_conn() # no account_id
try:
cur = conn.cursor()
cur.execute(
"SELECT id FROM l1_walk_sessions WHERE id IN (%s, %s)",
(l1_rls_seed["walk_a"], l1_rls_seed["walk_b"]),
)
rows = cur.fetchall()
finally:
conn.rollback()
conn.close()
assert len(rows) == 0, (
"No-context connection must not see any l1_walk_sessions"
)
# ---------------------------------------------------------------------------
# internal_tickets — WITH CHECK (cross-tenant INSERT rejection)
# ---------------------------------------------------------------------------
def test_with_check_blocks_cross_tenant_insert_internal_tickets(l1_rls_seed):
"""RLS WITH CHECK: INSERT with account_id = A under context B is rejected.
psycopg2 raises InsufficientPrivilege (pgcode '42501') when a row
violates FORCE ROW LEVEL SECURITY WITH CHECK.
"""
new_id = str(uuid.uuid4())
user_b_id = l1_rls_seed["user_b"]
conn = _app_conn(ACCOUNT_B_ID)
try:
cur = conn.cursor()
with pytest.raises(psycopg2.errors.InsufficientPrivilege):
cur.execute(
"INSERT INTO internal_tickets"
" (id, account_id, created_by_user_id, problem_statement,"
" status, created_at, updated_at)"
" VALUES (%s, %s, %s, %s, %s, NOW(), NOW())",
(
new_id, ACCOUNT_A_ID, user_b_id,
"Cross-tenant injection attempt", "open",
),
)
finally:
conn.rollback()
conn.close()
# ---------------------------------------------------------------------------
# l1_walk_sessions — WITH CHECK (cross-tenant INSERT rejection)
# ---------------------------------------------------------------------------
def test_with_check_blocks_cross_tenant_insert_l1_walk_sessions(l1_rls_seed):
"""RLS WITH CHECK: INSERT with account_id = A under context B is rejected."""
new_id = str(uuid.uuid4())
user_b_id = l1_rls_seed["user_b"]
conn = _app_conn(ACCOUNT_B_ID)
try:
cur = conn.cursor()
with pytest.raises(psycopg2.errors.InsufficientPrivilege):
cur.execute(
"INSERT INTO l1_walk_sessions"
" (id, account_id, created_by_user_id, ticket_id,"
" ticket_kind, session_kind, status, started_at, last_step_at)"
" VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())",
(
new_id, ACCOUNT_A_ID, user_b_id,
"INT-cross", "internal", "adhoc", "active",
),
)
finally:
conn.rollback()
conn.close()

View File

@@ -0,0 +1,119 @@
"""Tests for the l1_session_cleanup job."""
import uuid
from datetime import datetime, timedelta, timezone
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.l1_walk_session import L1WalkSession
from app.models.account import Account
from app.models.user import User
from app.services.l1_session_cleanup import flip_stale_sessions
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def _make_account(db: AsyncSession) -> Account:
import secrets
import string
code = "".join(secrets.choice(string.ascii_uppercase + string.digits) for _ in range(8))
a = Account(id=uuid.uuid4(), name="Test", display_code=code)
db.add(a)
await db.flush()
return a
async def _make_user(db: AsyncSession, *, account_id: uuid.UUID) -> User:
u = User(
id=uuid.uuid4(),
email=f"user-{uuid.uuid4()}@example.com",
name="L1",
account_id=account_id,
account_role="l1_tech",
role="engineer",
is_active=True,
)
db.add(u)
await db.flush()
return u
async def _make_session(
db: AsyncSession,
*,
account_id: uuid.UUID,
user_id: uuid.UUID,
status: str = "active",
last_step_at: datetime | None = None,
) -> L1WalkSession:
now = datetime.now(timezone.utc)
session = L1WalkSession(
id=uuid.uuid4(),
account_id=account_id,
created_by_user_id=user_id,
ticket_id="t",
ticket_kind="internal",
session_kind="adhoc",
status=status,
started_at=now,
last_step_at=last_step_at or now,
)
db.add(session)
await db.flush()
return session
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_flip_stale_sessions_only_affects_old_active_rows(test_db: AsyncSession):
account = await _make_account(test_db)
user = await _make_user(test_db, account_id=account.id)
# 1. Stale active (>24h ago) — should flip
stale = await _make_session(
test_db, account_id=account.id, user_id=user.id,
status="active",
last_step_at=datetime.now(timezone.utc) - timedelta(hours=25),
)
# 2. Fresh active (1h ago) — should stay active
fresh = await _make_session(
test_db, account_id=account.id, user_id=user.id,
status="active",
last_step_at=datetime.now(timezone.utc) - timedelta(hours=1),
)
# 3. Already-resolved (old) — should stay resolved, not flip
already_resolved = await _make_session(
test_db, account_id=account.id, user_id=user.id,
status="resolved",
last_step_at=datetime.now(timezone.utc) - timedelta(hours=48),
)
await test_db.commit()
count = await flip_stale_sessions(test_db)
assert count == 1
await test_db.refresh(stale)
await test_db.refresh(fresh)
await test_db.refresh(already_resolved)
assert stale.status == "abandoned"
assert fresh.status == "active"
assert already_resolved.status == "resolved"
@pytest.mark.asyncio
async def test_flip_stale_sessions_returns_zero_when_none_stale(test_db: AsyncSession):
account = await _make_account(test_db)
user = await _make_user(test_db, account_id=account.id)
await _make_session(
test_db, account_id=account.id, user_id=user.id,
status="active",
last_step_at=datetime.now(timezone.utc) - timedelta(hours=1),
)
await test_db.commit()
count = await flip_stale_sessions(test_db)
assert count == 0

View File

@@ -0,0 +1,917 @@
"""Tests for l1_session_service start_* functions (T12), record_step/update_notes (T13), resolve/escalate (T14)."""
import uuid
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models.account import Account
from app.models.audit_log import AuditLog
from app.models.user import User
from app.models.tree import Tree
from app.models.ai_session import AISession
from app.models.flow_proposal import FlowProposal
from app.services.l1_session_service import (
start_flow_session,
start_proposal_session,
start_adhoc_session,
_resolve_acting_as,
record_step,
update_notes,
resolve,
escalate,
escalate_without_walk,
)
from app.services import internal_ticket_service
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def _make_account(db: AsyncSession) -> Account:
s = str(uuid.uuid4())[:8]
account = Account(
id=uuid.uuid4(),
name=f"Test Account {s}",
display_code=s[:8].upper(),
)
db.add(account)
await db.flush()
return account
async def _make_user(
db: AsyncSession,
*,
account_id: uuid.UUID,
account_role: str = "l1_tech",
can_cover_l1: bool = False,
) -> User:
user = User(
id=uuid.uuid4(),
email=f"user-{uuid.uuid4()}@example.com",
name="Test User",
account_id=account_id,
account_role=account_role,
role="engineer",
is_active=True,
can_cover_l1=can_cover_l1,
)
db.add(user)
await db.flush()
return user
async def _make_tree(db: AsyncSession, *, account_id: uuid.UUID, author_id: uuid.UUID) -> Tree:
tree = Tree(
id=uuid.uuid4(),
name="Test Flow",
account_id=account_id,
author_id=author_id,
tree_type="troubleshooting",
tree_structure={"nodes": [], "edges": []},
visibility="team",
status="published",
)
db.add(tree)
await db.flush()
return tree
async def _make_ai_session(db: AsyncSession, *, user_id: uuid.UUID, account_id: uuid.UUID) -> AISession:
ai_session = AISession(
id=uuid.uuid4(),
user_id=user_id,
account_id=account_id,
session_type="chat",
intake_type="free_text",
intake_content={"text": "test"},
status="active",
confidence_tier="discovery",
conversation_messages=[],
)
db.add(ai_session)
await db.flush()
return ai_session
async def _make_proposal(
db: AsyncSession,
*,
account_id: uuid.UUID,
source_session_id: uuid.UUID,
) -> FlowProposal:
proposal = FlowProposal(
id=uuid.uuid4(),
account_id=account_id,
source_session_id=source_session_id,
proposal_type="new_flow",
title="Test Proposal",
proposed_flow_data={"nodes": [], "edges": []},
source="manual_draft",
status="pending",
)
db.add(proposal)
await db.flush()
return proposal
# ---------------------------------------------------------------------------
# Unit tests for _resolve_acting_as (no DB needed)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_resolve_acting_as_for_engineer_returns_coverage_tag():
user = User(account_role="engineer")
assert _resolve_acting_as(user) == "l1_coverage"
@pytest.mark.asyncio
async def test_resolve_acting_as_for_l1_tech_returns_none():
user = User(account_role="l1_tech")
assert _resolve_acting_as(user) is None
@pytest.mark.asyncio
async def test_resolve_acting_as_for_owner_returns_none():
user = User(account_role="owner")
assert _resolve_acting_as(user) is None
# ---------------------------------------------------------------------------
# Integration tests (real DB)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_start_flow_session_creates_active_flow_session(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
tree = await _make_tree(test_db, account_id=account.id, author_id=l1.id)
session = await start_flow_session(
test_db,
account_id=account.id,
user=l1,
flow_id=tree.id,
ticket_id="ticket-abc",
ticket_kind="internal",
)
assert session.session_kind == "flow"
assert session.flow_id == tree.id
assert session.flow_proposal_id is None
assert session.status == "active"
assert session.walked_path == []
assert session.walk_notes == []
assert session.acting_as is None # l1_tech native
assert session.ticket_id == "ticket-abc"
assert session.ticket_kind == "internal"
@pytest.mark.asyncio
async def test_start_proposal_session_creates_active_proposal_session(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
ai_session = await _make_ai_session(test_db, user_id=l1.id, account_id=account.id)
proposal = await _make_proposal(
test_db,
account_id=account.id,
source_session_id=ai_session.id,
)
session = await start_proposal_session(
test_db,
account_id=account.id,
user=l1,
flow_proposal_id=proposal.id,
ticket_id="ticket-xyz",
ticket_kind="psa",
)
assert session.session_kind == "proposal"
assert session.flow_proposal_id == proposal.id
assert session.flow_id is None
assert session.status == "active"
@pytest.mark.asyncio
async def test_start_adhoc_session_no_flow_no_proposal(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
session = await start_adhoc_session(
test_db,
account_id=account.id,
user=l1,
ticket_id="ticket-adhoc",
ticket_kind="internal",
)
assert session.session_kind == "adhoc"
assert session.flow_id is None
assert session.flow_proposal_id is None
assert session.walked_path == []
assert session.walk_notes == []
@pytest.mark.asyncio
async def test_engineer_with_coverage_gets_acting_as_tag(test_db: AsyncSession):
account = await _make_account(test_db)
eng = await _make_user(
test_db,
account_id=account.id,
account_role="engineer",
can_cover_l1=True,
)
session = await start_adhoc_session(
test_db,
account_id=account.id,
user=eng,
ticket_id="t",
ticket_kind="internal",
)
assert session.acting_as == "l1_coverage"
# ---------------------------------------------------------------------------
# T13: record_step and update_notes tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_record_step_appends_to_walked_path(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
tree = await _make_tree(test_db, account_id=account.id, author_id=l1.id)
session = await start_flow_session(
test_db,
account_id=account.id,
user=l1,
flow_id=tree.id,
ticket_id="t1",
ticket_kind="internal",
)
updated = await record_step(
test_db,
session_id=session.id,
node_id="n1",
question="Is the device powered on?",
answer="yes",
)
assert len(updated.walked_path) == 1
assert updated.walked_path[0] == {
"node_id": "n1",
"question": "Is the device powered on?",
"answer": "yes",
"l1_note": None,
}
assert updated.current_node_id == "n1"
assert updated.last_step_at is not None
@pytest.mark.asyncio
async def test_record_step_two_sequential_steps_accumulate(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
tree = await _make_tree(test_db, account_id=account.id, author_id=l1.id)
session = await start_flow_session(
test_db,
account_id=account.id,
user=l1,
flow_id=tree.id,
ticket_id="t2",
ticket_kind="internal",
)
await record_step(
test_db,
session_id=session.id,
node_id="n1",
question="Step 1?",
answer="yes",
)
updated = await record_step(
test_db,
session_id=session.id,
node_id="n2",
question="Step 2?",
answer="no",
)
assert len(updated.walked_path) == 2
assert updated.walked_path[0]["node_id"] == "n1"
assert updated.walked_path[1]["node_id"] == "n2"
assert updated.current_node_id == "n2"
@pytest.mark.asyncio
async def test_record_step_blocks_adhoc(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
session = await start_adhoc_session(
test_db,
account_id=account.id,
user=l1,
ticket_id="t3",
ticket_kind="internal",
)
with pytest.raises(ValueError, match="adhoc"):
await record_step(
test_db,
session_id=session.id,
node_id="n1",
question="Q?",
answer="yes",
)
@pytest.mark.asyncio
async def test_record_step_blocks_inactive_session(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
tree = await _make_tree(test_db, account_id=account.id, author_id=l1.id)
session = await start_flow_session(
test_db,
account_id=account.id,
user=l1,
flow_id=tree.id,
ticket_id="t4",
ticket_kind="internal",
)
# Manually mark resolved to simulate inactive state
session.status = "resolved"
await test_db.flush()
with pytest.raises(ValueError, match="not active"):
await record_step(
test_db,
session_id=session.id,
node_id="n1",
question="Q?",
answer="yes",
)
@pytest.mark.asyncio
async def test_record_step_includes_note_when_provided(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
tree = await _make_tree(test_db, account_id=account.id, author_id=l1.id)
session = await start_flow_session(
test_db,
account_id=account.id,
user=l1,
flow_id=tree.id,
ticket_id="t5",
ticket_kind="internal",
)
updated = await record_step(
test_db,
session_id=session.id,
node_id="n1",
question="Q?",
answer="yes",
note="Customer mentioned it started yesterday",
)
assert updated.walked_path[0]["l1_note"] == "Customer mentioned it started yesterday"
@pytest.mark.asyncio
async def test_update_notes_replaces_walk_notes(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
session = await start_adhoc_session(
test_db,
account_id=account.id,
user=l1,
ticket_id="t6",
ticket_kind="internal",
)
new_notes = [{"timestamp": "2026-05-28T10:00:00Z", "content": "Customer rebooted"}]
updated = await update_notes(test_db, session_id=session.id, notes=new_notes)
assert updated.walk_notes == new_notes
assert updated.last_step_at is not None
@pytest.mark.asyncio
async def test_update_notes_raises_if_session_not_found(test_db: AsyncSession):
missing_id = uuid.uuid4()
with pytest.raises(ValueError, match="not found"):
await update_notes(test_db, session_id=missing_id, notes=[])
@pytest.mark.asyncio
async def test_update_notes_raises_if_session_not_active(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
session = await start_adhoc_session(
test_db,
account_id=account.id,
user=l1,
ticket_id="t7",
ticket_kind="internal",
)
session.status = "escalated"
await test_db.flush()
with pytest.raises(ValueError, match="not active"):
await update_notes(test_db, session_id=session.id, notes=[])
@pytest.mark.asyncio
async def test_update_notes_size_cap(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
session = await start_adhoc_session(
test_db,
account_id=account.id,
user=l1,
ticket_id="t8",
ticket_kind="internal",
)
# Create a notes payload larger than 256KB
big_content = "x" * (256 * 1024 + 100)
notes = [{"timestamp": "2026-05-28T10:00:00Z", "content": big_content}]
with pytest.raises(ValueError, match="256KB"):
await update_notes(test_db, session_id=session.id, notes=notes)
# ---------------------------------------------------------------------------
# T14: resolve, escalate, escalate_without_walk tests
# ---------------------------------------------------------------------------
async def _make_internal_ticket(db: AsyncSession, *, account_id: uuid.UUID, user_id: uuid.UUID) -> object:
"""Create an internal ticket and return it."""
return await internal_ticket_service.create_ticket(
db,
account_id=account_id,
created_by_user_id=user_id,
problem_statement="Customer cannot log in",
)
@pytest.mark.asyncio
async def test_resolve_proposal_helpful_flips_validated_by_outcome(test_db: AsyncSession):
"""resolve(helpful=True) on a proposal session sets validated_by_outcome=True."""
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1.id)
ai_session = await _make_ai_session(test_db, user_id=l1.id, account_id=account.id)
proposal = await _make_proposal(test_db, account_id=account.id, source_session_id=ai_session.id)
session = await start_proposal_session(
test_db,
account_id=account.id,
user=l1,
flow_proposal_id=proposal.id,
ticket_id=str(ticket.id),
ticket_kind="internal",
)
resolved = await resolve(
test_db,
session_id=session.id,
helpful=True,
resolution_notes="Issue fixed by proposal walk",
)
assert resolved.status == "resolved"
assert resolved.helpful is True
assert resolved.resolution_notes == "Issue fixed by proposal walk"
assert resolved.resolved_at is not None
# Proposal should now be validated
await test_db.refresh(proposal)
assert proposal.validated_by_outcome is True
# Internal ticket should be closed
await test_db.refresh(ticket)
assert ticket.status == "resolved"
@pytest.mark.asyncio
async def test_resolve_proposal_not_helpful_leaves_validated_by_outcome_false(test_db: AsyncSession):
"""resolve(helpful=False) on a proposal session does NOT flip validated_by_outcome."""
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1.id)
ai_session = await _make_ai_session(test_db, user_id=l1.id, account_id=account.id)
proposal = await _make_proposal(test_db, account_id=account.id, source_session_id=ai_session.id)
session = await start_proposal_session(
test_db,
account_id=account.id,
user=l1,
flow_proposal_id=proposal.id,
ticket_id=str(ticket.id),
ticket_kind="internal",
)
resolved = await resolve(
test_db,
session_id=session.id,
helpful=False,
resolution_notes="Proposal did not help",
)
assert resolved.helpful is False
await test_db.refresh(proposal)
assert proposal.validated_by_outcome is False
@pytest.mark.asyncio
async def test_resolve_flow_session_closes_ticket_no_proposal_update(test_db: AsyncSession):
"""resolve on a flow session closes the ticket and does not touch proposals."""
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1.id)
tree = await _make_tree(test_db, account_id=account.id, author_id=l1.id)
session = await start_flow_session(
test_db,
account_id=account.id,
user=l1,
flow_id=tree.id,
ticket_id=str(ticket.id),
ticket_kind="internal",
)
resolved = await resolve(
test_db,
session_id=session.id,
helpful=True,
resolution_notes="Flow resolved the issue",
)
assert resolved.status == "resolved"
assert resolved.session_kind == "flow"
assert resolved.flow_proposal_id is None
await test_db.refresh(ticket)
assert ticket.status == "resolved"
@pytest.mark.asyncio
async def test_resolve_adhoc_session_closes_ticket(test_db: AsyncSession):
"""resolve on an adhoc session closes the ticket with no proposal interaction."""
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1.id)
session = await start_adhoc_session(
test_db,
account_id=account.id,
user=l1,
ticket_id=str(ticket.id),
ticket_kind="internal",
)
resolved = await resolve(
test_db,
session_id=session.id,
helpful=True,
resolution_notes="Adhoc resolved",
)
assert resolved.status == "resolved"
await test_db.refresh(ticket)
assert ticket.status == "resolved"
@pytest.mark.asyncio
async def test_resolve_psa_session_no_ticket_update(test_db: AsyncSession):
"""resolve on a PSA-backed session does not attempt to update an internal ticket."""
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
tree = await _make_tree(test_db, account_id=account.id, author_id=l1.id)
session = await start_flow_session(
test_db,
account_id=account.id,
user=l1,
flow_id=tree.id,
ticket_id="psa-external-id-123",
ticket_kind="psa",
)
resolved = await resolve(
test_db,
session_id=session.id,
helpful=True,
resolution_notes="PSA ticket resolved externally",
)
assert resolved.status == "resolved"
assert resolved.ticket_kind == "psa"
@pytest.mark.asyncio
async def test_resolve_raises_on_missing_session(test_db: AsyncSession):
"""resolve raises ValueError when session does not exist."""
with pytest.raises(ValueError, match="not found"):
await resolve(
test_db,
session_id=uuid.uuid4(),
helpful=True,
resolution_notes="N/A",
)
@pytest.mark.asyncio
async def test_resolve_raises_on_inactive_session(test_db: AsyncSession):
"""resolve raises ValueError when session is not active."""
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1.id)
session = await start_adhoc_session(
test_db,
account_id=account.id,
user=l1,
ticket_id=str(ticket.id),
ticket_kind="internal",
)
session.status = "escalated"
await test_db.flush()
with pytest.raises(ValueError, match="not active"):
await resolve(
test_db,
session_id=session.id,
helpful=False,
resolution_notes="N/A",
)
@pytest.mark.asyncio
async def test_escalate_marks_session_and_ticket_as_escalated(test_db: AsyncSession):
"""escalate sets session status=escalated and closes the internal ticket as escalated."""
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1.id)
tree = await _make_tree(test_db, account_id=account.id, author_id=l1.id)
session = await start_flow_session(
test_db,
account_id=account.id,
user=l1,
flow_id=tree.id,
ticket_id=str(ticket.id),
ticket_kind="internal",
)
escalated = await escalate(
test_db,
session_id=session.id,
reason="Customer reported intermittent failure not covered by flow",
reason_category="out_of_scope",
)
assert escalated.status == "escalated"
assert escalated.escalation_reason == "Customer reported intermittent failure not covered by flow"
assert escalated.escalation_reason_category == "out_of_scope"
assert escalated.resolved_at is not None
assert escalated.last_step_at is not None
await test_db.refresh(ticket)
assert ticket.status == "escalated"
@pytest.mark.asyncio
async def test_escalate_raises_on_missing_session(test_db: AsyncSession):
"""escalate raises ValueError when session does not exist."""
with pytest.raises(ValueError, match="not found"):
await escalate(
test_db,
session_id=uuid.uuid4(),
reason="Some reason",
reason_category="unknown",
)
@pytest.mark.asyncio
async def test_escalate_raises_on_inactive_session(test_db: AsyncSession):
"""escalate raises ValueError when session is already inactive."""
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1.id)
session = await start_adhoc_session(
test_db,
account_id=account.id,
user=l1,
ticket_id=str(ticket.id),
ticket_kind="internal",
)
session.status = "resolved"
await test_db.flush()
with pytest.raises(ValueError, match="not active"):
await escalate(
test_db,
session_id=session.id,
reason="Too late",
reason_category="other",
)
@pytest.mark.asyncio
async def test_escalate_without_walk_creates_escalated_adhoc_session(test_db: AsyncSession):
"""escalate_without_walk creates an immediately-escalated session with empty walked_path."""
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1.id)
session = await escalate_without_walk(
test_db,
account_id=account.id,
user=l1,
ticket_id=str(ticket.id),
ticket_kind="internal",
reason_category="no_kb_content",
reason="No KB article matched this issue",
)
assert session.status == "escalated"
assert session.session_kind == "adhoc"
assert session.walked_path == []
assert session.escalation_reason == "No KB article matched this issue"
assert session.escalation_reason_category == "no_kb_content"
assert session.resolved_at is not None
assert session.last_step_at is not None
assert session.account_id == account.id
assert session.created_by_user_id == l1.id
@pytest.mark.asyncio
async def test_escalate_without_walk_escalates_internal_ticket(test_db: AsyncSession):
"""escalate_without_walk marks the internal ticket as escalated."""
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1.id)
await escalate_without_walk(
test_db,
account_id=account.id,
user=l1,
ticket_id=str(ticket.id),
ticket_kind="internal",
reason_category="no_kb_content",
)
await test_db.refresh(ticket)
assert ticket.status == "escalated"
@pytest.mark.asyncio
async def test_escalate_without_walk_psa_does_not_touch_internal_ticket(test_db: AsyncSession):
"""escalate_without_walk with ticket_kind='psa' does not update internal tickets."""
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
session = await escalate_without_walk(
test_db,
account_id=account.id,
user=l1,
ticket_id="psa-ticket-999",
ticket_kind="psa",
reason_category="no_kb_content",
)
assert session.status == "escalated"
assert session.ticket_kind == "psa"
@pytest.mark.asyncio
async def test_escalate_without_walk_reason_is_optional(test_db: AsyncSession):
"""escalate_without_walk works without a reason string."""
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1.id)
session = await escalate_without_walk(
test_db,
account_id=account.id,
user=l1,
ticket_id=str(ticket.id),
ticket_kind="internal",
reason_category="no_kb_content",
)
assert session.escalation_reason is None
assert session.escalation_reason_category == "no_kb_content"
# ---------------------------------------------------------------------------
# T14 audit log tests (spec §5.6.1)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_resolve_writes_audit_log_with_acting_as(test_db: AsyncSession):
"""resolve() writes an audit_logs row with acting_as='l1_coverage' for engineers."""
account = await _make_account(test_db)
eng = await _make_user(
test_db,
account_id=account.id,
account_role="engineer",
can_cover_l1=True,
)
ticket = await _make_internal_ticket(
test_db, account_id=account.id, user_id=eng.id
)
session = await start_adhoc_session(
test_db,
account_id=account.id,
user=eng,
ticket_id=str(ticket.id),
ticket_kind="internal",
)
await resolve(
test_db,
session_id=session.id,
helpful=True,
resolution_notes="Coverage engineer resolved",
)
result = await test_db.execute(
select(AuditLog).where(
AuditLog.action == "l1.session.resolve",
AuditLog.resource_id == session.id,
)
)
row = result.scalar_one()
assert row.acting_as == "l1_coverage"
assert row.user_id == eng.id
assert row.account_id == account.id
assert row.details["helpful"] is True
@pytest.mark.asyncio
async def test_resolve_writes_audit_log_native_l1_acting_as_null(
test_db: AsyncSession,
):
"""resolve() writes an audit_logs row with acting_as=None for native l1_tech."""
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id, account_role="l1_tech")
ticket = await _make_internal_ticket(
test_db, account_id=account.id, user_id=l1.id
)
session = await start_adhoc_session(
test_db,
account_id=account.id,
user=l1,
ticket_id=str(ticket.id),
ticket_kind="internal",
)
await resolve(
test_db,
session_id=session.id,
helpful=False,
resolution_notes="Native L1 resolved",
)
result = await test_db.execute(
select(AuditLog).where(
AuditLog.action == "l1.session.resolve",
AuditLog.resource_id == session.id,
)
)
row = result.scalar_one()
assert row.acting_as is None
@pytest.mark.asyncio
async def test_escalate_writes_audit_log(test_db: AsyncSession):
"""escalate() writes an audit_logs row with action='l1.session.escalate'."""
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
ticket = await _make_internal_ticket(
test_db, account_id=account.id, user_id=l1.id
)
session = await start_adhoc_session(
test_db,
account_id=account.id,
user=l1,
ticket_id=str(ticket.id),
ticket_kind="internal",
)
await escalate(
test_db,
session_id=session.id,
reason="Beyond scope",
reason_category="out_of_scope",
)
result = await test_db.execute(
select(AuditLog).where(
AuditLog.action == "l1.session.escalate",
AuditLog.resource_id == session.id,
)
)
row = result.scalar_one()
assert row.details["escalation_reason_category"] == "out_of_scope"
assert row.account_id == account.id
@pytest.mark.asyncio
async def test_escalate_without_walk_writes_audit_log(test_db: AsyncSession):
"""escalate_without_walk() writes an audit_logs row."""
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
ticket = await _make_internal_ticket(
test_db, account_id=account.id, user_id=l1.id
)
session = await escalate_without_walk(
test_db,
account_id=account.id,
user=l1,
ticket_id=str(ticket.id),
ticket_kind="internal",
reason_category="no_kb_content",
)
result = await test_db.execute(
select(AuditLog).where(
AuditLog.action == "l1.session.escalate_no_walk",
AuditLog.resource_id == session.id,
)
)
row = result.scalar_one()
assert row.account_id == account.id
assert row.details["escalation_reason_category"] == "no_kb_content"

View File

@@ -23,6 +23,7 @@ from pathlib import Path
from urllib.parse import unquote, urlsplit
import asyncpg
import psycopg2
import pytest
import pytest_asyncio
@@ -80,7 +81,22 @@ def _ensure_rls_schema():
public schema using Base.metadata.create_all, which does not enable RLS
or create DB roles. This fixture re-runs 'alembic upgrade head' so that
the full migration-managed schema (including RLS policies) is in place.
We drop and recreate the public schema first so that any tables left behind
by a prior create_all-based test_db run don't conflict with alembic's
migration tracking.
"""
# Drop and recreate the schema to ensure a clean slate for alembic.
admin_dsn = dict(
host=_DB_HOST, port=_DB_PORT, dbname=_DB_NAME,
user=_ADMIN_USER, password=_ADMIN_PASSWORD,
)
with psycopg2.connect(**admin_dsn) as conn:
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("DROP SCHEMA public CASCADE")
cur.execute("CREATE SCHEMA public")
backend_dir = Path(__file__).parent.parent
env = os.environ.copy()
env["DATABASE_URL"] = _DATABASE_TEST_URL
@@ -131,15 +147,18 @@ async def seed_rls_test_data(admin_conn):
user_b_id = str(uuid.uuid4())
await admin_conn.execute(f"""
INSERT INTO users (
id, email, password_hash, name, role, is_active, account_id,
account_role, created_at
id, email, password_hash, name, role,
is_super_admin, is_team_admin, is_service_account, must_change_password,
is_active, account_id, account_role, timezone, created_at
) VALUES
('{user_a_id}', 'rls-user-a@example.com',
'placeholder', 'RLS User A', 'engineer', TRUE,
'{ACCOUNT_A_ID}', 'engineer', NOW()),
'placeholder', 'RLS User A', 'engineer',
FALSE, FALSE, FALSE, FALSE,
TRUE, '{ACCOUNT_A_ID}', 'engineer', 'UTC', NOW()),
('{user_b_id}', 'rls-user-b@example.com',
'placeholder', 'RLS User B', 'engineer', TRUE,
'{ACCOUNT_B_ID}', 'engineer', NOW())
'placeholder', 'RLS User B', 'engineer',
FALSE, FALSE, FALSE, FALSE,
TRUE, '{ACCOUNT_B_ID}', 'engineer', 'UTC', NOW())
ON CONFLICT (email) DO NOTHING
""")

View File

@@ -0,0 +1,195 @@
"""Integration tests for the seat_enforcement service.
Uses the test_db fixture (real async DB, fresh schema per test) to exercise
the SQL counting logic in check_seat_available / get_seat_usage.
"""
import uuid
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.account import Account
from app.models.subscription import Subscription
from app.models.user import User
from app.services.seat_enforcement import check_seat_available, get_seat_usage
# ---------------------------------------------------------------------------
# Test-local DB helpers
# ---------------------------------------------------------------------------
async def _make_account(db: AsyncSession, *, suffix: str | None = None) -> Account:
"""Create and flush a minimal Account row."""
s = suffix or str(uuid.uuid4())[:8]
account = Account(
id=uuid.uuid4(),
name=f"Test Account {s}",
display_code=s[:8],
)
db.add(account)
await db.flush()
return account
async def _make_subscription(
db: AsyncSession,
account: Account,
*,
seat_limit: int | None = None,
l1_seat_limit: int | None = None,
) -> Subscription:
"""Create and flush a Subscription for the given account."""
sub = Subscription(
account_id=account.id,
plan="pro",
status="active",
seat_limit=seat_limit,
l1_seat_limit=l1_seat_limit,
)
db.add(sub)
await db.flush()
return sub
async def _make_user(
db: AsyncSession,
account: Account,
*,
account_role: str = "engineer",
is_active: bool = True,
suffix: str | None = None,
) -> User:
"""Create and flush a User row in the given account."""
s = suffix or str(uuid.uuid4())[:8]
user = User(
id=uuid.uuid4(),
email=f"user-{s}@example.com",
name=f"User {s}",
account_id=account.id,
account_role=account_role,
role="engineer",
is_active=is_active,
)
db.add(user)
await db.flush()
return user
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_engineer_seat_available_when_under_limit(test_db: AsyncSession):
"""check_seat_available returns available=True when current < seat_limit."""
account = await _make_account(test_db)
sub = await _make_subscription(test_db, account, seat_limit=5)
for _ in range(3):
await _make_user(test_db, account, account_role="engineer")
result = await check_seat_available(account, sub, "engineer", test_db)
assert result.available is True
assert result.current == 3
assert result.limit == 5
assert result.role == "engineer"
@pytest.mark.asyncio
async def test_engineer_seat_unavailable_when_at_limit(test_db: AsyncSession):
"""check_seat_available returns available=False when current == seat_limit."""
account = await _make_account(test_db)
sub = await _make_subscription(test_db, account, seat_limit=2)
for _ in range(2):
await _make_user(test_db, account, account_role="engineer")
result = await check_seat_available(account, sub, "engineer", test_db)
assert result.available is False
assert result.current == 2
assert result.limit == 2
@pytest.mark.asyncio
async def test_l1_uses_separate_seat_limit(test_db: AsyncSession):
"""Engineer limit hit does not affect l1_tech availability."""
account = await _make_account(test_db)
# seat_limit exhausted, l1_seat_limit still has room
sub = await _make_subscription(test_db, account, seat_limit=2, l1_seat_limit=3)
# Fill engineer seats to the limit
for _ in range(2):
await _make_user(test_db, account, account_role="engineer")
# Add one L1 user (below limit)
await _make_user(test_db, account, account_role="l1_tech")
eng_result = await check_seat_available(account, sub, "engineer", test_db)
l1_result = await check_seat_available(account, sub, "l1_tech", test_db)
assert eng_result.available is False, "engineer seats should be full"
assert eng_result.current == 2
assert l1_result.available is True, "l1_tech seats should still be available"
assert l1_result.current == 1
assert l1_result.limit == 3
@pytest.mark.asyncio
async def test_unlimited_seat_limit_is_always_available(test_db: AsyncSession):
"""seat_limit=None means unlimited; available=True regardless of count."""
account = await _make_account(test_db)
sub = await _make_subscription(test_db, account, seat_limit=None)
# Add many engineer users
for _ in range(10):
await _make_user(test_db, account, account_role="engineer")
result = await check_seat_available(account, sub, "engineer", test_db)
assert result.available is True
assert result.current == 10
assert result.limit is None
@pytest.mark.asyncio
async def test_get_seat_usage_returns_engineer_l1_tuple(test_db: AsyncSession):
"""get_seat_usage returns a (engineer, l1_tech) tuple in the correct order."""
account = await _make_account(test_db)
sub = await _make_subscription(test_db, account, seat_limit=5, l1_seat_limit=3)
await _make_user(test_db, account, account_role="engineer")
await _make_user(test_db, account, account_role="l1_tech")
await _make_user(test_db, account, account_role="l1_tech")
eng, l1 = await get_seat_usage(account, sub, test_db)
assert eng.role == "engineer"
assert eng.current == 1
assert eng.limit == 5
assert eng.available is True
assert l1.role == "l1_tech"
assert l1.current == 2
assert l1.limit == 3
assert l1.available is True
@pytest.mark.asyncio
async def test_inactive_users_not_counted(test_db: AsyncSession):
"""Inactive (is_active=False) users are excluded from the seat count."""
account = await _make_account(test_db)
sub = await _make_subscription(test_db, account, seat_limit=3)
# 1 active, 2 inactive
await _make_user(test_db, account, account_role="engineer", is_active=True)
await _make_user(test_db, account, account_role="engineer", is_active=False)
await _make_user(test_db, account, account_role="engineer", is_active=False)
result = await check_seat_available(account, sub, "engineer", test_db)
assert result.current == 1
assert result.available is True