Files
resolutionflow/backend/tests/test_invite_seat_enforcement.py
Michael Chihlas e15897c76f feat(l1): PATCH /accounts/me/members/{id}/coverage for engineer L1-coverage flag
Owner-only endpoint to toggle can_cover_l1 on an engineer user. 422 if target
role is not engineer (owners/super_admins already see L1 surface; viewers/
l1_techs don't need this flag). 404 for cross-account targets.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-28 13:07:09 -04:00

565 lines
22 KiB
Python

"""Integration tests for seat enforcement at invite create, accept-invite, and
role-change endpoints.
All tests use the `client` + `test_db` fixtures from conftest, which spin up
a fresh schema per test and wire the ASGI app to the test DB.
"""
import uuid
import pytest
from httpx import AsyncClient
from sqlalchemy import delete
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.account import Account
from app.models.account_invite import AccountInvite
from app.models.subscription import Subscription
from app.models.user import User
# ---------------------------------------------------------------------------
# Test-local helpers
# ---------------------------------------------------------------------------
async def _register(client: AsyncClient, *, email: str, password: str = "TestPassword123!", name: str = "Test User") -> dict:
resp = await client.post("/api/v1/auth/register", json={"email": email, "password": password, "name": name})
assert resp.status_code in (200, 201), resp.text
return resp.json()
async def _login(client: AsyncClient, *, email: str, password: str = "TestPassword123!") -> dict:
resp = await client.post("/api/v1/auth/login/json", json={"email": email, "password": password})
assert resp.status_code == 200, resp.text
return {"Authorization": f"Bearer {resp.json()['access_token']}"}
async def _set_sub(db: AsyncSession, account_id: uuid.UUID, *, seat_limit: int | None, l1_seat_limit: int | None = None) -> None:
"""Replace the account's subscription with specified limits."""
await db.execute(delete(Subscription).where(Subscription.account_id == account_id))
db.add(Subscription(
account_id=account_id,
plan="pro",
status="active",
seat_limit=seat_limit,
l1_seat_limit=l1_seat_limit,
))
await db.commit()
async def _add_member(db: AsyncSession, account_id: uuid.UUID, *, role: str, suffix: str | None = None) -> User:
"""Directly insert an active user with the given role into the account."""
s = suffix or str(uuid.uuid4())[:8]
user = User(
id=uuid.uuid4(),
email=f"member-{s}@example.com",
name=f"Member {s}",
account_id=account_id,
account_role=role,
role="engineer",
is_active=True,
)
db.add(user)
await db.commit()
return user
# ---------------------------------------------------------------------------
# Invite create — single invite endpoint
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_invite_engineer_blocked_when_seats_full(client: AsyncClient, test_db: AsyncSession):
"""POST /me/invites → 402 when engineer seat limit is exhausted."""
owner = await _register(client, email="owner1@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner1@example.com")
# seat_limit=1, already 1 engineer → full
await _set_sub(test_db, account_id, seat_limit=1)
# The owner registers as engineer, but is actually 'owner' role — add a separate engineer
await _add_member(test_db, account_id, role="engineer")
resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "new-eng@example.com", "role": "engineer"},
headers=headers,
)
assert resp.status_code == 402, resp.text
body = resp.json()
assert body["detail"]["code"] == "seat_limit_exceeded"
assert body["detail"]["role"] == "engineer"
assert body["detail"]["current"] == 1
assert body["detail"]["limit"] == 1
assert "upgrade_url" in body["detail"]
@pytest.mark.asyncio
async def test_invite_l1_blocked_when_seats_full(client: AsyncClient, test_db: AsyncSession):
"""POST /me/invites → 402 when l1_tech seat limit is exhausted."""
owner = await _register(client, email="owner2@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner2@example.com")
await _set_sub(test_db, account_id, seat_limit=10, l1_seat_limit=1)
await _add_member(test_db, account_id, role="l1_tech")
resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "new-l1@example.com", "role": "l1_tech"},
headers=headers,
)
assert resp.status_code == 402, resp.text
body = resp.json()
assert body["detail"]["code"] == "seat_limit_exceeded"
assert body["detail"]["role"] == "l1_tech"
assert body["detail"]["current"] == 1
assert body["detail"]["limit"] == 1
@pytest.mark.asyncio
async def test_invite_succeeds_when_seats_available(client: AsyncClient, test_db: AsyncSession):
"""POST /me/invites → 201 when engineer seats have room."""
owner = await _register(client, email="owner3@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner3@example.com")
# seat_limit=5, 0 engineers → plenty of room
await _set_sub(test_db, account_id, seat_limit=5)
resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "new-eng2@example.com", "role": "engineer"},
headers=headers,
)
assert resp.status_code == 201, resp.text
@pytest.mark.asyncio
async def test_invite_viewer_bypasses_seat_check(client: AsyncClient, test_db: AsyncSession):
"""POST /me/invites → 201 for viewer role even when engineer seats full."""
owner = await _register(client, email="owner4@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner4@example.com")
# engineer seats exhausted — should not affect viewer invites
await _set_sub(test_db, account_id, seat_limit=1)
await _add_member(test_db, account_id, role="engineer")
resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "viewer@example.com", "role": "viewer"},
headers=headers,
)
assert resp.status_code == 201, resp.text
@pytest.mark.asyncio
async def test_invite_unlimited_seat_limit_always_succeeds(client: AsyncClient, test_db: AsyncSession):
"""POST /me/invites → 201 when seat_limit is None (unlimited)."""
owner = await _register(client, email="owner5@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner5@example.com")
# seat_limit=None = unlimited
await _set_sub(test_db, account_id, seat_limit=None)
# add many engineers
for i in range(5):
await _add_member(test_db, account_id, role="engineer", suffix=f"bulk{i}")
resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "new-unlimited@example.com", "role": "engineer"},
headers=headers,
)
assert resp.status_code == 201, resp.text
@pytest.mark.asyncio
async def test_bulk_invite_per_row_402_preserves_structured_detail(client: AsyncClient, test_db: AsyncSession):
"""Bulk invite returns 200 overall; rows that hit the seat limit appear in the
`failed` list with structured detail (not a stringified repr)."""
owner = await _register(client, email="owner_bulk@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner_bulk@example.com")
# seat_limit=1, already 1 engineer → next engineer invite fails
await _set_sub(test_db, account_id, seat_limit=1)
await _add_member(test_db, account_id, role="engineer")
resp = await client.post(
"/api/v1/accounts/me/invites/bulk",
json={"invites": [
{"email": "viewer-ok@example.com", "role": "viewer"},
{"email": "eng-blocked@example.com", "role": "engineer"},
]},
headers=headers,
)
assert resp.status_code in (200, 201), resp.text
body = resp.json()
assert len(body["created"]) == 1
assert body["created"][0]["email"] == "viewer-ok@example.com"
assert len(body["failed"]) == 1
failed_row = body["failed"][0]
assert failed_row["email"] == "eng-blocked@example.com"
# Structured detail preserved (dict, not repr string)
assert isinstance(failed_row["error"], dict)
assert failed_row["error"]["code"] == "seat_limit_exceeded"
assert failed_row["error"]["role"] == "engineer"
@pytest.mark.asyncio
async def test_invite_grandfathered_account_blocks_new_invites(client: AsyncClient, test_db: AsyncSession):
"""Grandfathering: existing over-seated account keeps existing users but
new engineer invites are still blocked (current > limit → blocked)."""
owner = await _register(client, email="owner6@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner6@example.com")
# current=3 engineers > seat_limit=2 (over-seated / grandfathered)
await _set_sub(test_db, account_id, seat_limit=2)
for i in range(3):
await _add_member(test_db, account_id, role="engineer", suffix=f"gf{i}")
# New invite must be blocked
resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "one-more@example.com", "role": "engineer"},
headers=headers,
)
assert resp.status_code == 402, resp.text
body = resp.json()
assert body["detail"]["code"] == "seat_limit_exceeded"
# current (3) > limit (2) — forward enforcement fires, existing users unaffected
assert body["detail"]["current"] == 3
assert body["detail"]["limit"] == 2
# ---------------------------------------------------------------------------
# Accept-invite race condition — auth.py register path
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_accept_invite_blocked_when_seats_full_at_accept_time(client: AsyncClient, test_db: AsyncSession):
"""Race-condition guard: invite created when seats available, but by
accept time someone else consumed the last seat → 402."""
# Step 1: create an owner and send an invite
owner = await _register(client, email="owner7@example.com")
account_id = uuid.UUID(owner["account_id"])
owner_headers = await _login(client, email="owner7@example.com")
await _set_sub(test_db, account_id, seat_limit=2)
invite_resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "race@example.com", "role": "engineer"},
headers=owner_headers,
)
assert invite_resp.status_code == 201, invite_resp.text
invite_code = invite_resp.json()["code"]
# Step 2: fill the seats after the invite was created (race condition)
await _add_member(test_db, account_id, role="engineer", suffix="race1")
await _add_member(test_db, account_id, role="engineer", suffix="race2")
# Step 3: invitee tries to register — should get 402
resp = await client.post(
"/api/v1/auth/register",
json={
"email": "race@example.com",
"password": "TestPassword123!",
"name": "Race User",
"account_invite_code": invite_code,
},
)
assert resp.status_code == 402, resp.text
body = resp.json()
assert body["detail"]["code"] == "seat_limit_exceeded"
@pytest.mark.asyncio
async def test_accept_invite_succeeds_when_seats_available(client: AsyncClient, test_db: AsyncSession):
"""Normal accept-invite path works when seats have room."""
owner = await _register(client, email="owner8@example.com")
account_id = uuid.UUID(owner["account_id"])
owner_headers = await _login(client, email="owner8@example.com")
await _set_sub(test_db, account_id, seat_limit=5)
invite_resp = await client.post(
"/api/v1/accounts/me/invites",
json={"email": "acceptme@example.com", "role": "engineer"},
headers=owner_headers,
)
assert invite_resp.status_code == 201, invite_resp.text
invite_code = invite_resp.json()["code"]
resp = await client.post(
"/api/v1/auth/register",
json={
"email": "acceptme@example.com",
"password": "TestPassword123!",
"name": "Accept User",
"account_invite_code": invite_code,
},
)
assert resp.status_code in (200, 201), resp.text
assert resp.json()["account_id"] == str(account_id)
# ---------------------------------------------------------------------------
# Role-change endpoint — PATCH /me/members/{user_id}/role
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_role_change_viewer_to_engineer_blocked_when_seats_full(client: AsyncClient, test_db: AsyncSession):
"""PATCH /me/members/{id}/role → 402 when promoting viewer → engineer and seats full."""
owner = await _register(client, email="owner9@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner9@example.com")
await _set_sub(test_db, account_id, seat_limit=1)
# Fill the engineer seat
await _add_member(test_db, account_id, role="engineer")
# Add a viewer to promote
viewer = await _add_member(test_db, account_id, role="viewer")
resp = await client.patch(
f"/api/v1/accounts/me/members/{viewer.id}/role",
json={"account_role": "engineer"},
headers=headers,
)
assert resp.status_code == 402, resp.text
body = resp.json()
assert body["detail"]["code"] == "seat_limit_exceeded"
assert body["detail"]["role"] == "engineer"
@pytest.mark.asyncio
async def test_role_change_viewer_to_l1_blocked_when_seats_full(client: AsyncClient, test_db: AsyncSession):
"""PATCH /me/members/{id}/role → 402 when promoting viewer → l1_tech and l1 seats full."""
owner = await _register(client, email="owner10@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner10@example.com")
await _set_sub(test_db, account_id, seat_limit=10, l1_seat_limit=1)
await _add_member(test_db, account_id, role="l1_tech")
viewer = await _add_member(test_db, account_id, role="viewer")
resp = await client.patch(
f"/api/v1/accounts/me/members/{viewer.id}/role",
json={"account_role": "l1_tech"},
headers=headers,
)
assert resp.status_code == 402, resp.text
body = resp.json()
assert body["detail"]["code"] == "seat_limit_exceeded"
assert body["detail"]["role"] == "l1_tech"
@pytest.mark.asyncio
async def test_role_change_promotion_succeeds_when_seats_available(client: AsyncClient, test_db: AsyncSession):
"""PATCH /me/members/{id}/role → 200 when seats are available."""
owner = await _register(client, email="owner11@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner11@example.com")
await _set_sub(test_db, account_id, seat_limit=5)
viewer = await _add_member(test_db, account_id, role="viewer")
resp = await client.patch(
f"/api/v1/accounts/me/members/{viewer.id}/role",
json={"account_role": "engineer"},
headers=headers,
)
assert resp.status_code == 200, resp.text
assert resp.json()["account_role"] == "engineer"
@pytest.mark.asyncio
async def test_role_change_demotion_bypasses_seat_check(client: AsyncClient, test_db: AsyncSession):
"""PATCH /me/members/{id}/role → 200 for demotions even when seats full."""
owner = await _register(client, email="owner12@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner12@example.com")
# Seats full — but demotion should still succeed
await _set_sub(test_db, account_id, seat_limit=1)
engineer = await _add_member(test_db, account_id, role="engineer")
resp = await client.patch(
f"/api/v1/accounts/me/members/{engineer.id}/role",
json={"account_role": "viewer"},
headers=headers,
)
assert resp.status_code == 200, resp.text
assert resp.json()["account_role"] == "viewer"
# ---------------------------------------------------------------------------
# GET /me/seats — seat counter widget endpoint
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_seats_returns_both_role_counts(client: AsyncClient, test_db: AsyncSession):
"""GET /accounts/me/seats returns engineer + l1_tech seat usage."""
owner = await _register(client, email="owner_seats@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner_seats@example.com")
await _set_sub(test_db, account_id, seat_limit=5, l1_seat_limit=3)
# Add 2 engineers and 1 l1_tech as members
for i in range(2):
await _add_member(test_db, account_id, role="engineer", suffix=f"e{i}")
await _add_member(test_db, account_id, role="l1_tech", suffix="l1")
resp = await client.get("/api/v1/accounts/me/seats", headers=headers)
assert resp.status_code == 200, resp.text
body = resp.json()
assert body["engineer"]["role"] == "engineer"
assert body["engineer"]["current"] == 2
assert body["engineer"]["limit"] == 5
assert body["engineer"]["available"] is True
assert body["l1_tech"]["role"] == "l1_tech"
assert body["l1_tech"]["current"] == 1
assert body["l1_tech"]["limit"] == 3
assert body["l1_tech"]["available"] is True
@pytest.mark.asyncio
async def test_get_seats_blocked_for_viewer(client: AsyncClient, test_db: AsyncSession):
"""GET /accounts/me/seats → 403 for viewer role (engineer+ required)."""
from app.core.security import get_password_hash
# Register an owner for the account
owner = await _register(client, email="owner_seats2@example.com")
account_id = uuid.UUID(owner["account_id"])
await _set_sub(test_db, account_id, seat_limit=5, l1_seat_limit=3)
# Create a viewer user with a known password directly in the DB
viewer_password = "ViewerPass123!"
viewer = User(
id=uuid.uuid4(),
email="viewer_seats@example.com",
name="Viewer Seats",
account_id=account_id,
account_role="viewer",
role="engineer", # system role field (default)
is_active=True,
password_hash=get_password_hash(viewer_password),
)
test_db.add(viewer)
await test_db.commit()
# Log in as the viewer
viewer_headers = await _login(client, email="viewer_seats@example.com", password=viewer_password)
resp = await client.get("/api/v1/accounts/me/seats", headers=viewer_headers)
assert resp.status_code == 403, resp.text
# ---------------------------------------------------------------------------
# PATCH /me/members/{user_id}/coverage — engineer L1-coverage flag
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_coverage_owner_can_toggle_engineer(client: AsyncClient, test_db: AsyncSession):
"""Owner can set can_cover_l1=True on an engineer; response reflects new value."""
owner = await _register(client, email="owner_cov1@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner_cov1@example.com")
engineer = await _add_member(test_db, account_id, role="engineer", suffix="cov1")
resp = await client.patch(
f"/api/v1/accounts/me/members/{engineer.id}/coverage",
json={"can_cover_l1": True},
headers=headers,
)
assert resp.status_code == 200, resp.text
body = resp.json()
assert body["can_cover_l1"] is True
# Toggle back to False
resp2 = await client.patch(
f"/api/v1/accounts/me/members/{engineer.id}/coverage",
json={"can_cover_l1": False},
headers=headers,
)
assert resp2.status_code == 200, resp2.text
assert resp2.json()["can_cover_l1"] is False
@pytest.mark.asyncio
async def test_coverage_non_owner_is_forbidden(client: AsyncClient, test_db: AsyncSession):
"""A non-owner engineer cannot toggle coverage on themselves or others."""
from app.core.security import get_password_hash
owner = await _register(client, email="owner_cov2@example.com")
account_id = uuid.UUID(owner["account_id"])
# Create an engineer with a known password
eng_password = "EngPass123!"
engineer = User(
id=uuid.uuid4(),
email="eng_cov2@example.com",
name="Eng Cov2",
account_id=account_id,
account_role="engineer",
role="engineer",
is_active=True,
password_hash=get_password_hash(eng_password),
)
test_db.add(engineer)
await test_db.commit()
eng_headers = await _login(client, email="eng_cov2@example.com", password=eng_password)
resp = await client.patch(
f"/api/v1/accounts/me/members/{engineer.id}/coverage",
json={"can_cover_l1": True},
headers=eng_headers,
)
assert resp.status_code == 403, resp.text
@pytest.mark.asyncio
async def test_coverage_viewer_role_returns_422(client: AsyncClient, test_db: AsyncSession):
"""PATCH coverage on a viewer → 422 (coverage flag only applies to engineers)."""
owner = await _register(client, email="owner_cov3@example.com")
account_id = uuid.UUID(owner["account_id"])
headers = await _login(client, email="owner_cov3@example.com")
viewer = await _add_member(test_db, account_id, role="viewer", suffix="cov3")
resp = await client.patch(
f"/api/v1/accounts/me/members/{viewer.id}/coverage",
json={"can_cover_l1": True},
headers=headers,
)
assert resp.status_code == 422, resp.text
assert "engineer" in resp.json()["detail"].lower()
@pytest.mark.asyncio
async def test_coverage_cross_account_returns_404(client: AsyncClient, test_db: AsyncSession):
"""PATCH coverage on a user from a different account → 404 (tenancy isolation)."""
# Account A
owner_a = await _register(client, email="owner_cov_a@example.com")
account_a_id = uuid.UUID(owner_a["account_id"])
headers_a = await _login(client, email="owner_cov_a@example.com")
# Account B — a separate registration creates a new account
owner_b = await _register(client, email="owner_cov_b@example.com")
account_b_id = uuid.UUID(owner_b["account_id"])
# Add an engineer to account B
engineer_b = await _add_member(test_db, account_b_id, role="engineer", suffix="covb")
# Owner of account A tries to patch account B's engineer — must 404
resp = await client.patch(
f"/api/v1/accounts/me/members/{engineer_b.id}/coverage",
json={"can_cover_l1": True},
headers=headers_a,
)
assert resp.status_code == 404, resp.text