"""FlowPilot AI session endpoints. CRUD and interaction endpoints for AI-powered troubleshooting sessions: POST /ai-sessions — Start a new session POST /ai-sessions/{id}/respond — Submit step response, get next step POST /ai-sessions/{id}/resolve — Resolve the session POST /ai-sessions/{id}/escalate — Escalate the session GET /ai-sessions — List user's sessions (paginated) GET /ai-sessions/{id} — Get session detail with all steps GET /ai-sessions/{id}/documentation — Get auto-generated documentation POST /ai-sessions/{id}/rate — Submit post-session rating """ import logging from typing import Annotated, Optional from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query, Request, status from sqlalchemy import or_, select, func from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.core.rate_limit import limiter from app.api.deps import get_current_active_user, get_db, require_engineer_or_admin from app.core.config import settings from app.core.ai_quota_service import check_ai_quota, record_ai_usage, get_user_plan from app.models.user import User from app.models.ai_session import AISession from app.schemas.ai_session import ( AISessionCreateRequest, AISessionCreateResponse, StepResponseRequest, StepResponseResponse, ResolveSessionRequest, EscalateSessionRequest, SessionCloseResponse, SessionDocumentation, RateSessionRequest, PickupSessionRequest, LinkTicketRequest, AISessionSummary, AISessionDetail, AISessionStepResponse, StepOptionSchema, ) from app.services import flowpilot_engine from app.services.psa_documentation_service import retry_failed_push logger = logging.getLogger(__name__) router = APIRouter(prefix="/ai-sessions", tags=["ai-sessions"]) def _require_ai_enabled() -> None: if not settings.ai_enabled: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="AI is not configured. 
Set GOOGLE_AI_API_KEY or ANTHROPIC_API_KEY.", ) async def _check_quota(user: User, db: AsyncSession) -> None: """Check AI quota and raise 429 if exceeded.""" allowed, quota_status = await check_ai_quota( user_id=user.id, account_id=user.account_id, db=db, billing_anchor=user.ai_billing_cycle_anchor_at, is_super_admin=user.is_super_admin, ) if not allowed: reset_key = "daily_reset_at" if quota_status.get("deny_reason") == "daily" else "monthly_reset_at" raise HTTPException( status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail={ "message": f"AI limit exceeded ({quota_status['deny_reason']})", "reset_at": quota_status.get(reset_key), "quota": quota_status, }, ) async def _record_usage( user: User, db: AsyncSession, generation_type: str, input_tokens: int, output_tokens: int, succeeded: bool, session_id: Optional[UUID] = None, error_code: Optional[str] = None, ) -> None: """Record AI usage after an LLM call.""" plan = await get_user_plan(user.account_id, db) estimated_cost = ( input_tokens * 3.0 / 1_000_000 + output_tokens * 15.0 / 1_000_000 ) await record_ai_usage( user_id=user.id, account_id=user.account_id, conversation_id=None, generation_type=generation_type, tier=plan, input_tokens=input_tokens, output_tokens=output_tokens, estimated_cost=estimated_cost, succeeded=succeeded, counts_toward_quota=True, error_code=error_code, extra_data={"ai_session_id": str(session_id)} if session_id else None, db=db, ) # ── Create session ── @router.post("", response_model=AISessionCreateResponse, status_code=201) @limiter.limit("5/minute") async def create_session( request: Request, data: AISessionCreateRequest, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_engineer_or_admin), ): """Start a new FlowPilot troubleshooting session.""" _require_ai_enabled() await _check_quota(current_user, db) try: result = await flowpilot_engine.start_session( request=data, user_id=current_user.id, 
account_id=current_user.account_id, team_id=current_user.team_id, db=db, ) except Exception as e: logger.exception("FlowPilot session start failed: %s", e) await _record_usage( current_user, db, generation_type="flowpilot_start", input_tokens=0, output_tokens=0, succeeded=False, error_code=type(e).__name__, ) await db.commit() raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"AI provider error ({type(e).__name__}). Please try again.", ) await _record_usage( current_user, db, generation_type="flowpilot_start", input_tokens=result.first_step.confidence_score and 0, # Tracked on session output_tokens=0, succeeded=True, session_id=result.session_id, ) await db.commit() return result # ── Respond to step ── @router.post("/{session_id}/respond", response_model=StepResponseResponse) @limiter.limit("15/minute") async def respond_to_step( request: Request, session_id: UUID, data: StepResponseRequest, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_engineer_or_admin), ): """Submit an engineer's response to a FlowPilot step and get the next step.""" _require_ai_enabled() await _check_quota(current_user, db) try: result = await flowpilot_engine.process_response( session_id=session_id, request=data, user_id=current_user.id, db=db, ) except ValueError as e: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) except PermissionError as e: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) except Exception as e: logger.exception("FlowPilot response failed: %s", e) await _record_usage( current_user, db, generation_type="flowpilot_respond", input_tokens=0, output_tokens=0, succeeded=False, session_id=session_id, error_code=type(e).__name__, ) await db.commit() raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"AI provider error ({type(e).__name__}). 
Please try again.", ) await _record_usage( current_user, db, generation_type="flowpilot_respond", input_tokens=0, output_tokens=0, succeeded=True, session_id=session_id, ) await db.commit() return result # ── Resolve ── @router.post("/{session_id}/resolve", response_model=SessionCloseResponse) @limiter.limit("15/minute") async def resolve_session( request: Request, session_id: UUID, data: ResolveSessionRequest, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_engineer_or_admin), ): """Resolve a FlowPilot session and generate documentation.""" try: result = await flowpilot_engine.resolve_session( session_id=session_id, request=data, user_id=current_user.id, db=db, ) except ValueError as e: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) except PermissionError as e: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) await db.commit() return result # ── Escalate ── @router.post("/{session_id}/escalate", response_model=SessionCloseResponse) @limiter.limit("15/minute") async def escalate_session( request: Request, session_id: UUID, data: EscalateSessionRequest, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_engineer_or_admin), ): """Escalate a FlowPilot session to another engineer.""" try: result = await flowpilot_engine.escalate_session( session_id=session_id, request=data, user_id=current_user.id, db=db, ) except ValueError as e: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) except PermissionError as e: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) await db.commit() return result # ── Pause ── @router.post("/{session_id}/pause", status_code=204) @limiter.limit("15/minute") async def pause_session( request: Request, session_id: UUID, current_user: Annotated[User, 
Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_engineer_or_admin), ): """Pause an active FlowPilot session for later resume.""" try: await flowpilot_engine.pause_session( session_id=session_id, user_id=current_user.id, db=db, ) except ValueError as e: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) except PermissionError as e: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) await db.commit() # ── Resume ── @router.post("/{session_id}/resume", status_code=204) @limiter.limit("15/minute") async def resume_session( request: Request, session_id: UUID, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_engineer_or_admin), ): """Resume a paused FlowPilot session.""" try: await flowpilot_engine.resume_session( session_id=session_id, user_id=current_user.id, db=db, ) except ValueError as e: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) except PermissionError as e: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) await db.commit() # ── Escalation Queue ── @router.get("/escalation-queue", response_model=list[AISessionSummary]) @limiter.limit("30/minute") async def get_escalation_queue( request: Request, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_engineer_or_admin), ): """List sessions requesting escalation for the current user's team.""" if not current_user.team_id: return [] result = await db.execute( select(AISession) .where( AISession.team_id == current_user.team_id, AISession.status == "requesting_escalation", AISession.user_id != current_user.id, # Don't show own escalated sessions ) .order_by(AISession.created_at.desc()) ) sessions = result.scalars().all() return [AISessionSummary.model_validate(s) for s in sessions] # ── 
# ── Pickup Escalated Session ──

