diff --git a/backend/app/api/endpoints/flowpilot_analytics.py b/backend/app/api/endpoints/flowpilot_analytics.py index 20b095cb..c92a55aa 100644 --- a/backend/app/api/endpoints/flowpilot_analytics.py +++ b/backend/app/api/endpoints/flowpilot_analytics.py @@ -19,6 +19,7 @@ from app.models.tree import Tree from app.models.ai_session import AISession from app.models.flow_proposal import FlowProposal from app.models.psa_post_log import PsaPostLog +from app.models.category import TreeCategory from app.schemas.flowpilot_analytics import ( FlowPilotDashboard, MTTRDataPoint, @@ -27,6 +28,10 @@ from app.schemas.flowpilot_analytics import ( KnowledgeCoverage, DomainCoverage, PsaMetrics, + CoverageDomainRow, + CoverageResponse, + FlowQualityRow, + FlowQualityResponse, ) from app.services.knowledge_gap_service import get_knowledge_gaps, KnowledgeGapReport @@ -356,3 +361,255 @@ async def get_knowledge_gaps_endpoint( days = {"7d": 7, "30d": 30, "90d": 90}.get(period, 30) return await get_knowledge_gaps(current_user.account_id, db, period_days=days) + + +@router.get("/coverage", response_model=CoverageResponse) +@limiter.limit("15/minute") +async def get_coverage_heatmap( + request: Request, + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], + _: None = Depends(require_team_admin), + period: str = Query("30d", pattern="^(7d|30d|90d)$"), +): + """Get coverage heatmap: sessions and flow coverage broken down by problem domain.""" + if not current_user.account_id: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No account") + + account_id = current_user.account_id + period_start = _get_period_start(period) + + # ── Session stats per domain ── + domain_stats_result = await db.execute( + select( + AISession.problem_domain, + func.count(AISession.id).label("session_count"), + func.sum(case((AISession.status == "resolved", 1), else_=0)).label("resolved_count"), + func.sum(case((AISession.status.in_(["escalated", "requesting_escalation"]), 1), else_=0)).label("escalated_count"), + func.sum(case((AISession.confidence_tier == "guided", 1), else_=0)).label("guided_count"), + func.avg( + case( + ( + (AISession.status == "resolved") & AISession.resolved_at.isnot(None), + extract("epoch", AISession.resolved_at - AISession.created_at) / 60, + ), + else_=None, + ) + ).label("avg_resolution_minutes"), + ) + .where( + AISession.account_id == account_id, + AISession.created_at >= period_start, + AISession.problem_domain.isnot(None), + ) + .group_by(AISession.problem_domain) + .order_by(func.count(AISession.id).desc()) + ) + domain_rows = domain_stats_result.all() + + # ── Unmapped sessions (no problem_domain) ── + unmapped_result = await db.execute( + select(func.count(AISession.id)).where( + AISession.account_id == account_id, + AISession.created_at >= period_start, + AISession.problem_domain.is_(None), + ) + ) + unmapped_session_count = int(unmapped_result.scalar() or 0) + + # ── Flow counts per domain: match Category.name to problem_domain ── + # Joins Tree → TreeCategory and groups by category name + flow_counts_result = await db.execute( + select( + TreeCategory.name.label("domain"), + func.count(Tree.id).label("flow_count"), + ) + .join(Tree, Tree.category_id == TreeCategory.id) + .where( + Tree.account_id == account_id, + Tree.is_active.is_(True), + Tree.deleted_at.is_(None), + ) + .group_by(TreeCategory.name) + ) + flow_counts_by_domain: dict[str, int] = { + r.domain: int(r.flow_count) for r in flow_counts_result.all() + } + + domains = [] + for r in domain_rows: + sc = int(r.session_count or 0) + resolved = int(r.resolved_count or 0) + escalated = int(r.escalated_count or 0) + guided = int(r.guided_count or 0) + domain_name = r.problem_domain or "unknown" + avg_res = float(r.avg_resolution_minutes) if r.avg_resolution_minutes is not None else None + + domains.append( + CoverageDomainRow( + domain=domain_name, + flow_count=flow_counts_by_domain.get(domain_name, 0), + session_count=sc, + resolution_rate=round(resolved / sc, 4) if sc > 0 else 0.0, + escalation_rate=round(escalated / sc, 4) if sc > 0 else 0.0, + guided_rate=round(guided / sc, 4) if sc > 0 else 0.0, + avg_resolution_minutes=round(avg_res, 1) if avg_res is not None else None, + ) + ) + + return CoverageResponse( + domains=domains, + unmapped_session_count=unmapped_session_count, + total_domains=len(domains), + ) + + +@router.get("/flow-quality", response_model=FlowQualityResponse) +@limiter.limit("15/minute") +async def get_flow_quality( + request: Request, + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], + _: None = Depends(require_team_admin), + period: str = Query("30d", pattern="^(7d|30d|90d)$"), + sort: str = Query("quality", pattern="^(quality|usage|success_rate)$"), +): + """Get flow quality scoring for all active flows in the account.""" + if not current_user.account_id: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No account") + + account_id = current_user.account_id + period_start = _get_period_start(period) + now = datetime.now(timezone.utc) + + # ── Get all active flows ── + flows_result = await db.execute( + select(Tree).where( + Tree.account_id == account_id, + Tree.is_active.is_(True), + Tree.deleted_at.is_(None), + ) + ) + flows = flows_result.scalars().all() + + if not flows: + return FlowQualityResponse(flows=[], top_performers=[], needs_attention=[]) + + flow_ids = [f.id for f in flows] + + # ── Session stats per flow within the period ── + session_stats_result = await db.execute( + select( + AISession.matched_flow_id, + func.count(AISession.id).label("total"), + func.sum(case((AISession.status == "resolved", 1), else_=0)).label("resolved"), + func.sum(case((AISession.confidence_tier == "guided", 1), else_=0)).label("guided"), + func.max(AISession.created_at).label("last_matched_at"), + ) + .where( + AISession.account_id == account_id, + AISession.matched_flow_id.in_(flow_ids), + AISession.created_at >= period_start, + ) + .group_by(AISession.matched_flow_id) + ) + stats_by_flow: dict = {} + for r in session_stats_result.all(): + stats_by_flow[r.matched_flow_id] = { + "total": int(r.total or 0), + "resolved": int(r.resolved or 0), + "guided": int(r.guided or 0), + "last_matched_at": r.last_matched_at, + } + + # ── Also get the most recent match ever (for recency score, regardless of period) ── + recent_match_result = await db.execute( + select( + AISession.matched_flow_id, + func.max(AISession.created_at).label("last_ever"), + ) + .where( + AISession.account_id == account_id, + AISession.matched_flow_id.in_(flow_ids), + ) + .group_by(AISession.matched_flow_id) + ) + last_ever_by_flow: dict = {r.matched_flow_id: r.last_ever for r in recent_match_result.all()} + + # ── Build scored rows ── + scored_rows: list[FlowQualityRow] = [] + for flow in flows: + stats = stats_by_flow.get(flow.id) + last_ever = last_ever_by_flow.get(flow.id) + + if stats and stats["total"] > 0: + total = stats["total"] + resolved = stats["resolved"] + guided = stats["guided"] + success_rate = resolved / total + guided_rate = guided / total + + # Recency score based on last match ever + if last_ever is not None: + last_ever_aware = last_ever.replace(tzinfo=timezone.utc) if last_ever.tzinfo is None else last_ever + days_since = (now - last_ever_aware).total_seconds() / 86400 + recency_score = max(0.0, 1.0 - days_since / 90.0) + else: + recency_score = 0.0 + + quality_score = round( + (success_rate * 0.5) + (guided_rate * 0.3) + (recency_score * 0.2), + 4, + ) + avg_confidence = round(guided_rate, 4) # guided_rate as confidence proxy + last_matched_at = stats.get("last_matched_at") + else: + success_rate = None + avg_confidence = None + quality_score = 0.0 + last_matched_at = last_ever # may be None + + if last_matched_at is not None and last_matched_at.tzinfo is None: + last_matched_at = last_matched_at.replace(tzinfo=timezone.utc) + + scored_rows.append( + FlowQualityRow( + flow_id=str(flow.id), + name=flow.name, + tree_type=flow.tree_type, + usage_count=stats["total"] if stats else 0, + success_rate=round(success_rate, 4) if success_rate is not None else None, + last_matched_at=last_matched_at, + avg_confidence=avg_confidence, + quality_score=quality_score, + ) + ) + + # ── Sort ── + if sort == "usage": + scored_rows.sort(key=lambda r: r.usage_count, reverse=True) + elif sort == "success_rate": + scored_rows.sort(key=lambda r: (r.success_rate is not None, r.success_rate or 0.0), reverse=True) + else: + scored_rows.sort(key=lambda r: r.quality_score, reverse=True) + + # ── Top performers: top 5 by quality_score with usage > 0 ── + top_performers = [r for r in scored_rows if r.usage_count > 0] + top_performers = sorted(top_performers, key=lambda r: r.quality_score, reverse=True)[:5] + + # ── Needs attention: used at least once, AND (success_rate < 0.5 OR not used in 30+ days) ── + thirty_days_ago = now - timedelta(days=30) + needs_attention = [] + for r in scored_rows: + if r.usage_count == 0: + continue + low_success = r.success_rate is not None and r.success_rate < 0.5 + stale = r.last_matched_at is not None and r.last_matched_at < thirty_days_ago + if low_success or stale: + needs_attention.append(r) + + return FlowQualityResponse( + flows=scored_rows, + top_performers=top_performers, + needs_attention=needs_attention, + ) diff --git a/backend/app/schemas/flowpilot_analytics.py b/backend/app/schemas/flowpilot_analytics.py index 2b918992..196fd24a 100644 --- a/backend/app/schemas/flowpilot_analytics.py +++ b/backend/app/schemas/flowpilot_analytics.py @@ -54,6 +54,39 @@ class PsaMetrics(BaseModel): total_hours_logged: float +class CoverageDomainRow(BaseModel): + domain: str + flow_count: int + session_count: int + resolution_rate: float + escalation_rate: float + guided_rate: float + avg_resolution_minutes: float | None = None + + +class CoverageResponse(BaseModel): + domains: list[CoverageDomainRow] + unmapped_session_count: int + total_domains: int + + +class FlowQualityRow(BaseModel): + flow_id: str + name: str + tree_type: str + usage_count: int + success_rate: float | None = None + last_matched_at: datetime | None = None + avg_confidence: float | None = None + quality_score: float + + +class FlowQualityResponse(BaseModel): + flows: list[FlowQualityRow] + top_performers: list[FlowQualityRow] + needs_attention: list[FlowQualityRow] + + class FlowPilotDashboard(BaseModel): period: str total_sessions: int diff --git a/backend/tests/test_analytics_phase5.py b/backend/tests/test_analytics_phase5.py new file mode 100644 index 00000000..db8dd5db --- /dev/null +++ b/backend/tests/test_analytics_phase5.py @@ -0,0 +1,456 @@ +"""Tests for Phase 5 analytics endpoints: coverage heatmap and flow quality scoring.""" +import uuid +from datetime import datetime, timezone, timedelta + +import pytest +from httpx import AsyncClient +from sqlalchemy.ext.asyncio import AsyncSession + +pytestmark = pytest.mark.asyncio + + +# ─── Fixtures ──────────────────────────────────────────────────────────────── + +@pytest.fixture +async def team_admin(client: AsyncClient, test_db: AsyncSession): + """Create a team admin user (registers → promotes to is_team_admin).""" + from sqlalchemy import select + from app.models.user import User + + data = { + "email": "phase5admin@example.com", + "password": "TeamAdmin123!", + "name": "Phase5 Admin", + } + response = await client.post("/api/v1/auth/register", json=data) + assert response.status_code in (200, 201), response.text + + user_id = uuid.UUID(response.json()["id"]) + result = await test_db.execute(select(User).where(User.id == user_id)) + user = result.scalar_one() + user.is_team_admin = True + await test_db.commit() + await test_db.refresh(user) + + return {"email": data["email"], "password": data["password"], "user": user} + + +@pytest.fixture +async def team_admin_headers(client: AsyncClient, team_admin: dict): + """Auth headers for the team admin fixture.""" + response = await client.post( + "/api/v1/auth/login/json", + json={"email": team_admin["email"], "password": team_admin["password"]}, + ) + assert response.status_code == 200 + return {"Authorization": f"Bearer {response.json()['access_token']}"} + + +@pytest.fixture +async def non_admin_headers(client: AsyncClient, test_db: AsyncSession, team_admin: dict): + """Headers for a non-admin member of the same account (not owner, not team_admin).""" + from app.models.user import User + from app.core.security import get_password_hash + + # Create a user directly — no registration route (registration makes them owner) + user = User( + id=uuid.uuid4(), + email="non_admin_phase5@example.com", + password_hash=get_password_hash("NonAdmin123!"), + name="Non Admin", + is_active=True, + is_team_admin=False, + role="engineer", + account_id=team_admin["user"].account_id, + account_role="viewer", + ) + test_db.add(user) + await test_db.commit() + + response = await client.post( + "/api/v1/auth/login/json", + json={"email": "non_admin_phase5@example.com", "password": "NonAdmin123!"}, + ) + assert response.status_code == 200 + return {"Authorization": f"Bearer {response.json()['access_token']}"} + + +async def _seed_sessions( + db: AsyncSession, + account_id: uuid.UUID, + user_id: uuid.UUID, + *, + domain: str | None = "networking", + status: str = "resolved", + confidence_tier: str = "guided", + matched_flow_id: uuid.UUID | None = None, + count: int = 1, + created_days_ago: int = 1, + resolved_minutes: int = 15, +): + """Insert AISession rows directly into the test DB.""" + from app.models.ai_session import AISession + + now = datetime.now(timezone.utc) + created_at = now - timedelta(days=created_days_ago) + resolved_at = created_at + timedelta(minutes=resolved_minutes) if status == "resolved" else None + + sessions = [] + for _ in range(count): + s = AISession( + id=uuid.uuid4(), + user_id=user_id, + account_id=account_id, + problem_domain=domain, + status=status, + confidence_tier=confidence_tier, + matched_flow_id=matched_flow_id, + created_at=created_at, + resolved_at=resolved_at, + ) + db.add(s) + sessions.append(s) + await db.commit() + return sessions + + +async def _seed_flow( + db: AsyncSession, + account_id: uuid.UUID, + *, + name: str = "Test Flow", + tree_type: str = "troubleshooting", + is_active: bool = True, +) -> uuid.UUID: + """Insert a Tree row directly into the test DB and return its id.""" + from app.models.tree import Tree + + flow = Tree( + id=uuid.uuid4(), + account_id=account_id, + name=name, + tree_type=tree_type, + is_active=is_active, + tree_structure={"id": "root", "type": "decision", "question": "test?", "options": [], "children": []}, + visibility="team", + status="published", + ) + db.add(flow) + await db.commit() + return flow.id + + +# ─── Coverage endpoint tests ────────────────────────────────────────────────── + +class TestCoverageEndpoint: + async def test_requires_auth(self, client: AsyncClient, test_db: AsyncSession): + """Unauthenticated requests are rejected.""" + response = await client.get("/api/v1/analytics/flowpilot/coverage") + assert response.status_code == 401 + + async def test_requires_team_admin( + self, + client: AsyncClient, + test_db: AsyncSession, + non_admin_headers: dict, + ): + """Non-admin account members cannot access the coverage endpoint.""" + response = await client.get( + "/api/v1/analytics/flowpilot/coverage", + headers=non_admin_headers, + ) + assert response.status_code == 403 + + async def test_returns_domain_breakdown( + self, + client: AsyncClient, + test_db: AsyncSession, + team_admin: dict, + team_admin_headers: dict, + ): + """Coverage endpoint returns correct domain breakdown.""" + account_id = team_admin["user"].account_id + user_id = team_admin["user"].id + assert account_id, "team_admin must have an account" + + # Seed 3 resolved + 1 escalated in "networking", 2 resolved in "vpn" + await _seed_sessions(test_db, account_id, user_id, domain="networking", status="resolved", count=3) + await _seed_sessions(test_db, account_id, user_id, domain="networking", status="escalated", count=1) + await _seed_sessions(test_db, account_id, user_id, domain="vpn", status="resolved", count=2) + + response = await client.get( + "/api/v1/analytics/flowpilot/coverage?period=30d", + headers=team_admin_headers, + ) + assert response.status_code == 200 + data = response.json() + + assert "domains" in data + assert "unmapped_session_count" in data + assert "total_domains" in data + assert data["total_domains"] == 2 + + domains_by_name = {d["domain"]: d for d in data["domains"]} + assert "networking" in domains_by_name + networking = domains_by_name["networking"] + assert networking["session_count"] == 4 + assert networking["resolution_rate"] == pytest.approx(0.75, abs=0.01) + assert networking["escalation_rate"] == pytest.approx(0.25, abs=0.01) + + vpn = domains_by_name["vpn"] + assert vpn["session_count"] == 2 + assert vpn["resolution_rate"] == pytest.approx(1.0, abs=0.01) + + # Sorted by session_count descending + assert data["domains"][0]["domain"] == "networking" + + async def test_counts_unmapped_sessions( + self, + client: AsyncClient, + test_db: AsyncSession, + team_admin: dict, + team_admin_headers: dict, + ): + """Sessions without a problem_domain are counted as unmapped.""" + account_id = team_admin["user"].account_id + user_id = team_admin["user"].id + + await _seed_sessions(test_db, account_id, user_id, domain=None, count=3) + await _seed_sessions(test_db, account_id, user_id, domain="storage", count=1) + + response = await client.get( + "/api/v1/analytics/flowpilot/coverage", + headers=team_admin_headers, + ) + assert response.status_code == 200 + data = response.json() + assert data["unmapped_session_count"] == 3 + assert data["total_domains"] == 1 + + async def test_handles_no_sessions( + self, + client: AsyncClient, + test_db: AsyncSession, + team_admin_headers: dict, + ): + """Coverage endpoint returns empty result gracefully when no sessions exist.""" + response = await client.get( + "/api/v1/analytics/flowpilot/coverage", + headers=team_admin_headers, + ) + assert response.status_code == 200 + data = response.json() + assert data["domains"] == [] + assert data["unmapped_session_count"] == 0 + assert data["total_domains"] == 0 + + async def test_avg_resolution_minutes_populated( + self, + client: AsyncClient, + test_db: AsyncSession, + team_admin: dict, + team_admin_headers: dict, + ): + """avg_resolution_minutes is computed for resolved sessions.""" + account_id = team_admin["user"].account_id + user_id = team_admin["user"].id + await _seed_sessions( + test_db, account_id, user_id, + domain="dns", + status="resolved", + resolved_minutes=30, + count=2, + ) + + response = await client.get( + "/api/v1/analytics/flowpilot/coverage", + headers=team_admin_headers, + ) + assert response.status_code == 200 + data = response.json() + dns_row = next(d for d in data["domains"] if d["domain"] == "dns") + assert dns_row["avg_resolution_minutes"] == pytest.approx(30.0, abs=1.0) + + +# ─── Flow quality endpoint tests ────────────────────────────────────────────── + +class TestFlowQualityEndpoint: + async def test_requires_auth(self, client: AsyncClient, test_db: AsyncSession): + """Unauthenticated requests are rejected.""" + response = await client.get("/api/v1/analytics/flowpilot/flow-quality") + assert response.status_code == 401 + + async def test_requires_team_admin( + self, + client: AsyncClient, + test_db: AsyncSession, + non_admin_headers: dict, + ): + """Non-admin account members cannot access the flow quality endpoint.""" + response = await client.get( + "/api/v1/analytics/flowpilot/flow-quality", + headers=non_admin_headers, + ) + assert response.status_code == 403 + + async def test_handles_no_flows( + self, + client: AsyncClient, + test_db: AsyncSession, + team_admin_headers: dict, + ): + """Flow quality returns empty lists gracefully when no flows exist.""" + response = await client.get( + "/api/v1/analytics/flowpilot/flow-quality", + headers=team_admin_headers, + ) + assert response.status_code == 200 + data = response.json() + assert data["flows"] == [] + assert data["top_performers"] == [] + assert data["needs_attention"] == [] + + async def test_returns_scored_flows( + self, + client: AsyncClient, + test_db: AsyncSession, + team_admin: dict, + team_admin_headers: dict, + ): + """Flow quality returns all active flows with quality_score field.""" + account_id = team_admin["user"].account_id + user_id = team_admin["user"].id + flow_id = await _seed_flow(test_db, account_id, name="Network Diag") + await _seed_sessions( + test_db, account_id, user_id, + domain="networking", + status="resolved", + confidence_tier="guided", + matched_flow_id=flow_id, + count=4, + ) + await _seed_sessions( + test_db, account_id, user_id, + domain="networking", + status="escalated", + confidence_tier="exploring", + matched_flow_id=flow_id, + count=1, + ) + + response = await client.get( + "/api/v1/analytics/flowpilot/flow-quality", + headers=team_admin_headers, + ) + assert response.status_code == 200 + data = response.json() + assert len(data["flows"]) >= 1 + + flow_row = next(f for f in data["flows"] if f["flow_id"] == str(flow_id)) + assert flow_row["name"] == "Network Diag" + assert flow_row["usage_count"] == 5 + assert flow_row["success_rate"] == pytest.approx(0.8, abs=0.01) + assert "quality_score" in flow_row + assert flow_row["quality_score"] > 0 + + async def test_flow_with_no_sessions_has_zero_quality( + self, + client: AsyncClient, + test_db: AsyncSession, + team_admin: dict, + team_admin_headers: dict, + ): + """Flows with no sessions receive quality_score of 0 and null success_rate.""" + account_id = team_admin["user"].account_id + flow_id = await _seed_flow(test_db, account_id, name="Unused Flow") + + response = await client.get( + "/api/v1/analytics/flowpilot/flow-quality", + headers=team_admin_headers, + ) + assert response.status_code == 200 + data = response.json() + row = next(f for f in data["flows"] if f["flow_id"] == str(flow_id)) + assert row["quality_score"] == 0.0 + assert row["success_rate"] is None + assert row["usage_count"] == 0 + + async def test_top_performers_and_needs_attention_populated( + self, + client: AsyncClient, + test_db: AsyncSession, + team_admin: dict, + team_admin_headers: dict, + ): + """top_performers contains high-quality flows; needs_attention flags low-performing ones.""" + account_id = team_admin["user"].account_id + user_id = team_admin["user"].id + + # High-quality flow: all resolved + guided + good_flow_id = await _seed_flow(test_db, account_id, name="Good Flow") + await _seed_sessions( + test_db, account_id, user_id, + domain="storage", + status="resolved", + confidence_tier="guided", + matched_flow_id=good_flow_id, + count=5, + ) + + # Low-quality flow: mostly escalated + bad_flow_id = await _seed_flow(test_db, account_id, name="Bad Flow") + await _seed_sessions( + test_db, account_id, user_id, + domain="storage", + status="escalated", + confidence_tier="discovery", + matched_flow_id=bad_flow_id, + count=4, + ) + await _seed_sessions( + test_db, account_id, user_id, + domain="storage", + status="resolved", + confidence_tier="discovery", + matched_flow_id=bad_flow_id, + count=1, + ) + + response = await client.get( + "/api/v1/analytics/flowpilot/flow-quality", + headers=team_admin_headers, + ) + assert response.status_code == 200 + data = response.json() + + top_ids = [f["flow_id"] for f in data["top_performers"]] + assert str(good_flow_id) in top_ids + + attn_ids = [f["flow_id"] for f in data["needs_attention"]] + assert str(bad_flow_id) in attn_ids + + async def test_sort_by_usage( + self, + client: AsyncClient, + test_db: AsyncSession, + team_admin: dict, + team_admin_headers: dict, + ): + """sort=usage orders flows by session count descending.""" + account_id = team_admin["user"].account_id + user_id = team_admin["user"].id + + flow_a = await _seed_flow(test_db, account_id, name="Flow A") + flow_b = await _seed_flow(test_db, account_id, name="Flow B") + + await _seed_sessions(test_db, account_id, user_id, matched_flow_id=flow_a, count=1) + await _seed_sessions(test_db, account_id, user_id, matched_flow_id=flow_b, count=3) + + response = await client.get( + "/api/v1/analytics/flowpilot/flow-quality?sort=usage", + headers=team_admin_headers, + ) + assert response.status_code == 200 + flows = response.json()["flows"] + usage_counts = [f["usage_count"] for f in flows] + assert usage_counts == sorted(usage_counts, reverse=True)