Files
resolutionflow/backend/app/api/endpoints/uploads.py
chihlasm 241ea1e458 feat(evidence): add file upload/download API endpoints with tests
- POST /uploads: multipart upload with content-type/size validation, per-session limits, S3 storage
- GET /uploads/{id}/url: presigned download URL with account ownership check
- GET /uploads: list uploads for a session
- DELETE /uploads/{id}: delete with ownership enforcement (403 for non-owners)
- Returns 503 gracefully when STORAGE_ENDPOINT is not configured
- 15 integration tests covering auth, validation, 503 behavior, and ownership

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 03:22:52 +00:00

209 lines
7.1 KiB
Python

"""File upload endpoints — S3-compatible object storage.
POST /uploads — Upload a file (multipart)
GET /uploads/{id}/url — Get presigned download URL
GET /uploads — List uploads for a session
DELETE /uploads/{id} — Delete an upload
"""
import logging
from typing import Annotated, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, File, Form, HTTPException, Request, UploadFile, status
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_current_active_user, get_db
from app.core.config import settings
from app.core.rate_limit import limiter
from app.models.file_upload import FileUpload
from app.models.user import User
from app.schemas.upload import FileUploadResponse
from app.services import storage_service
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/uploads", tags=["uploads"])
def _check_storage_configured() -> None:
"""Raise 503 if object storage is not configured."""
if not settings.STORAGE_ENDPOINT:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="File storage is not configured",
)
@router.post("", response_model=FileUploadResponse, status_code=status.HTTP_201_CREATED)
@limiter.limit("10/minute")
async def upload_file(
request: Request,
file: UploadFile = File(...),
session_id: Optional[str] = Form(None),
current_user: Annotated[User, Depends(get_current_active_user)] = None,
db: Annotated[AsyncSession, Depends(get_db)] = None,
) -> FileUploadResponse:
"""Upload a file and store it in S3-compatible object storage."""
_check_storage_configured()
file_data = await file.read()
content_type = file.content_type or "application/octet-stream"
size_bytes = len(file_data)
# Validate content type and size
error = storage_service.validate_upload(content_type, size_bytes)
if error:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=error)
# Parse and validate session_id if provided
parsed_session_id: Optional[UUID] = None
if session_id:
try:
parsed_session_id = UUID(session_id)
except ValueError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid session_id"
)
# Check per-session limits
count_result = await db.execute(
select(func.count()).select_from(FileUpload).where(
FileUpload.session_id == parsed_session_id
)
)
session_count = count_result.scalar() or 0
if session_count >= storage_service.MAX_FILES_PER_SESSION:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Session has reached the maximum of {storage_service.MAX_FILES_PER_SESSION} files",
)
size_result = await db.execute(
select(func.sum(FileUpload.size_bytes)).where(
FileUpload.session_id == parsed_session_id
)
)
session_bytes = size_result.scalar() or 0
if session_bytes + size_bytes > storage_service.MAX_BYTES_PER_SESSION:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Session has exceeded the maximum total upload size (50 MB)",
)
# Upload to S3
storage_key = await storage_service.upload_file(
file_data=file_data,
filename=file.filename or "upload",
content_type=content_type,
account_id=str(current_user.account_id),
)
# Persist metadata
upload = FileUpload(
account_id=current_user.account_id,
uploaded_by=current_user.id,
session_id=parsed_session_id,
filename=file.filename or "upload",
content_type=content_type,
size_bytes=size_bytes,
storage_key=storage_key,
)
db.add(upload)
await db.commit()
await db.refresh(upload)
presigned_url = storage_service.get_presigned_url(upload.storage_key)
return FileUploadResponse(
id=upload.id,
filename=upload.filename,
content_type=upload.content_type,
size_bytes=upload.size_bytes,
url=presigned_url,
created_at=upload.created_at,
)
@router.get("/{upload_id}/url")
async def get_upload_url(
upload_id: UUID,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
) -> dict:
"""Get a presigned download URL for an uploaded file."""
_check_storage_configured()
result = await db.execute(select(FileUpload).where(FileUpload.id == upload_id))
upload = result.scalar_one_or_none()
if upload is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Upload not found")
# Verify the upload belongs to the user's account
if upload.account_id != current_user.account_id and not current_user.is_super_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
url = storage_service.get_presigned_url(upload.storage_key)
return {"url": url}
@router.get("", response_model=list[FileUploadResponse])
async def list_uploads(
session_id: UUID,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
) -> list[FileUploadResponse]:
"""List uploads for a session."""
_check_storage_configured()
result = await db.execute(
select(FileUpload).where(
FileUpload.session_id == session_id,
FileUpload.account_id == current_user.account_id,
)
)
uploads = result.scalars().all()
responses = []
for upload in uploads:
presigned_url = storage_service.get_presigned_url(upload.storage_key)
responses.append(
FileUploadResponse(
id=upload.id,
filename=upload.filename,
content_type=upload.content_type,
size_bytes=upload.size_bytes,
url=presigned_url,
created_at=upload.created_at,
)
)
return responses
@router.delete("/{upload_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_upload(
upload_id: UUID,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
) -> None:
"""Delete an uploaded file."""
_check_storage_configured()
result = await db.execute(select(FileUpload).where(FileUpload.id == upload_id))
upload = result.scalar_one_or_none()
if upload is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Upload not found")
# Verify ownership
if upload.uploaded_by != current_user.id and not current_user.is_super_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
# Delete from S3
await storage_service.delete_file(upload.storage_key)
# Delete DB record
await db.delete(upload)
await db.commit()