feat(knowledge-flywheel): add Phase 3 Knowledge Flywheel — AI analysis, review queue, analytics

Phase 3 implementation:
- AI session analysis service that generates flow proposals from resolved sessions
- APScheduler job for batch processing pending analyses (max_instances=1)
- Knowledge gap detection (weak options, high escalation signals)
- Flow proposals CRUD with team admin review workflow (approve/edit/dismiss/reject)
- FlowPilot analytics dashboard with confidence tiers, PSA metrics, knowledge gaps
- In-session script generator component
- Review queue page with filtering and proposal detail panel

Bug fixes from review (12 total):
- Fix "Edit & Publish" navigating to non-existent /editor/new route
- Hide Approve button for enhancement proposals (require Edit & Publish)
- Add max_instances=1 to scheduler to prevent TOCTOU race
- Fix eventual_success case() double-counting failed retries
- Add tree_structure validation before creating tree from proposal
- Simplify script generator rendering condition
- Add severity style fallback, toFixed on rates, Link instead of <a href>
- Add toast.warning on dismiss failure, fix dedup for domain-less sessions
- Cast Decimal to int in knowledge gap evidence dicts

Also updates CLAUDE.md with lessons 67-71 and Phase 3 project structure.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-19 05:12:10 +00:00
parent ce118b51d8
commit 9bad49d568
42 changed files with 5427 additions and 13 deletions

View File

