Files
resolutionflow/backend/app/api/endpoints/admin_audit.py
Michael Chihlas b570f8415f feat: implement full admin panel with dashboard, user management, and platform settings
Adds complete super_admin panel with 9 pages and account owner categories page.
Backend includes 5 new DB tables, ~25 API endpoints, settings manager with
in-memory cache, and 29 integration tests. Frontend includes reusable admin
components (DataTable, Pagination, ActionMenu, etc.) with code-split lazy loading.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 06:05:59 -05:00

155 lines
5.1 KiB
Python

import csv
import io
import json
from datetime import datetime, timezone
from typing import Annotated, Optional
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from sqlalchemy import String, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.deps import require_admin
from app.core.database import get_db
from app.models.audit_log import AuditLog
from app.models.user import User
from app.schemas.admin import AuditLogEntry, AuditLogListResponse

# Router for the audit-log endpoints in this module; every route registered on
# it is served under the "/admin/audit-logs" prefix and tagged "admin-audit".
router = APIRouter(prefix="/admin/audit-logs", tags=["admin-audit"])
def _parse_iso_datetime(value: str, field: str) -> datetime:
    """Parse an ISO-8601 query value, turning malformed input into a 422."""
    try:
        return datetime.fromisoformat(value)
    except ValueError as exc:
        raise HTTPException(
            status_code=422,
            detail=f"Invalid ISO 8601 datetime for '{field}': {value!r}",
        ) from exc


def _escape_like(term: str, escape_char: str = "\\") -> str:
    """Escape LIKE wildcards so *term* is matched as literal text."""
    return (
        term.replace(escape_char, escape_char * 2)
        .replace("%", f"{escape_char}%")
        .replace("_", f"{escape_char}_")
    )


def _build_audit_query(
    action: Optional[str] = None,
    resource_type: Optional[str] = None,
    user_id: Optional[UUID] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
    search: Optional[str] = None,
):
    """Build base query with filters (reused for list and export).

    The statement selects the audit-log columns plus the acting user's email
    (labelled ``user_email``) through an outer join, so log rows whose user
    has no matching ``User`` row are still returned.

    Args:
        action: Keep only rows whose ``action`` equals this value.
        resource_type: Keep only rows whose ``resource_type`` equals this value.
        user_id: Keep only rows recorded for this user.
        date_from: ISO-8601 string; inclusive lower bound on ``created_at``.
        date_to: ISO-8601 string; inclusive upper bound on ``created_at``.
        search: Case-insensitive substring matched against ``resource_id``.

    Returns:
        An unordered, unpaginated SQLAlchemy ``select`` statement.

    Raises:
        HTTPException: 422 when ``date_from`` or ``date_to`` is not a valid
            ISO-8601 datetime (previously an unhandled ``ValueError``).
    """
    query = select(
        AuditLog.id,
        AuditLog.user_id,
        AuditLog.action,
        AuditLog.resource_type,
        AuditLog.resource_id,
        AuditLog.details,
        AuditLog.ip_address,
        AuditLog.created_at,
        User.email.label("user_email"),
    ).outerjoin(User, AuditLog.user_id == User.id)

    if action:
        query = query.where(AuditLog.action == action)
    if resource_type:
        query = query.where(AuditLog.resource_type == resource_type)
    if user_id:
        query = query.where(AuditLog.user_id == user_id)
    # NOTE(review): a date-only ``date_to`` (e.g. "2026-02-08") parses to
    # midnight, so rows later that day are excluded - confirm this is intended.
    if date_from:
        lower_bound = _parse_iso_datetime(date_from, "date_from")
        query = query.where(AuditLog.created_at >= lower_bound)
    if date_to:
        upper_bound = _parse_iso_datetime(date_to, "date_to")
        query = query.where(AuditLog.created_at <= upper_bound)
    if search:
        # CAST needs a SQLAlchemy type; the Python builtin ``str`` is not one
        # and made every search request fail. Wildcards in the user-supplied
        # term are escaped so "%" and "_" match literally.
        pattern = f"%{_escape_like(search)}%"
        query = query.where(
            AuditLog.resource_id.cast(String).ilike(pattern, escape="\\")
        )
    return query
