Service layer (production code):
- branch_manager: set account_id on SessionBranch (root + fork) and ForkPoint from session.account_id; load session in create_fork for this purpose
- handoff_manager: set account_id on SessionHandoff from session.account_id
- ai_suggestions endpoint: set account_id on AISuggestion from current_user
- steps endpoint (/feedback): set account_id on StepRating from current_user
- ratings endpoint: set account_id on StepRating from current_user

Test infrastructure:
- conftest.py: seed PLATFORM_ACCOUNT_ID (00000000-...-0001) account after Base.metadata.create_all so global categories and gallery items have a valid FK
- test_rls_isolation: add _ensure_rls_schema fixture that runs 'alembic upgrade head' before module tests — previous function-scoped test_db fixtures drop the schema, leaving the RLS tests with no tables
- test_branding: create Account before User in helper functions
- test_admin_gallery: set account_id=PLATFORM_ACCOUNT_ID on Tree/ScriptTemplate
- test_public_templates: set account_id=PLATFORM_ACCOUNT_ID on Tree, ScriptTemplate, TreeCategory
- test_resolution_outputs: set account_id=session.account_id on SessionResolutionOutput
- test_analytics_phase5: set account_id on PsaPostLog
- test_draft_trees: replace account_id=None with PLATFORM_ACCOUNT_ID in migration default test (NOT NULL now enforced)
- test_maintenance_schedules: set account_id on other_tree
- test_save_session_as_tree: set account_id on all 5 Session() constructors

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
682 lines
23 KiB
Python
682 lines
23 KiB
Python
"""Tests for Phase 5 analytics endpoints: coverage heatmap and flow quality scoring."""
|
|
import uuid
|
|
from datetime import datetime, timezone, timedelta
|
|
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
# Module-level marker: applies the asyncio mark to every test in this file,
# so the ``async def`` tests below do not need an individual decorator.
pytestmark = pytest.mark.asyncio
|
|
|
|
|
|
# ─── Fixtures ────────────────────────────────────────────────────────────────
|
|
|
|
@pytest.fixture
async def team_admin(client: AsyncClient, test_db: AsyncSession):
    """Register a user through the API, then flag it as a team admin.

    Returns a dict holding the login ``email`` and ``password`` together
    with the refreshed ``user`` row.
    """
    from sqlalchemy import select
    from app.models.user import User

    email = "phase5admin@example.com"
    password = "TeamAdmin123!"
    payload = {"email": email, "password": password, "name": "Phase5 Admin"}

    register_resp = await client.post("/api/v1/auth/register", json=payload)
    assert register_resp.status_code in (200, 201), register_resp.text

    # Load the freshly registered row so it can be promoted in place.
    new_user_id = uuid.UUID(register_resp.json()["id"])
    lookup = select(User).where(User.id == new_user_id)
    admin = (await test_db.execute(lookup)).scalar_one()

    admin.is_team_admin = True
    await test_db.commit()
    await test_db.refresh(admin)

    return {"email": email, "password": password, "user": admin}
|
|
|
|
|
|
@pytest.fixture
async def team_admin_headers(client: AsyncClient, team_admin: dict):
    """Log the team admin in and return a bearer ``Authorization`` header dict."""
    credentials = {"email": team_admin["email"], "password": team_admin["password"]}
    login_resp = await client.post("/api/v1/auth/login/json", json=credentials)
    assert login_resp.status_code == 200

    token = login_resp.json()["access_token"]
    return {"Authorization": f"Bearer {token}"}
|
|
|
|
|
|
@pytest.fixture
async def non_admin_headers(client: AsyncClient, test_db: AsyncSession, team_admin: dict):
    """Return auth headers for a plain member of the team admin's account.

    The member is neither owner nor team admin; it is inserted straight into
    the database with the ``viewer`` account role.
    """
    from app.models.user import User
    from app.core.security import get_password_hash

    # Inserted directly rather than via the registration route, because
    # registering would make the new user an owner.
    member = User(
        id=uuid.uuid4(),
        email="non_admin_phase5@example.com",
        password_hash=get_password_hash("NonAdmin123!"),
        name="Non Admin",
        is_active=True,
        is_team_admin=False,
        role="engineer",
        account_id=team_admin["user"].account_id,
        account_role="viewer",
    )
    test_db.add(member)
    await test_db.commit()

    credentials = {"email": "non_admin_phase5@example.com", "password": "NonAdmin123!"}
    login_resp = await client.post("/api/v1/auth/login/json", json=credentials)
    assert login_resp.status_code == 200

    token = login_resp.json()["access_token"]
    return {"Authorization": f"Bearer {token}"}
