"""Analytics endpoints: team-wide, personal, and per-flow session reporting."""

from collections import Counter
from datetime import datetime, timezone, timedelta
from typing import Optional
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import select, func, case, cast, Date
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_db
from app.api.deps import get_current_active_user
from app.models import User, Session, Tree, SessionRating
from app.schemas.analytics import (
    TeamAnalyticsResponse,
    PersonalAnalyticsResponse,
    FlowAnalyticsResponse,
    AnalyticsSummary,
    OutcomeBreakdown,
    TimeSeriesPoint,
    TopFlow,
    TopEngineer,
    StepFeedbackSummary,
    FlowRatingItem,
)

router = APIRouter(prefix="/analytics", tags=["analytics"])

# Session outcome values that are counted individually. Each one is also a
# field name on OutcomeBreakdown and TimeSeriesPoint.
_OUTCOMES = ("resolved", "escalated", "workaround", "unresolved")

# Number of rows returned in the "top flows" / "top engineers" lists.
_TOP_N = 10

# Reporting window lengths, in days, keyed by the ``period`` query value.
_PERIOD_DAYS = {"7d": 7, "30d": 30, "90d": 90}


def _get_period_start(period: str) -> datetime:
    """Return the timezone-aware UTC start of the reporting window.

    Unknown ``period`` values fall back to a 30-day window.
    """
    days = _PERIOD_DAYS.get(period, 30)
    return datetime.now(timezone.utc) - timedelta(days=days)


def _median_minutes():
    """Build a SQL expression for the median session duration in minutes.

    Incomplete sessions have a NULL ``completed_at``, so their duration is
    NULL. Ordered-set aggregates such as ``percentile_cont`` ignore NULL
    inputs, so the median covers completed sessions only.
    """
    return func.percentile_cont(0.5).within_group(
        func.extract("epoch", Session.completed_at - Session.started_at) / 60
    )


def _round_minutes(value) -> float:
    """Round a possibly-NULL duration in minutes to one decimal (NULL -> 0.0)."""
    return round(float(value or 0), 1)


def _ratio(part: int, whole: int) -> float:
    """Return ``part / whole`` rounded to 3 places, or 0.0 when ``whole`` is 0."""
    return round(part / whole, 3) if whole > 0 else 0.0


def _with_tree(query, join_tree: bool):
    """Join Session->Tree when ``join_tree`` is set, so filters may use Tree columns."""
    return query.join(Tree, Session.tree_id == Tree.id) if join_tree else query


async def _build_summary(
    db: AsyncSession,
    base_filters: list,
    join_tree: bool = False,
) -> AnalyticsSummary:
    """Build analytics summary. If join_tree=True, joins Session->Tree for account scoping.

    Args:
        db: Async database session.
        base_filters: SQLAlchemy criteria applied to every query.
        join_tree: Join Session to Tree (needed when filters reference Tree).

    Returns:
        Totals, completion rate, median duration, distinct active engineers,
        and the outcome breakdown of completed sessions.
    """
    # All scalar stats in one round trip. count(completed_at) counts only
    # non-NULL values, i.e. completed sessions.
    stats_query = _with_tree(
        select(
            func.count().label("total"),
            func.count(Session.completed_at).label("completed"),
            func.count(func.distinct(Session.user_id)).label("active_engineers"),
            _median_minutes().label("median_duration"),
        ).select_from(Session),
        join_tree,
    )
    stats = (await db.execute(stats_query.where(*base_filters))).one()
    total = stats.total or 0
    completed = stats.completed or 0

    outcome_query = _with_tree(
        select(Session.outcome, func.count()).select_from(Session), join_tree
    )
    outcome_rows = await db.execute(
        outcome_query.where(
            *base_filters,
            Session.completed_at.isnot(None),
            Session.outcome.isnot(None),
        ).group_by(Session.outcome)
    )
    outcomes = dict(outcome_rows.all())

    return AnalyticsSummary(
        total_sessions=total,
        completed_sessions=completed,
        completion_rate=_ratio(completed, total),
        median_duration_minutes=_round_minutes(stats.median_duration),
        active_engineers=stats.active_engineers or 0,
        outcome_breakdown=OutcomeBreakdown(
            **{outcome: outcomes.get(outcome, 0) for outcome in _OUTCOMES}
        ),
    )


async def _build_time_series(
    db: AsyncSession,
    base_filters: list,
    join_tree: bool = False,
) -> list[TimeSeriesPoint]:
    """Build daily time-series using CASE expressions for outcome counting.

    Sessions are bucketed by ``started_at`` cast to a date. Points are ordered
    by day, and only days that have at least one session appear.
    """
    day = cast(Session.started_at, Date)
    outcome_sums = [
        func.sum(case((Session.outcome == outcome, 1), else_=0)).label(outcome)
        for outcome in _OUTCOMES
    ]
    query = _with_tree(
        select(
            day.label("date"),
            func.count().label("sessions"),
            *outcome_sums,
        ).select_from(Session),
        join_tree,
    )
    rows = await db.execute(query.where(*base_filters).group_by(day).order_by(day))
    return [
        TimeSeriesPoint(
            date=str(row.date),
            sessions=row.sessions,
            **{outcome: int(getattr(row, outcome) or 0) for outcome in _OUTCOMES},
        )
        for row in rows.all()
    ]


