Files
resolutionflow/backend/tests/test_analytics_phase5.py
2026-03-20 00:08:09 +00:00

457 lines
16 KiB
Python

"""Tests for Phase 5 analytics endpoints: coverage heatmap and flow quality scoring."""
import uuid
from datetime import datetime, timezone, timedelta
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
pytestmark = pytest.mark.asyncio
# ─── Fixtures ────────────────────────────────────────────────────────────────
@pytest.fixture
async def team_admin(client: AsyncClient, test_db: AsyncSession):
"""Create a team admin user (registers → promotes to is_team_admin)."""
from sqlalchemy import select
from app.models.user import User
data = {
"email": "phase5admin@example.com",
"password": "TeamAdmin123!",
"name": "Phase5 Admin",
}
response = await client.post("/api/v1/auth/register", json=data)
assert response.status_code in (200, 201), response.text
user_id = uuid.UUID(response.json()["id"])
result = await test_db.execute(select(User).where(User.id == user_id))
user = result.scalar_one()
user.is_team_admin = True
await test_db.commit()
await test_db.refresh(user)
return {"email": data["email"], "password": data["password"], "user": user}
@pytest.fixture
async def team_admin_headers(client: AsyncClient, team_admin: dict):
"""Auth headers for the team admin fixture."""
response = await client.post(
"/api/v1/auth/login/json",
json={"email": team_admin["email"], "password": team_admin["password"]},
)
assert response.status_code == 200
return {"Authorization": f"Bearer {response.json()['access_token']}"}
@pytest.fixture
async def non_admin_headers(client: AsyncClient, test_db: AsyncSession, team_admin: dict):
"""Headers for a non-admin member of the same account (not owner, not team_admin)."""
from app.models.user import User
from app.core.security import get_password_hash
# Create a user directly — no registration route (registration makes them owner)
user = User(
id=uuid.uuid4(),
email="non_admin_phase5@example.com",
password_hash=get_password_hash("NonAdmin123!"),
name="Non Admin",
is_active=True,
is_team_admin=False,
role="engineer",
account_id=team_admin["user"].account_id,
account_role="viewer",
)
test_db.add(user)
await test_db.commit()
response = await client.post(
"/api/v1/auth/login/json",
json={"email": "non_admin_phase5@example.com", "password": "NonAdmin123!"},
)
assert response.status_code == 200
return {"Authorization": f"Bearer {response.json()['access_token']}"}
async def _seed_sessions(
db: AsyncSession,
account_id: uuid.UUID,
user_id: uuid.UUID,
*,
domain: str | None = "networking",
status: str = "resolved",
confidence_tier: str = "guided",
matched_flow_id: uuid.UUID | None = None,
count: int = 1,
created_days_ago: int = 1,
resolved_minutes: int = 15,
):
"""Insert AISession rows directly into the test DB."""
from app.models.ai_session import AISession
now = datetime.now(timezone.utc)
created_at = now - timedelta(days=created_days_ago)
resolved_at = created_at + timedelta(minutes=resolved_minutes) if status == "resolved" else None
sessions = []
for _ in range(count):
s = AISession(
id=uuid.uuid4(),
user_id=user_id,
account_id=account_id,
problem_domain=domain,
status=status,
confidence_tier=confidence_tier,
matched_flow_id=matched_flow_id,
created_at=created_at,
resolved_at=resolved_at,
)
db.add(s)
sessions.append(s)
await db.commit()
return sessions
async def _seed_flow(
db: AsyncSession,
account_id: uuid.UUID,
*,
name: str = "Test Flow",
tree_type: str = "troubleshooting",
is_active: bool = True,
) -> uuid.UUID:
"""Insert a Tree row directly into the test DB and return its id."""
from app.models.tree import Tree
flow = Tree(
id=uuid.uuid4(),
account_id=account_id,
name=name,
tree_type=tree_type,
is_active=is_active,
tree_structure={"id": "root", "type": "decision", "question": "test?", "options": [], "children": []},
visibility="team",
status="published",
)
db.add(flow)
await db.commit()
return flow.id
# ─── Coverage endpoint tests ──────────────────────────────────────────────────
class TestCoverageEndpoint:
async def test_requires_auth(self, client: AsyncClient, test_db: AsyncSession):
"""Unauthenticated requests are rejected."""
response = await client.get("/api/v1/analytics/flowpilot/coverage")
assert response.status_code == 401
async def test_requires_team_admin(
self,
client: AsyncClient,
test_db: AsyncSession,
non_admin_headers: dict,
):
"""Non-admin account members cannot access the coverage endpoint."""
response = await client.get(
"/api/v1/analytics/flowpilot/coverage",
headers=non_admin_headers,
)
assert response.status_code == 403
async def test_returns_domain_breakdown(
self,
client: AsyncClient,
test_db: AsyncSession,
team_admin: dict,
team_admin_headers: dict,
):
"""Coverage endpoint returns correct domain breakdown."""
account_id = team_admin["user"].account_id
user_id = team_admin["user"].id
assert account_id, "team_admin must have an account"
# Seed 3 resolved + 1 escalated in "networking", 2 resolved in "vpn"
await _seed_sessions(test_db, account_id, user_id, domain="networking", status="resolved", count=3)
await _seed_sessions(test_db, account_id, user_id, domain="networking", status="escalated", count=1)
await _seed_sessions(test_db, account_id, user_id, domain="vpn", status="resolved", count=2)
response = await client.get(
"/api/v1/analytics/flowpilot/coverage?period=30d",
headers=team_admin_headers,
)
assert response.status_code == 200
data = response.json()
assert "domains" in data
assert "unmapped_session_count" in data
assert "total_domains" in data
assert data["total_domains"] == 2
domains_by_name = {d["domain"]: d for d in data["domains"]}
assert "networking" in domains_by_name
networking = domains_by_name["networking"]
assert networking["session_count"] == 4
assert networking["resolution_rate"] == pytest.approx(0.75, abs=0.01)
assert networking["escalation_rate"] == pytest.approx(0.25, abs=0.01)
vpn = domains_by_name["vpn"]
assert vpn["session_count"] == 2
assert vpn["resolution_rate"] == pytest.approx(1.0, abs=0.01)
# Sorted by session_count descending
assert data["domains"][0]["domain"] == "networking"
async def test_counts_unmapped_sessions(
self,
client: AsyncClient,
test_db: AsyncSession,
team_admin: dict,
team_admin_headers: dict,
):
"""Sessions without a problem_domain are counted as unmapped."""
account_id = team_admin["user"].account_id
user_id = team_admin["user"].id
await _seed_sessions(test_db, account_id, user_id, domain=None, count=3)
await _seed_sessions(test_db, account_id, user_id, domain="storage", count=1)
response = await client.get(
"/api/v1/analytics/flowpilot/coverage",
headers=team_admin_headers,
)
assert response.status_code == 200
data = response.json()
assert data["unmapped_session_count"] == 3
assert data["total_domains"] == 1
async def test_handles_no_sessions(
self,
client: AsyncClient,
test_db: AsyncSession,
team_admin_headers: dict,
):
"""Coverage endpoint returns empty result gracefully when no sessions exist."""
response = await client.get(
"/api/v1/analytics/flowpilot/coverage",
headers=team_admin_headers,
)
assert response.status_code == 200
data = response.json()
assert data["domains"] == []
assert data["unmapped_session_count"] == 0
assert data["total_domains"] == 0
async def test_avg_resolution_minutes_populated(
self,
client: AsyncClient,
test_db: AsyncSession,
team_admin: dict,
team_admin_headers: dict,
):
"""avg_resolution_minutes is computed for resolved sessions."""
account_id = team_admin["user"].account_id
user_id = team_admin["user"].id
await _seed_sessions(
test_db, account_id, user_id,
domain="dns",
status="resolved",
resolved_minutes=30,
count=2,
)
response = await client.get(
"/api/v1/analytics/flowpilot/coverage",
headers=team_admin_headers,
)
assert response.status_code == 200
data = response.json()
dns_row = next(d for d in data["domains"] if d["domain"] == "dns")
assert dns_row["avg_resolution_minutes"] == pytest.approx(30.0, abs=1.0)
# ─── Flow quality endpoint tests ──────────────────────────────────────────────
class TestFlowQualityEndpoint:
async def test_requires_auth(self, client: AsyncClient, test_db: AsyncSession):
"""Unauthenticated requests are rejected."""
response = await client.get("/api/v1/analytics/flowpilot/flow-quality")
assert response.status_code == 401
async def test_requires_team_admin(
self,
client: AsyncClient,
test_db: AsyncSession,
non_admin_headers: dict,
):
"""Non-admin account members cannot access the flow quality endpoint."""
response = await client.get(
"/api/v1/analytics/flowpilot/flow-quality",
headers=non_admin_headers,
)
assert response.status_code == 403
async def test_handles_no_flows(
self,
client: AsyncClient,
test_db: AsyncSession,
team_admin_headers: dict,
):
"""Flow quality returns empty lists gracefully when no flows exist."""
response = await client.get(
"/api/v1/analytics/flowpilot/flow-quality",
headers=team_admin_headers,
)
assert response.status_code == 200
data = response.json()
assert data["flows"] == []
assert data["top_performers"] == []
assert data["needs_attention"] == []
async def test_returns_scored_flows(
self,
client: AsyncClient,
test_db: AsyncSession,
team_admin: dict,
team_admin_headers: dict,
):
"""Flow quality returns all active flows with quality_score field."""
account_id = team_admin["user"].account_id
user_id = team_admin["user"].id
flow_id = await _seed_flow(test_db, account_id, name="Network Diag")
await _seed_sessions(
test_db, account_id, user_id,
domain="networking",
status="resolved",
confidence_tier="guided",
matched_flow_id=flow_id,
count=4,
)
await _seed_sessions(
test_db, account_id, user_id,
domain="networking",
status="escalated",
confidence_tier="exploring",
matched_flow_id=flow_id,
count=1,
)
response = await client.get(
"/api/v1/analytics/flowpilot/flow-quality",
headers=team_admin_headers,
)
assert response.status_code == 200
data = response.json()
assert len(data["flows"]) >= 1
flow_row = next(f for f in data["flows"] if f["flow_id"] == str(flow_id))
assert flow_row["name"] == "Network Diag"
assert flow_row["usage_count"] == 5
assert flow_row["success_rate"] == pytest.approx(0.8, abs=0.01)
assert "quality_score" in flow_row
assert flow_row["quality_score"] > 0
async def test_flow_with_no_sessions_has_zero_quality(
self,
client: AsyncClient,
test_db: AsyncSession,
team_admin: dict,
team_admin_headers: dict,
):
"""Flows with no sessions receive quality_score of 0 and null success_rate."""
account_id = team_admin["user"].account_id
flow_id = await _seed_flow(test_db, account_id, name="Unused Flow")
response = await client.get(
"/api/v1/analytics/flowpilot/flow-quality",
headers=team_admin_headers,
)
assert response.status_code == 200
data = response.json()
row = next(f for f in data["flows"] if f["flow_id"] == str(flow_id))
assert row["quality_score"] == 0.0
assert row["success_rate"] is None
assert row["usage_count"] == 0
async def test_top_performers_and_needs_attention_populated(
self,
client: AsyncClient,
test_db: AsyncSession,
team_admin: dict,
team_admin_headers: dict,
):
"""top_performers contains high-quality flows; needs_attention flags low-performing ones."""
account_id = team_admin["user"].account_id
user_id = team_admin["user"].id
# High-quality flow: all resolved + guided
good_flow_id = await _seed_flow(test_db, account_id, name="Good Flow")
await _seed_sessions(
test_db, account_id, user_id,
domain="storage",
status="resolved",
confidence_tier="guided",
matched_flow_id=good_flow_id,
count=5,
)
# Low-quality flow: mostly escalated
bad_flow_id = await _seed_flow(test_db, account_id, name="Bad Flow")
await _seed_sessions(
test_db, account_id, user_id,
domain="storage",
status="escalated",
confidence_tier="discovery",
matched_flow_id=bad_flow_id,
count=4,
)
await _seed_sessions(
test_db, account_id, user_id,
domain="storage",
status="resolved",
confidence_tier="discovery",
matched_flow_id=bad_flow_id,
count=1,
)
response = await client.get(
"/api/v1/analytics/flowpilot/flow-quality",
headers=team_admin_headers,
)
assert response.status_code == 200
data = response.json()
top_ids = [f["flow_id"] for f in data["top_performers"]]
assert str(good_flow_id) in top_ids
attn_ids = [f["flow_id"] for f in data["needs_attention"]]
assert str(bad_flow_id) in attn_ids
async def test_sort_by_usage(
self,
client: AsyncClient,
test_db: AsyncSession,
team_admin: dict,
team_admin_headers: dict,
):
"""sort=usage orders flows by session count descending."""
account_id = team_admin["user"].account_id
user_id = team_admin["user"].id
flow_a = await _seed_flow(test_db, account_id, name="Flow A")
flow_b = await _seed_flow(test_db, account_id, name="Flow B")
await _seed_sessions(test_db, account_id, user_id, matched_flow_id=flow_a, count=1)
await _seed_sessions(test_db, account_id, user_id, matched_flow_id=flow_b, count=3)
response = await client.get(
"/api/v1/analytics/flowpilot/flow-quality?sort=usage",
headers=team_admin_headers,
)
assert response.status_code == 200
flows = response.json()["flows"]
usage_counts = [f["usage_count"] for f in flows]
assert usage_counts == sorted(usage_counts, reverse=True)