feat(analytics): add escalation time-to-first-action metric endpoint

GET /api/v1/analytics/flowpilot/escalations?period={7d,30d,90d}

Computes the in-product wedge metric for Escalation Mode: average / median /
p95 seconds between SessionHandoff.claimed_at and the first ai_session_step
created on the same session after that timestamp. Account-scoped, role-gated
to engineer-or-admin.

The metric is intentionally NOT called "minutes recovered" — that's the
two-metric framing locked by /codex review: this in-product number must be
paired with manual baseline (the verbal-handoff stopwatch from The Assignment)
to produce the savings claim. Schema's `metric_definition` field surfaces the
disclaimer in every response so callers don't oversell it.

Implementation notes:
- Uses correlated scalar subquery for first-step-after-claim per handoff,
  aggregates avg/median/p95 in Python (~1k rows/account/month is well within
  budget; cleaner than percentile_cont gymnastics in SQL)
- Excludes unclaimed handoffs (claimed_at IS NULL)
- Counts claimed-but-no-action handoffs in n_handoffs_claimed but not in
  n_handoffs_with_action — surfaces the conversion-rate signal
- Floors negative deltas at 0 to handle clock-drift edge cases

Tests cover happy path, zero-data, claimed-but-no-action accounting, period
window filtering, multi-handoff aggregation, multi-tenant isolation (Phase 4
RLS landmine pattern), viewer-role 403 gate, and period validation. 9 tests,
all green. No regressions in existing handoff_manager / session_handoffs
suites.

First piece of the Approach A wedge build per
docs/plans/2026-04-27-escalation-mode-wedge-design.md. Unblocks the queue
stat-card and the analytics page.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-04-27 15:25:46 -04:00
parent d51e95cdfa
commit 52f6d0308f
3 changed files with 498 additions and 1 deletions

View File