|
|
|
|
|
|
async def _seed_sessions(
    db: AsyncSession,
    account_id: uuid.UUID,
    user_id: uuid.UUID,
    *,
    domain: str | None = "networking",
    status: str = "resolved",
    confidence_tier: str = "guided",
    matched_flow_id: uuid.UUID | None = None,
    count: int = 1,
    created_days_ago: int = 1,
    resolved_minutes: int = 15,
):
    """Add ``count`` identical AISession rows to the test DB and commit them.

    All rows share one ``created_at`` (``created_days_ago`` days before now,
    UTC). ``resolved_at`` is ``resolved_minutes`` after that when ``status``
    is ``"resolved"`` and ``None`` for any other status. Returns the list of
    created AISession objects.
    """
    from app.models.ai_session import AISession

    started = datetime.now(timezone.utc) - timedelta(days=created_days_ago)
    if status == "resolved":
        finished = started + timedelta(minutes=resolved_minutes)
    else:
        finished = None

    rows = [
        AISession(
            id=uuid.uuid4(),
            user_id=user_id,
            account_id=account_id,
            problem_domain=domain,
            status=status,
            confidence_tier=confidence_tier,
            matched_flow_id=matched_flow_id,
            created_at=started,
            resolved_at=finished,
        )
        for _ in range(count)
    ]
    for row in rows:
        db.add(row)

    await db.commit()
    return rows
|
|
|
|
|
|
async def _seed_flow(
    db: AsyncSession,
    account_id: uuid.UUID,
    *,
    name: str = "Test Flow",
    tree_type: str = "troubleshooting",
    is_active: bool = True,
) -> uuid.UUID:
    """Add one published, team-visible Tree row to the test DB, commit it, and return its id."""
    from app.models.tree import Tree

    # Minimal single-node structure; the tests here only need the row to exist.
    root_node = {"id": "root", "type": "decision", "question": "test?", "options": [], "children": []}

    tree = Tree(
        id=uuid.uuid4(),
        account_id=account_id,
        name=name,
        tree_type=tree_type,
        is_active=is_active,
        tree_structure=root_node,
        visibility="team",
        status="published",
    )
    db.add(tree)
    await db.commit()

    return tree.id
