Files
resolutionflow/backend/app/api/endpoints/uploads.py
chihlasm 217e70cb81 feat: add .docx upload support with text extraction
- Add DOCX MIME type to ALLOWED_DOCUMENT_TYPES in storage_service.py
- Add python-docx text extraction in _generate_ai_description
- Extract shared _store_document_content helper for PDF/DOCX
- Add python-docx>=1.1.0 to requirements.txt
- Add tests for docx upload acceptance and document fetch

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-27 21:08:12 +00:00

324 lines
12 KiB
Python

"""File upload endpoints — S3-compatible object storage.
POST /uploads — Upload a file (multipart)
GET /uploads/{id}/url — Get presigned download URL
GET /uploads — List uploads for a session
DELETE /uploads/{id} — Delete an upload
"""
import logging
from typing import Annotated, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, File, Form, HTTPException, Request, UploadFile, status
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_current_active_user, get_db
from app.core.config import settings
from app.core.rate_limit import limiter
from app.models.file_upload import FileUpload
from app.models.user import User
from app.schemas.upload import FileUploadResponse
from app.services import storage_service
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/uploads", tags=["uploads"])
def _check_storage_configured() -> None:
"""Raise 503 if object storage is not configured."""
if not settings.STORAGE_ENDPOINT:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="File storage is not configured",
)
async def _store_document_content(upload, text_content: str, doc_type: str) -> None:
"""Store extracted document text and optionally generate an AI summary."""
from app.services.assistant_chat_service import _call_ai
if text_content:
upload.extracted_content = text_content[:10000]
if len(text_content) > 2000:
summary, _, _ = await _call_ai(
system_base="You are a technical document analyst for IT troubleshooting.",
rag_context="",
history=[],
new_message=f"Summarize this {doc_type} content in 2-3 sentences:\n\n{text_content[:5000]}",
max_tokens=200,
)
upload.content_summary = summary
upload.ai_description = summary
else:
upload.ai_description = f"{doc_type}: {upload.filename}"
else:
upload.ai_description = f"{doc_type} (no extractable text): {upload.filename}"
async def _generate_ai_description(upload_id: UUID, file_data: bytes, content_type: str) -> None:
"""Background task: generate AI description for uploaded file."""
try:
from app.core.database import async_session_maker
from app.services.assistant_chat_service import _call_ai
import base64
async with async_session_maker() as db:
result = await db.execute(
select(FileUpload).where(FileUpload.id == upload_id)
)
upload = result.scalar_one_or_none()
if not upload:
return
if content_type.startswith("image/"):
b64_data = base64.b64encode(file_data).decode("utf-8")
description, _, _ = await _call_ai(
system_base="You are a technical image analyst for IT troubleshooting.",
rag_context="",
history=[],
new_message="Describe this image in one sentence for a troubleshooting context log.",
images=[{"media_type": content_type, "data": b64_data}],
max_tokens=100,
)
upload.ai_description = description
elif content_type == "application/pdf":
try:
from pypdf import PdfReader
import io as _io
reader = PdfReader(_io.BytesIO(file_data))
pages_text = []
for page in reader.pages:
page_text = page.extract_text()
if page_text:
pages_text.append(page_text)
text_content = "\n\n".join(pages_text)
except Exception:
logger.warning("PDF text extraction failed for upload %s", upload_id)
text_content = ""
await _store_document_content(upload, text_content, "PDF")
elif content_type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
try:
from docx import Document as DocxDocument
import io as _io
doc = DocxDocument(_io.BytesIO(file_data))
text_content = "\n\n".join(
p.text for p in doc.paragraphs if p.text.strip()
)
except Exception:
logger.warning("DOCX text extraction failed for upload %s", upload_id)
text_content = ""
await _store_document_content(upload, text_content, "Word document")
elif content_type.startswith("text/") or content_type in (
"application/json", "application/xml", "application/yaml",
):
try:
text_content = file_data.decode("utf-8")
except UnicodeDecodeError:
text_content = file_data.decode("latin-1")
upload.extracted_content = text_content[:10000]
if len(text_content) > 2000:
summary, _, _ = await _call_ai(
system_base="You are a technical log/config analyst.",
rag_context="",
history=[],
new_message=f"Summarize this file content in 2-3 sentences:\n\n{text_content[:5000]}",
max_tokens=200,
)
upload.content_summary = summary
upload.ai_description = summary
else:
upload.ai_description = f"Text file: {upload.filename}"
await db.commit()
except Exception:
logger.exception(f"Failed to generate AI description for upload {upload_id}")
@router.post("", response_model=FileUploadResponse, status_code=status.HTTP_201_CREATED)
@limiter.limit("10/minute")
async def upload_file(
request: Request,
file: UploadFile = File(...),
session_id: Optional[str] = Form(None),
current_user: Annotated[User, Depends(get_current_active_user)] = None,
db: Annotated[AsyncSession, Depends(get_db)] = None,
) -> FileUploadResponse:
"""Upload a file and store it in S3-compatible object storage."""
_check_storage_configured()
file_data = await file.read()
content_type = file.content_type or "application/octet-stream"
size_bytes = len(file_data)
# Validate content type and size
error = storage_service.validate_upload(content_type, size_bytes)
if error:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=error)
# Parse and validate session_id if provided
parsed_session_id: Optional[UUID] = None
if session_id:
try:
parsed_session_id = UUID(session_id)
except ValueError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid session_id"
)
# Check per-session limits
count_result = await db.execute(
select(func.count()).select_from(FileUpload).where(
FileUpload.session_id == parsed_session_id
)
)
session_count = count_result.scalar() or 0
if session_count >= storage_service.MAX_FILES_PER_SESSION:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Session has reached the maximum of {storage_service.MAX_FILES_PER_SESSION} files",
)
size_result = await db.execute(
select(func.sum(FileUpload.size_bytes)).where(
FileUpload.session_id == parsed_session_id
)
)
session_bytes = size_result.scalar() or 0
if session_bytes + size_bytes > storage_service.MAX_BYTES_PER_SESSION:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Session has exceeded the maximum total upload size (50 MB)",
)
# Upload to S3
storage_key = await storage_service.upload_file(
file_data=file_data,
filename=file.filename or "upload",
content_type=content_type,
account_id=str(current_user.account_id),
)
# Persist metadata
upload = FileUpload(
account_id=current_user.account_id,
uploaded_by=current_user.id,
session_id=parsed_session_id,
filename=file.filename or "upload",
content_type=content_type,
size_bytes=size_bytes,
storage_key=storage_key,
)
db.add(upload)
await db.commit()
await db.refresh(upload)
import asyncio
asyncio.create_task(
_generate_ai_description(upload.id, file_data, content_type)
)
presigned_url = storage_service.get_presigned_url(upload.storage_key)
return FileUploadResponse(
id=upload.id,
filename=upload.filename,
content_type=upload.content_type,
size_bytes=upload.size_bytes,
url=presigned_url,
created_at=upload.created_at,
)
@router.get("/{upload_id}/url")
async def get_upload_url(
upload_id: UUID,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
) -> dict:
"""Get a presigned download URL for an uploaded file."""
_check_storage_configured()
result = await db.execute(select(FileUpload).where(FileUpload.id == upload_id))
upload = result.scalar_one_or_none()
if upload is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Upload not found")
# Verify the upload belongs to the user's account
if upload.account_id != current_user.account_id and not current_user.is_super_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
url = storage_service.get_presigned_url(upload.storage_key)
return {"url": url}
@router.get("", response_model=list[FileUploadResponse])
async def list_uploads(
session_id: UUID,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
) -> list[FileUploadResponse]:
"""List uploads for a session."""
_check_storage_configured()
result = await db.execute(
select(FileUpload).where(
FileUpload.session_id == session_id,
FileUpload.account_id == current_user.account_id,
)
)
uploads = result.scalars().all()
responses = []
for upload in uploads:
presigned_url = storage_service.get_presigned_url(upload.storage_key)
responses.append(
FileUploadResponse(
id=upload.id,
filename=upload.filename,
content_type=upload.content_type,
size_bytes=upload.size_bytes,
url=presigned_url,
created_at=upload.created_at,
)
)
return responses
@router.delete("/{upload_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_upload(
upload_id: UUID,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
) -> None:
"""Delete an uploaded file."""
_check_storage_configured()
result = await db.execute(select(FileUpload).where(FileUpload.id == upload_id))
upload = result.scalar_one_or_none()
if upload is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Upload not found")
# Verify ownership
if upload.uploaded_by != current_user.id and not current_user.is_super_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
# Delete from S3
await storage_service.delete_file(upload.storage_key)
# Delete DB record
await db.delete(upload)
await db.commit()