@router.get("", response_model=AuditLogListResponse)
async def list_audit_logs(
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)],
    page: int = Query(1, ge=1),
    per_page: int = Query(50, ge=1, le=100),
    action: Optional[str] = None,
    resource_type: Optional[str] = None,
    user_id: Optional[UUID] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
    search: Optional[str] = None,
):
    """List audit logs with pagination and filters.

    Args:
        db: Async database session.
        current_user: Resolved through ``require_admin``; not otherwise used
            here, the dependency itself gates access.
        page: 1-based page number.
        per_page: Page size (1-100).
        action, resource_type, user_id, date_from, date_to, search: Optional
            filters, forwarded unchanged to ``_build_audit_query``.

    Returns:
        ``AuditLogListResponse`` with the requested page of entries (newest
        first) and the total number of rows matching the filters.
    """
    filtered = _build_audit_query(
        action, resource_type, user_id, date_from, date_to, search
    )

    # Count the filtered statement itself instead of re-implementing the
    # filters: the previous hand-written count ignored ``search``, so ``total``
    # disagreed with the returned items whenever a search term was supplied.
    count_query = select(func.count()).select_from(filtered.subquery())
    total = await db.scalar(count_query) or 0

    # ``id`` is a tie-breaker so rows sharing a timestamp keep a stable order
    # across pages.
    page_query = (
        filtered.order_by(AuditLog.created_at.desc(), AuditLog.id.desc())
        .offset((page - 1) * per_page)
        .limit(per_page)
    )
    result = await db.execute(page_query)

    items = [
        AuditLogEntry(
            id=row.id,
            user_id=row.user_id,
            user_email=row.user_email,
            action=row.action,
            resource_type=row.resource_type,
            resource_id=row.resource_id,
            details=row.details,
            ip_address=row.ip_address,
            created_at=row.created_at,
        )
        for row in result.all()
    ]
    return AuditLogListResponse(
        items=items, total=total, page=page, per_page=per_page
    )
@router.get("/export")
async def export_audit_logs(
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)],
    action: Optional[str] = None,
    resource_type: Optional[str] = None,
    user_id: Optional[UUID] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
):
    """Export audit logs as CSV (10k row limit).

    Applies the same filters as the list endpoint except ``search`` and
    returns the newest matching rows as a CSV attachment named
    ``audit-logs-<UTC date>.csv``.

    Args:
        db: Async database session.
        current_user: Resolved through ``require_admin``; not otherwise used
            here, the dependency itself gates access.
        action, resource_type, user_id, date_from, date_to: Optional filters,
            forwarded unchanged to ``_build_audit_query``.

    Returns:
        A ``StreamingResponse`` with media type ``text/csv``.
    """
    query = _build_audit_query(action, resource_type, user_id, date_from, date_to)
    query = query.order_by(AuditLog.created_at.desc()).limit(10000)
    result = await db.execute(query)

    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(
        [
            "timestamp",
            "user_email",
            "action",
            "resource_type",
            "resource_id",
            "ip_address",
            "details",
        ]
    )
    # NOTE(review): cell values are written verbatim; if these exports are
    # opened in spreadsheet software, consider neutralising values that start
    # with "=", "+", "-" or "@" (CSV formula injection).
    for row in result.all():
        details = row.details
        if not details:
            details_text = ""
        elif isinstance(details, (dict, list)):
            # Emit JSON rather than a Python repr so the column is
            # machine-readable; ``default=str`` is a deliberate fallback so an
            # unexpected value type cannot abort the export.
            details_text = json.dumps(details, default=str)
        else:
            details_text = str(details)

        writer.writerow(
            [
                row.created_at.isoformat() if row.created_at else "",
                row.user_email or "",
                row.action,
                row.resource_type,
                str(row.resource_id) if row.resource_id else "",
                row.ip_address or "",
                details_text,
            ]
        )

    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    # ``getvalue()`` returns the whole buffer regardless of stream position,
    # so no rewind is needed before handing it to the response.
    return StreamingResponse(
        iter([buffer.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename=audit-logs-{today}.csv"},
    )