feat: reorganize admin panel around accounts

This commit is contained in:
chihlasm
2026-04-02 03:46:11 +00:00
parent b8189a1999
commit bfcb8c52d3
10 changed files with 624 additions and 188 deletions

View File

@@ -5,7 +5,7 @@ from typing import Annotated, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from sqlalchemy import select, func, or_
from sqlalchemy.orm import selectinload
from app.core.database import get_db
@@ -24,7 +24,19 @@ from app.models.invite_code import InviteCode
from app.models.account_invite import AccountInvite
from app.models.tree import Tree
from app.schemas.user import UserResponse, RoleUpdate, AccountRoleUpdate
from app.schemas.admin import MoveUserAccount, AdminUserCreate, AdminUserCreateResponse, AdminPasswordReset, AdminPasswordResetResponse, HardDeleteCheckResponse
from app.schemas.admin import (
MoveUserAccount,
AdminUserCreate,
AdminUserCreateResponse,
AdminPasswordReset,
AdminPasswordResetResponse,
HardDeleteCheckResponse,
AdminUserListItem,
AdminUserListResponse,
AdminAccountMember,
AdminAccountListItem,
AdminAccountListResponse,
)
from app.schemas.subscription import SubscriptionPlanUpdate, ExtendTrialRequest
from app.schemas.user_detail import (
UserDetailResponse, AccountSummary, SubscriptionSummary,
@@ -35,10 +47,13 @@ from app.api.deps import require_admin
router = APIRouter(prefix="/admin", tags=["admin"])
@router.get("/users", response_model=list[UserResponse])
@router.get("/users", response_model=AdminUserListResponse)
async def list_users(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(require_admin)],
page: Optional[int] = Query(None, ge=1),
size: Optional[int] = Query(None, ge=1, le=100),
search: Optional[str] = Query(None, description="Search by user or account fields"),
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=100),
is_active: Optional[bool] = Query(None, description="Filter by active status"),
@@ -46,23 +61,154 @@ async def list_users(
account_id: Optional[UUID] = Query(None, description="Filter by account"),
include_archived: bool = Query(False, description="Include archived (soft-deleted) users"),
):
"""List all users (super admin only)."""
query = select(User)
"""List users for super admin global people search."""
resolved_limit = size or limit
resolved_skip = skip
current_page = 1
if page is not None:
resolved_skip = (page - 1) * resolved_limit
current_page = page
elif resolved_limit > 0:
current_page = (resolved_skip // resolved_limit) + 1
count_query = (
select(func.count())
.select_from(User)
.outerjoin(Account, User.account_id == Account.id)
)
query = (
select(
User,
Account.name.label("account_name"),
Account.display_code.label("account_display_code"),
)
.outerjoin(Account, User.account_id == Account.id)
)
if not include_archived:
query = query.where(User.deleted_at.is_(None))
count_query = count_query.where(User.deleted_at.is_(None))
if is_active is not None:
query = query.where(User.is_active == is_active)
count_query = count_query.where(User.is_active == is_active)
if role:
query = query.where(User.role == role)
count_query = count_query.where(User.role == role)
if account_id:
query = query.where(User.account_id == account_id)
count_query = count_query.where(User.account_id == account_id)
if search:
search_term = f"%{search.strip()}%"
search_filter = or_(
User.name.ilike(search_term),
User.email.ilike(search_term),
Account.name.ilike(search_term),
Account.display_code.ilike(search_term),
)
query = query.where(search_filter)
count_query = count_query.where(search_filter)
query = query.order_by(User.created_at.desc()).offset(skip).limit(limit)
total_result = await db.execute(count_query)
total = total_result.scalar() or 0
query = query.order_by(User.created_at.desc()).offset(resolved_skip).limit(resolved_limit)
result = await db.execute(query)
users = result.scalars().all()
return users
rows = result.all()
items = [
AdminUserListItem(
id=user.id,
email=user.email,
name=user.name,
role=user.role,
is_super_admin=user.is_super_admin,
is_active=user.is_active,
account_id=user.account_id,
account_role=user.account_role,
account_name=account_name,
account_display_code=account_display_code,
created_at=user.created_at,
last_login=user.last_login,
deleted_at=user.deleted_at,
)
for user, account_name, account_display_code in rows
]
return AdminUserListResponse(
items=items,
total=total,
page=current_page,
per_page=resolved_limit,
)
@router.get("/accounts", response_model=AdminAccountListResponse)
async def list_accounts(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(require_admin)],
page: int = Query(1, ge=1),
size: int = Query(12, ge=1, le=100),
include_archived: bool = Query(False, description="Include archived users in account member lists"),
):
"""List accounts with embedded members for the admin panel."""
total_result = await db.execute(select(func.count()).select_from(Account))
total = total_result.scalar() or 0
accounts_result = await db.execute(
select(Account)
.order_by(Account.created_at.desc())
.offset((page - 1) * size)
.limit(size)
)
accounts = accounts_result.scalars().all()
account_ids = [account.id for account in accounts]
members_by_account: dict[UUID, list[AdminAccountMember]] = {account_id: [] for account_id in account_ids}
if account_ids:
members_query = select(User).where(User.account_id.in_(account_ids))
if not include_archived:
members_query = members_query.where(User.deleted_at.is_(None))
members_query = members_query.order_by(User.created_at.asc())
members_result = await db.execute(members_query)
for member in members_result.scalars().all():
members_by_account.setdefault(member.account_id, []).append(
AdminAccountMember(
id=member.id,
email=member.email,
name=member.name,
role=member.role,
is_super_admin=member.is_super_admin,
is_active=member.is_active,
account_role=member.account_role,
created_at=member.created_at,
last_login=member.last_login,
deleted_at=member.deleted_at,
)
)
items = [
AdminAccountListItem(
id=account.id,
name=account.name,
display_code=account.display_code,
created_at=account.created_at,
owner_id=account.owner_id,
member_count=len(members_by_account.get(account.id, [])),
active_member_count=sum(1 for member in members_by_account.get(account.id, []) if member.is_active),
members=members_by_account.get(account.id, []),
)
for account in accounts
]
return AdminAccountListResponse(
items=items,
total=total,
page=page,
per_page=size,
)
def _generate_display_code() -> str:

