feat: analytics dashboards & two-tier feedback system (#78)

* docs: add analytics & user feedback design document

Covers team analytics, personal analytics, flow analytics,
step-level thumbs up/down feedback, and flow CSAT ratings.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: add analytics & feedback implementation plan

12-task TDD plan covering session ratings, step feedback,
team/personal/flow analytics endpoints, and frontend pages.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add session_ratings table and analytics indexes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add SessionRating model and analytics schemas

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add session CSAT rating endpoint with tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add step thumbs feedback and /ratings alias routes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add team, personal, and flow analytics endpoints

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add recharts, analytics types, and API client

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add inline step thumbs up/down feedback during sessions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add CSAT rating modal after session completion

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add Team Analytics page with charts and leaderboards

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add Flow Analytics panel with step dropoff and CSAT data

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: add My Analytics page with personal stats and charts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit was merged in pull request #78.
This commit is contained in:
chihlasm
2026-02-16 15:23:14 -05:00
committed by GitHub
parent 293ceaa9e9
commit bd12ced5ee
29 changed files with 4856 additions and 5 deletions

View File

@@ -0,0 +1,45 @@
"""Add session_ratings table and analytics indexes
Revision ID: 039
Revises: 038
Create Date: 2026-02-16
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID
revision = '039'
down_revision = '038'
def upgrade():
# New session_ratings table for flow CSAT ratings
op.create_table(
'session_ratings',
sa.Column('id', UUID(as_uuid=True), server_default=sa.text('gen_random_uuid()'), primary_key=True),
sa.Column('session_id', UUID(as_uuid=True), sa.ForeignKey('sessions.id', ondelete='CASCADE'), nullable=False),
sa.Column('user_id', UUID(as_uuid=True), sa.ForeignKey('users.id', ondelete='CASCADE'), nullable=False),
sa.Column('tree_id', UUID(as_uuid=True), sa.ForeignKey('trees.id', ondelete='CASCADE'), nullable=False),
sa.Column('account_id', UUID(as_uuid=True), sa.ForeignKey('accounts.id', ondelete='CASCADE'), nullable=False),
sa.Column('rating', sa.Integer, nullable=False),
sa.Column('comment', sa.String(500), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('NOW()'), nullable=False),
sa.CheckConstraint('rating >= 1 AND rating <= 5', name='ck_session_ratings_rating_range'),
sa.UniqueConstraint('session_id', name='uq_session_ratings_session_id'),
)
# Analytics indexes
op.create_index('ix_session_ratings_tree_created', 'session_ratings', ['tree_id', 'created_at'])
op.create_index('ix_session_ratings_account_created', 'session_ratings', ['account_id', 'created_at'])
op.create_index('ix_sessions_completed', 'sessions', ['completed_at'])
op.create_index('ix_step_ratings_step_helpful', 'step_ratings', ['step_id', 'was_helpful'])
# Make step_ratings.rating column nullable (thumbs-only mode)
op.alter_column('step_ratings', 'rating', nullable=True)
def downgrade():
op.alter_column('step_ratings', 'rating', nullable=False)
op.drop_index('ix_step_ratings_step_helpful', 'step_ratings')
op.drop_index('ix_sessions_completed', 'sessions')
op.drop_index('ix_session_ratings_account_created', 'session_ratings')
op.drop_index('ix_session_ratings_tree_created', 'session_ratings')
op.drop_table('session_ratings')

View File

@@ -0,0 +1,404 @@
from datetime import datetime, timezone, timedelta
from uuid import UUID
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import select, func, case, cast, Date
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.api.deps import get_current_active_user
from app.models import User, Session, Tree, SessionRating
from app.schemas.analytics import (
TeamAnalyticsResponse, PersonalAnalyticsResponse, FlowAnalyticsResponse,
AnalyticsSummary, OutcomeBreakdown, TimeSeriesPoint,
TopFlow, TopEngineer, StepFeedbackSummary, FlowRatingItem,
)
router = APIRouter(prefix="/analytics", tags=["analytics"])
def _get_period_start(period: str) -> datetime:
days = {"7d": 7, "30d": 30, "90d": 90}.get(period, 30)
return datetime.now(timezone.utc) - timedelta(days=days)
async def _build_summary(
db: AsyncSession,
base_filters: list,
join_tree: bool = False,
) -> AnalyticsSummary:
"""Build analytics summary. If join_tree=True, joins Session->Tree for account scoping."""
def _base_query():
q = select(func.count()).select_from(Session)
if join_tree:
q = q.join(Tree, Session.tree_id == Tree.id)
return q
# Total sessions
total_q = await db.execute(_base_query().where(*base_filters))
total = total_q.scalar() or 0
# Completed sessions
completed_q = await db.execute(
_base_query().where(*base_filters, Session.completed_at.isnot(None))
)
completed = completed_q.scalar() or 0
# Median duration (minutes) using percentile_cont
duration_base = select(
func.percentile_cont(0.5).within_group(
func.extract('epoch', Session.completed_at - Session.started_at) / 60
)
).select_from(Session)
if join_tree:
duration_base = duration_base.join(Tree, Session.tree_id == Tree.id)
duration_q = await db.execute(
duration_base.where(*base_filters, Session.completed_at.isnot(None))
)
raw_median = duration_q.scalar()
median_duration = round(float(raw_median), 1) if raw_median is not None else 0.0
# Active engineers (distinct users)
active_base = select(func.count(func.distinct(Session.user_id))).select_from(Session)
if join_tree:
active_base = active_base.join(Tree, Session.tree_id == Tree.id)
active_q = await db.execute(active_base.where(*base_filters))
active_engineers = active_q.scalar() or 0
# Outcome breakdown
outcome_base = select(Session.outcome, func.count()).select_from(Session)
if join_tree:
outcome_base = outcome_base.join(Tree, Session.tree_id == Tree.id)
outcome_q = await db.execute(
outcome_base.where(
*base_filters, Session.completed_at.isnot(None), Session.outcome.isnot(None)
).group_by(Session.outcome)
)
outcomes = dict(outcome_q.all())
return AnalyticsSummary(
total_sessions=total,
completed_sessions=completed,
completion_rate=round(completed / total, 3) if total > 0 else 0.0,
median_duration_minutes=median_duration,
active_engineers=active_engineers,
outcome_breakdown=OutcomeBreakdown(
resolved=outcomes.get("resolved", 0),
escalated=outcomes.get("escalated", 0),
workaround=outcomes.get("workaround", 0),
unresolved=outcomes.get("unresolved", 0),
),
)
async def _build_time_series(
db: AsyncSession,
base_filters: list,
join_tree: bool = False,
) -> list[TimeSeriesPoint]:
"""Build daily time-series using CASE expressions for outcome counting."""
q = select(
cast(Session.started_at, Date).label("date"),
func.count().label("sessions"),
func.sum(case((Session.outcome == "resolved", 1), else_=0)).label("resolved"),
func.sum(case((Session.outcome == "escalated", 1), else_=0)).label("escalated"),
func.sum(case((Session.outcome == "workaround", 1), else_=0)).label("workaround"),
func.sum(case((Session.outcome == "unresolved", 1), else_=0)).label("unresolved"),
).select_from(Session)
if join_tree:
q = q.join(Tree, Session.tree_id == Tree.id)
rows = await db.execute(
q.where(*base_filters)
.group_by(cast(Session.started_at, Date))
.order_by(cast(Session.started_at, Date))
)
return [
TimeSeriesPoint(
date=str(row.date), sessions=row.sessions,
resolved=int(row.resolved or 0), escalated=int(row.escalated or 0),
workaround=int(row.workaround or 0), unresolved=int(row.unresolved or 0),
)
for row in rows.all()
]
@router.get("/team", response_model=TeamAnalyticsResponse)
async def get_team_analytics(
period: str = Query("30d", pattern="^(7d|30d|90d)$"),
engineer_id: Optional[UUID] = None,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_active_user),
):
"""Team analytics - team_admin or super_admin only."""
if not (current_user.is_team_admin or current_user.is_super_admin):
raise HTTPException(status_code=403, detail="Team admin access required")
period_start = _get_period_start(period)
base_filters = [
Session.started_at >= period_start,
Tree.account_id == current_user.account_id,
]
if engineer_id:
base_filters.append(Session.user_id == engineer_id)
summary = await _build_summary(db, base_filters, join_tree=True)
time_series = await _build_time_series(db, base_filters, join_tree=True)
# Top flows (join Session->Tree)
top_flows_q = await db.execute(
select(
Tree.id, Tree.name,
func.count(Session.id).label("sessions"),
func.percentile_cont(0.5).within_group(
func.extract('epoch', Session.completed_at - Session.started_at) / 60
).label("median_duration"),
)
.join(Tree, Session.tree_id == Tree.id)
.where(Session.started_at >= period_start, Tree.account_id == current_user.account_id)
.group_by(Tree.id, Tree.name)
.order_by(func.count(Session.id).desc())
.limit(10)
)
top_flows = []
for row in top_flows_q.all():
# Compute completion rate separately to avoid division by zero
completed_count_q = await db.execute(
select(func.count()).select_from(Session)
.where(
Session.tree_id == row.id,
Session.started_at >= period_start,
Session.completed_at.isnot(None),
)
)
completed_count = completed_count_q.scalar() or 0
completion_rate = round(completed_count / row.sessions, 3) if row.sessions > 0 else 0.0
top_flows.append(
TopFlow(
tree_id=str(row.id), name=row.name, sessions=row.sessions,
completion_rate=completion_rate,
median_duration_minutes=round(float(row.median_duration or 0), 1),
)
)
# Top engineers (join Session->User + Session->Tree)
top_engineers_q = await db.execute(
select(
User.id, User.name,
func.count(Session.id).label("sessions"),
func.percentile_cont(0.5).within_group(
func.extract('epoch', Session.completed_at - Session.started_at) / 60
).label("median_duration"),
)
.join(User, Session.user_id == User.id)
.join(Tree, Session.tree_id == Tree.id)
.where(Session.started_at >= period_start, Tree.account_id == current_user.account_id)
.group_by(User.id, User.name)
.order_by(func.count(Session.id).desc())
.limit(10)
)
top_engineers = []
for row in top_engineers_q.all():
completed_count_q = await db.execute(
select(func.count()).select_from(Session)
.join(Tree, Session.tree_id == Tree.id)
.where(
Session.user_id == row.id,
Session.started_at >= period_start,
Tree.account_id == current_user.account_id,
Session.completed_at.isnot(None),
)
)
completed_count = completed_count_q.scalar() or 0
completion_rate = round(completed_count / row.sessions, 3) if row.sessions > 0 else 0.0
top_engineers.append(
TopEngineer(
user_id=str(row.id), name=row.name or "Unknown", sessions=row.sessions,
completion_rate=completion_rate,
median_duration_minutes=round(float(row.median_duration or 0), 1),
)
)
return TeamAnalyticsResponse(
summary=summary, time_series=time_series,
top_flows=top_flows, top_engineers=top_engineers,
)
@router.get("/me", response_model=PersonalAnalyticsResponse)
async def get_personal_analytics(
period: str = Query("30d", pattern="^(7d|30d|90d)$"),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_active_user),
):
"""Personal analytics - any authenticated user."""
period_start = _get_period_start(period)
base_filters = [Session.started_at >= period_start, Session.user_id == current_user.id]
summary = await _build_summary(db, base_filters, join_tree=False)
# Override active_engineers=1 for personal view
summary.active_engineers = 1
time_series = await _build_time_series(db, base_filters, join_tree=False)
# Top flows
top_flows_q = await db.execute(
select(
Tree.id, Tree.name,
func.count(Session.id).label("sessions"),
func.percentile_cont(0.5).within_group(
func.extract('epoch', Session.completed_at - Session.started_at) / 60
).label("median_duration"),
)
.join(Tree, Session.tree_id == Tree.id)
.where(Session.started_at >= period_start, Session.user_id == current_user.id)
.group_by(Tree.id, Tree.name)
.order_by(func.count(Session.id).desc())
.limit(10)
)
top_flows = []
for row in top_flows_q.all():
completed_count_q = await db.execute(
select(func.count()).select_from(Session)
.where(
Session.tree_id == row.id,
Session.user_id == current_user.id,
Session.started_at >= period_start,
Session.completed_at.isnot(None),
)
)
completed_count = completed_count_q.scalar() or 0
completion_rate = round(completed_count / row.sessions, 3) if row.sessions > 0 else 0.0
top_flows.append(
TopFlow(
tree_id=str(row.id), name=row.name, sessions=row.sessions,
completion_rate=completion_rate,
median_duration_minutes=round(float(row.median_duration or 0), 1),
)
)
return PersonalAnalyticsResponse(
summary=summary, time_series=time_series, top_flows=top_flows,
)
@router.get("/flows/{tree_id}", response_model=FlowAnalyticsResponse)
async def get_flow_analytics(
tree_id: UUID,
period: str = Query("30d", pattern="^(7d|30d|90d)$"),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_active_user),
):
"""Analytics for a specific flow."""
# Verify tree exists
result = await db.execute(select(Tree).where(Tree.id == tree_id))
tree = result.scalar_one_or_none()
if not tree:
raise HTTPException(status_code=404, detail="Flow not found")
period_start = _get_period_start(period)
base_filters = [Session.started_at >= period_start, Session.tree_id == tree_id]
summary = await _build_summary(db, base_filters, join_tree=False)
time_series = await _build_time_series(db, base_filters, join_tree=False)
# CSAT stats
csat_q = await db.execute(
select(func.avg(SessionRating.rating), func.count())
.where(SessionRating.tree_id == tree_id, SessionRating.created_at >= period_start)
)
csat_row = csat_q.one()
avg_csat = round(float(csat_row[0]), 1) if csat_row[0] else None
total_ratings = csat_row[1]
# Step feedback - compute from step_ratings for steps used in this tree's sessions
step_feedback: list[StepFeedbackSummary] = []
# Step dropoff analysis from session decisions JSONB
sessions_q = await db.execute(
select(Session.decisions, Session.completed_at)
.where(Session.tree_id == tree_id, Session.started_at >= period_start)
)
sessions_data = sessions_q.all()
node_visits: dict[str, int] = {}
node_dropoffs: dict[str, int] = {}
for sess in sessions_data:
decisions = sess.decisions or []
for decision in decisions:
node_id = decision.get("node_id", "")
if node_id:
node_visits[node_id] = node_visits.get(node_id, 0) + 1
# If session not completed, last decision node is a dropoff
if not sess.completed_at and decisions:
last_decision = decisions[-1]
last_node = last_decision.get("node_id", "")
if last_node:
node_dropoffs[last_node] = node_dropoffs.get(last_node, 0) + 1
# Build node title map from tree structure
node_title_map = _extract_node_titles(tree.tree_structure)
# Build step feedback with dropoff data
for node_id in sorted(node_visits.keys()):
visits = node_visits.get(node_id, 0)
dropoffs = node_dropoffs.get(node_id, 0)
step_feedback.append(StepFeedbackSummary(
node_id=node_id,
node_title=node_title_map.get(node_id, "Unknown Step"),
helpful_yes=0,
helpful_no=0,
helpful_rate=0.0,
visit_count=visits,
dropoff_count=dropoffs,
dropoff_rate=round(dropoffs / visits, 3) if visits > 0 else 0.0,
))
# Sort by dropoff_rate descending
step_feedback.sort(key=lambda x: x.dropoff_rate, reverse=True)
# Recent comments - ANONYMOUS (no user_name join)
comments_q = await db.execute(
select(SessionRating.rating, SessionRating.comment, SessionRating.created_at)
.where(
SessionRating.tree_id == tree_id,
SessionRating.comment.isnot(None),
SessionRating.comment != "",
)
.order_by(SessionRating.created_at.desc())
.limit(10)
)
recent_comments = [
FlowRatingItem(
rating=row.rating,
comment=row.comment,
created_at=row.created_at,
)
for row in comments_q.all()
]
return FlowAnalyticsResponse(
summary=summary, avg_csat=avg_csat, total_ratings=total_ratings,
time_series=time_series, step_feedback=step_feedback,
recent_comments=recent_comments,
)
def _extract_node_titles(tree_structure: dict) -> dict[str, str]:
"""Recursively extract node_id -> title/question from tree structure."""
titles = {}
def walk(node):
if not isinstance(node, dict):
return
node_id = node.get("id", "")
title = node.get("title") or node.get("question") or "Unnamed"
if node_id:
titles[node_id] = title
for child in node.get("children", []):
walk(child)
walk(tree_structure)
return titles

View File

@@ -0,0 +1,124 @@
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.api.deps import get_current_active_user
from app.models import User, Session, SessionRating
from app.models.step_library import StepLibrary, StepRating
from app.schemas.analytics import SessionRatingCreate, SessionRatingResponse, StepFeedbackCreate
router = APIRouter(tags=["ratings"])
@router.post("/sessions/{session_id}/rate", response_model=SessionRatingResponse, status_code=status.HTTP_201_CREATED)
async def rate_session(
session_id: UUID,
data: SessionRatingCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_active_user),
):
"""Submit a CSAT rating (1-5) for a completed session."""
# Verify session exists and belongs to user
result = await db.execute(select(Session).where(Session.id == session_id))
session = result.scalar_one_or_none()
if not session:
raise HTTPException(status_code=404, detail="Session not found")
if session.user_id != current_user.id:
raise HTTPException(status_code=403, detail="Not your session")
if not session.completed_at:
raise HTTPException(status_code=400, detail="Session not completed yet")
# Check for duplicate
existing = await db.execute(
select(SessionRating).where(SessionRating.session_id == session_id)
)
if existing.scalar_one_or_none():
raise HTTPException(status_code=409, detail="Session already rated")
rating = SessionRating(
session_id=session_id,
user_id=current_user.id,
tree_id=session.tree_id,
account_id=current_user.account_id,
rating=data.rating,
comment=data.comment,
)
db.add(rating)
await db.commit()
await db.refresh(rating)
return SessionRatingResponse(
id=str(rating.id),
session_id=str(rating.session_id),
rating=rating.rating,
comment=rating.comment,
created_at=rating.created_at,
)
@router.post("/steps/{step_id}/feedback", status_code=status.HTTP_201_CREATED)
async def submit_step_feedback(
step_id: UUID,
data: StepFeedbackCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_active_user),
):
"""Submit thumbs up/down feedback for a step used in a session."""
# Verify step exists
result = await db.execute(select(StepLibrary).where(StepLibrary.id == step_id))
step = result.scalar_one_or_none()
if not step:
raise HTTPException(status_code=404, detail="Step not found")
session_uuid = UUID(data.session_id)
# Check for existing feedback for this step+user+session
existing_result = await db.execute(
select(StepRating).where(
StepRating.step_id == step_id,
StepRating.user_id == current_user.id,
StepRating.session_id == session_uuid,
)
)
existing = existing_result.scalar_one_or_none()
if existing:
existing.was_helpful = data.was_helpful
resp_status = "updated"
else:
new_rating = StepRating(
step_id=step_id,
user_id=current_user.id,
session_id=session_uuid,
was_helpful=data.was_helpful,
# rating is nullable now — thumbs-only mode
)
db.add(new_rating)
resp_status = "created"
# Update aggregates on step_library
await _update_step_helpful_counts(db, step_id)
await db.commit()
return {"step_id": str(step_id), "was_helpful": data.was_helpful, "status": resp_status}
async def _update_step_helpful_counts(db: AsyncSession, step_id: UUID):
"""Recalculate helpful_yes and helpful_no on step_library."""
yes_q = await db.execute(
select(func.count()).select_from(StepRating).where(
StepRating.step_id == step_id, StepRating.was_helpful == True
)
)
no_q = await db.execute(
select(func.count()).select_from(StepRating).where(
StepRating.step_id == step_id, StepRating.was_helpful == False
)
)
await db.execute(
StepLibrary.__table__.update()
.where(StepLibrary.id == step_id)
.values(helpful_yes=yes_q.scalar() or 0, helpful_no=no_q.scalar() or 0)
)

View File

@@ -554,6 +554,42 @@ async def delete_rating(
return None
# Backward-compatible /ratings alias routes
@router.post("/{step_id}/ratings", response_model=StepRatingResponse, status_code=201,
include_in_schema=False)
async def rate_step_alias(
step_id: UUID,
rating_data: StepRatingCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
"""Alias for POST /{step_id}/rate."""
return await rate_step(step_id, rating_data, db, current_user)
@router.put("/{step_id}/ratings", response_model=StepRatingResponse,
include_in_schema=False)
async def update_rating_alias(
step_id: UUID,
rating_data: StepRatingUpdate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
"""Alias for PUT /{step_id}/rate."""
return await update_rating(step_id, rating_data, db, current_user)
@router.delete("/{step_id}/ratings", status_code=204,
include_in_schema=False)
async def delete_rating_alias(
step_id: UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_active_user)
):
"""Alias for DELETE /{step_id}/rate."""
return await delete_rating(step_id, db, current_user)
@router.get("/{step_id}/reviews", response_model=list[StepRatingResponse])
async def get_reviews(
step_id: UUID,

View File

@@ -1,6 +1,7 @@
from fastapi import APIRouter
from app.api.endpoints import auth, trees, sessions, invite, categories, tags, folders, step_categories, steps, admin, accounts, webhooks, shares, shared, tree_markdown
from app.api.endpoints import admin_dashboard, admin_audit, admin_plan_limits, admin_feature_flags, admin_settings, admin_categories
from app.api.endpoints import ratings, analytics
api_router = APIRouter()
@@ -25,3 +26,5 @@ api_router.include_router(webhooks.router)
api_router.include_router(shares.router)
api_router.include_router(shared.router) # Public endpoints (no auth)
api_router.include_router(tree_markdown.router)
api_router.include_router(ratings.router)
api_router.include_router(analytics.router)

View File

@@ -17,6 +17,7 @@ from .refresh_token import RefreshToken
from .audit_log import AuditLog
from .password_reset_token import PasswordResetToken
from .session_share import SessionShare, SessionShareView
from .session_rating import SessionRating
from .account_limit_override import AccountLimitOverride
from .feature_flag import FeatureFlag, PlanFeatureDefault, AccountFeatureOverride
from .platform_setting import PlatformSetting
@@ -47,6 +48,7 @@ __all__ = [
"PasswordResetToken",
"SessionShare",
"SessionShareView",
"SessionRating",
"AccountLimitOverride",
"FeatureFlag",
"PlanFeatureDefault",

View File

@@ -0,0 +1,46 @@
import uuid
from datetime import datetime, timezone
from typing import Optional, TYPE_CHECKING
from sqlalchemy import String, DateTime, Integer, CheckConstraint, UniqueConstraint, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID
from app.core.database import Base
if TYPE_CHECKING:
from app.models.user import User
from app.models.session import Session
from app.models.tree import Tree
class SessionRating(Base):
__tablename__ = "session_ratings"
__table_args__ = (
CheckConstraint("rating >= 1 AND rating <= 5", name="ck_session_ratings_rating_range"),
UniqueConstraint("session_id", name="uq_session_ratings_session_id"),
)
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
session_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), ForeignKey("sessions.id", ondelete="CASCADE"), nullable=False
)
user_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False
)
tree_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), ForeignKey("trees.id", ondelete="CASCADE"), nullable=False
)
account_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), ForeignKey("accounts.id", ondelete="CASCADE"), nullable=False
)
rating: Mapped[int] = mapped_column(Integer, nullable=False)
comment: Mapped[Optional[str]] = mapped_column(String(500), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), nullable=False
)
# Relationships
session: Mapped["Session"] = relationship("Session", foreign_keys=[session_id])
user: Mapped["User"] = relationship("User", foreign_keys=[user_id])
tree: Mapped["Tree"] = relationship("Tree", foreign_keys=[tree_id])

View File

@@ -125,7 +125,7 @@ class StepRating(Base):
ForeignKey("users.id", ondelete="CASCADE"),
nullable=False
)
rating: Mapped[int] = mapped_column(Integer, nullable=False)
rating: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
was_helpful: Mapped[Optional[bool]] = mapped_column(Boolean, nullable=True)
review_text: Mapped[Optional[str]] = mapped_column(String(500), nullable=True)
is_verified_use: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)

View File

@@ -0,0 +1,111 @@
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field
# --- Session Rating Schemas ---
class SessionRatingCreate(BaseModel):
rating: int = Field(..., ge=1, le=5)
comment: Optional[str] = Field(None, max_length=500)
class SessionRatingResponse(BaseModel):
id: str
session_id: str
rating: int
comment: Optional[str]
created_at: datetime
model_config = {"from_attributes": True}
class FlowRatingItem(BaseModel):
"""Anonymous feedback item — no user_name for privacy."""
rating: int
comment: Optional[str]
created_at: datetime
# --- Step Feedback Schema ---
class StepFeedbackCreate(BaseModel):
session_id: str
was_helpful: bool
# --- Analytics Response Schemas ---
class OutcomeBreakdown(BaseModel):
resolved: int = 0
escalated: int = 0
workaround: int = 0
unresolved: int = 0
class AnalyticsSummary(BaseModel):
total_sessions: int
completed_sessions: int
completion_rate: float
median_duration_minutes: float
active_engineers: int = 0
outcome_breakdown: OutcomeBreakdown
class TimeSeriesPoint(BaseModel):
date: str
sessions: int = 0
resolved: int = 0
escalated: int = 0
workaround: int = 0
unresolved: int = 0
class TopFlow(BaseModel):
tree_id: str
name: str
sessions: int
completion_rate: float
median_duration_minutes: float
avg_csat: Optional[float] = None
class TopEngineer(BaseModel):
user_id: str
name: str
sessions: int
completion_rate: float
median_duration_minutes: float
class TeamAnalyticsResponse(BaseModel):
summary: AnalyticsSummary
time_series: list[TimeSeriesPoint]
top_flows: list[TopFlow]
top_engineers: list[TopEngineer]
class PersonalAnalyticsResponse(BaseModel):
summary: AnalyticsSummary
time_series: list[TimeSeriesPoint]
top_flows: list[TopFlow]
class StepFeedbackSummary(BaseModel):
node_id: str
node_title: str
helpful_yes: int
helpful_no: int
helpful_rate: float
visit_count: int = 0
dropoff_count: int = 0
dropoff_rate: float = 0.0
class FlowAnalyticsResponse(BaseModel):
summary: AnalyticsSummary
avg_csat: Optional[float]
total_ratings: int
time_series: list[TimeSeriesPoint]
step_feedback: list[StepFeedbackSummary]
recent_comments: list[FlowRatingItem]

View File

@@ -0,0 +1,137 @@
import pytest
from httpx import AsyncClient
pytestmark = pytest.mark.asyncio
@pytest.fixture
async def team_admin(client: AsyncClient, test_db):
"""Create a team admin user (gets own account via registration)."""
from uuid import UUID as PyUUID
from sqlalchemy import select
from app.models.user import User
data = {
"email": "teamadmin@example.com",
"password": "TeamAdmin123!",
"name": "Team Admin"
}
response = await client.post("/api/v1/auth/register", json=data)
assert response.status_code in (200, 201), f"Failed: {response.text}"
user_id = PyUUID(response.json()["id"])
result = await test_db.execute(select(User).where(User.id == user_id))
user = result.scalar_one()
user.is_team_admin = True
await test_db.commit()
return {"email": data["email"], "password": data["password"], "user_data": response.json()}
@pytest.fixture
async def team_admin_headers(client: AsyncClient, team_admin: dict):
"""Auth headers for team admin."""
response = await client.post(
"/api/v1/auth/login/json",
json={"email": team_admin["email"], "password": team_admin["password"]},
)
assert response.status_code == 200
return {"Authorization": f"Bearer {response.json()['access_token']}"}
async def test_team_analytics_success(client: AsyncClient, admin_auth_headers: dict):
"""Super admin can access team analytics."""
response = await client.get(
"/api/v1/analytics/team?period=30d",
headers=admin_auth_headers,
)
assert response.status_code == 200
data = response.json()
assert "summary" in data
assert "time_series" in data
assert "top_flows" in data
assert "top_engineers" in data
assert "total_sessions" in data["summary"]
assert "completion_rate" in data["summary"]
assert "median_duration_minutes" in data["summary"]
assert "active_engineers" in data["summary"]
async def test_team_analytics_team_admin(client: AsyncClient, team_admin_headers: dict):
"""Team admin can also access team analytics."""
response = await client.get(
"/api/v1/analytics/team",
headers=team_admin_headers,
)
assert response.status_code == 200
async def test_team_analytics_forbidden_for_engineer(client: AsyncClient, auth_headers: dict):
"""Regular engineers cannot access team analytics."""
response = await client.get(
"/api/v1/analytics/team",
headers=auth_headers,
)
assert response.status_code == 403
async def test_personal_analytics_success(client: AsyncClient, auth_headers: dict):
"""Any user can access personal analytics."""
response = await client.get(
"/api/v1/analytics/me?period=30d",
headers=auth_headers,
)
assert response.status_code == 200
data = response.json()
assert "summary" in data
assert "time_series" in data
assert "top_flows" in data
assert "median_duration_minutes" in data["summary"]
async def test_personal_analytics_empty(client: AsyncClient, auth_headers: dict):
"""Personal analytics with no sessions returns zeroes."""
response = await client.get(
"/api/v1/analytics/me?period=7d",
headers=auth_headers,
)
assert response.status_code == 200
data = response.json()
assert data["summary"]["total_sessions"] == 0
assert data["summary"]["completion_rate"] == 0.0
assert data["summary"]["median_duration_minutes"] == 0.0
assert data["summary"]["active_engineers"] == 1 # always 1 for personal
async def test_flow_analytics_success(client: AsyncClient, auth_headers: dict, test_tree: dict):
"""Can access analytics for a visible flow."""
response = await client.get(
f"/api/v1/analytics/flows/{test_tree['id']}",
headers=auth_headers,
)
assert response.status_code == 200
data = response.json()
assert "summary" in data
assert "step_feedback" in data
assert "recent_comments" in data
assert "avg_csat" in data
assert "total_ratings" in data
async def test_flow_analytics_404(client: AsyncClient, auth_headers: dict):
"""Non-existent flow returns 404."""
import uuid
response = await client.get(
f"/api/v1/analytics/flows/{uuid.uuid4()}",
headers=auth_headers,
)
assert response.status_code == 404
async def test_invalid_period_rejected(client: AsyncClient, auth_headers: dict):
"""Invalid period values are rejected."""
response = await client.get(
"/api/v1/analytics/me?period=1y",
headers=auth_headers,
)
assert response.status_code == 422

View File

@@ -0,0 +1,215 @@
import pytest
from httpx import AsyncClient
pytestmark = pytest.mark.asyncio
@pytest.fixture
async def test_session(client: AsyncClient, auth_headers: dict, test_tree: dict):
"""Create a test session from the test tree."""
response = await client.post(
"/api/v1/sessions",
json={"tree_id": test_tree["id"]},
headers=auth_headers,
)
assert response.status_code == 201, f"Failed to create session: {response.text}"
return response.json()
@pytest.fixture
async def completed_session(client: AsyncClient, auth_headers: dict, test_session: dict):
"""Complete a session so it can be rated."""
response = await client.post(
f"/api/v1/sessions/{test_session['id']}/complete",
json={"outcome": "resolved", "outcome_notes": "Test resolved"},
headers=auth_headers,
)
assert response.status_code == 200, f"Failed to complete session: {response.text}"
return response.json()
async def test_rate_session_success(client: AsyncClient, auth_headers: dict, completed_session: dict):
"""Rate a completed session with CSAT score."""
response = await client.post(
f"/api/v1/sessions/{completed_session['id']}/rate",
json={"rating": 4, "comment": "Very helpful flow"},
headers=auth_headers,
)
assert response.status_code == 201
data = response.json()
assert data["rating"] == 4
assert data["comment"] == "Very helpful flow"
async def test_rate_session_no_comment(client: AsyncClient, auth_headers: dict, completed_session: dict):
"""Rate without a comment."""
response = await client.post(
f"/api/v1/sessions/{completed_session['id']}/rate",
json={"rating": 5},
headers=auth_headers,
)
assert response.status_code == 201
data = response.json()
assert data["rating"] == 5
assert data["comment"] is None
async def test_rate_session_duplicate(client: AsyncClient, auth_headers: dict, completed_session: dict):
"""Cannot rate same session twice."""
await client.post(
f"/api/v1/sessions/{completed_session['id']}/rate",
json={"rating": 4},
headers=auth_headers,
)
response = await client.post(
f"/api/v1/sessions/{completed_session['id']}/rate",
json={"rating": 5},
headers=auth_headers,
)
assert response.status_code == 409
async def test_rate_session_invalid_rating(client: AsyncClient, auth_headers: dict, completed_session: dict):
"""Rating must be 1-5."""
response = await client.post(
f"/api/v1/sessions/{completed_session['id']}/rate",
json={"rating": 6},
headers=auth_headers,
)
assert response.status_code == 422
async def test_rate_incomplete_session(client: AsyncClient, auth_headers: dict, test_session: dict):
"""Cannot rate a session that hasn't been completed."""
response = await client.post(
f"/api/v1/sessions/{test_session['id']}/rate",
json={"rating": 4},
headers=auth_headers,
)
assert response.status_code == 400
# --- Step Feedback Tests ---
@pytest.fixture
async def test_step(client: AsyncClient, auth_headers: dict):
"""Create a step in the step library."""
response = await client.post(
"/api/v1/steps",
json={
"title": "Test Step",
"step_type": "action",
"content": {
"instructions": "Run the diagnostic command",
"commands": [{"label": "Echo test", "command": "echo hello"}]
},
},
headers=auth_headers,
)
assert response.status_code == 201, f"Failed to create step: {response.text}"
return response.json()
async def test_step_feedback_thumbs_up(client: AsyncClient, auth_headers: dict, test_step: dict, test_session: dict):
"""Submit thumbs-up feedback for a step."""
response = await client.post(
f"/api/v1/steps/{test_step['id']}/feedback",
json={"session_id": test_session["id"], "was_helpful": True},
headers=auth_headers,
)
assert response.status_code == 201
data = response.json()
assert data["was_helpful"] is True
assert data["status"] == "created"
async def test_step_feedback_thumbs_down(client: AsyncClient, auth_headers: dict, test_step: dict, test_session: dict):
"""Submit thumbs-down feedback for a step."""
response = await client.post(
f"/api/v1/steps/{test_step['id']}/feedback",
json={"session_id": test_session["id"], "was_helpful": False},
headers=auth_headers,
)
assert response.status_code == 201
data = response.json()
assert data["was_helpful"] is False
assert data["status"] == "created"
async def test_step_feedback_toggle(client: AsyncClient, auth_headers: dict, test_step: dict, test_session: dict):
"""Submitting again for same step+session updates the rating."""
await client.post(
f"/api/v1/steps/{test_step['id']}/feedback",
json={"session_id": test_session["id"], "was_helpful": True},
headers=auth_headers,
)
response = await client.post(
f"/api/v1/steps/{test_step['id']}/feedback",
json={"session_id": test_session["id"], "was_helpful": False},
headers=auth_headers,
)
assert response.status_code == 201
data = response.json()
assert data["was_helpful"] is False
assert data["status"] == "updated"
async def test_step_feedback_not_found(client: AsyncClient, auth_headers: dict, test_session: dict):
"""Feedback on non-existent step returns 404."""
import uuid
fake_id = str(uuid.uuid4())
response = await client.post(
f"/api/v1/steps/{fake_id}/feedback",
json={"session_id": test_session["id"], "was_helpful": True},
headers=auth_headers,
)
assert response.status_code == 404
# --- /ratings Alias Route Tests ---
async def test_ratings_alias_post(client: AsyncClient, auth_headers: dict, test_step: dict):
"""POST /steps/{id}/ratings works as alias for /rate."""
response = await client.post(
f"/api/v1/steps/{test_step['id']}/ratings",
json={"rating": 4, "was_helpful": True},
headers=auth_headers,
)
assert response.status_code == 201
data = response.json()
assert data["rating"] == 4
async def test_ratings_alias_put(client: AsyncClient, auth_headers: dict, test_step: dict):
"""PUT /steps/{id}/ratings works as alias for /rate."""
# First create a rating
await client.post(
f"/api/v1/steps/{test_step['id']}/rate",
json={"rating": 3, "was_helpful": True},
headers=auth_headers,
)
# Update via alias
response = await client.put(
f"/api/v1/steps/{test_step['id']}/ratings",
json={"rating": 5},
headers=auth_headers,
)
assert response.status_code == 200
data = response.json()
assert data["rating"] == 5
async def test_ratings_alias_delete(client: AsyncClient, auth_headers: dict, test_step: dict):
"""DELETE /steps/{id}/ratings works as alias for /rate."""
# First create a rating
await client.post(
f"/api/v1/steps/{test_step['id']}/rate",
json={"rating": 4, "was_helpful": True},
headers=auth_headers,
)
# Delete via alias
response = await client.delete(
f"/api/v1/steps/{test_step['id']}/ratings",
headers=auth_headers,
)
assert response.status_code == 204