"""FlowPilot Analytics API — MTTR, resolution rates, knowledge coverage. Endpoints: GET /analytics/flowpilot?period=30d — Main dashboard data GET /analytics/flowpilot/knowledge-gaps — Knowledge gap report """ import logging from datetime import datetime, timezone, timedelta from typing import Annotated, Optional from fastapi import APIRouter, Depends, HTTPException, Query, Request, status from sqlalchemy import select, func, case, cast, Date, extract from sqlalchemy.ext.asyncio import AsyncSession from app.core.rate_limit import limiter from app.api.deps import get_current_active_user, get_db, require_team_admin from app.models.user import User from app.models.tree import Tree from app.models.ai_session import AISession from app.models.flow_proposal import FlowProposal from app.models.psa_activity_log import PsaActivityLog from app.models.psa_post_log import PsaPostLog from app.models.category import TreeCategory from app.schemas.flowpilot_analytics import ( FlowPilotDashboard, MTTRDataPoint, DomainBreakdown, ConfidenceBreakdown, KnowledgeCoverage, DomainCoverage, PsaMetrics, CoverageDomainRow, CoverageResponse, FlowQualityRow, FlowQualityResponse, EnhancedPsaMetrics, PsaFunnel, PsaDailyTrend, ) from app.services.knowledge_gap_service import get_knowledge_gaps, KnowledgeGapReport logger = logging.getLogger(__name__) router = APIRouter(prefix="/analytics/flowpilot", tags=["flowpilot-analytics"]) def _get_period_start(period: str) -> datetime: days = {"7d": 7, "30d": 30, "90d": 90}.get(period, 30) return datetime.now(timezone.utc) - timedelta(days=days) @router.get("", response_model=FlowPilotDashboard) @limiter.limit("15/minute") async def get_dashboard( request: Request, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_team_admin), period: str = Query("30d", pattern="^(7d|30d|90d)$"), ): """Get FlowPilot analytics dashboard data.""" if not current_user.account_id: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No account") account_id = current_user.account_id period_start = _get_period_start(period) # ── Session counts ── counts_result = await db.execute( select( func.count(AISession.id).label("total"), func.sum(case((AISession.status == "resolved", 1), else_=0)).label("resolved"), func.sum(case((AISession.status.in_(["escalated", "requesting_escalation"]), 1), else_=0)).label("escalated"), func.sum(case((AISession.status == "abandoned", 1), else_=0)).label("abandoned"), func.avg(case((AISession.status == "resolved", AISession.step_count), else_=None)).label("avg_steps"), func.avg(AISession.session_rating).label("avg_rating"), ) .where( AISession.account_id == account_id, AISession.created_at >= period_start, ) ) row = counts_result.one() total = int(row.total or 0) resolved = int(row.resolved or 0) escalated = int(row.escalated or 0) abandoned = int(row.abandoned or 0) avg_steps = float(row.avg_steps or 0) avg_rating = float(row.avg_rating) if row.avg_rating else None resolution_rate = (resolved / total * 100) if total > 0 else 0.0 # ── MTTR ── mttr_result = await db.execute( select( func.avg( extract("epoch", AISession.resolved_at - AISession.created_at) / 60 ).label("avg_mttr"), ) .where( AISession.account_id == account_id, AISession.created_at >= period_start, AISession.status == "resolved", AISession.resolved_at.isnot(None), ) ) mttr_row = mttr_result.one() mttr_minutes = float(mttr_row.avg_mttr) if mttr_row.avg_mttr else None # ── Average duration ── duration_result = await db.execute( select( func.avg( extract("epoch", AISession.resolved_at - AISession.created_at) / 60 ).label("avg_duration"), ) .where( AISession.account_id == account_id, AISession.created_at >= period_start, AISession.resolved_at.isnot(None), ) ) dur_row = duration_result.one() avg_duration = float(dur_row.avg_duration) if dur_row.avg_duration else 0.0 # ── MTTR trend ── mttr_trend_result = await db.execute( select( cast(AISession.resolved_at, Date).label("day"), func.avg( extract("epoch", AISession.resolved_at - AISession.created_at) / 60 ).label("mttr"), func.count(AISession.id).label("count"), ) .where( AISession.account_id == account_id, AISession.created_at >= period_start, AISession.status == "resolved", AISession.resolved_at.isnot(None), ) .group_by(cast(AISession.resolved_at, Date)) .order_by(cast(AISession.resolved_at, Date)) ) mttr_trend = [ MTTRDataPoint( date=str(r.day), mttr_minutes=round(float(r.mttr or 0), 1), session_count=r.count, ) for r in mttr_trend_result.all() ] # ── Domain breakdown ── domain_result = await db.execute( select( AISession.problem_domain, func.count(AISession.id).label("total"), func.sum(case((AISession.status == "resolved", 1), else_=0)).label("resolved"), func.sum(case((AISession.status.in_(["escalated", "requesting_escalation"]), 1), else_=0)).label("escalated"), ) .where( AISession.account_id == account_id, AISession.created_at >= period_start, AISession.problem_domain.isnot(None), ) .group_by(AISession.problem_domain) .order_by(func.count(AISession.id).desc()) ) sessions_by_domain = [ DomainBreakdown( domain=r.problem_domain or "unknown", total=int(r.total or 0), resolved=int(r.resolved or 0), escalated=int(r.escalated or 0), resolution_rate=round(int(r.resolved or 0) / int(r.total) * 100, 1) if r.total else 0.0, ) for r in domain_result.all() ] # ── Confidence breakdown ── confidence_result = await db.execute( select( AISession.confidence_tier, func.count(AISession.id).label("total"), func.sum(case((AISession.status == "resolved", 1), else_=0)).label("resolved"), ) .where( AISession.account_id == account_id, AISession.created_at >= period_start, AISession.status.in_(["resolved", "escalated", "requesting_escalation"]), ) .group_by(AISession.confidence_tier) ) conf_data = {r.confidence_tier: (int(r.total or 0), int(r.resolved or 0)) for r in confidence_result.all()} guided_total, guided_resolved = conf_data.get("guided", (0, 0)) exploring_total, exploring_resolved = conf_data.get("exploring", (0, 0)) discovery_total, discovery_resolved = conf_data.get("discovery", (0, 0)) confidence_breakdown = ConfidenceBreakdown( guided_sessions=guided_total, guided_resolution_rate=round(guided_resolved / guided_total * 100, 1) if guided_total > 0 else 0.0, exploring_sessions=exploring_total, exploring_resolution_rate=round(exploring_resolved / exploring_total * 100, 1) if exploring_total > 0 else 0.0, discovery_sessions=discovery_total, discovery_resolution_rate=round(discovery_resolved / discovery_total * 100, 1) if discovery_total > 0 else 0.0, ) # ── Knowledge coverage ── total_flows_result = await db.execute( select(func.count(Tree.id)).where(Tree.account_id == account_id) ) total_flows = total_flows_result.scalar() or 0 ai_flows_result = await db.execute( select(func.count(Tree.id)).where( Tree.account_id == account_id, Tree.origin.in_(["ai_generated", "ai_enhanced"]), ) ) ai_generated_flows = ai_flows_result.scalar() or 0 pending_proposals_result = await db.execute( select(func.count(FlowProposal.id)).where( FlowProposal.account_id == account_id, FlowProposal.status == "pending", ) ) total_proposals_pending = pending_proposals_result.scalar() or 0 approved_result = await db.execute( select(func.count(FlowProposal.id)).where( FlowProposal.account_id == account_id, FlowProposal.reviewed_at >= period_start, FlowProposal.status.in_(["approved", "modified"]), ) ) proposals_approved = approved_result.scalar() or 0 rejected_result = await db.execute( select(func.count(FlowProposal.id)).where( FlowProposal.account_id == account_id, FlowProposal.reviewed_at >= period_start, FlowProposal.status == "rejected", ) ) proposals_rejected = rejected_result.scalar() or 0 # Domain coverage domain_coverage_result = await db.execute( select( AISession.problem_domain, func.count(AISession.id).label("session_count"), func.sum(case((AISession.confidence_tier == "guided", 1), else_=0)).label("guided_count"), ) .where( AISession.account_id == account_id, AISession.created_at >= period_start, AISession.problem_domain.isnot(None), ) .group_by(AISession.problem_domain) ) # For now, flow_count per domain isn't directly available since Tree doesn't have problem_domain. # Use match_keywords or just report 0. We'll improve this in Phase 4 with better flow categorization. domain_cov_data = {} for r in domain_coverage_result.all(): domain = r.problem_domain or "unknown" sc = r.session_count or 0 gc = r.guided_count or 0 domain_cov_data[domain] = DomainCoverage( domain=domain, flow_count=0, # TODO: match via category/tags in Phase 4 session_count=sc, guided_rate=round(gc / sc * 100, 1) if sc > 0 else 0.0, ) knowledge_coverage = KnowledgeCoverage( total_flows=total_flows, ai_generated_flows=ai_generated_flows, total_proposals_pending=total_proposals_pending, proposals_approved_this_period=proposals_approved, proposals_rejected_this_period=proposals_rejected, coverage_by_domain=list(domain_cov_data.values()), ) # ── PSA metrics ── psa_metrics = None psa_linked = await db.execute( select(func.count(AISession.id)).where( AISession.account_id == account_id, AISession.created_at >= period_start, AISession.psa_ticket_id.isnot(None), ) ) psa_linked_count = psa_linked.scalar() or 0 if psa_linked_count > 0 and total > 0: psa_push_result = await db.execute( select( func.count(PsaPostLog.id).label("total_pushes"), func.sum(case((PsaPostLog.status == "success", 1), else_=0)).label("first_success"), func.sum(case( ((PsaPostLog.status == "success") & (PsaPostLog.retry_count > 0), 1), else_=0 )).label("retry_success"), ) .join(AISession, PsaPostLog.ai_session_id == AISession.id) .where( AISession.account_id == account_id, PsaPostLog.ai_session_id.isnot(None), PsaPostLog.posted_at >= period_start, ) ) push_row = psa_push_result.one() total_pushes = push_row.total_pushes or 0 first_success = push_row.first_success or 0 retry_success = push_row.retry_success or 0 psa_metrics = PsaMetrics( ticket_link_rate=round(psa_linked_count / total * 100, 1), auto_push_success_rate=round(first_success / total_pushes * 100, 1) if total_pushes > 0 else 0.0, auto_push_retry_success_rate=round(retry_success / total_pushes * 100, 1) if total_pushes > 0 else 0.0, total_time_entries_logged=0, # TODO: track from CW time entries total_hours_logged=0.0, ) return FlowPilotDashboard( period=period, total_sessions=total, resolved_sessions=resolved, escalated_sessions=escalated, abandoned_sessions=abandoned, resolution_rate=round(resolution_rate, 1), avg_steps_to_resolution=round(avg_steps, 1), avg_session_duration_minutes=round(avg_duration, 1), avg_rating=round(avg_rating, 2) if avg_rating else None, mttr_minutes=round(mttr_minutes, 1) if mttr_minutes else None, mttr_trend=mttr_trend, sessions_by_domain=sessions_by_domain, confidence_breakdown=confidence_breakdown, knowledge_coverage=knowledge_coverage, psa_metrics=psa_metrics, ) @router.get("/knowledge-gaps", response_model=KnowledgeGapReport) @limiter.limit("10/minute") async def get_knowledge_gaps_endpoint( request: Request, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_team_admin), period: str = Query("30d", pattern="^(7d|30d|90d)$"), ): """Get knowledge gap analysis report.""" if not current_user.account_id: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No account") days = {"7d": 7, "30d": 30, "90d": 90}.get(period, 30) return await get_knowledge_gaps(current_user.account_id, db, period_days=days) @router.get("/coverage", response_model=CoverageResponse) @limiter.limit("15/minute") async def get_coverage_heatmap( request: Request, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_team_admin), period: str = Query("30d", pattern="^(7d|30d|90d)$"), ): """Get coverage heatmap: sessions and flow coverage broken down by problem domain.""" if not current_user.account_id: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No account") account_id = current_user.account_id period_start = _get_period_start(period) # ── Session stats per domain ── domain_stats_result = await db.execute( select( AISession.problem_domain, func.count(AISession.id).label("session_count"), func.sum(case((AISession.status == "resolved", 1), else_=0)).label("resolved_count"), func.sum(case((AISession.status.in_(["escalated", "requesting_escalation"]), 1), else_=0)).label("escalated_count"), func.sum(case((AISession.confidence_tier == "guided", 1), else_=0)).label("guided_count"), func.avg( case( ( (AISession.status == "resolved") & AISession.resolved_at.isnot(None), extract("epoch", AISession.resolved_at - AISession.created_at) / 60, ), else_=None, ) ).label("avg_resolution_minutes"), ) .where( AISession.account_id == account_id, AISession.created_at >= period_start, AISession.problem_domain.isnot(None), ) .group_by(AISession.problem_domain) .order_by(func.count(AISession.id).desc()) ) domain_rows = domain_stats_result.all() # ── Unmapped sessions (no problem_domain) ── unmapped_result = await db.execute( select(func.count(AISession.id)).where( AISession.account_id == account_id, AISession.created_at >= period_start, AISession.problem_domain.is_(None), ) ) unmapped_session_count = int(unmapped_result.scalar() or 0) # ── Flow counts per domain: match Category.name to problem_domain ── # Joins Tree → TreeCategory and groups by lowercased category name for case-insensitive matching flow_counts_result = await db.execute( select( func.lower(TreeCategory.name).label("domain"), func.count(Tree.id).label("flow_count"), ) .join(Tree, Tree.category_id == TreeCategory.id) .where( Tree.account_id == account_id, Tree.is_active.is_(True), Tree.deleted_at.is_(None), ) .group_by(func.lower(TreeCategory.name)) ) flow_counts_by_domain: dict[str, int] = { r.domain: int(r.flow_count) for r in flow_counts_result.all() } domains = [] for r in domain_rows: sc = int(r.session_count or 0) resolved = int(r.resolved_count or 0) escalated = int(r.escalated_count or 0) guided = int(r.guided_count or 0) domain_name = r.problem_domain or "unknown" avg_res = float(r.avg_resolution_minutes) if r.avg_resolution_minutes is not None else None domains.append( CoverageDomainRow( domain=domain_name, flow_count=flow_counts_by_domain.get(domain_name.lower(), 0), session_count=sc, resolution_rate=round(resolved / sc, 4) if sc > 0 else 0.0, escalation_rate=round(escalated / sc, 4) if sc > 0 else 0.0, guided_rate=round(guided / sc, 4) if sc > 0 else 0.0, avg_resolution_minutes=round(avg_res, 1) if avg_res is not None else None, ) ) return CoverageResponse( domains=domains, unmapped_session_count=unmapped_session_count, total_domains=len(domains), ) @router.get("/flow-quality", response_model=FlowQualityResponse) @limiter.limit("15/minute") async def get_flow_quality( request: Request, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_team_admin), period: str = Query("30d", pattern="^(7d|30d|90d)$"), sort: str = Query("quality", pattern="^(quality|usage|success_rate)$"), ): """Get flow quality scoring for all active flows in the account.""" if not current_user.account_id: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No account") account_id = current_user.account_id period_start = _get_period_start(period) now = datetime.now(timezone.utc) # ── Get all active flows (only needed columns — avoids loading large tree_structure JSONB) ── flows_result = await db.execute( select(Tree.id, Tree.name, Tree.tree_type).where( Tree.account_id == account_id, Tree.is_active.is_(True), Tree.deleted_at.is_(None), ) ) flows = flows_result.all() if not flows: return FlowQualityResponse(flows=[], top_performers=[], needs_attention=[]) flow_ids = [f.id for f in flows] # ── Session stats per flow within the period ── session_stats_result = await db.execute( select( AISession.matched_flow_id, func.count(AISession.id).label("total"), func.sum(case((AISession.status == "resolved", 1), else_=0)).label("resolved"), func.sum(case((AISession.confidence_tier == "guided", 1), else_=0)).label("guided"), func.max(AISession.created_at).label("last_matched_at"), ) .where( AISession.account_id == account_id, AISession.matched_flow_id.in_(flow_ids), AISession.created_at >= period_start, ) .group_by(AISession.matched_flow_id) ) stats_by_flow: dict = {} for r in session_stats_result.all(): stats_by_flow[r.matched_flow_id] = { "total": int(r.total or 0), "resolved": int(r.resolved or 0), "guided": int(r.guided or 0), "last_matched_at": r.last_matched_at, } # ── Also get the most recent match ever (for recency score, regardless of period) ── recent_match_result = await db.execute( select( AISession.matched_flow_id, func.max(AISession.created_at).label("last_ever"), ) .where( AISession.account_id == account_id, AISession.matched_flow_id.in_(flow_ids), ) .group_by(AISession.matched_flow_id) ) last_ever_by_flow: dict = {r.matched_flow_id: r.last_ever for r in recent_match_result.all()} # ── Build scored rows ── scored_rows: list[FlowQualityRow] = [] for flow in flows: stats = stats_by_flow.get(flow.id) last_ever = last_ever_by_flow.get(flow.id) if stats and stats["total"] > 0: total = stats["total"] resolved = stats["resolved"] guided = stats["guided"] success_rate = resolved / total guided_rate = guided / total # Recency score based on last match ever if last_ever is not None: last_ever_aware = last_ever.replace(tzinfo=timezone.utc) if last_ever.tzinfo is None else last_ever days_since = (now - last_ever_aware).total_seconds() / 86400 recency_score = max(0.0, min(1.0, 1.0 - days_since / 90.0)) else: recency_score = 0.0 quality_score = round( (success_rate * 0.5) + (guided_rate * 0.3) + (recency_score * 0.2), 4, ) avg_confidence = round(guided_rate, 4) # guided_rate as confidence proxy last_matched_at = stats.get("last_matched_at") else: success_rate = None avg_confidence = None quality_score = 0.0 last_matched_at = last_ever # may be None if last_matched_at is not None and last_matched_at.tzinfo is None: last_matched_at = last_matched_at.replace(tzinfo=timezone.utc) scored_rows.append( FlowQualityRow( flow_id=str(flow.id), name=flow.name, tree_type=flow.tree_type, usage_count=stats["total"] if stats else 0, success_rate=round(success_rate, 4) if success_rate is not None else None, last_matched_at=last_matched_at, avg_confidence=avg_confidence, quality_score=quality_score, ) ) # ── Sort ── if sort == "usage": scored_rows.sort(key=lambda r: r.usage_count, reverse=True) elif sort == "success_rate": scored_rows.sort(key=lambda r: (r.success_rate is not None, r.success_rate or 0.0), reverse=True) else: scored_rows.sort(key=lambda r: r.quality_score, reverse=True) # ── Top performers: top 5 by quality_score with usage > 0 ── top_performers = [r for r in scored_rows if r.usage_count > 0] top_performers = sorted(top_performers, key=lambda r: r.quality_score, reverse=True)[:5] # ── Needs attention: used at least once, AND (success_rate < 0.5 OR not used in 30+ days) ── thirty_days_ago = now - timedelta(days=30) needs_attention = [] for r in scored_rows: if r.usage_count == 0: continue low_success = r.success_rate is not None and r.success_rate < 0.5 stale = r.last_matched_at is not None and r.last_matched_at < thirty_days_ago if low_success or stale: needs_attention.append(r) return FlowQualityResponse( flows=scored_rows, top_performers=top_performers, needs_attention=needs_attention, ) @router.get("/psa-metrics", response_model=EnhancedPsaMetrics) @limiter.limit("15/minute") async def get_enhanced_psa_metrics( request: Request, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_team_admin), period: str = Query("30d", pattern="^(7d|30d|90d)$"), ): """Get enhanced PSA integration metrics including time entry stats, push funnel, and daily trend.""" if not current_user.account_id: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No account") account_id = current_user.account_id period_start = _get_period_start(period) # ── Time entry totals from psa_activity_logs ── time_entry_result = await db.execute( select( func.count(PsaActivityLog.id).label("entry_count"), func.sum(PsaActivityLog.hours_logged).label("total_hours"), ) .where( PsaActivityLog.account_id == account_id, PsaActivityLog.activity_type == "time_entry_posted", PsaActivityLog.created_at >= period_start, ) ) te_row = time_entry_result.one() total_time_entries = int(te_row.entry_count or 0) total_hours_raw = te_row.total_hours or 0 total_hours_logged = round(float(total_hours_raw), 2) avg_hours = round(total_hours_logged / total_time_entries, 2) if total_time_entries > 0 else 0.0 # ── Push funnel ── # Total sessions in period total_sessions_result = await db.execute( select(func.count(AISession.id)).where( AISession.account_id == account_id, AISession.created_at >= period_start, ) ) total_sessions = int(total_sessions_result.scalar() or 0) # Sessions linked to a ticket linked_result = await db.execute( select(func.count(AISession.id)).where( AISession.account_id == account_id, AISession.created_at >= period_start, AISession.psa_ticket_id.isnot(None), ) ) linked_to_ticket = int(linked_result.scalar() or 0) # Sessions with a successful doc push (via PsaPostLog) — count unique sessions, not log entries doc_pushed_result = await db.execute( select(func.count(PsaPostLog.ai_session_id.distinct())) .join(AISession, PsaPostLog.ai_session_id == AISession.id) .where( AISession.account_id == account_id, PsaPostLog.ai_session_id.isnot(None), PsaPostLog.status == "success", PsaPostLog.posted_at >= period_start, ) ) doc_pushed = int(doc_pushed_result.scalar() or 0) # Sessions with a time entry logged (via psa_activity_logs) time_entry_sessions_result = await db.execute( select(func.count(PsaActivityLog.session_id.distinct())).where( PsaActivityLog.account_id == account_id, PsaActivityLog.activity_type == "time_entry_posted", PsaActivityLog.created_at >= period_start, PsaActivityLog.session_id.isnot(None), ) ) time_entry_logged = int(time_entry_sessions_result.scalar() or 0) push_funnel = PsaFunnel( total_sessions=total_sessions, linked_to_ticket=linked_to_ticket, doc_pushed=doc_pushed, time_entry_logged=time_entry_logged, ) # ── Daily trend (time entries grouped by date) ── daily_result = await db.execute( select( cast(PsaActivityLog.created_at, Date).label("day"), func.count(PsaActivityLog.id).label("entries"), func.sum(PsaActivityLog.hours_logged).label("hours"), ) .where( PsaActivityLog.account_id == account_id, PsaActivityLog.activity_type == "time_entry_posted", PsaActivityLog.created_at >= period_start, ) .group_by(cast(PsaActivityLog.created_at, Date)) .order_by(cast(PsaActivityLog.created_at, Date)) ) daily_trend = [ PsaDailyTrend( date=str(r.day), entries=int(r.entries or 0), hours=round(float(r.hours or 0), 2), ) for r in daily_result.all() ] return EnhancedPsaMetrics( total_time_entries=total_time_entries, total_hours_logged=total_hours_logged, avg_hours_per_session=avg_hours, push_funnel=push_funnel, daily_trend=daily_trend, )