"""Maintenance schedule CRUD endpoints.""" from typing import Annotated from uuid import UUID from datetime import datetime, timezone from fastapi import APIRouter, Depends, HTTPException from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from croniter import croniter import pytz from app.api.deps import get_current_active_user, get_db, require_engineer_or_admin from app.models.maintenance_schedule import MaintenanceSchedule from app.models.tree import Tree from app.models.user import User from app.schemas.maintenance_schedule import ( MaintenanceScheduleCreate, MaintenanceScheduleUpdate, MaintenanceScheduleResponse, ) router = APIRouter(prefix="/maintenance-schedules", tags=["maintenance-schedules"]) def _compute_next_run(cron_expression: str, tz_name: str) -> datetime: """Compute next run time from cron expression, returned as UTC.""" tz = pytz.timezone(tz_name) now = datetime.now(tz) cron = croniter(cron_expression, now) return cron.get_next(datetime).astimezone(timezone.utc) async def _get_tree_or_403(tree_id: UUID, current_user: User, db: AsyncSession) -> "Tree": """Fetch tree and verify the current user's team owns it.""" result = await db.execute(select(Tree).where(Tree.id == tree_id)) tree = result.scalar_one_or_none() if not tree: raise HTTPException(status_code=404, detail="Tree not found") # Super admins can access any tree; regular users must be on the same team if not getattr(current_user, 'is_super_admin', False): if tree.team_id != current_user.team_id: raise HTTPException(status_code=403, detail="Access denied") return tree @router.post("", response_model=MaintenanceScheduleResponse, status_code=201) async def create_schedule( data: MaintenanceScheduleCreate, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_engineer_or_admin), ): """Create a cron schedule for a maintenance flow. One per flow.""" # Verify user's team owns the tree tree = await _get_tree_or_403(data.tree_id, current_user, db) if tree.tree_type != "maintenance": raise HTTPException(status_code=400, detail="Schedules are only supported for maintenance flows") # Check no existing schedule for this tree existing = await db.execute( select(MaintenanceSchedule).where(MaintenanceSchedule.tree_id == data.tree_id) ) if existing.scalar_one_or_none(): raise HTTPException(status_code=409, detail="Schedule already exists for this tree") try: next_run = _compute_next_run(data.cron_expression, data.timezone) except (ValueError, KeyError) as e: raise HTTPException(status_code=422, detail=f"Invalid cron expression or timezone: {e}") schedule = MaintenanceSchedule( tree_id=data.tree_id, created_by=current_user.id, cron_expression=data.cron_expression, timezone=data.timezone, target_list_id=data.target_list_id, is_active=True, next_run_at=next_run, ) db.add(schedule) await db.commit() await db.refresh(schedule) from app.core.scheduler import register_schedule register_schedule(schedule) return schedule @router.get("/tree/{tree_id}", response_model=MaintenanceScheduleResponse) async def get_schedule_for_tree( tree_id: UUID, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], ): """Get the schedule for a specific maintenance flow.""" # Verify user's team owns the tree before returning schedule data await _get_tree_or_403(tree_id, current_user, db) result = await db.execute( select(MaintenanceSchedule).where(MaintenanceSchedule.tree_id == tree_id) ) schedule = result.scalar_one_or_none() if not schedule: raise HTTPException(status_code=404, detail="No schedule found for this tree") return schedule @router.patch("/{schedule_id}", response_model=MaintenanceScheduleResponse) async def update_schedule( schedule_id: UUID, data: MaintenanceScheduleUpdate, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_engineer_or_admin), ): """Update a schedule (disable, change cron, change timezone, change target list).""" result = await db.execute( select(MaintenanceSchedule).where(MaintenanceSchedule.id == schedule_id) ) schedule = result.scalar_one_or_none() if not schedule: raise HTTPException(status_code=404, detail="Schedule not found") # Verify user's team owns the tree this schedule belongs to await _get_tree_or_403(schedule.tree_id, current_user, db) update_fields = data.model_fields_set was_active = schedule.is_active if "cron_expression" in update_fields and data.cron_expression is not None: schedule.cron_expression = data.cron_expression if "timezone" in update_fields and data.timezone is not None: schedule.timezone = data.timezone if "target_list_id" in update_fields: schedule.target_list_id = data.target_list_id if "is_active" in update_fields and data.is_active is not None: schedule.is_active = data.is_active # Recompute next_run_at if schedule timing changed or schedule is being re-activated reactivating = "is_active" in update_fields and data.is_active is True and not was_active if "cron_expression" in update_fields or "timezone" in update_fields or reactivating: try: schedule.next_run_at = _compute_next_run(schedule.cron_expression, schedule.timezone) except (ValueError, KeyError) as e: raise HTTPException(status_code=422, detail=f"Invalid cron expression or timezone: {e}") await db.commit() await db.refresh(schedule) from app.core.scheduler import register_schedule, unregister_schedule if schedule.is_active: register_schedule(schedule) else: unregister_schedule(str(schedule.id)) return schedule