"""Integration tests for admin user management endpoints.""" import pytest from httpx import AsyncClient from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models.audit_log import AuditLog class TestAdminEndpoints: """Test suite for admin user management endpoints.""" @pytest.mark.asyncio async def test_list_users_as_admin( self, client: AsyncClient, admin_auth_headers: dict, test_user: dict ): """Test listing users as a super admin.""" response = await client.get( "/api/v1/admin/users", headers=admin_auth_headers ) assert response.status_code == 200 payload = response.json() assert payload["total"] >= 2 # admin + test_user assert len(payload["items"]) >= 2 @pytest.mark.asyncio async def test_list_users_supports_search( self, client: AsyncClient, admin_auth_headers: dict, test_user: dict ): """Test admin people search by user email.""" response = await client.get( "/api/v1/admin/users", params={"search": test_user["email"]}, headers=admin_auth_headers, ) assert response.status_code == 200 payload = response.json() assert payload["total"] >= 1 assert any(item["email"] == test_user["email"] for item in payload["items"]) @pytest.mark.asyncio async def test_list_accounts_as_admin( self, client: AsyncClient, admin_auth_headers: dict ): """Test listing accounts with member data.""" response = await client.get( "/api/v1/admin/accounts", headers=admin_auth_headers ) assert response.status_code == 200 payload = response.json() assert payload["total"] >= 1 assert len(payload["items"]) >= 1 assert "members" in payload["items"][0] @pytest.mark.asyncio async def test_list_users_as_non_admin( self, client: AsyncClient, auth_headers: dict ): """Test that non-admin users cannot list users.""" response = await client.get( "/api/v1/admin/users", headers=auth_headers ) assert response.status_code == 403 @pytest.mark.asyncio async def test_get_user_as_admin( self, client: AsyncClient, admin_auth_headers: dict, test_user: dict ): """Test getting user details as admin.""" user_id = test_user["user_data"]["id"] response = await client.get( f"/api/v1/admin/users/{user_id}", headers=admin_auth_headers ) assert response.status_code == 200 data = response.json() assert data["email"] == test_user["email"] @pytest.mark.asyncio async def test_change_user_role( self, client: AsyncClient, admin_auth_headers: dict, test_user: dict ): """Test changing a user's role to viewer.""" user_id = test_user["user_data"]["id"] response = await client.put( f"/api/v1/admin/users/{user_id}/role", json={"role": "viewer"}, headers=admin_auth_headers ) assert response.status_code == 200 assert response.json()["role"] == "viewer" @pytest.mark.asyncio async def test_change_role_invalid( self, client: AsyncClient, admin_auth_headers: dict, test_user: dict ): """Test that invalid role values are rejected.""" user_id = test_user["user_data"]["id"] response = await client.put( f"/api/v1/admin/users/{user_id}/role", json={"role": "admin"}, headers=admin_auth_headers ) assert response.status_code == 422 @pytest.mark.asyncio async def test_cannot_change_own_role( self, client: AsyncClient, admin_auth_headers: dict, test_admin: dict ): """Test that admin cannot change their own role.""" admin_id = test_admin["user_data"]["id"] response = await client.put( f"/api/v1/admin/users/{admin_id}/role", json={"role": "viewer"}, headers=admin_auth_headers ) assert response.status_code == 400 assert "own role" in response.json()["detail"].lower() @pytest.mark.asyncio async def test_deactivate_user( self, client: AsyncClient, admin_auth_headers: dict, test_user: dict ): """Test deactivating a user.""" user_id = test_user["user_data"]["id"] response = await client.put( f"/api/v1/admin/users/{user_id}/deactivate", headers=admin_auth_headers ) assert response.status_code == 200 assert response.json()["is_active"] is False @pytest.mark.asyncio async def test_activate_user( self, client: AsyncClient, admin_auth_headers: dict, test_user: dict ): """Test reactivating a user.""" user_id = test_user["user_data"]["id"] # Deactivate first await client.put( f"/api/v1/admin/users/{user_id}/deactivate", headers=admin_auth_headers ) # Then reactivate response = await client.put( f"/api/v1/admin/users/{user_id}/activate", headers=admin_auth_headers ) assert response.status_code == 200 assert response.json()["is_active"] is True @pytest.mark.asyncio async def test_cannot_deactivate_self( self, client: AsyncClient, admin_auth_headers: dict, test_admin: dict ): """Test that admin cannot deactivate themselves.""" admin_id = test_admin["user_data"]["id"] response = await client.put( f"/api/v1/admin/users/{admin_id}/deactivate", headers=admin_auth_headers ) assert response.status_code == 400 assert "own account" in response.json()["detail"].lower() @pytest.mark.asyncio async def test_audit_log_created_on_role_change( self, client: AsyncClient, admin_auth_headers: dict, test_user: dict, test_db: AsyncSession ): """Test that changing a user's role creates an audit log entry.""" user_id = test_user["user_data"]["id"] await client.put( f"/api/v1/admin/users/{user_id}/role", json={"role": "viewer"}, headers=admin_auth_headers ) result = await test_db.execute( select(AuditLog).where(AuditLog.action == "user.role_change") ) log = result.scalar_one_or_none() assert log is not None assert str(log.resource_id) == user_id assert log.details["old_role"] == "engineer" assert log.details["new_role"] == "viewer" @pytest.mark.asyncio async def test_audit_log_created_on_deactivate( self, client: AsyncClient, admin_auth_headers: dict, test_user: dict, test_db: AsyncSession ): """Test that deactivating a user creates an audit log entry.""" user_id = test_user["user_data"]["id"] await client.put( f"/api/v1/admin/users/{user_id}/deactivate", headers=admin_auth_headers ) result = await test_db.execute( select(AuditLog).where(AuditLog.action == "user.deactivate") ) log = result.scalar_one_or_none() assert log is not None assert str(log.resource_id) == user_id @pytest.mark.asyncio async def test_change_account_role( self, client: AsyncClient, admin_auth_headers: dict, test_user: dict ): """Test changing a user's account role.""" user_id = test_user["user_data"]["id"] response = await client.put( f"/api/v1/admin/users/{user_id}/account-role", json={"account_role": "viewer"}, headers=admin_auth_headers ) assert response.status_code == 200 assert response.json()["account_role"] == "viewer" @pytest.mark.asyncio async def test_change_account_role_invalid( self, client: AsyncClient, admin_auth_headers: dict, test_user: dict ): """Test that invalid account_role values are rejected.""" user_id = test_user["user_data"]["id"] response = await client.put( f"/api/v1/admin/users/{user_id}/account-role", json={"account_role": "owner"}, headers=admin_auth_headers ) assert response.status_code == 422 @pytest.mark.asyncio async def test_audit_log_created_on_account_role_change( self, client: AsyncClient, admin_auth_headers: dict, test_user: dict, test_db: AsyncSession ): """Test that changing account role creates an audit log entry.""" user_id = test_user["user_data"]["id"] await client.put( f"/api/v1/admin/users/{user_id}/account-role", json={"account_role": "viewer"}, headers=admin_auth_headers ) result = await test_db.execute( select(AuditLog).where(AuditLog.action == "user.account_role_change") ) log = result.scalar_one_or_none() assert log is not None assert str(log.resource_id) == user_id assert log.details["old_account_role"] == "owner" assert log.details["new_account_role"] == "viewer"