Files
resolutionflow/backend/app/core/scheduler.py
chihlasm a48660700a fix: background jobs and lifespan must use BYPASSRLS sessions
All code that runs outside a request context (APScheduler jobs,
lifespan startup) has no app.current_account_id set, so the
app-role session returns 0 rows from every RLS-protected table.

Changed to _admin_session_factory (BYPASSRLS) in:
- knowledge_flywheel_scheduler.py — queries ai_sessions
- psa_retry_scheduler.py — queries psa_post_log
- retention_cleanup.py — queries assistant_chats
- scheduler.py (_fire_maintenance_schedule, _cleanup_expired_ai_conversations)
- main.py (archive_stale_ai_sessions, _process_notification_retries,
  load_all_schedules at startup)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 03:44:23 +00:00

182 lines
6.9 KiB
Python

"""APScheduler integration for maintenance flow auto-session creation and AI cleanup."""
import logging
import uuid
from datetime import datetime, timezone
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.schedulers.base import SchedulerNotRunningError
from apscheduler.jobstores.base import JobLookupError
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
import pytz
from sqlalchemy import select, delete
from sqlalchemy.ext.asyncio import AsyncSession
# Module-level logger used by every job and registration helper in this file.
logger = logging.getLogger(__name__)
# Shared asyncio scheduler instance; maintenance jobs are added/removed on it by
# register_schedule / unregister_schedule. NOTE(review): starting and shutting
# it down is not visible here — presumably done in the app lifespan; confirm.
scheduler = AsyncIOScheduler()
async def _fire_maintenance_schedule(schedule_id: str) -> None:
    """Create batch sessions for a scheduled maintenance run.

    APScheduler job target (see ``register_schedule``). Loads the active
    ``MaintenanceSchedule`` whose id is *schedule_id*, checks that its tree is
    an active, non-draft ``"maintenance"`` flow, then creates one ``Session``
    per target (all sharing a freshly generated ``batch_id``) and updates the
    schedule's ``last_run_at`` / ``next_run_at`` in the same transaction.

    Runs outside any request context, so it uses the BYPASSRLS admin session
    factory. Any exception is logged and the transaction rolled back; nothing
    is raised back to the scheduler.

    Args:
        schedule_id: The schedule's UUID in string form (as passed in the
            job's ``args``).
    """
    # Import all models first to ensure SQLAlchemy mapper relationships resolve
    import app.models  # noqa: F401
    from app.core.admin_database import _admin_session_factory as async_session_maker
    from app.models.maintenance_schedule import MaintenanceSchedule
    from app.models.session import Session
    from app.models.target_list import TargetList
    from app.models.tree import Tree

    async with async_session_maker() as db:
        try:
            result = await db.execute(
                select(MaintenanceSchedule).where(
                    MaintenanceSchedule.id == uuid.UUID(schedule_id),
                    MaintenanceSchedule.is_active == True,  # noqa: E712 (SQL expression)
                )
            )
            schedule = result.scalar_one_or_none()
            if not schedule:
                logger.warning(f"Schedule {schedule_id} not found or inactive")
                return

            tree_result = await db.execute(
                select(Tree).where(Tree.id == schedule.tree_id)
            )
            tree = tree_result.scalar_one_or_none()
            if not tree:
                logger.error(f"Tree {schedule.tree_id} not found for schedule {schedule_id}")
                return
            if tree.tree_type != "maintenance":
                logger.warning(f"Skipping schedule {schedule_id}: tree {tree.id} is not a maintenance flow")
                return
            if not tree.is_active or tree.status == "draft":
                logger.warning(
                    f"Skipping schedule {schedule_id}: tree {tree.id} is inactive or draft"
                )
                return

            # Resolve targets; with no list (or an empty one) a single
            # placeholder session is still created.
            targets: list[dict] = []
            if schedule.target_list_id:
                list_result = await db.execute(
                    select(TargetList).where(TargetList.id == schedule.target_list_id)
                )
                target_list = list_result.scalar_one_or_none()
                if target_list:
                    targets = list(target_list.targets)
            if not targets:
                targets = [{"label": "Unassigned"}]

            batch_id = uuid.uuid4()
            tree_snapshot = {
                **tree.tree_structure,
                "name": tree.name,
                "description": tree.description,
                "tree_type": tree.tree_type,
            }
            # Captured now because ORM attributes may be expired by commit();
            # reading tree.name afterwards would trigger an implicit lazy
            # refresh, which AsyncSession does not allow.
            tree_name = tree.name

            # NOTE(review): this runs on a BYPASSRLS session and sets no
            # tenant/account column explicitly. If Session rows need an
            # account id (e.g. one derived from app.current_account_id),
            # confirm it is populated here.
            sessions_to_add = [
                Session(
                    tree_id=tree.id,
                    user_id=schedule.created_by,
                    tree_snapshot=tree_snapshot,
                    path_taken=[],
                    decisions=[],
                    custom_steps=[],
                    session_variables={},
                    batch_id=batch_id,
                    target_label=target.get("label", ""),
                )
                for target in targets
            ]
            db.add_all(sessions_to_add)

            # Update schedule tracking
            schedule.last_run_at = datetime.now(timezone.utc)
            from croniter import croniter
            tz = pytz.timezone(schedule.timezone)
            now = datetime.now(tz)
            cron = croniter(schedule.cron_expression, now)
            schedule.next_run_at = cron.get_next(datetime).astimezone(timezone.utc)

            await db.commit()
            logger.info(
                f"Schedule {schedule_id} fired: created {len(sessions_to_add)} sessions "
                f"(batch {batch_id}) for tree '{tree_name}'"
            )
        except Exception:
            logger.exception(f"Error firing maintenance schedule {schedule_id}")
            await db.rollback()
