"""APScheduler integration for maintenance flow auto-session creation.""" import logging import uuid from datetime import datetime, timezone from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger import pytz from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession logger = logging.getLogger(__name__) scheduler = AsyncIOScheduler() async def _fire_maintenance_schedule(schedule_id: str) -> None: """Create batch sessions for a scheduled maintenance run.""" # Import all models first to ensure SQLAlchemy mapper relationships resolve import app.models # noqa: F401 from app.core.database import async_session_maker from app.models.maintenance_schedule import MaintenanceSchedule from app.models.session import Session from app.models.target_list import TargetList from app.models.tree import Tree async with async_session_maker() as db: try: result = await db.execute( select(MaintenanceSchedule).where( MaintenanceSchedule.id == uuid.UUID(schedule_id), MaintenanceSchedule.is_active == True, ) ) schedule = result.scalar_one_or_none() if not schedule: logger.warning(f"Schedule {schedule_id} not found or inactive") return tree_result = await db.execute( select(Tree).where(Tree.id == schedule.tree_id) ) tree = tree_result.scalar_one_or_none() if not tree: logger.error(f"Tree {schedule.tree_id} not found for schedule {schedule_id}") return # Resolve targets targets: list[dict] = [] if schedule.target_list_id: list_result = await db.execute( select(TargetList).where(TargetList.id == schedule.target_list_id) ) target_list = list_result.scalar_one_or_none() if target_list: targets = list(target_list.targets) if not targets: targets = [{"label": "Unassigned"}] batch_id = uuid.uuid4() tree_snapshot = tree.tree_structure sessions_to_add = [] for target in targets: session = Session( tree_id=tree.id, user_id=schedule.created_by, tree_snapshot=tree_snapshot, path_taken=[], decisions=[], custom_steps=[], session_variables={}, batch_id=batch_id, target_label=target.get("label", ""), ) sessions_to_add.append(session) for s in sessions_to_add: db.add(s) # Update schedule tracking schedule.last_run_at = datetime.now(timezone.utc) from croniter import croniter tz = pytz.timezone(schedule.timezone) now = datetime.now(tz) cron = croniter(schedule.cron_expression, now) schedule.next_run_at = cron.get_next(datetime).astimezone(timezone.utc) await db.commit() logger.info( f"Schedule {schedule_id} fired: created {len(sessions_to_add)} sessions " f"(batch {batch_id}) for tree '{tree.name}'" ) except Exception: logger.exception(f"Error firing maintenance schedule {schedule_id}") await db.rollback() async def load_all_schedules(db: AsyncSession) -> None: """Load all active schedules into APScheduler on startup.""" # Import all models to ensure SQLAlchemy mapper relationships resolve # before any ORM queries are executed. import app.models # noqa: F401 from app.models.maintenance_schedule import MaintenanceSchedule result = await db.execute( select(MaintenanceSchedule).where(MaintenanceSchedule.is_active == True) ) schedules = result.scalars().all() for schedule in schedules: register_schedule(schedule) logger.info(f"Loaded {len(schedules)} active maintenance schedule(s)") def register_schedule(schedule) -> None: """Register or update a schedule in APScheduler.""" job_id = f"maintenance_{schedule.id}" try: tz = pytz.timezone(schedule.timezone) trigger = CronTrigger.from_crontab(schedule.cron_expression, timezone=tz) scheduler.add_job( _fire_maintenance_schedule, trigger=trigger, id=job_id, args=[str(schedule.id)], replace_existing=True, misfire_grace_time=3600, ) logger.info(f"Registered schedule {schedule.id} ({schedule.cron_expression})") except Exception: logger.exception(f"Failed to register schedule {schedule.id}") def unregister_schedule(schedule_id: str) -> None: """Remove a schedule from APScheduler.""" job_id = f"maintenance_{schedule_id}" if scheduler.get_job(job_id): scheduler.remove_job(job_id) logger.info(f"Unregistered schedule {schedule_id}")