Files
resolutionflow/backend/app/core/scheduler.py
chihlasm 6240d68d09 fix: apply code review security and robustness fixes
- Add require_engineer_or_admin to POST/PUT/DELETE in target_lists.py (blocks viewers from write ops)
- Add require_engineer_or_admin to POST/PATCH in maintenance_schedules.py (blocks viewers from write ops)
- Add team ownership guard in batch_launch_sessions after active/published checks (Fix 2)
- Wrap scheduler.remove_job in try/except for SchedulerNotRunningError and JobLookupError (Fix 3)
- Recompute next_run_at when is_active flips to True, capturing was_active before update (Fix 4)
- Add optional batch_id and target_label fields to Session type; remove unsafe cast in MaintenanceFlowDetailPage.tsx (Fix 5)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-17 16:15:19 -05:00

145 lines
5.4 KiB
Python

"""APScheduler integration for maintenance flow auto-session creation."""
import logging
import uuid
from datetime import datetime, timezone
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.schedulers.base import SchedulerNotRunningError
from apscheduler.jobstores.base import JobLookupError
from apscheduler.triggers.cron import CronTrigger
import pytz
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
logger = logging.getLogger(__name__)
scheduler = AsyncIOScheduler()
async def _fire_maintenance_schedule(schedule_id: str) -> None:
"""Create batch sessions for a scheduled maintenance run."""
# Import all models first to ensure SQLAlchemy mapper relationships resolve
import app.models # noqa: F401
from app.core.database import async_session_maker
from app.models.maintenance_schedule import MaintenanceSchedule
from app.models.session import Session
from app.models.target_list import TargetList
from app.models.tree import Tree
async with async_session_maker() as db:
try:
result = await db.execute(
select(MaintenanceSchedule).where(
MaintenanceSchedule.id == uuid.UUID(schedule_id),
MaintenanceSchedule.is_active == True,
)
)
schedule = result.scalar_one_or_none()
if not schedule:
logger.warning(f"Schedule {schedule_id} not found or inactive")
return
tree_result = await db.execute(
select(Tree).where(Tree.id == schedule.tree_id)
)
tree = tree_result.scalar_one_or_none()
if not tree:
logger.error(f"Tree {schedule.tree_id} not found for schedule {schedule_id}")
return
# Resolve targets
targets: list[dict] = []
if schedule.target_list_id:
list_result = await db.execute(
select(TargetList).where(TargetList.id == schedule.target_list_id)
)
target_list = list_result.scalar_one_or_none()
if target_list:
targets = list(target_list.targets)
if not targets:
targets = [{"label": "Unassigned"}]
batch_id = uuid.uuid4()
tree_snapshot = tree.tree_structure
sessions_to_add = []
for target in targets:
session = Session(
tree_id=tree.id,
user_id=schedule.created_by,
tree_snapshot=tree_snapshot,
path_taken=[],
decisions=[],
custom_steps=[],
session_variables={},
batch_id=batch_id,
target_label=target.get("label", ""),
)
sessions_to_add.append(session)
for s in sessions_to_add:
db.add(s)
# Update schedule tracking
schedule.last_run_at = datetime.now(timezone.utc)
from croniter import croniter
tz = pytz.timezone(schedule.timezone)
now = datetime.now(tz)
cron = croniter(schedule.cron_expression, now)
schedule.next_run_at = cron.get_next(datetime).astimezone(timezone.utc)
await db.commit()
logger.info(
f"Schedule {schedule_id} fired: created {len(sessions_to_add)} sessions "
f"(batch {batch_id}) for tree '{tree.name}'"
)
except Exception:
logger.exception(f"Error firing maintenance schedule {schedule_id}")
await db.rollback()
async def load_all_schedules(db: AsyncSession) -> None:
"""Load all active schedules into APScheduler on startup."""
# Import all models to ensure SQLAlchemy mapper relationships resolve
# before any ORM queries are executed.
import app.models # noqa: F401
from app.models.maintenance_schedule import MaintenanceSchedule
result = await db.execute(
select(MaintenanceSchedule).where(MaintenanceSchedule.is_active == True)
)
schedules = result.scalars().all()
for schedule in schedules:
register_schedule(schedule)
logger.info(f"Loaded {len(schedules)} active maintenance schedule(s)")
def register_schedule(schedule) -> None:
"""Register or update a schedule in APScheduler."""
job_id = f"maintenance_{schedule.id}"
try:
tz = pytz.timezone(schedule.timezone)
trigger = CronTrigger.from_crontab(schedule.cron_expression, timezone=tz)
scheduler.add_job(
_fire_maintenance_schedule,
trigger=trigger,
id=job_id,
args=[str(schedule.id)],
replace_existing=True,
misfire_grace_time=3600,
)
logger.info(f"Registered schedule {schedule.id} ({schedule.cron_expression})")
except Exception:
logger.exception(f"Failed to register schedule {schedule.id}")
def unregister_schedule(schedule_id: str) -> None:
"""Remove a schedule from APScheduler."""
job_id = f"maintenance_{schedule_id}"
if scheduler.get_job(job_id):
try:
scheduler.remove_job(job_id)
logger.info(f"Unregistered schedule {schedule_id}")
except (SchedulerNotRunningError, JobLookupError):
logger.warning(f"Could not remove job {job_id}: scheduler not running or job already removed")