@@ -3,8 +3,10 @@
Endpoints: Endpoints:
GET /analytics/flowpilot?period=30d — Main dashboard data GET /analytics/flowpilot?period=30d — Main dashboard data
GET /analytics/flowpilot/knowledge-gaps — Knowledge gap report GET /analytics/flowpilot/knowledge-gaps — Knowledge gap report
GET /analytics/flowpilot/escalations?period=30d — Escalation handoff metrics
""" """
import logging import logging
import statistics
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from typing import Annotated, Optional from typing import Annotated, Optional
@@ -13,10 +15,17 @@ from sqlalchemy import select, func, case, cast, Date, extract
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.core.rate_limit import limiter from app.core.rate_limit import limiter
from app.api.deps import get_current_active_user, get_db, require_team_admin from app.api.deps import (
get_current_active_user,
get_db,
require_engineer_or_admin,
require_team_admin,
)
from app.models.user import User from app.models.user import User
from app.models.tree import Tree from app.models.tree import Tree
from app.models.ai_session import AISession from app.models.ai_session import AISession
from app.models.ai_session_step import AISessionStep
from app.models.session_handoff import SessionHandoff
from app.models.flow_proposal import FlowProposal from app.models.flow_proposal import FlowProposal
from app.models.psa_activity_log import PsaActivityLog from app.models.psa_activity_log import PsaActivityLog
from app.models.psa_post_log import PsaPostLog from app.models.psa_post_log import PsaPostLog
@@ -36,6 +45,7 @@ from app.schemas.flowpilot_analytics import (
EnhancedPsaMetrics, EnhancedPsaMetrics,
PsaFunnel, PsaFunnel,
PsaDailyTrend, PsaDailyTrend,
EscalationMetrics,
) )
from app.services.knowledge_gap_service import get_knowledge_gaps, KnowledgeGapReport from app.services.knowledge_gap_service import get_knowledge_gaps, KnowledgeGapReport
@@ -727,3 +737,104 @@ async def get_enhanced_psa_metrics(
push_funnel=push_funnel, push_funnel=push_funnel,
daily_trend=daily_trend, daily_trend=daily_trend,
) )
# ─── Escalation Mode metrics (wedge stat for /escalations queue + analytics page)
#
# Pulls all (handoff.claimed_at, first_step_after_claim.created_at) pairs in the
# window and aggregates avg/median/p95 of the delta in Python. Pilot scale
# (~1k rows max per account per month) makes this cheaper and clearer than
# Postgres percentile_cont gymnastics.
#
# IMPORTANT: this is the in-product metric only. The "minutes recovered"
# sales claim requires manual baseline measurement (see The Assignment in
# docs/plans/2026-04-27-escalation-mode-wedge-design.md).
@router.get("/escalations", response_model=EscalationMetrics)
@limiter.limit("30/minute")
async def get_escalation_metrics(
request: Request,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_engineer_or_admin),
period: str = Query("30d", pattern="^(7d|30d|90d)$"),
) -> EscalationMetrics:
"""Time-to-first-action after escalation claim, account-scoped.
Returns:
n_handoffs_claimed: handoffs in window that were claimed by a senior.
n_handoffs_with_action: subset where the senior took at least one
action (an ai_session_step row created after claimed_at).
avg/median/p95_seconds_to_first_action: aggregates of
(first_step.created_at - claimed_at) in seconds.
Excludes handoffs where claimed_at IS NULL (never claimed) and handoffs
where no ai_session_step was created after the claim. Both are
counted — n_handoffs_claimed includes "no action yet" handoffs so the
conversion rate is visible.
"""
if not current_user.account_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="No account"
)
account_id = current_user.account_id
period_start = _get_period_start(period)
# First-action timestamp per handoff via correlated scalar subquery.
first_action_subq = (
select(func.min(AISessionStep.created_at))
.where(
AISessionStep.session_id == SessionHandoff.session_id,
AISessionStep.created_at > SessionHandoff.claimed_at,
)
.correlate(SessionHandoff)
.scalar_subquery()
)
rows = (
await db.execute(
select(
SessionHandoff.claimed_at,
first_action_subq.label("first_action_at"),
).where(
SessionHandoff.account_id == account_id,
SessionHandoff.claimed_at.isnot(None),
SessionHandoff.claimed_at >= period_start,
)
)
).all()
n_handoffs_claimed = len(rows)
deltas: list[float] = []
for claimed_at, first_action_at in rows:
if first_action_at is None:
continue
delta_s = (first_action_at - claimed_at).total_seconds()
# Floor at zero — clock drift between rows could in theory yield a
# tiny negative if a step's created_at races claimed_at. Surface as
# 0s rather than absurd negative deltas.
if delta_s < 0:
delta_s = 0.0
deltas.append(delta_s)
n_handoffs_with_action = len(deltas)
if n_handoffs_with_action == 0:
return EscalationMetrics(
period=period,
n_handoffs_claimed=n_handoffs_claimed,
n_handoffs_with_action=0,
)
sorted_deltas = sorted(deltas)
p95_idx = max(0, int(round(0.95 * (n_handoffs_with_action - 1))))
return EscalationMetrics(
period=period,
n_handoffs_claimed=n_handoffs_claimed,
n_handoffs_with_action=n_handoffs_with_action,
avg_seconds_to_first_action=round(statistics.fmean(deltas), 2),
median_seconds_to_first_action=round(statistics.median(deltas), 2),
p95_seconds_to_first_action=round(sorted_deltas[p95_idx], 2),
)

View File