|
|
|
|
|
|
# ─── Coverage endpoint tests ──────────────────────────────────────────────────
|
|
|
|
class TestCoverageEndpoint:
    """Tests for GET /api/v1/analytics/flowpilot/coverage."""

    async def test_requires_auth(self, client: AsyncClient, test_db: AsyncSession):
        """Unauthenticated requests are rejected."""
        response = await client.get("/api/v1/analytics/flowpilot/coverage")
        assert response.status_code == 401, response.text

    async def test_requires_team_admin(
        self,
        client: AsyncClient,
        test_db: AsyncSession,
        non_admin_headers: dict,
    ):
        """Non-admin account members cannot access the coverage endpoint."""
        response = await client.get(
            "/api/v1/analytics/flowpilot/coverage",
            headers=non_admin_headers,
        )
        assert response.status_code == 403, response.text

    async def test_returns_domain_breakdown(
        self,
        client: AsyncClient,
        test_db: AsyncSession,
        team_admin: dict,
        team_admin_headers: dict,
    ):
        """Coverage endpoint returns correct domain breakdown."""
        account_id = team_admin["user"].account_id
        user_id = team_admin["user"].id
        assert account_id, "team_admin must have an account"

        # Seed 3 resolved + 1 escalated in "networking", 2 resolved in "vpn"
        await _seed_sessions(test_db, account_id, user_id, domain="networking", status="resolved", count=3)
        await _seed_sessions(test_db, account_id, user_id, domain="networking", status="escalated", count=1)
        await _seed_sessions(test_db, account_id, user_id, domain="vpn", status="resolved", count=2)

        response = await client.get(
            "/api/v1/analytics/flowpilot/coverage?period=30d",
            headers=team_admin_headers,
        )
        assert response.status_code == 200, response.text
        data = response.json()

        assert "domains" in data
        assert "unmapped_session_count" in data
        assert "total_domains" in data
        assert data["total_domains"] == 2

        domains_by_name = {d["domain"]: d for d in data["domains"]}

        assert "networking" in domains_by_name
        networking = domains_by_name["networking"]
        assert networking["session_count"] == 4
        assert networking["resolution_rate"] == pytest.approx(0.75, abs=0.01)
        assert networking["escalation_rate"] == pytest.approx(0.25, abs=0.01)

        # Same presence check as for "networking", so a missing domain fails
        # as an assertion rather than a KeyError.
        assert "vpn" in domains_by_name
        vpn = domains_by_name["vpn"]
        assert vpn["session_count"] == 2
        assert vpn["resolution_rate"] == pytest.approx(1.0, abs=0.01)

        # Sorted by session_count descending
        assert data["domains"][0]["domain"] == "networking"

    async def test_counts_unmapped_sessions(
        self,
        client: AsyncClient,
        test_db: AsyncSession,
        team_admin: dict,
        team_admin_headers: dict,
    ):
        """Sessions without a problem_domain are counted as unmapped."""
        account_id = team_admin["user"].account_id
        user_id = team_admin["user"].id

        await _seed_sessions(test_db, account_id, user_id, domain=None, count=3)
        await _seed_sessions(test_db, account_id, user_id, domain="storage", count=1)

        response = await client.get(
            "/api/v1/analytics/flowpilot/coverage",
            headers=team_admin_headers,
        )
        assert response.status_code == 200, response.text
        data = response.json()
        assert data["unmapped_session_count"] == 3
        assert data["total_domains"] == 1

    async def test_handles_no_sessions(
        self,
        client: AsyncClient,
        test_db: AsyncSession,
        team_admin_headers: dict,
    ):
        """Coverage endpoint returns empty result gracefully when no sessions exist."""
        response = await client.get(
            "/api/v1/analytics/flowpilot/coverage",
            headers=team_admin_headers,
        )
        assert response.status_code == 200, response.text
        data = response.json()
        assert data["domains"] == []
        assert data["unmapped_session_count"] == 0
        assert data["total_domains"] == 0

    async def test_avg_resolution_minutes_populated(
        self,
        client: AsyncClient,
        test_db: AsyncSession,
        team_admin: dict,
        team_admin_headers: dict,
    ):
        """avg_resolution_minutes is computed for resolved sessions."""
        account_id = team_admin["user"].account_id
        user_id = team_admin["user"].id
        await _seed_sessions(
            test_db, account_id, user_id,
            domain="dns",
            status="resolved",
            resolved_minutes=30,
            count=2,
        )

        response = await client.get(
            "/api/v1/analytics/flowpilot/coverage",
            headers=team_admin_headers,
        )
        assert response.status_code == 200, response.text
        data = response.json()

        # A bare next() on an exhausted generator raises StopIteration, which
        # inside a coroutine surfaces as an opaque RuntimeError. Use a default
        # and assert explicitly so a missing row fails with a readable message.
        dns_row = next((d for d in data["domains"] if d["domain"] == "dns"), None)
        assert dns_row is not None, f"'dns' domain missing from response: {data['domains']}"
        assert dns_row["avg_resolution_minutes"] == pytest.approx(30.0, abs=1.0)
