"""Knowledge Gap Detection Service.

Aggregates signals from AI sessions to identify gaps in the knowledge base.
Results are served by the analytics API and cached for 1 hour.

Signals:
    1. Frequent free-text escapes — FlowPilot's options didn't cover a common scenario
    2. High escalation rate by domain — domains where engineers can't self-resolve
    3. Discovery-mode resolutions — novel problems solved without flow guidance
    4. Repeated unmatched patterns — keyword-frequency based
       (Phase 4: embedding clustering)
"""

import logging
import string
from collections import Counter
from datetime import datetime, timezone, timedelta
from typing import Any, Optional
from uuid import UUID

from pydantic import BaseModel
from sqlalchemy import select, func, case, text
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.ai_session import AISession
from app.models.ai_session_step import AISessionStep
from app.models.tree import Tree

logger = logging.getLogger(__name__)

# Module-level (per-process) cache for the expensive gap analysis, keyed by
# "gaps:{account_id}:{period_days}". `_cache_expiry` holds each key's expiry time.
_cache: dict[str, Any] = {}
_cache_expiry: dict[str, datetime] = {}
CACHE_TTL = timedelta(hours=1)

# Sort rank for report ordering: high first, then medium, then low; unknown last.
_SEVERITY_ORDER = {"high": 0, "medium": 1, "low": 2}

# Words ignored when extracting keywords from problem summaries (Signal 4).
_STOP_WORDS = frozenset({
    "the", "a", "an", "is", "are", "was", "were", "in", "on", "at", "to", "for",
    "of", "and", "or", "not", "can", "can't", "with", "from", "by", "this",
    "that", "it", "its", "has", "have", "had", "user", "users", "issue", "error",
    "problem",
})


class KnowledgeGap(BaseModel):
    """A single detected knowledge gap, as returned in a KnowledgeGapReport."""

    gap_type: str  # "weak_options" | "high_escalation" | "uncharted_territory" | "repeated_pattern"
    domain: str | None = None
    severity: str  # "high" | "medium" | "low"
    title: str
    description: str
    evidence: dict[str, Any] = {}  # signal-specific supporting numbers
    suggested_action: str


class KnowledgeGapReport(BaseModel):
    """All gaps detected for an account, sorted by severity."""

    generated_at: datetime
    gaps: list[KnowledgeGap]


def _prune_expired_cache(now: datetime) -> None:
    """Drop cache entries whose TTL has elapsed so stale reports don't accumulate."""
    expired = [key for key, expiry in _cache_expiry.items() if expiry <= now]
    for key in expired:
        _cache_expiry.pop(key, None)
        _cache.pop(key, None)


async def get_knowledge_gaps(
    account_id: UUID,
    db: AsyncSession,
    period_days: int = 30,
) -> KnowledgeGapReport:
    """Generate a knowledge gap report for the account.

    Results are cached for 1 hour per (account_id, period_days) pair.

    Args:
        account_id: Account whose AI sessions are analysed.
        db: Async SQLAlchemy session used for all queries.
        period_days: Look-back window; only sessions created within the last
            ``period_days`` days are considered.

    Returns:
        A report whose gaps are ordered high → medium → low severity.
    """
    cache_key = f"gaps:{account_id}:{period_days}"
    now = datetime.now(timezone.utc)

    if cache_key in _cache and _cache_expiry.get(cache_key, now) > now:
        return _cache[cache_key]

    period_start = now - timedelta(days=period_days)
    gaps: list[KnowledgeGap] = []

    # Signal 1: Frequent free-text escapes
    gaps.extend(await _detect_weak_options(account_id, period_start, db))
    # Signal 2: High escalation rate by domain
    gaps.extend(await _detect_high_escalation(account_id, period_start, db))
    # Signal 3: Discovery-mode resolutions
    gaps.extend(await _detect_uncharted_territory(account_id, period_start, db))
    # Signal 4: Repeated unmatched patterns (keyword-based for Phase 3)
    gaps.extend(await _detect_repeated_patterns(account_id, period_start, db))

    # Stable sort: within one severity, signal order above is preserved.
    gaps.sort(key=lambda g: _SEVERITY_ORDER.get(g.severity, 3))

    report = KnowledgeGapReport(generated_at=now, gaps=gaps)

    # Evict stale entries before inserting so the module-level cache does not
    # grow with every (account, period) pair ever requested.
    _prune_expired_cache(now)
    _cache[cache_key] = report
    _cache_expiry[cache_key] = now + CACHE_TTL
    return report


