Files
resolutionflow/backend/app/api/endpoints/flowpilot_analytics.py
chihlasm 94fbb38f84 fix(analytics): fix 6 backend audit issues — domain matching, funnel counts, decimal casts, dead queries
- Issue 1: Normalize domain lookup to lowercase on both sides (flow category names and session problem_domain) to fix case-sensitive mismatch in coverage endpoint
- Issue 2: Count distinct ai_session_id (not PsaPostLog.id) in doc_pushed funnel step to avoid inflating counts on retried sessions
- Issue 3: Clamp recency_score to [0.0, 1.0] with min/max to handle negative days_since from future timestamps (clock skew/test data)
- Issue 4: Wrap func.sum(case(...)) results with int() in dashboard endpoint to handle Decimal returns from asyncpg
- Issue 5: Remove dead domain_flow_counts_result query (result fetched but never consumed) to eliminate unnecessary DB round-trip
- Issue 6: Replace select(Tree) with select(Tree.id, Tree.name, Tree.tree_type) in flow-quality endpoint to avoid loading large tree_structure JSONB column

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 00:36:30 +00:00

730 lines
28 KiB
Python

"""FlowPilot Analytics API — MTTR, resolution rates, knowledge coverage.
Endpoints:
GET /analytics/flowpilot?period=30d — Main dashboard data
GET /analytics/flowpilot/knowledge-gaps — Knowledge gap report
"""
import logging
from datetime import datetime, timezone, timedelta
from typing import Annotated, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from sqlalchemy import select, func, case, cast, Date, extract
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.rate_limit import limiter
from app.api.deps import get_current_active_user, get_db, require_team_admin
from app.models.user import User
from app.models.tree import Tree
from app.models.ai_session import AISession
from app.models.flow_proposal import FlowProposal
from app.models.psa_activity_log import PsaActivityLog
from app.models.psa_post_log import PsaPostLog
from app.models.category import TreeCategory
from app.schemas.flowpilot_analytics import (
FlowPilotDashboard,
MTTRDataPoint,
DomainBreakdown,
ConfidenceBreakdown,
KnowledgeCoverage,
DomainCoverage,
PsaMetrics,
CoverageDomainRow,
CoverageResponse,
FlowQualityRow,
FlowQualityResponse,
EnhancedPsaMetrics,
PsaFunnel,
PsaDailyTrend,
)
from app.services.knowledge_gap_service import get_knowledge_gaps, KnowledgeGapReport
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/analytics/flowpilot", tags=["flowpilot-analytics"])
def _get_period_start(period: str) -> datetime:
days = {"7d": 7, "30d": 30, "90d": 90}.get(period, 30)
return datetime.now(timezone.utc) - timedelta(days=days)
@router.get("", response_model=FlowPilotDashboard)
@limiter.limit("15/minute")
async def get_dashboard(
request: Request,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_team_admin),
period: str = Query("30d", pattern="^(7d|30d|90d)$"),
):
"""Get FlowPilot analytics dashboard data."""
if not current_user.account_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No account")
account_id = current_user.account_id
period_start = _get_period_start(period)
# ── Session counts ──
counts_result = await db.execute(
select(
func.count(AISession.id).label("total"),
func.sum(case((AISession.status == "resolved", 1), else_=0)).label("resolved"),
func.sum(case((AISession.status.in_(["escalated", "requesting_escalation"]), 1), else_=0)).label("escalated"),
func.sum(case((AISession.status == "abandoned", 1), else_=0)).label("abandoned"),
func.avg(case((AISession.status == "resolved", AISession.step_count), else_=None)).label("avg_steps"),
func.avg(AISession.session_rating).label("avg_rating"),
)
.where(
AISession.account_id == account_id,
AISession.created_at >= period_start,
)
)
row = counts_result.one()
total = int(row.total or 0)
resolved = int(row.resolved or 0)
escalated = int(row.escalated or 0)
abandoned = int(row.abandoned or 0)
avg_steps = float(row.avg_steps or 0)
avg_rating = float(row.avg_rating) if row.avg_rating else None
resolution_rate = (resolved / total * 100) if total > 0 else 0.0
# ── MTTR ──
mttr_result = await db.execute(
select(
func.avg(
extract("epoch", AISession.resolved_at - AISession.created_at) / 60
).label("avg_mttr"),
)
.where(
AISession.account_id == account_id,
AISession.created_at >= period_start,
AISession.status == "resolved",
AISession.resolved_at.isnot(None),
)
)
mttr_row = mttr_result.one()
mttr_minutes = float(mttr_row.avg_mttr) if mttr_row.avg_mttr else None
# ── Average duration ──
duration_result = await db.execute(
select(
func.avg(
extract("epoch", AISession.resolved_at - AISession.created_at) / 60
).label("avg_duration"),
)
.where(
AISession.account_id == account_id,
AISession.created_at >= period_start,
AISession.resolved_at.isnot(None),
)
)
dur_row = duration_result.one()
avg_duration = float(dur_row.avg_duration) if dur_row.avg_duration else 0.0
# ── MTTR trend ──
mttr_trend_result = await db.execute(
select(
cast(AISession.resolved_at, Date).label("day"),
func.avg(
extract("epoch", AISession.resolved_at - AISession.created_at) / 60
).label("mttr"),
func.count(AISession.id).label("count"),
)
.where(
AISession.account_id == account_id,
AISession.created_at >= period_start,
AISession.status == "resolved",
AISession.resolved_at.isnot(None),
)
.group_by(cast(AISession.resolved_at, Date))
.order_by(cast(AISession.resolved_at, Date))
)
mttr_trend = [
MTTRDataPoint(
date=str(r.day),
mttr_minutes=round(float(r.mttr or 0), 1),
session_count=r.count,
)
for r in mttr_trend_result.all()
]
# ── Domain breakdown ──
domain_result = await db.execute(
select(
AISession.problem_domain,
func.count(AISession.id).label("total"),
func.sum(case((AISession.status == "resolved", 1), else_=0)).label("resolved"),
func.sum(case((AISession.status.in_(["escalated", "requesting_escalation"]), 1), else_=0)).label("escalated"),
)
.where(
AISession.account_id == account_id,
AISession.created_at >= period_start,
AISession.problem_domain.isnot(None),
)
.group_by(AISession.problem_domain)
.order_by(func.count(AISession.id).desc())
)
sessions_by_domain = [
DomainBreakdown(
domain=r.problem_domain or "unknown",
total=int(r.total or 0),
resolved=int(r.resolved or 0),
escalated=int(r.escalated or 0),
resolution_rate=round(int(r.resolved or 0) / int(r.total) * 100, 1) if r.total else 0.0,
)
for r in domain_result.all()
]
# ── Confidence breakdown ──
confidence_result = await db.execute(
select(
AISession.confidence_tier,
func.count(AISession.id).label("total"),
func.sum(case((AISession.status == "resolved", 1), else_=0)).label("resolved"),
)
.where(
AISession.account_id == account_id,
AISession.created_at >= period_start,
AISession.status.in_(["resolved", "escalated", "requesting_escalation"]),
)
.group_by(AISession.confidence_tier)
)
conf_data = {r.confidence_tier: (int(r.total or 0), int(r.resolved or 0)) for r in confidence_result.all()}
guided_total, guided_resolved = conf_data.get("guided", (0, 0))
exploring_total, exploring_resolved = conf_data.get("exploring", (0, 0))
discovery_total, discovery_resolved = conf_data.get("discovery", (0, 0))
confidence_breakdown = ConfidenceBreakdown(
guided_sessions=guided_total,
guided_resolution_rate=round(guided_resolved / guided_total * 100, 1) if guided_total > 0 else 0.0,
exploring_sessions=exploring_total,
exploring_resolution_rate=round(exploring_resolved / exploring_total * 100, 1) if exploring_total > 0 else 0.0,
discovery_sessions=discovery_total,
discovery_resolution_rate=round(discovery_resolved / discovery_total * 100, 1) if discovery_total > 0 else 0.0,
)
# ── Knowledge coverage ──
total_flows_result = await db.execute(
select(func.count(Tree.id)).where(Tree.account_id == account_id)
)
total_flows = total_flows_result.scalar() or 0
ai_flows_result = await db.execute(
select(func.count(Tree.id)).where(
Tree.account_id == account_id,
Tree.origin.in_(["ai_generated", "ai_enhanced"]),
)
)
ai_generated_flows = ai_flows_result.scalar() or 0
pending_proposals_result = await db.execute(
select(func.count(FlowProposal.id)).where(
FlowProposal.account_id == account_id,
FlowProposal.status == "pending",
)
)
total_proposals_pending = pending_proposals_result.scalar() or 0
approved_result = await db.execute(
select(func.count(FlowProposal.id)).where(
FlowProposal.account_id == account_id,
FlowProposal.reviewed_at >= period_start,
FlowProposal.status.in_(["approved", "modified"]),
)
)
proposals_approved = approved_result.scalar() or 0
rejected_result = await db.execute(
select(func.count(FlowProposal.id)).where(
FlowProposal.account_id == account_id,
FlowProposal.reviewed_at >= period_start,
FlowProposal.status == "rejected",
)
)
proposals_rejected = rejected_result.scalar() or 0
# Domain coverage
domain_coverage_result = await db.execute(
select(
AISession.problem_domain,
func.count(AISession.id).label("session_count"),
func.sum(case((AISession.confidence_tier == "guided", 1), else_=0)).label("guided_count"),
)
.where(
AISession.account_id == account_id,
AISession.created_at >= period_start,
AISession.problem_domain.isnot(None),
)
.group_by(AISession.problem_domain)
)
# For now, flow_count per domain isn't directly available since Tree doesn't have problem_domain.
# Use match_keywords or just report 0. We'll improve this in Phase 4 with better flow categorization.
domain_cov_data = {}
for r in domain_coverage_result.all():
domain = r.problem_domain or "unknown"
sc = r.session_count or 0
gc = r.guided_count or 0
domain_cov_data[domain] = DomainCoverage(
domain=domain,
flow_count=0, # TODO: match via category/tags in Phase 4
session_count=sc,
guided_rate=round(gc / sc * 100, 1) if sc > 0 else 0.0,
)
knowledge_coverage = KnowledgeCoverage(
total_flows=total_flows,
ai_generated_flows=ai_generated_flows,
total_proposals_pending=total_proposals_pending,
proposals_approved_this_period=proposals_approved,
proposals_rejected_this_period=proposals_rejected,
coverage_by_domain=list(domain_cov_data.values()),
)
# ── PSA metrics ──
psa_metrics = None
psa_linked = await db.execute(
select(func.count(AISession.id)).where(
AISession.account_id == account_id,
AISession.created_at >= period_start,
AISession.psa_ticket_id.isnot(None),
)
)
psa_linked_count = psa_linked.scalar() or 0
if psa_linked_count > 0 and total > 0:
psa_push_result = await db.execute(
select(
func.count(PsaPostLog.id).label("total_pushes"),
func.sum(case((PsaPostLog.status == "success", 1), else_=0)).label("first_success"),
func.sum(case(
((PsaPostLog.status == "success") & (PsaPostLog.retry_count > 0), 1),
else_=0
)).label("retry_success"),
)
.join(AISession, PsaPostLog.ai_session_id == AISession.id)
.where(
AISession.account_id == account_id,
PsaPostLog.ai_session_id.isnot(None),
PsaPostLog.posted_at >= period_start,
)
)
push_row = psa_push_result.one()
total_pushes = push_row.total_pushes or 0
first_success = push_row.first_success or 0
retry_success = push_row.retry_success or 0
psa_metrics = PsaMetrics(
ticket_link_rate=round(psa_linked_count / total * 100, 1),
auto_push_success_rate=round(first_success / total_pushes * 100, 1) if total_pushes > 0 else 0.0,
auto_push_retry_success_rate=round(retry_success / total_pushes * 100, 1) if total_pushes > 0 else 0.0,
total_time_entries_logged=0, # TODO: track from CW time entries
total_hours_logged=0.0,
)
return FlowPilotDashboard(
period=period,
total_sessions=total,
resolved_sessions=resolved,
escalated_sessions=escalated,
abandoned_sessions=abandoned,
resolution_rate=round(resolution_rate, 1),
avg_steps_to_resolution=round(avg_steps, 1),
avg_session_duration_minutes=round(avg_duration, 1),
avg_rating=round(avg_rating, 2) if avg_rating else None,
mttr_minutes=round(mttr_minutes, 1) if mttr_minutes else None,
mttr_trend=mttr_trend,
sessions_by_domain=sessions_by_domain,
confidence_breakdown=confidence_breakdown,
knowledge_coverage=knowledge_coverage,
psa_metrics=psa_metrics,
)
@router.get("/knowledge-gaps", response_model=KnowledgeGapReport)
@limiter.limit("10/minute")
async def get_knowledge_gaps_endpoint(
request: Request,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_team_admin),
period: str = Query("30d", pattern="^(7d|30d|90d)$"),
):
"""Get knowledge gap analysis report."""
if not current_user.account_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No account")
days = {"7d": 7, "30d": 30, "90d": 90}.get(period, 30)
return await get_knowledge_gaps(current_user.account_id, db, period_days=days)
@router.get("/coverage", response_model=CoverageResponse)
@limiter.limit("15/minute")
async def get_coverage_heatmap(
request: Request,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_team_admin),
period: str = Query("30d", pattern="^(7d|30d|90d)$"),
):
"""Get coverage heatmap: sessions and flow coverage broken down by problem domain."""
if not current_user.account_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No account")
account_id = current_user.account_id
period_start = _get_period_start(period)
# ── Session stats per domain ──
domain_stats_result = await db.execute(
select(
AISession.problem_domain,
func.count(AISession.id).label("session_count"),
func.sum(case((AISession.status == "resolved", 1), else_=0)).label("resolved_count"),
func.sum(case((AISession.status.in_(["escalated", "requesting_escalation"]), 1), else_=0)).label("escalated_count"),
func.sum(case((AISession.confidence_tier == "guided", 1), else_=0)).label("guided_count"),
func.avg(
case(
(
(AISession.status == "resolved") & AISession.resolved_at.isnot(None),
extract("epoch", AISession.resolved_at - AISession.created_at) / 60,
),
else_=None,
)
).label("avg_resolution_minutes"),
)
.where(
AISession.account_id == account_id,
AISession.created_at >= period_start,
AISession.problem_domain.isnot(None),
)
.group_by(AISession.problem_domain)
.order_by(func.count(AISession.id).desc())
)
domain_rows = domain_stats_result.all()
# ── Unmapped sessions (no problem_domain) ──
unmapped_result = await db.execute(
select(func.count(AISession.id)).where(
AISession.account_id == account_id,
AISession.created_at >= period_start,
AISession.problem_domain.is_(None),
)
)
unmapped_session_count = int(unmapped_result.scalar() or 0)
# ── Flow counts per domain: match Category.name to problem_domain ──
# Joins Tree → TreeCategory and groups by lowercased category name for case-insensitive matching
flow_counts_result = await db.execute(
select(
func.lower(TreeCategory.name).label("domain"),
func.count(Tree.id).label("flow_count"),
)
.join(Tree, Tree.category_id == TreeCategory.id)
.where(
Tree.account_id == account_id,
Tree.is_active.is_(True),
Tree.deleted_at.is_(None),
)
.group_by(func.lower(TreeCategory.name))
)
flow_counts_by_domain: dict[str, int] = {
r.domain: int(r.flow_count) for r in flow_counts_result.all()
}
domains = []
for r in domain_rows:
sc = int(r.session_count or 0)
resolved = int(r.resolved_count or 0)
escalated = int(r.escalated_count or 0)
guided = int(r.guided_count or 0)
domain_name = r.problem_domain or "unknown"
avg_res = float(r.avg_resolution_minutes) if r.avg_resolution_minutes is not None else None
domains.append(
CoverageDomainRow(
domain=domain_name,
flow_count=flow_counts_by_domain.get(domain_name.lower(), 0),
session_count=sc,
resolution_rate=round(resolved / sc, 4) if sc > 0 else 0.0,
escalation_rate=round(escalated / sc, 4) if sc > 0 else 0.0,
guided_rate=round(guided / sc, 4) if sc > 0 else 0.0,
avg_resolution_minutes=round(avg_res, 1) if avg_res is not None else None,
)
)
return CoverageResponse(
domains=domains,
unmapped_session_count=unmapped_session_count,
total_domains=len(domains),
)
@router.get("/flow-quality", response_model=FlowQualityResponse)
@limiter.limit("15/minute")
async def get_flow_quality(
request: Request,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_team_admin),
period: str = Query("30d", pattern="^(7d|30d|90d)$"),
sort: str = Query("quality", pattern="^(quality|usage|success_rate)$"),
):
"""Get flow quality scoring for all active flows in the account."""
if not current_user.account_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No account")
account_id = current_user.account_id
period_start = _get_period_start(period)
now = datetime.now(timezone.utc)
# ── Get all active flows (only needed columns — avoids loading large tree_structure JSONB) ──
flows_result = await db.execute(
select(Tree.id, Tree.name, Tree.tree_type).where(
Tree.account_id == account_id,
Tree.is_active.is_(True),
Tree.deleted_at.is_(None),
)
)
flows = flows_result.all()
if not flows:
return FlowQualityResponse(flows=[], top_performers=[], needs_attention=[])
flow_ids = [f.id for f in flows]
# ── Session stats per flow within the period ──
session_stats_result = await db.execute(
select(
AISession.matched_flow_id,
func.count(AISession.id).label("total"),
func.sum(case((AISession.status == "resolved", 1), else_=0)).label("resolved"),
func.sum(case((AISession.confidence_tier == "guided", 1), else_=0)).label("guided"),
func.max(AISession.created_at).label("last_matched_at"),
)
.where(
AISession.account_id == account_id,
AISession.matched_flow_id.in_(flow_ids),
AISession.created_at >= period_start,
)
.group_by(AISession.matched_flow_id)
)
stats_by_flow: dict = {}
for r in session_stats_result.all():
stats_by_flow[r.matched_flow_id] = {
"total": int(r.total or 0),
"resolved": int(r.resolved or 0),
"guided": int(r.guided or 0),
"last_matched_at": r.last_matched_at,
}
# ── Also get the most recent match ever (for recency score, regardless of period) ──
recent_match_result = await db.execute(
select(
AISession.matched_flow_id,
func.max(AISession.created_at).label("last_ever"),
)
.where(
AISession.account_id == account_id,
AISession.matched_flow_id.in_(flow_ids),
)
.group_by(AISession.matched_flow_id)
)
last_ever_by_flow: dict = {r.matched_flow_id: r.last_ever for r in recent_match_result.all()}
# ── Build scored rows ──
scored_rows: list[FlowQualityRow] = []
for flow in flows:
stats = stats_by_flow.get(flow.id)
last_ever = last_ever_by_flow.get(flow.id)
if stats and stats["total"] > 0:
total = stats["total"]
resolved = stats["resolved"]
guided = stats["guided"]
success_rate = resolved / total
guided_rate = guided / total
# Recency score based on last match ever
if last_ever is not None:
last_ever_aware = last_ever.replace(tzinfo=timezone.utc) if last_ever.tzinfo is None else last_ever
days_since = (now - last_ever_aware).total_seconds() / 86400
recency_score = max(0.0, min(1.0, 1.0 - days_since / 90.0))
else:
recency_score = 0.0
quality_score = round(
(success_rate * 0.5) + (guided_rate * 0.3) + (recency_score * 0.2),
4,
)
avg_confidence = round(guided_rate, 4) # guided_rate as confidence proxy
last_matched_at = stats.get("last_matched_at")
else:
success_rate = None
avg_confidence = None
quality_score = 0.0
last_matched_at = last_ever # may be None
if last_matched_at is not None and last_matched_at.tzinfo is None:
last_matched_at = last_matched_at.replace(tzinfo=timezone.utc)
scored_rows.append(
FlowQualityRow(
flow_id=str(flow.id),
name=flow.name,
tree_type=flow.tree_type,
usage_count=stats["total"] if stats else 0,
success_rate=round(success_rate, 4) if success_rate is not None else None,
last_matched_at=last_matched_at,
avg_confidence=avg_confidence,
quality_score=quality_score,
)
)
# ── Sort ──
if sort == "usage":
scored_rows.sort(key=lambda r: r.usage_count, reverse=True)
elif sort == "success_rate":
scored_rows.sort(key=lambda r: (r.success_rate is not None, r.success_rate or 0.0), reverse=True)
else:
scored_rows.sort(key=lambda r: r.quality_score, reverse=True)
# ── Top performers: top 5 by quality_score with usage > 0 ──
top_performers = [r for r in scored_rows if r.usage_count > 0]
top_performers = sorted(top_performers, key=lambda r: r.quality_score, reverse=True)[:5]
# ── Needs attention: used at least once, AND (success_rate < 0.5 OR not used in 30+ days) ──
thirty_days_ago = now - timedelta(days=30)
needs_attention = []
for r in scored_rows:
if r.usage_count == 0:
continue
low_success = r.success_rate is not None and r.success_rate < 0.5
stale = r.last_matched_at is not None and r.last_matched_at < thirty_days_ago
if low_success or stale:
needs_attention.append(r)
return FlowQualityResponse(
flows=scored_rows,
top_performers=top_performers,
needs_attention=needs_attention,
)
@router.get("/psa-metrics", response_model=EnhancedPsaMetrics)
@limiter.limit("15/minute")
async def get_enhanced_psa_metrics(
request: Request,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_team_admin),
period: str = Query("30d", pattern="^(7d|30d|90d)$"),
):
"""Get enhanced PSA integration metrics including time entry stats, push funnel, and daily trend."""
if not current_user.account_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No account")
account_id = current_user.account_id
period_start = _get_period_start(period)
# ── Time entry totals from psa_activity_logs ──
time_entry_result = await db.execute(
select(
func.count(PsaActivityLog.id).label("entry_count"),
func.sum(PsaActivityLog.hours_logged).label("total_hours"),
)
.where(
PsaActivityLog.account_id == account_id,
PsaActivityLog.activity_type == "time_entry_posted",
PsaActivityLog.created_at >= period_start,
)
)
te_row = time_entry_result.one()
total_time_entries = int(te_row.entry_count or 0)
total_hours_raw = te_row.total_hours or 0
total_hours_logged = round(float(total_hours_raw), 2)
avg_hours = round(total_hours_logged / total_time_entries, 2) if total_time_entries > 0 else 0.0
# ── Push funnel ──
# Total sessions in period
total_sessions_result = await db.execute(
select(func.count(AISession.id)).where(
AISession.account_id == account_id,
AISession.created_at >= period_start,
)
)
total_sessions = int(total_sessions_result.scalar() or 0)
# Sessions linked to a ticket
linked_result = await db.execute(
select(func.count(AISession.id)).where(
AISession.account_id == account_id,
AISession.created_at >= period_start,
AISession.psa_ticket_id.isnot(None),
)
)
linked_to_ticket = int(linked_result.scalar() or 0)
# Sessions with a successful doc push (via PsaPostLog) — count unique sessions, not log entries
doc_pushed_result = await db.execute(
select(func.count(PsaPostLog.ai_session_id.distinct()))
.join(AISession, PsaPostLog.ai_session_id == AISession.id)
.where(
AISession.account_id == account_id,
PsaPostLog.ai_session_id.isnot(None),
PsaPostLog.status == "success",
PsaPostLog.posted_at >= period_start,
)
)
doc_pushed = int(doc_pushed_result.scalar() or 0)
# Sessions with a time entry logged (via psa_activity_logs)
time_entry_sessions_result = await db.execute(
select(func.count(PsaActivityLog.session_id.distinct())).where(
PsaActivityLog.account_id == account_id,
PsaActivityLog.activity_type == "time_entry_posted",
PsaActivityLog.created_at >= period_start,
PsaActivityLog.session_id.isnot(None),
)
)
time_entry_logged = int(time_entry_sessions_result.scalar() or 0)
push_funnel = PsaFunnel(
total_sessions=total_sessions,
linked_to_ticket=linked_to_ticket,
doc_pushed=doc_pushed,
time_entry_logged=time_entry_logged,
)
# ── Daily trend (time entries grouped by date) ──
daily_result = await db.execute(
select(
cast(PsaActivityLog.created_at, Date).label("day"),
func.count(PsaActivityLog.id).label("entries"),
func.sum(PsaActivityLog.hours_logged).label("hours"),
)
.where(
PsaActivityLog.account_id == account_id,
PsaActivityLog.activity_type == "time_entry_posted",
PsaActivityLog.created_at >= period_start,
)
.group_by(cast(PsaActivityLog.created_at, Date))
.order_by(cast(PsaActivityLog.created_at, Date))
)
daily_trend = [
PsaDailyTrend(
date=str(r.day),
entries=int(r.entries or 0),
hours=round(float(r.hours or 0), 2),
)
for r in daily_result.all()
]
return EnhancedPsaMetrics(
total_time_entries=total_time_entries,
total_hours_logged=total_hours_logged,
avg_hours_per_session=avg_hours,
push_funnel=push_funnel,
daily_trend=daily_trend,
)