# Listing metadata: 533 lines, 20 KiB, Python
"""Tests for file upload endpoints."""
|
|
import io
|
|
import uuid
|
|
from unittest.mock import patch, AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
|
|
|
|
def _make_png_bytes() -> bytes:
    """Return the 8-byte PNG signature followed by 100 zero bytes.

    The result is not a decodable image; it is only non-empty, PNG-looking filler.
    """
    signature = b"\x89PNG\r\n\x1a\n"
    filler = bytes(100)
    return signature + filler
|
|
|
|
|
|
def _upload_file(client, headers, content: bytes, content_type: str, filename: str, session_id=None):
    """POST ``content`` to /api/v1/uploads as a multipart form and return the call's result.

    Args:
        client: Object exposing ``post(url, files=..., data=..., headers=...)``.
        headers: Headers forwarded unchanged to ``client.post``.
        content: Raw bytes of the file part.
        content_type: MIME type declared for the file part.
        filename: File name declared for the file part.
        session_id: Optional value; when truthy it is sent as the ``session_id``
            form field, converted with ``str``.

    Returns:
        Whatever ``client.post`` returns (not awaited here).
    """
    form_fields = {"session_id": str(session_id)} if session_id else {}
    multipart = {"file": (filename, io.BytesIO(content), content_type)}
    return client.post("/api/v1/uploads", files=multipart, data=form_fields, headers=headers)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Auth tests
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_upload_requires_auth(client):
    """A multipart POST to the upload endpoint without credentials expects 401."""
    file_part = ("test.png", io.BytesIO(b"data"), "image/png")
    resp = await client.post("/api/v1/uploads", files={"file": file_part})
    assert resp.status_code == 401
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_url_requires_auth(client):
    """Requesting an upload's URL without credentials expects 401."""
    upload_id = uuid.uuid4()  # random id; no such upload needs to exist
    resp = await client.get(f"/api/v1/uploads/{upload_id}/url")
    assert resp.status_code == 401
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_requires_auth(client):
    """Listing a session's uploads without credentials expects 401."""
    session_id = uuid.uuid4()  # random id; the session does not need to exist
    resp = await client.get(f"/api/v1/uploads?session_id={session_id}")
    assert resp.status_code == 401
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_delete_requires_auth(client):
    """Deleting an upload without credentials expects 401."""
    upload_id = uuid.uuid4()  # random id; no such upload needs to exist
    resp = await client.delete(f"/api/v1/uploads/{upload_id}")
    assert resp.status_code == 401
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# 503 when storage not configured
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_upload_503_when_storage_not_configured(client, auth_headers):
    """An authenticated upload expects 503 while STORAGE_ENDPOINT is unset."""
    png_part = ("test.png", io.BytesIO(_make_png_bytes()), "image/png")
    # Nothing is patched: STORAGE_ENDPOINT is None in the test env, which should give 503.
    resp = await client.post(
        "/api/v1/uploads",
        files={"file": png_part},
        headers=auth_headers,
    )
    assert resp.status_code == 503
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_url_503_when_storage_not_configured(client, auth_headers):
    """An authenticated URL lookup expects 503 while STORAGE_ENDPOINT is unset."""
    url_path = f"/api/v1/uploads/{uuid.uuid4()}/url"
    resp = await client.get(url_path, headers=auth_headers)
    assert resp.status_code == 503
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_503_when_storage_not_configured(client, auth_headers):
    """An authenticated listing expects 503 while STORAGE_ENDPOINT is unset."""
    listing_path = f"/api/v1/uploads?session_id={uuid.uuid4()}"
    resp = await client.get(listing_path, headers=auth_headers)
    assert resp.status_code == 503
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_delete_503_when_storage_not_configured(client, auth_headers):
    """An authenticated delete expects 503 while STORAGE_ENDPOINT is unset."""
    delete_path = f"/api/v1/uploads/{uuid.uuid4()}"
    resp = await client.delete(delete_path, headers=auth_headers)
    assert resp.status_code == 503
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Validation tests (with storage mocked to pass the 503 check)
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_upload_rejects_invalid_content_type(client, auth_headers):
    """A disallowed MIME type expects a 400 whose detail mentions "not allowed"."""
    exe_part = ("malware.exe", io.BytesIO(b"MZ\x90\x00"), "application/x-msdownload")

    with patch("app.api.endpoints.uploads.settings") as settings_stub:
        # A non-empty endpoint lets the request get past the storage-configured (503) check.
        settings_stub.STORAGE_ENDPOINT = "http://fake-s3"
        resp = await client.post(
            "/api/v1/uploads",
            files={"file": exe_part},
            headers=auth_headers,
        )

    assert resp.status_code == 400
    detail = resp.json()["detail"]
    assert "not allowed" in detail.lower()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_upload_rejects_oversized_image(client, auth_headers):
    """A ~6 MB image expects a 400 whose detail mentions "too large".

    The original description states the image limit is 5 MB; the limit itself
    lives in the application code, not in this test.
    """
    six_mib = 6 * 1024 * 1024
    oversized = b"\x89PNG\r\n\x1a\n" + bytes(six_mib)  # PNG signature + zero filler
    image_part = ("big.png", io.BytesIO(oversized), "image/png")

    with patch("app.api.endpoints.uploads.settings") as settings_stub:
        settings_stub.STORAGE_ENDPOINT = "http://fake-s3"
        resp = await client.post(
            "/api/v1/uploads",
            files={"file": image_part},
            headers=auth_headers,
        )

    assert resp.status_code == 400
    detail = resp.json()["detail"]
    assert "too large" in detail.lower()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_upload_rejects_oversized_text(client, auth_headers):
    """A 2 MB text/plain file expects a 400 whose detail mentions "too large".

    The original description states the text limit is 1 MB; the limit itself
    lives in the application code, not in this test.
    """
    two_mib = 2 * 1024 * 1024
    text_part = ("big.txt", io.BytesIO(b"a" * two_mib), "text/plain")

    with patch("app.api.endpoints.uploads.settings") as settings_stub:
        settings_stub.STORAGE_ENDPOINT = "http://fake-s3"
        resp = await client.post(
            "/api/v1/uploads",
            files={"file": text_part},
            headers=auth_headers,
        )

    assert resp.status_code == 400
    detail = resp.json()["detail"]
    assert "too large" in detail.lower()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_upload_accepts_pdf(client, auth_headers):
    """Regression check: an application/pdf upload expects 201 (it was once rejected with 400).

    Both ``settings`` and ``storage_service`` in the uploads endpoint module are
    replaced with mocks, so no real storage is contacted.
    """
    storage_key = f"uploads/acc/{uuid.uuid4()}.pdf"
    presigned = "https://fake-s3.example.com/presigned?token=pdf"
    pdf_part = ("report.pdf", io.BytesIO(b"%PDF-1.4 test"), "application/pdf")

    with patch("app.api.endpoints.uploads.settings") as settings_stub:
        with patch("app.api.endpoints.uploads.storage_service") as storage_stub:
            settings_stub.STORAGE_ENDPOINT = "http://fake-s3"
            storage_stub.validate_upload.return_value = None
            storage_stub.MAX_FILES_PER_SESSION = 20
            storage_stub.MAX_BYTES_PER_SESSION = 50 * 1024 * 1024
            # upload_file must be awaitable, hence AsyncMock rather than the default MagicMock.
            storage_stub.upload_file = AsyncMock(return_value=storage_key)
            storage_stub.get_presigned_url.return_value = presigned

            resp = await client.post(
                "/api/v1/uploads",
                files={"file": pdf_part},
                headers=auth_headers,
            )

    assert resp.status_code == 201
    body = resp.json()
    assert body["filename"] == "report.pdf"
    assert body["content_type"] == "application/pdf"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_upload_accepts_docx(client, auth_headers):
    """A .docx upload expects 201 with the filename and MIME type echoed back.

    Both ``settings`` and ``storage_service`` in the uploads endpoint module are
    replaced with mocks, so no real storage is contacted.
    """
    docx_mime = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    storage_key = f"uploads/acc/{uuid.uuid4()}.docx"
    presigned = "https://fake-s3.example.com/presigned?token=docx"
    docx_part = ("runbook.docx", io.BytesIO(b"PK\x03\x04 fake docx"), docx_mime)

    with patch("app.api.endpoints.uploads.settings") as settings_stub:
        with patch("app.api.endpoints.uploads.storage_service") as storage_stub:
            settings_stub.STORAGE_ENDPOINT = "http://fake-s3"
            storage_stub.configure_mock(
                MAX_FILES_PER_SESSION=20,
                MAX_BYTES_PER_SESSION=50 * 1024 * 1024,
                # upload_file must be awaitable, hence AsyncMock.
                upload_file=AsyncMock(return_value=storage_key),
                **{
                    "validate_upload.return_value": None,
                    "get_presigned_url.return_value": presigned,
                },
            )

            resp = await client.post(
                "/api/v1/uploads",
                files={"file": docx_part},
                headers=auth_headers,
            )

    assert resp.status_code == 201
    body = resp.json()
    assert body["filename"] == "runbook.docx"
    assert body["content_type"] == docx_mime
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_upload_rejects_oversized_pdf(client, auth_headers):
    """An ~11 MB PDF expects a 400 whose detail mentions "too large".

    The original description states the PDF limit is 10 MB; the limit itself
    lives in the application code, not in this test.
    """
    eleven_mib = 11 * 1024 * 1024
    oversized = b"%PDF-1.4 " + bytes(eleven_mib)  # PDF header + zero filler
    pdf_part = ("huge.pdf", io.BytesIO(oversized), "application/pdf")

    with patch("app.api.endpoints.uploads.settings") as settings_stub:
        settings_stub.STORAGE_ENDPOINT = "http://fake-s3"
        resp = await client.post(
            "/api/v1/uploads",
            files={"file": pdf_part},
            headers=auth_headers,
        )

    assert resp.status_code == 400
    detail = resp.json()["detail"]
    assert "too large" in detail.lower()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Happy path tests (storage fully mocked)
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_upload_success(client, auth_headers):
    """A PNG upload with storage fully mocked expects 201 and a populated response body.

    The body must echo the filename and content type, carry the presigned URL
    returned by the mocked storage service, and include ``id`` and ``created_at``.
    """
    storage_key = f"uploads/acc/{uuid.uuid4()}.png"
    presigned = "https://fake-s3.example.com/presigned?token=abc"
    png_part = ("screenshot.png", io.BytesIO(_make_png_bytes()), "image/png")

    with patch("app.api.endpoints.uploads.settings") as settings_stub:
        with patch("app.api.endpoints.uploads.storage_service") as storage_stub:
            settings_stub.STORAGE_ENDPOINT = "http://fake-s3"
            storage_stub.validate_upload.return_value = None
            storage_stub.MAX_FILES_PER_SESSION = 20
            storage_stub.MAX_BYTES_PER_SESSION = 50 * 1024 * 1024
            # upload_file must be awaitable, hence AsyncMock.
            storage_stub.upload_file = AsyncMock(return_value=storage_key)
            storage_stub.get_presigned_url.return_value = presigned

            resp = await client.post(
                "/api/v1/uploads",
                files={"file": png_part},
                headers=auth_headers,
            )

    assert resp.status_code == 201
    body = resp.json()
    expected_values = {
        "filename": "screenshot.png",
        "content_type": "image/png",
        "url": presigned,
    }
    for field, value in expected_values.items():
        assert body[field] == value
    for field in ("id", "created_at"):
        assert field in body
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_list_uploads_returns_session_uploads(client, auth_headers, test_db):
    """Listing for a session that has no uploads expects 200 and an empty list.

    Despite its name, this test does not cover the positive case. It stores one
    upload with ``session_id=None`` for the test user's account and then lists a
    freshly generated session id, so the empty result also shows that rows
    outside the requested session are not included. A row attached to the
    requested session is not created because a non-null ``session_id`` would hit
    the foreign-key constraint on ai_sessions.
    """
    from app.models.file_upload import FileUpload
    from app.models.user import User
    from sqlalchemy import select

    # Look up the test user to get the account_id and user id for the row below
    # (presumably created by the auth_headers fixture).
    result = await test_db.execute(select(User).where(User.email == "test@example.com"))
    user = result.scalar_one()

    fake_key = f"uploads/{user.account_id}/{uuid.uuid4()}.png"

    # session_id stays None to avoid the FK constraint on ai_sessions.
    upload = FileUpload(
        account_id=user.account_id,
        uploaded_by=user.id,
        session_id=None,
        filename="test.png",
        content_type="image/png",
        size_bytes=1024,
        storage_key=fake_key,
    )
    test_db.add(upload)
    await test_db.commit()

    fake_url = "https://fake-s3.example.com/presigned?token=xyz"

    with patch("app.api.endpoints.uploads.settings") as mock_settings, \
            patch("app.api.endpoints.uploads.storage_service") as mock_storage:
        mock_settings.STORAGE_ENDPOINT = "http://fake-s3"
        mock_storage.get_presigned_url.return_value = fake_url

        # A random session id has no uploads: expect an empty list, not an error.
        response = await client.get(
            f"/api/v1/uploads?session_id={uuid.uuid4()}", headers=auth_headers
        )

    assert response.status_code == 200
    # Checks both "is a list" and "is empty" in one comparison.
    assert response.json() == []
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_delete_upload_success(client, auth_headers, test_db):
    """The uploader can delete their own upload: expects 204 and the row gone from the DB."""
    from app.models.file_upload import FileUpload
    from app.models.user import User
    from sqlalchemy import select

    owner_query = select(User).where(User.email == "test@example.com")
    owner = (await test_db.execute(owner_query)).scalar_one()

    storage_key = f"uploads/{owner.account_id}/{uuid.uuid4()}.png"
    record = FileUpload(
        account_id=owner.account_id,
        uploaded_by=owner.id,
        session_id=None,
        filename="to_delete.png",
        content_type="image/png",
        size_bytes=512,
        storage_key=storage_key,
    )
    test_db.add(record)
    await test_db.commit()
    await test_db.refresh(record)

    # Keep the id in a local so the follow-up query does not touch the deleted object.
    record_id = record.id

    with patch("app.api.endpoints.uploads.settings") as settings_stub:
        with patch("app.api.endpoints.uploads.storage_service") as storage_stub:
            settings_stub.STORAGE_ENDPOINT = "http://fake-s3"
            storage_stub.delete_file = AsyncMock(return_value=None)

            resp = await client.delete(f"/api/v1/uploads/{record_id}", headers=auth_headers)

    assert resp.status_code == 204

    # The row must no longer be present.
    lookup = await test_db.execute(select(FileUpload).where(FileUpload.id == record_id))
    assert lookup.scalar_one_or_none() is None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_delete_upload_forbidden_for_non_owner(client, auth_headers, test_db):
    """A different user cannot delete another user's upload.

    The non-owner's delete request is expected to get 404 (not 403), and the
    upload row must still be present afterwards.
    """
    from app.models.file_upload import FileUpload
    from app.models.user import User
    from sqlalchemy import select

    # auth_headers already logged in as test@example.com (created by fixture).
    # Register a second user to act as the non-owner.
    response = await client.post(
        "/api/v1/auth/register",
        json={"email": "other@example.com", "password": "OtherPass123!", "name": "Other User"},
    )
    assert response.status_code in (200, 201)

    # Log in as the second user.
    login = await client.post(
        "/api/v1/auth/login/json",
        json={"email": "other@example.com", "password": "OtherPass123!"},
    )
    # Fail with a readable message instead of a KeyError if the login did not succeed.
    token = login.json().get("access_token")
    assert token, f"login failed: {login.status_code} {login.text}"
    other_headers = {"Authorization": f"Bearer {token}"}

    # Create a FileUpload owned by the first (test) user.
    result = await test_db.execute(select(User).where(User.email == "test@example.com"))
    owner = result.scalar_one()

    fake_key = f"uploads/{owner.account_id}/{uuid.uuid4()}.png"
    upload = FileUpload(
        account_id=owner.account_id,
        uploaded_by=owner.id,
        session_id=None,
        filename="owner_file.png",
        content_type="image/png",
        size_bytes=256,
        storage_key=fake_key,
    )
    test_db.add(upload)
    await test_db.commit()
    await test_db.refresh(upload)

    # Keep the id in a local so the follow-up query does not depend on the ORM object.
    upload_id = upload.id

    with patch("app.api.endpoints.uploads.settings") as mock_settings:
        mock_settings.STORAGE_ENDPOINT = "http://fake-s3"
        response = await client.delete(
            f"/api/v1/uploads/{upload_id}", headers=other_headers
        )

    assert response.status_code == 404

    # A 404 alone does not prove nothing was removed: confirm the row survived.
    result = await test_db.execute(select(FileUpload).where(FileUpload.id == upload_id))
    assert result.scalar_one_or_none() is not None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# fetch_upload_documents tests
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_fetch_upload_documents_returns_pdf_content(client, auth_headers, test_db):
    """A stored PDF upload comes back as one document whose ``text`` is its extracted content."""
    from app.models.file_upload import FileUpload
    from app.models.user import User
    from app.services.storage_service import fetch_upload_documents
    from sqlalchemy import select

    user_query = select(User).where(User.email == "test@example.com")
    account_user = (await test_db.execute(user_query)).scalar_one()

    pdf_upload = FileUpload(
        account_id=account_user.account_id,
        uploaded_by=account_user.id,
        session_id=None,
        filename="report.pdf",
        content_type="application/pdf",
        size_bytes=5000,
        storage_key=f"uploads/{account_user.account_id}/{uuid.uuid4()}.pdf",
        extracted_content="This is the extracted PDF text content.",
    )
    test_db.add(pdf_upload)
    await test_db.commit()
    await test_db.refresh(pdf_upload)

    documents = await fetch_upload_documents([pdf_upload.id], account_user.account_id, test_db)

    assert len(documents) == 1
    first = documents[0]
    assert first["filename"] == "report.pdf"
    assert first["content_type"] == "application/pdf"
    assert first["text"] == "This is the extracted PDF text content."
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_fetch_upload_documents_excludes_images(client, auth_headers, test_db):
    """An image/png upload must not appear in the fetch_upload_documents result."""
    from app.models.file_upload import FileUpload
    from app.models.user import User
    from app.services.storage_service import fetch_upload_documents
    from sqlalchemy import select

    user_query = select(User).where(User.email == "test@example.com")
    account_user = (await test_db.execute(user_query)).scalar_one()

    image_upload = FileUpload(
        account_id=account_user.account_id,
        uploaded_by=account_user.id,
        session_id=None,
        filename="screenshot.png",
        content_type="image/png",
        size_bytes=1024,
        storage_key=f"uploads/{account_user.account_id}/{uuid.uuid4()}.png",
    )
    test_db.add(image_upload)
    await test_db.commit()
    await test_db.refresh(image_upload)

    documents = await fetch_upload_documents(
        [image_upload.id], account_user.account_id, test_db
    )
    assert len(documents) == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_fetch_upload_documents_pdf_no_text(client, auth_headers, test_db):
    """A PDF with ``extracted_content=None`` still yields one document.

    Its ``text`` is expected to contain the phrase "no extractable text".
    """
    from app.models.file_upload import FileUpload
    from app.models.user import User
    from app.services.storage_service import fetch_upload_documents
    from sqlalchemy import select

    user_query = select(User).where(User.email == "test@example.com")
    account_user = (await test_db.execute(user_query)).scalar_one()

    scanned_upload = FileUpload(
        account_id=account_user.account_id,
        uploaded_by=account_user.id,
        session_id=None,
        filename="scanned.pdf",
        content_type="application/pdf",
        size_bytes=2000,
        storage_key=f"uploads/{account_user.account_id}/{uuid.uuid4()}.pdf",
        extracted_content=None,
    )
    test_db.add(scanned_upload)
    await test_db.commit()
    await test_db.refresh(scanned_upload)

    documents = await fetch_upload_documents(
        [scanned_upload.id], account_user.account_id, test_db
    )

    assert len(documents) == 1
    placeholder = documents[0]["text"]
    assert "no extractable text" in placeholder
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_fetch_upload_documents_respects_account_filter(client, auth_headers, test_db):
    """An upload id requested under a different account id must return nothing."""
    from app.models.file_upload import FileUpload
    from app.models.user import User
    from app.services.storage_service import fetch_upload_documents
    from sqlalchemy import select

    user_query = select(User).where(User.email == "test@example.com")
    account_user = (await test_db.execute(user_query)).scalar_one()

    private_upload = FileUpload(
        account_id=account_user.account_id,
        uploaded_by=account_user.id,
        session_id=None,
        filename="report.pdf",
        content_type="application/pdf",
        size_bytes=5000,
        storage_key=f"uploads/{account_user.account_id}/{uuid.uuid4()}.pdf",
        extracted_content="Secret content",
    )
    test_db.add(private_upload)
    await test_db.commit()
    await test_db.refresh(private_upload)

    # A random UUID stands in for some other account; it must not see the upload.
    foreign_account = uuid.uuid4()
    documents = await fetch_upload_documents([private_upload.id], foreign_account, test_db)
    assert len(documents) == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_fetch_upload_documents_returns_docx_content(client, auth_headers, test_db):
    """A stored DOCX upload comes back as one document carrying its extracted text."""
    from app.models.file_upload import FileUpload
    from app.models.user import User
    from app.services.storage_service import fetch_upload_documents
    from sqlalchemy import select

    user_query = select(User).where(User.email == "test@example.com")
    account_user = (await test_db.execute(user_query)).scalar_one()

    docx_mime = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    docx_upload = FileUpload(
        account_id=account_user.account_id,
        uploaded_by=account_user.id,
        session_id=None,
        filename="runbook.docx",
        content_type=docx_mime,
        size_bytes=8000,
        storage_key=f"uploads/{account_user.account_id}/{uuid.uuid4()}.docx",
        extracted_content="Step 1: Restart the service\n\nStep 2: Verify logs",
    )
    test_db.add(docx_upload)
    await test_db.commit()
    await test_db.refresh(docx_upload)

    documents = await fetch_upload_documents(
        [docx_upload.id], account_user.account_id, test_db
    )

    assert len(documents) == 1
    first = documents[0]
    assert first["filename"] == "runbook.docx"
    assert first["content_type"] == docx_mime
    assert "Restart the service" in first["text"]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_fetch_upload_documents_empty_ids(client, auth_headers, test_db):
    """An empty id list yields an empty list.

    Only the return value is checked; whether the database is queried is not
    verified here.
    """
    from app.services.storage_service import fetch_upload_documents

    any_account = uuid.uuid4()
    documents = await fetch_upload_documents([], any_account, test_db)
    assert documents == []
|