@@ -124,3 +124,26 @@ class FlowPilotDashboard(BaseModel):
confidence_breakdown: ConfidenceBreakdown confidence_breakdown: ConfidenceBreakdown
knowledge_coverage: KnowledgeCoverage knowledge_coverage: KnowledgeCoverage
psa_metrics: PsaMetrics | None = None psa_metrics: PsaMetrics | None = None
class EscalationMetrics(BaseModel):
"""In-product time-to-first-action metric for the Escalation Mode wedge.
NOTE: this is the *in-product* metric (post-claim time-to-first-action). The
"minutes recovered" sales claim requires a manual baseline measurement of the
pre-Escalation-Mode verbal-handoff time. See
docs/plans/2026-04-27-escalation-mode-wedge-design.md for the two-metric
framing — do not roll this number alone into "minutes recovered."
"""
period: str
n_handoffs_claimed: int
n_handoffs_with_action: int
avg_seconds_to_first_action: float | None = None
median_seconds_to_first_action: float | None = None
p95_seconds_to_first_action: float | None = None
metric_definition: str = (
"elapsed_seconds(first ai_session_step in session where "
"created_at > SessionHandoff.claimed_at) — measures post-claim activity "
"lag, NOT verbal-handoff savings. Pair with manual baseline."
)

View File

