feat(analytics): add coverage heatmap and flow quality scoring endpoints

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-20 00:08:09 +00:00
parent 779b29dbc4
commit 7ec626f45a
3 changed files with 746 additions and 0 deletions

View File

@@ -19,6 +19,7 @@ from app.models.tree import Tree
from app.models.ai_session import AISession
from app.models.flow_proposal import FlowProposal
from app.models.psa_post_log import PsaPostLog
from app.models.category import TreeCategory
from app.schemas.flowpilot_analytics import (
FlowPilotDashboard,
MTTRDataPoint,
@@ -27,6 +28,10 @@ from app.schemas.flowpilot_analytics import (
KnowledgeCoverage,
DomainCoverage,
PsaMetrics,
CoverageDomainRow,
CoverageResponse,
FlowQualityRow,
FlowQualityResponse,
)
from app.services.knowledge_gap_service import get_knowledge_gaps, KnowledgeGapReport
@@ -356,3 +361,255 @@ async def get_knowledge_gaps_endpoint(
days = {"7d": 7, "30d": 30, "90d": 90}.get(period, 30)
return await get_knowledge_gaps(current_user.account_id, db, period_days=days)
@router.get("/coverage", response_model=CoverageResponse)
@limiter.limit("15/minute")
async def get_coverage_heatmap(
request: Request,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_team_admin),
period: str = Query("30d", pattern="^(7d|30d|90d)$"),
):
"""Get coverage heatmap: sessions and flow coverage broken down by problem domain."""
if not current_user.account_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No account")
account_id = current_user.account_id
period_start = _get_period_start(period)
# ── Session stats per domain ──
domain_stats_result = await db.execute(
select(
AISession.problem_domain,
func.count(AISession.id).label("session_count"),
func.sum(case((AISession.status == "resolved", 1), else_=0)).label("resolved_count"),
func.sum(case((AISession.status.in_(["escalated", "requesting_escalation"]), 1), else_=0)).label("escalated_count"),
func.sum(case((AISession.confidence_tier == "guided", 1), else_=0)).label("guided_count"),
func.avg(
case(
(
(AISession.status == "resolved") & AISession.resolved_at.isnot(None),
extract("epoch", AISession.resolved_at - AISession.created_at) / 60,
),
else_=None,
)
).label("avg_resolution_minutes"),
)
.where(
AISession.account_id == account_id,
AISession.created_at >= period_start,
AISession.problem_domain.isnot(None),
)
.group_by(AISession.problem_domain)
.order_by(func.count(AISession.id).desc())
)
domain_rows = domain_stats_result.all()
# ── Unmapped sessions (no problem_domain) ──
unmapped_result = await db.execute(
select(func.count(AISession.id)).where(
AISession.account_id == account_id,
AISession.created_at >= period_start,
AISession.problem_domain.is_(None),
)
)
unmapped_session_count = int(unmapped_result.scalar() or 0)
# ── Flow counts per domain: match Category.name to problem_domain ──
# Joins Tree → TreeCategory and groups by category name
flow_counts_result = await db.execute(
select(
TreeCategory.name.label("domain"),
func.count(Tree.id).label("flow_count"),
)
.join(Tree, Tree.category_id == TreeCategory.id)
.where(
Tree.account_id == account_id,
Tree.is_active.is_(True),
Tree.deleted_at.is_(None),
)
.group_by(TreeCategory.name)
)
flow_counts_by_domain: dict[str, int] = {
r.domain: int(r.flow_count) for r in flow_counts_result.all()
}
domains = []
for r in domain_rows:
sc = int(r.session_count or 0)
resolved = int(r.resolved_count or 0)
escalated = int(r.escalated_count or 0)
guided = int(r.guided_count or 0)
domain_name = r.problem_domain or "unknown"
avg_res = float(r.avg_resolution_minutes) if r.avg_resolution_minutes is not None else None
domains.append(
CoverageDomainRow(
domain=domain_name,
flow_count=flow_counts_by_domain.get(domain_name, 0),
session_count=sc,
resolution_rate=round(resolved / sc, 4) if sc > 0 else 0.0,
escalation_rate=round(escalated / sc, 4) if sc > 0 else 0.0,
guided_rate=round(guided / sc, 4) if sc > 0 else 0.0,
avg_resolution_minutes=round(avg_res, 1) if avg_res is not None else None,
)
)
return CoverageResponse(
domains=domains,
unmapped_session_count=unmapped_session_count,
total_domains=len(domains),
)
@router.get("/flow-quality", response_model=FlowQualityResponse)
@limiter.limit("15/minute")
async def get_flow_quality(
request: Request,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_team_admin),
period: str = Query("30d", pattern="^(7d|30d|90d)$"),
sort: str = Query("quality", pattern="^(quality|usage|success_rate)$"),
):
"""Get flow quality scoring for all active flows in the account."""
if not current_user.account_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No account")
account_id = current_user.account_id
period_start = _get_period_start(period)
now = datetime.now(timezone.utc)
# ── Get all active flows ──
flows_result = await db.execute(
select(Tree).where(
Tree.account_id == account_id,
Tree.is_active.is_(True),
Tree.deleted_at.is_(None),
)
)
flows = flows_result.scalars().all()
if not flows:
return FlowQualityResponse(flows=[], top_performers=[], needs_attention=[])
flow_ids = [f.id for f in flows]
# ── Session stats per flow within the period ──
session_stats_result = await db.execute(
select(
AISession.matched_flow_id,
func.count(AISession.id).label("total"),
func.sum(case((AISession.status == "resolved", 1), else_=0)).label("resolved"),
func.sum(case((AISession.confidence_tier == "guided", 1), else_=0)).label("guided"),
func.max(AISession.created_at).label("last_matched_at"),
)
.where(
AISession.account_id == account_id,
AISession.matched_flow_id.in_(flow_ids),
AISession.created_at >= period_start,
)
.group_by(AISession.matched_flow_id)
)
stats_by_flow: dict = {}
for r in session_stats_result.all():
stats_by_flow[r.matched_flow_id] = {
"total": int(r.total or 0),
"resolved": int(r.resolved or 0),
"guided": int(r.guided or 0),
"last_matched_at": r.last_matched_at,
}
# ── Also get the most recent match ever (for recency score, regardless of period) ──
recent_match_result = await db.execute(
select(
AISession.matched_flow_id,
func.max(AISession.created_at).label("last_ever"),
)
.where(
AISession.account_id == account_id,
AISession.matched_flow_id.in_(flow_ids),
)
.group_by(AISession.matched_flow_id)
)
last_ever_by_flow: dict = {r.matched_flow_id: r.last_ever for r in recent_match_result.all()}
# ── Build scored rows ──
scored_rows: list[FlowQualityRow] = []
for flow in flows:
stats = stats_by_flow.get(flow.id)
last_ever = last_ever_by_flow.get(flow.id)
if stats and stats["total"] > 0:
total = stats["total"]
resolved = stats["resolved"]
guided = stats["guided"]
success_rate = resolved / total
guided_rate = guided / total
# Recency score based on last match ever
if last_ever is not None:
last_ever_aware = last_ever.replace(tzinfo=timezone.utc) if last_ever.tzinfo is None else last_ever
days_since = (now - last_ever_aware).total_seconds() / 86400
recency_score = max(0.0, 1.0 - days_since / 90.0)
else:
recency_score = 0.0
quality_score = round(
(success_rate * 0.5) + (guided_rate * 0.3) + (recency_score * 0.2),
4,
)
avg_confidence = round(guided_rate, 4) # guided_rate as confidence proxy
last_matched_at = stats.get("last_matched_at")
else:
success_rate = None
avg_confidence = None
quality_score = 0.0
last_matched_at = last_ever # may be None
if last_matched_at is not None and last_matched_at.tzinfo is None:
last_matched_at = last_matched_at.replace(tzinfo=timezone.utc)
scored_rows.append(
FlowQualityRow(
flow_id=str(flow.id),
name=flow.name,
tree_type=flow.tree_type,
usage_count=stats["total"] if stats else 0,
success_rate=round(success_rate, 4) if success_rate is not None else None,
last_matched_at=last_matched_at,
avg_confidence=avg_confidence,
quality_score=quality_score,
)
)
# ── Sort ──
if sort == "usage":
scored_rows.sort(key=lambda r: r.usage_count, reverse=True)
elif sort == "success_rate":
scored_rows.sort(key=lambda r: (r.success_rate is not None, r.success_rate or 0.0), reverse=True)
else:
scored_rows.sort(key=lambda r: r.quality_score, reverse=True)
# ── Top performers: top 5 by quality_score with usage > 0 ──
top_performers = [r for r in scored_rows if r.usage_count > 0]
top_performers = sorted(top_performers, key=lambda r: r.quality_score, reverse=True)[:5]
# ── Needs attention: used at least once, AND (success_rate < 0.5 OR not used in 30+ days) ──
thirty_days_ago = now - timedelta(days=30)
needs_attention = []
for r in scored_rows:
if r.usage_count == 0:
continue
low_success = r.success_rate is not None and r.success_rate < 0.5
stale = r.last_matched_at is not None and r.last_matched_at < thirty_days_ago
if low_success or stale:
needs_attention.append(r)
return FlowQualityResponse(
flows=scored_rows,
top_performers=top_performers,
needs_attention=needs_attention,
)