@@ -11,7 +11,7 @@ from datetime import datetime, timezone
from typing import Any, Optional
from uuid import UUID
from sqlalchemy import select
from sqlalchemy import select, or_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
@@ -228,6 +228,11 @@ async def start_session(
if ticket_context_block:
ticket_prompt_section = f"\n## PSA TICKET CONTEXT\n{ticket_context_block}\n"
# Include available script templates for in-session script generation
script_context = await _build_script_context(team_id, db)
if script_context:
ticket_prompt_section += f"\n{script_context}\n"
system_prompt = FLOWPILOT_SYSTEM_PROMPT.format(
structured_output_schema=STRUCTURED_OUTPUT_SCHEMA,
team_context=ticket_prompt_section,
@@ -448,6 +453,9 @@ async def resolve_session(
documentation = _generate_documentation(session)
# Queue for Knowledge Flywheel analysis
session.analysis_status = "pending"
await db.flush()
# Push documentation to PSA if ticket is linked
@@ -909,6 +917,13 @@ def _create_step_from_parsed(
if parsed["type"] == "action":
content["action_type"] = parsed.get("action_type", "instruction")
content["expected_outcome"] = parsed.get("expected_outcome")
# Script generation fields (populated when FlowPilot suggests a script)
if parsed.get("template_id"):
content["template_id"] = parsed["template_id"]
if parsed.get("pre_filled_params"):
content["pre_filled_params"] = parsed["pre_filled_params"]
if parsed.get("instructions"):
content["instructions"] = parsed["instructions"]
elif parsed["type"] == "resolution_suggestion":
content["resolution_summary"] = parsed.get("resolution_summary")
content["follow_up_recommendations"] = parsed.get("follow_up_recommendations", [])
@@ -1066,6 +1081,51 @@ async def _process_ticket_intake(
return None, None, "unavailable"
async def _build_script_context(
team_id: Optional[UUID],
db: AsyncSession,
) -> Optional[str]:
"""Build script template context for the system prompt.
Includes available script templates so FlowPilot can suggest
script_generation actions with pre-filled parameters.
"""
try:
from app.models.script_template import ScriptTemplate
result = await db.execute(
select(ScriptTemplate)
.where(
ScriptTemplate.is_active.is_(True),
or_(
ScriptTemplate.team_id.is_(None),
ScriptTemplate.team_id == team_id,
),
)
.order_by(ScriptTemplate.usage_count.desc())
.limit(20)
)
templates = result.scalars().all()
if not templates:
return None
lines = ["## AVAILABLE SCRIPTS"]
lines.append("When the engineer needs to run a script, suggest an action with action_type='script_generation'.")
lines.append("Include template_id and pre_filled_params based on the diagnostic context.\n")
for t in templates:
params = t.parameters_schema.get("parameters", [])
param_keys = ", ".join(p.get("key", "") for p in params if p.get("key"))
lines.append(f"- {t.name} (ID: {t.id}): {t.description or 'No description'}")
if param_keys:
lines.append(f" Parameters: {param_keys}")
return "\n".join(lines)
except Exception as e:
logger.warning("Failed to build script context: %s", e)
return None
async def _build_escalation_package_enhanced(
session: AISession,
user_id: UUID,

View File

@@ -0,0 +1,454 @@
"""Knowledge Flywheel — post-session analysis engine.
Analyzes resolved AI sessions and generates flow proposals:
- new_flow: Novel resolution path → propose a new troubleshooting flow
- enhancement: Diverged from a matched flow → propose additions
- auto_reinforced: Followed a flow exactly → update flow stats
Called by the knowledge_flywheel_scheduler (APScheduler) after sessions resolve.
"""
import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Any, Optional
from uuid import UUID
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.core.ai_provider import get_ai_provider
from app.core.config import settings
from app.models.ai_session import AISession
from app.models.ai_session_step import AISessionStep
from app.models.flow_proposal import FlowProposal
from app.models.tree import Tree
logger = logging.getLogger(__name__)
# Daily budget cap for proposal generation LLM calls per account
MAX_PROPOSALS_PER_DAY = 50
FLOW_GENERATION_PROMPT = """\
You are a knowledge engineer converting a troubleshooting session into a reusable flow definition.
Given the session transcript below, generate a JSON flow definition that captures the diagnostic logic so other engineers can follow the same path.
## OUTPUT FORMAT
Respond with ONLY valid JSON:
{
"title": "Short descriptive title (5-10 words)",
"description": "When to use this flow (1-2 sentences)",
"match_keywords": ["keyword1", "keyword2", ...],
"problem_domain": "active_directory | networking | m365 | hardware | endpoint | virtualization | security | backup | email | printing | cloud | other",
"tree_structure": {
"id": "root",
"type": "decision",
"question": "First diagnostic question",
"help_text": "Context for the engineer",
"options": [
{"id": "opt1", "label": "Option text", "next_node_id": "node_id"}
],
"children": [
{
"id": "node_id",
"type": "decision | action | solution",
"title": "Node title",
"question": "For decision nodes",
"description": "For action/solution nodes",
"options": [],
"next_node_id": "next_id or null for terminal nodes"
}
]
}
}
## RULES
- tree_structure uses a flat children array with id-based references via next_node_id
- The root node has type "decision" with a question and options
- Decision nodes have options with next_node_id pointing to child nodes
- Action nodes describe what the engineer should do with a description field
- Solution nodes describe the resolution (terminal — no next_node_id)
- Every decision node must have 2-5 options
- Include the key diagnostic questions that narrowed down the problem
- Skip redundant or dead-end paths from the session
- match_keywords should be symptoms, error messages, and technology names (5-10 keywords)
- Do NOT wrap JSON in markdown code fences\
"""
ENHANCEMENT_PROMPT = """\
You are a knowledge engineer analyzing how a troubleshooting session diverged from an existing flow.
Given the session transcript and the existing flow structure, identify what should be added or changed.
## OUTPUT FORMAT
Respond with ONLY valid JSON:
{
"title": "Enhancement: <what changed>",
"description": "Why this enhancement is needed",
"diff_description": "Human-readable summary of changes",
"new_nodes": [
{
"id": "new_node_id",
"type": "decision | action | solution",
"title": "Node title",
"question": "For decision nodes",
"description": "For action/solution nodes",
"options": [],
"attach_after_node_id": "existing node ID where this branches off",
"new_option_label": "Label for the new option on the parent node"
}
],
"modified_options": [
{
"node_id": "existing node ID",
"add_option": {"id": "new_opt", "label": "New option text", "next_node_id": "new_node_id"}
}
]
}
## RULES
- Only propose changes supported by the session evidence
- Minimize changes — add branches, don't restructure
- new_nodes should follow the same format as the existing flow
- Do NOT wrap JSON in markdown code fences\
"""
def _build_session_context(session: AISession) -> str:
"""Build a text summary of a session for the LLM prompt."""
parts = [
f"Problem: {session.problem_summary or 'Unknown'}",
f"Domain: {session.problem_domain or 'Unknown'}",
f"Confidence at resolution: {session.confidence_tier} ({session.confidence_score:.0%})",
f"Resolution: {session.resolution_summary or 'No summary'}",
]
if session.escalation_reason:
parts.append(f"Escalation reason: {session.escalation_reason}")
# Build step-by-step diagnostic trail
steps = sorted(session.steps, key=lambda s: s.step_order)
if steps:
parts.append("\n--- DIAGNOSTIC TRAIL ---")
for step in steps:
content = step.content or {}
step_desc = content.get("text", "")
step_type = content.get("type", step.step_type)
line = f"Step {step.step_order + 1} [{step_type}]: {step_desc}"
# Engineer response
if step.was_skipped:
line += "\n → Skipped"
elif step.selected_option:
# Find label from options
label = step.selected_option
if step.options_presented:
for opt in step.options_presented:
if opt.get("value") == step.selected_option:
label = opt.get("label", step.selected_option)
break
line += f"\n → Selected: {label}"
elif step.free_text_input:
line += f"\n → Free text: {step.free_text_input}"
if step.action_result:
result = step.action_result
outcome = "Succeeded" if result.get("success") else "Did not resolve"
if details := result.get("details"):
outcome += f"{details}"
line += f"\n → Result: {outcome}"
parts.append(line)
return "\n".join(parts)
def _has_free_text_escapes(session: AISession) -> bool:
"""Check if the session used free-text escapes (diverged from options)."""
return any(step.was_free_text for step in session.steps)
async def _check_daily_budget(account_id: UUID, db: AsyncSession) -> bool:
"""Check if the account has exceeded the daily proposal generation budget."""
today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
result = await db.execute(
select(func.count(FlowProposal.id))
.where(
FlowProposal.account_id == account_id,
FlowProposal.created_at >= today_start,
FlowProposal.status != "auto_reinforced", # Don't count no-LLM proposals
)
)
count = result.scalar() or 0
return count < MAX_PROPOSALS_PER_DAY
async def _find_similar_pending_proposal(
title: str,
problem_domain: Optional[str],
account_id: UUID,
db: AsyncSession,
) -> Optional[FlowProposal]:
"""Find an existing pending proposal with similar title and domain.
Uses simple keyword overlap for now. Phase 4 will add embedding similarity.
"""
# Build domain filter — match NULL domain proposals if domain is NULL
domain_filter = (
FlowProposal.problem_domain == problem_domain
if problem_domain
else FlowProposal.problem_domain.is_(None)
)
result = await db.execute(
select(FlowProposal)
.where(
FlowProposal.account_id == account_id,
FlowProposal.status == "pending",
domain_filter,
)
.limit(20)
)
candidates = result.scalars().all()
if not candidates:
return None
# Simple keyword overlap check
title_words = set(title.lower().split())
for candidate in candidates:
candidate_words = set(candidate.title.lower().split())
if len(title_words) > 0 and len(candidate_words) > 0:
overlap = len(title_words & candidate_words) / max(len(title_words), len(candidate_words))
if overlap > 0.6:
return candidate
return None
async def analyze_session(session: AISession, db: AsyncSession) -> None:
"""Analyze a resolved session and create appropriate flow proposal.
Dispatches to one of three outcomes:
1. new_flow — novel resolution, no matching flow
2. enhancement — matched flow but diverged
3. auto_reinforced — followed existing flow closely
"""
# Re-fetch with eager-loaded steps to avoid async lazy-load errors
result = await db.execute(
select(AISession)
.where(AISession.id == session.id)
.options(selectinload(AISession.steps))
)
session = result.scalar_one()
# Determine which analysis path to take
has_match = session.matched_flow_id is not None
match_score = session.match_score or 0.0
has_divergence = _has_free_text_escapes(session)
if has_match and match_score > 0.8 and not has_divergence:
# Path 3: Auto-reinforcement
await _auto_reinforce(session, db)
elif has_match and match_score > 0.5 and has_divergence:
# Path 2: Enhancement proposal
await _propose_enhancement(session, db)
elif not has_match or match_score < 0.5:
# Path 1: New flow proposal
await _propose_new_flow(session, db)
else:
# Edge case: matched but moderate score, no divergence — reinforce
await _auto_reinforce(session, db)
async def _auto_reinforce(session: AISession, db: AsyncSession) -> None:
"""Update the matched flow's stats and create a tracking record."""
if session.matched_flow_id:
result = await db.execute(
select(Tree).where(Tree.id == session.matched_flow_id)
)
flow = result.scalar_one_or_none()
if flow:
# Update flow stats
current_rate = flow.success_rate or 0.0
# Simple moving average
flow.success_rate = round(current_rate * 0.9 + 1.0 * 0.1, 4)
flow.last_matched_at = datetime.now(timezone.utc)
# Create tracking record (no review needed)
proposal = FlowProposal(
id=uuid.uuid4(),
account_id=session.account_id,
team_id=session.team_id,
source_session_id=session.id,
proposal_type="auto_reinforced",
title=f"Reinforcement: {session.problem_summary or 'Session'}",
description="Session followed existing flow closely. No changes needed.",
proposed_flow_data={},
confidence_score=session.confidence_score,
supporting_session_ids=[str(session.id)],
problem_domain=session.problem_domain,
status="auto_reinforced",
target_flow_id=session.matched_flow_id,
)
db.add(proposal)
logger.info("Auto-reinforced flow %s from session %s", session.matched_flow_id, session.id)
async def _propose_new_flow(session: AISession, db: AsyncSession) -> None:
"""Generate a new flow proposal from a novel session."""
if not await _check_daily_budget(session.account_id, db):
logger.warning("Daily proposal budget exceeded for account %s", session.account_id)
return
session_context = _build_session_context(session)
try:
provider = get_ai_provider(settings.get_model_for_action("open_chat"))
raw_response, _, _ = await provider.generate_json(
system_prompt=FLOW_GENERATION_PROMPT,
messages=[{"role": "user", "content": session_context}],
max_tokens=4096,
)
parsed = _parse_llm_json(raw_response)
except Exception as e:
logger.warning("Knowledge Flywheel LLM call failed for session %s: %s", session.id, e)
return
title = parsed.get("title", session.problem_summary or "Untitled Flow")
domain = parsed.get("problem_domain", session.problem_domain)
# Check for similar pending proposals
existing = await _find_similar_pending_proposal(title, domain, session.account_id, db)
if existing:
# Merge into existing proposal
existing.supporting_session_count += 1
sids = existing.supporting_session_ids or []
sids.append(str(session.id))
existing.supporting_session_ids = sids
existing.confidence_score = min(1.0, existing.confidence_score + 0.1)
logger.info(
"Merged session %s into existing proposal %s (now %d supporting)",
session.id, existing.id, existing.supporting_session_count,
)
return
proposal = FlowProposal(
id=uuid.uuid4(),
account_id=session.account_id,
team_id=session.team_id,
source_session_id=session.id,
proposal_type="new_flow",
title=title,
description=parsed.get("description"),
proposed_flow_data={
"tree_structure": parsed.get("tree_structure", {}),
"match_keywords": parsed.get("match_keywords", []),
},
confidence_score=session.confidence_score,
supporting_session_ids=[str(session.id)],
problem_domain=domain,
status="pending",
)
db.add(proposal)
logger.info("Created new_flow proposal for session %s: %s", session.id, title)
async def _propose_enhancement(session: AISession, db: AsyncSession) -> None:
"""Generate an enhancement proposal for an existing flow."""
if not session.matched_flow_id:
# Fallback to new flow if no match
await _propose_new_flow(session, db)
return
if not await _check_daily_budget(session.account_id, db):
logger.warning("Daily proposal budget exceeded for account %s", session.account_id)
return
# Load the matched flow
result = await db.execute(
select(Tree).where(Tree.id == session.matched_flow_id)
)
matched_flow = result.scalar_one_or_none()
if not matched_flow:
await _propose_new_flow(session, db)
return
session_context = _build_session_context(session)
flow_json = json.dumps(matched_flow.tree_structure, indent=None)
if len(flow_json) > 4000:
flow_json = flow_json[:4000] + "... [truncated]"
prompt_content = (
f"## EXISTING FLOW\n"
f"Name: {matched_flow.name}\n"
f"Structure:\n{flow_json}\n\n"
f"## SESSION THAT DIVERGED\n"
f"{session_context}"
)
try:
provider = get_ai_provider(settings.get_model_for_action("open_chat"))
raw_response, _, _ = await provider.generate_json(
system_prompt=ENHANCEMENT_PROMPT,
messages=[{"role": "user", "content": prompt_content}],
max_tokens=4096,
)
parsed = _parse_llm_json(raw_response)
except Exception as e:
logger.warning("Knowledge Flywheel enhancement LLM call failed for session %s: %s", session.id, e)
return
title = parsed.get("title", f"Enhancement: {session.problem_summary or 'Flow update'}")
diff_description = parsed.get("diff_description", "Session diverged from existing flow")
proposal = FlowProposal(
id=uuid.uuid4(),
account_id=session.account_id,
team_id=session.team_id,
source_session_id=session.id,
proposal_type="enhancement",
target_flow_id=session.matched_flow_id,
title=title,
description=diff_description,
proposed_flow_data={
"new_nodes": parsed.get("new_nodes", []),
"modified_options": parsed.get("modified_options", []),
},
proposed_diff={
"diff_description": diff_description,
"new_nodes": parsed.get("new_nodes", []),
"modified_options": parsed.get("modified_options", []),
},
confidence_score=session.confidence_score,
supporting_session_ids=[str(session.id)],
problem_domain=session.problem_domain,
status="pending",
)
db.add(proposal)
logger.info(
"Created enhancement proposal for flow %s from session %s: %s",
session.matched_flow_id, session.id, title,
)
def _parse_llm_json(raw_text: str) -> dict[str, Any]:
"""Parse JSON from LLM response, handling common quirks."""
text = raw_text.strip()
# Strip markdown code fences if present
if text.startswith("```"):
lines = text.split("\n")
lines = [line for line in lines if not line.strip().startswith("```")]
text = "\n".join(lines).strip()
try:
return json.loads(text)
except json.JSONDecodeError as e:
logger.warning("Knowledge Flywheel JSON parse failed: %s — raw: %.300s", e, text)
raise ValueError(f"Invalid JSON from LLM: {e}") from e

View File

@@ -0,0 +1,72 @@
"""Background scheduler for Knowledge Flywheel analysis.
Runs every 5 minutes via APScheduler, picks up AISession entries
with analysis_status='pending' and runs flow proposal analysis.
Each session is committed individually to prevent a single failure
from rolling back all progress or causing duplicate proposals.
"""
import logging
from sqlalchemy import select
from app.core.database import async_session_maker
from app.models.ai_session import AISession
from app.services.knowledge_flywheel import analyze_session
logger = logging.getLogger(__name__)
async def process_pending_analyses() -> None:
"""Process resolved sessions awaiting Knowledge Flywheel analysis."""
async with async_session_maker() as db:
try:
result = await db.execute(
select(AISession.id)
.where(AISession.analysis_status == "pending")
.order_by(AISession.resolved_at.asc())
.limit(10)
)
session_ids = [row[0] for row in result.all()]
except Exception as e:
logger.error("Knowledge Flywheel scheduler query error: %s", e)
return
if not session_ids:
return
logger.info("Processing %d pending Knowledge Flywheel analyses", len(session_ids))
# Process each session in its own DB session to isolate failures
for session_id in session_ids:
async with async_session_maker() as db:
try:
result = await db.execute(
select(AISession).where(AISession.id == session_id)
)
session = result.scalar_one_or_none()
if not session or session.analysis_status != "pending":
continue
await analyze_session(session, db)
session.analysis_status = "completed"
await db.commit()
logger.info("Knowledge Flywheel completed for session %s", session_id)
except Exception as e:
await db.rollback()
logger.warning(
"Knowledge Flywheel failed for session %s: %s",
session_id, e,
)
# Mark as failed in a separate transaction
try:
async with async_session_maker() as db2:
result = await db2.execute(
select(AISession).where(AISession.id == session_id)
)
s = result.scalar_one_or_none()
if s:
s.analysis_status = "failed"
await db2.commit()
except Exception:
logger.error("Failed to mark session %s as failed", session_id)

View File

@@ -0,0 +1,334 @@
"""Knowledge Gap Detection Service.
Aggregates signals from AI sessions to identify gaps in the knowledge base.
Results are served by the analytics API and cached for 1 hour.
Signals:
1. Frequent free-text escapes — FlowPilot's options didn't cover a common scenario
2. High escalation rate by domain — domains where engineers can't self-resolve
3. Discovery-mode resolutions — novel problems solved without flow guidance
4. Repeated unmatched patterns — keyword-frequency based (Phase 4: embedding clustering)
"""
import logging
from collections import Counter
from datetime import datetime, timezone, timedelta
from typing import Any, Optional
from uuid import UUID
from pydantic import BaseModel
from sqlalchemy import select, func, case, text
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.ai_session import AISession
from app.models.ai_session_step import AISessionStep
from app.models.tree import Tree
logger = logging.getLogger(__name__)
# Cache for expensive gap analysis
_cache: dict[str, Any] = {}
_cache_expiry: dict[str, datetime] = {}
CACHE_TTL = timedelta(hours=1)
class KnowledgeGap(BaseModel):
gap_type: str # "weak_options" | "high_escalation" | "uncharted_territory" | "repeated_pattern"
domain: str | None = None
severity: str # "high" | "medium" | "low"
title: str
description: str
evidence: dict[str, Any] = {}
suggested_action: str
class KnowledgeGapReport(BaseModel):
generated_at: datetime
gaps: list[KnowledgeGap]
async def get_knowledge_gaps(
account_id: UUID,
db: AsyncSession,
period_days: int = 30,
) -> KnowledgeGapReport:
"""Generate a knowledge gap report for the account.
Results are cached for 1 hour per account.
"""
cache_key = f"gaps:{account_id}:{period_days}"
now = datetime.now(timezone.utc)
if cache_key in _cache and _cache_expiry.get(cache_key, now) > now:
return _cache[cache_key]
period_start = now - timedelta(days=period_days)
gaps: list[KnowledgeGap] = []
# Signal 1: Frequent free-text escapes
signal1 = await _detect_weak_options(account_id, period_start, db)
gaps.extend(signal1)
# Signal 2: High escalation rate by domain
signal2 = await _detect_high_escalation(account_id, period_start, db)
gaps.extend(signal2)
# Signal 3: Discovery-mode resolutions
signal3 = await _detect_uncharted_territory(account_id, period_start, db)
gaps.extend(signal3)
# Signal 4: Repeated unmatched patterns (keyword-based for Phase 3)
signal4 = await _detect_repeated_patterns(account_id, period_start, db)
gaps.extend(signal4)
# Sort by severity (high > medium > low)
severity_order = {"high": 0, "medium": 1, "low": 2}
gaps.sort(key=lambda g: severity_order.get(g.severity, 3))
report = KnowledgeGapReport(generated_at=now, gaps=gaps)
_cache[cache_key] = report
_cache_expiry[cache_key] = now + CACHE_TTL
return report
async def _detect_weak_options(
account_id: UUID,
period_start: datetime,
db: AsyncSession,
) -> list[KnowledgeGap]:
"""Signal 1: Find questions where engineers frequently use free-text escapes."""
# Count free-text usage per step context_message (the question asked)
result = await db.execute(
select(
AISessionStep.context_message,
func.count(AISessionStep.id).label("total"),
func.sum(case((AISessionStep.was_free_text.is_(True), 1), else_=0)).label("free_text_count"),
)
.join(AISession, AISessionStep.session_id == AISession.id)
.where(
AISession.account_id == account_id,
AISession.created_at >= period_start,
AISessionStep.step_type == "question",
AISessionStep.context_message.isnot(None),
AISessionStep.responded_at.isnot(None),
)
.group_by(AISessionStep.context_message)
.having(func.count(AISessionStep.id) >= 3) # Minimum sample size
.order_by(func.sum(case((AISessionStep.was_free_text.is_(True), 1), else_=0)).desc())
.limit(5)
)
gaps = []
for row in result.all():
context_msg, total_raw, free_text_raw = row
total = int(total_raw or 0)
free_text_count = int(free_text_raw or 0)
if total == 0 or not free_text_count:
continue
rate = free_text_count / total
if rate < 0.3:
continue
severity = "high" if rate > 0.6 else "medium"
gaps.append(KnowledgeGap(
gap_type="weak_options",
severity=severity,
title=f"Weak options: {(context_msg or '')[:80]}",
description=(
f"Engineers used free-text input {free_text_count}/{total} times "
f"({rate:.0%}) when asked this question. The predefined options "
f"may not cover common scenarios."
),
evidence={
"context_message": context_msg,
"total_responses": total,
"free_text_count": free_text_count,
"free_text_rate": round(rate, 3),
},
suggested_action="Review the free-text responses and add common answers as options.",
))
return gaps
async def _detect_high_escalation(
account_id: UUID,
period_start: datetime,
db: AsyncSession,
) -> list[KnowledgeGap]:
"""Signal 2: Find domains with >40% escalation rate."""
result = await db.execute(
select(
AISession.problem_domain,
func.count(AISession.id).label("total"),
func.sum(case(
(AISession.status == "resolved", 1), else_=0
)).label("resolved"),
func.sum(case(
(AISession.status.in_(["escalated", "requesting_escalation"]), 1), else_=0
)).label("escalated"),
)
.where(
AISession.account_id == account_id,
AISession.created_at >= period_start,
AISession.problem_domain.isnot(None),
AISession.status.in_(["resolved", "escalated", "requesting_escalation"]),
)
.group_by(AISession.problem_domain)
.having(func.count(AISession.id) >= 3) # Minimum sample
)
gaps = []
for row in result.all():
domain, total_raw, resolved_raw, escalated_raw = row
total = int(total_raw or 0)
resolved = int(resolved_raw or 0)
escalated = int(escalated_raw or 0)
if total == 0 or not escalated:
continue
escalation_rate = escalated / total
if escalation_rate < 0.4:
continue
severity = "high" if escalation_rate > 0.6 else "medium"
gaps.append(KnowledgeGap(
gap_type="high_escalation",
domain=domain,
severity=severity,
title=f"High escalation rate in {domain}",
description=(
f"{escalated}/{total} sessions ({escalation_rate:.0%}) in {domain} "
f"were escalated. Only {resolved} resolved independently."
),
evidence={
"domain": domain,
"total": total,
"resolved": resolved,
"escalated": escalated,
"escalation_rate": round(escalation_rate, 3),
},
suggested_action=f"Create or improve troubleshooting flows for {domain} issues.",
))
return gaps
async def _detect_uncharted_territory(
account_id: UUID,
period_start: datetime,
db: AsyncSession,
) -> list[KnowledgeGap]:
"""Signal 3: Find discovery-mode resolutions (novel problems solved without flows)."""
result = await db.execute(
select(
AISession.problem_domain,
func.count(AISession.id).label("count"),
)
.where(
AISession.account_id == account_id,
AISession.created_at >= period_start,
AISession.status == "resolved",
AISession.confidence_tier == "discovery",
)
.group_by(AISession.problem_domain)
.having(func.count(AISession.id) >= 2)
.order_by(func.count(AISession.id).desc())
.limit(5)
)
gaps = []
for row in result.all():
domain, count = row
severity = "high" if count >= 5 else "medium" if count >= 3 else "low"
domain_label = domain or "unknown domain"
gaps.append(KnowledgeGap(
gap_type="uncharted_territory",
domain=domain,
severity=severity,
title=f"Novel resolutions in {domain_label}",
description=(
f"{count} sessions in {domain_label} were resolved in discovery mode "
f"(no matching flow, low confidence). These represent knowledge capture "
f"opportunities — check the Review Queue for auto-generated proposals."
),
evidence={
"domain": domain,
"discovery_resolution_count": count,
},
suggested_action="Review pending flow proposals or create flows from these session patterns.",
))
return gaps
async def _detect_repeated_patterns(
account_id: UUID,
period_start: datetime,
db: AsyncSession,
) -> list[KnowledgeGap]:
"""Signal 4: Find repeated unmatched intake patterns (keyword-frequency based).
Phase 3 uses keyword frequency on problem_summary. Phase 4 will use
embedding clustering for deeper semantic analysis.
"""
# Get problem summaries from unmatched sessions
result = await db.execute(
select(AISession.problem_summary, AISession.problem_domain)
.where(
AISession.account_id == account_id,
AISession.created_at >= period_start,
AISession.problem_summary.isnot(None),
AISession.matched_flow_id.is_(None),
)
.limit(200)
)
rows = result.all()
if len(rows) < 3:
return []
# Extract keywords from summaries and count frequency
word_counts: Counter[str] = Counter()
domain_for_word: dict[str, str | None] = {}
for summary, domain in rows:
if not summary:
continue
words = set(summary.lower().split())
# Filter out common stop words and short words
stop_words = {"the", "a", "an", "is", "are", "was", "were", "in", "on", "at",
"to", "for", "of", "and", "or", "not", "can", "can't", "with",
"from", "by", "this", "that", "it", "its", "has", "have", "had",
"user", "users", "issue", "error", "problem"}
keywords = {w for w in words if len(w) > 3 and w not in stop_words}
for kw in keywords:
word_counts[kw] += 1
if kw not in domain_for_word:
domain_for_word[kw] = domain
gaps = []
# Find keywords that appear in many unmatched sessions
for keyword, count in word_counts.most_common(3):
if count < 3:
continue
severity = "medium" if count >= 5 else "low"
domain = domain_for_word.get(keyword)
gaps.append(KnowledgeGap(
gap_type="repeated_pattern",
domain=domain,
severity=severity,
title=f"Recurring unmatched pattern: '{keyword}'",
description=(
f"The keyword '{keyword}' appeared in {count} sessions that had no "
f"matching flow. This may indicate a systematic knowledge gap."
),
evidence={
"keyword": keyword,
"unmatched_session_count": count,
"domain": domain,
},
suggested_action=f"Search for '{keyword}' in recent sessions and consider creating a flow.",
))
return gaps