|
|
|
|
|
|
# ─── Flow quality endpoint tests ──────────────────────────────────────────────
|
|
|
|
class TestFlowQualityEndpoint:
    """Tests for GET /api/v1/analytics/flowpilot/flow-quality."""

    async def test_requires_auth(self, client: AsyncClient, test_db: AsyncSession):
        """Unauthenticated requests are rejected."""
        response = await client.get("/api/v1/analytics/flowpilot/flow-quality")
        assert response.status_code == 401, response.text

    async def test_requires_team_admin(
        self,
        client: AsyncClient,
        test_db: AsyncSession,
        non_admin_headers: dict,
    ):
        """Non-admin account members cannot access the flow quality endpoint."""
        response = await client.get(
            "/api/v1/analytics/flowpilot/flow-quality",
            headers=non_admin_headers,
        )
        assert response.status_code == 403, response.text

    async def test_handles_no_flows(
        self,
        client: AsyncClient,
        test_db: AsyncSession,
        team_admin_headers: dict,
    ):
        """Flow quality returns empty lists gracefully when no flows exist."""
        response = await client.get(
            "/api/v1/analytics/flowpilot/flow-quality",
            headers=team_admin_headers,
        )
        assert response.status_code == 200, response.text
        data = response.json()
        assert data["flows"] == []
        assert data["top_performers"] == []
        assert data["needs_attention"] == []

    async def test_returns_scored_flows(
        self,
        client: AsyncClient,
        test_db: AsyncSession,
        team_admin: dict,
        team_admin_headers: dict,
    ):
        """Flow quality returns all active flows with quality_score field."""
        account_id = team_admin["user"].account_id
        user_id = team_admin["user"].id
        flow_id = await _seed_flow(test_db, account_id, name="Network Diag")

        # 4 resolved + 1 escalated against the same flow -> success rate 0.8
        await _seed_sessions(
            test_db, account_id, user_id,
            domain="networking",
            status="resolved",
            confidence_tier="guided",
            matched_flow_id=flow_id,
            count=4,
        )
        await _seed_sessions(
            test_db, account_id, user_id,
            domain="networking",
            status="escalated",
            confidence_tier="exploring",
            matched_flow_id=flow_id,
            count=1,
        )

        response = await client.get(
            "/api/v1/analytics/flowpilot/flow-quality",
            headers=team_admin_headers,
        )
        assert response.status_code == 200, response.text
        data = response.json()
        assert len(data["flows"]) >= 1

        # next() with a default: a bare next() would raise StopIteration,
        # which a coroutine converts into an unhelpful RuntimeError.
        flow_row = next((f for f in data["flows"] if f["flow_id"] == str(flow_id)), None)
        assert flow_row is not None, f"flow {flow_id} missing from response: {data['flows']}"
        assert flow_row["name"] == "Network Diag"
        assert flow_row["usage_count"] == 5
        assert flow_row["success_rate"] == pytest.approx(0.8, abs=0.01)
        assert "quality_score" in flow_row
        assert flow_row["quality_score"] > 0

    async def test_flow_with_no_sessions_has_zero_quality(
        self,
        client: AsyncClient,
        test_db: AsyncSession,
        team_admin: dict,
        team_admin_headers: dict,
    ):
        """Flows with no sessions receive quality_score of 0 and null success_rate."""
        account_id = team_admin["user"].account_id
        flow_id = await _seed_flow(test_db, account_id, name="Unused Flow")

        response = await client.get(
            "/api/v1/analytics/flowpilot/flow-quality",
            headers=team_admin_headers,
        )
        assert response.status_code == 200, response.text
        data = response.json()

        # Default + explicit assert instead of a bare next() (see
        # StopIteration-in-coroutine behaviour).
        row = next((f for f in data["flows"] if f["flow_id"] == str(flow_id)), None)
        assert row is not None, f"flow {flow_id} missing from response: {data['flows']}"
        assert row["quality_score"] == 0.0
        assert row["success_rate"] is None
        assert row["usage_count"] == 0

    async def test_top_performers_and_needs_attention_populated(
        self,
        client: AsyncClient,
        test_db: AsyncSession,
        team_admin: dict,
        team_admin_headers: dict,
    ):
        """top_performers contains high-quality flows; needs_attention flags low-performing ones."""
        account_id = team_admin["user"].account_id
        user_id = team_admin["user"].id

        # High-quality flow: all resolved + guided
        good_flow_id = await _seed_flow(test_db, account_id, name="Good Flow")
        await _seed_sessions(
            test_db, account_id, user_id,
            domain="storage",
            status="resolved",
            confidence_tier="guided",
            matched_flow_id=good_flow_id,
            count=5,
        )

        # Low-quality flow: mostly escalated
        bad_flow_id = await _seed_flow(test_db, account_id, name="Bad Flow")
        await _seed_sessions(
            test_db, account_id, user_id,
            domain="storage",
            status="escalated",
            confidence_tier="discovery",
            matched_flow_id=bad_flow_id,
            count=4,
        )
        await _seed_sessions(
            test_db, account_id, user_id,
            domain="storage",
            status="resolved",
            confidence_tier="discovery",
            matched_flow_id=bad_flow_id,
            count=1,
        )

        response = await client.get(
            "/api/v1/analytics/flowpilot/flow-quality",
            headers=team_admin_headers,
        )
        assert response.status_code == 200, response.text
        data = response.json()

        top_ids = [f["flow_id"] for f in data["top_performers"]]
        assert str(good_flow_id) in top_ids

        attn_ids = [f["flow_id"] for f in data["needs_attention"]]
        assert str(bad_flow_id) in attn_ids

    async def test_sort_by_usage(
        self,
        client: AsyncClient,
        test_db: AsyncSession,
        team_admin: dict,
        team_admin_headers: dict,
    ):
        """sort=usage orders flows by session count descending."""
        account_id = team_admin["user"].account_id
        user_id = team_admin["user"].id

        flow_a = await _seed_flow(test_db, account_id, name="Flow A")
        flow_b = await _seed_flow(test_db, account_id, name="Flow B")

        await _seed_sessions(test_db, account_id, user_id, matched_flow_id=flow_a, count=1)
        await _seed_sessions(test_db, account_id, user_id, matched_flow_id=flow_b, count=3)

        response = await client.get(
            "/api/v1/analytics/flowpilot/flow-quality?sort=usage",
            headers=team_admin_headers,
        )
        assert response.status_code == 200, response.text
        flows = response.json()["flows"]

        # Guard against a vacuous pass: an empty list is trivially "sorted",
        # so require both seeded flows to be present before checking order.
        returned_ids = {f["flow_id"] for f in flows}
        assert {str(flow_a), str(flow_b)} <= returned_ids, flows

        usage_counts = [f["usage_count"] for f in flows]
        assert usage_counts == sorted(usage_counts, reverse=True)
