"""AI Flow Builder wizard endpoints.

4-stage wizard:
    POST /ai/start          — Stage 1: create conversation with metadata
    POST /ai/scaffold       — Stage 2: AI suggests branches
    POST /ai/branch-detail  — Stage 3: AI generates detail for one branch
    POST /ai/assemble       — Stage 4: assemble branches into tree (no AI)
    GET  /ai/quota          — quota status

Also exposes GET /ai/provider-debug, a temporary diagnostic endpoint.
"""

import logging
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, Request, status
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.deps import get_current_active_user, get_db, require_engineer_or_admin
from app.core.ai_conversation_store import (
    create_conversation,
    get_conversation,
    update_conversation,
)
from app.core.ai_quota_service import check_ai_quota, record_ai_usage, get_user_plan
from app.core.ai_tree_generator_service import (
    scaffold_branches,
    generate_branch_detail,
    assemble_tree,
)
from app.core.config import settings
from app.core.rate_limit import limiter
from app.models.user import User
from app.schemas.ai_builder import (
    AIStartRequest,
    AIStartResponse,
    AIScaffoldRequest,
    AIScaffoldResponse,
    AIBranchDetailRequest,
    AIBranchDetailResponse,
    AIAssembleRequest,
    AIAssembleResponse,
    AIQuotaStatusResponse,
)

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/ai", tags=["ai-builder"])


def _require_ai_enabled() -> None:
    """Raise 503 if AI is not configured.

    Raises:
        HTTPException: 503 when ``settings.ai_enabled`` is falsy.
    """
    if not settings.ai_enabled:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="AI flow builder is not configured. Set GOOGLE_AI_API_KEY or ANTHROPIC_API_KEY.",
        )


def _ensure_call_budget(conversation) -> None:
    """Raise 429 once a conversation has used its per-flow AI call budget.

    Stages 2 and 3 each increment ``question_rounds`` after a successful AI
    call; this guard compares that counter to ``settings.AI_MAX_CALLS_PER_FLOW``.

    Raises:
        HTTPException: 429 when the limit has been reached.
    """
    if conversation.question_rounds >= settings.AI_MAX_CALLS_PER_FLOW:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Maximum AI calls per flow exceeded",
        )


async def _record_ai_failure(
    *,
    user: User,
    conversation_id,
    generation_type: str,
    plan,
    error_code: str,
    extra_data: dict,
    db: AsyncSession,
) -> None:
    """Persist a failed AI usage record and commit it immediately.

    Failed generations are logged with zero tokens/cost and never count toward
    quota. The commit happens here because the caller is about to raise an
    HTTPException. Committing first keeps the record from being discarded with
    the request.

    Args:
        user: The requesting user (supplies ``id`` and ``account_id``).
        conversation_id: ID of the wizard conversation the call belonged to.
        generation_type: Stage label, e.g. ``"scaffold"`` or ``"branch_detail"``.
        plan: The account's plan/tier as returned by ``get_user_plan``.
        error_code: Short machine-readable failure code.
        extra_data: Free-form diagnostic payload stored with the record.
        db: Active database session.
    """
    await record_ai_usage(
        user_id=user.id,
        account_id=user.account_id,
        conversation_id=conversation_id,
        generation_type=generation_type,
        tier=plan,
        input_tokens=0,
        output_tokens=0,
        estimated_cost=0,
        succeeded=False,
        counts_toward_quota=False,
        error_code=error_code,
        extra_data=extra_data,
        db=db,
    )
    await db.commit()


