"""Tests for Phase 5 analytics endpoints: coverage heatmap and flow quality scoring.""" import uuid from datetime import datetime, timezone, timedelta import pytest from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession pytestmark = pytest.mark.asyncio # ─── Fixtures ──────────────────────────────────────────────────────────────── @pytest.fixture async def team_admin(client: AsyncClient, test_db: AsyncSession): """Create a team admin user (registers → promotes to is_team_admin).""" from sqlalchemy import select from app.models.user import User data = { "email": "phase5admin@example.com", "password": "TeamAdmin123!", "name": "Phase5 Admin", } response = await client.post("/api/v1/auth/register", json=data) assert response.status_code in (200, 201), response.text user_id = uuid.UUID(response.json()["id"]) result = await test_db.execute(select(User).where(User.id == user_id)) user = result.scalar_one() user.is_team_admin = True await test_db.commit() await test_db.refresh(user) return {"email": data["email"], "password": data["password"], "user": user} @pytest.fixture async def team_admin_headers(client: AsyncClient, team_admin: dict): """Auth headers for the team admin fixture.""" response = await client.post( "/api/v1/auth/login/json", json={"email": team_admin["email"], "password": team_admin["password"]}, ) assert response.status_code == 200 return {"Authorization": f"Bearer {response.json()['access_token']}"} @pytest.fixture async def non_admin_headers(client: AsyncClient, test_db: AsyncSession, team_admin: dict): """Headers for a non-admin member of the same account (not owner, not team_admin).""" from app.models.user import User from app.core.security import get_password_hash # Create a user directly — no registration route (registration makes them owner) user = User( id=uuid.uuid4(), email="non_admin_phase5@example.com", password_hash=get_password_hash("NonAdmin123!"), name="Non Admin", is_active=True, is_team_admin=False, role="engineer", account_id=team_admin["user"].account_id, account_role="viewer", ) test_db.add(user) await test_db.commit() response = await client.post( "/api/v1/auth/login/json", json={"email": "non_admin_phase5@example.com", "password": "NonAdmin123!"}, ) assert response.status_code == 200 return {"Authorization": f"Bearer {response.json()['access_token']}"} async def _seed_sessions( db: AsyncSession, account_id: uuid.UUID, user_id: uuid.UUID, *, domain: str | None = "networking", status: str = "resolved", confidence_tier: str = "guided", matched_flow_id: uuid.UUID | None = None, count: int = 1, created_days_ago: int = 1, resolved_minutes: int = 15, ): """Insert AISession rows directly into the test DB.""" from app.models.ai_session import AISession now = datetime.now(timezone.utc) created_at = now - timedelta(days=created_days_ago) resolved_at = created_at + timedelta(minutes=resolved_minutes) if status == "resolved" else None sessions = [] for _ in range(count): s = AISession( id=uuid.uuid4(), user_id=user_id, account_id=account_id, problem_domain=domain, status=status, confidence_tier=confidence_tier, matched_flow_id=matched_flow_id, created_at=created_at, resolved_at=resolved_at, ) db.add(s) sessions.append(s) await db.commit() return sessions async def _seed_flow( db: AsyncSession, account_id: uuid.UUID, *, name: str = "Test Flow", tree_type: str = "troubleshooting", is_active: bool = True, ) -> uuid.UUID: """Insert a Tree row directly into the test DB and return its id.""" from app.models.tree import Tree flow = Tree( id=uuid.uuid4(), account_id=account_id, name=name, tree_type=tree_type, is_active=is_active, tree_structure={"id": "root", "type": "decision", "question": "test?", "options": [], "children": []}, visibility="team", status="published", ) db.add(flow) await db.commit() return flow.id # ─── Coverage endpoint tests ────────────────────────────────────────────────── class TestCoverageEndpoint: async def test_requires_auth(self, client: AsyncClient, test_db: AsyncSession): """Unauthenticated requests are rejected.""" response = await client.get("/api/v1/analytics/flowpilot/coverage") assert response.status_code == 401 async def test_requires_team_admin( self, client: AsyncClient, test_db: AsyncSession, non_admin_headers: dict, ): """Non-admin account members cannot access the coverage endpoint.""" response = await client.get( "/api/v1/analytics/flowpilot/coverage", headers=non_admin_headers, ) assert response.status_code == 403 async def test_returns_domain_breakdown( self, client: AsyncClient, test_db: AsyncSession, team_admin: dict, team_admin_headers: dict, ): """Coverage endpoint returns correct domain breakdown.""" account_id = team_admin["user"].account_id user_id = team_admin["user"].id assert account_id, "team_admin must have an account" # Seed 3 resolved + 1 escalated in "networking", 2 resolved in "vpn" await _seed_sessions(test_db, account_id, user_id, domain="networking", status="resolved", count=3) await _seed_sessions(test_db, account_id, user_id, domain="networking", status="escalated", count=1) await _seed_sessions(test_db, account_id, user_id, domain="vpn", status="resolved", count=2) response = await client.get( "/api/v1/analytics/flowpilot/coverage?period=30d", headers=team_admin_headers, ) assert response.status_code == 200 data = response.json() assert "domains" in data assert "unmapped_session_count" in data assert "total_domains" in data assert data["total_domains"] == 2 domains_by_name = {d["domain"]: d for d in data["domains"]} assert "networking" in domains_by_name networking = domains_by_name["networking"] assert networking["session_count"] == 4 assert networking["resolution_rate"] == pytest.approx(0.75, abs=0.01) assert networking["escalation_rate"] == pytest.approx(0.25, abs=0.01) vpn = domains_by_name["vpn"] assert vpn["session_count"] == 2 assert vpn["resolution_rate"] == pytest.approx(1.0, abs=0.01) # Sorted by session_count descending assert data["domains"][0]["domain"] == "networking" async def test_counts_unmapped_sessions( self, client: AsyncClient, test_db: AsyncSession, team_admin: dict, team_admin_headers: dict, ): """Sessions without a problem_domain are counted as unmapped.""" account_id = team_admin["user"].account_id user_id = team_admin["user"].id await _seed_sessions(test_db, account_id, user_id, domain=None, count=3) await _seed_sessions(test_db, account_id, user_id, domain="storage", count=1) response = await client.get( "/api/v1/analytics/flowpilot/coverage", headers=team_admin_headers, ) assert response.status_code == 200 data = response.json() assert data["unmapped_session_count"] == 3 assert data["total_domains"] == 1 async def test_handles_no_sessions( self, client: AsyncClient, test_db: AsyncSession, team_admin_headers: dict, ): """Coverage endpoint returns empty result gracefully when no sessions exist.""" response = await client.get( "/api/v1/analytics/flowpilot/coverage", headers=team_admin_headers, ) assert response.status_code == 200 data = response.json() assert data["domains"] == [] assert data["unmapped_session_count"] == 0 assert data["total_domains"] == 0 async def test_avg_resolution_minutes_populated( self, client: AsyncClient, test_db: AsyncSession, team_admin: dict, team_admin_headers: dict, ): """avg_resolution_minutes is computed for resolved sessions.""" account_id = team_admin["user"].account_id user_id = team_admin["user"].id await _seed_sessions( test_db, account_id, user_id, domain="dns", status="resolved", resolved_minutes=30, count=2, ) response = await client.get( "/api/v1/analytics/flowpilot/coverage", headers=team_admin_headers, ) assert response.status_code == 200 data = response.json() dns_row = next(d for d in data["domains"] if d["domain"] == "dns") assert dns_row["avg_resolution_minutes"] == pytest.approx(30.0, abs=1.0) # ─── Flow quality endpoint tests ────────────────────────────────────────────── class TestFlowQualityEndpoint: async def test_requires_auth(self, client: AsyncClient, test_db: AsyncSession): """Unauthenticated requests are rejected.""" response = await client.get("/api/v1/analytics/flowpilot/flow-quality") assert response.status_code == 401 async def test_requires_team_admin( self, client: AsyncClient, test_db: AsyncSession, non_admin_headers: dict, ): """Non-admin account members cannot access the flow quality endpoint.""" response = await client.get( "/api/v1/analytics/flowpilot/flow-quality", headers=non_admin_headers, ) assert response.status_code == 403 async def test_handles_no_flows( self, client: AsyncClient, test_db: AsyncSession, team_admin_headers: dict, ): """Flow quality returns empty lists gracefully when no flows exist.""" response = await client.get( "/api/v1/analytics/flowpilot/flow-quality", headers=team_admin_headers, ) assert response.status_code == 200 data = response.json() assert data["flows"] == [] assert data["top_performers"] == [] assert data["needs_attention"] == [] async def test_returns_scored_flows( self, client: AsyncClient, test_db: AsyncSession, team_admin: dict, team_admin_headers: dict, ): """Flow quality returns all active flows with quality_score field.""" account_id = team_admin["user"].account_id user_id = team_admin["user"].id flow_id = await _seed_flow(test_db, account_id, name="Network Diag") await _seed_sessions( test_db, account_id, user_id, domain="networking", status="resolved", confidence_tier="guided", matched_flow_id=flow_id, count=4, ) await _seed_sessions( test_db, account_id, user_id, domain="networking", status="escalated", confidence_tier="exploring", matched_flow_id=flow_id, count=1, ) response = await client.get( "/api/v1/analytics/flowpilot/flow-quality", headers=team_admin_headers, ) assert response.status_code == 200 data = response.json() assert len(data["flows"]) >= 1 flow_row = next(f for f in data["flows"] if f["flow_id"] == str(flow_id)) assert flow_row["name"] == "Network Diag" assert flow_row["usage_count"] == 5 assert flow_row["success_rate"] == pytest.approx(0.8, abs=0.01) assert "quality_score" in flow_row assert flow_row["quality_score"] > 0 async def test_flow_with_no_sessions_has_zero_quality( self, client: AsyncClient, test_db: AsyncSession, team_admin: dict, team_admin_headers: dict, ): """Flows with no sessions receive quality_score of 0 and null success_rate.""" account_id = team_admin["user"].account_id flow_id = await _seed_flow(test_db, account_id, name="Unused Flow") response = await client.get( "/api/v1/analytics/flowpilot/flow-quality", headers=team_admin_headers, ) assert response.status_code == 200 data = response.json() row = next(f for f in data["flows"] if f["flow_id"] == str(flow_id)) assert row["quality_score"] == 0.0 assert row["success_rate"] is None assert row["usage_count"] == 0 async def test_top_performers_and_needs_attention_populated( self, client: AsyncClient, test_db: AsyncSession, team_admin: dict, team_admin_headers: dict, ): """top_performers contains high-quality flows; needs_attention flags low-performing ones.""" account_id = team_admin["user"].account_id user_id = team_admin["user"].id # High-quality flow: all resolved + guided good_flow_id = await _seed_flow(test_db, account_id, name="Good Flow") await _seed_sessions( test_db, account_id, user_id, domain="storage", status="resolved", confidence_tier="guided", matched_flow_id=good_flow_id, count=5, ) # Low-quality flow: mostly escalated bad_flow_id = await _seed_flow(test_db, account_id, name="Bad Flow") await _seed_sessions( test_db, account_id, user_id, domain="storage", status="escalated", confidence_tier="discovery", matched_flow_id=bad_flow_id, count=4, ) await _seed_sessions( test_db, account_id, user_id, domain="storage", status="resolved", confidence_tier="discovery", matched_flow_id=bad_flow_id, count=1, ) response = await client.get( "/api/v1/analytics/flowpilot/flow-quality", headers=team_admin_headers, ) assert response.status_code == 200 data = response.json() top_ids = [f["flow_id"] for f in data["top_performers"]] assert str(good_flow_id) in top_ids attn_ids = [f["flow_id"] for f in data["needs_attention"]] assert str(bad_flow_id) in attn_ids async def test_sort_by_usage( self, client: AsyncClient, test_db: AsyncSession, team_admin: dict, team_admin_headers: dict, ): """sort=usage orders flows by session count descending.""" account_id = team_admin["user"].account_id user_id = team_admin["user"].id flow_a = await _seed_flow(test_db, account_id, name="Flow A") flow_b = await _seed_flow(test_db, account_id, name="Flow B") await _seed_sessions(test_db, account_id, user_id, matched_flow_id=flow_a, count=1) await _seed_sessions(test_db, account_id, user_id, matched_flow_id=flow_b, count=3) response = await client.get( "/api/v1/analytics/flowpilot/flow-quality?sort=usage", headers=team_admin_headers, ) assert response.status_code == 200 flows = response.json()["flows"] usage_counts = [f["usage_count"] for f in flows] assert usage_counts == sorted(usage_counts, reverse=True) # ─── PSA metrics endpoint tests ─────────────────────────────────────────────── class TestPsaMetrics: """Tests for GET /api/v1/analytics/flowpilot/psa-metrics.""" async def test_requires_auth(self, client: AsyncClient, test_db: AsyncSession): """Unauthenticated requests are rejected.""" response = await client.get("/api/v1/analytics/flowpilot/psa-metrics") assert response.status_code == 401 async def test_empty_state( self, client: AsyncClient, test_db: AsyncSession, team_admin_headers: dict, ): """When no PSA activity logs exist, returns zeros gracefully.""" response = await client.get( "/api/v1/analytics/flowpilot/psa-metrics", headers=team_admin_headers, ) assert response.status_code == 200 data = response.json() assert data["total_time_entries"] == 0 assert data["total_hours_logged"] == 0.0 assert data["avg_hours_per_session"] == 0.0 assert data["daily_trend"] == [] funnel = data["push_funnel"] assert funnel["total_sessions"] == 0 assert funnel["linked_to_ticket"] == 0 assert funnel["doc_pushed"] == 0 assert funnel["time_entry_logged"] == 0 async def test_returns_time_entry_metrics( self, client: AsyncClient, test_db: AsyncSession, team_admin: dict, team_admin_headers: dict, ): """Seeded time_entry_posted logs produce correct totals and averages.""" from app.models.psa_activity_log import PsaActivityLog account_id = team_admin["user"].account_id # 3 time entries: 1.5 + 2.0 + 0.5 = 4.0 hours total, avg = 4.0/3 ≈ 1.33 for hours in (1.5, 2.0, 0.5): log = PsaActivityLog( id=uuid.uuid4(), account_id=account_id, activity_type="time_entry_posted", hours_logged=hours, ) test_db.add(log) await test_db.commit() response = await client.get( "/api/v1/analytics/flowpilot/psa-metrics", headers=team_admin_headers, ) assert response.status_code == 200 data = response.json() assert data["total_time_entries"] == 3 assert data["total_hours_logged"] == pytest.approx(4.0, abs=0.01) assert data["avg_hours_per_session"] == pytest.approx(4.0 / 3, abs=0.01) async def test_non_time_entry_logs_excluded( self, client: AsyncClient, test_db: AsyncSession, team_admin: dict, team_admin_headers: dict, ): """Activity logs with a different activity_type do not count as time entries.""" from app.models.psa_activity_log import PsaActivityLog account_id = team_admin["user"].account_id # One real time entry, one "note_posted" that should be ignored test_db.add(PsaActivityLog( id=uuid.uuid4(), account_id=account_id, activity_type="time_entry_posted", hours_logged=1.0, )) test_db.add(PsaActivityLog( id=uuid.uuid4(), account_id=account_id, activity_type="note_posted", hours_logged=5.0, )) await test_db.commit() response = await client.get( "/api/v1/analytics/flowpilot/psa-metrics", headers=team_admin_headers, ) assert response.status_code == 200 data = response.json() assert data["total_time_entries"] == 1 assert data["total_hours_logged"] == pytest.approx(1.0, abs=0.01) async def test_funnel_counts( self, client: AsyncClient, test_db: AsyncSession, team_admin: dict, team_admin_headers: dict, ): """Funnel steps count the correct subset of sessions.""" from app.models.ai_session import AISession from app.models.psa_activity_log import PsaActivityLog from app.models.psa_post_log import PsaPostLog account_id = team_admin["user"].account_id user_id = team_admin["user"].id # 4 total sessions — 2 linked to a ticket session_ids = [uuid.uuid4() for _ in range(4)] for i, sid in enumerate(session_ids): s = AISession( id=sid, user_id=user_id, account_id=account_id, status="resolved", psa_ticket_id="TICKET-123" if i < 2 else None, ) test_db.add(s) await test_db.commit() # 1 session with a successful doc push push_session_id = session_ids[0] post_log = PsaPostLog( id=uuid.uuid4(), ai_session_id=push_session_id, account_id=account_id, ticket_id="TICKET-123", note_type="internal", content_posted="Session summary", status="success", posted_by=user_id, ) test_db.add(post_log) # 1 session with a time entry logged (same session as push for realism) activity_log = PsaActivityLog( id=uuid.uuid4(), account_id=account_id, session_id=push_session_id, activity_type="time_entry_posted", hours_logged=1.0, ) test_db.add(activity_log) await test_db.commit() response = await client.get( "/api/v1/analytics/flowpilot/psa-metrics", headers=team_admin_headers, ) assert response.status_code == 200 funnel = response.json()["push_funnel"] assert funnel["total_sessions"] == 4 assert funnel["linked_to_ticket"] == 2 assert funnel["doc_pushed"] == 1 assert funnel["time_entry_logged"] == 1 async def test_daily_trend( self, client: AsyncClient, test_db: AsyncSession, team_admin: dict, team_admin_headers: dict, ): """Time entries grouped by date produce the correct daily trend array.""" from app.models.psa_activity_log import PsaActivityLog account_id = team_admin["user"].account_id now = datetime.now(timezone.utc) # Day -2: 2 entries totalling 3.0 hours # Day -1: 1 entry with 1.5 hours entries = [ (now - timedelta(days=2), 1.5), (now - timedelta(days=2), 1.5), (now - timedelta(days=1), 1.5), ] for created_at, hours in entries: log = PsaActivityLog( id=uuid.uuid4(), account_id=account_id, activity_type="time_entry_posted", hours_logged=hours, created_at=created_at, ) test_db.add(log) await test_db.commit() response = await client.get( "/api/v1/analytics/flowpilot/psa-metrics?period=30d", headers=team_admin_headers, ) assert response.status_code == 200 trend = response.json()["daily_trend"] assert len(trend) == 2 # Trend is ordered by date ascending dates = [t["date"] for t in trend] assert dates == sorted(dates) # Oldest day: 2 entries, 3.0 hours oldest = trend[0] assert oldest["entries"] == 2 assert oldest["hours"] == pytest.approx(3.0, abs=0.01) # Most recent day: 1 entry, 1.5 hours newest = trend[1] assert newest["entries"] == 1 assert newest["hours"] == pytest.approx(1.5, abs=0.01)