|
|
|
|
|
|
# ─── PSA metrics endpoint tests ───────────────────────────────────────────────
|
|
|
|
class TestPsaMetrics:
    """Exercises GET /api/v1/analytics/flowpilot/psa-metrics."""

    async def test_requires_auth(self, client: AsyncClient, test_db: AsyncSession):
        """A request without credentials gets a 401."""
        resp = await client.get("/api/v1/analytics/flowpilot/psa-metrics")
        assert resp.status_code == 401

    async def test_empty_state(
        self,
        client: AsyncClient,
        test_db: AsyncSession,
        team_admin_headers: dict,
    ):
        """With no PSA activity logged, every metric is zero/empty."""
        url = "/api/v1/analytics/flowpilot/psa-metrics"
        resp = await client.get(url, headers=team_admin_headers)
        assert resp.status_code == 200
        body = resp.json()

        assert body["total_time_entries"] == 0
        assert body["total_hours_logged"] == 0.0
        assert body["avg_hours_per_session"] == 0.0
        assert body["daily_trend"] == []

        push_funnel = body["push_funnel"]
        assert push_funnel["total_sessions"] == 0
        assert push_funnel["linked_to_ticket"] == 0
        assert push_funnel["doc_pushed"] == 0
        assert push_funnel["time_entry_logged"] == 0

    async def test_returns_time_entry_metrics(
        self,
        client: AsyncClient,
        test_db: AsyncSession,
        team_admin: dict,
        team_admin_headers: dict,
    ):
        """Three seeded time_entry_posted logs yield the expected count, total and average."""
        from app.models.psa_activity_log import PsaActivityLog

        owner_account = team_admin["user"].account_id

        # 1.5 + 2.0 + 0.5 = 4.0 hours across 3 entries; average is 4.0 / 3.
        hours_per_entry = (1.5, 2.0, 0.5)
        for logged in hours_per_entry:
            entry = PsaActivityLog(
                id=uuid.uuid4(),
                account_id=owner_account,
                activity_type="time_entry_posted",
                hours_logged=logged,
            )
            test_db.add(entry)
        await test_db.commit()

        url = "/api/v1/analytics/flowpilot/psa-metrics"
        resp = await client.get(url, headers=team_admin_headers)
        assert resp.status_code == 200
        body = resp.json()

        assert body["total_time_entries"] == 3
        assert body["total_hours_logged"] == pytest.approx(4.0, abs=0.01)
        assert body["avg_hours_per_session"] == pytest.approx(4.0 / 3, abs=0.01)

    async def test_non_time_entry_logs_excluded(
        self,
        client: AsyncClient,
        test_db: AsyncSession,
        team_admin: dict,
        team_admin_headers: dict,
    ):
        """Only logs whose activity_type is time_entry_posted are counted as time entries."""
        from app.models.psa_activity_log import PsaActivityLog

        owner_account = team_admin["user"].account_id

        # A genuine time entry, followed by a "note_posted" row that must be ignored.
        counted = PsaActivityLog(
            id=uuid.uuid4(),
            account_id=owner_account,
            activity_type="time_entry_posted",
            hours_logged=1.0,
        )
        test_db.add(counted)
        ignored = PsaActivityLog(
            id=uuid.uuid4(),
            account_id=owner_account,
            activity_type="note_posted",
            hours_logged=5.0,
        )
        test_db.add(ignored)
        await test_db.commit()

        url = "/api/v1/analytics/flowpilot/psa-metrics"
        resp = await client.get(url, headers=team_admin_headers)
        assert resp.status_code == 200
        body = resp.json()
        assert body["total_time_entries"] == 1
        assert body["total_hours_logged"] == pytest.approx(1.0, abs=0.01)

    async def test_funnel_counts(
        self,
        client: AsyncClient,
        test_db: AsyncSession,
        team_admin: dict,
        team_admin_headers: dict,
    ):
        """Each push_funnel step reports the size of its seeded subset of sessions."""
        from app.models.ai_session import AISession
        from app.models.psa_activity_log import PsaActivityLog
        from app.models.psa_post_log import PsaPostLog

        owner_account = team_admin["user"].account_id
        owner_id = team_admin["user"].id

        # Four sessions in total; only the first two carry a ticket id.
        session_ids = [uuid.uuid4() for _ in range(4)]
        for position, session_id in enumerate(session_ids):
            ticket = "TICKET-123" if position < 2 else None
            test_db.add(
                AISession(
                    id=session_id,
                    user_id=owner_id,
                    account_id=owner_account,
                    status="resolved",
                    psa_ticket_id=ticket,
                )
            )
        await test_db.commit()

        # The first session gets one successful doc push ...
        pushed_session = session_ids[0]
        test_db.add(
            PsaPostLog(
                id=uuid.uuid4(),
                ai_session_id=pushed_session,
                account_id=owner_account,
                ticket_id="TICKET-123",
                note_type="internal",
                content_posted="Session summary",
                status="success",
                posted_by=owner_id,
            )
        )

        # ... and one logged time entry (same session as the push, for realism).
        test_db.add(
            PsaActivityLog(
                id=uuid.uuid4(),
                account_id=owner_account,
                session_id=pushed_session,
                activity_type="time_entry_posted",
                hours_logged=1.0,
            )
        )
        await test_db.commit()

        url = "/api/v1/analytics/flowpilot/psa-metrics"
        resp = await client.get(url, headers=team_admin_headers)
        assert resp.status_code == 200
        push_funnel = resp.json()["push_funnel"]

        assert push_funnel["total_sessions"] == 4
        assert push_funnel["linked_to_ticket"] == 2
        assert push_funnel["doc_pushed"] == 1
        assert push_funnel["time_entry_logged"] == 1

    async def test_daily_trend(
        self,
        client: AsyncClient,
        test_db: AsyncSession,
        team_admin: dict,
        team_admin_headers: dict,
    ):
        """daily_trend has one ascending-date bucket per day, with entry count and hours."""
        from app.models.psa_activity_log import PsaActivityLog

        owner_account = team_admin["user"].account_id
        reference = datetime.now(timezone.utc)
        two_days_back = reference - timedelta(days=2)
        one_day_back = reference - timedelta(days=1)

        # Two days ago: 2 entries totalling 3.0 hours.
        # One day ago: 1 entry of 1.5 hours.
        seeded = [
            (two_days_back, 1.5),
            (two_days_back, 1.5),
            (one_day_back, 1.5),
        ]
        for stamp, logged in seeded:
            test_db.add(
                PsaActivityLog(
                    id=uuid.uuid4(),
                    account_id=owner_account,
                    activity_type="time_entry_posted",
                    hours_logged=logged,
                    created_at=stamp,
                )
            )
        await test_db.commit()

        url = "/api/v1/analytics/flowpilot/psa-metrics?period=30d"
        resp = await client.get(url, headers=team_admin_headers)
        assert resp.status_code == 200
        daily = resp.json()["daily_trend"]

        assert len(daily) == 2

        # Buckets must come back oldest first.
        bucket_dates = [bucket["date"] for bucket in daily]
        assert bucket_dates == sorted(bucket_dates)

        # Length was asserted above, so unpacking into two names is safe.
        earliest, latest = daily

        # Earlier day: 2 entries, 3.0 hours.
        assert earliest["entries"] == 2
        assert earliest["hours"] == pytest.approx(3.0, abs=0.01)

        # Later day: 1 entry, 1.5 hours.
        assert latest["entries"] == 1
        assert latest["hours"] == pytest.approx(1.5, abs=0.01)
|