All code that runs outside a request context (APScheduler jobs, lifespan startup) has no app.current_account_id set, so the app-role session returns 0 rows from every RLS-protected table. Changed to _admin_session_factory (BYPASSRLS) in: - knowledge_flywheel_scheduler.py — queries ai_sessions - psa_retry_scheduler.py — queries psa_post_log - retention_cleanup.py — queries assistant_chats - scheduler.py (_fire_maintenance_schedule, _cleanup_expired_ai_conversations) - main.py (archive_stale_ai_sessions, _process_notification_retries, load_all_schedules at startup) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
182 lines
6.9 KiB
Python
182 lines
6.9 KiB
Python
"""APScheduler integration for maintenance flow auto-session creation and AI cleanup."""
|
|
import logging
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from apscheduler.schedulers.base import SchedulerNotRunningError
|
|
from apscheduler.jobstores.base import JobLookupError
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
from apscheduler.triggers.interval import IntervalTrigger
|
|
import pytz
|
|
from sqlalchemy import select, delete
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
# Single shared AsyncIOScheduler instance for this module; register_schedule()
# adds jobs to it and unregister_schedule() removes them.
scheduler = AsyncIOScheduler()
|
|
|
|
|
|
async def _fire_maintenance_schedule(schedule_id: str) -> None:
    """Create one batch of sessions for a scheduled maintenance run.

    Loads the active schedule and its tree, creates one ``Session`` per
    target in the schedule's target list (or a single "Unassigned" session
    when no targets resolve), and records ``last_run_at`` / ``next_run_at``
    on the schedule. Everything is committed in a single transaction.

    The run is skipped with a log message when the schedule is missing or
    inactive, the tree is missing, the tree is not of type "maintenance",
    or the tree is inactive / in draft status. Any exception raised while
    processing is logged and the transaction is rolled back; it is not
    re-raised.

    Args:
        schedule_id: String form of the ``MaintenanceSchedule`` UUID.
    """
    # Import all models first to ensure SQLAlchemy mapper relationships resolve
    import app.models  # noqa: F401
    # NOTE(review): per the accompanying change note, the admin session factory
    # is used because scheduler jobs run outside a request context - confirm.
    from app.core.admin_database import _admin_session_factory as async_session_maker
    from app.models.maintenance_schedule import MaintenanceSchedule
    from app.models.session import Session
    from app.models.target_list import TargetList
    from app.models.tree import Tree

    async with async_session_maker() as db:
        try:
            result = await db.execute(
                select(MaintenanceSchedule).where(
                    MaintenanceSchedule.id == uuid.UUID(schedule_id),
                    MaintenanceSchedule.is_active == True,  # noqa: E712 - SQL expression
                )
            )
            schedule = result.scalar_one_or_none()
            if not schedule:
                logger.warning(f"Schedule {schedule_id} not found or inactive")
                return

            tree_result = await db.execute(
                select(Tree).where(Tree.id == schedule.tree_id)
            )
            tree = tree_result.scalar_one_or_none()
            if not tree:
                logger.error(f"Tree {schedule.tree_id} not found for schedule {schedule_id}")
                return

            if tree.tree_type != "maintenance":
                logger.warning(f"Skipping schedule {schedule_id}: tree {tree.id} is not a maintenance flow")
                return

            if not tree.is_active or tree.status == "draft":
                logger.warning(
                    f"Skipping schedule {schedule_id}: tree {tree.id} is inactive or draft"
                )
                return

            # Resolve targets
            targets: list[dict] = []
            if schedule.target_list_id:
                list_result = await db.execute(
                    select(TargetList).where(TargetList.id == schedule.target_list_id)
                )
                target_list = list_result.scalar_one_or_none()
                if target_list:
                    # A missing (None) targets value is treated like an empty
                    # list so the "Unassigned" fallback below still applies
                    # instead of the whole run failing with a TypeError.
                    targets = list(target_list.targets or [])

            if not targets:
                targets = [{"label": "Unassigned"}]

            # Every session created by this run shares one batch id and one
            # snapshot of the tree taken at fire time.
            batch_id = uuid.uuid4()
            tree_snapshot = {
                **tree.tree_structure,
                "name": tree.name,
                "description": tree.description,
                "tree_type": tree.tree_type,
            }

            sessions_to_add = [
                Session(
                    tree_id=tree.id,
                    user_id=schedule.created_by,
                    tree_snapshot=tree_snapshot,
                    path_taken=[],
                    decisions=[],
                    custom_steps=[],
                    session_variables={},
                    batch_id=batch_id,
                    target_label=target.get("label", ""),
                )
                for target in targets
            ]
            db.add_all(sessions_to_add)

            # Update schedule tracking
            schedule.last_run_at = datetime.now(timezone.utc)
            from croniter import croniter

            # The next occurrence is computed in the schedule's own timezone
            # and then converted to UTC before being stored.
            tz = pytz.timezone(schedule.timezone)
            now = datetime.now(tz)
            cron = croniter(schedule.cron_expression, now)
            schedule.next_run_at = cron.get_next(datetime).astimezone(timezone.utc)

            await db.commit()
            logger.info(
                f"Schedule {schedule_id} fired: created {len(sessions_to_add)} sessions "
                f"(batch {batch_id}) for tree '{tree.name}'"
            )
        except Exception:
            logger.exception(f"Error firing maintenance schedule {schedule_id}")
            await db.rollback()
