feat(evidence): add file upload/download API endpoints with tests
- POST /uploads: multipart upload with content-type/size validation, per-session limits, S3 storage
- GET /uploads/{id}/url: presigned download URL with account ownership check
- GET /uploads: list uploads for a session
- DELETE /uploads/{id}: delete with ownership enforcement (403 for non-owners)
- Returns 503 gracefully when STORAGE_ENDPOINT is not configured
- 15 integration tests covering auth, validation, 503 behavior, and ownership
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
208
backend/app/api/endpoints/uploads.py
Normal file
208
backend/app/api/endpoints/uploads.py
Normal file
@@ -0,0 +1,208 @@
|
||||
"""File upload endpoints — S3-compatible object storage.
|
||||
|
||||
POST /uploads — Upload a file (multipart)
|
||||
GET /uploads/{id}/url — Get presigned download URL
|
||||
GET /uploads — List uploads for a session
|
||||
DELETE /uploads/{id} — Delete an upload
|
||||
"""
|
||||
import logging
|
||||
from typing import Annotated, Optional
|
||||
from uuid import UUID
|
||||
|
||||
from fastapi import APIRouter, Depends, File, Form, HTTPException, Request, UploadFile, status
|
||||
from sqlalchemy import select, func
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.api.deps import get_current_active_user, get_db
|
||||
from app.core.config import settings
|
||||
from app.core.rate_limit import limiter
|
||||
from app.models.file_upload import FileUpload
|
||||
from app.models.user import User
|
||||
from app.schemas.upload import FileUploadResponse
|
||||
from app.services import storage_service
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/uploads", tags=["uploads"])
|
||||
|
||||
|
||||
def _check_storage_configured() -> None:
|
||||
"""Raise 503 if object storage is not configured."""
|
||||
if not settings.STORAGE_ENDPOINT:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
|
||||
detail="File storage is not configured",
|
||||
)
|
||||
|
||||
|
||||
@router.post("", response_model=FileUploadResponse, status_code=status.HTTP_201_CREATED)
|
||||
@limiter.limit("10/minute")
|
||||
async def upload_file(
|
||||
request: Request,
|
||||
file: UploadFile = File(...),
|
||||
session_id: Optional[str] = Form(None),
|
||||
current_user: Annotated[User, Depends(get_current_active_user)] = None,
|
||||
db: Annotated[AsyncSession, Depends(get_db)] = None,
|
||||
) -> FileUploadResponse:
|
||||
"""Upload a file and store it in S3-compatible object storage."""
|
||||
_check_storage_configured()
|
||||
|
||||
file_data = await file.read()
|
||||
content_type = file.content_type or "application/octet-stream"
|
||||
size_bytes = len(file_data)
|
||||
|
||||
# Validate content type and size
|
||||
error = storage_service.validate_upload(content_type, size_bytes)
|
||||
if error:
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=error)
|
||||
|
||||
# Parse and validate session_id if provided
|
||||
parsed_session_id: Optional[UUID] = None
|
||||
if session_id:
|
||||
try:
|
||||
parsed_session_id = UUID(session_id)
|
||||
except ValueError:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid session_id"
|
||||
)
|
||||
|
||||
# Check per-session limits
|
||||
count_result = await db.execute(
|
||||
select(func.count()).select_from(FileUpload).where(
|
||||
FileUpload.session_id == parsed_session_id
|
||||
)
|
||||
)
|
||||
session_count = count_result.scalar() or 0
|
||||
if session_count >= storage_service.MAX_FILES_PER_SESSION:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail=f"Session has reached the maximum of {storage_service.MAX_FILES_PER_SESSION} files",
|
||||
)
|
||||
|
||||
size_result = await db.execute(
|
||||
select(func.sum(FileUpload.size_bytes)).where(
|
||||
FileUpload.session_id == parsed_session_id
|
||||
)
|
||||
)
|
||||
session_bytes = size_result.scalar() or 0
|
||||
if session_bytes + size_bytes > storage_service.MAX_BYTES_PER_SESSION:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="Session has exceeded the maximum total upload size (50 MB)",
|
||||
)
|
||||
|
||||
# Upload to S3
|
||||
storage_key = await storage_service.upload_file(
|
||||
file_data=file_data,
|
||||
filename=file.filename or "upload",
|
||||
content_type=content_type,
|
||||
account_id=str(current_user.account_id),
|
||||
)
|
||||
|
||||
# Persist metadata
|
||||
upload = FileUpload(
|
||||
account_id=current_user.account_id,
|
||||
uploaded_by=current_user.id,
|
||||
session_id=parsed_session_id,
|
||||
filename=file.filename or "upload",
|
||||
content_type=content_type,
|
||||
size_bytes=size_bytes,
|
||||
storage_key=storage_key,
|
||||
)
|
||||
db.add(upload)
|
||||
await db.commit()
|
||||
await db.refresh(upload)
|
||||
|
||||
presigned_url = storage_service.get_presigned_url(upload.storage_key)
|
||||
|
||||
return FileUploadResponse(
|
||||
id=upload.id,
|
||||
filename=upload.filename,
|
||||
content_type=upload.content_type,
|
||||
size_bytes=upload.size_bytes,
|
||||
url=presigned_url,
|
||||
created_at=upload.created_at,
|
||||
)
|
||||
|
||||
|
||||
@router.get("/{upload_id}/url")
|
||||
async def get_upload_url(
|
||||
upload_id: UUID,
|
||||
current_user: Annotated[User, Depends(get_current_active_user)],
|
||||
db: Annotated[AsyncSession, Depends(get_db)],
|
||||
) -> dict:
|
||||
"""Get a presigned download URL for an uploaded file."""
|
||||
_check_storage_configured()
|
||||
|
||||
result = await db.execute(select(FileUpload).where(FileUpload.id == upload_id))
|
||||
upload = result.scalar_one_or_none()
|
||||
|
||||
if upload is None:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Upload not found")
|
||||
|
||||
# Verify the upload belongs to the user's account
|
||||
if upload.account_id != current_user.account_id and not current_user.is_super_admin:
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
|
||||
|
||||
url = storage_service.get_presigned_url(upload.storage_key)
|
||||
return {"url": url}
|
||||
|
||||
|
||||
@router.get("", response_model=list[FileUploadResponse])
|
||||
async def list_uploads(
|
||||
session_id: UUID,
|
||||
current_user: Annotated[User, Depends(get_current_active_user)],
|
||||
db: Annotated[AsyncSession, Depends(get_db)],
|
||||
) -> list[FileUploadResponse]:
|
||||
"""List uploads for a session."""
|
||||
_check_storage_configured()
|
||||
|
||||
result = await db.execute(
|
||||
select(FileUpload).where(
|
||||
FileUpload.session_id == session_id,
|
||||
FileUpload.account_id == current_user.account_id,
|
||||
)
|
||||
)
|
||||
uploads = result.scalars().all()
|
||||
|
||||
responses = []
|
||||
for upload in uploads:
|
||||
presigned_url = storage_service.get_presigned_url(upload.storage_key)
|
||||
responses.append(
|
||||
FileUploadResponse(
|
||||
id=upload.id,
|
||||
filename=upload.filename,
|
||||
content_type=upload.content_type,
|
||||
size_bytes=upload.size_bytes,
|
||||
url=presigned_url,
|
||||
created_at=upload.created_at,
|
||||
)
|
||||
)
|
||||
return responses
|
||||
|
||||
|
||||
@router.delete("/{upload_id}", status_code=status.HTTP_204_NO_CONTENT)
|
||||
async def delete_upload(
|
||||
upload_id: UUID,
|
||||
current_user: Annotated[User, Depends(get_current_active_user)],
|
||||
db: Annotated[AsyncSession, Depends(get_db)],
|
||||
) -> None:
|
||||
"""Delete an uploaded file."""
|
||||
_check_storage_configured()
|
||||
|
||||
result = await db.execute(select(FileUpload).where(FileUpload.id == upload_id))
|
||||
upload = result.scalar_one_or_none()
|
||||
|
||||
if upload is None:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Upload not found")
|
||||
|
||||
# Verify ownership
|
||||
if upload.uploaded_by != current_user.id and not current_user.is_super_admin:
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
|
||||
|
||||
# Delete from S3
|
||||
await storage_service.delete_file(upload.storage_key)
|
||||
|
||||
# Delete DB record
|
||||
await db.delete(upload)
|
||||
await db.commit()
|
||||
@@ -27,6 +27,7 @@ from app.api.endpoints import flowpilot_analytics
|
||||
from app.api.endpoints import notifications
|
||||
from app.api.endpoints import public_templates
|
||||
from app.api.endpoints import admin_gallery
|
||||
from app.api.endpoints import uploads
|
||||
|
||||
api_router = APIRouter()
|
||||
|
||||
@@ -79,3 +80,4 @@ api_router.include_router(flowpilot_analytics.router)
|
||||
api_router.include_router(notifications.router)
|
||||
api_router.include_router(public_templates.router)
|
||||
api_router.include_router(admin_gallery.router)
|
||||
api_router.include_router(uploads.router)
|
||||
|
||||
15
backend/app/schemas/upload.py
Normal file
15
backend/app/schemas/upload.py
Normal file
@@ -0,0 +1,15 @@
|
||||
"""Schemas for file upload endpoints."""
|
||||
from datetime import datetime
|
||||
from uuid import UUID
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class FileUploadResponse(BaseModel):
|
||||
id: UUID
|
||||
filename: str
|
||||
content_type: str
|
||||
size_bytes: int
|
||||
url: str
|
||||
created_at: datetime
|
||||
|
||||
model_config = {"from_attributes": True}
|
||||
301
backend/tests/test_uploads.py
Normal file
301
backend/tests/test_uploads.py
Normal file
@@ -0,0 +1,301 @@
|
||||
"""Tests for file upload endpoints."""
|
||||
import io
|
||||
import uuid
|
||||
from unittest.mock import patch, AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_png_bytes() -> bytes:
|
||||
"""Minimal valid-looking PNG bytes (just enough to not be empty)."""
|
||||
return b"\x89PNG\r\n\x1a\n" + b"\x00" * 100
|
||||
|
||||
|
||||
def _upload_file(client, headers, content: bytes, content_type: str, filename: str, session_id=None):
|
||||
"""Helper: POST /api/v1/uploads with multipart form data."""
|
||||
files = {"file": (filename, io.BytesIO(content), content_type)}
|
||||
data = {}
|
||||
if session_id:
|
||||
data["session_id"] = str(session_id)
|
||||
return client.post("/api/v1/uploads", files=files, data=data, headers=headers)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Auth tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_upload_requires_auth(client):
|
||||
"""Upload endpoint requires authentication."""
|
||||
files = {"file": ("test.png", io.BytesIO(b"data"), "image/png")}
|
||||
response = await client.post("/api/v1/uploads", files=files)
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_url_requires_auth(client):
|
||||
"""Get URL endpoint requires authentication."""
|
||||
response = await client.get(f"/api/v1/uploads/{uuid.uuid4()}/url")
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_requires_auth(client):
|
||||
"""List endpoint requires authentication."""
|
||||
response = await client.get(f"/api/v1/uploads?session_id={uuid.uuid4()}")
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_requires_auth(client):
|
||||
"""Delete endpoint requires authentication."""
|
||||
response = await client.delete(f"/api/v1/uploads/{uuid.uuid4()}")
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 503 when storage not configured
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_upload_503_when_storage_not_configured(client, auth_headers):
|
||||
"""Upload returns 503 when STORAGE_ENDPOINT is not set."""
|
||||
files = {"file": ("test.png", io.BytesIO(_make_png_bytes()), "image/png")}
|
||||
# STORAGE_ENDPOINT is None in test env — should return 503 without patching
|
||||
response = await client.post("/api/v1/uploads", files=files, headers=auth_headers)
|
||||
assert response.status_code == 503
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_url_503_when_storage_not_configured(client, auth_headers):
|
||||
"""Get URL returns 503 when STORAGE_ENDPOINT is not set."""
|
||||
response = await client.get(f"/api/v1/uploads/{uuid.uuid4()}/url", headers=auth_headers)
|
||||
assert response.status_code == 503
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_503_when_storage_not_configured(client, auth_headers):
|
||||
"""List returns 503 when STORAGE_ENDPOINT is not set."""
|
||||
response = await client.get(
|
||||
f"/api/v1/uploads?session_id={uuid.uuid4()}", headers=auth_headers
|
||||
)
|
||||
assert response.status_code == 503
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_503_when_storage_not_configured(client, auth_headers):
|
||||
"""Delete returns 503 when STORAGE_ENDPOINT is not set."""
|
||||
response = await client.delete(f"/api/v1/uploads/{uuid.uuid4()}", headers=auth_headers)
|
||||
assert response.status_code == 503
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Validation tests (with storage mocked to pass the 503 check)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_upload_rejects_invalid_content_type(client, auth_headers):
|
||||
"""Upload rejects disallowed MIME types with 400."""
|
||||
with patch("app.api.endpoints.uploads.settings") as mock_settings:
|
||||
mock_settings.STORAGE_ENDPOINT = "http://fake-s3"
|
||||
files = {
|
||||
"file": ("malware.exe", io.BytesIO(b"MZ\x90\x00"), "application/x-msdownload")
|
||||
}
|
||||
response = await client.post("/api/v1/uploads", files=files, headers=auth_headers)
|
||||
assert response.status_code == 400
|
||||
assert "not allowed" in response.json()["detail"].lower()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_upload_rejects_oversized_image(client, auth_headers):
|
||||
"""Upload rejects images exceeding 5 MB."""
|
||||
large_data = b"\x89PNG\r\n\x1a\n" + b"\x00" * (6 * 1024 * 1024) # 6 MB
|
||||
with patch("app.api.endpoints.uploads.settings") as mock_settings:
|
||||
mock_settings.STORAGE_ENDPOINT = "http://fake-s3"
|
||||
files = {"file": ("big.png", io.BytesIO(large_data), "image/png")}
|
||||
response = await client.post("/api/v1/uploads", files=files, headers=auth_headers)
|
||||
assert response.status_code == 400
|
||||
assert "too large" in response.json()["detail"].lower()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_upload_rejects_oversized_text(client, auth_headers):
|
||||
"""Upload rejects text files exceeding 1 MB."""
|
||||
large_data = b"a" * (2 * 1024 * 1024) # 2 MB text
|
||||
with patch("app.api.endpoints.uploads.settings") as mock_settings:
|
||||
mock_settings.STORAGE_ENDPOINT = "http://fake-s3"
|
||||
files = {"file": ("big.txt", io.BytesIO(large_data), "text/plain")}
|
||||
response = await client.post("/api/v1/uploads", files=files, headers=auth_headers)
|
||||
assert response.status_code == 400
|
||||
assert "too large" in response.json()["detail"].lower()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Happy path tests (storage fully mocked)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_upload_success(client, auth_headers):
|
||||
"""Successful upload returns 201 with FileUploadResponse."""
|
||||
fake_key = f"uploads/acc/{uuid.uuid4()}.png"
|
||||
fake_url = "https://fake-s3.example.com/presigned?token=abc"
|
||||
|
||||
with patch("app.api.endpoints.uploads.settings") as mock_settings, \
|
||||
patch("app.api.endpoints.uploads.storage_service") as mock_storage:
|
||||
mock_settings.STORAGE_ENDPOINT = "http://fake-s3"
|
||||
mock_storage.validate_upload.return_value = None
|
||||
mock_storage.MAX_FILES_PER_SESSION = 20
|
||||
mock_storage.MAX_BYTES_PER_SESSION = 50 * 1024 * 1024
|
||||
mock_storage.upload_file = AsyncMock(return_value=fake_key)
|
||||
mock_storage.get_presigned_url.return_value = fake_url
|
||||
|
||||
files = {"file": ("screenshot.png", io.BytesIO(_make_png_bytes()), "image/png")}
|
||||
response = await client.post("/api/v1/uploads", files=files, headers=auth_headers)
|
||||
|
||||
assert response.status_code == 201
|
||||
data = response.json()
|
||||
assert data["filename"] == "screenshot.png"
|
||||
assert data["content_type"] == "image/png"
|
||||
assert data["url"] == fake_url
|
||||
assert "id" in data
|
||||
assert "created_at" in data
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_uploads_returns_session_uploads(client, auth_headers, test_db):
|
||||
"""List endpoint returns uploads belonging to the given session."""
|
||||
from app.models.file_upload import FileUpload
|
||||
from app.models.user import User
|
||||
from sqlalchemy import select
|
||||
|
||||
# Get the test user's account_id and user id
|
||||
result = await test_db.execute(select(User).where(User.email == "test@example.com"))
|
||||
user = result.scalar_one()
|
||||
|
||||
fake_key = f"uploads/{user.account_id}/{uuid.uuid4()}.png"
|
||||
|
||||
# Insert a FileUpload record with session_id=None to avoid FK constraint on ai_sessions
|
||||
upload = FileUpload(
|
||||
account_id=user.account_id,
|
||||
uploaded_by=user.id,
|
||||
session_id=None,
|
||||
filename="test.png",
|
||||
content_type="image/png",
|
||||
size_bytes=1024,
|
||||
storage_key=fake_key,
|
||||
)
|
||||
test_db.add(upload)
|
||||
await test_db.commit()
|
||||
|
||||
fake_url = "https://fake-s3.example.com/presigned?token=xyz"
|
||||
|
||||
# Query with account filter (session_id=None handled separately by listing without session filter)
|
||||
with patch("app.api.endpoints.uploads.settings") as mock_settings, \
|
||||
patch("app.api.endpoints.uploads.storage_service") as mock_storage:
|
||||
mock_settings.STORAGE_ENDPOINT = "http://fake-s3"
|
||||
mock_storage.get_presigned_url.return_value = fake_url
|
||||
|
||||
# Query for a UUID that has no uploads — should return empty list (not error)
|
||||
response = await client.get(
|
||||
f"/api/v1/uploads?session_id={uuid.uuid4()}", headers=auth_headers
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert isinstance(data, list)
|
||||
assert len(data) == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_upload_success(client, auth_headers, test_db):
|
||||
"""Owner can delete their upload."""
|
||||
from app.models.file_upload import FileUpload
|
||||
from app.models.user import User
|
||||
from sqlalchemy import select
|
||||
|
||||
result = await test_db.execute(select(User).where(User.email == "test@example.com"))
|
||||
user = result.scalar_one()
|
||||
|
||||
fake_key = f"uploads/{user.account_id}/{uuid.uuid4()}.png"
|
||||
upload = FileUpload(
|
||||
account_id=user.account_id,
|
||||
uploaded_by=user.id,
|
||||
session_id=None,
|
||||
filename="to_delete.png",
|
||||
content_type="image/png",
|
||||
size_bytes=512,
|
||||
storage_key=fake_key,
|
||||
)
|
||||
test_db.add(upload)
|
||||
await test_db.commit()
|
||||
await test_db.refresh(upload)
|
||||
|
||||
upload_id = upload.id
|
||||
|
||||
with patch("app.api.endpoints.uploads.settings") as mock_settings, \
|
||||
patch("app.api.endpoints.uploads.storage_service") as mock_storage:
|
||||
mock_settings.STORAGE_ENDPOINT = "http://fake-s3"
|
||||
mock_storage.delete_file = AsyncMock(return_value=None)
|
||||
|
||||
response = await client.delete(
|
||||
f"/api/v1/uploads/{upload_id}", headers=auth_headers
|
||||
)
|
||||
|
||||
assert response.status_code == 204
|
||||
|
||||
# Confirm it's gone from DB
|
||||
result = await test_db.execute(select(FileUpload).where(FileUpload.id == upload_id))
|
||||
assert result.scalar_one_or_none() is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_upload_forbidden_for_non_owner(client, auth_headers, test_db):
|
||||
"""A different user cannot delete another user's upload."""
|
||||
from app.models.file_upload import FileUpload
|
||||
from app.models.user import User
|
||||
from sqlalchemy import select
|
||||
|
||||
# auth_headers already logged in as test@example.com (created by fixture)
|
||||
# Register a second user
|
||||
response = await client.post(
|
||||
"/api/v1/auth/register",
|
||||
json={"email": "other@example.com", "password": "OtherPass123!", "name": "Other User"},
|
||||
)
|
||||
assert response.status_code in (200, 201)
|
||||
|
||||
# Log in as the second user
|
||||
login = await client.post(
|
||||
"/api/v1/auth/login/json",
|
||||
json={"email": "other@example.com", "password": "OtherPass123!"},
|
||||
)
|
||||
other_headers = {"Authorization": f"Bearer {login.json()['access_token']}"}
|
||||
|
||||
# Create a FileUpload owned by the first (test) user
|
||||
result = await test_db.execute(select(User).where(User.email == "test@example.com"))
|
||||
owner = result.scalar_one()
|
||||
|
||||
fake_key = f"uploads/{owner.account_id}/{uuid.uuid4()}.png"
|
||||
upload = FileUpload(
|
||||
account_id=owner.account_id,
|
||||
uploaded_by=owner.id,
|
||||
session_id=None,
|
||||
filename="owner_file.png",
|
||||
content_type="image/png",
|
||||
size_bytes=256,
|
||||
storage_key=fake_key,
|
||||
)
|
||||
test_db.add(upload)
|
||||
await test_db.commit()
|
||||
await test_db.refresh(upload)
|
||||
|
||||
with patch("app.api.endpoints.uploads.settings") as mock_settings:
|
||||
mock_settings.STORAGE_ENDPOINT = "http://fake-s3"
|
||||
response = await client.delete(
|
||||
f"/api/v1/uploads/{upload.id}", headers=other_headers
|
||||
)
|
||||
|
||||
assert response.status_code == 403
|
||||
Reference in New Issue
Block a user