Owner-only endpoint to toggle can_cover_l1 on an engineer user. 422 if target role is not engineer (owners/super_admins already see L1 surface; viewers/l1_techs don't need this flag). 404 for cross-account targets. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
565 lines
22 KiB
Python
565 lines
22 KiB
Python
"""Integration tests for seat enforcement at invite create, accept-invite, and
|
|
role-change endpoints.
|
|
|
|
All tests use the `client` + `test_db` fixtures from conftest, which spin up
|
|
a fresh schema per test and wire the ASGI app to the test DB.
|
|
"""
|
|
|
|
import uuid
|
|
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
from sqlalchemy import delete
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.models.account import Account
|
|
from app.models.account_invite import AccountInvite
|
|
from app.models.subscription import Subscription
|
|
from app.models.user import User
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test-local helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _register(
    client: AsyncClient,
    *,
    email: str,
    password: str = "TestPassword123!",
    name: str = "Test User",
) -> dict:
    """Create a user through the register endpoint and return the decoded JSON body.

    Fails the calling test (showing the raw response text) unless the endpoint
    answers with 200 or 201.
    """
    payload = {"email": email, "password": password, "name": name}
    response = await client.post("/api/v1/auth/register", json=payload)
    assert response.status_code in (200, 201), response.text
    return response.json()
|
|
|
|
|
|
async def _login(
    client: AsyncClient,
    *,
    email: str,
    password: str = "TestPassword123!",
) -> dict:
    """Log in through the JSON login endpoint and return a bearer ``Authorization`` header dict.

    Fails the calling test (showing the raw response text) unless the endpoint
    answers with 200.
    """
    credentials = {"email": email, "password": password}
    response = await client.post("/api/v1/auth/login/json", json=credentials)
    assert response.status_code == 200, response.text
    token = response.json()["access_token"]
    return {"Authorization": f"Bearer {token}"}
|
|
|
|
|
|
async def _set_sub(
    db: AsyncSession,
    account_id: uuid.UUID,
    *,
    seat_limit: int | None,
    l1_seat_limit: int | None = None,
) -> None:
    """Swap the account's subscription rows for a single active "pro" row.

    Every existing ``Subscription`` row for ``account_id`` is deleted, a new
    one carrying the given seat limits is added, and the session is committed.
    """
    purge_existing = delete(Subscription).where(Subscription.account_id == account_id)
    await db.execute(purge_existing)

    replacement = Subscription(
        account_id=account_id,
        plan="pro",
        status="active",
        seat_limit=seat_limit,
        l1_seat_limit=l1_seat_limit,
    )
    db.add(replacement)
    await db.commit()
|
|
|
|
|
|
async def _add_member(db: AsyncSession, account_id: uuid.UUID, *, role: str, suffix: str | None = None) -> User:
    """Directly insert an active user with the given account role into the account.

    Args:
        db: Session used for the insert; it is committed before returning.
        account_id: Account the new user is attached to.
        role: Value stored in ``account_role``. The separate ``role`` column is
            always set to ``"engineer"``.
        suffix: Tag used in the generated email and name. A random 8-character
            tag is generated when it is omitted or empty.

    Returns:
        The persisted ``User``, reloaded after the commit so its column
        attributes are populated at the moment this helper returns.
    """
    s = suffix or str(uuid.uuid4())[:8]
    user = User(
        id=uuid.uuid4(),
        email=f"member-{s}@example.com",
        name=f"Member {s}",
        account_id=account_id,
        account_role=role,
        role="engineer",
        is_active=True,
    )
    db.add(user)
    await db.commit()
    # Callers read `user.id` after this returns. A session with the default
    # expire_on_commit=True expires every attribute on commit, and lazily
    # reloading one from async code raises instead of querying. The fixture's
    # session settings are not visible here, so reload explicitly.
    await db.refresh(user)
    return user
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Invite create — single invite endpoint
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_invite_engineer_blocked_when_seats_full(client: AsyncClient, test_db: AsyncSession):
    """POST /me/invites for an engineer returns a structured 402 once the only engineer seat is taken."""
    registered = await _register(client, email="owner1@example.com")
    account_id = uuid.UUID(registered["account_id"])
    auth = await _login(client, email="owner1@example.com")

    # One seat on the plan, occupied by a directly inserted engineer. The
    # expected `current` of 1 below means the registering owner is not counted.
    await _set_sub(test_db, account_id, seat_limit=1)
    await _add_member(test_db, account_id, role="engineer")

    invite = {"email": "new-eng@example.com", "role": "engineer"}
    response = await client.post("/api/v1/accounts/me/invites", json=invite, headers=auth)

    assert response.status_code == 402, response.text
    detail = response.json()["detail"]
    assert detail["code"] == "seat_limit_exceeded"
    assert detail["role"] == "engineer"
    assert detail["current"] == 1
    assert detail["limit"] == 1
    assert "upgrade_url" in detail
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_invite_l1_blocked_when_seats_full(client: AsyncClient, test_db: AsyncSession):
    """POST /me/invites for an l1_tech returns a structured 402 once the only L1 seat is taken."""
    registered = await _register(client, email="owner2@example.com")
    account_id = uuid.UUID(registered["account_id"])
    auth = await _login(client, email="owner2@example.com")

    # Plenty of engineer seats, but the single L1 seat is already occupied.
    await _set_sub(test_db, account_id, seat_limit=10, l1_seat_limit=1)
    await _add_member(test_db, account_id, role="l1_tech")

    invite = {"email": "new-l1@example.com", "role": "l1_tech"}
    response = await client.post("/api/v1/accounts/me/invites", json=invite, headers=auth)

    assert response.status_code == 402, response.text
    detail = response.json()["detail"]
    assert detail["code"] == "seat_limit_exceeded"
    assert detail["role"] == "l1_tech"
    assert detail["current"] == 1
    assert detail["limit"] == 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_invite_succeeds_when_seats_available(client: AsyncClient, test_db: AsyncSession):
    """POST /me/invites for an engineer returns 201 while engineer seats remain."""
    registered = await _register(client, email="owner3@example.com")
    account_id = uuid.UUID(registered["account_id"])
    auth = await _login(client, email="owner3@example.com")

    # Five seats on the plan and no engineer members added.
    await _set_sub(test_db, account_id, seat_limit=5)

    invite = {"email": "new-eng2@example.com", "role": "engineer"}
    response = await client.post("/api/v1/accounts/me/invites", json=invite, headers=auth)
    assert response.status_code == 201, response.text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_invite_viewer_bypasses_seat_check(client: AsyncClient, test_db: AsyncSession):
    """POST /me/invites for a viewer returns 201 even though the engineer seat is taken."""
    registered = await _register(client, email="owner4@example.com")
    account_id = uuid.UUID(registered["account_id"])
    auth = await _login(client, email="owner4@example.com")

    # Use up the single engineer seat; the viewer invite must still go through.
    await _set_sub(test_db, account_id, seat_limit=1)
    await _add_member(test_db, account_id, role="engineer")

    invite = {"email": "viewer@example.com", "role": "viewer"}
    response = await client.post("/api/v1/accounts/me/invites", json=invite, headers=auth)
    assert response.status_code == 201, response.text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_invite_unlimited_seat_limit_always_succeeds(client: AsyncClient, test_db: AsyncSession):
    """POST /me/invites returns 201 when seat_limit is None, even with several engineers present."""
    registered = await _register(client, email="owner5@example.com")
    account_id = uuid.UUID(registered["account_id"])
    auth = await _login(client, email="owner5@example.com")

    # A seat_limit of None stands for "unlimited".
    await _set_sub(test_db, account_id, seat_limit=None)
    # Populate the account with five engineers before inviting one more.
    for n in range(5):
        await _add_member(test_db, account_id, role="engineer", suffix=f"bulk{n}")

    invite = {"email": "new-unlimited@example.com", "role": "engineer"}
    response = await client.post("/api/v1/accounts/me/invites", json=invite, headers=auth)
    assert response.status_code == 201, response.text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_bulk_invite_per_row_402_preserves_structured_detail(client: AsyncClient, test_db: AsyncSession):
    """The bulk invite call succeeds overall and reports seat-limited rows under `failed`.

    The per-row error must be a structured dict, not a stringified repr.
    """
    registered = await _register(client, email="owner_bulk@example.com")
    account_id = uuid.UUID(registered["account_id"])
    auth = await _login(client, email="owner_bulk@example.com")

    # The single engineer seat is taken, so the engineer row below must fail.
    await _set_sub(test_db, account_id, seat_limit=1)
    await _add_member(test_db, account_id, role="engineer")

    rows = [
        {"email": "viewer-ok@example.com", "role": "viewer"},
        {"email": "eng-blocked@example.com", "role": "engineer"},
    ]
    response = await client.post(
        "/api/v1/accounts/me/invites/bulk",
        json={"invites": rows},
        headers=auth,
    )
    assert response.status_code in (200, 201), response.text

    result = response.json()
    created = result["created"]
    assert len(created) == 1
    assert created[0]["email"] == "viewer-ok@example.com"

    failed = result["failed"]
    assert len(failed) == 1
    rejected = failed[0]
    assert rejected["email"] == "eng-blocked@example.com"

    # The error payload keeps its structure (dict, not a repr string).
    error = rejected["error"]
    assert isinstance(error, dict)
    assert error["code"] == "seat_limit_exceeded"
    assert error["role"] == "engineer"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_invite_grandfathered_account_blocks_new_invites(client: AsyncClient, test_db: AsyncSession):
    """An account already above its engineer limit still gets 402 on a new engineer invite.

    The response reports the over-limit state as-is (current 3, limit 2).
    """
    registered = await _register(client, email="owner6@example.com")
    account_id = uuid.UUID(registered["account_id"])
    auth = await _login(client, email="owner6@example.com")

    # Three engineers against a limit of two: the over-seated starting point.
    await _set_sub(test_db, account_id, seat_limit=2)
    for n in range(3):
        await _add_member(test_db, account_id, role="engineer", suffix=f"gf{n}")

    # A further engineer invite has to be refused.
    invite = {"email": "one-more@example.com", "role": "engineer"}
    response = await client.post("/api/v1/accounts/me/invites", json=invite, headers=auth)

    assert response.status_code == 402, response.text
    detail = response.json()["detail"]
    assert detail["code"] == "seat_limit_exceeded"
    # The counts reflect the existing members: 3 in use against a limit of 2.
    assert detail["current"] == 3
    assert detail["limit"] == 2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Accept-invite race condition — auth.py register path
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_accept_invite_blocked_when_seats_full_at_accept_time(client: AsyncClient, test_db: AsyncSession):
    """Registering with an invite code returns 402 if the seats filled up after the invite was issued."""
    # Phase 1: an owner issues an engineer invite while seats are free.
    registered = await _register(client, email="owner7@example.com")
    account_id = uuid.UUID(registered["account_id"])
    owner_auth = await _login(client, email="owner7@example.com")

    await _set_sub(test_db, account_id, seat_limit=2)

    invite_response = await client.post(
        "/api/v1/accounts/me/invites",
        json={"email": "race@example.com", "role": "engineer"},
        headers=owner_auth,
    )
    assert invite_response.status_code == 201, invite_response.text
    code = invite_response.json()["code"]

    # Phase 2: both seats get taken after the invite already exists.
    for tag in ("race1", "race2"):
        await _add_member(test_db, account_id, role="engineer", suffix=tag)

    # Phase 3: the invitee registers with the code and must be refused.
    signup = {
        "email": "race@example.com",
        "password": "TestPassword123!",
        "name": "Race User",
        "account_invite_code": code,
    }
    response = await client.post("/api/v1/auth/register", json=signup)
    assert response.status_code == 402, response.text
    assert response.json()["detail"]["code"] == "seat_limit_exceeded"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_accept_invite_succeeds_when_seats_available(client: AsyncClient, test_db: AsyncSession):
    """Registering with a valid invite code succeeds and lands the user in the inviting account."""
    registered = await _register(client, email="owner8@example.com")
    account_id = uuid.UUID(registered["account_id"])
    owner_auth = await _login(client, email="owner8@example.com")

    await _set_sub(test_db, account_id, seat_limit=5)

    invite_response = await client.post(
        "/api/v1/accounts/me/invites",
        json={"email": "acceptme@example.com", "role": "engineer"},
        headers=owner_auth,
    )
    assert invite_response.status_code == 201, invite_response.text
    code = invite_response.json()["code"]

    signup = {
        "email": "acceptme@example.com",
        "password": "TestPassword123!",
        "name": "Accept User",
        "account_invite_code": code,
    }
    response = await client.post("/api/v1/auth/register", json=signup)
    assert response.status_code in (200, 201), response.text
    # The new user belongs to the owner's account.
    assert response.json()["account_id"] == str(account_id)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Role-change endpoint — PATCH /me/members/{user_id}/role
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_role_change_viewer_to_engineer_blocked_when_seats_full(client: AsyncClient, test_db: AsyncSession):
    """PATCH /me/members/{id}/role returns 402 when promoting a viewer to engineer with no seat left."""
    registered = await _register(client, email="owner9@example.com")
    account_id = uuid.UUID(registered["account_id"])
    auth = await _login(client, email="owner9@example.com")

    await _set_sub(test_db, account_id, seat_limit=1)
    # Occupy the only engineer seat, then add the viewer who will be promoted.
    await _add_member(test_db, account_id, role="engineer")
    candidate = await _add_member(test_db, account_id, role="viewer")

    url = f"/api/v1/accounts/me/members/{candidate.id}/role"
    response = await client.patch(url, json={"account_role": "engineer"}, headers=auth)

    assert response.status_code == 402, response.text
    detail = response.json()["detail"]
    assert detail["code"] == "seat_limit_exceeded"
    assert detail["role"] == "engineer"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_role_change_viewer_to_l1_blocked_when_seats_full(client: AsyncClient, test_db: AsyncSession):
    """PATCH /me/members/{id}/role returns 402 when promoting a viewer to l1_tech with no L1 seat left."""
    registered = await _register(client, email="owner10@example.com")
    account_id = uuid.UUID(registered["account_id"])
    auth = await _login(client, email="owner10@example.com")

    # The single L1 seat is occupied before the viewer is added.
    await _set_sub(test_db, account_id, seat_limit=10, l1_seat_limit=1)
    await _add_member(test_db, account_id, role="l1_tech")
    candidate = await _add_member(test_db, account_id, role="viewer")

    url = f"/api/v1/accounts/me/members/{candidate.id}/role"
    response = await client.patch(url, json={"account_role": "l1_tech"}, headers=auth)

    assert response.status_code == 402, response.text
    detail = response.json()["detail"]
    assert detail["code"] == "seat_limit_exceeded"
    assert detail["role"] == "l1_tech"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_role_change_promotion_succeeds_when_seats_available(client: AsyncClient, test_db: AsyncSession):
    """PATCH /me/members/{id}/role returns 200 and the new role when an engineer seat is free."""
    registered = await _register(client, email="owner11@example.com")
    account_id = uuid.UUID(registered["account_id"])
    auth = await _login(client, email="owner11@example.com")

    await _set_sub(test_db, account_id, seat_limit=5)
    candidate = await _add_member(test_db, account_id, role="viewer")

    url = f"/api/v1/accounts/me/members/{candidate.id}/role"
    response = await client.patch(url, json={"account_role": "engineer"}, headers=auth)

    assert response.status_code == 200, response.text
    assert response.json()["account_role"] == "engineer"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_role_change_demotion_bypasses_seat_check(client: AsyncClient, test_db: AsyncSession):
    """PATCH /me/members/{id}/role returns 200 for an engineer → viewer demotion with seats full."""
    registered = await _register(client, email="owner12@example.com")
    account_id = uuid.UUID(registered["account_id"])
    auth = await _login(client, email="owner12@example.com")

    # The only engineer seat is in use; demoting its holder must still work.
    await _set_sub(test_db, account_id, seat_limit=1)
    member = await _add_member(test_db, account_id, role="engineer")

    url = f"/api/v1/accounts/me/members/{member.id}/role"
    response = await client.patch(url, json={"account_role": "viewer"}, headers=auth)

    assert response.status_code == 200, response.text
    assert response.json()["account_role"] == "viewer"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /me/seats — seat counter widget endpoint
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_get_seats_returns_both_role_counts(client: AsyncClient, test_db: AsyncSession):
    """GET /accounts/me/seats reports usage, limit and availability for engineer and l1_tech."""
    registered = await _register(client, email="owner_seats@example.com")
    account_id = uuid.UUID(registered["account_id"])
    auth = await _login(client, email="owner_seats@example.com")
    await _set_sub(test_db, account_id, seat_limit=5, l1_seat_limit=3)

    # Membership under test: two engineers plus one l1_tech.
    for n in range(2):
        await _add_member(test_db, account_id, role="engineer", suffix=f"e{n}")
    await _add_member(test_db, account_id, role="l1_tech", suffix="l1")

    response = await client.get("/api/v1/accounts/me/seats", headers=auth)
    assert response.status_code == 200, response.text
    seats = response.json()

    engineer_seats = seats["engineer"]
    assert engineer_seats["role"] == "engineer"
    assert engineer_seats["current"] == 2
    assert engineer_seats["limit"] == 5
    assert engineer_seats["available"] is True

    l1_seats = seats["l1_tech"]
    assert l1_seats["role"] == "l1_tech"
    assert l1_seats["current"] == 1
    assert l1_seats["limit"] == 3
    assert l1_seats["available"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_seats_blocked_for_viewer(client: AsyncClient, test_db: AsyncSession):
    """GET /accounts/me/seats returns 403 when the caller's account role is viewer."""
    from app.core.security import get_password_hash

    # An owner registration provides the account the viewer will belong to.
    registered = await _register(client, email="owner_seats2@example.com")
    account_id = uuid.UUID(registered["account_id"])
    await _set_sub(test_db, account_id, seat_limit=5, l1_seat_limit=3)

    # Insert the viewer directly, with a password hash so a login is possible.
    secret = "ViewerPass123!"
    test_db.add(
        User(
            id=uuid.uuid4(),
            email="viewer_seats@example.com",
            name="Viewer Seats",
            account_id=account_id,
            account_role="viewer",
            role="engineer",  # system role field (default), separate from account_role
            is_active=True,
            password_hash=get_password_hash(secret),
        )
    )
    await test_db.commit()

    # Authenticate as that viewer and hit the seats endpoint.
    viewer_auth = await _login(client, email="viewer_seats@example.com", password=secret)
    response = await client.get("/api/v1/accounts/me/seats", headers=viewer_auth)
    assert response.status_code == 403, response.text
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PATCH /me/members/{user_id}/coverage — engineer L1-coverage flag
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_coverage_owner_can_toggle_engineer(client: AsyncClient, test_db: AsyncSession):
    """An owner can switch an engineer's can_cover_l1 flag on and off; each response echoes the value."""
    registered = await _register(client, email="owner_cov1@example.com")
    account_id = uuid.UUID(registered["account_id"])
    auth = await _login(client, email="owner_cov1@example.com")

    member = await _add_member(test_db, account_id, role="engineer", suffix="cov1")

    # Switch the flag on.
    enabled = await client.patch(
        f"/api/v1/accounts/me/members/{member.id}/coverage",
        json={"can_cover_l1": True},
        headers=auth,
    )
    assert enabled.status_code == 200, enabled.text
    assert enabled.json()["can_cover_l1"] is True

    # Switch it back off.
    disabled = await client.patch(
        f"/api/v1/accounts/me/members/{member.id}/coverage",
        json={"can_cover_l1": False},
        headers=auth,
    )
    assert disabled.status_code == 200, disabled.text
    assert disabled.json()["can_cover_l1"] is False
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_coverage_non_owner_is_forbidden(client: AsyncClient, test_db: AsyncSession):
    """A non-owner engineer gets 403 when trying to set the coverage flag on their own record."""
    from app.core.security import get_password_hash

    registered = await _register(client, email="owner_cov2@example.com")
    account_id = uuid.UUID(registered["account_id"])

    # Insert an engineer directly, with a password hash so a login is possible.
    secret = "EngPass123!"
    member = User(
        id=uuid.uuid4(),
        email="eng_cov2@example.com",
        name="Eng Cov2",
        account_id=account_id,
        account_role="engineer",
        role="engineer",
        is_active=True,
        password_hash=get_password_hash(secret),
    )
    test_db.add(member)
    await test_db.commit()

    member_auth = await _login(client, email="eng_cov2@example.com", password=secret)

    # The engineer targets their own member id using their own token.
    url = f"/api/v1/accounts/me/members/{member.id}/coverage"
    response = await client.patch(url, json={"can_cover_l1": True}, headers=member_auth)
    assert response.status_code == 403, response.text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_coverage_viewer_role_returns_422(client: AsyncClient, test_db: AsyncSession):
    """PATCH coverage on a viewer returns 422 with a detail message that mentions "engineer"."""
    registered = await _register(client, email="owner_cov3@example.com")
    account_id = uuid.UUID(registered["account_id"])
    auth = await _login(client, email="owner_cov3@example.com")

    target = await _add_member(test_db, account_id, role="viewer", suffix="cov3")

    url = f"/api/v1/accounts/me/members/{target.id}/coverage"
    response = await client.patch(url, json={"can_cover_l1": True}, headers=auth)

    assert response.status_code == 422, response.text
    # Case-insensitive check on the detail text.
    message = response.json()["detail"].lower()
    assert "engineer" in message
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_coverage_cross_account_returns_404(client: AsyncClient, test_db: AsyncSession):
    """PATCH coverage on a user from a different account → 404 (tenancy isolation).

    Two owners are registered separately; the test first checks that they
    really ended up in different accounts, then has owner A target an
    engineer that belongs to account B.
    """
    # Account A
    owner_a = await _register(client, email="owner_cov_a@example.com")
    account_a_id = uuid.UUID(owner_a["account_id"])
    headers_a = await _login(client, email="owner_cov_a@example.com")

    # Account B — a separate registration is expected to create a new account
    owner_b = await _register(client, email="owner_cov_b@example.com")
    account_b_id = uuid.UUID(owner_b["account_id"])

    # Precondition: without two distinct accounts the 404 check below would not
    # be exercising cross-account access at all.
    assert account_a_id != account_b_id, "expected separate registrations to create separate accounts"

    # Add an engineer to account B
    engineer_b = await _add_member(test_db, account_b_id, role="engineer", suffix="covb")

    # Owner of account A tries to patch account B's engineer — must 404
    resp = await client.patch(
        f"/api/v1/accounts/me/members/{engineer_b.id}/coverage",
        json={"can_cover_l1": True},
        headers=headers_a,
    )
    assert resp.status_code == 404, resp.text
|