"""Standalone AI assistant chat endpoints. POST /assistant/chats — Create new chat GET /assistant/chats — List chats (paginated, newest first) GET /assistant/chats/{id} — Get chat with messages POST /assistant/chats/{id}/messages — Send message PATCH /assistant/chats/{id} — Update title, pin/unpin DELETE /assistant/chats/{id} — Delete single chat DELETE /assistant/chats — Bulk delete (older_than_days query param) GET /assistant/retention — Get account retention settings PATCH /assistant/retention — Update retention settings (owner only) """ import base64 import logging from datetime import datetime, timezone, timedelta from typing import Annotated, Any, Optional from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query, Request, status from sqlalchemy import select, delete, func from sqlalchemy.ext.asyncio import AsyncSession from app.core.rate_limit import limiter from app.api.deps import get_current_active_user, get_db, require_engineer_or_admin from app.core.config import settings from app.core.ai_quota_service import check_ai_quota, record_ai_usage, get_user_plan from app.models.user import User from app.models.account import Account from app.models.assistant_chat import AssistantChat from app.models.file_upload import FileUpload from app.schemas.assistant_chat import ( ChatCreateRequest, ChatMessageRequest, ChatMessageResponse, ChatListResponse, ChatDetailResponse, ChatUpdateRequest, RetentionSettingsResponse, RetentionSettingsUpdate, ConcludeChatRequest, ConcludeChatResponse, ) from app.schemas.copilot import SuggestedFlow from app.services import assistant_chat_service logger = logging.getLogger(__name__) router = APIRouter(prefix="/assistant", tags=["assistant-chat"]) VISION_CONTENT_TYPES = {"image/png", "image/jpeg", "image/gif", "image/webp"} # Claude vision costs: (width × height) / 750 tokens per image. # Claude auto-resizes images >1568px on the longest edge. 
# We resize server-side to avoid sending multi-MB base64 payloads over the wire.
MAX_IMAGE_DIMENSION = 1568  # Claude's max efficient resolution
MAX_IMAGES_PER_MESSAGE = 3  # Cap to control token budget

# Pillow encoder name for each media type we may emit. The encoder is chosen
# from the *output* media type so the bytes always match the label sent to the
# model (a resized image has ``img.format is None``, so it cannot be trusted).
_PIL_FORMAT_BY_MEDIA_TYPE = {
    "image/png": "PNG",
    "image/jpeg": "JPEG",
    "image/gif": "GIF",
    "image/webp": "WEBP",
}


def _resize_image_for_vision(file_data: bytes, content_type: str) -> tuple[bytes, str]:
    """Shrink an image so it fits within ``MAX_IMAGE_DIMENSION`` on its longest edge.

    PNG images in ``RGBA`` or ``P`` mode are converted to RGB and re-encoded as
    JPEG. The re-encoded bytes are only used when they are smaller than the
    input; otherwise the original bytes and content type are returned.

    Args:
        file_data: Raw encoded image bytes.
        content_type: Declared media type of ``file_data`` (e.g. ``image/png``).

    Returns:
        ``(image_bytes, media_type)``. The media type always describes the
        encoding of the returned bytes.

    This function never raises: if Pillow is missing or any processing step
    fails, the original ``(file_data, content_type)`` pair is returned.
    """
    try:
        from io import BytesIO

        from PIL import Image
    except ImportError:
        # Pillow not installed — send original (Claude auto-resizes)
        logger.debug("Pillow not available, sending original image to Claude")
        return file_data, content_type

    try:
        img = Image.open(BytesIO(file_data))
        w, h = img.size

        # Only resize if larger than Claude's max efficient dimension
        longest_edge = max(w, h)
        if longest_edge > MAX_IMAGE_DIMENSION:
            ratio = MAX_IMAGE_DIMENSION / longest_edge
            # Clamp to 1px so extreme aspect ratios don't produce a 0-sized edge.
            new_size = (max(1, int(w * ratio)), max(1, int(h * ratio)))
            img = img.resize(new_size, Image.LANCZOS)

        # Convert RGBA (common in screenshots) to RGB for JPEG
        out_type = content_type
        if img.mode in ("RGBA", "P") and content_type == "image/png":
            img = img.convert("RGB")
            out_type = "image/jpeg"

        # Pick the encoder from the output media type; unknown types fall back
        # to PNG and are relabelled accordingly.
        save_format = _PIL_FORMAT_BY_MEDIA_TYPE.get(out_type)
        if save_format is None:
            save_format, out_type = "PNG", "image/png"

        buf = BytesIO()
        if save_format == "JPEG":
            img.save(buf, format="JPEG", quality=85, optimize=True)
        else:
            img.save(buf, format=save_format, optimize=True)
        result = buf.getvalue()

        # Only use resized version if it's actually smaller
        if len(result) < len(file_data):
            return result, out_type
        return file_data, content_type
    except Exception:
        # Best-effort: a bad or unsupported image must not fail the request.
        logger.warning("Image resize failed, sending original", exc_info=True)
        return file_data, content_type


