GET /api/v1/analytics/flowpilot/escalations?period={7d,30d,90d}
Computes the in-product wedge metric for Escalation Mode: average / median /
p95 seconds between SessionHandoff.claimed_at and the first ai_session_step
created on the same session after that timestamp. Account-scoped, role-gated
to engineer-or-admin.
The metric is intentionally NOT called "minutes recovered" — that's the
two-metric framing locked by /codex review: this in-product number must be
paired with the manual baseline (the verbal-handoff stopwatch from The
Assignment) to produce the savings claim. The schema's `metric_definition`
field surfaces the disclaimer in every response so callers don't oversell it.
Implementation notes:
- Uses a correlated scalar subquery to find the first step after the claim for
each handoff, then aggregates avg/median/p95 in Python (~1k rows/account/month
is well within budget; cleaner than percentile_cont gymnastics in SQL)
- Excludes unclaimed handoffs (claimed_at IS NULL)
- Counts claimed-but-no-action handoffs in n_handoffs_claimed but not in
n_handoffs_with_action — surfaces the conversion-rate signal
- Floors negative deltas at 0 to handle clock-drift edge cases
Tests cover happy path, zero-data, claimed-but-no-action accounting,
unclaimed-handoff exclusion, period window filtering, multi-handoff
aggregation, multi-tenant isolation (Phase 4 RLS landmine pattern),
viewer-role 403 gate, and period validation. 9 tests, all green. No
regressions in existing handoff_manager / session_handoffs suites.
First piece of the Approach A wedge build per
docs/plans/2026-04-27-escalation-mode-wedge-design.md. Unblocks the queue
stat-card and the analytics page.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
364 lines
12 KiB
Python
364 lines
12 KiB
Python
"""Tests for GET /analytics/flowpilot/escalations — Escalation Mode wedge metric.
|
|
|
|
Covers the in-product time-to-first-action measurement that powers the queue
|
|
stat-card and the analytics page. The savings claim itself comes from the
|
|
manual baseline (the Assignment); these tests only cover what the in-product
|
|
endpoint returns.
|
|
"""
|
|
from datetime import datetime, timedelta, timezone
|
|
from uuid import UUID as PyUUID
|
|
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
from sqlalchemy import select
|
|
|
|
from app.models.ai_session import AISession
|
|
from app.models.ai_session_step import AISessionStep
|
|
from app.models.session_handoff import SessionHandoff
|
|
from app.models.user import User
|
|
|
|
|
|
# Path of the endpoint under test; every request in this module targets it.
URL = "/api/v1/analytics/flowpilot/escalations"
|
|
|
|
|
|
# ─── Helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
|
|
async def _make_session(db, *, user_id, account_id) -> AISession:
    """Add a minimal escalated AISession to ``db``, flush it, and return it.

    Only the owning user and account vary between callers; every other
    column is filled with a fixed placeholder value. The row is flushed but
    not committed, so committing is left to the calling test.
    """
    session_fields = {
        "user_id": user_id,
        "account_id": account_id,
        "session_type": "guided",
        "intake_type": "free_text",
        "intake_content": {"text": "test"},
        "status": "escalated",
        "confidence_tier": "discovery",
        "conversation_messages": [],
    }
    ai_session = AISession(**session_fields)
    db.add(ai_session)
    await db.flush()
    return ai_session
|
|
|
|
|
|
async def _make_handoff(
    db,
    *,
    session_id,
    account_id,
    user_id,
    claimed_at: datetime | None,
    claimed_by=None,
) -> SessionHandoff:
    """Add a SessionHandoff for ``session_id`` to ``db``, flush, and return it.

    ``user_id`` is recorded as ``handed_off_by``. ``claimed_at`` and
    ``claimed_by`` are passed straight through, so ``claimed_at=None``
    produces an unclaimed handoff. The row is flushed but not committed.
    """
    handoff_fields = {
        "session_id": session_id,
        "account_id": account_id,
        "handed_off_by": user_id,
        "intent": "escalate",
        "snapshot": {"branch_map": "stub"},
        "priority": "normal",
        "claimed_at": claimed_at,
        "claimed_by": claimed_by,
    }
    handoff = SessionHandoff(**handoff_fields)
    db.add(handoff)
    await db.flush()
    return handoff
