From 686e2620e857cecb37bd91cb975a9af021c1041a Mon Sep 17 00:00:00 2001
From: chihlasm
Date: Mon, 16 Feb 2026 00:30:39 -0500
Subject: [PATCH] feat: add team, personal, and flow analytics endpoints

Co-Authored-By: Claude Opus 4.6
---
 backend/app/api/endpoints/analytics.py | 404 +++++++++++++++++++++++++
 backend/app/api/router.py              |   3 +-
 backend/tests/test_analytics.py        | 137 +++++++++
 3 files changed, 543 insertions(+), 1 deletion(-)
 create mode 100644 backend/app/api/endpoints/analytics.py
 create mode 100644 backend/tests/test_analytics.py

diff --git a/backend/app/api/endpoints/analytics.py b/backend/app/api/endpoints/analytics.py
new file mode 100644
index 00000000..41a3b015
--- /dev/null
+++ b/backend/app/api/endpoints/analytics.py
@@ -0,0 +1,404 @@
+from datetime import datetime, timezone, timedelta
+from uuid import UUID
+from typing import Optional
+from fastapi import APIRouter, Depends, HTTPException, Query
+from sqlalchemy import select, func, case, cast, Date
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.core.database import get_db
+from app.api.deps import get_current_active_user
+from app.models import User, Session, Tree, SessionRating
+from app.schemas.analytics import (
+    TeamAnalyticsResponse, PersonalAnalyticsResponse, FlowAnalyticsResponse,
+    AnalyticsSummary, OutcomeBreakdown, TimeSeriesPoint,
+    TopFlow, TopEngineer, StepFeedbackSummary, FlowRatingItem,
+)
+
+router = APIRouter(prefix="/analytics", tags=["analytics"])
+
+
+def _get_period_start(period: str) -> datetime:
+    days = {"7d": 7, "30d": 30, "90d": 90}.get(period, 30)  # unknown period -> 30 days
+    return datetime.now(timezone.utc) - timedelta(days=days)
+
+
+async def _build_summary(
+    db: AsyncSession,
+    base_filters: list,
+    join_tree: bool = False,
+) -> AnalyticsSummary:
+    """Build analytics summary. If join_tree=True, joins Session->Tree for account scoping."""
+
+    def _base_query():
+        q = select(func.count()).select_from(Session)
+        if join_tree:
+            q = q.join(Tree, Session.tree_id == Tree.id)
+        return q
+
+    # Total sessions
+    total_q = await db.execute(_base_query().where(*base_filters))
+    total = total_q.scalar() or 0
+
+    # Completed sessions
+    completed_q = await db.execute(
+        _base_query().where(*base_filters, Session.completed_at.isnot(None))
+    )
+    completed = completed_q.scalar() or 0
+
+    # Median duration (minutes) using percentile_cont
+    duration_base = select(
+        func.percentile_cont(0.5).within_group(
+            func.extract('epoch', Session.completed_at - Session.started_at) / 60
+        )
+    ).select_from(Session)
+    if join_tree:
+        duration_base = duration_base.join(Tree, Session.tree_id == Tree.id)
+    duration_q = await db.execute(
+        duration_base.where(*base_filters, Session.completed_at.isnot(None))
+    )
+    raw_median = duration_q.scalar()
+    median_duration = round(float(raw_median), 1) if raw_median is not None else 0.0
+
+    # Active engineers (distinct users)
+    active_base = select(func.count(func.distinct(Session.user_id))).select_from(Session)
+    if join_tree:
+        active_base = active_base.join(Tree, Session.tree_id == Tree.id)
+    active_q = await db.execute(active_base.where(*base_filters))
+    active_engineers = active_q.scalar() or 0
+
+    # Outcome breakdown
+    outcome_base = select(Session.outcome, func.count()).select_from(Session)
+    if join_tree:
+        outcome_base = outcome_base.join(Tree, Session.tree_id == Tree.id)
+    outcome_q = await db.execute(
+        outcome_base.where(
+            *base_filters, Session.completed_at.isnot(None), Session.outcome.isnot(None)
+        ).group_by(Session.outcome)
+    )
+    outcomes = dict(outcome_q.all())
+
+    return AnalyticsSummary(
+        total_sessions=total,
+        completed_sessions=completed,
+        completion_rate=round(completed / total, 3) if total > 0 else 0.0,
+        median_duration_minutes=median_duration,
+        active_engineers=active_engineers,
+        outcome_breakdown=OutcomeBreakdown(
+            resolved=outcomes.get("resolved", 0),
+            escalated=outcomes.get("escalated", 0),
+            workaround=outcomes.get("workaround", 0),
+            unresolved=outcomes.get("unresolved", 0),
+        ),
+    )
+
+
+async def _build_time_series(
+    db: AsyncSession,
+    base_filters: list,
+    join_tree: bool = False,
+) -> list[TimeSeriesPoint]:
+    """Build daily time-series using CASE expressions for outcome counting."""
+    q = select(
+        cast(Session.started_at, Date).label("date"),
+        func.count().label("sessions"),
+        func.sum(case((Session.outcome == "resolved", 1), else_=0)).label("resolved"),
+        func.sum(case((Session.outcome == "escalated", 1), else_=0)).label("escalated"),
+        func.sum(case((Session.outcome == "workaround", 1), else_=0)).label("workaround"),
+        func.sum(case((Session.outcome == "unresolved", 1), else_=0)).label("unresolved"),
+    ).select_from(Session)
+    if join_tree:
+        q = q.join(Tree, Session.tree_id == Tree.id)
+
+    rows = await db.execute(
+        q.where(*base_filters)
+        .group_by(cast(Session.started_at, Date))
+        .order_by(cast(Session.started_at, Date))
+    )
+    return [
+        TimeSeriesPoint(
+            date=str(row.date), sessions=row.sessions,
+            resolved=int(row.resolved or 0), escalated=int(row.escalated or 0),
+            workaround=int(row.workaround or 0), unresolved=int(row.unresolved or 0),
+        )
+        for row in rows.all()
+    ]
+
+
+@router.get("/team", response_model=TeamAnalyticsResponse)
+async def get_team_analytics(
+    period: str = Query("30d", pattern="^(7d|30d|90d)$"),
+    engineer_id: Optional[UUID] = None,
+    db: AsyncSession = Depends(get_db),
+    current_user: User = Depends(get_current_active_user),
+):
+    """Team analytics - team_admin or super_admin only."""
+    if not (current_user.is_team_admin or current_user.is_super_admin):
+        raise HTTPException(status_code=403, detail="Team admin access required")
+
+    period_start = _get_period_start(period)
+    base_filters = [
+        Session.started_at >= period_start,
+        Tree.account_id == current_user.account_id,
+    ]
+    if engineer_id:
+        base_filters.append(Session.user_id == engineer_id)
+
+    summary = await _build_summary(db, base_filters, join_tree=True)
+    time_series = await _build_time_series(db, base_filters, join_tree=True)
+
+    # Top flows (join Session->Tree)
+    top_flows_q = await db.execute(
+        select(
+            Tree.id, Tree.name,
+            func.count(Session.id).label("sessions"),
+            func.percentile_cont(0.5).within_group(
+                func.extract('epoch', Session.completed_at - Session.started_at) / 60
+            ).label("median_duration"),
+        )
+        .join(Tree, Session.tree_id == Tree.id)
+        .where(*base_filters)
+        .group_by(Tree.id, Tree.name)
+        .order_by(func.count(Session.id).desc())
+        .limit(10)
+    )
+    top_flows = []
+    for row in top_flows_q.all():
+        # Completed count uses the same filters (incl. engineer_id) as the query above
+        completed_count_q = await db.execute(
+            select(func.count()).select_from(Session)
+            .join(Tree, Session.tree_id == Tree.id)
+            .where(
+                *base_filters, Session.tree_id == row.id,
+                Session.completed_at.isnot(None),
+            )
+        )
+        completed_count = completed_count_q.scalar() or 0
+        completion_rate = round(completed_count / row.sessions, 3) if row.sessions > 0 else 0.0
+        top_flows.append(
+            TopFlow(
+                tree_id=str(row.id), name=row.name, sessions=row.sessions,
+                completion_rate=completion_rate,
+                median_duration_minutes=round(float(row.median_duration or 0), 1),
+            )
+        )
+
+    # Top engineers (join Session->User + Session->Tree)
+    top_engineers_q = await db.execute(
+        select(
+            User.id, User.name,
+            func.count(Session.id).label("sessions"),
+            func.percentile_cont(0.5).within_group(
+                func.extract('epoch', Session.completed_at - Session.started_at) / 60
+            ).label("median_duration"),
+        )
+        .join(User, Session.user_id == User.id)
+        .join(Tree, Session.tree_id == Tree.id)
+        .where(Session.started_at >= period_start, Tree.account_id == current_user.account_id)
+        .group_by(User.id, User.name)
+        .order_by(func.count(Session.id).desc())
+        .limit(10)
+    )
+    top_engineers = []
+    for row in top_engineers_q.all():
+        completed_count_q = await db.execute(
+            select(func.count()).select_from(Session)
+            .join(Tree, Session.tree_id == Tree.id)
+            .where(
+                Session.user_id == row.id,
+                Session.started_at >= period_start,
+                Tree.account_id == current_user.account_id,
+                Session.completed_at.isnot(None),
+            )
+        )
+        completed_count = completed_count_q.scalar() or 0
+        completion_rate = round(completed_count / row.sessions, 3) if row.sessions > 0 else 0.0
+        top_engineers.append(
+            TopEngineer(
+                user_id=str(row.id), name=row.name or "Unknown", sessions=row.sessions,
+                completion_rate=completion_rate,
+                median_duration_minutes=round(float(row.median_duration or 0), 1),
+            )
+        )
+
+    return TeamAnalyticsResponse(
+        summary=summary, time_series=time_series,
+        top_flows=top_flows, top_engineers=top_engineers,
+    )
+
+
+@router.get("/me", response_model=PersonalAnalyticsResponse)
+async def get_personal_analytics(
+    period: str = Query("30d", pattern="^(7d|30d|90d)$"),
+    db: AsyncSession = Depends(get_db),
+    current_user: User = Depends(get_current_active_user),
+):
+    """Personal analytics - any authenticated user."""
+    period_start = _get_period_start(period)
+    base_filters = [Session.started_at >= period_start, Session.user_id == current_user.id]
+
+    summary = await _build_summary(db, base_filters, join_tree=False)
+    # Override active_engineers=1 for personal view
+    summary.active_engineers = 1
+    time_series = await _build_time_series(db, base_filters, join_tree=False)
+
+    # Top flows
+    top_flows_q = await db.execute(
+        select(
+            Tree.id, Tree.name,
+            func.count(Session.id).label("sessions"),
+            func.percentile_cont(0.5).within_group(
+                func.extract('epoch', Session.completed_at - Session.started_at) / 60
+            ).label("median_duration"),
+        )
+        .join(Tree, Session.tree_id == Tree.id)
+        .where(Session.started_at >= period_start, Session.user_id == current_user.id)
+        .group_by(Tree.id, Tree.name)
+        .order_by(func.count(Session.id).desc())
+        .limit(10)
+    )
+    top_flows = []
+    for row in top_flows_q.all():
+        completed_count_q = await db.execute(
+            select(func.count()).select_from(Session)
+            .where(
+                Session.tree_id == row.id,
+                Session.user_id == current_user.id,
+                Session.started_at >= period_start,
+                Session.completed_at.isnot(None),
+            )
+        )
+        completed_count = completed_count_q.scalar() or 0
+        completion_rate = round(completed_count / row.sessions, 3) if row.sessions > 0 else 0.0
+        top_flows.append(
+            TopFlow(
+                tree_id=str(row.id), name=row.name, sessions=row.sessions,
+                completion_rate=completion_rate,
+                median_duration_minutes=round(float(row.median_duration or 0), 1),
+            )
+        )
+
+    return PersonalAnalyticsResponse(
+        summary=summary, time_series=time_series, top_flows=top_flows,
+    )
+
+
+@router.get("/flows/{tree_id}", response_model=FlowAnalyticsResponse)
+async def get_flow_analytics(
+    tree_id: UUID,
+    period: str = Query("30d", pattern="^(7d|30d|90d)$"),
+    db: AsyncSession = Depends(get_db),
+    current_user: User = Depends(get_current_active_user),
+):
+    """Analytics for a specific flow."""
+    # Verify tree exists. NOTE(review): no account/visibility check here - confirm intended.
+    result = await db.execute(select(Tree).where(Tree.id == tree_id))
+    tree = result.scalar_one_or_none()
+    if not tree:
+        raise HTTPException(status_code=404, detail="Flow not found")
+
+    period_start = _get_period_start(period)
+    base_filters = [Session.started_at >= period_start, Session.tree_id == tree_id]
+
+    summary = await _build_summary(db, base_filters, join_tree=False)
+    time_series = await _build_time_series(db, base_filters, join_tree=False)
+
+    # CSAT stats
+    csat_q = await db.execute(
+        select(func.avg(SessionRating.rating), func.count())
+        .where(SessionRating.tree_id == tree_id, SessionRating.created_at >= period_start)
+    )
+    csat_row = csat_q.one()
+    avg_csat = round(float(csat_row[0]), 1) if csat_row[0] is not None else None
+    total_ratings = csat_row[1]
+
+    # Step feedback - compute from step_ratings for steps used in this tree's sessions
+    step_feedback: list[StepFeedbackSummary] = []
+
+    # Step dropoff analysis from session decisions JSONB
+    sessions_q = await db.execute(
+        select(Session.decisions, Session.completed_at)
+        .where(Session.tree_id == tree_id, Session.started_at >= period_start)
+    )
+    sessions_data = sessions_q.all()
+
+    node_visits: dict[str, int] = {}
+    node_dropoffs: dict[str, int] = {}
+
+    for sess in sessions_data:
+        decisions = sess.decisions or []
+        for decision in decisions:
+            node_id = decision.get("node_id", "")
+            if node_id:
+                node_visits[node_id] = node_visits.get(node_id, 0) + 1
+
+        # If session not completed, last decision node is a dropoff
+        if not sess.completed_at and decisions:
+            last_decision = decisions[-1]
+            last_node = last_decision.get("node_id", "")
+            if last_node:
+                node_dropoffs[last_node] = node_dropoffs.get(last_node, 0) + 1
+
+    # Build node title map from tree structure
+    node_title_map = _extract_node_titles(tree.tree_structure)
+
+    # Build step feedback with dropoff data
+    for node_id in sorted(node_visits.keys()):
+        visits = node_visits.get(node_id, 0)
+        dropoffs = node_dropoffs.get(node_id, 0)
+        step_feedback.append(StepFeedbackSummary(
+            node_id=node_id,
+            node_title=node_title_map.get(node_id, "Unknown Step"),
+            helpful_yes=0,
+            helpful_no=0,
+            helpful_rate=0.0,
+            visit_count=visits,
+            dropoff_count=dropoffs,
+            dropoff_rate=round(dropoffs / visits, 3) if visits > 0 else 0.0,
+        ))
+
+    # Sort by dropoff_rate descending
+    step_feedback.sort(key=lambda x: x.dropoff_rate, reverse=True)
+
+    # Recent comments - ANONYMOUS (no user_name join)
+    comments_q = await db.execute(
+        select(SessionRating.rating, SessionRating.comment, SessionRating.created_at)
+        .where(
+            SessionRating.tree_id == tree_id,
+            SessionRating.comment.isnot(None),
+            SessionRating.comment != "",
+        )
+        .order_by(SessionRating.created_at.desc())
+        .limit(10)
+    )
+    recent_comments = [
+        FlowRatingItem(
+            rating=row.rating,
+            comment=row.comment,
+            created_at=row.created_at,
+        )
+        for row in comments_q.all()
+    ]
+
+    return FlowAnalyticsResponse(
+        summary=summary, avg_csat=avg_csat, total_ratings=total_ratings,
+        time_series=time_series, step_feedback=step_feedback,
+        recent_comments=recent_comments,
+    )
+
+
+def _extract_node_titles(tree_structure: dict) -> dict[str, str]:
+    """Recursively extract node_id -> title/question from tree structure."""
+    titles = {}
+
+    def walk(node):
+        if not isinstance(node, dict):
+            return
+        node_id = node.get("id", "")
+        title = node.get("title") or node.get("question") or "Unnamed"
+        if node_id:
+            titles[node_id] = title
+        for child in node.get("children") or []:  # tolerate "children": null
+            walk(child)
+
+    walk(tree_structure)
+    return titles
diff --git a/backend/app/api/router.py b/backend/app/api/router.py
index de0cffc5..2e96ddc7 100644
--- a/backend/app/api/router.py
+++ b/backend/app/api/router.py
@@ -1,7 +1,7 @@
 from fastapi import APIRouter
 from app.api.endpoints import auth, trees, sessions, invite, categories, tags, folders, step_categories, steps, admin, accounts, webhooks, shares, shared, tree_markdown
 from app.api.endpoints import admin_dashboard, admin_audit, admin_plan_limits, admin_feature_flags, admin_settings, admin_categories
