"""In-session copilot endpoints. Contextual AI assistant during flow navigation: POST /copilot/conversations — Start conversation (requires tree_id) POST /copilot/conversations/{id}/messages — Send message, get response + suggestions GET /copilot/conversations/{id} — Get conversation history """ import logging from typing import Annotated from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Request, status from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.rate_limit import limiter from app.api.deps import get_current_active_user, get_db, require_engineer_or_admin from app.core.config import settings from app.core.ai_quota_service import check_ai_quota, record_ai_usage, get_user_plan from app.models.user import User from app.schemas.copilot import ( CopilotStartRequest, CopilotStartResponse, CopilotMessageRequest, CopilotMessageResponse, CopilotConversationResponse, SuggestedFlow, ) from app.models.copilot_conversation import CopilotConversation from app.services import copilot_service logger = logging.getLogger(__name__) router = APIRouter(prefix="/copilot", tags=["copilot"]) def _require_ai_enabled() -> None: if not settings.ai_enabled: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="AI is not configured. Set GOOGLE_AI_API_KEY or ANTHROPIC_API_KEY.", ) @router.post("/conversations", response_model=CopilotStartResponse, status_code=201) @limiter.limit("10/minute") async def start_conversation( request: Request, data: CopilotStartRequest, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_engineer_or_admin), ): """Start a new copilot conversation for a flow.""" _require_ai_enabled() allowed, quota_status = await check_ai_quota( user_id=current_user.id, account_id=current_user.account_id, db=db, billing_anchor=current_user.ai_billing_cycle_anchor_at, is_super_admin=current_user.is_super_admin, ) if not allowed: reset_key = "daily_reset_at" if quota_status.get("deny_reason") == "daily" else "monthly_reset_at" raise HTTPException( status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail={ "message": f"AI limit exceeded ({quota_status['deny_reason']})", "reset_at": quota_status.get(reset_key), "quota": quota_status, }, ) try: conversation, greeting = await copilot_service.start_conversation( user_id=current_user.id, account_id=current_user.account_id, tree_id=data.tree_id, session_id=data.session_id, current_node_id=data.current_node_id, db=db, ) except ValueError as e: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) except Exception as e: logger.exception("Copilot conversation start failed: %s", e) raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"AI provider error ({type(e).__name__}). Please try again.", ) await db.commit() return CopilotStartResponse( conversation_id=conversation.id, greeting=greeting, ) @router.post("/conversations/{conversation_id}/messages", response_model=CopilotMessageResponse) @limiter.limit("10/minute") async def post_message( request: Request, conversation_id: UUID, data: CopilotMessageRequest, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_engineer_or_admin), ): """Send a message and get AI response with flow suggestions.""" _require_ai_enabled() plan = await get_user_plan(current_user.account_id, db) try: ai_content, suggested_flows, conversation = await copilot_service.send_message( conversation_id=conversation_id, user_id=current_user.id, message=data.message, current_node_id=data.current_node_id, db=db, ) except ValueError as e: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) except Exception as e: logger.exception("Copilot message failed: %s", e) await record_ai_usage( user_id=current_user.id, account_id=current_user.account_id, conversation_id=None, generation_type="copilot_message", tier=plan, input_tokens=0, output_tokens=0, estimated_cost=0, succeeded=False, counts_toward_quota=False, error_code=type(e).__name__, extra_data={"copilot_conversation_id": str(conversation_id)}, db=db, ) await db.commit() raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"AI provider error ({type(e).__name__}). Please try again.", ) await record_ai_usage( user_id=current_user.id, account_id=current_user.account_id, conversation_id=None, generation_type="copilot_message", tier=plan, input_tokens=conversation.total_input_tokens, output_tokens=conversation.total_output_tokens, estimated_cost=( conversation.total_input_tokens * 1.0 / 1_000_000 + conversation.total_output_tokens * 5.0 / 1_000_000 ), succeeded=True, counts_toward_quota=False, error_code=None, extra_data={"copilot_conversation_id": str(conversation_id)}, db=db, ) await db.commit() return CopilotMessageResponse( content=ai_content, suggested_flows=[SuggestedFlow.model_validate(sf) for sf in suggested_flows], ) @router.get("/conversations/{conversation_id}", response_model=CopilotConversationResponse) async def get_conversation( conversation_id: UUID, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], ): """Get copilot conversation history.""" result = await db.execute( select(CopilotConversation).where( CopilotConversation.id == conversation_id, CopilotConversation.user_id == current_user.id, ) ) conversation = result.scalar_one_or_none() if not conversation: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Conversation not found") return CopilotConversationResponse.model_validate(conversation)