From 5abff028bc1582b6a813be96d5620aa085bf1dde Mon Sep 17 00:00:00 2001 From: chihlasm Date: Tue, 17 Feb 2026 14:21:29 -0500 Subject: [PATCH] feat: APScheduler integration for maintenance flow auto-session creation - Add backend/app/core/scheduler.py with AsyncIOScheduler, CronTrigger-based job registration, and _fire_maintenance_schedule to create batch sessions - Wire scheduler.start()/load_all_schedules()/shutdown() into main.py lifespan - Call register_schedule() in create_schedule endpoint after commit - Call register_schedule()/unregister_schedule() in update_schedule based on is_active - Add TreeShare to models/__init__.py so all SQLAlchemy mapper relationships resolve before ORM queries in the scheduler context Co-Authored-By: Claude Sonnet 4.5 --- .../api/endpoints/maintenance_schedules.py | 11 ++ backend/app/core/scheduler.py | 139 ++++++++++++++++++ backend/app/main.py | 11 +- backend/app/models/__init__.py | 2 + 4 files changed, 162 insertions(+), 1 deletion(-) create mode 100644 backend/app/core/scheduler.py diff --git a/backend/app/api/endpoints/maintenance_schedules.py b/backend/app/api/endpoints/maintenance_schedules.py index df427082..6b2a953a 100644 --- a/backend/app/api/endpoints/maintenance_schedules.py +++ b/backend/app/api/endpoints/maintenance_schedules.py @@ -76,6 +76,10 @@ async def create_schedule( db.add(schedule) await db.commit() await db.refresh(schedule) + + from app.core.scheduler import register_schedule + register_schedule(schedule) + return schedule @@ -135,4 +139,11 @@ async def update_schedule( await db.commit() await db.refresh(schedule) + + from app.core.scheduler import register_schedule, unregister_schedule + if schedule.is_active: + register_schedule(schedule) + else: + unregister_schedule(str(schedule.id)) + return schedule diff --git a/backend/app/core/scheduler.py b/backend/app/core/scheduler.py new file mode 100644 index 00000000..17692ad5 --- /dev/null +++ b/backend/app/core/scheduler.py @@ -0,0 +1,139 @@ +"""APScheduler integration for maintenance flow auto-session creation.""" +import logging +import uuid +from datetime import datetime, timezone + +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger +import pytz +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +logger = logging.getLogger(__name__) + +scheduler = AsyncIOScheduler() + + +async def _fire_maintenance_schedule(schedule_id: str) -> None: + """Create batch sessions for a scheduled maintenance run.""" + # Import all models first to ensure SQLAlchemy mapper relationships resolve + import app.models # noqa: F401 + from app.core.database import async_session_maker + from app.models.maintenance_schedule import MaintenanceSchedule + from app.models.session import Session + from app.models.target_list import TargetList + from app.models.tree import Tree + + async with async_session_maker() as db: + try: + result = await db.execute( + select(MaintenanceSchedule).where( + MaintenanceSchedule.id == uuid.UUID(schedule_id), + MaintenanceSchedule.is_active == True, + ) + ) + schedule = result.scalar_one_or_none() + if not schedule: + logger.warning(f"Schedule {schedule_id} not found or inactive") + return + + tree_result = await db.execute( + select(Tree).where(Tree.id == schedule.tree_id) + ) + tree = tree_result.scalar_one_or_none() + if not tree: + logger.error(f"Tree {schedule.tree_id} not found for schedule {schedule_id}") + return + + # Resolve targets + targets: list[dict] = [] + if schedule.target_list_id: + list_result = await db.execute( + select(TargetList).where(TargetList.id == schedule.target_list_id) + ) + target_list = list_result.scalar_one_or_none() + if target_list: + targets = list(target_list.targets) + + if not targets: + targets = [{"label": "Unassigned"}] + + batch_id = uuid.uuid4() + tree_snapshot = tree.tree_structure + + sessions_to_add = [] + for target in targets: + session = Session( + tree_id=tree.id, + user_id=schedule.created_by, + tree_snapshot=tree_snapshot, + path_taken=[], + decisions=[], + custom_steps=[], + session_variables={}, + batch_id=batch_id, + target_label=target.get("label", ""), + ) + sessions_to_add.append(session) + + for s in sessions_to_add: + db.add(s) + + # Update schedule tracking + schedule.last_run_at = datetime.now(timezone.utc) + from croniter import croniter + tz = pytz.timezone(schedule.timezone) + now = datetime.now(tz) + cron = croniter(schedule.cron_expression, now) + schedule.next_run_at = cron.get_next(datetime).astimezone(timezone.utc) + + await db.commit() + logger.info( + f"Schedule {schedule_id} fired: created {len(sessions_to_add)} sessions " + f"(batch {batch_id}) for tree '{tree.name}'" + ) + except Exception: + logger.exception(f"Error firing maintenance schedule {schedule_id}") + await db.rollback() + + +async def load_all_schedules(db: AsyncSession) -> None: + """Load all active schedules into APScheduler on startup.""" + # Import all models to ensure SQLAlchemy mapper relationships resolve + # before any ORM queries are executed. + import app.models # noqa: F401 + from app.models.maintenance_schedule import MaintenanceSchedule + result = await db.execute( + select(MaintenanceSchedule).where(MaintenanceSchedule.is_active == True) + ) + schedules = result.scalars().all() + for schedule in schedules: + register_schedule(schedule) + logger.info(f"Loaded {len(schedules)} active maintenance schedule(s)") + + +def register_schedule(schedule) -> None: + """Register or update a schedule in APScheduler.""" + job_id = f"maintenance_{schedule.id}" + try: + tz = pytz.timezone(schedule.timezone) + trigger = CronTrigger.from_crontab(schedule.cron_expression, timezone=tz) + scheduler.add_job( + _fire_maintenance_schedule, + trigger=trigger, + id=job_id, + args=[str(schedule.id)], + replace_existing=True, + misfire_grace_time=3600, + ) + logger.info(f"Registered schedule {schedule.id} ({schedule.cron_expression})") + except Exception: + logger.exception(f"Failed to register schedule {schedule.id}") + + +def unregister_schedule(schedule_id: str) -> None: + """Remove a schedule from APScheduler.""" + job_id = f"maintenance_{schedule_id}" + if scheduler.get_job(job_id): + scheduler.remove_job(job_id) + logger.info(f"Unregistered schedule {schedule_id}") diff --git a/backend/app/main.py b/backend/app/main.py index 0aab6815..5536e832 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -6,11 +6,12 @@ from slowapi import _rate_limit_exceeded_handler from slowapi.errors import RateLimitExceeded from app.core.config import settings -from app.core.database import init_db +from app.core.database import init_db, async_session_maker from app.core.logging_config import setup_logging from app.core.middleware import RequestLoggingMiddleware, ErrorLoggingMiddleware from app.core.rate_limit import limiter from app.api.router import api_router +from app.core.scheduler import scheduler, load_all_schedules # Initialize logging configuration setup_logging() @@ -26,8 +27,16 @@ async def lifespan(app: FastAPI): logger.info(f"ALLOW_RAILWAY_ORIGINS: {settings.ALLOW_RAILWAY_ORIGINS}") # Note: In production, use Alembic migrations instead of init_db # await init_db() + + # Start maintenance schedule runner + scheduler.start() + async with async_session_maker() as db: + await load_all_schedules(db) + yield + # Shutdown + scheduler.shutdown(wait=False) logger.info("Shutting down ResolutionFlow API server...") diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 8638db7c..a3c6f276 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -5,6 +5,7 @@ from .subscription import Subscription from .plan_limits import PlanLimits from .account_invite import AccountInvite from .tree import Tree +from .tree_share import TreeShare from .session import Session from .attachment import Attachment from .invite_code import InviteCode @@ -33,6 +34,7 @@ __all__ = [ "PlanLimits", "AccountInvite", "Tree", + "TreeShare", "Session", "Attachment", "InviteCode",