# NOTE: endpoint docstrings in this section are intentionally kept to one line
# because FastAPI publishes them as the OpenAPI operation description;
# implementation notes live in `#` comments instead.

@router.post("/{session_id}/pickup", response_model=StepResponseResponse)
@limiter.limit("5/minute")
async def pickup_session(
    request: Request,
    session_id: UUID,
    data: PickupSessionRequest,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: None = Depends(require_engineer_or_admin),
):
    """Pick up an escalated session as a new engineer."""
    # Flow: 503 if AI is disabled -> 429 if over quota -> call the engine.
    # Engine ValueError -> 400, PermissionError -> 403; anything else is
    # recorded as failed usage and surfaced as 502.
    _require_ai_enabled()
    await _check_quota(current_user, db)

    try:
        result = await flowpilot_engine.pickup_session(
            session_id=session_id,
            resume_mode=data.resume_mode,
            additional_context=data.additional_context,
            user_id=current_user.id,
            team_id=current_user.team_id,
            db=db,
        )
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
    except PermissionError as e:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) from e
    except Exception as e:
        logger.exception("FlowPilot pickup failed: %s", e)
        await _record_usage(
            current_user,
            db,
            generation_type="flowpilot_pickup",
            input_tokens=0,
            output_tokens=0,
            succeeded=False,
            session_id=session_id,
            error_code=type(e).__name__,
        )
        await db.commit()
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail=f"AI provider error ({type(e).__name__}). Please try again.",
        ) from e

    await _record_usage(
        current_user,
        db,
        generation_type="flowpilot_pickup",
        input_tokens=0,
        output_tokens=0,
        succeeded=True,
        session_id=session_id,
    )
    await db.commit()
    return result


