feat: implement full admin panel with dashboard, user management, and platform settings

Adds complete super_admin panel with 9 pages and account owner categories page.
Backend includes 5 new DB tables, ~25 API endpoints, settings manager with
in-memory cache, and 29 integration tests. Frontend includes reusable admin
components (DataTable, Pagination, ActionMenu, etc.) with code-split lazy loading.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Michael Chihlas
2026-02-08 06:05:59 -05:00
parent 4f57c84d43
commit b570f8415f
50 changed files with 4589 additions and 5 deletions

View File

@@ -0,0 +1,154 @@
import csv
import io
from datetime import datetime, timezone
from typing import Annotated, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, Query
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from app.core.database import get_db
from app.models.user import User
from app.models.audit_log import AuditLog
from app.schemas.admin import AuditLogEntry, AuditLogListResponse
from app.api.deps import require_admin
router = APIRouter(prefix="/admin/audit-logs", tags=["admin-audit"])
def _build_audit_query(
action: Optional[str] = None,
resource_type: Optional[str] = None,
user_id: Optional[UUID] = None,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
search: Optional[str] = None,
):
"""Build base query with filters (reused for list and export)."""
query = (
select(
AuditLog.id,
AuditLog.user_id,
AuditLog.action,
AuditLog.resource_type,
AuditLog.resource_id,
AuditLog.details,
AuditLog.ip_address,
AuditLog.created_at,
User.email.label("user_email"),
)
.outerjoin(User, AuditLog.user_id == User.id)
)
if action:
query = query.where(AuditLog.action == action)
if resource_type:
query = query.where(AuditLog.resource_type == resource_type)
if user_id:
query = query.where(AuditLog.user_id == user_id)
if date_from:
query = query.where(AuditLog.created_at >= datetime.fromisoformat(date_from))
if date_to:
query = query.where(AuditLog.created_at <= datetime.fromisoformat(date_to))
if search:
query = query.where(AuditLog.resource_id.cast(str).ilike(f"%{search}%"))
return query
@router.get("", response_model=AuditLogListResponse)
async def list_audit_logs(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(require_admin)],
page: int = Query(1, ge=1),
per_page: int = Query(50, ge=1, le=100),
action: Optional[str] = None,
resource_type: Optional[str] = None,
user_id: Optional[UUID] = None,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
search: Optional[str] = None,
):
"""List audit logs with pagination and filters."""
base_query = _build_audit_query(action, resource_type, user_id, date_from, date_to, search)
# Count
count_query = select(func.count()).select_from(AuditLog)
if action:
count_query = count_query.where(AuditLog.action == action)
if resource_type:
count_query = count_query.where(AuditLog.resource_type == resource_type)
if user_id:
count_query = count_query.where(AuditLog.user_id == user_id)
if date_from:
count_query = count_query.where(AuditLog.created_at >= datetime.fromisoformat(date_from))
if date_to:
count_query = count_query.where(AuditLog.created_at <= datetime.fromisoformat(date_to))
total = await db.scalar(count_query) or 0
# Paginated results
query = base_query.order_by(AuditLog.created_at.desc()).offset((page - 1) * per_page).limit(per_page)
result = await db.execute(query)
rows = result.all()
items = [
AuditLogEntry(
id=row.id,
user_id=row.user_id,
user_email=row.user_email,
action=row.action,
resource_type=row.resource_type,
resource_id=row.resource_id,
details=row.details,
ip_address=row.ip_address,
created_at=row.created_at,
)
for row in rows
]
return AuditLogListResponse(items=items, total=total, page=page, per_page=per_page)
@router.get("/export")
async def export_audit_logs(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(require_admin)],
action: Optional[str] = None,
resource_type: Optional[str] = None,
user_id: Optional[UUID] = None,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
):
"""Export audit logs as CSV (10k row limit)."""
query = _build_audit_query(action, resource_type, user_id, date_from, date_to)
query = query.order_by(AuditLog.created_at.desc()).limit(10000)
result = await db.execute(query)
rows = result.all()
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["timestamp", "user_email", "action", "resource_type", "resource_id", "ip_address", "details"])
for row in rows:
writer.writerow([
row.created_at.isoformat() if row.created_at else "",
row.user_email or "",
row.action,
row.resource_type,
str(row.resource_id) if row.resource_id else "",
row.ip_address or "",
str(row.details) if row.details else "",
])
output.seek(0)
today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
return StreamingResponse(
iter([output.getvalue()]),
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename=audit-logs-{today}.csv"},
)