"""File upload endpoints — S3-compatible object storage.

POST   /uploads            — Upload a file (multipart)
GET    /uploads/{id}/url   — Get presigned download URL
GET    /uploads            — List uploads for a session
DELETE /uploads/{id}       — Delete an upload

After a successful upload, a background task fills in an AI-generated
description on the ``FileUpload`` row (and, for PDF / Word / text-like files,
the extracted text plus an AI summary when the text is long).
"""

import asyncio
import base64
import io
import logging
from typing import Annotated, Optional
from uuid import UUID

from fastapi import APIRouter, Depends, File, Form, HTTPException, Request, UploadFile, status
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.deps import get_current_active_user, get_db
from app.core.config import settings
from app.core.rate_limit import limiter
from app.models.file_upload import FileUpload
from app.models.user import User
from app.schemas.upload import FileUploadResponse
from app.services import storage_service

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/uploads", tags=["uploads"])

# Maximum number of characters of extracted text persisted on the upload row.
_MAX_EXTRACTED_CHARS = 10000
# Extracted text longer than this is summarized by the AI; shorter text is not.
_SUMMARY_THRESHOLD_CHARS = 2000
# Only this many leading characters are sent to the AI for summarization.
_SUMMARY_INPUT_CHARS = 5000

_DOCX_CONTENT_TYPE = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
# Non-"text/*" MIME types that are still treated as plain text.
_TEXTUAL_APPLICATION_TYPES = ("application/json", "application/xml", "application/yaml")

# Strong references to in-flight background tasks. The event loop keeps only
# weak references to tasks, so a task nobody references may be garbage-collected
# before it finishes. Tasks remove themselves when done.
_background_tasks: set[asyncio.Task] = set()


def _check_storage_configured() -> None:
    """Raise 503 if object storage is not configured.

    Raises:
        HTTPException: 503 when ``settings.STORAGE_ENDPOINT`` is falsy.
    """
    if not settings.STORAGE_ENDPOINT:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="File storage is not configured",
        )


def _to_response(upload: FileUpload) -> FileUploadResponse:
    """Build the API representation of *upload*, with a freshly presigned URL."""
    return FileUploadResponse(
        id=upload.id,
        filename=upload.filename,
        content_type=upload.content_type,
        size_bytes=upload.size_bytes,
        url=storage_service.get_presigned_url(upload.storage_key),
        created_at=upload.created_at,
    )