# ── Link Ticket ──

@router.post("/{session_id}/link-ticket", response_model=AISessionDetail)
@limiter.limit("10/minute")
async def link_ticket_to_session(
    request: Request,
    session_id: UUID,
    data: LinkTicketRequest,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: None = Depends(require_engineer_or_admin),
):
    """Link a PSA ticket to an in-progress session retroactively."""
    # Engine ValueError -> 400, PermissionError -> 403; commits on success.
    try:
        await flowpilot_engine.link_ticket(
            session_id=session_id,
            psa_ticket_id=data.psa_ticket_id,
            psa_connection_id=data.psa_connection_id,
            user_id=current_user.id,
            db=db,
        )
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e
    except PermissionError as e:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) from e

    await db.commit()

    # Return updated session detail (re-read with steps eagerly loaded).
    result = await db.execute(
        select(AISession)
        .options(selectinload(AISession.steps))
        .where(AISession.id == session_id)
    )
    session = result.scalar_one_or_none()
    if not session:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")

    return AISessionDetail.model_validate(session)


# ── List sessions ──

@router.get("", response_model=list[AISessionSummary])
@limiter.limit("30/minute")
async def list_sessions(
    request: Request,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    session_status: Optional[str] = Query(None, alias="status"),
    skip: int = Query(0, ge=0),
    limit: int = Query(20, ge=1, le=100),
):
    """List the current user's AI sessions (owned or picked up)."""
    # `picked_up_by` is stored as a string inside the escalation_package JSON,
    # so compare against the stringified user id.
    user_id_str = str(current_user.id)

    query = select(AISession).where(
        or_(
            AISession.user_id == current_user.id,
            AISession.escalation_package["picked_up_by"].as_string() == user_id_str,
        )
    )
    # Apply the optional status filter before ordering/pagination so the
    # statement reads in the order it executes (the emitted SQL is the same).
    if session_status:
        query = query.where(AISession.status == session_status)
    query = query.order_by(AISession.created_at.desc()).offset(skip).limit(limit)

    result = await db.execute(query)
    sessions = result.scalars().all()
    return [AISessionSummary.model_validate(s) for s in sessions]


# ── Get session detail ──