|
|
|
|
|
|
async def _make_step(db, *, session_id, account_id, created_at: datetime) -> AISessionStep:
    """Add an ai_session_step row whose created_at is chosen by the caller.

    The metric query keys off ``created_at``, and SQLAlchemy's default would
    stamp it with now(), so tests pass the timestamp explicitly to control
    the measured delta. The row is flushed but not committed.
    """
    step_fields = {
        "session_id": session_id,
        "account_id": account_id,
        "step_order": 1,
        "step_type": "note",
        "content": {"text": "first action"},
        "confidence_at_step": 0.5,
        "input_tokens": 0,
        "output_tokens": 0,
        "is_fork_point": False,
        "was_free_text": False,
        "was_skipped": False,
        "created_at": created_at,
    }
    new_step = AISessionStep(**step_fields)
    db.add(new_step)
    await db.flush()
    return new_step
|
|
|
|
|
|
# ─── Tests ────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_returns_zero_metrics_when_no_handoffs(
    client: AsyncClient, auth_headers, test_user
):
    """With no handoffs seeded: 200 OK, both counters 0, every stat None."""
    resp = await client.get(URL, headers=auth_headers)
    assert resp.status_code == 200

    payload = resp.json()
    # No period parameter was sent, so the response reports the default.
    assert payload["period"] == "30d"
    assert payload["n_handoffs_claimed"] == 0
    assert payload["n_handoffs_with_action"] == 0
    assert payload["avg_seconds_to_first_action"] is None
    assert payload["median_seconds_to_first_action"] is None
    assert payload["p95_seconds_to_first_action"] is None

    # Disclaimer is part of the contract — pilots reading the API should see it.
    definition_text = payload["metric_definition"].lower()
    assert "manual baseline" in definition_text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_happy_path_single_handoff_with_action(
    client: AsyncClient, auth_headers, test_user, test_db
):
    """A single claimed handoff followed by a step 90s later.

    With exactly one data point, avg, median and p95 must all equal 90.0.
    """
    user_data = test_user["user_data"]
    user_id = PyUUID(user_data["id"])
    account_id = PyUUID(user_data["account_id"])

    claimed_at = datetime.now(timezone.utc) - timedelta(hours=2)
    first_action_at = claimed_at + timedelta(seconds=90)

    session = await _make_session(test_db, user_id=user_id, account_id=account_id)
    await _make_handoff(
        test_db,
        session_id=session.id,
        account_id=account_id,
        user_id=user_id,
        claimed_at=claimed_at,
        claimed_by=user_id,
    )
    await _make_step(
        test_db,
        session_id=session.id,
        account_id=account_id,
        created_at=first_action_at,
    )
    await test_db.commit()

    resp = await client.get(URL, headers=auth_headers)
    assert resp.status_code == 200

    payload = resp.json()
    assert payload["n_handoffs_claimed"] == 1
    assert payload["n_handoffs_with_action"] == 1
    assert payload["avg_seconds_to_first_action"] == 90.0
    assert payload["median_seconds_to_first_action"] == 90.0
    assert payload["p95_seconds_to_first_action"] == 90.0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_handoff_claimed_but_no_action(
    client: AsyncClient, auth_headers, test_user, test_db
):
    """Claimed handoff with no post-claim step → counted in n_handoffs_claimed
    but not in n_handoffs_with_action; all three aggregates remain None.

    The only step seeded here is created 30 seconds *before* the claim, so it
    must not be treated as the first action after the claim.
    """
    user_id = PyUUID(test_user["user_data"]["id"])
    account_id = PyUUID(test_user["user_data"]["account_id"])
    claimed_at = datetime.now(timezone.utc) - timedelta(minutes=5)

    session = await _make_session(test_db, user_id=user_id, account_id=account_id)
    await _make_handoff(
        test_db,
        session_id=session.id,
        account_id=account_id,
        user_id=user_id,
        claimed_at=claimed_at,
        claimed_by=user_id,
    )
    # Pre-claim step (created_at < claimed_at) — must NOT count.
    await _make_step(
        test_db,
        session_id=session.id,
        account_id=account_id,
        created_at=claimed_at - timedelta(seconds=30),
    )
    await test_db.commit()

    response = await client.get(URL, headers=auth_headers)
    assert response.status_code == 200
    body = response.json()
    assert body["n_handoffs_claimed"] == 1
    assert body["n_handoffs_with_action"] == 0
    # Check every aggregate, not just the average: with no qualifying step
    # there is no data point for any of the three statistics.
    assert body["avg_seconds_to_first_action"] is None
    assert body["median_seconds_to_first_action"] is None
    assert body["p95_seconds_to_first_action"] is None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_unclaimed_handoffs_excluded(
    client: AsyncClient, auth_headers, test_user, test_db
):
    """A handoff whose claimed_at is NULL does not count as claimed."""
    user_data = test_user["user_data"]
    user_id = PyUUID(user_data["id"])
    account_id = PyUUID(user_data["account_id"])

    session = await _make_session(test_db, user_id=user_id, account_id=account_id)
    # claimed_by is left at its default (None) alongside claimed_at=None.
    await _make_handoff(
        test_db,
        session_id=session.id,
        account_id=account_id,
        user_id=user_id,
        claimed_at=None,
    )
    await test_db.commit()

    resp = await client.get(URL, headers=auth_headers)
    assert resp.status_code == 200
    payload = resp.json()
    assert payload["n_handoffs_claimed"] == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_period_window_excludes_old_handoffs(
    client: AsyncClient, auth_headers, test_user, test_db
):
    """A handoff claimed 10 days ago is outside ?period=7d but inside 90d."""
    user_data = test_user["user_data"]
    user_id = PyUUID(user_data["id"])
    account_id = PyUUID(user_data["account_id"])

    old_claimed_at = datetime.now(timezone.utc) - timedelta(days=10)
    old_step_at = old_claimed_at + timedelta(seconds=60)

    session = await _make_session(test_db, user_id=user_id, account_id=account_id)
    await _make_handoff(
        test_db,
        session_id=session.id,
        account_id=account_id,
        user_id=user_id,
        claimed_at=old_claimed_at,
        claimed_by=user_id,
    )
    await _make_step(
        test_db,
        session_id=session.id,
        account_id=account_id,
        created_at=old_step_at,
    )
    await test_db.commit()

    # 7d window: excluded
    week_resp = await client.get(URL, headers=auth_headers, params={"period": "7d"})
    assert week_resp.status_code == 200
    week_payload = week_resp.json()
    assert week_payload["n_handoffs_claimed"] == 0

    # 90d window: included
    quarter_resp = await client.get(
        URL, headers=auth_headers, params={"period": "90d"}
    )
    assert quarter_resp.status_code == 200
    quarter_payload = quarter_resp.json()
    assert quarter_payload["n_handoffs_claimed"] == 1
    assert quarter_payload["n_handoffs_with_action"] == 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_aggregate_stats_for_multiple_handoffs(
    client: AsyncClient, auth_headers, test_user, test_db
):
    """Three handoffs with deltas 30/60/180s → avg=90, median=60, p95=180.

    Each handoff gets its own session, with claims spaced 10 minutes apart,
    and exactly one step created ``delta`` seconds after its claim.
    """
    user_id = PyUUID(test_user["user_data"]["id"])
    account_id = PyUUID(test_user["user_data"]["account_id"])

    base = datetime.now(timezone.utc) - timedelta(hours=3)
    deltas = [30, 60, 180]
    for i, delta in enumerate(deltas):
        s = await _make_session(test_db, user_id=user_id, account_id=account_id)
        claimed_at = base + timedelta(minutes=i * 10)
        await _make_handoff(
            test_db,
            session_id=s.id,
            account_id=account_id,
            user_id=user_id,
            claimed_at=claimed_at,
            claimed_by=user_id,
        )
        await _make_step(
            test_db,
            session_id=s.id,
            account_id=account_id,
            created_at=claimed_at + timedelta(seconds=delta),
        )
    await test_db.commit()

    response = await client.get(URL, headers=auth_headers)
    # Assert the status before parsing the body (as the other tests in this
    # module do) so an error response fails on the status code instead of on
    # JSON decoding or a missing key.
    assert response.status_code == 200
    body = response.json()
    assert body["n_handoffs_claimed"] == 3
    assert body["n_handoffs_with_action"] == 3
    assert body["avg_seconds_to_first_action"] == 90.0
    assert body["median_seconds_to_first_action"] == 60.0
    assert body["p95_seconds_to_first_action"] == 180.0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_account_isolation_requesting_user_only_sees_own_account(
    client: AsyncClient, auth_headers, test_user, test_db
):
    """Data seeded in a different account must be invisible to the caller.

    Critical: the Phase 4 RLS pattern can fail silently if account_id is wrong.
    All rows created here (session, claimed handoff, post-claim step) belong
    to a second account, so any non-zero count in the response means the
    account-scoped query returned cross-tenant rows.
    """
    from app.models.account import Account

    foreign_account = Account(name="Other MSP", display_code="OTHER001")
    test_db.add(foreign_account)
    await test_db.flush()

    foreign_user = User(
        email="other@example.com",
        password_hash="x",
        name="Other Tech",
        role="engineer",
        account_id=foreign_account.id,
        account_role="owner",
    )
    test_db.add(foreign_user)
    await test_db.flush()

    foreign_session = await _make_session(
        test_db, user_id=foreign_user.id, account_id=foreign_account.id
    )
    claimed_at = datetime.now(timezone.utc) - timedelta(hours=1)
    await _make_handoff(
        test_db,
        session_id=foreign_session.id,
        account_id=foreign_account.id,
        user_id=foreign_user.id,
        claimed_at=claimed_at,
        claimed_by=foreign_user.id,
    )
    await _make_step(
        test_db,
        session_id=foreign_session.id,
        account_id=foreign_account.id,
        created_at=claimed_at + timedelta(seconds=45),
    )
    await test_db.commit()

    # The request is made with the original test user's credentials.
    resp = await client.get(URL, headers=auth_headers)
    assert resp.status_code == 200

    payload = resp.json()
    # The other account's handoff must NOT leak into this account's response.
    assert payload["n_handoffs_claimed"] == 0
    assert payload["n_handoffs_with_action"] == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_viewer_role_is_blocked(
    client: AsyncClient, test_user, auth_headers, test_db
):
    """After the test user's account_role is set to 'viewer', expect a 403.

    The error detail must also mention "engineer" (case-insensitive).
    """
    user_id = PyUUID(test_user["user_data"]["id"])

    lookup = await test_db.execute(select(User).where(User.id == user_id))
    db_user = lookup.scalar_one()
    db_user.account_role = "viewer"
    await test_db.commit()

    resp = await client.get(URL, headers=auth_headers)
    assert resp.status_code == 403

    detail_text = resp.json()["detail"].lower()
    assert "engineer" in detail_text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_invalid_period_rejected(client: AsyncClient, auth_headers):
    """A period outside {7d,30d,90d} (here: 1d) is rejected with 422."""
    bad_params = {"period": "1d"}
    resp = await client.get(URL, headers=auth_headers, params=bad_params)
    assert resp.status_code == 422
|