@router.get("/provider-debug")
async def provider_debug(
    current_user: Annotated[User, Depends(get_current_active_user)],
    _: None = Depends(require_engineer_or_admin),
):
    """Temporary debug endpoint — shows which AI provider would be selected.

    Restricted to engineers/admins via ``require_engineer_or_admin``.
    """
    # Local import: only this diagnostic endpoint needs the provider factory.
    from app.core.ai_provider import get_ai_provider

    has_gemini_key = bool(settings.GOOGLE_AI_API_KEY)
    has_anthropic_key = bool(settings.ANTHROPIC_API_KEY)
    provider_setting = settings.AI_PROVIDER

    try:
        provider = get_ai_provider()
        provider_type = type(provider).__name__
    except RuntimeError as e:
        # Report the selection failure instead of failing the request.
        provider_type = f"ERROR: {e}"

    # NOTE(review): the *_key_prefix fields expose the first 8 characters of
    # live API keys over HTTP. Consider removing them (or this endpoint) once
    # provider selection has been verified.
    return {
        "ai_provider_setting": provider_setting,
        "has_gemini_key": has_gemini_key,
        "gemini_key_prefix": settings.GOOGLE_AI_API_KEY[:8] + "..." if settings.GOOGLE_AI_API_KEY else None,
        "has_anthropic_key": has_anthropic_key,
        "anthropic_key_prefix": settings.ANTHROPIC_API_KEY[:8] + "..." if settings.ANTHROPIC_API_KEY else None,
        "selected_provider": provider_type,
        "gemini_model": settings.AI_MODEL_GEMINI,
        "anthropic_model": settings.AI_MODEL_ANTHROPIC,
    }


