import csv
import io
from datetime import datetime, timezone
from typing import Annotated, Optional
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, String

from app.core.database import get_db
from app.models.user import User
from app.models.audit_log import AuditLog
from app.schemas.admin import AuditLogEntry, AuditLogListResponse
from app.api.deps import require_admin

router = APIRouter(prefix="/admin/audit-logs", tags=["admin-audit"])

# Maximum number of rows written by the CSV export endpoint.
_EXPORT_ROW_LIMIT = 10000


def _parse_iso_datetime(value: str, field: str) -> datetime:
    """Parse *value* with ``datetime.fromisoformat`` or reject the request.

    Args:
        value: Raw query-string value.
        field: Name of the query parameter, used in the error message.

    Raises:
        HTTPException: 422 when *value* is not accepted by
            ``datetime.fromisoformat``.
    """
    try:
        return datetime.fromisoformat(value)
    except ValueError as exc:
        # Without this, a malformed date escapes as an unhandled ValueError.
        raise HTTPException(
            status_code=422,
            detail=f"Invalid ISO 8601 datetime for '{field}': {value!r}",
        ) from exc


def _audit_filter_conditions(
    action: Optional[str] = None,
    resource_type: Optional[str] = None,
    user_id: Optional[UUID] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
    search: Optional[str] = None,
) -> list:
    """Return the WHERE conditions for the given audit-log filters.

    Falsy filters are skipped. Every condition references only ``AuditLog``
    columns, so the list can be applied to both the row query and the count
    query, keeping the two in agreement.

    Raises:
        HTTPException: 422 when ``date_from`` or ``date_to`` cannot be parsed.
    """
    conditions = []
    if action:
        conditions.append(AuditLog.action == action)
    if resource_type:
        conditions.append(AuditLog.resource_type == resource_type)
    if user_id:
        conditions.append(AuditLog.user_id == user_id)
    if date_from:
        conditions.append(
            AuditLog.created_at >= _parse_iso_datetime(date_from, "date_from")
        )
    if date_to:
        conditions.append(
            AuditLog.created_at <= _parse_iso_datetime(date_to, "date_to")
        )
    if search:
        # cast() needs a SQLAlchemy type (String), not the Python builtin str.
        # NOTE(review): '%' and '_' inside `search` still act as LIKE
        # wildcards; escape them if a literal match is intended.
        conditions.append(
            AuditLog.resource_id.cast(String).ilike(f"%{search}%")
        )
    return conditions


def _build_audit_query(
    action: Optional[str] = None,
    resource_type: Optional[str] = None,
    user_id: Optional[UUID] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
    search: Optional[str] = None,
):
    """Build the filtered audit-log SELECT (reused for list and export).

    Selects the audit-log columns plus the acting user's email (labelled
    ``user_email``) through an outer join on ``User``, so entries whose
    ``user_id`` has no matching user row are still returned. No ordering or
    limit is applied here.

    Raises:
        HTTPException: 422 when ``date_from`` or ``date_to`` cannot be parsed.
    """
    query = (
        select(
            AuditLog.id,
            AuditLog.user_id,
            AuditLog.action,
            AuditLog.resource_type,
            AuditLog.resource_id,
            AuditLog.details,
            AuditLog.ip_address,
            AuditLog.created_at,
            User.email.label("user_email"),
        )
        .outerjoin(User, AuditLog.user_id == User.id)
    )
    conditions = _audit_filter_conditions(
        action, resource_type, user_id, date_from, date_to, search
    )
    if conditions:
        query = query.where(*conditions)
    return query


@router.get("", response_model=AuditLogListResponse)
async def list_audit_logs(
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)],
    page: int = Query(1, ge=1),
    per_page: int = Query(50, ge=1, le=100),
    action: Optional[str] = None,
    resource_type: Optional[str] = None,
    user_id: Optional[UUID] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
    search: Optional[str] = None,
):
    """List audit logs with pagination and filters.

    ``total`` is counted with the same conditions as the returned page
    (including ``search``). Rows are ordered newest first by ``created_at``.

    ``current_user`` is not read in the body; it is declared so the
    ``require_admin`` dependency is resolved for this route.

    Raises:
        HTTPException: 422 when ``date_from`` or ``date_to`` cannot be parsed.
    """
    filters = (action, resource_type, user_id, date_from, date_to, search)

    # Count: same conditions as the page query. The conditions touch only
    # AuditLog columns, so the User join is not needed here.
    count_query = select(func.count()).select_from(AuditLog)
    conditions = _audit_filter_conditions(*filters)
    if conditions:
        count_query = count_query.where(*conditions)
    total = await db.scalar(count_query) or 0

    # Paginated results
    query = (
        _build_audit_query(*filters)
        .order_by(AuditLog.created_at.desc())
        .offset((page - 1) * per_page)
        .limit(per_page)
    )
    result = await db.execute(query)

    items = [
        AuditLogEntry(
            id=row.id,
            user_id=row.user_id,
            user_email=row.user_email,
            action=row.action,
            resource_type=row.resource_type,
            resource_id=row.resource_id,
            details=row.details,
            ip_address=row.ip_address,
            created_at=row.created_at,
        )
        for row in result.all()
    ]

    return AuditLogListResponse(
        items=items, total=total, page=page, per_page=per_page
    )


@router.get("/export")
async def export_audit_logs(
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)],
    action: Optional[str] = None,
    resource_type: Optional[str] = None,
    user_id: Optional[UUID] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
):
    """Export audit logs as CSV (10k row limit).

    Applies the same filters as the list endpoint except ``search``, orders
    rows newest first, and returns the CSV as an attachment named
    ``audit-logs-<UTC date>.csv``. Missing values are written as empty
    strings; ``details`` is written via ``str()``.

    ``current_user`` is not read in the body; it is declared so the
    ``require_admin`` dependency is resolved for this route.

    Raises:
        HTTPException: 422 when ``date_from`` or ``date_to`` cannot be parsed.
    """
    query = _build_audit_query(action, resource_type, user_id, date_from, date_to)
    query = query.order_by(AuditLog.created_at.desc()).limit(_EXPORT_ROW_LIMIT)
    result = await db.execute(query)
    rows = result.all()

    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(
        [
            "timestamp",
            "user_email",
            "action",
            "resource_type",
            "resource_id",
            "ip_address",
            "details",
        ]
    )
    writer.writerows(
        [
            row.created_at.isoformat() if row.created_at else "",
            row.user_email or "",
            row.action,
            row.resource_type,
            str(row.resource_id) if row.resource_id else "",
            row.ip_address or "",
            str(row.details) if row.details else "",
        ]
        for row in rows
    )

    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    # getvalue() returns the whole buffer regardless of the stream position,
    # so no seek(0) is needed. The CSV is built in memory and sent as a
    # single chunk.
    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="text/csv",
        headers={
            "Content-Disposition": f"attachment; filename=audit-logs-{today}.csv"
        },
    )