From 52f6d0308fc7ebfb073d6d5fd5367322b5c23a4b Mon Sep 17 00:00:00 2001 From: Michael Chihlas Date: Mon, 27 Apr 2026 15:25:46 -0400 Subject: [PATCH] feat(analytics): add escalation time-to-first-action metric endpoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit GET /api/v1/analytics/flowpilot/escalations?period={7d,30d,90d} Computes the in-product wedge metric for Escalation Mode: average / median / p95 seconds between SessionHandoff.claimed_at and the first ai_session_step created on the same session after that timestamp. Account-scoped, role-gated to engineer-or-admin. The metric is intentionally NOT called "minutes recovered" — that's the two-metric framing locked by /codex review: this in-product number must be paired with manual baseline (the verbal-handoff stopwatch from The Assignment) to produce the savings claim. Schema's `metric_definition` field surfaces the disclaimer in every response so callers don't oversell it. Implementation notes: - Uses correlated scalar subquery for first-step-after-claim per handoff, aggregates avg/median/p95 in Python (~1k rows/account/month is well within budget; cleaner than percentile_cont gymnastics in SQL) - Excludes unclaimed handoffs (claimed_at IS NULL) - Counts claimed-but-no-action handoffs in n_handoffs_claimed but not in n_handoffs_with_action — surfaces the conversion-rate signal - Floors negative deltas at 0 to handle clock-drift edge cases Tests cover happy path, zero-data, claimed-but-no-action accounting, period window filtering, multi-handoff aggregation, multi-tenant isolation (Phase 4 RLS landmine pattern), viewer-role 403 gate, and period validation. 9 tests, all green. No regressions in existing handoff_manager / session_handoffs suites. First piece of the Approach A wedge build per docs/plans/2026-04-27-escalation-mode-wedge-design.md. Unblocks the queue stat-card and the analytics page. 
Co-Authored-By: Claude Opus 4.7 --- .../app/api/endpoints/flowpilot_analytics.py | 113 +++++- backend/app/schemas/flowpilot_analytics.py | 23 ++ .../test_flowpilot_analytics_escalations.py | 363 ++++++++++++++++++ 3 files changed, 498 insertions(+), 1 deletion(-) create mode 100644 backend/tests/test_flowpilot_analytics_escalations.py diff --git a/backend/app/api/endpoints/flowpilot_analytics.py b/backend/app/api/endpoints/flowpilot_analytics.py index 66a322bb..870b434f 100644 --- a/backend/app/api/endpoints/flowpilot_analytics.py +++ b/backend/app/api/endpoints/flowpilot_analytics.py @@ -3,8 +3,10 @@ Endpoints: GET /analytics/flowpilot?period=30d — Main dashboard data GET /analytics/flowpilot/knowledge-gaps — Knowledge gap report + GET /analytics/flowpilot/escalations?period=30d — Escalation handoff metrics """ import logging +import statistics from datetime import datetime, timezone, timedelta from typing import Annotated, Optional @@ -13,10 +15,17 @@ from sqlalchemy import select, func, case, cast, Date, extract from sqlalchemy.ext.asyncio import AsyncSession from app.core.rate_limit import limiter -from app.api.deps import get_current_active_user, get_db, require_team_admin +from app.api.deps import ( + get_current_active_user, + get_db, + require_engineer_or_admin, + require_team_admin, +) from app.models.user import User from app.models.tree import Tree from app.models.ai_session import AISession +from app.models.ai_session_step import AISessionStep +from app.models.session_handoff import SessionHandoff from app.models.flow_proposal import FlowProposal from app.models.psa_activity_log import PsaActivityLog from app.models.psa_post_log import PsaPostLog @@ -36,6 +45,7 @@ from app.schemas.flowpilot_analytics import ( EnhancedPsaMetrics, PsaFunnel, PsaDailyTrend, + EscalationMetrics, ) from app.services.knowledge_gap_service import get_knowledge_gaps, KnowledgeGapReport @@ -727,3 +737,104 @@ async def get_enhanced_psa_metrics( push_funnel=push_funnel, 
daily_trend=daily_trend,
     )
+
+
+# ─── Escalation Mode metrics (wedge stat for /escalations queue + analytics page)
+#
+# Pulls all (handoff.claimed_at, first_step_after_claim.created_at) pairs in the
+# window and aggregates avg/median/p95 of the delta in Python. Pilot scale
+# (~1k rows max per account per month) makes this cheaper and clearer than
+# Postgres percentile_cont gymnastics.
+#
+# IMPORTANT: this is the in-product metric only. The "minutes recovered"
+# sales claim requires manual baseline measurement (see The Assignment in
+# docs/plans/2026-04-27-escalation-mode-wedge-design.md).
+
+
+@router.get("/escalations", response_model=EscalationMetrics)
+@limiter.limit("30/minute")
+async def get_escalation_metrics(
+    request: Request,
+    current_user: Annotated[User, Depends(get_current_active_user)],
+    db: Annotated[AsyncSession, Depends(get_db)],
+    _: None = Depends(require_engineer_or_admin),
+    period: str = Query("30d", pattern="^(7d|30d|90d)$"),
+) -> EscalationMetrics:
+    """Time-to-first-action after escalation claim, account-scoped.
+
+    Considers only the caller's account's handoffs whose claimed_at falls in
+    the period window; handoffs with claimed_at IS NULL are ignored entirely.
+
+    Returns:
+        n_handoffs_claimed: claimed handoffs in the window, including those
+            with no post-claim step yet, so the conversion rate is visible.
+        n_handoffs_with_action: subset with at least one ai_session_step on
+            the same session created strictly after claimed_at.
+        avg/median/p95_seconds_to_first_action: aggregates of
+            (first_step.created_at - claimed_at) in seconds over that subset,
+            rounded to 2 decimals; left as None when the subset is empty.
+
+    Raises:
+        HTTPException: 403 when the current user has no account_id.
+    """
+    if not current_user.account_id:
+        raise HTTPException(
+            status_code=status.HTTP_403_FORBIDDEN, detail="No account"
+        )
+
+    account_id = current_user.account_id
+    period_start = _get_period_start(period)
+
+    # Earliest post-claim step per handoff via correlated scalar subquery;
+    # it yields NULL when the session has no step after the claim.
+    first_action_subq = (
+        select(func.min(AISessionStep.created_at))
+        .where(
+            AISessionStep.session_id == SessionHandoff.session_id,
+            AISessionStep.created_at > SessionHandoff.claimed_at,
+        )
+        .correlate(SessionHandoff)
+        .scalar_subquery()
+    )
+
+    rows = (
+        await db.execute(
+            select(
+                SessionHandoff.claimed_at,
+                first_action_subq.label("first_action_at"),
+            ).where(
+                SessionHandoff.account_id == account_id,
+                SessionHandoff.claimed_at.isnot(None),
+                SessionHandoff.claimed_at >= period_start,
+            )
+        )
+    ).all()
+
+    # The subquery's strict ">" filter already rules out negative deltas, so
+    # max() is only a cheap guard that keeps the reported seconds >= 0.
+    deltas = sorted(
+        max(0.0, (first_action_at - claimed_at).total_seconds())
+        for claimed_at, first_action_at in rows
+        if first_action_at is not None
+    )
+
+    n_handoffs_claimed = len(rows)
+    n_handoffs_with_action = len(deltas)
+    if not deltas:
+        return EscalationMetrics(
+            period=period,
+            n_handoffs_claimed=n_handoffs_claimed,
+            n_handoffs_with_action=0,
+        )
+
+    # p95 = sorted delta at index round(0.95 * (n - 1)); round() yields an int.
+    p95_idx = round(0.95 * (n_handoffs_with_action - 1))
+
+    return EscalationMetrics(
+        period=period,
+        n_handoffs_claimed=n_handoffs_claimed,
+        n_handoffs_with_action=n_handoffs_with_action,
+        avg_seconds_to_first_action=round(statistics.fmean(deltas), 2),
+        median_seconds_to_first_action=round(statistics.median(deltas), 2),
+        p95_seconds_to_first_action=round(deltas[p95_idx], 2),
+    )
diff --git a/backend/app/schemas/flowpilot_analytics.py b/backend/app/schemas/flowpilot_analytics.py
index b3155283..410f5141 100644
--- a/backend/app/schemas/flowpilot_analytics.py
+++ b/backend/app/schemas/flowpilot_analytics.py
@@ -124,3 +124,26 @@ class FlowPilotDashboard(BaseModel):
     confidence_breakdown: ConfidenceBreakdown
     knowledge_coverage: KnowledgeCoverage
     psa_metrics: 
PsaMetrics | None = None
+
+
+class EscalationMetrics(BaseModel):
+    """Response body for the escalation time-to-first-action endpoint.
+
+    Carries only the in-product, post-claim measurement. A "minutes recovered"
+    figure additionally needs the manually measured verbal-handoff baseline;
+    see docs/plans/2026-04-27-escalation-mode-wedge-design.md.
+    """
+
+    period: str
+    # Claimed handoffs in the window, and the subset with a post-claim step.
+    n_handoffs_claimed: int
+    n_handoffs_with_action: int
+    # Aggregates default to None, i.e. "no post-claim action to measure".
+    avg_seconds_to_first_action: float | None = None
+    median_seconds_to_first_action: float | None = None
+    p95_seconds_to_first_action: float | None = None
+    metric_definition: str = (
+        "elapsed_seconds(first ai_session_step in session where created_at > "
+        "SessionHandoff.claimed_at) — measures post-claim activity lag, NOT "
+        "verbal-handoff savings. Pair with manual baseline."
+    )
diff --git a/backend/tests/test_flowpilot_analytics_escalations.py b/backend/tests/test_flowpilot_analytics_escalations.py
new file mode 100644
index 00000000..18b30212
--- /dev/null
+++ b/backend/tests/test_flowpilot_analytics_escalations.py
@@ -0,0 +1,363 @@
+"""Tests for GET /analytics/flowpilot/escalations — Escalation Mode wedge metric.
+
+Covers the in-product time-to-first-action measurement that powers the queue
+stat-card and the analytics page. The savings claim itself comes from the
+manual baseline (the Assignment); these tests only cover what the in-product
+endpoint returns.
+"""
+from datetime import datetime, timedelta, timezone
+from uuid import UUID as PyUUID
+
+import pytest
+from httpx import AsyncClient
+from sqlalchemy import select
+
+from app.models.ai_session import AISession
+from app.models.ai_session_step import AISessionStep
+from app.models.session_handoff import SessionHandoff
+from app.models.user import User
+
+
+URL = "/api/v1/analytics/flowpilot/escalations"
+
+
+# ─── Helpers ──────────────────────────────────────────────────────────────────
+
+
+async def _make_session(db, *, user_id, account_id) -> AISession:
+    """Add and flush an AISession in "escalated" status for the given owner."""
+    escalated = AISession(
+        user_id=user_id,
+        account_id=account_id,
+        session_type="guided",
+        intake_type="free_text",
+        intake_content={"text": "test"},
+        status="escalated",
+        confidence_tier="discovery",
+        conversation_messages=[],
+    )
+    db.add(escalated)
+    await db.flush()
+    return escalated
+
+
+async def _make_handoff(
+    db,
+    *,
+    session_id,
+    account_id,
+    user_id,
+    claimed_at: datetime | None,
+    claimed_by=None,
+) -> SessionHandoff:
+    """Add and flush an "escalate" handoff; claimed_at=None leaves it unclaimed."""
+    handoff = SessionHandoff(
+        session_id=session_id,
+        account_id=account_id,
+        handed_off_by=user_id,
+        intent="escalate",
+        snapshot={"branch_map": "stub"},
+        priority="normal",
+        claimed_at=claimed_at,
+        claimed_by=claimed_by,
+    )
+    db.add(handoff)
+    await db.flush()
+    return handoff
+
+
+async def _make_step(db, *, session_id, account_id, created_at: datetime) -> AISessionStep:
+    """Add and flush a "note" ai_session_step with a caller-chosen created_at."""
+    # The metric query compares this column with claimed_at, so tests pin it
+    # explicitly instead of taking whatever the model assigns by default.
+    first_action = AISessionStep(
+        session_id=session_id,
+        account_id=account_id,
+        step_order=1,
+        step_type="note",
+        content={"text": "first action"},
+        confidence_at_step=0.5,
+        input_tokens=0,
+        output_tokens=0,
+        is_fork_point=False,
+        was_free_text=False,
+        was_skipped=False,
+        created_at=created_at,
+    )
+    db.add(first_action)
+    await db.flush()
+    return first_action
+
+
+# ─── Tests ────────────────────────────────────────────────────────────────────
+
+
+@pytest.mark.asyncio
+async def test_returns_zero_metrics_when_no_handoffs(
+    client: AsyncClient, auth_headers, test_user
+):
+    """With no handoffs at all: 200, both counts zero, every aggregate None."""
+    resp = await client.get(URL, headers=auth_headers)
+    assert resp.status_code == 200
+    payload = resp.json()
+    assert payload["period"] == "30d"
+    assert payload["n_handoffs_claimed"] == 0
+    assert payload["n_handoffs_with_action"] == 0
+    assert payload["avg_seconds_to_first_action"] is None
+    assert payload["median_seconds_to_first_action"] is None
+    assert payload["p95_seconds_to_first_action"] is None
+    # The baseline disclaimer ships in every response, including empty ones.
+    assert "manual baseline" in payload["metric_definition"].lower()
+
+
+@pytest.mark.asyncio
+async def test_happy_path_single_handoff_with_action(
+    client: AsyncClient, auth_headers, test_user, test_db
+):
+    """A single claim followed 90s later by a step gives 90.0 for all stats."""
+    profile = test_user["user_data"]
+    uid, acct = PyUUID(profile["id"]), PyUUID(profile["account_id"])
+
+    claimed = datetime.now(timezone.utc) - timedelta(hours=2)
+    acted = claimed + timedelta(seconds=90)
+
+    sess = await _make_session(test_db, user_id=uid, account_id=acct)
+    await _make_handoff(
+        test_db,
+        session_id=sess.id,
+        account_id=acct,
+        user_id=uid,
+        claimed_at=claimed,
+        claimed_by=uid,
+    )
+    await _make_step(
+        test_db,
+        session_id=sess.id,
+        account_id=acct,
+        created_at=acted,
+    )
+    await test_db.commit()
+
+    resp = await client.get(URL, headers=auth_headers)
+    assert resp.status_code == 200
+    payload = resp.json()
+    assert payload["n_handoffs_claimed"] == 1
+    assert payload["n_handoffs_with_action"] == 1
+    assert payload["avg_seconds_to_first_action"] == 90.0
+    assert payload["median_seconds_to_first_action"] == 90.0
+    assert payload["p95_seconds_to_first_action"] == 90.0
+
+
+@pytest.mark.asyncio
+async def test_handoff_claimed_but_no_action(
+    client: AsyncClient, auth_headers, test_user, test_db
+):
+    """A claim with no later step raises n_handoffs_claimed only; the
+    with-action count stays 0 and the average stays None."""
+    profile = test_user["user_data"]
+    uid, acct = PyUUID(profile["id"]), PyUUID(profile["account_id"])
+    claimed = datetime.now(timezone.utc) - timedelta(minutes=5)
+
+    sess = await _make_session(test_db, user_id=uid, account_id=acct)
+    await _make_handoff(
+        test_db,
+        session_id=sess.id,
+        account_id=acct,
+        user_id=uid,
+        claimed_at=claimed,
+        claimed_by=uid,
+    )
+    # This step predates the claim by 30s, so it is not a post-claim action.
+    await _make_step(
+        test_db,
+        session_id=sess.id,
+        account_id=acct,
+        created_at=claimed - timedelta(seconds=30),
+    )
+    await test_db.commit()
+
+    resp = await client.get(URL, headers=auth_headers)
+    assert resp.status_code == 200
+    payload = resp.json()
+    assert payload["n_handoffs_claimed"] == 1
+    assert payload["n_handoffs_with_action"] == 0
+    assert payload["avg_seconds_to_first_action"] is None
+
+
+@pytest.mark.asyncio
+async def test_unclaimed_handoffs_excluded(
+    client: AsyncClient, auth_headers, test_user, test_db
+):
+    """A handoff whose claimed_at is None does not count as claimed."""
+    profile = test_user["user_data"]
+    uid, acct = PyUUID(profile["id"]), PyUUID(profile["account_id"])
+
+    sess = await _make_session(test_db, user_id=uid, account_id=acct)
+    await _make_handoff(
+        test_db,
+        session_id=sess.id,
+        account_id=acct,
+        user_id=uid,
+        claimed_at=None,
+    )
+    await test_db.commit()
+
+    resp = await client.get(URL, headers=auth_headers)
+    assert resp.status_code == 200
+    assert resp.json()["n_handoffs_claimed"] == 0
+
+
+@pytest.mark.asyncio
+async def test_period_window_excludes_old_handoffs(
+    client: AsyncClient, auth_headers, test_user, test_db
+):
+    """A claim made 10 days ago is outside period=7d but inside period=90d."""
+    profile = test_user["user_data"]
+    uid, acct = PyUUID(profile["id"]), PyUUID(profile["account_id"])
+
+    stale_claim = datetime.now(timezone.utc) - timedelta(days=10)
+    sess = await _make_session(test_db, user_id=uid, account_id=acct)
+    await _make_handoff(
+        test_db,
+        session_id=sess.id,
+        account_id=acct,
+        user_id=uid,
+        claimed_at=stale_claim,
+        claimed_by=uid,
+    )
+    await _make_step(
+        test_db,
+        session_id=sess.id,
+        account_id=acct,
+        created_at=stale_claim + timedelta(seconds=60),
+    )
+    await test_db.commit()
+
+    # Narrow window: the 10-day-old claim falls before the cutoff.
+    narrow = await client.get(URL, headers=auth_headers, params={"period": "7d"})
+    assert narrow.status_code == 200
+    assert narrow.json()["n_handoffs_claimed"] == 0
+
+    # Wide window: the same claim and its step are both counted.
+    wide = await client.get(URL, headers=auth_headers, params={"period": "90d"})
+    assert wide.status_code == 200
+    assert wide.json()["n_handoffs_claimed"] == 1
+    assert wide.json()["n_handoffs_with_action"] == 1
+
+
+@pytest.mark.asyncio
+async def test_aggregate_stats_for_multiple_handoffs(
+    client: AsyncClient, auth_headers, test_user, test_db
+):
+    """Lags of 30, 60 and 180 seconds give avg 90, median 60 and p95 180."""
+    profile = test_user["user_data"]
+    uid, acct = PyUUID(profile["id"]), PyUUID(profile["account_id"])
+
+    origin = datetime.now(timezone.utc) - timedelta(hours=3)
+    lags = [30, 60, 180]
+    for slot, lag in enumerate(lags):
+        sess = await _make_session(test_db, user_id=uid, account_id=acct)
+        claimed = origin + timedelta(minutes=slot * 10)
+        await _make_handoff(
+            test_db,
+            session_id=sess.id,
+            account_id=acct,
+            user_id=uid,
+            claimed_at=claimed,
+            claimed_by=uid,
+        )
+        await _make_step(
+            test_db,
+            session_id=sess.id,
+            account_id=acct,
+            created_at=claimed + timedelta(seconds=lag),
+        )
+    await test_db.commit()
+
+    resp = await client.get(URL, headers=auth_headers)
+    payload = resp.json()
+    assert resp.status_code == 200
+    assert payload["n_handoffs_claimed"] == 3
+    assert payload["n_handoffs_with_action"] == 3
+    assert payload["avg_seconds_to_first_action"] == 90.0
+    assert payload["median_seconds_to_first_action"] == 60.0
+    assert payload["p95_seconds_to_first_action"] == 180.0
+
+
+@pytest.mark.asyncio
+async def test_account_isolation_requesting_user_only_sees_own_account(
+    client: AsyncClient, auth_headers, test_user, test_db
+):
+    """Rows owned by a different account never reach the caller's metrics.
+
+    Guards the Phase 4 RLS pattern, which can fail silently when account_id
+    is wrong: a claimed handoff plus a post-claim step are seeded under a
+    second account, and both counts for the caller must stay at zero.
+    """
+    from app.models.account import Account
+
+    foreign_account = Account(name="Other MSP", display_code="OTHER001")
+    test_db.add(foreign_account)
+    await test_db.flush()
+
+    foreign_user = User(
+        email="other@example.com",
+        password_hash="x",
+        name="Other Tech",
+        role="engineer",
+        account_id=foreign_account.id,
+        account_role="owner",
+    )
+    test_db.add(foreign_user)
+    await test_db.flush()
+
+    sess = await _make_session(
+        test_db, user_id=foreign_user.id, account_id=foreign_account.id
+    )
+    claimed = datetime.now(timezone.utc) - timedelta(hours=1)
+    await _make_handoff(
+        test_db,
+        session_id=sess.id,
+        account_id=foreign_account.id,
+        user_id=foreign_user.id,
+        claimed_at=claimed,
+        claimed_by=foreign_user.id,
+    )
+    await _make_step(
+        test_db,
+        session_id=sess.id,
+        account_id=foreign_account.id,
+        created_at=claimed + timedelta(seconds=45),
+    )
+    await test_db.commit()
+
+    resp = await client.get(URL, headers=auth_headers)
+    assert resp.status_code == 200
+    payload = resp.json()
+    # Nothing seeded under the foreign account may show up for this caller.
+    assert payload["n_handoffs_claimed"] == 0
+    assert payload["n_handoffs_with_action"] == 0
+
+
+@pytest.mark.asyncio
+async def test_viewer_role_is_blocked(
+    client: AsyncClient, test_user, auth_headers, test_db
+):
+    """After account_role becomes 'viewer', the endpoint answers 403."""
+    uid = PyUUID(test_user["user_data"]["id"])
+    lookup = select(User).where(User.id == uid)
+    caller = (await test_db.execute(lookup)).scalar_one()
+    # Demote the authenticated user in place before calling the endpoint.
+    caller.account_role = "viewer"
+    await test_db.commit()
+
+    resp = await client.get(URL, headers=auth_headers)
+    assert resp.status_code == 403
+    assert "engineer" in resp.json()["detail"].lower()
+
+
+@pytest.mark.asyncio
+async def test_invalid_period_rejected(client: AsyncClient, auth_headers):
+    """A period outside 7d/30d/90d (here 1d) is answered with 422."""
+    resp = await client.get(URL, headers=auth_headers, params={"period": "1d"})
+    assert resp.status_code == 422