feat(l1): APScheduler hourly cleanup job for abandoned L1 sessions

flip_stale_sessions flips L1WalkSession.status from 'active' to
'abandoned' for rows where last_step_at is older than 24h. Preserves the
row for audit; removes it from the L1 dashboard's 'Resume in progress'
widget. Runs hourly via APScheduler with max_instances=1 (Lesson 1).
Uses the admin session factory (no RLS context at startup).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-28 13:37:55 -04:00
parent 96973c7968
commit e5bcf3b28e
3 changed files with 180 additions and 0 deletions

View File

@@ -221,6 +221,18 @@ async def lifespan(app: FastAPI):
max_instances=1,
)
# L1 walk session cleanup: flip stale active sessions to 'abandoned' (hourly)
from app.services.l1_session_cleanup import run_cleanup_job as l1_cleanup_run
scheduler.add_job(
l1_cleanup_run,
trigger="interval",
hours=1,
id="l1_session_cleanup",
replace_existing=True,
max_instances=1,
args=[async_session_maker],
)
# Auto-seed trees in background on PR environments
seed_task = None
if settings.SEED_ON_DEPLOY:

View File

@@ -0,0 +1,49 @@
"""Hourly cleanup job: flip stale active L1WalkSessions to 'abandoned'.
Sessions with status='active' and last_step_at older than 24h are considered
abandoned (L1 closed the browser, customer hung up, etc.). Flipping them
removes them from the "Resume in progress" widget while preserving the row
for audit/reporting.
Run via APScheduler interval job, max_instances=1 (Lesson 1).
"""
import logging
from datetime import datetime, timedelta, timezone
from sqlalchemy import update
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.l1_walk_session import L1WalkSession
logger = logging.getLogger(__name__)
async def flip_stale_sessions(db: AsyncSession) -> int:
"""Flip active sessions to 'abandoned' if last_step_at < now - 24h.
Returns the number of sessions flipped.
"""
cutoff = datetime.now(timezone.utc) - timedelta(hours=24)
stmt = (
update(L1WalkSession)
.where(L1WalkSession.status == "active")
.where(L1WalkSession.last_step_at < cutoff)
.values(status="abandoned")
)
result = await db.execute(stmt)
await db.commit()
return result.rowcount or 0
async def run_cleanup_job(session_factory) -> None:
"""APScheduler entry point. Uses the admin session factory (no RLS context)."""
async with session_factory() as db:
try:
count = await flip_stale_sessions(db)
if count > 0:
logger.info(
"l1_session_cleanup: flipped %d sessions to abandoned", count
)
except Exception:
logger.exception("l1_session_cleanup: error during run")

View File

@@ -0,0 +1,119 @@
"""Tests for the l1_session_cleanup job."""
import uuid
from datetime import datetime, timedelta, timezone
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.l1_walk_session import L1WalkSession
from app.models.account import Account
from app.models.user import User
from app.services.l1_session_cleanup import flip_stale_sessions
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def _make_account(db: AsyncSession) -> Account:
import secrets
import string
code = "".join(secrets.choice(string.ascii_uppercase + string.digits) for _ in range(8))
a = Account(id=uuid.uuid4(), name="Test", display_code=code)
db.add(a)
await db.flush()
return a
async def _make_user(db: AsyncSession, *, account_id: uuid.UUID) -> User:
u = User(
id=uuid.uuid4(),
email=f"user-{uuid.uuid4()}@example.com",
name="L1",
account_id=account_id,
account_role="l1_tech",
role="engineer",
is_active=True,
)
db.add(u)
await db.flush()
return u
async def _make_session(
db: AsyncSession,
*,
account_id: uuid.UUID,
user_id: uuid.UUID,
status: str = "active",
last_step_at: datetime | None = None,
) -> L1WalkSession:
now = datetime.now(timezone.utc)
session = L1WalkSession(
id=uuid.uuid4(),
account_id=account_id,
created_by_user_id=user_id,
ticket_id="t",
ticket_kind="internal",
session_kind="adhoc",
status=status,
started_at=now,
last_step_at=last_step_at or now,
)
db.add(session)
await db.flush()
return session
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_flip_stale_sessions_only_affects_old_active_rows(test_db: AsyncSession):
account = await _make_account(test_db)
user = await _make_user(test_db, account_id=account.id)
# 1. Stale active (>24h ago) — should flip
stale = await _make_session(
test_db, account_id=account.id, user_id=user.id,
status="active",
last_step_at=datetime.now(timezone.utc) - timedelta(hours=25),
)
# 2. Fresh active (1h ago) — should stay active
fresh = await _make_session(
test_db, account_id=account.id, user_id=user.id,
status="active",
last_step_at=datetime.now(timezone.utc) - timedelta(hours=1),
)
# 3. Already-resolved (old) — should stay resolved, not flip
already_resolved = await _make_session(
test_db, account_id=account.id, user_id=user.id,
status="resolved",
last_step_at=datetime.now(timezone.utc) - timedelta(hours=48),
)
await test_db.commit()
count = await flip_stale_sessions(test_db)
assert count == 1
await test_db.refresh(stale)
await test_db.refresh(fresh)
await test_db.refresh(already_resolved)
assert stale.status == "abandoned"
assert fresh.status == "active"
assert already_resolved.status == "resolved"
@pytest.mark.asyncio
async def test_flip_stale_sessions_returns_zero_when_none_stale(test_db: AsyncSession):
account = await _make_account(test_db)
user = await _make_user(test_db, account_id=account.id)
await _make_session(
test_db, account_id=account.id, user_id=user.id,
status="active",
last_step_at=datetime.now(timezone.utc) - timedelta(hours=1),
)
await test_db.commit()
count = await flip_stale_sessions(test_db)
assert count == 0