Adds complete super_admin panel with 9 pages and account owner categories page. Backend includes 5 new DB tables, ~25 API endpoints, settings manager with in-memory cache, and 29 integration tests. Frontend includes reusable admin components (DataTable, Pagination, ActionMenu, etc.) with code-split lazy loading. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
155 lines
5.1 KiB
Python
155 lines
5.1 KiB
Python
import csv
import io
from datetime import datetime, timezone
from typing import Annotated, Optional
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from sqlalchemy import String, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.deps import require_admin
from app.core.database import get_db
from app.models.audit_log import AuditLog
from app.models.user import User
from app.schemas.admin import AuditLogEntry, AuditLogListResponse
|
|
|
|
# Router for the audit-log endpoints; every route below is mounted under
# /admin/audit-logs and grouped under the "admin-audit" tag.
router = APIRouter(
    prefix="/admin/audit-logs",
    tags=["admin-audit"],
)
|
|
|
|
|
|
def _parse_iso_datetime(value: str, field: str) -> datetime:
    """Parse an ISO-8601 query-string value, raising HTTP 422 when malformed.

    Args:
        value: Raw string received from the client.
        field: Name of the query parameter, used in the error message.

    Returns:
        The parsed ``datetime``.

    Raises:
        HTTPException: 422 when ``value`` is not a valid ISO-8601 datetime.
    """
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on;
    # normalise it so the same input works on older interpreters too.
    normalized = (value[:-1] + "+00:00") if value.endswith("Z") else value
    try:
        return datetime.fromisoformat(normalized)
    except ValueError as exc:
        raise HTTPException(
            status_code=422,
            detail=f"Invalid ISO-8601 datetime for '{field}': {value!r}",
        ) from exc


def _build_audit_query(
    action: Optional[str] = None,
    resource_type: Optional[str] = None,
    user_id: Optional[UUID] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
    search: Optional[str] = None,
):
    """Build the base audit-log SELECT with filters (reused for list and export).

    The query selects the audit-log columns plus the acting user's email
    (labelled ``user_email``) through an outer join on ``User``, so log rows
    without a matching user are still returned. No ordering or limit is
    applied; callers add their own.

    Args:
        action: Exact match on ``AuditLog.action``.
        resource_type: Exact match on ``AuditLog.resource_type``.
        user_id: Exact match on ``AuditLog.user_id``.
        date_from: ISO-8601 lower bound (inclusive) on ``created_at``.
        date_to: ISO-8601 upper bound (inclusive) on ``created_at``.
        search: Case-insensitive substring match on ``resource_id``.

    Returns:
        A SQLAlchemy ``Select`` with every supplied filter applied.

    Raises:
        HTTPException: 422 when ``date_from`` or ``date_to`` is not ISO-8601.
    """
    query = select(
        AuditLog.id,
        AuditLog.user_id,
        AuditLog.action,
        AuditLog.resource_type,
        AuditLog.resource_id,
        AuditLog.details,
        AuditLog.ip_address,
        AuditLog.created_at,
        User.email.label("user_email"),
    ).outerjoin(User, AuditLog.user_id == User.id)

    if action:
        query = query.where(AuditLog.action == action)
    if resource_type:
        query = query.where(AuditLog.resource_type == resource_type)
    if user_id:
        query = query.where(AuditLog.user_id == user_id)
    if date_from:
        query = query.where(
            AuditLog.created_at >= _parse_iso_datetime(date_from, "date_from")
        )
    if date_to:
        query = query.where(
            AuditLog.created_at <= _parse_iso_datetime(date_to, "date_to")
        )
    if search:
        # cast() needs a SQLAlchemy type; the Python builtin `str` is not one
        # and breaks the expression, so cast to String before the ILIKE.
        query = query.where(
            AuditLog.resource_id.cast(String).ilike(f"%{search}%")
        )

    return query
|
|
|
|
|
|
@router.get("", response_model=AuditLogListResponse)
async def list_audit_logs(
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)],
    page: int = Query(1, ge=1),
    per_page: int = Query(50, ge=1, le=100),
    action: Optional[str] = None,
    resource_type: Optional[str] = None,
    user_id: Optional[UUID] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
    search: Optional[str] = None,
):
    """List audit logs with pagination and filters.

    Args:
        db: Async database session.
        current_user: User resolved by the ``require_admin`` dependency; not
            otherwise used in the handler body.
        page: 1-based page number.
        per_page: Page size (1-100).
        action, resource_type, user_id, date_from, date_to, search: Filters
            forwarded unchanged to ``_build_audit_query``.

    Returns:
        ``AuditLogListResponse`` holding the requested page of entries and the
        total number of rows matching the filters.
    """
    base_query = _build_audit_query(
        action, resource_type, user_id, date_from, date_to, search
    )

    # Count over the filtered query itself rather than a hand-duplicated copy
    # of the filters, so `total` always matches the rows being paged
    # (including the `search` filter).
    count_query = select(func.count()).select_from(base_query.subquery())
    total = await db.scalar(count_query) or 0

    # Newest first; the id tie-breaker keeps offset pagination stable when
    # several rows share the same created_at value.
    page_query = (
        base_query.order_by(AuditLog.created_at.desc(), AuditLog.id.desc())
        .offset((page - 1) * per_page)
        .limit(per_page)
    )
    result = await db.execute(page_query)

    items = [
        AuditLogEntry(
            id=row.id,
            user_id=row.user_id,
            user_email=row.user_email,
            action=row.action,
            resource_type=row.resource_type,
            resource_id=row.resource_id,
            details=row.details,
            ip_address=row.ip_address,
            created_at=row.created_at,
        )
        for row in result.all()
    ]

    return AuditLogListResponse(
        items=items, total=total, page=page, per_page=per_page
    )
|
|
|
|
|
|
@router.get("/export")
async def export_audit_logs(
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)],
    action: Optional[str] = None,
    resource_type: Optional[str] = None,
    user_id: Optional[UUID] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
    search: Optional[str] = None,
):
    """Export audit logs as CSV (10k row limit).

    Accepts the same filters as the list endpoint, including ``search``, so a
    filtered view can be exported exactly as displayed.

    Args:
        db: Async database session.
        current_user: User resolved by the ``require_admin`` dependency; not
            otherwise used in the handler body.
        action, resource_type, user_id, date_from, date_to, search: Filters
            forwarded unchanged to ``_build_audit_query``.

    Returns:
        A ``text/csv`` attachment named ``audit-logs-<UTC date>.csv`` holding
        at most 10,000 rows, newest first.
    """
    query = _build_audit_query(
        action, resource_type, user_id, date_from, date_to, search
    )
    query = query.order_by(AuditLog.created_at.desc()).limit(10000)

    result = await db.execute(query)

    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(
        [
            "timestamp",
            "user_email",
            "action",
            "resource_type",
            "resource_id",
            "ip_address",
            "details",
        ]
    )
    # Empty or missing values are written as empty cells rather than "None".
    writer.writerows(
        [
            row.created_at.isoformat() if row.created_at else "",
            row.user_email or "",
            row.action,
            row.resource_type,
            str(row.resource_id) if row.resource_id else "",
            row.ip_address or "",
            str(row.details) if row.details else "",
        ]
        for row in result.all()
    )

    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")

    # getvalue() returns the whole buffer regardless of the stream position,
    # so no seek is required before sending it as a single chunk.
    return StreamingResponse(
        iter([buffer.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename=audit-logs-{today}.csv"},
    )
|