View File

@@ -28,6 +28,62 @@ class ActivityEntry(BaseModel):
from_attributes = True
# --- Admin Accounts & People Search ---
class AdminUserListItem(BaseModel):
id: UUID
email: EmailStr
name: str
role: str
is_super_admin: bool = False
is_active: bool = True
account_id: Optional[UUID] = None
account_role: Optional[str] = None
account_name: Optional[str] = None
account_display_code: Optional[str] = None
created_at: datetime
last_login: Optional[datetime] = None
deleted_at: Optional[datetime] = None
class AdminUserListResponse(BaseModel):
items: list[AdminUserListItem]
total: int
page: int
per_page: int
class AdminAccountMember(BaseModel):
id: UUID
email: EmailStr
name: str
role: str
is_super_admin: bool = False
is_active: bool = True
account_role: Optional[str] = None
created_at: datetime
last_login: Optional[datetime] = None
deleted_at: Optional[datetime] = None
class AdminAccountListItem(BaseModel):
id: UUID
name: str
display_code: str
created_at: datetime
owner_id: Optional[UUID] = None
member_count: int = 0
active_member_count: int = 0
members: list[AdminAccountMember] = Field(default_factory=list)
class AdminAccountListResponse(BaseModel):
items: list[AdminAccountListItem]
total: int
page: int
per_page: int
# --- Audit Logs ---
class AuditLogEntry(BaseModel):

View File

@@ -19,8 +19,38 @@ class TestAdminEndpoints:
"/api/v1/admin/users", headers=admin_auth_headers
)
assert response.status_code == 200
users = response.json()
assert len(users) >= 2 # admin + test_user
payload = response.json()
assert payload["total"] >= 2 # admin + test_user
assert len(payload["items"]) >= 2
@pytest.mark.asyncio
async def test_list_users_supports_search(
self, client: AsyncClient, admin_auth_headers: dict, test_user: dict
):
"""Test admin people search by user email."""
response = await client.get(
"/api/v1/admin/users",
params={"search": test_user["email"]},
headers=admin_auth_headers,
)
assert response.status_code == 200
payload = response.json()
assert payload["total"] >= 1
assert any(item["email"] == test_user["email"] for item in payload["items"])
@pytest.mark.asyncio
async def test_list_accounts_as_admin(
self, client: AsyncClient, admin_auth_headers: dict
):
"""Test listing accounts with member data."""
response = await client.get(
"/api/v1/admin/accounts", headers=admin_auth_headers
)
assert response.status_code == 200
payload = response.json()
assert payload["total"] >= 1
assert len(payload["items"]) >= 1
assert "members" in payload["items"][0]
@pytest.mark.asyncio
async def test_list_users_as_non_admin(