import asyncio
import logging
import os
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from slowapi import _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded

from app.core.config import settings

# Initialize Sentry before any other app code
import sentry_sdk

# Base sampling rate shared by tracing and profiling.
_SENTRY_SAMPLE_RATE = 1.0 if settings.DEBUG else 0.2


def _traces_sampler(sampling_context: dict) -> float:
    """Return the trace sample rate for a single transaction.

    Health-check transactions are dropped (rate 0.0). Every other
    transaction follows its parent's sampling decision when there is one,
    and otherwise uses the configured base rate.

    A sampler must always return a number or a boolean: once a
    ``traces_sampler`` is configured Sentry uses it instead of
    ``traces_sample_rate``, and a ``None`` return is rejected as an invalid
    rate, which discards the transaction rather than falling back.
    """
    transaction_context = sampling_context.get("transaction_context") or {}
    name = transaction_context.get("name") or ""
    if name.startswith("GET /health"):
        return 0.0

    # NOTE(review): depending on the installed sentry-sdk version the
    # transaction name may not be route-based yet when sampling happens, so
    # the raw ASGI scope is checked as well — confirm against the SDK in use.
    asgi_scope = sampling_context.get("asgi_scope") or {}
    path = str(asgi_scope.get("path") or "")
    if asgi_scope.get("method") == "GET" and path.startswith("/health"):
        return 0.0

    # Keep distributed traces consistent with the upstream decision.
    parent_sampled = sampling_context.get("parent_sampled")
    if parent_sampled is not None:
        return float(parent_sampled)

    return _SENTRY_SAMPLE_RATE


if settings.SENTRY_DSN:
    sentry_sdk.init(
        dsn=settings.SENTRY_DSN,
        environment="development" if settings.DEBUG else "production",
        send_default_pii=True,
        traces_sample_rate=_SENTRY_SAMPLE_RATE,
        # Profiling — included in free plan
        profiles_sample_rate=_SENTRY_SAMPLE_RATE,
        # Filter out noisy health check transactions
        traces_sampler=_traces_sampler,
    )

from app.core.database import init_db
from app.core.admin_database import _admin_session_factory as async_session_maker
from app.core.logging_config import setup_logging
from app.core.middleware import RequestLoggingMiddleware, ErrorLoggingMiddleware
from app.core.security_headers import SecurityHeadersMiddleware
from app.core.rate_limit import limiter
from app.api.router import api_router
from app.core.scheduler import scheduler, load_all_schedules, _cleanup_expired_ai_conversations
from app.services.retention_cleanup import cleanup_expired_chats
from app.services.notification_service import retry_failed_notifications
from app.core.service_account import ensure_service_account

# Initialize logging configuration
setup_logging()
logger = logging.getLogger(__name__)


async def archive_stale_ai_sessions():
    """Archive AI chat sessions with no activity for 30 days.

    Sets ``archived_at`` on every session whose ``updated_at`` is older than
    30 days, that is not already archived and whose status is not
    ``"abandoned"``, then commits and logs the number of affected rows.
    """
    from app.models.ai_chat_session import AIChatSession
    from sqlalchemy import update
    from datetime import datetime, timezone, timedelta

    # Take one timestamp so the cutoff and the archive stamp agree.
    now = datetime.now(timezone.utc)
    cutoff = now - timedelta(days=30)
    async with async_session_maker() as db:
        result = await db.execute(
            update(AIChatSession)
            .where(
                AIChatSession.updated_at < cutoff,
                AIChatSession.archived_at.is_(None),
                AIChatSession.status != "abandoned",
            )
            .values(archived_at=now)
        )
        await db.commit()
        logger.info(f"[archive] Archived {result.rowcount} stale AI chat sessions")


async def _process_notification_retries():
    """Retry failed notification deliveries and log how many were retried."""
    async with async_session_maker() as db:
        retried = await retry_failed_notifications(db)
        if retried:
            logger.info("Retried %d failed notifications", retried)


def _configure_seed_module(mod: object, api_url: str, email: str, password: str) -> None:
    """Set the API URL and admin credential globals on a seed script module."""
    mod.API_BASE_URL = api_url  # type: ignore[attr-defined]
    mod.ADMIN_EMAIL = email  # type: ignore[attr-defined]
    mod.ADMIN_PASSWORD = password  # type: ignore[attr-defined]