async def _fetch_upload_images(
    upload_ids: list[UUID],
    account_id: UUID,
    db: AsyncSession,
) -> list[dict[str, Any]]:
    """Load attached images from storage as base64 dicts for the vision model.

    Only the first ``MAX_IMAGES_PER_MESSAGE`` ids are considered, and only
    uploads that belong to ``account_id`` and have a content type in
    ``VISION_CONTENT_TYPES`` are returned. Images come back in the order the
    ids were supplied.

    Args:
        upload_ids: Ids of uploads attached to the message (may be empty).
        account_id: Account the uploads must belong to.
        db: Active database session.

    Returns:
        A list of ``{"media_type": str, "data": <base64 ascii str>}`` dicts.
        Empty when there are no ids or ``settings.STORAGE_ENDPOINT`` is unset.
        Uploads that fail to download are logged and skipped.
    """
    if not upload_ids or not settings.STORAGE_ENDPOINT:
        return []

    from app.services import storage_service

    # Cap the number of images to limit token cost
    capped_ids = upload_ids[:MAX_IMAGES_PER_MESSAGE]
    if len(upload_ids) > MAX_IMAGES_PER_MESSAGE:
        logger.info(
            "Capped images from %d to %d for token budget",
            len(upload_ids),
            MAX_IMAGES_PER_MESSAGE,
        )

    result = await db.execute(
        select(FileUpload).where(
            FileUpload.id.in_(capped_ids),
            FileUpload.account_id == account_id,
            FileUpload.content_type.in_(VISION_CONTENT_TYPES),
        )
    )

    # An IN (...) query returns rows in arbitrary order; restore the order in
    # which the ids were attached. Rows whose id cannot be matched sort last
    # rather than being dropped.
    rank: dict[str, int] = {}
    for position, upload_id in enumerate(capped_ids):
        rank.setdefault(str(upload_id), position)
    uploads = sorted(
        result.scalars().all(),
        key=lambda upload: rank.get(str(upload.id), len(rank)),
    )

    images: list[dict[str, Any]] = []
    for upload in uploads:
        try:
            # download_file is synchronous and the resize is CPU-bound; run
            # both in a worker thread so the event loop is not blocked.
            file_data = await asyncio.to_thread(
                storage_service.download_file, upload.storage_key
            )
            resized_data, media_type = await asyncio.to_thread(
                _resize_image_for_vision, file_data, upload.content_type
            )
            images.append({
                "media_type": media_type,
                "data": base64.b64encode(resized_data).decode("ascii"),
            })
        except Exception:
            # Best-effort: one missing image should not fail the whole message.
            logger.warning(
                "Failed to fetch upload %s from S3", upload.id, exc_info=True
            )

    return images


def _require_ai_enabled() -> None:
    """Raise HTTP 503 when ``settings.ai_enabled`` is falsy."""
    if not settings.ai_enabled:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="AI is not configured. Set GOOGLE_AI_API_KEY or ANTHROPIC_API_KEY.",
        )


async def _get_owned_chat_or_404(
    chat_id: UUID,
    user_id: UUID,
    db: AsyncSession,
) -> AssistantChat:
    """Return the chat with ``chat_id`` owned by ``user_id``.

    Raises:
        HTTPException: 404 when no such chat exists for this user.
    """
    result = await db.execute(
        select(AssistantChat).where(
            AssistantChat.id == chat_id,
            AssistantChat.user_id == user_id,
        )
    )
    chat = result.scalar_one_or_none()
    if chat is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Chat not found")
    return chat


async def _get_account_or_404(account_id: UUID, db: AsyncSession) -> Account:
    """Return the account with ``account_id``.

    Raises:
        HTTPException: 404 when the account does not exist.
    """
    result = await db.execute(select(Account).where(Account.id == account_id))
    account = result.scalar_one_or_none()
    if account is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Account not found")
    return account


