Files
resolutionflow/backend/app/core/scheduler.py
chihlasm 5abff028bc feat: APScheduler integration for maintenance flow auto-session creation
- Add backend/app/core/scheduler.py with AsyncIOScheduler, CronTrigger-based
  job registration, and _fire_maintenance_schedule to create batch sessions
- Wire scheduler.start()/load_all_schedules()/shutdown() into main.py lifespan
- Call register_schedule() in create_schedule endpoint after commit
- Call register_schedule()/unregister_schedule() in update_schedule based on is_active
- Add TreeShare to models/__init__.py so all SQLAlchemy mapper relationships
  resolve before ORM queries in the scheduler context

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-17 14:21:29 -05:00

140 lines
5.1 KiB
Python

"""APScheduler integration for maintenance flow auto-session creation."""
import logging
import uuid
from datetime import datetime, timezone
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
import pytz
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
logger = logging.getLogger(__name__)
scheduler = AsyncIOScheduler()
async def _fire_maintenance_schedule(schedule_id: str) -> None:
"""Create batch sessions for a scheduled maintenance run."""
# Import all models first to ensure SQLAlchemy mapper relationships resolve
import app.models # noqa: F401
from app.core.database import async_session_maker
from app.models.maintenance_schedule import MaintenanceSchedule
from app.models.session import Session
from app.models.target_list import TargetList
from app.models.tree import Tree
async with async_session_maker() as db:
try:
result = await db.execute(
select(MaintenanceSchedule).where(
MaintenanceSchedule.id == uuid.UUID(schedule_id),
MaintenanceSchedule.is_active == True,
)
)
schedule = result.scalar_one_or_none()
if not schedule:
logger.warning(f"Schedule {schedule_id} not found or inactive")
return
tree_result = await db.execute(
select(Tree).where(Tree.id == schedule.tree_id)
)
tree = tree_result.scalar_one_or_none()
if not tree:
logger.error(f"Tree {schedule.tree_id} not found for schedule {schedule_id}")
return
# Resolve targets
targets: list[dict] = []
if schedule.target_list_id:
list_result = await db.execute(
select(TargetList).where(TargetList.id == schedule.target_list_id)
)
target_list = list_result.scalar_one_or_none()
if target_list:
targets = list(target_list.targets)
if not targets:
targets = [{"label": "Unassigned"}]
batch_id = uuid.uuid4()
tree_snapshot = tree.tree_structure
sessions_to_add = []
for target in targets:
session = Session(
tree_id=tree.id,
user_id=schedule.created_by,
tree_snapshot=tree_snapshot,
path_taken=[],
decisions=[],
custom_steps=[],
session_variables={},
batch_id=batch_id,
target_label=target.get("label", ""),
)
sessions_to_add.append(session)
for s in sessions_to_add:
db.add(s)
# Update schedule tracking
schedule.last_run_at = datetime.now(timezone.utc)
from croniter import croniter
tz = pytz.timezone(schedule.timezone)
now = datetime.now(tz)
cron = croniter(schedule.cron_expression, now)
schedule.next_run_at = cron.get_next(datetime).astimezone(timezone.utc)
await db.commit()
logger.info(
f"Schedule {schedule_id} fired: created {len(sessions_to_add)} sessions "
f"(batch {batch_id}) for tree '{tree.name}'"
)
except Exception:
logger.exception(f"Error firing maintenance schedule {schedule_id}")
await db.rollback()
async def load_all_schedules(db: AsyncSession) -> None:
"""Load all active schedules into APScheduler on startup."""
# Import all models to ensure SQLAlchemy mapper relationships resolve
# before any ORM queries are executed.
import app.models # noqa: F401
from app.models.maintenance_schedule import MaintenanceSchedule
result = await db.execute(
select(MaintenanceSchedule).where(MaintenanceSchedule.is_active == True)
)
schedules = result.scalars().all()
for schedule in schedules:
register_schedule(schedule)
logger.info(f"Loaded {len(schedules)} active maintenance schedule(s)")
def register_schedule(schedule) -> None:
"""Register or update a schedule in APScheduler."""
job_id = f"maintenance_{schedule.id}"
try:
tz = pytz.timezone(schedule.timezone)
trigger = CronTrigger.from_crontab(schedule.cron_expression, timezone=tz)
scheduler.add_job(
_fire_maintenance_schedule,
trigger=trigger,
id=job_id,
args=[str(schedule.id)],
replace_existing=True,
misfire_grace_time=3600,
)
logger.info(f"Registered schedule {schedule.id} ({schedule.cron_expression})")
except Exception:
logger.exception(f"Failed to register schedule {schedule.id}")
def unregister_schedule(schedule_id: str) -> None:
"""Remove a schedule from APScheduler."""
job_id = f"maintenance_{schedule_id}"
if scheduler.get_job(job_id):
scheduler.remove_job(job_id)
logger.info(f"Unregistered schedule {schedule_id}")