async def _top_flows(db: AsyncSession, filters: list) -> list[TopFlow]:
    """Return the most-used flows among sessions matching ``filters``.

    Session count, completed count and median duration all come from one
    grouped query, rather than one extra COUNT query per flow.
    """
    rows = await db.execute(
        select(
            Tree.id,
            Tree.name,
            func.count(Session.id).label("sessions"),
            func.count(Session.completed_at).label("completed"),
            _median_minutes().label("median_duration"),
        )
        .select_from(Session)
        .join(Tree, Session.tree_id == Tree.id)
        .where(*filters)
        .group_by(Tree.id, Tree.name)
        .order_by(func.count(Session.id).desc())
        .limit(_TOP_N)
    )
    return [
        TopFlow(
            tree_id=str(row.id),
            name=row.name,
            sessions=row.sessions,
            completion_rate=_ratio(row.completed, row.sessions),
            median_duration_minutes=_round_minutes(row.median_duration),
        )
        for row in rows.all()
    ]


async def _top_engineers(db: AsyncSession, filters: list) -> list[TopEngineer]:
    """Return the engineers with the most sessions matching ``filters``.

    Session is joined to both User (for names) and Tree (so filters may
    reference Tree.account_id).
    """
    rows = await db.execute(
        select(
            User.id,
            User.name,
            func.count(Session.id).label("sessions"),
            func.count(Session.completed_at).label("completed"),
            _median_minutes().label("median_duration"),
        )
        .select_from(Session)
        .join(User, Session.user_id == User.id)
        .join(Tree, Session.tree_id == Tree.id)
        .where(*filters)
        .group_by(User.id, User.name)
        .order_by(func.count(Session.id).desc())
        .limit(_TOP_N)
    )
    return [
        TopEngineer(
            user_id=str(row.id),
            name=row.name or "Unknown",
            sessions=row.sessions,
            completion_rate=_ratio(row.completed, row.sessions),
            median_duration_minutes=_round_minutes(row.median_duration),
        )
        for row in rows.all()
    ]