@router.get("/{session_id}", response_model=AISessionDetail)
@limiter.limit("30/minute")
async def get_session(
    request: Request,
    session_id: UUID,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Get full session detail with all steps."""
    result = await db.execute(
        select(AISession)
        .options(selectinload(AISession.steps))
        .where(AISession.id == session_id)
    )
    session = result.scalar_one_or_none()
    if not session:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")

    # Allow access if user is owner, escalation target, or picked-up handler
    pkg = session.escalation_package or {}
    is_owner = session.user_id == current_user.id
    is_escalation_target = session.escalated_to_id == current_user.id
    is_handler = pkg.get("picked_up_by") == str(current_user.id)
    if not (is_owner or is_escalation_target or is_handler):
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized")

    # Build step responses
    step_responses = []
    for step in session.steps:
        # options_presented may be empty/None; treat both as "no options".
        options = [
            StepOptionSchema(
                label=opt.get("label", ""),
                value=opt.get("value", ""),
                followup_hint=opt.get("followup_hint"),
            )
            for opt in (step.options_presented or [])
        ]
        content = step.content or {}
        step_responses.append(
            AISessionStepResponse(
                step_id=step.id,
                step_order=step.step_order,
                step_type=step.step_type,
                content=content,
                context_message=step.context_message,
                options=options,
                allow_free_text=content.get("allow_free_text", True),
                allow_skip=content.get("allow_skip", True),
                # Tier comes from the session; score is the per-step value.
                confidence_tier=session.confidence_tier,
                confidence_score=step.confidence_at_step,
            )
        )

    detail = AISessionDetail.model_validate(session)
    detail.steps = step_responses
    return detail


# ── Documentation ──

@router.get("/{session_id}/documentation", response_model=SessionDocumentation)
@limiter.limit("30/minute")
async def get_documentation(
    request: Request,
    session_id: UUID,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Get auto-generated documentation for a session."""
    # Here an engine ValueError maps to 404 (not 400); PermissionError -> 403.
    try:
        return await flowpilot_engine.get_session_documentation(
            session_id=session_id,
            user_id=current_user.id,
            db=db,
        )
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) from e
    except PermissionError as e:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) from e


# ── Rate ──

@router.post("/{session_id}/rate", status_code=204)
@limiter.limit("15/minute")
async def rate_session(
    request: Request,
    session_id: UUID,
    data: RateSessionRequest,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: None = Depends(require_engineer_or_admin),
):
    """Submit a post-session rating."""
    # Engine ValueError -> 404, PermissionError -> 403; commits on success and
    # returns no body (204).
    try:
        await flowpilot_engine.rate_session(
            session_id=session_id,
            rating=data.rating,
            feedback=data.feedback,
            user_id=current_user.id,
            db=db,
        )
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) from e
    except PermissionError as e:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) from e

    await db.commit()


# ── Retry PSA Push ──

@router.post("/{session_id}/retry-psa-push")
@limiter.limit("5/minute")
async def retry_psa_push_endpoint(
    request: Request,
    session_id: UUID,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: None = Depends(require_engineer_or_admin),
):
    """Manually retry a failed PSA documentation push."""
    # Imported locally as in the original — presumably to avoid an import
    # cycle; confirm before hoisting to module level.
    from app.models.psa_post_log import PsaPostLog

    # NOTE(review): the lookup below is keyed on session_id only; nothing here
    # verifies that current_user owns or may access the session. Confirm that
    # retry_failed_push (or an upstream layer) enforces this.

    # Find the latest failed push log for this session
    result = await db.execute(
        select(PsaPostLog)
        .where(
            PsaPostLog.ai_session_id == session_id,
            PsaPostLog.status.in_(["failed", "pending_retry"]),
        )
        .order_by(PsaPostLog.posted_at.desc())
        .limit(1)
    )
    log_entry = result.scalar_one_or_none()
    if not log_entry:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No failed PSA push found for this session",
        )

    # Reset to pending_retry and attempt immediately
    log_entry.status = "pending_retry"
    # Give one more attempt. A NULL retry_count is treated as 0 so the
    # subtraction cannot raise TypeError.
    log_entry.retry_count = max(0, (log_entry.retry_count or 0) - 1)

    success = await retry_failed_push(log_entry, db)
    await db.commit()

    return {
        "psa_push_status": "sent" if success else log_entry.status,
        "psa_push_error": log_entry.error_message if not success else None,
    }