async def _detect_weak_options(
    account_id: UUID,
    period_start: datetime,
    db: AsyncSession,
) -> list[KnowledgeGap]:
    """Signal 1: Find questions where engineers frequently use free-text escapes.

    Considers answered "question" steps (non-null context_message and
    responded_at) from sessions created since ``period_start``. A question
    qualifies with at least 3 responses and a free-text rate of at least 30%;
    the 5 qualifying questions with the most free-text answers are reported.
    Severity is "high" above a 60% rate, otherwise "medium".
    """
    total_count = func.count(AISessionStep.id)
    free_text_count = func.sum(
        case((AISessionStep.was_free_text.is_(True), 1), else_=0)
    )

    # Count free-text usage per step context_message (the question asked).
    result = await db.execute(
        select(
            AISessionStep.context_message,
            total_count.label("total"),
            free_text_count.label("free_text_count"),
        )
        .join(AISession, AISessionStep.session_id == AISession.id)
        .where(
            AISession.account_id == account_id,
            AISession.created_at >= period_start,
            AISessionStep.step_type == "question",
            AISessionStep.context_message.isnot(None),
            AISessionStep.responded_at.isnot(None),
        )
        .group_by(AISessionStep.context_message)
        .having(total_count >= 3)  # Minimum sample size
        # Apply the rate threshold in SQL (integer form of free_text/total >= 0.3)
        # so the LIMIT below picks only among qualifying questions; otherwise
        # high-volume, low-rate questions could crowd out genuinely weak ones.
        .having(free_text_count * 10 >= total_count * 3)
        .order_by(free_text_count.desc())
        .limit(5)
    )

    gaps: list[KnowledgeGap] = []
    for context_msg, total_raw, free_text_raw in result.all():
        total = int(total_raw or 0)
        free_text = int(free_text_raw or 0)
        if total == 0 or not free_text:
            continue

        rate = free_text / total
        if rate < 0.3:
            continue

        severity = "high" if rate > 0.6 else "medium"
        gaps.append(KnowledgeGap(
            gap_type="weak_options",
            severity=severity,
            title=f"Weak options: {(context_msg or '')[:80]}",
            description=(
                f"Engineers used free-text input {free_text}/{total} times "
                f"({rate:.0%}) when asked this question. The predefined options "
                f"may not cover common scenarios."
            ),
            evidence={
                "context_message": context_msg,
                "total_responses": total,
                "free_text_count": free_text,
                "free_text_rate": round(rate, 3),
            },
            suggested_action="Review the free-text responses and add common answers as options.",
        ))

    return gaps


async def _detect_high_escalation(
    account_id: UUID,
    period_start: datetime,
    db: AsyncSession,
) -> list[KnowledgeGap]:
    """Signal 2: Find domains with an escalation rate of at least 40%.

    Only sessions with a problem_domain and a terminal-ish status ("resolved",
    "escalated" or "requesting_escalation") count; both escalation statuses
    count as escalated. Domains need at least 3 such sessions. Severity is
    "high" above a 60% escalation rate, otherwise "medium".
    """
    escalated_statuses = ["escalated", "requesting_escalation"]

    result = await db.execute(
        select(
            AISession.problem_domain,
            func.count(AISession.id).label("total"),
            func.sum(case(
                (AISession.status == "resolved", 1),
                else_=0,
            )).label("resolved"),
            func.sum(case(
                (AISession.status.in_(escalated_statuses), 1),
                else_=0,
            )).label("escalated"),
        )
        .where(
            AISession.account_id == account_id,
            AISession.created_at >= period_start,
            AISession.problem_domain.isnot(None),
            AISession.status.in_(["resolved", *escalated_statuses]),
        )
        .group_by(AISession.problem_domain)
        .having(func.count(AISession.id) >= 3)  # Minimum sample
    )

    gaps: list[KnowledgeGap] = []
    for domain, total_raw, resolved_raw, escalated_raw in result.all():
        total = int(total_raw or 0)
        resolved = int(resolved_raw or 0)
        escalated = int(escalated_raw or 0)
        if total == 0 or not escalated:
            continue

        escalation_rate = escalated / total
        if escalation_rate < 0.4:
            continue

        severity = "high" if escalation_rate > 0.6 else "medium"
        gaps.append(KnowledgeGap(
            gap_type="high_escalation",
            domain=domain,
            severity=severity,
            title=f"High escalation rate in {domain}",
            description=(
                f"{escalated}/{total} sessions ({escalation_rate:.0%}) in {domain} "
                f"were escalated. Only {resolved} resolved independently."
            ),
            evidence={
                "domain": domain,
                "total": total,
                "resolved": resolved,
                "escalated": escalated,
                "escalation_rate": round(escalation_rate, 3),
            },
            suggested_action=f"Create or improve troubleshooting flows for {domain} issues.",
        ))

    return gaps