async def _seed_users_directly() -> None:
    """Seed test users directly via DB if they don't exist yet.

    Raises:
        Exception: any failure from importing or running the seed script is
            logged as a warning and re-raised to the caller.
    """
    try:
        from scripts.seed_test_users import main as seed_users

        logger.info("[seed] Seeding test users directly via DB...")
        await seed_users()
        logger.info("[seed] Test users seeded!")
    except Exception as e:
        logger.warning(f"[seed] User seeding failed: {e}")
        raise


async def _seed_trees_background() -> None:
    """Background task: seed test users + all flows after server is ready.

    Logs in as the seeded admin over HTTP (seeding users first if login
    fails), skips flow seeding when ``/trees`` already returns entries, and
    otherwise runs each seed script in turn. Failures are logged and
    swallowed so that seeding never takes the server down.
    """
    await asyncio.sleep(5)  # Wait for server to be fully ready

    port = os.environ.get("PORT", "8000")
    api_url = f"http://127.0.0.1:{port}/api/v1"
    # NOTE(review): hard-coded credentials — presumably only valid for the
    # seeded test admin on PR environments; confirm they never reach production.
    email = "admin@resolutionflow.example.com"
    password = "TestPass123!"
    credentials = {"email": email, "password": password}

    try:
        import importlib

        import httpx

        # Try to login — if it fails, seed users first
        async with httpx.AsyncClient(base_url=api_url, timeout=30) as client:
            login_resp = await client.post("/auth/login/json", json=credentials)
            if login_resp.status_code != 200:
                logger.warning("[seed] Admin login failed — seeding users first")
                await _seed_users_directly()
                # Retry login after seeding users
                login_resp = await client.post("/auth/login/json", json=credentials)
                if login_resp.status_code != 200:
                    logger.error(f"[seed] Admin login still failing after user seed (status={login_resp.status_code}) — aborting")
                    return

            token = login_resp.json()["access_token"]

            # Check if trees already exist
            trees_resp = await client.get("/trees", headers={"Authorization": f"Bearer {token}"})
            if trees_resp.status_code == 200:
                existing = trees_resp.json()  # parse the body once
                if len(existing) > 0:
                    logger.info(f"[seed] {len(existing)} flows already exist — skipping flow seeding")
                    return

        # No flows yet — run all seed scripts
        seed_scripts = [
            ("seed_trees (Tier 1)", "scripts.seed_trees", "seed_database"),
            ("seed_trees_v2 (AD/M365/Networking)", "scripts.seed_trees_v2", "seed_database"),
            ("seed_procedural_flows", "scripts.seed_procedural_flows", "seed_procedural_flows"),
            ("seed_maintenance_flows", "scripts.seed_maintenance_flows", "seed_maintenance_flows"),
        ]
        for label, module_path, func_name in seed_scripts:
            # Each script is isolated: one failing script must not stop the rest.
            try:
                mod = importlib.import_module(module_path)
                _configure_seed_module(mod, api_url, email, password)
                seed_fn = getattr(mod, func_name)
                logger.info(f"[seed] Running {label}...")
                await seed_fn()
                logger.info(f"[seed] {label} complete!")
            except Exception as e:
                logger.warning(f"[seed] {label} failed (non-fatal): {e}")
        logger.info("[seed] All flow seeding complete!")
    except Exception as e:
        logger.warning(f"[seed] Flow seeding failed (non-fatal): {e}")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan handler.

    Startup: ensures the service account exists and caches its id on
    ``app.state``, starts the scheduler, loads the stored schedules,
    registers the recurring jobs and, when ``SEED_ON_DEPLOY`` is set,
    schedules background seeding.

    Shutdown: cancels a still-running seed task and stops the scheduler
    without waiting for running jobs.
    """
    # Startup
    logger.info("Starting ResolutionFlow API server...")
    logger.info(f"Environment: {'Development' if settings.DEBUG else 'Production'}")
    logger.info(f"ALLOW_RAILWAY_ORIGINS: {settings.ALLOW_RAILWAY_ORIGINS}")

    # Note: In production, use Alembic migrations instead of init_db
    # await init_db()

    # Ensure service account exists and cache its ID for sync operations
    async with async_session_maker() as db:
        service_account_id = await ensure_service_account(db)
        app.state.service_account_id = service_account_id
        logger.info(f"[service_account] Service account ready (id={service_account_id})")

    # Start maintenance schedule runner + AI conversation cleanup
    scheduler.start()
    async with async_session_maker() as db:
        await load_all_schedules(db)

    scheduler.add_job(
        _cleanup_expired_ai_conversations,
        trigger="interval",
        hours=1,
        id="cleanup_ai_conversations",
        replace_existing=True,
    )

    # Chat retention cleanup (daily)
    scheduler.add_job(
        cleanup_expired_chats,
        trigger="interval",
        hours=24,
        id="cleanup_expired_chats",
        replace_existing=True,
    )

    # Auto-archive stale AI chat sessions (daily at 3 AM)
    scheduler.add_job(
        archive_stale_ai_sessions,
        "cron",
        hour=3,
        id="archive_stale_ai_sessions",
        replace_existing=True,
    )

    # PSA push retry (every 5 minutes)
    from app.services.psa_retry_scheduler import process_pending_retries

    scheduler.add_job(
        process_pending_retries,
        trigger="interval",
        minutes=5,
        id="psa_push_retry",
        replace_existing=True,
    )

    # Knowledge Flywheel analysis (every 5 minutes)
    from app.services.knowledge_flywheel_scheduler import process_pending_analyses

    scheduler.add_job(
        process_pending_analyses,
        trigger="interval",
        minutes=5,
        id="knowledge_flywheel_analysis",
        replace_existing=True,
        max_instances=1,
    )

    # Notification retry (every minute)
    scheduler.add_job(
        _process_notification_retries,
        trigger="interval",
        minutes=1,
        id="notification_retry",
        replace_existing=True,
        max_instances=1,
    )

    # L1 walk session cleanup: flip stale active sessions to 'abandoned' (hourly)
    from app.services.l1_session_cleanup import run_cleanup_job as l1_cleanup_run

    scheduler.add_job(
        l1_cleanup_run,
        trigger="interval",
        hours=1,
        id="l1_session_cleanup",
        replace_existing=True,
        max_instances=1,
        args=[async_session_maker],
    )

    # Auto-seed trees in background on PR environments
    seed_task = None
    if settings.SEED_ON_DEPLOY:
        logger.info("[seed] SEED_ON_DEPLOY=true — scheduling background tree seeding")
        seed_task = asyncio.create_task(_seed_trees_background())

    yield

    # Shutdown
    if seed_task and not seed_task.done():
        seed_task.cancel()
    scheduler.shutdown(wait=False)
    logger.info("Shutting down ResolutionFlow API server...")


app = FastAPI(
    title=settings.APP_NAME,
    description="ResolutionFlow - Take the path MOST traveled. Guided troubleshooting with automatic documentation.",
    version="1.0.0",
    docs_url="/api/docs",
    redoc_url="/api/redoc",
    openapi_url="/api/openapi.json",
    lifespan=lifespan
)

app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Add logging middleware (BEFORE CORS to log all requests)
app.add_middleware(ErrorLoggingMiddleware)
app.add_middleware(RequestLoggingMiddleware)

# Configure CORS
# Note: When ALLOW_RAILWAY_ORIGINS is True, we use allow_origin_regex for Railway domains
# PLUS the explicit allowed_origins list (for custom domains like app.resolutionflow.com)
_cors_options = {
    "allow_origins": settings.allowed_origins,
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
    "expose_headers": ["X-Redaction-Mode", "X-Redaction-Summary"],
}
if settings.ALLOW_RAILWAY_ORIGINS:
    _cors_options["allow_origin_regex"] = r"https://.*\.up\.railway\.app"
app.add_middleware(CORSMiddleware, **_cors_options)

# Add security headers middleware (after CORS so preflight responses work)
app.add_middleware(SecurityHeadersMiddleware)

# Include API router
app.include_router(api_router, prefix=settings.API_V1_PREFIX)
@app.get("/")
async def root():
    """Return the API name, the docs location and the API version."""
    payload = {}
    payload["message"] = "ResolutionFlow API"
    payload["docs"] = "/api/docs"
    payload["version"] = "1.0.0"
    return payload


@app.get("/health")
async def health_check():
    """Report a static healthy status."""
    return dict(status="healthy")


if settings.DEBUG:

    @app.get("/debug/cors")
    async def debug_cors():
        """Expose the active CORS settings (route only registered when DEBUG is set)."""
        railway_enabled = settings.ALLOW_RAILWAY_ORIGINS
        if railway_enabled:
            mode = "regex + list"
            pattern = r"https://.*\.up\.railway\.app"
        else:
            mode = "list"
            pattern = None
        return {
            "allow_railway_origins": railway_enabled,
            "cors_mode": mode,
            "allowed_origins": settings.allowed_origins,
            "railway_regex": pattern,
        }