@router.get("/quota", response_model=AIQuotaStatusResponse)
async def get_quota(
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Get current user's AI quota status.

    When AI is not configured, a fixed placeholder response is returned with
    ``allowed=False`` and ``ai_enabled=False``. The quota service is not
    consulted in that case.
    """
    if not settings.ai_enabled:
        return AIQuotaStatusResponse(
            plan="free",
            monthly_used=0,
            monthly_limit=None,
            monthly_reset_at="",
            daily_used=0,
            daily_limit=None,
            daily_reset_at="",
            allowed=False,
            ai_enabled=False,
        )

    _, quota_status = await check_ai_quota(
        user_id=current_user.id,
        account_id=current_user.account_id,
        db=db,
        billing_anchor=current_user.ai_billing_cycle_anchor_at,
        is_super_admin=current_user.is_super_admin,
    )

    return AIQuotaStatusResponse(
        **quota_status,
        ai_enabled=True,
    )


@router.post("/start", response_model=AIStartResponse, status_code=201)
@limiter.limit("10/minute")
async def start_conversation(
    request: Request,
    data: AIStartRequest,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: None = Depends(require_engineer_or_admin),
):
    """Stage 1: Create a new AI wizard conversation with foundation metadata.

    Within this module, this is the only wizard stage that calls
    ``check_ai_quota``. Later stages enforce only the per-flow call limit.

    Raises:
        HTTPException: 503 if AI is not configured; 429 if the quota check
            denies the request (detail includes the reset time and the full
            quota status).
    """
    _require_ai_enabled()

    # Check daily quota (anti-abuse) — super admins bypass
    allowed, quota_status = await check_ai_quota(
        user_id=current_user.id,
        account_id=current_user.account_id,
        db=db,
        billing_anchor=current_user.ai_billing_cycle_anchor_at,
        is_super_admin=current_user.is_super_admin,
    )
    if not allowed:
        # Read deny_reason defensively (once) so a missing key cannot turn a
        # quota denial into an unhandled KeyError / 500.
        deny_reason = quota_status.get("deny_reason")
        reset_key = "daily_reset_at" if deny_reason == "daily" else "monthly_reset_at"
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail={
                "message": f"AI build limit exceeded ({deny_reason})",
                "reset_at": quota_status.get(reset_key),
                "quota": quota_status,
            },
        )

    wizard_state = {
        "flow_type": data.flow_type,
        "name": data.name,
        "description": data.description,
        "environment_tags": data.environment_tags,
        "category_id": str(data.category_id) if data.category_id else None,
    }

    conversation = await create_conversation(
        user_id=current_user.id,
        account_id=current_user.account_id,
        wizard_state=wizard_state,
        db=db,
    )
    await db.commit()

    return AIStartResponse(
        conversation_id=conversation.id,
        status=conversation.status,
    )


@router.post("/scaffold", response_model=AIScaffoldResponse)
@limiter.limit("10/minute")
async def scaffold(
    request: Request,
    data: AIScaffoldRequest,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: None = Depends(require_engineer_or_admin),
):
    """Stage 2: AI suggests top-level branches.

    The suggested branches are stored in the conversation's ``wizard_state``
    under ``"branches"``, and ``question_rounds`` is incremented.

    Raises:
        HTTPException: 503 if AI is not configured; 429 if the per-flow call
            limit is reached; 422 if the generator raises ``ValueError``
            (recorded as ``invalid_output``); 502 for any other generator
            exception.
    """
    _require_ai_enabled()

    conversation = await get_conversation(data.conversation_id, current_user.id, db)

    # Check per-flow call limit
    _ensure_call_budget(conversation)

    plan = await get_user_plan(current_user.account_id, db)

    try:
        branches, input_tokens, output_tokens, cost = await scaffold_branches(
            conversation.wizard_state,
        )
    except ValueError as e:
        # The generator signals unusable AI output with ValueError.
        await _record_ai_failure(
            user=current_user,
            conversation_id=conversation.id,
            generation_type="scaffold",
            plan=plan,
            error_code="invalid_output",
            extra_data={"error": str(e)},
            db=db,
        )
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"AI returned invalid output: {e}",
        ) from e
    except Exception as e:
        logger.exception("AI scaffold failed: %s: %s", type(e).__name__, e)
        await _record_ai_failure(
            user=current_user,
            conversation_id=conversation.id,
            generation_type="scaffold",
            plan=plan,
            error_code=type(e).__name__,
            extra_data={"error": str(e)},
            db=db,
        )
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail=f"AI provider error ({type(e).__name__}). Please try again.",
        ) from e

    # Record successful usage (intermediate stages never count toward quota)
    await record_ai_usage(
        user_id=current_user.id,
        account_id=current_user.account_id,
        conversation_id=conversation.id,
        generation_type="scaffold",
        tier=plan,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        estimated_cost=cost,
        succeeded=True,
        counts_toward_quota=False,
        error_code=None,
        extra_data=None,
        db=db,
    )

    # Update conversation state. Copy the dict rather than mutating the loaded
    # value in place, so the update passes a fresh object.
    wizard_state = dict(conversation.wizard_state)
    wizard_state["branches"] = branches
    await update_conversation(
        conversation.id,
        current_user.id,
        {
            "status": "scaffolding",
            "wizard_state": wizard_state,
            "question_rounds": conversation.question_rounds + 1,
        },
        db,
    )
    await db.commit()

    return AIScaffoldResponse(
        conversation_id=conversation.id,
        branches=branches,
        status="scaffolding",
    )


@router.post("/branch-detail", response_model=AIBranchDetailResponse)
@limiter.limit("10/minute")
async def branch_detail(
    request: Request,
    data: AIBranchDetailRequest,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: None = Depends(require_engineer_or_admin),
):
    """Stage 3: AI generates detailed nodes for one branch.

    The generated steps are returned to the client but are not written to the
    conversation here. Stage 4 receives the chosen branches in its request body.

    Raises:
        HTTPException: 503 if AI is not configured; 429 if the per-flow call
            limit is reached; 422 if the generator raises ``ValueError``
            (recorded as ``invalid_output``); 502 for any other generator
            exception.
    """
    _require_ai_enabled()

    conversation = await get_conversation(data.conversation_id, current_user.id, db)

    _ensure_call_budget(conversation)

    wizard_state = conversation.wizard_state
    # Names of the branches stored by Stage 2 (empty if scaffolding has not run);
    # presumably each stored branch is a dict with a "name" key — verify against
    # scaffold_branches' output.
    existing_branches = [b.get("name", "") for b in wizard_state.get("branches", [])]

    plan = await get_user_plan(current_user.account_id, db)

    try:
        branch_tree, input_tokens, output_tokens, cost = await generate_branch_detail(
            wizard_state,
            data.branch_name,
            existing_branches,
        )
    except ValueError as e:
        # The generator signals unusable AI output with ValueError.
        await _record_ai_failure(
            user=current_user,
            conversation_id=conversation.id,
            generation_type="branch_detail",
            plan=plan,
            error_code="invalid_output",
            extra_data={"error": str(e), "branch_name": data.branch_name},
            db=db,
        )
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"AI returned invalid output: {e}",
        ) from e
    except Exception as e:
        logger.exception("AI branch_detail failed: %s: %s", type(e).__name__, e)
        await _record_ai_failure(
            user=current_user,
            conversation_id=conversation.id,
            generation_type="branch_detail",
            plan=plan,
            error_code=type(e).__name__,
            extra_data={"error": str(e), "branch_name": data.branch_name},
            db=db,
        )
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail=f"AI provider error ({type(e).__name__}). Please try again.",
        ) from e

    # Record successful usage (intermediate stages never count toward quota)
    await record_ai_usage(
        user_id=current_user.id,
        account_id=current_user.account_id,
        conversation_id=conversation.id,
        generation_type="branch_detail",
        tier=plan,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        estimated_cost=cost,
        succeeded=True,
        counts_toward_quota=False,
        error_code=None,
        extra_data={"branch_name": data.branch_name},
        db=db,
    )

    # Update conversation (status + call counter only)
    await update_conversation(
        conversation.id,
        current_user.id,
        {
            "status": "detailing",
            "question_rounds": conversation.question_rounds + 1,
        },
        db,
    )
    await db.commit()

    return AIBranchDetailResponse(
        conversation_id=conversation.id,
        branch_name=data.branch_name,
        steps=branch_tree,
        status="detailing",
    )


@router.post("/assemble", response_model=AIAssembleResponse)
@limiter.limit("10/minute")
async def assemble(
    request: Request,
    data: AIAssembleRequest,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: None = Depends(require_engineer_or_admin),
):
    """Stage 4: Assemble selected branches into a complete tree (no AI calls).

    This is the only stage that records usage with ``counts_toward_quota=True``.
    Because no AI call is made, it does not call ``_require_ai_enabled``.

    Raises:
        HTTPException: 422 if ``assemble_tree`` rejects the branches
            (``ValueError``).
    """
    conversation = await get_conversation(data.conversation_id, current_user.id, db)

    wizard_state = conversation.wizard_state
    branches_for_assembly = [b.model_dump() for b in data.selected_branches]

    try:
        tree_structure, name, description, stats = assemble_tree(
            wizard_state, branches_for_assembly
        )
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=str(e),
        ) from e

    # Record quota-consuming usage on successful assembly.
    # NOTE(review): nothing here checks whether the conversation is already
    # "completed", so re-assembling the same conversation records another
    # quota-consuming entry each time — confirm that is intended.
    plan = await get_user_plan(current_user.account_id, db)
    await record_ai_usage(
        user_id=current_user.id,
        account_id=current_user.account_id,
        conversation_id=conversation.id,
        generation_type="tree",
        tier=plan,
        input_tokens=0,
        output_tokens=0,
        estimated_cost=0,
        succeeded=True,
        counts_toward_quota=True,
        error_code=None,
        extra_data={"stats": stats},
        db=db,
    )

    # Update conversation with assembled tree
    await update_conversation(
        conversation.id,
        current_user.id,
        {
            "status": "completed",
            "generated_tree": tree_structure,
        },
        db,
    )
    await db.commit()

    return AIAssembleResponse(
        tree_structure=tree_structure,
        suggested_name=name,
        suggested_description=description,
        summary=stats,
        status="completed",
    )