Files
resolutionflow/backend/tests/test_invite_seat_enforcement.py
Michael Chihlas 8010da8745 fix(l1): T8 review fixes — oauth status const + bulk-invite structured error
- oauth.py: use status.HTTP_402_PAYMENT_REQUIRED constant (was raw 402)
- accounts.py bulk-invite: catch HTTPException separately to preserve
  structured detail dict in failed-row error (was stringified repr,
  unparseable by clients)
- Add bulk-invite per-row 402 test verifying structured error preserved

T8 code review identified these as Important issues. Functional change is
the bulk-invite fix; clients can now parse seat-limit errors from bulk
responses. 13/13 seat-enforcement tests pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-28 12:58:35 -04:00

397 lines
16 KiB
Python

"""Integration tests for seat enforcement at invite create, accept-invite, and
role-change endpoints.
All tests use the `client` + `test_db` fixtures from conftest, which spin up
a fresh schema per test and wire the ASGI app to the test DB.
"""
import uuid
import pytest
from httpx import AsyncClient
from sqlalchemy import delete
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.account import Account
from app.models.account_invite import AccountInvite
from app.models.subscription import Subscription
from app.models.user import User
# ---------------------------------------------------------------------------
# Test-local helpers
# ---------------------------------------------------------------------------
async def _register(client: AsyncClient, *, email: str, password: str = "TestPassword123!", name: str = "Test User") -> dict:
resp = await client.post("/api/v1/auth/register", json={"email": email, "password": password, "name": name})
assert resp.status_code in (200, 201), resp.text
return resp.json()
async def _login(client: AsyncClient, *, email: str, password: str = "TestPassword123!") -> dict:
resp = await client.post("/api/v1/auth/login/json", json={"email": email, "password": password})
assert resp.status_code == 200, resp.text
return {"Authorization": f"Bearer {resp.json()['access_token']}"}
async def _set_sub(db: AsyncSession, account_id: uuid.UUID, *, seat_limit: int | None, l1_seat_limit: int | None = None) -> None:
"""Replace the account's subscription with specified limits."""
await db.execute(delete(Subscription).where(Subscription.account_id == account_id))
db.add(Subscription(
account_id=account_id,
plan="pro",
status="active",
seat_limit=seat_limit,
l1_seat_limit=l1_seat_limit,
))
await db.commit()
async def _add_member(db: AsyncSession, account_id: uuid.UUID, *, role: str, suffix: str | None = None) -> User:
"""Directly insert an active user with the given role into the account."""
s = suffix or str(uuid.uuid4())[:8]
user = User(
id=uuid.uuid4(),
email=f"member-{s}@example.com",
name=f"Member {s}",
account_id=account_id,
account_role=role,
role="engineer",
is_active=True,
)
db.add(user)
await db.commit()
return user
# ---------------------------------------------------------------------------
# Invite create — single invite endpoint
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_invite_engineer_blocked_when_seats_full(client: AsyncClient, test_db: AsyncSession):
"""POST /me/invites → 402 when engineer seat limit is exhausted."""
owner = await _register(client, email="owner1@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner1@example.com")
# seat_limit=1, already 1 engineer → full
await _set_sub(test_db, account_id, seat_limit=1)
# The owner registers as engineer, but is actually 'owner' role — add a separate engineer
await _add_member(test_db, account_id, role="engineer")
resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "new-eng@example.com", "role": "engineer"},
headers=headers,
)
assert resp.status_code == 402, resp.text
body = resp.json()
assert body["detail"]["code"] == "seat_limit_exceeded"
assert body["detail"]["role"] == "engineer"
assert body["detail"]["current"] == 1
assert body["detail"]["limit"] == 1
assert "upgrade_url" in body["detail"]
@pytest.mark.asyncio
async def test_invite_l1_blocked_when_seats_full(client: AsyncClient, test_db: AsyncSession):
"""POST /me/invites → 402 when l1_tech seat limit is exhausted."""
owner = await _register(client, email="owner2@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner2@example.com")
await _set_sub(test_db, account_id, seat_limit=10, l1_seat_limit=1)
await _add_member(test_db, account_id, role="l1_tech")
resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "new-l1@example.com", "role": "l1_tech"},
headers=headers,
)
assert resp.status_code == 402, resp.text
body = resp.json()
assert body["detail"]["code"] == "seat_limit_exceeded"
assert body["detail"]["role"] == "l1_tech"
assert body["detail"]["current"] == 1
assert body["detail"]["limit"] == 1
@pytest.mark.asyncio
async def test_invite_succeeds_when_seats_available(client: AsyncClient, test_db: AsyncSession):
"""POST /me/invites → 201 when engineer seats have room."""
owner = await _register(client, email="owner3@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner3@example.com")
# seat_limit=5, 0 engineers → plenty of room
await _set_sub(test_db, account_id, seat_limit=5)
resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "new-eng2@example.com", "role": "engineer"},
headers=headers,
)
assert resp.status_code == 201, resp.text
@pytest.mark.asyncio
async def test_invite_viewer_bypasses_seat_check(client: AsyncClient, test_db: AsyncSession):
"""POST /me/invites → 201 for viewer role even when engineer seats full."""
owner = await _register(client, email="owner4@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner4@example.com")
# engineer seats exhausted — should not affect viewer invites
await _set_sub(test_db, account_id, seat_limit=1)
await _add_member(test_db, account_id, role="engineer")
resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "viewer@example.com", "role": "viewer"},
headers=headers,
)
assert resp.status_code == 201, resp.text
@pytest.mark.asyncio
async def test_invite_unlimited_seat_limit_always_succeeds(client: AsyncClient, test_db: AsyncSession):
"""POST /me/invites → 201 when seat_limit is None (unlimited)."""
owner = await _register(client, email="owner5@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner5@example.com")
# seat_limit=None = unlimited
await _set_sub(test_db, account_id, seat_limit=None)
# add many engineers
for i in range(5):
await _add_member(test_db, account_id, role="engineer", suffix=f"bulk{i}")
resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "new-unlimited@example.com", "role": "engineer"},
headers=headers,
)
assert resp.status_code == 201, resp.text
@pytest.mark.asyncio
async def test_bulk_invite_per_row_402_preserves_structured_detail(client: AsyncClient, test_db: AsyncSession):
"""Bulk invite returns 200 overall; rows that hit the seat limit appear in the
`failed` list with structured detail (not a stringified repr)."""
owner = await _register(client, email="owner_bulk@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner_bulk@example.com")
# seat_limit=1, already 1 engineer → next engineer invite fails
await _set_sub(test_db, account_id, seat_limit=1)
await _add_member(test_db, account_id, role="engineer")
resp = await client.post(
"/api/v1/accounts/me/invites/bulk",
json={"invites": [
{"email": "viewer-ok@example.com", "role": "viewer"},
{"email": "eng-blocked@example.com", "role": "engineer"},
]},
headers=headers,
)
assert resp.status_code in (200, 201), resp.text
body = resp.json()
assert len(body["created"]) == 1
assert body["created"][0]["email"] == "viewer-ok@example.com"
assert len(body["failed"]) == 1
failed_row = body["failed"][0]
assert failed_row["email"] == "eng-blocked@example.com"
# Structured detail preserved (dict, not repr string)
assert isinstance(failed_row["error"], dict)
assert failed_row["error"]["code"] == "seat_limit_exceeded"
assert failed_row["error"]["role"] == "engineer"
@pytest.mark.asyncio
async def test_invite_grandfathered_account_blocks_new_invites(client: AsyncClient, test_db: AsyncSession):
"""Grandfathering: existing over-seated account keeps existing users but
new engineer invites are still blocked (current > limit → blocked)."""
owner = await _register(client, email="owner6@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner6@example.com")
# current=3 engineers > seat_limit=2 (over-seated / grandfathered)
await _set_sub(test_db, account_id, seat_limit=2)
for i in range(3):
await _add_member(test_db, account_id, role="engineer", suffix=f"gf{i}")
# New invite must be blocked
resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "one-more@example.com", "role": "engineer"},
headers=headers,
)
assert resp.status_code == 402, resp.text
body = resp.json()
assert body["detail"]["code"] == "seat_limit_exceeded"
# current (3) > limit (2) — forward enforcement fires, existing users unaffected
assert body["detail"]["current"] == 3
assert body["detail"]["limit"] == 2
# ---------------------------------------------------------------------------
# Accept-invite race condition — auth.py register path
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_accept_invite_blocked_when_seats_full_at_accept_time(client: AsyncClient, test_db: AsyncSession):
"""Race-condition guard: invite created when seats available, but by
accept time someone else consumed the last seat → 402."""
# Step 1: create an owner and send an invite
owner = await _register(client, email="owner7@example.com")
account_id = uuid.UUID(owner["account_id"])
owner_headers = await _login(client, email="owner7@example.com")
await _set_sub(test_db, account_id, seat_limit=2)
invite_resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "race@example.com", "role": "engineer"},
headers=owner_headers,
)
assert invite_resp.status_code == 201, invite_resp.text
invite_code = invite_resp.json()["code"]
# Step 2: fill the seats after the invite was created (race condition)
await _add_member(test_db, account_id, role="engineer", suffix="race1")
await _add_member(test_db, account_id, role="engineer", suffix="race2")
# Step 3: invitee tries to register — should get 402
resp = await client.post(
"/api/v1/auth/register",
json={
"email": "race@example.com",
"password": "TestPassword123!",
"name": "Race User",
"account_invite_code": invite_code,
},
)
assert resp.status_code == 402, resp.text
body = resp.json()
assert body["detail"]["code"] == "seat_limit_exceeded"
@pytest.mark.asyncio
async def test_accept_invite_succeeds_when_seats_available(client: AsyncClient, test_db: AsyncSession):
"""Normal accept-invite path works when seats have room."""
owner = await _register(client, email="owner8@example.com")
account_id = uuid.UUID(owner["account_id"])
owner_headers = await _login(client, email="owner8@example.com")
await _set_sub(test_db, account_id, seat_limit=5)
invite_resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "acceptme@example.com", "role": "engineer"},
headers=owner_headers,
)
assert invite_resp.status_code == 201, invite_resp.text
invite_code = invite_resp.json()["code"]
resp = await client.post(
"/api/v1/auth/register",
json={
"email": "acceptme@example.com",
"password": "TestPassword123!",
"name": "Accept User",
"account_invite_code": invite_code,
},
)
assert resp.status_code in (200, 201), resp.text
assert resp.json()["account_id"] == str(account_id)
# ---------------------------------------------------------------------------
# Role-change endpoint — PATCH /me/members/{user_id}/role
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_role_change_viewer_to_engineer_blocked_when_seats_full(client: AsyncClient, test_db: AsyncSession):
"""PATCH /me/members/{id}/role → 402 when promoting viewer → engineer and seats full."""
owner = await _register(client, email="owner9@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner9@example.com")
await _set_sub(test_db, account_id, seat_limit=1)
# Fill the engineer seat
await _add_member(test_db, account_id, role="engineer")
# Add a viewer to promote
viewer = await _add_member(test_db, account_id, role="viewer")
resp = await client.patch(
f"/api/v1/accounts/me/members/{viewer.id}/role",
json={"account_role": "engineer"},
headers=headers,
)
assert resp.status_code == 402, resp.text
body = resp.json()
assert body["detail"]["code"] == "seat_limit_exceeded"
assert body["detail"]["role"] == "engineer"
@pytest.mark.asyncio
async def test_role_change_viewer_to_l1_blocked_when_seats_full(client: AsyncClient, test_db: AsyncSession):
"""PATCH /me/members/{id}/role → 402 when promoting viewer → l1_tech and l1 seats full."""
owner = await _register(client, email="owner10@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner10@example.com")
await _set_sub(test_db, account_id, seat_limit=10, l1_seat_limit=1)
await _add_member(test_db, account_id, role="l1_tech")
viewer = await _add_member(test_db, account_id, role="viewer")
resp = await client.patch(
f"/api/v1/accounts/me/members/{viewer.id}/role",
json={"account_role": "l1_tech"},
headers=headers,
)
assert resp.status_code == 402, resp.text
body = resp.json()
assert body["detail"]["code"] == "seat_limit_exceeded"
assert body["detail"]["role"] == "l1_tech"
@pytest.mark.asyncio
async def test_role_change_promotion_succeeds_when_seats_available(client: AsyncClient, test_db: AsyncSession):
"""PATCH /me/members/{id}/role → 200 when seats are available."""
owner = await _register(client, email="owner11@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner11@example.com")
await _set_sub(test_db, account_id, seat_limit=5)
viewer = await _add_member(test_db, account_id, role="viewer")
resp = await client.patch(
f"/api/v1/accounts/me/members/{viewer.id}/role",
json={"account_role": "engineer"},
headers=headers,
)
assert resp.status_code == 200, resp.text
assert resp.json()["account_role"] == "engineer"
@pytest.mark.asyncio
async def test_role_change_demotion_bypasses_seat_check(client: AsyncClient, test_db: AsyncSession):
"""PATCH /me/members/{id}/role → 200 for demotions even when seats full."""
owner = await _register(client, email="owner12@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner12@example.com")
# Seats full — but demotion should still succeed
await _set_sub(test_db, account_id, seat_limit=1)
engineer = await _add_member(test_db, account_id, role="engineer")
resp = await client.patch(
f"/api/v1/accounts/me/members/{engineer.id}/role",
json={"account_role": "viewer"},
headers=headers,
)
assert resp.status_code == 200, resp.text
assert resp.json()["account_role"] == "viewer"