"""S3-compatible object storage service for file uploads.""" import base64 import logging import uuid from io import BytesIO from typing import Any from uuid import UUID import boto3 from botocore.config import Config as BotoConfig from botocore.exceptions import ClientError from app.core.config import settings logger = logging.getLogger(__name__) ALLOWED_IMAGE_TYPES = {"image/png", "image/jpeg", "image/gif", "image/webp"} ALLOWED_TEXT_TYPES = {"text/plain", "text/csv", "application/octet-stream"} ALLOWED_TYPES = ALLOWED_IMAGE_TYPES | ALLOWED_TEXT_TYPES MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5MB MAX_TEXT_SIZE = 1 * 1024 * 1024 # 1MB MAX_FILES_PER_SESSION = 20 MAX_BYTES_PER_SESSION = 50 * 1024 * 1024 # 50MB PRESIGNED_URL_EXPIRY = 3600 # 1 hour def _get_client(): """Get S3 client configured for Railway Object Storage.""" if not settings.STORAGE_ENDPOINT: raise RuntimeError("Object storage not configured (STORAGE_ENDPOINT missing)") return boto3.client( "s3", endpoint_url=settings.STORAGE_ENDPOINT, aws_access_key_id=settings.STORAGE_ACCESS_KEY, aws_secret_access_key=settings.STORAGE_SECRET_KEY, region_name=settings.STORAGE_REGION, config=BotoConfig(signature_version="s3v4"), ) def validate_upload(content_type: str, size_bytes: int) -> str | None: """Validate file type and size. Returns error message or None.""" if content_type not in ALLOWED_TYPES: return f"File type {content_type} not allowed" max_size = MAX_IMAGE_SIZE if content_type in ALLOWED_IMAGE_TYPES else MAX_TEXT_SIZE if size_bytes > max_size: return f"File too large ({size_bytes} bytes, max {max_size})" return None async def upload_file( file_data: bytes, filename: str, content_type: str, account_id: str, ) -> str: """Upload file to S3, returns the storage key.""" ext = filename.rsplit(".", 1)[-1] if "." in filename else "bin" storage_key = f"uploads/{account_id}/{uuid.uuid4()}.{ext}" client = _get_client() client.upload_fileobj( BytesIO(file_data), settings.STORAGE_BUCKET_NAME, storage_key, ExtraArgs={"ContentType": content_type}, ) return storage_key def download_file(storage_key: str) -> bytes: """Download a file from S3 and return its contents as bytes.""" client = _get_client() buf = BytesIO() client.download_fileobj(settings.STORAGE_BUCKET_NAME, storage_key, buf) return buf.getvalue() def get_presigned_url(storage_key: str) -> str: """Generate a time-limited presigned URL for downloading a file.""" client = _get_client() return client.generate_presigned_url( "get_object", Params={"Bucket": settings.STORAGE_BUCKET_NAME, "Key": storage_key}, ExpiresIn=PRESIGNED_URL_EXPIRY, ) async def delete_file(storage_key: str) -> None: """Delete a file from S3.""" try: client = _get_client() client.delete_object(Bucket=settings.STORAGE_BUCKET_NAME, Key=storage_key) except ClientError: logger.warning(f"Failed to delete S3 object: {storage_key}") # ── Vision helpers (resize + fetch for AI) ───────────────────── # Claude vision costs: (width × height) / 750 tokens per image. # Claude auto-resizes images >1568px on the longest edge. # We resize server-side to avoid sending multi-MB base64 payloads over the wire. MAX_IMAGE_DIMENSION = 1568 # Claude's max efficient resolution MAX_IMAGES_PER_MESSAGE = 3 # Cap to control token budget def resize_image_for_vision(file_data: bytes, content_type: str) -> tuple[bytes, str]: """Resize image to fit within Claude's efficient vision bounds. Returns (resized_bytes, media_type). Converts PNG screenshots to JPEG when it reduces size significantly (screenshots are often huge PNGs). """ try: from PIL import Image img = Image.open(BytesIO(file_data)) w, h = img.size # Only resize if larger than Claude's max efficient dimension if max(w, h) > MAX_IMAGE_DIMENSION: ratio = MAX_IMAGE_DIMENSION / max(w, h) new_w, new_h = int(w * ratio), int(h * ratio) img = img.resize((new_w, new_h), Image.LANCZOS) # Convert RGBA (common in screenshots) to RGB for JPEG out_type = content_type if img.mode in ("RGBA", "P") and content_type == "image/png": img = img.convert("RGB") out_type = "image/jpeg" buf = BytesIO() if out_type == "image/jpeg": img.save(buf, format="JPEG", quality=85, optimize=True) else: img.save(buf, format=img.format or "PNG", optimize=True) result = buf.getvalue() # Only use resized version if it's actually smaller if len(result) < len(file_data): return result, out_type return file_data, content_type except ImportError: # Pillow not installed — send original (Claude auto-resizes) logger.debug("Pillow not available, sending original image to Claude") return file_data, content_type except Exception: logger.warning("Image resize failed, sending original") return file_data, content_type async def fetch_upload_images( upload_ids: list[UUID], account_id: UUID, db: Any, ) -> list[dict[str, Any]]: """Fetch uploaded images from S3 and return as base64-encoded dicts for Claude vision. Resizes images server-side to reduce network payload and applies a per-message cap to control token budget (~1,600 tokens per full-res image). """ if not upload_ids or not settings.STORAGE_ENDPOINT: return [] from sqlalchemy import select from app.models.file_upload import FileUpload # Cap the number of images to limit token cost capped_ids = upload_ids[:MAX_IMAGES_PER_MESSAGE] if len(upload_ids) > MAX_IMAGES_PER_MESSAGE: logger.info( "Capped images from %d to %d for token budget", len(upload_ids), MAX_IMAGES_PER_MESSAGE, ) result = await db.execute( select(FileUpload).where( FileUpload.id.in_(capped_ids), FileUpload.account_id == account_id, FileUpload.content_type.in_(ALLOWED_IMAGE_TYPES), ) ) uploads = result.scalars().all() images: list[dict[str, Any]] = [] for upload in uploads: try: file_data = download_file(upload.storage_key) resized_data, media_type = resize_image_for_vision( file_data, upload.content_type ) images.append({ "media_type": media_type, "data": base64.b64encode(resized_data).decode("ascii"), }) except Exception: logger.warning("Failed to fetch upload %s from S3", upload.id) return images