-from app.api.endpoints import ratings
+from app.api.endpoints import ratings, analytics
 
 api_router = APIRouter()
 
@@ -27,3 +27,4 @@ api_router.include_router(shares.router)
 api_router.include_router(shared.router)  # Public endpoints (no auth)
 api_router.include_router(tree_markdown.router)
 api_router.include_router(ratings.router)
+api_router.include_router(analytics.router)
diff --git a/backend/tests/test_analytics.py b/backend/tests/test_analytics.py
new file mode 100644
index 00000000..33574b72
--- /dev/null
+++ b/backend/tests/test_analytics.py
@@ -0,0 +1,137 @@
+import pytest
+from httpx import AsyncClient
+
+pytestmark = pytest.mark.asyncio
+
+
+@pytest.fixture
+async def team_admin(client: AsyncClient, test_db):
+    """Create a team admin user (gets own account via registration)."""
+    from uuid import UUID as PyUUID
+    from sqlalchemy import select
+    from app.models.user import User
+
+    data = {
+        "email": "teamadmin@example.com",
+        "password": "TeamAdmin123!",
+        "name": "Team Admin"
+    }
+    response = await client.post("/api/v1/auth/register", json=data)
+    assert response.status_code in (200, 201), f"Failed: {response.text}"
+
+    user_id = PyUUID(response.json()["id"])
+    result = await test_db.execute(select(User).where(User.id == user_id))
+    user = result.scalar_one()
+    user.is_team_admin = True
+    await test_db.commit()
+
+    return {"email": data["email"], "password": data["password"], "user_data": response.json()}
+
+
+@pytest.fixture
+async def team_admin_headers(client: AsyncClient, team_admin: dict):
+    """Auth headers for team admin."""
+    response = await client.post(
+        "/api/v1/auth/login/json",
+        json={"email": team_admin["email"], "password": team_admin["password"]},
+    )
+    assert response.status_code == 200
+    return {"Authorization": f"Bearer {response.json()['access_token']}"}
+
+
+async def test_team_analytics_success(client: AsyncClient, admin_auth_headers: dict):
+    """Super admin can access team analytics."""
+    response = await client.get(
+        "/api/v1/analytics/team?period=30d",
+        headers=admin_auth_headers,
+    )
+    assert response.status_code == 200
+    data = response.json()
+    assert "summary" in data
+    assert "time_series" in data
+    assert "top_flows" in data
+    assert "top_engineers" in data
+    assert "total_sessions" in data["summary"]
+    assert "completion_rate" in data["summary"]
+    assert "median_duration_minutes" in data["summary"]
+    assert "active_engineers" in data["summary"]
+
+
+async def test_team_analytics_team_admin(client: AsyncClient, team_admin_headers: dict):
+    """Team admin can also access team analytics."""
+    response = await client.get(
+        "/api/v1/analytics/team",
+        headers=team_admin_headers,
+    )
+    assert response.status_code == 200
+
+
+async def test_team_analytics_forbidden_for_engineer(client: AsyncClient, auth_headers: dict):
+    """Regular engineers cannot access team analytics."""
+    response = await client.get(
+        "/api/v1/analytics/team",
+        headers=auth_headers,
+    )
+    assert response.status_code == 403
+
+
+async def test_personal_analytics_success(client: AsyncClient, auth_headers: dict):
+    """Any user can access personal analytics."""
+    response = await client.get(
+        "/api/v1/analytics/me?period=30d",
+        headers=auth_headers,
+    )
+    assert response.status_code == 200
+    data = response.json()
+    assert "summary" in data
+    assert "time_series" in data
+    assert "top_flows" in data
+    assert "median_duration_minutes" in data["summary"]
+
+
+async def test_personal_analytics_empty(client: AsyncClient, auth_headers: dict):
+    """Personal analytics with no sessions returns zeroes."""
+    response = await client.get(
+        "/api/v1/analytics/me?period=7d",
+        headers=auth_headers,
+    )
+    assert response.status_code == 200
+    data = response.json()
+    assert data["summary"]["total_sessions"] == 0
+    assert data["summary"]["completion_rate"] == 0.0
+    assert data["summary"]["median_duration_minutes"] == 0.0
+    assert data["summary"]["active_engineers"] == 1  # always 1 for personal
+
+
+async def test_flow_analytics_success(client: AsyncClient, auth_headers: dict, test_tree: dict):
+    """Can access analytics for a visible flow."""
+    response = await client.get(
+        f"/api/v1/analytics/flows/{test_tree['id']}",
+        headers=auth_headers,
+    )
+    assert response.status_code == 200
+    data = response.json()
+    assert "summary" in data
+    assert "step_feedback" in data
+    assert "recent_comments" in data
+    assert "avg_csat" in data
+    assert "total_ratings" in data
+
+
+async def test_flow_analytics_404(client: AsyncClient, auth_headers: dict):
+    """Non-existent flow returns 404."""
+    import uuid
+    response = await client.get(
+        f"/api/v1/analytics/flows/{uuid.uuid4()}",
+        headers=auth_headers,
+    )
+    assert response.status_code == 404
+
+
+async def test_invalid_period_rejected(client: AsyncClient, auth_headers: dict):
+    """Invalid period values are rejected."""
+    response = await client.get(
+        "/api/v1/analytics/me?period=1y",
+        headers=auth_headers,
+    )
+    assert response.status_code == 422