@@ -0,0 +1,363 @@
"""Tests for GET /analytics/flowpilot/escalations — Escalation Mode wedge metric.
Covers the in-product time-to-first-action measurement that powers the queue
stat-card and the analytics page. The savings claim itself comes from the
manual baseline (the Assignment); these tests only cover what the in-product
endpoint returns.
"""
from datetime import datetime, timedelta, timezone
from uuid import UUID as PyUUID
import pytest
from httpx import AsyncClient
from sqlalchemy import select
from app.models.ai_session import AISession
from app.models.ai_session_step import AISessionStep
from app.models.session_handoff import SessionHandoff
from app.models.user import User
URL = "/api/v1/analytics/flowpilot/escalations"
# ─── Helpers ──────────────────────────────────────────────────────────────────
async def _make_session(db, *, user_id, account_id) -> AISession:
s = AISession(
user_id=user_id,
account_id=account_id,
session_type="guided",
intake_type="free_text",
intake_content={"text": "test"},
status="escalated",
confidence_tier="discovery",
conversation_messages=[],
)
db.add(s)
await db.flush()
return s
async def _make_handoff(
db,
*,
session_id,
account_id,
user_id,
claimed_at: datetime | None,
claimed_by=None,
) -> SessionHandoff:
h = SessionHandoff(
session_id=session_id,
account_id=account_id,
handed_off_by=user_id,
intent="escalate",
snapshot={"branch_map": "stub"},
priority="normal",
claimed_at=claimed_at,
claimed_by=claimed_by,
)
db.add(h)
await db.flush()
return h
async def _make_step(db, *, session_id, account_id, created_at: datetime) -> AISessionStep:
"""Insert an ai_session_step row with an explicit created_at.
SQLAlchemy's default would set created_at to now(); the metric query keys
off this column so the tests need to control it directly.
"""
step = AISessionStep(
session_id=session_id,
account_id=account_id,
step_order=1,
step_type="note",
content={"text": "first action"},
confidence_at_step=0.5,
input_tokens=0,
output_tokens=0,
is_fork_point=False,
was_free_text=False,
was_skipped=False,
created_at=created_at,
)
db.add(step)
await db.flush()
return step
# ─── Tests ────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_returns_zero_metrics_when_no_handoffs(
client: AsyncClient, auth_headers, test_user
):
"""Empty account → n_handoffs_claimed=0, all stats None, 200 OK."""
response = await client.get(URL, headers=auth_headers)
assert response.status_code == 200
body = response.json()
assert body["period"] == "30d"
assert body["n_handoffs_claimed"] == 0
assert body["n_handoffs_with_action"] == 0
assert body["avg_seconds_to_first_action"] is None
assert body["median_seconds_to_first_action"] is None
assert body["p95_seconds_to_first_action"] is None
# Disclaimer is part of the contract — pilots reading the API should see it.
assert "manual baseline" in body["metric_definition"].lower()
@pytest.mark.asyncio
async def test_happy_path_single_handoff_with_action(
client: AsyncClient, auth_headers, test_user, test_db
):
"""One claimed handoff + a step 90s later → avg=median=p95=90.0."""
user_id = PyUUID(test_user["user_data"]["id"])
account_id = PyUUID(test_user["user_data"]["account_id"])
claimed_at = datetime.now(timezone.utc) - timedelta(hours=2)
first_action_at = claimed_at + timedelta(seconds=90)
session = await _make_session(test_db, user_id=user_id, account_id=account_id)
await _make_handoff(
test_db,
session_id=session.id,
account_id=account_id,
user_id=user_id,
claimed_at=claimed_at,
claimed_by=user_id,
)
await _make_step(
test_db,
session_id=session.id,
account_id=account_id,
created_at=first_action_at,
)
await test_db.commit()
response = await client.get(URL, headers=auth_headers)
assert response.status_code == 200
body = response.json()
assert body["n_handoffs_claimed"] == 1
assert body["n_handoffs_with_action"] == 1
assert body["avg_seconds_to_first_action"] == 90.0
assert body["median_seconds_to_first_action"] == 90.0
assert body["p95_seconds_to_first_action"] == 90.0
@pytest.mark.asyncio
async def test_handoff_claimed_but_no_action(
client: AsyncClient, auth_headers, test_user, test_db
):
"""Claimed handoff with no post-claim step → counted in n_handoffs_claimed
but not in n_handoffs_with_action; aggregates remain None."""
user_id = PyUUID(test_user["user_data"]["id"])
account_id = PyUUID(test_user["user_data"]["account_id"])
claimed_at = datetime.now(timezone.utc) - timedelta(minutes=5)
session = await _make_session(test_db, user_id=user_id, account_id=account_id)
await _make_handoff(
test_db,
session_id=session.id,
account_id=account_id,
user_id=user_id,
claimed_at=claimed_at,
claimed_by=user_id,
)
# Pre-claim step (created_at < claimed_at) — must NOT count.
await _make_step(
test_db,
session_id=session.id,
account_id=account_id,
created_at=claimed_at - timedelta(seconds=30),
)
await test_db.commit()
response = await client.get(URL, headers=auth_headers)
assert response.status_code == 200
body = response.json()
assert body["n_handoffs_claimed"] == 1
assert body["n_handoffs_with_action"] == 0
assert body["avg_seconds_to_first_action"] is None
@pytest.mark.asyncio
async def test_unclaimed_handoffs_excluded(
client: AsyncClient, auth_headers, test_user, test_db
):
"""Handoffs with claimed_at IS NULL are excluded entirely."""
user_id = PyUUID(test_user["user_data"]["id"])
account_id = PyUUID(test_user["user_data"]["account_id"])
session = await _make_session(test_db, user_id=user_id, account_id=account_id)
await _make_handoff(
test_db,
session_id=session.id,
account_id=account_id,
user_id=user_id,
claimed_at=None,
)
await test_db.commit()
response = await client.get(URL, headers=auth_headers)
assert response.status_code == 200
assert response.json()["n_handoffs_claimed"] == 0
@pytest.mark.asyncio
async def test_period_window_excludes_old_handoffs(
client: AsyncClient, auth_headers, test_user, test_db
):
"""A handoff claimed >7d ago must not appear in ?period=7d."""
user_id = PyUUID(test_user["user_data"]["id"])
account_id = PyUUID(test_user["user_data"]["account_id"])
old_claimed_at = datetime.now(timezone.utc) - timedelta(days=10)
session = await _make_session(test_db, user_id=user_id, account_id=account_id)
await _make_handoff(
test_db,
session_id=session.id,
account_id=account_id,
user_id=user_id,
claimed_at=old_claimed_at,
claimed_by=user_id,
)
await _make_step(
test_db,
session_id=session.id,
account_id=account_id,
created_at=old_claimed_at + timedelta(seconds=60),
)
await test_db.commit()
# 7d window: excluded
r7 = await client.get(URL, headers=auth_headers, params={"period": "7d"})
assert r7.status_code == 200
assert r7.json()["n_handoffs_claimed"] == 0
# 90d window: included
r90 = await client.get(URL, headers=auth_headers, params={"period": "90d"})
assert r90.status_code == 200
assert r90.json()["n_handoffs_claimed"] == 1
assert r90.json()["n_handoffs_with_action"] == 1
@pytest.mark.asyncio
async def test_aggregate_stats_for_multiple_handoffs(
client: AsyncClient, auth_headers, test_user, test_db
):
"""Three handoffs with deltas 30/60/180s → avg=90, median=60, p95≈180."""
user_id = PyUUID(test_user["user_data"]["id"])
account_id = PyUUID(test_user["user_data"]["account_id"])
base = datetime.now(timezone.utc) - timedelta(hours=3)
deltas = [30, 60, 180]
for i, delta in enumerate(deltas):
s = await _make_session(test_db, user_id=user_id, account_id=account_id)
claimed_at = base + timedelta(minutes=i * 10)
await _make_handoff(
test_db,
session_id=s.id,
account_id=account_id,
user_id=user_id,
claimed_at=claimed_at,
claimed_by=user_id,
)
await _make_step(
test_db,
session_id=s.id,
account_id=account_id,
created_at=claimed_at + timedelta(seconds=delta),
)
await test_db.commit()
response = await client.get(URL, headers=auth_headers)
body = response.json()
assert response.status_code == 200
assert body["n_handoffs_claimed"] == 3
assert body["n_handoffs_with_action"] == 3
assert body["avg_seconds_to_first_action"] == 90.0
assert body["median_seconds_to_first_action"] == 60.0
assert body["p95_seconds_to_first_action"] == 180.0
@pytest.mark.asyncio
async def test_account_isolation_requesting_user_only_sees_own_account(
client: AsyncClient, auth_headers, test_user, test_db
):
"""A handoff in another account must not appear in this user's response.
Critical: the Phase 4 RLS pattern can fail silently if account_id is wrong.
This test would catch an account-scoped query that accidentally returned
cross-tenant rows.
"""
from app.models.account import Account
other_account = Account(name="Other MSP", display_code="OTHER001")
test_db.add(other_account)
await test_db.flush()
other_user = User(
email="other@example.com",
password_hash="x",
name="Other Tech",
role="engineer",
account_id=other_account.id,
account_role="owner",
)
test_db.add(other_user)
await test_db.flush()
s = await _make_session(
test_db, user_id=other_user.id, account_id=other_account.id
)
claimed_at = datetime.now(timezone.utc) - timedelta(hours=1)
await _make_handoff(
test_db,
session_id=s.id,
account_id=other_account.id,
user_id=other_user.id,
claimed_at=claimed_at,
claimed_by=other_user.id,
)
await _make_step(
test_db,
session_id=s.id,
account_id=other_account.id,
created_at=claimed_at + timedelta(seconds=45),
)
await test_db.commit()
response = await client.get(URL, headers=auth_headers)
assert response.status_code == 200
body = response.json()
# The other account's handoff must NOT leak into this account's response.
assert body["n_handoffs_claimed"] == 0
assert body["n_handoffs_with_action"] == 0
@pytest.mark.asyncio
async def test_viewer_role_is_blocked(
client: AsyncClient, test_user, auth_headers, test_db
):
"""Downgrade the test user to 'viewer' and confirm the endpoint 403s."""
user_id = PyUUID(test_user["user_data"]["id"])
user = (
await test_db.execute(select(User).where(User.id == user_id))
).scalar_one()
user.account_role = "viewer"
await test_db.commit()
response = await client.get(URL, headers=auth_headers)
assert response.status_code == 403
assert "engineer" in response.json()["detail"].lower()
@pytest.mark.asyncio
async def test_invalid_period_rejected(client: AsyncClient, auth_headers):
"""period=1d is not in {7d,30d,90d} — must 422."""
response = await client.get(URL, headers=auth_headers, params={"period": "1d"})
assert response.status_code == 422