|
|
|
|
|
|
async def _cleanup_expired_ai_conversations() -> None:
    """Delete expired AI wizard conversations.

    Issues a single DELETE for every ``AIConversation`` whose ``expires_at``
    is earlier than the current UTC time and commits it. Any exception is
    logged and the transaction is rolled back; it is not re-raised.
    """
    import app.models  # noqa: F401
    from app.core.admin_database import _admin_session_factory as async_session_maker
    from app.models.ai_conversation import AIConversation

    async with async_session_maker() as db:
        try:
            result = await db.execute(
                delete(AIConversation).where(
                    AIConversation.expires_at < datetime.now(timezone.utc)
                )
            )
            # Capture the count before committing, then report it only once
            # the commit has succeeded, so a failed commit never leaves a
            # misleading "cleaned up" line in the log.
            deleted_count = result.rowcount
            await db.commit()
            if deleted_count > 0:
                logger.info(f"Cleaned up {deleted_count} expired AI conversation(s)")
        except Exception:
            logger.exception("Error cleaning up expired AI conversations")
            await db.rollback()
|
|
|
|
|
|
async def load_all_schedules(db: AsyncSession) -> None:
    """Register every active maintenance schedule with APScheduler.

    Intended to be called on startup. Queries all ``MaintenanceSchedule``
    rows with ``is_active`` set, passes each one to ``register_schedule``,
    and logs how many were loaded.

    Args:
        db: Async session used to run the query.
    """
    # Pull in the whole models package before touching the ORM so that
    # SQLAlchemy can resolve mapper relationships.
    import app.models  # noqa: F401
    from app.models.maintenance_schedule import MaintenanceSchedule

    active_query = select(MaintenanceSchedule).where(
        MaintenanceSchedule.is_active == True  # noqa: E712 - SQL expression
    )
    query_result = await db.execute(active_query)
    schedules = query_result.scalars().all()

    for active_schedule in schedules:
        register_schedule(active_schedule)

    logger.info(f"Loaded {len(schedules)} active maintenance schedule(s)")
|
|
|
|
|
|
def register_schedule(schedule) -> None:
    """Register or update a schedule's cron job in APScheduler.

    The job id is ``maintenance_<schedule.id>``; an existing job with the
    same id is replaced. Failures while building the trigger or adding the
    job are logged and swallowed, so a bad schedule does not raise here.

    Args:
        schedule: Object exposing ``id``, ``timezone`` and
            ``cron_expression`` (a crontab string).
    """
    job_id = f"maintenance_{schedule.id}"
    try:
        zone = pytz.timezone(schedule.timezone)
        job_options = {
            "trigger": CronTrigger.from_crontab(schedule.cron_expression, timezone=zone),
            "id": job_id,
            # The job receives the schedule id as a string.
            "args": [str(schedule.id)],
            "replace_existing": True,
            "misfire_grace_time": 3600,
        }
        scheduler.add_job(_fire_maintenance_schedule, **job_options)
        logger.info(f"Registered schedule {schedule.id} ({schedule.cron_expression})")
    except Exception:
        logger.exception(f"Failed to register schedule {schedule.id}")
|
|
|
|
|
|
def unregister_schedule(schedule_id: str) -> None:
    """Remove a schedule's job from APScheduler, if one is registered.

    Does nothing when no job with id ``maintenance_<schedule_id>`` exists.
    A ``SchedulerNotRunningError`` or ``JobLookupError`` raised during
    removal is logged as a warning instead of propagating.

    Args:
        schedule_id: Identifier of the schedule whose job should be removed.
    """
    job_id = f"maintenance_{schedule_id}"

    # Guard clause: nothing to remove when the job is not registered.
    if not scheduler.get_job(job_id):
        return

    try:
        scheduler.remove_job(job_id)
    except (SchedulerNotRunningError, JobLookupError):
        logger.warning(f"Could not remove job {job_id}: scheduler not running or job already removed")
    else:
        logger.info(f"Unregistered schedule {schedule_id}")
|