- POST /uploads: multipart upload with content-type/size validation, per-session limits, S3 storage
- GET /uploads/{id}/url: presigned download URL with account ownership check
- GET /uploads: list uploads for a session
- DELETE /uploads/{id}: delete with ownership enforcement (403 for non-owners)
- Returns 503 gracefully when STORAGE_ENDPOINT is not configured
- 15 integration tests covering auth, validation, 503 behavior, and ownership
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
209 lines
7.1 KiB
Python
209 lines
7.1 KiB
Python
"""File upload endpoints — S3-compatible object storage.
|
|
|
|
POST /uploads — Upload a file (multipart)
|
|
GET /uploads/{id}/url — Get presigned download URL
|
|
GET /uploads — List uploads for a session
|
|
DELETE /uploads/{id} — Delete an upload
|
|
"""
|
|
import logging
|
|
from typing import Annotated, Optional
|
|
from uuid import UUID
|
|
|
|
from fastapi import APIRouter, Depends, File, Form, HTTPException, Request, UploadFile, status
|
|
from sqlalchemy import select, func
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.api.deps import get_current_active_user, get_db
|
|
from app.core.config import settings
|
|
from app.core.rate_limit import limiter
|
|
from app.models.file_upload import FileUpload
|
|
from app.models.user import User
|
|
from app.schemas.upload import FileUploadResponse
|
|
from app.services import storage_service
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(prefix="/uploads", tags=["uploads"])
|
|
|
|
|
|
def _check_storage_configured() -> None:
|
|
"""Raise 503 if object storage is not configured."""
|
|
if not settings.STORAGE_ENDPOINT:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
|
|
detail="File storage is not configured",
|
|
)
|
|
|
|
|
|
@router.post("", response_model=FileUploadResponse, status_code=status.HTTP_201_CREATED)
|
|
@limiter.limit("10/minute")
|
|
async def upload_file(
|
|
request: Request,
|
|
file: UploadFile = File(...),
|
|
session_id: Optional[str] = Form(None),
|
|
current_user: Annotated[User, Depends(get_current_active_user)] = None,
|
|
db: Annotated[AsyncSession, Depends(get_db)] = None,
|
|
) -> FileUploadResponse:
|
|
"""Upload a file and store it in S3-compatible object storage."""
|
|
_check_storage_configured()
|
|
|
|
file_data = await file.read()
|
|
content_type = file.content_type or "application/octet-stream"
|
|
size_bytes = len(file_data)
|
|
|
|
# Validate content type and size
|
|
error = storage_service.validate_upload(content_type, size_bytes)
|
|
if error:
|
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=error)
|
|
|
|
# Parse and validate session_id if provided
|
|
parsed_session_id: Optional[UUID] = None
|
|
if session_id:
|
|
try:
|
|
parsed_session_id = UUID(session_id)
|
|
except ValueError:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid session_id"
|
|
)
|
|
|
|
# Check per-session limits
|
|
count_result = await db.execute(
|
|
select(func.count()).select_from(FileUpload).where(
|
|
FileUpload.session_id == parsed_session_id
|
|
)
|
|
)
|
|
session_count = count_result.scalar() or 0
|
|
if session_count >= storage_service.MAX_FILES_PER_SESSION:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail=f"Session has reached the maximum of {storage_service.MAX_FILES_PER_SESSION} files",
|
|
)
|
|
|
|
size_result = await db.execute(
|
|
select(func.sum(FileUpload.size_bytes)).where(
|
|
FileUpload.session_id == parsed_session_id
|
|
)
|
|
)
|
|
session_bytes = size_result.scalar() or 0
|
|
if session_bytes + size_bytes > storage_service.MAX_BYTES_PER_SESSION:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Session has exceeded the maximum total upload size (50 MB)",
|
|
)
|
|
|
|
# Upload to S3
|
|
storage_key = await storage_service.upload_file(
|
|
file_data=file_data,
|
|
filename=file.filename or "upload",
|
|
content_type=content_type,
|
|
account_id=str(current_user.account_id),
|
|
)
|
|
|
|
# Persist metadata
|
|
upload = FileUpload(
|
|
account_id=current_user.account_id,
|
|
uploaded_by=current_user.id,
|
|
session_id=parsed_session_id,
|
|
filename=file.filename or "upload",
|
|
content_type=content_type,
|
|
size_bytes=size_bytes,
|
|
storage_key=storage_key,
|
|
)
|
|
db.add(upload)
|
|
await db.commit()
|
|
await db.refresh(upload)
|
|
|
|
presigned_url = storage_service.get_presigned_url(upload.storage_key)
|
|
|
|
return FileUploadResponse(
|
|
id=upload.id,
|
|
filename=upload.filename,
|
|
content_type=upload.content_type,
|
|
size_bytes=upload.size_bytes,
|
|
url=presigned_url,
|
|
created_at=upload.created_at,
|
|
)
|
|
|
|
|
|
@router.get("/{upload_id}/url")
|
|
async def get_upload_url(
|
|
upload_id: UUID,
|
|
current_user: Annotated[User, Depends(get_current_active_user)],
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
) -> dict:
|
|
"""Get a presigned download URL for an uploaded file."""
|
|
_check_storage_configured()
|
|
|
|
result = await db.execute(select(FileUpload).where(FileUpload.id == upload_id))
|
|
upload = result.scalar_one_or_none()
|
|
|
|
if upload is None:
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Upload not found")
|
|
|
|
# Verify the upload belongs to the user's account
|
|
if upload.account_id != current_user.account_id and not current_user.is_super_admin:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
|
|
|
|
url = storage_service.get_presigned_url(upload.storage_key)
|
|
return {"url": url}
|
|
|
|
|
|
@router.get("", response_model=list[FileUploadResponse])
|
|
async def list_uploads(
|
|
session_id: UUID,
|
|
current_user: Annotated[User, Depends(get_current_active_user)],
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
) -> list[FileUploadResponse]:
|
|
"""List uploads for a session."""
|
|
_check_storage_configured()
|
|
|
|
result = await db.execute(
|
|
select(FileUpload).where(
|
|
FileUpload.session_id == session_id,
|
|
FileUpload.account_id == current_user.account_id,
|
|
)
|
|
)
|
|
uploads = result.scalars().all()
|
|
|
|
responses = []
|
|
for upload in uploads:
|
|
presigned_url = storage_service.get_presigned_url(upload.storage_key)
|
|
responses.append(
|
|
FileUploadResponse(
|
|
id=upload.id,
|
|
filename=upload.filename,
|
|
content_type=upload.content_type,
|
|
size_bytes=upload.size_bytes,
|
|
url=presigned_url,
|
|
created_at=upload.created_at,
|
|
)
|
|
)
|
|
return responses
|
|
|
|
|
|
@router.delete("/{upload_id}", status_code=status.HTTP_204_NO_CONTENT)
|
|
async def delete_upload(
|
|
upload_id: UUID,
|
|
current_user: Annotated[User, Depends(get_current_active_user)],
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
) -> None:
|
|
"""Delete an uploaded file."""
|
|
_check_storage_configured()
|
|
|
|
result = await db.execute(select(FileUpload).where(FileUpload.id == upload_id))
|
|
upload = result.scalar_one_or_none()
|
|
|
|
if upload is None:
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Upload not found")
|
|
|
|
# Verify ownership
|
|
if upload.uploaded_by != current_user.id and not current_user.is_super_admin:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
|
|
|
|
# Delete from S3
|
|
await storage_service.delete_file(upload.storage_key)
|
|
|
|
# Delete DB record
|
|
await db.delete(upload)
|
|
await db.commit()
|