async def _cleanup_expired_ai_conversations() -> None:
    """Delete AI wizard conversations whose ``expires_at`` is in the past.

    Background job that runs outside any request, so it uses the BYPASSRLS
    admin session factory (an RLS-scoped session would see no rows there).
    Errors are logged and the transaction rolled back; nothing is re-raised.
    """
    import app.models  # noqa: F401
    from app.core.admin_database import _admin_session_factory as async_session_maker
    from app.models.ai_conversation import AIConversation

    async with async_session_maker() as db:
        try:
            result = await db.execute(
                delete(AIConversation).where(
                    AIConversation.expires_at < datetime.now(timezone.utc)
                )
            )
            # Take the count now, but only report it once the delete is
            # durable: if commit() fails the rows are rolled back.
            deleted = result.rowcount
            await db.commit()
            if deleted > 0:
                logger.info(f"Cleaned up {deleted} expired AI conversation(s)")
        except Exception:
            logger.exception("Error cleaning up expired AI conversations")
            await db.rollback()
async def load_all_schedules(db: AsyncSession) -> None:
    """Register every active maintenance schedule with APScheduler.

    Intended for application startup: each ``MaintenanceSchedule`` row with
    ``is_active`` set is handed to ``register_schedule``, then the number of
    rows found is logged.

    Args:
        db: Session used for the query. Startup has no account context, so
            this is expected to be an RLS-bypassing session — confirm at the
            call site.
    """
    # Register every mapper up front so relationship configuration succeeds
    # before the first ORM query runs.
    import app.models  # noqa: F401
    from app.models.maintenance_schedule import MaintenanceSchedule

    active_query = select(MaintenanceSchedule).where(
        MaintenanceSchedule.is_active == True  # noqa: E712 (SQL expression)
    )
    active_schedules = (await db.execute(active_query)).scalars().all()

    for active in active_schedules:
        register_schedule(active)

    logger.info(f"Loaded {len(active_schedules)} active maintenance schedule(s)")
def register_schedule(schedule) -> None:
"""Register or update a schedule in APScheduler."""
job_id = f"maintenance_{schedule.id}"
try:
tz = pytz.timezone(schedule.timezone)
trigger = CronTrigger.from_crontab(schedule.cron_expression, timezone=tz)
scheduler.add_job(
_fire_maintenance_schedule,
trigger=trigger,
id=job_id,
args=[str(schedule.id)],
replace_existing=True,
misfire_grace_time=3600,
)
logger.info(f"Registered schedule {schedule.id} ({schedule.cron_expression})")
except Exception:
logger.exception(f"Failed to register schedule {schedule.id}")
def unregister_schedule(schedule_id: str) -> None:
    """Remove the ``maintenance_<schedule_id>`` job from APScheduler, if present.

    Does nothing when no such job is registered. If removal fails because the
    scheduler is not running or the job vanished in the meantime, a warning
    is logged instead of raising.
    """
    job_id = f"maintenance_{schedule_id}"
    if not scheduler.get_job(job_id):
        return

    try:
        scheduler.remove_job(job_id)
    except (SchedulerNotRunningError, JobLookupError):
        logger.warning(f"Could not remove job {job_id}: scheduler not running or job already removed")
    else:
        logger.info(f"Unregistered schedule {schedule_id}")