def _spawn_background(coro) -> None:
    """Schedule *coro* as a fire-and-forget task, keeping it alive until it finishes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


async def _summarize(text_content: str, *, system_base: str, subject: str) -> str:
    """Ask the AI for a 2-3 sentence summary of the start of *text_content*.

    Args:
        text_content: Full extracted text; only the first
            ``_SUMMARY_INPUT_CHARS`` characters are sent.
        system_base: System prompt for the AI call.
        subject: Noun inserted into the prompt ("Summarize this {subject} content ...").

    Returns:
        The summary text returned by ``_call_ai``.
    """
    # Imported lazily; presumably to avoid an import cycle with the chat service — TODO confirm.
    from app.services.assistant_chat_service import _call_ai

    summary, _, _ = await _call_ai(
        system_base=system_base,
        rag_context="",
        history=[],
        new_message=(
            f"Summarize this {subject} content in 2-3 sentences:\n\n"
            f"{text_content[:_SUMMARY_INPUT_CHARS]}"
        ),
        max_tokens=200,
    )
    return summary


async def _store_document_content(upload, text_content: str, doc_type: str) -> None:
    """Store extracted document text and optionally generate an AI summary.

    Mutates *upload* in place; the caller is responsible for committing.

    Args:
        upload: The ``FileUpload`` row being enriched.
        text_content: Text extracted from the document; may be empty.
        doc_type: Human-readable document kind (e.g. ``"PDF"``) used in the
            summary prompt and in fallback descriptions.
    """
    if not text_content:
        upload.ai_description = f"{doc_type} (no extractable text): {upload.filename}"
        return

    upload.extracted_content = text_content[:_MAX_EXTRACTED_CHARS]
    if len(text_content) > _SUMMARY_THRESHOLD_CHARS:
        summary = await _summarize(
            text_content,
            system_base="You are a technical document analyst for IT troubleshooting.",
            subject=doc_type,
        )
        upload.content_summary = summary
        upload.ai_description = summary
    else:
        upload.ai_description = f"{doc_type}: {upload.filename}"


async def _store_text_content(upload, text_content: str) -> None:
    """Store plain-text file content on *upload*, summarizing it when long.

    Unlike ``_store_document_content``, ``extracted_content`` is set even
    when the text is empty, and the fallback description is "Text file: ...".
    """
    upload.extracted_content = text_content[:_MAX_EXTRACTED_CHARS]
    if len(text_content) > _SUMMARY_THRESHOLD_CHARS:
        summary = await _summarize(
            text_content,
            system_base="You are a technical log/config analyst.",
            subject="file",
        )
        upload.content_summary = summary
        upload.ai_description = summary
    else:
        upload.ai_description = f"Text file: {upload.filename}"


def _extract_pdf_text(file_data: bytes) -> str:
    """Return the text of every PDF page that yields any, joined by blank lines.

    Synchronous and potentially slow; callers run it in a worker thread.
    """
    from pypdf import PdfReader

    reader = PdfReader(io.BytesIO(file_data))
    page_texts = (page.extract_text() for page in reader.pages)
    return "\n\n".join(text for text in page_texts if text)


def _extract_docx_text(file_data: bytes) -> str:
    """Return the non-blank paragraphs of a .docx document, joined by blank lines.

    Synchronous and potentially slow; callers run it in a worker thread.
    """
    from docx import Document as DocxDocument

    doc = DocxDocument(io.BytesIO(file_data))
    return "\n\n".join(p.text for p in doc.paragraphs if p.text.strip())


async def _extract_in_thread(extractor, file_data: bytes, label: str, upload_id: UUID) -> str:
    """Run a sync *extractor* off the event loop; return "" (and log) on any failure."""
    try:
        return await asyncio.to_thread(extractor, file_data)
    except Exception:
        logger.warning("%s text extraction failed for upload %s", label, upload_id)
        return ""


def _decode_text(file_data: bytes) -> str:
    """Decode as UTF-8, falling back to Latin-1 (which accepts any byte sequence)."""
    try:
        return file_data.decode("utf-8")
    except UnicodeDecodeError:
        return file_data.decode("latin-1")


async def _describe_image(file_data: bytes, content_type: str) -> str:
    """Ask the AI for a one-sentence description of an image."""
    # Imported lazily; presumably to avoid an import cycle with the chat service — TODO confirm.
    from app.services.assistant_chat_service import _call_ai

    description, _, _ = await _call_ai(
        system_base="You are a technical image analyst for IT troubleshooting.",
        rag_context="",
        history=[],
        new_message="Describe this image in one sentence for a troubleshooting context log.",
        images=[{"media_type": content_type, "data": base64.b64encode(file_data).decode("utf-8")}],
        max_tokens=100,
    )
    return description


async def _generate_ai_description(upload_id: UUID, file_data: bytes, content_type: str) -> None:
    """Background task: generate AI description for uploaded file.

    Opens its own DB session (the request's session is gone by the time this
    runs), enriches the ``FileUpload`` row according to *content_type*, and
    commits. Content types not handled below are left untouched.

    Never raises: nothing awaits this task, so every failure is logged instead.
    """
    try:
        from app.core.database import async_session_maker

        async with async_session_maker() as db:
            result = await db.execute(select(FileUpload).where(FileUpload.id == upload_id))
            upload = result.scalar_one_or_none()
            if not upload:
                return

            if content_type.startswith("image/"):
                upload.ai_description = await _describe_image(file_data, content_type)
            elif content_type == "application/pdf":
                text_content = await _extract_in_thread(
                    _extract_pdf_text, file_data, "PDF", upload_id
                )
                await _store_document_content(upload, text_content, "PDF")
            elif content_type == _DOCX_CONTENT_TYPE:
                text_content = await _extract_in_thread(
                    _extract_docx_text, file_data, "DOCX", upload_id
                )
                await _store_document_content(upload, text_content, "Word document")
            elif content_type.startswith("text/") or content_type in _TEXTUAL_APPLICATION_TYPES:
                await _store_text_content(upload, _decode_text(file_data))

            await db.commit()
    except Exception:
        logger.exception("Failed to generate AI description for upload %s", upload_id)


async def _enforce_session_limits(db: AsyncSession, session_id: UUID, size_bytes: int) -> None:
    """Reject the upload if it would exceed the session's file-count or byte quota.

    Raises:
        HTTPException: 400 when the session already holds
            ``MAX_FILES_PER_SESSION`` files, or when adding *size_bytes* would
            exceed ``MAX_BYTES_PER_SESSION``.
    """
    count_result = await db.execute(
        select(func.count()).select_from(FileUpload).where(FileUpload.session_id == session_id)
    )
    session_count = count_result.scalar() or 0
    if session_count >= storage_service.MAX_FILES_PER_SESSION:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Session has reached the maximum of {storage_service.MAX_FILES_PER_SESSION} files",
        )

    size_result = await db.execute(
        select(func.sum(FileUpload.size_bytes)).where(FileUpload.session_id == session_id)
    )
    session_bytes = size_result.scalar() or 0  # SUM over zero rows is NULL
    if session_bytes + size_bytes > storage_service.MAX_BYTES_PER_SESSION:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            # NOTE(review): "50 MB" is hard-coded; keep in sync with MAX_BYTES_PER_SESSION.
            detail="Session has exceeded the maximum total upload size (50 MB)",
        )


@router.post("", response_model=FileUploadResponse, status_code=status.HTTP_201_CREATED)
@limiter.limit("10/minute")
async def upload_file(
    request: Request,
    file: UploadFile = File(...),
    session_id: Optional[str] = Form(None),
    current_user: Annotated[User, Depends(get_current_active_user)] = None,
    db: Annotated[AsyncSession, Depends(get_db)] = None,
) -> FileUploadResponse:
    """Upload a file and store it in S3-compatible object storage.

    ``request`` is not read here; it is presumably required by the rate
    limiter decorator — TODO confirm.

    Raises:
        HTTPException: 503 if storage is not configured; 400 for an invalid
            type/size, a malformed ``session_id``, or an exceeded session quota.
    """
    _check_storage_configured()

    file_data = await file.read()
    content_type = file.content_type or "application/octet-stream"
    size_bytes = len(file_data)
    filename = file.filename or "upload"

    # Validate content type and size
    error = storage_service.validate_upload(content_type, size_bytes)
    if error:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=error)

    # Parse and validate session_id if provided
    parsed_session_id: Optional[UUID] = None
    if session_id:
        try:
            parsed_session_id = UUID(session_id)
        except ValueError:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid session_id"
            ) from None
        # Quotas are per session. Without a session there is nothing to scope
        # them to: querying `session_id == None` would be `IS NULL` and count
        # every session-less upload across all accounts.
        await _enforce_session_limits(db, parsed_session_id, size_bytes)

    # Upload to S3
    storage_key = await storage_service.upload_file(
        file_data=file_data,
        filename=filename,
        content_type=content_type,
        account_id=str(current_user.account_id),
    )

    # Persist metadata
    upload = FileUpload(
        account_id=current_user.account_id,
        uploaded_by=current_user.id,
        session_id=parsed_session_id,
        filename=filename,
        content_type=content_type,
        size_bytes=size_bytes,
        storage_key=storage_key,
    )
    db.add(upload)
    try:
        await db.commit()
    except Exception:
        # The object is already stored; remove it so a failed insert does not
        # leave an orphan nobody can reference. Best-effort: the DB error wins.
        try:
            await storage_service.delete_file(storage_key)
        except Exception:
            logger.exception("Failed to clean up stored object %s after DB error", storage_key)
        raise
    await db.refresh(upload)

    _spawn_background(_generate_ai_description(upload.id, file_data, content_type))

    return _to_response(upload)


@router.get("/{upload_id}/url")
async def get_upload_url(
    upload_id: UUID,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> dict:
    """Get a presigned download URL for an uploaded file.

    Visible to members of the upload's account and to super admins.

    Raises:
        HTTPException: 503 if storage is not configured; 404 if the upload does
            not exist or is not visible to the caller.
    """
    _check_storage_configured()

    result = await db.execute(select(FileUpload).where(FileUpload.id == upload_id))
    upload = result.scalar_one_or_none()
    if upload is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Upload not found")

    # Verify the upload belongs to the user's account — 404 to avoid revealing existence
    if upload.account_id != current_user.account_id and not current_user.is_super_admin:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Upload not found")

    url = storage_service.get_presigned_url(upload.storage_key)
    return {"url": url}


@router.get("", response_model=list[FileUploadResponse])
async def list_uploads(
    session_id: UUID,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> list[FileUploadResponse]:
    """List uploads for a session, restricted to the caller's account.

    Raises:
        HTTPException: 503 if storage is not configured.
    """
    _check_storage_configured()

    result = await db.execute(
        select(FileUpload).where(
            FileUpload.session_id == session_id,
            FileUpload.account_id == current_user.account_id,
        )
    )
    return [_to_response(upload) for upload in result.scalars().all()]


@router.delete("/{upload_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_upload(
    upload_id: UUID,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> None:
    """Delete an uploaded file from storage and its metadata row.

    Only the original uploader or a super admin may delete.

    Raises:
        HTTPException: 503 if storage is not configured; 404 if the upload does
            not exist or the caller may not delete it.
    """
    _check_storage_configured()

    result = await db.execute(select(FileUpload).where(FileUpload.id == upload_id))
    upload = result.scalar_one_or_none()
    if upload is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Upload not found")

    # Verify ownership — 404 to avoid revealing existence
    if upload.uploaded_by != current_user.id and not current_user.is_super_admin:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Upload not found")

    # Delete from S3 first. NOTE(review): if the DB delete below then fails,
    # the row will point at a missing object.
    await storage_service.delete_file(upload.storage_key)

    # Delete DB record
    await db.delete(upload)
    await db.commit()