@router.get("/team", response_model=TeamAnalyticsResponse)
async def get_team_analytics(
    period: str = Query("30d", pattern="^(7d|30d|90d)$"),
    engineer_id: Optional[UUID] = None,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """Team analytics - team_admin or super_admin only.

    Sessions are scoped to the caller's account through the flow (Tree) they
    ran. When ``engineer_id`` is given, the summary, time series and top flows
    are narrowed to that engineer. The top-engineers list stays account-wide so
    it remains a meaningful comparison.
    """
    if not (current_user.is_team_admin or current_user.is_super_admin):
        raise HTTPException(status_code=403, detail="Team admin access required")

    period_start = _get_period_start(period)
    account_filters = [
        Session.started_at >= period_start,
        Tree.account_id == current_user.account_id,
    ]
    base_filters = list(account_filters)
    if engineer_id is not None:
        base_filters.append(Session.user_id == engineer_id)

    summary = await _build_summary(db, base_filters, join_tree=True)
    time_series = await _build_time_series(db, base_filters, join_tree=True)
    top_flows = await _top_flows(db, base_filters)
    top_engineers = await _top_engineers(db, account_filters)

    return TeamAnalyticsResponse(
        summary=summary,
        time_series=time_series,
        top_flows=top_flows,
        top_engineers=top_engineers,
    )


@router.get("/me", response_model=PersonalAnalyticsResponse)
async def get_personal_analytics(
    period: str = Query("30d", pattern="^(7d|30d|90d)$"),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """Personal analytics - any authenticated user.

    Only sessions started by the caller within the period are included.
    """
    period_start = _get_period_start(period)
    base_filters = [
        Session.started_at >= period_start,
        Session.user_id == current_user.id,
    ]

    summary = await _build_summary(db, base_filters, join_tree=False)
    # The personal view always reports exactly one engineer, even when the
    # caller has no sessions in the period.
    summary.active_engineers = 1
    time_series = await _build_time_series(db, base_filters, join_tree=False)
    top_flows = await _top_flows(db, base_filters)

    return PersonalAnalyticsResponse(
        summary=summary,
        time_series=time_series,
        top_flows=top_flows,
    )


def _count_visits_and_dropoffs(sessions) -> tuple[Counter, Counter]:
    """Count node visits and abandonment points from session decision logs.

    ``sessions`` yields ``(decisions, completed_at)`` pairs. Each decision entry
    with a non-empty ``node_id`` counts as one visit to that node. For sessions
    with no ``completed_at``, the node of the final decision counts as a
    dropoff. Non-dict entries (unexpected JSON shapes) are skipped instead of
    crashing the endpoint.
    """
    visits: Counter = Counter()
    dropoffs: Counter = Counter()
    for decisions, completed_at in sessions:
        decisions = decisions or []
        for decision in decisions:
            if isinstance(decision, dict) and decision.get("node_id", ""):
                visits[decision["node_id"]] += 1
        if not completed_at and decisions:
            last_decision = decisions[-1]
            if isinstance(last_decision, dict) and last_decision.get("node_id", ""):
                dropoffs[last_decision["node_id"]] += 1
    return visits, dropoffs


@router.get("/flows/{tree_id}", response_model=FlowAnalyticsResponse)
async def get_flow_analytics(
    tree_id: UUID,
    period: str = Query("30d", pattern="^(7d|30d|90d)$"),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_active_user),
):
    """Analytics for a specific flow.

    Raises:
        HTTPException(404): the flow does not exist, or it belongs to another
            account and the caller is not a super admin.
    """
    result = await db.execute(select(Tree).where(Tree.id == tree_id))
    tree = result.scalar_one_or_none()
    # Tenant isolation, matching the account scoping of /team. Other accounts'
    # flows are reported as missing so their existence is not confirmed.
    # NOTE(review): if some flows are meant to be shared across accounts,
    # relax this check accordingly.
    if tree is None or (
        tree.account_id != current_user.account_id
        and not current_user.is_super_admin
    ):
        raise HTTPException(status_code=404, detail="Flow not found")

    period_start = _get_period_start(period)
    base_filters = [Session.started_at >= period_start, Session.tree_id == tree_id]

    summary = await _build_summary(db, base_filters, join_tree=False)
    time_series = await _build_time_series(db, base_filters, join_tree=False)

    # CSAT stats for ratings created within the period.
    csat_q = await db.execute(
        select(func.avg(SessionRating.rating), func.count()).where(
            SessionRating.tree_id == tree_id,
            SessionRating.created_at >= period_start,
        )
    )
    avg_rating, total_ratings = csat_q.one()
    avg_csat = round(float(avg_rating), 1) if avg_rating is not None else None

    # Step dropoff analysis from the sessions' JSONB decision logs.
    sessions_q = await db.execute(
        select(Session.decisions, Session.completed_at).where(*base_filters)
    )
    node_visits, node_dropoffs = _count_visits_and_dropoffs(sessions_q.all())
    node_title_map = _extract_node_titles(tree.tree_structure)

    step_feedback: list[StepFeedbackSummary] = []
    for node_id, visits in sorted(node_visits.items()):
        dropoffs = node_dropoffs[node_id]
        step_feedback.append(
            StepFeedbackSummary(
                node_id=node_id,
                node_title=node_title_map.get(node_id, "Unknown Step"),
                # TODO: helpfulness fields are placeholders (always 0);
                # per-step ratings are not queried here.
                helpful_yes=0,
                helpful_no=0,
                helpful_rate=0.0,
                visit_count=visits,
                dropoff_count=dropoffs,
                dropoff_rate=_ratio(dropoffs, visits),
            )
        )
    # Worst dropoff first. The sort is stable, so ties keep node_id order.
    step_feedback.sort(key=lambda step: step.dropoff_rate, reverse=True)

    # Recent comments - ANONYMOUS (no user_name join).
    # NOTE(review): unlike the CSAT stats, this is not limited to the selected
    # period. Confirm that is intended.
    comments_q = await db.execute(
        select(SessionRating.rating, SessionRating.comment, SessionRating.created_at)
        .where(
            SessionRating.tree_id == tree_id,
            SessionRating.comment.isnot(None),
            SessionRating.comment != "",
        )
        .order_by(SessionRating.created_at.desc())
        .limit(_TOP_N)
    )
    recent_comments = [
        FlowRatingItem(
            rating=row.rating,
            comment=row.comment,
            created_at=row.created_at,
        )
        for row in comments_q.all()
    ]

    return FlowAnalyticsResponse(
        summary=summary,
        avg_csat=avg_csat,
        total_ratings=total_ratings,
        time_series=time_series,
        step_feedback=step_feedback,
        recent_comments=recent_comments,
    )


def _extract_node_titles(tree_structure: Optional[dict]) -> dict[str, str]:
    """Map node id -> display title from a nested tree structure.

    Dict nodes may carry ``id``, ``title`` or ``question``, and ``children``.
    Nodes without an id are skipped. Nodes without a title or question map to
    "Unnamed". Non-dict values, and a missing or null ``children``, are
    tolerated. The walk is pre-order, so when ids repeat, the node visited
    later wins, exactly as a recursive walk would behave. It is iterative, so
    very deep trees cannot exceed the recursion limit.
    """
    titles: dict[str, str] = {}
    stack = [tree_structure]
    while stack:
        node = stack.pop()
        if not isinstance(node, dict):
            continue
        node_id = node.get("id", "")
        if node_id:
            titles[node_id] = node.get("title") or node.get("question") or "Unnamed"
        # Push children reversed so they are visited in their original order.
        stack.extend(reversed(node.get("children") or []))
    return titles