@router.post("/chats", response_model=ChatDetailResponse, status_code=201)
@limiter.limit("10/minute")
async def create_chat(
    request: Request,
    data: ChatCreateRequest,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: None = Depends(require_engineer_or_admin),
):
    """Create a new empty chat conversation."""
    # `request` is declared for the rate-limiter decorator; `data` is validated
    # by FastAPI but none of its fields are read here.
    chat = await assistant_chat_service.create_chat(
        user_id=current_user.id,
        account_id=current_user.account_id,
        db=db,
    )
    await db.commit()
    return ChatDetailResponse.model_validate(chat)


@router.get("/chats", response_model=list[ChatListResponse])
async def list_chats(
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    page: int = Query(1, ge=1),
    size: int = Query(20, ge=1, le=100),
):
    """List user's chat conversations (newest first, pinned on top)."""
    offset = (page - 1) * size
    result = await db.execute(
        select(AssistantChat)
        .where(AssistantChat.user_id == current_user.id)
        .order_by(
            AssistantChat.pinned.desc(),
            AssistantChat.updated_at.desc(),
            # Unique tie-breaker so OFFSET/LIMIT pages are deterministic when
            # several chats share the same updated_at.
            AssistantChat.id.desc(),
        )
        .offset(offset)
        .limit(size)
    )
    chats = result.scalars().all()
    return [ChatListResponse.model_validate(c) for c in chats]


