Replace team_id with account_id across all API endpoints (trees, categories, tags, steps, step_categories, admin, auth). Add new accounts and webhooks endpoints. Registration now atomically creates Account + Subscription, with account_invite_code bypassing the platform invite gate. Schemas updated for account_id/account_role. 82 tests passing including 18 new tests for accounts, subscriptions, and permissions. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
220 lines
8.0 KiB
Python
220 lines
8.0 KiB
Python
"""Integration tests for admin user management endpoints."""
|
|
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from app.models.audit_log import AuditLog
|
|
|
|
|
|
class TestAdminEndpoints:
    """Integration tests for the ``/api/v1/admin/users`` endpoints.

    Covers listing and fetching users, changing a user's role and account
    role, deactivating and reactivating users, the guards that stop an admin
    from modifying their own account, and the audit log entries written by
    the mutating endpoints.

    The ``client``, ``auth_headers``, ``admin_auth_headers``, ``test_user``,
    ``test_admin`` and ``test_db`` fixtures are defined outside this file
    (presumably in ``conftest.py`` -- confirm their exact shape there).
    """

    @pytest.mark.asyncio
    async def test_list_users_as_admin(
        self, client: AsyncClient, admin_auth_headers: dict, test_user: dict
    ) -> None:
        """Test listing users as a super admin."""
        response = await client.get(
            "/api/v1/admin/users", headers=admin_auth_headers
        )
        assert response.status_code == 200
        users = response.json()
        assert len(users) >= 2  # admin + test_user

    @pytest.mark.asyncio
    async def test_list_users_as_non_admin(
        self, client: AsyncClient, auth_headers: dict
    ) -> None:
        """Test that non-admin users cannot list users."""
        response = await client.get(
            "/api/v1/admin/users", headers=auth_headers
        )
        assert response.status_code == 403

    @pytest.mark.asyncio
    async def test_get_user_as_admin(
        self, client: AsyncClient, admin_auth_headers: dict, test_user: dict
    ) -> None:
        """Test getting user details as admin."""
        user_id = test_user["user_data"]["id"]
        response = await client.get(
            f"/api/v1/admin/users/{user_id}", headers=admin_auth_headers
        )
        assert response.status_code == 200
        data = response.json()
        assert data["email"] == test_user["email"]

    @pytest.mark.asyncio
    async def test_change_user_role(
        self, client: AsyncClient, admin_auth_headers: dict, test_user: dict
    ) -> None:
        """Test changing a user's role to viewer."""
        user_id = test_user["user_data"]["id"]
        response = await client.put(
            f"/api/v1/admin/users/{user_id}/role",
            json={"role": "viewer"},
            headers=admin_auth_headers,
        )
        assert response.status_code == 200
        assert response.json()["role"] == "viewer"

    @pytest.mark.asyncio
    async def test_change_role_invalid(
        self, client: AsyncClient, admin_auth_headers: dict, test_user: dict
    ) -> None:
        """Test that invalid role values are rejected with a 422."""
        user_id = test_user["user_data"]["id"]
        response = await client.put(
            f"/api/v1/admin/users/{user_id}/role",
            json={"role": "admin"},
            headers=admin_auth_headers,
        )
        assert response.status_code == 422

    @pytest.mark.asyncio
    async def test_cannot_change_own_role(
        self, client: AsyncClient, admin_auth_headers: dict, test_admin: dict
    ) -> None:
        """Test that admin cannot change their own role."""
        admin_id = test_admin["user_data"]["id"]
        response = await client.put(
            f"/api/v1/admin/users/{admin_id}/role",
            json={"role": "viewer"},
            headers=admin_auth_headers,
        )
        assert response.status_code == 400
        assert "own role" in response.json()["detail"].lower()

    @pytest.mark.asyncio
    async def test_deactivate_user(
        self, client: AsyncClient, admin_auth_headers: dict, test_user: dict
    ) -> None:
        """Test deactivating a user."""
        user_id = test_user["user_data"]["id"]
        response = await client.put(
            f"/api/v1/admin/users/{user_id}/deactivate",
            headers=admin_auth_headers,
        )
        assert response.status_code == 200
        assert response.json()["is_active"] is False

    @pytest.mark.asyncio
    async def test_activate_user(
        self, client: AsyncClient, admin_auth_headers: dict, test_user: dict
    ) -> None:
        """Test reactivating a previously deactivated user."""
        user_id = test_user["user_data"]["id"]

        # Deactivate first. Verify that it actually happened: otherwise the
        # user would still be active and the assertions below could pass
        # without the activate endpoint having changed anything.
        deactivate_response = await client.put(
            f"/api/v1/admin/users/{user_id}/deactivate",
            headers=admin_auth_headers,
        )
        assert deactivate_response.status_code == 200
        assert deactivate_response.json()["is_active"] is False

        # Then reactivate
        response = await client.put(
            f"/api/v1/admin/users/{user_id}/activate",
            headers=admin_auth_headers,
        )
        assert response.status_code == 200
        assert response.json()["is_active"] is True

    @pytest.mark.asyncio
    async def test_cannot_deactivate_self(
        self, client: AsyncClient, admin_auth_headers: dict, test_admin: dict
    ) -> None:
        """Test that admin cannot deactivate themselves."""
        admin_id = test_admin["user_data"]["id"]
        response = await client.put(
            f"/api/v1/admin/users/{admin_id}/deactivate",
            headers=admin_auth_headers,
        )
        assert response.status_code == 400
        assert "own account" in response.json()["detail"].lower()

    @pytest.mark.asyncio
    async def test_audit_log_created_on_role_change(
        self,
        client: AsyncClient,
        admin_auth_headers: dict,
        test_user: dict,
        test_db: AsyncSession,
    ) -> None:
        """Test that changing a user's role creates an audit log entry."""
        user_id = test_user["user_data"]["id"]
        response = await client.put(
            f"/api/v1/admin/users/{user_id}/role",
            json={"role": "viewer"},
            headers=admin_auth_headers,
        )
        # Surface a failed request directly instead of as a missing log row.
        assert response.status_code == 200

        result = await test_db.execute(
            select(AuditLog).where(AuditLog.action == "user.role_change")
        )
        log = result.scalar_one_or_none()
        assert log is not None
        assert str(log.resource_id) == user_id
        assert log.details["old_role"] == "engineer"
        assert log.details["new_role"] == "viewer"

    @pytest.mark.asyncio
    async def test_audit_log_created_on_deactivate(
        self,
        client: AsyncClient,
        admin_auth_headers: dict,
        test_user: dict,
        test_db: AsyncSession,
    ) -> None:
        """Test that deactivating a user creates an audit log entry."""
        user_id = test_user["user_data"]["id"]
        response = await client.put(
            f"/api/v1/admin/users/{user_id}/deactivate",
            headers=admin_auth_headers,
        )
        # Surface a failed request directly instead of as a missing log row.
        assert response.status_code == 200

        result = await test_db.execute(
            select(AuditLog).where(AuditLog.action == "user.deactivate")
        )
        log = result.scalar_one_or_none()
        assert log is not None
        assert str(log.resource_id) == user_id

    @pytest.mark.asyncio
    async def test_change_account_role(
        self, client: AsyncClient, admin_auth_headers: dict, test_user: dict
    ) -> None:
        """Test changing a user's account role."""
        user_id = test_user["user_data"]["id"]
        response = await client.put(
            f"/api/v1/admin/users/{user_id}/account-role",
            json={"account_role": "viewer"},
            headers=admin_auth_headers,
        )
        assert response.status_code == 200
        assert response.json()["account_role"] == "viewer"

    @pytest.mark.asyncio
    async def test_change_account_role_invalid(
        self, client: AsyncClient, admin_auth_headers: dict, test_user: dict
    ) -> None:
        """Test that invalid account_role values are rejected with a 422."""
        user_id = test_user["user_data"]["id"]
        response = await client.put(
            f"/api/v1/admin/users/{user_id}/account-role",
            json={"account_role": "owner"},
            headers=admin_auth_headers,
        )
        assert response.status_code == 422

    @pytest.mark.asyncio
    async def test_audit_log_created_on_account_role_change(
        self,
        client: AsyncClient,
        admin_auth_headers: dict,
        test_user: dict,
        test_db: AsyncSession,
    ) -> None:
        """Test that changing account role creates an audit log entry."""
        user_id = test_user["user_data"]["id"]
        response = await client.put(
            f"/api/v1/admin/users/{user_id}/account-role",
            json={"account_role": "viewer"},
            headers=admin_auth_headers,
        )
        # Surface a failed request directly instead of as a missing log row.
        assert response.status_code == 200

        result = await test_db.execute(
            select(AuditLog).where(AuditLog.action == "user.account_role_change")
        )
        log = result.scalar_one_or_none()
        assert log is not None
        assert str(log.resource_id) == user_id
        assert log.details["old_account_role"] == "owner"
        assert log.details["new_account_role"] == "viewer"