async def _detect_uncharted_territory(
    account_id: UUID,
    period_start: datetime,
    db: AsyncSession,
) -> list[KnowledgeGap]:
    """Signal 3: Find discovery-mode resolutions (novel problems solved without flows).

    Counts resolved sessions with confidence_tier == "discovery" per
    problem_domain (NULL domains form their own group). Domains need at least
    2 such sessions; the top 5 by count are reported. Severity is "high" at
    5+, "medium" at 3-4, otherwise "low".
    """
    discovery_count = func.count(AISession.id)

    result = await db.execute(
        select(
            AISession.problem_domain,
            discovery_count.label("count"),
        )
        .where(
            AISession.account_id == account_id,
            AISession.created_at >= period_start,
            AISession.status == "resolved",
            AISession.confidence_tier == "discovery",
        )
        .group_by(AISession.problem_domain)
        .having(discovery_count >= 2)
        .order_by(discovery_count.desc())
        .limit(5)
    )

    gaps: list[KnowledgeGap] = []
    for domain, count in result.all():
        if count >= 5:
            severity = "high"
        elif count >= 3:
            severity = "medium"
        else:
            severity = "low"
        domain_label = domain or "unknown domain"

        gaps.append(KnowledgeGap(
            gap_type="uncharted_territory",
            domain=domain,
            severity=severity,
            title=f"Novel resolutions in {domain_label}",
            description=(
                f"{count} sessions in {domain_label} were resolved in discovery mode "
                f"(no matching flow, low confidence). These represent knowledge capture "
                f"opportunities — check the Review Queue for auto-generated proposals."
            ),
            evidence={
                "domain": domain,
                "discovery_resolution_count": count,
            },
            suggested_action="Review pending flow proposals or create flows from these session patterns.",
        ))

    return gaps


def _extract_keywords(summary: str) -> set[str]:
    """Return the distinct lower-cased keywords of *summary* used by Signal 4.

    Tokens are split on whitespace and stripped of surrounding ASCII
    punctuation (so "vpn," and "vpn" match); tokens of 3 characters or fewer
    and stop words are dropped.
    """
    tokens = (word.strip(string.punctuation) for word in summary.lower().split())
    return {tok for tok in tokens if len(tok) > 3 and tok not in _STOP_WORDS}


async def _detect_repeated_patterns(
    account_id: UUID,
    period_start: datetime,
    db: AsyncSession,
) -> list[KnowledgeGap]:
    """Signal 4: Find repeated unmatched intake patterns (keyword-frequency based).

    Phase 3 uses keyword frequency on problem_summary over (at most) the 200
    most recent sessions with no matched flow. A keyword is counted once per
    session; the top 3 keywords seen in at least 3 sessions are reported.
    Severity is "medium" at 5+ sessions, otherwise "low".
    Phase 4 will use embedding clustering for deeper semantic analysis.
    """
    # Get problem summaries from unmatched sessions.
    result = await db.execute(
        select(AISession.problem_summary, AISession.problem_domain)
        .where(
            AISession.account_id == account_id,
            AISession.created_at >= period_start,
            AISession.problem_summary.isnot(None),
            AISession.matched_flow_id.is_(None),
        )
        # LIMIT without ORDER BY returns an arbitrary subset; sample the most
        # recent sessions so the result is stable and reflects current issues.
        .order_by(AISession.created_at.desc())
        .limit(200)
    )
    rows = result.all()
    if len(rows) < 3:
        return []

    # Count in how many sessions each keyword appears; remember the domain of
    # the first (most recent) session that mentioned it.
    word_counts: Counter[str] = Counter()
    domain_for_word: dict[str, str | None] = {}
    for summary, domain in rows:
        if not summary:
            continue
        for kw in _extract_keywords(summary):
            word_counts[kw] += 1
            domain_for_word.setdefault(kw, domain)

    # Counter.most_common breaks ties by first-encounter order, which here
    # follows (hash-seed dependent) set iteration; sort explicitly instead.
    top_keywords = sorted(word_counts.items(), key=lambda item: (-item[1], item[0]))[:3]

    gaps: list[KnowledgeGap] = []
    for keyword, count in top_keywords:
        if count < 3:
            continue

        severity = "medium" if count >= 5 else "low"
        domain = domain_for_word.get(keyword)
        gaps.append(KnowledgeGap(
            gap_type="repeated_pattern",
            domain=domain,
            severity=severity,
            title=f"Recurring unmatched pattern: '{keyword}'",
            description=(
                f"The keyword '{keyword}' appeared in {count} sessions that had no "
                f"matching flow. This may indicate a systematic knowledge gap."
            ),
            evidence={
                "keyword": keyword,
                "unmatched_session_count": count,
                "domain": domain,
            },
            suggested_action=f"Search for '{keyword}' in recent sessions and consider creating a flow.",
        ))

    return gaps