@router.get("/chats/{chat_id}", response_model=ChatDetailResponse)
async def get_chat(
    chat_id: UUID,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Get a chat with full message history."""
    chat = await _get_owned_chat_or_404(chat_id, current_user.id, db)
    return ChatDetailResponse.model_validate(chat)


@router.post("/chats/{chat_id}/messages", response_model=ChatMessageResponse)
@limiter.limit("10/minute")
async def post_message(
    request: Request,
    chat_id: UUID,
    data: ChatMessageRequest,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: None = Depends(require_engineer_or_admin),
):
    """Send a message and get AI response."""
    # Responses: 503 when AI is not configured, 429 when the quota check
    # denies the request, 404 when the service raises ValueError, 502 on any
    # other failure inside the service call.
    _require_ai_enabled()

    allowed, quota_status = await check_ai_quota(
        user_id=current_user.id,
        account_id=current_user.account_id,
        db=db,
        billing_anchor=current_user.ai_billing_cycle_anchor_at,
        is_super_admin=current_user.is_super_admin,
    )
    if not allowed:
        deny_reason = quota_status.get("deny_reason")
        reset_key = "daily_reset_at" if deny_reason == "daily" else "monthly_reset_at"
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail={
                "message": f"AI limit exceeded ({deny_reason})",
                "reset_at": quota_status.get(reset_key),
                "quota": quota_status,
            },
        )

    plan = await get_user_plan(current_user.account_id, db)

    # Capture scalar fields before the try block — after db.rollback()
    # the ORM objects are expired and accessing attributes triggers a
    # lazy load, which crashes in async context (MissingGreenlet).
    user_id = current_user.id
    account_id = current_user.account_id

    # Fetch attached images from S3 (if any)
    images = await _fetch_upload_images(data.upload_ids, account_id, db)

    try:
        ai_content, suggested_flows, chat = await assistant_chat_service.send_message(
            chat_id=chat_id,
            user_id=user_id,
            account_id=account_id,
            message=data.message,
            db=db,
            images=images or None,
        )
    except ValueError as e:
        # NOTE(review): every ValueError from the service is reported as 404;
        # presumably it signals "chat not found" — confirm no other ValueError
        # can escape send_message.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) from e
    except Exception as e:
        logger.exception("Assistant chat message failed: %s", e)
        # Discard whatever the failed call left pending, then persist a
        # failure record in a clean transaction.
        await db.rollback()
        await record_ai_usage(
            user_id=user_id,
            account_id=account_id,
            conversation_id=None,
            generation_type="assistant_message",
            tier=plan,
            input_tokens=0,
            output_tokens=0,
            estimated_cost=0,
            succeeded=False,
            counts_toward_quota=False,
            error_code=type(e).__name__,
            extra_data={"assistant_chat_id": str(chat_id)},
            db=db,
        )
        await db.commit()
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail=f"AI provider error ({type(e).__name__}). Please try again.",
        ) from e

    # NOTE(review): total_input_tokens / total_output_tokens look like running
    # totals for the whole chat; if so, each message re-records the cumulative
    # figure rather than this message's delta — confirm against the service.
    # NOTE(review): counts_toward_quota is False on success as well as on
    # failure — confirm quota consumption is recorded elsewhere.
    await record_ai_usage(
        user_id=user_id,
        account_id=account_id,
        conversation_id=None,
        generation_type="assistant_message",
        tier=plan,
        input_tokens=chat.total_input_tokens,
        output_tokens=chat.total_output_tokens,
        estimated_cost=(
            chat.total_input_tokens * 1.0 / 1_000_000
            + chat.total_output_tokens * 5.0 / 1_000_000
        ),
        succeeded=True,
        counts_toward_quota=False,
        error_code=None,
        extra_data={"assistant_chat_id": str(chat_id)},
        db=db,
    )
    await db.commit()

    return ChatMessageResponse(
        content=ai_content,
        suggested_flows=[SuggestedFlow.model_validate(sf) for sf in suggested_flows],
    )


@router.post("/chats/{chat_id}/conclude", response_model=ConcludeChatResponse)
@limiter.limit("10/minute")
async def conclude_chat(
    request: Request,
    chat_id: UUID,
    data: ConcludeChatRequest,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: None = Depends(require_engineer_or_admin),
):
    """Conclude a chat session and generate ticket-ready summary."""
    # Responses: 503 when AI is not configured, 404 when the chat is not the
    # caller's, 400 when already concluded or too short, 502 when summary
    # generation fails.
    # NOTE(review): unlike post_message, this endpoint performs no quota check
    # and records no AI usage — confirm that is intentional.
    _require_ai_enabled()

    chat = await _get_owned_chat_or_404(chat_id, current_user.id, db)

    if chat.concluded_at:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Chat already concluded",
        )

    if chat.message_count < 2:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Chat must have at least one exchange before concluding",
        )

    try:
        summary = await assistant_chat_service.generate_conclusion_summary(
            chat=chat,
            outcome=data.outcome,
            notes=data.notes,
        )
    except Exception as e:
        logger.exception("Failed to generate conclusion summary: %s", e)
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Failed to generate summary. Please try again.",
        ) from e

    # Use one timestamp for both the stored value and the response.
    now = datetime.now(timezone.utc)
    chat.conclusion_outcome = data.outcome
    chat.conclusion_summary = summary
    chat.concluded_at = now
    await db.commit()

    return ConcludeChatResponse(
        summary=summary,
        outcome=data.outcome,
        concluded_at=now,
    )


@router.patch("/chats/{chat_id}", response_model=ChatDetailResponse)
async def update_chat(
    chat_id: UUID,
    data: ChatUpdateRequest,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Update chat title or pin/unpin."""
    chat = await _get_owned_chat_or_404(chat_id, current_user.id, db)

    # A field left as None means "leave unchanged".
    if data.title is not None:
        chat.title = data.title
    if data.pinned is not None:
        chat.pinned = data.pinned

    await db.commit()
    return ChatDetailResponse.model_validate(chat)


@router.delete("/chats/{chat_id}", status_code=204)
async def delete_chat(
    chat_id: UUID,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Delete a single chat."""
    chat = await _get_owned_chat_or_404(chat_id, current_user.id, db)
    await db.delete(chat)
    await db.commit()


@router.delete("/chats", status_code=204)
async def bulk_delete_chats(
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    older_than_days: int = Query(..., ge=1),
):
    """Bulk delete chats older than N days (skips pinned)."""
    # "Older" is measured against updated_at, not creation time.
    cutoff = datetime.now(timezone.utc) - timedelta(days=older_than_days)
    await db.execute(
        delete(AssistantChat).where(
            AssistantChat.user_id == current_user.id,
            AssistantChat.pinned == False,  # noqa: E712
            AssistantChat.updated_at < cutoff,
        )
    )
    await db.commit()


@router.get("/retention", response_model=RetentionSettingsResponse)
async def get_retention_settings(
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Get account chat retention settings."""
    account = await _get_account_or_404(current_user.account_id, db)
    return RetentionSettingsResponse(
        chat_retention_days=account.chat_retention_days,
        chat_retention_max_count=account.chat_retention_max_count,
    )


@router.patch("/retention", response_model=RetentionSettingsResponse)
async def update_retention_settings(
    data: RetentionSettingsUpdate,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Update account chat retention settings (account owner only)."""
    account = await _get_account_or_404(current_user.account_id, db)

    # Super admins may edit any account's retention settings.
    if account.owner_id != current_user.id and not current_user.is_super_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only the account owner can update retention settings",
        )

    # A field left as None means "leave unchanged".
    if data.chat_retention_days is not None:
        account.chat_retention_days = data.chat_retention_days
    if data.chat_retention_max_count is not None:
        account.chat_retention_max_count = data.chat_retention_max_count

    await db.commit()
    return RetentionSettingsResponse(
        chat_retention_days=account.chat_retention_days,
        chat_retention_max_count=account.chat_retention_max_count,
    )