"""Knowledge Flywheel — post-session analysis engine. Analyzes resolved AI sessions and generates flow proposals: - new_flow: Novel resolution path → propose a new troubleshooting flow - enhancement: Diverged from a matched flow → propose additions - auto_reinforced: Followed a flow exactly → update flow stats Called by the knowledge_flywheel_scheduler (APScheduler) after sessions resolve. """ import json import logging import uuid from datetime import datetime, timezone from typing import Any, Optional from uuid import UUID from sqlalchemy import select, func from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.core.ai_provider import get_ai_provider from app.core.config import settings from app.services.notification_service import notify from app.models.ai_session import AISession from app.models.ai_session_step import AISessionStep from app.models.flow_proposal import FlowProposal from app.models.tree import Tree logger = logging.getLogger(__name__) # Daily budget cap for proposal generation LLM calls per account MAX_PROPOSALS_PER_DAY = 50 FLOW_GENERATION_PROMPT = """\ You are a knowledge engineer converting a troubleshooting session into a reusable flow definition. Given the session transcript below, generate a JSON flow definition that captures the diagnostic logic so other engineers can follow the same path. ## OUTPUT FORMAT Respond with ONLY valid JSON: { "title": "Short descriptive title (5-10 words)", "description": "When to use this flow (1-2 sentences)", "match_keywords": ["keyword1", "keyword2", ...], "problem_domain": "active_directory | networking | m365 | hardware | endpoint | virtualization | security | backup | email | printing | cloud | other", "tree_structure": { "id": "root", "type": "decision", "question": "First diagnostic question", "help_text": "Context for the engineer", "options": [ {"id": "opt1", "label": "Option text", "next_node_id": "node_id"} ], "children": [ { "id": "node_id", "type": "decision | action | solution", "title": "Node title", "question": "For decision nodes", "description": "For action/solution nodes", "options": [], "next_node_id": "next_id or null for terminal nodes" } ] } } ## RULES - tree_structure uses a flat children array with id-based references via next_node_id - The root node has type "decision" with a question and options - Decision nodes have options with next_node_id pointing to child nodes - Action nodes describe what the engineer should do with a description field - Solution nodes describe the resolution (terminal — no next_node_id) - Every decision node must have 2-5 options - Include the key diagnostic questions that narrowed down the problem - Skip redundant or dead-end paths from the session - match_keywords should be symptoms, error messages, and technology names (5-10 keywords) - Do NOT wrap JSON in markdown code fences\ """ ENHANCEMENT_PROMPT = """\ You are a knowledge engineer analyzing how a troubleshooting session diverged from an existing flow. Given the session transcript and the existing flow structure, identify what should be added or changed. ## OUTPUT FORMAT Respond with ONLY valid JSON: { "title": "Enhancement: ", "description": "Why this enhancement is needed", "diff_description": "Human-readable summary of changes", "new_nodes": [ { "id": "new_node_id", "type": "decision | action | solution", "title": "Node title", "question": "For decision nodes", "description": "For action/solution nodes", "options": [], "attach_after_node_id": "existing node ID where this branches off", "new_option_label": "Label for the new option on the parent node" } ], "modified_options": [ { "node_id": "existing node ID", "add_option": {"id": "new_opt", "label": "New option text", "next_node_id": "new_node_id"} } ] } ## RULES - Only propose changes supported by the session evidence - Minimize changes — add branches, don't restructure - new_nodes should follow the same format as the existing flow - Do NOT wrap JSON in markdown code fences\ """ def _build_session_context(session: AISession) -> str: """Build a text summary of a session for the LLM prompt.""" parts = [ f"Problem: {session.problem_summary or 'Unknown'}", f"Domain: {session.problem_domain or 'Unknown'}", f"Confidence at resolution: {session.confidence_tier} ({session.confidence_score:.0%})", f"Resolution: {session.resolution_summary or 'No summary'}", ] if session.escalation_reason: parts.append(f"Escalation reason: {session.escalation_reason}") # Build step-by-step diagnostic trail steps = sorted(session.steps, key=lambda s: s.step_order) if steps: parts.append("\n--- DIAGNOSTIC TRAIL ---") for step in steps: content = step.content or {} step_desc = content.get("text", "") step_type = content.get("type", step.step_type) line = f"Step {step.step_order + 1} [{step_type}]: {step_desc}" # Engineer response if step.was_skipped: line += "\n → Skipped" elif step.selected_option: # Find label from options label = step.selected_option if step.options_presented: for opt in step.options_presented: if opt.get("value") == step.selected_option: label = opt.get("label", step.selected_option) break line += f"\n → Selected: {label}" elif step.free_text_input: line += f"\n → Free text: {step.free_text_input}" if step.action_result: result = step.action_result outcome = "Succeeded" if result.get("success") else "Did not resolve" if details := result.get("details"): outcome += f" — {details}" line += f"\n → Result: {outcome}" parts.append(line) return "\n".join(parts) def _has_free_text_escapes(session: AISession) -> bool: """Check if the session used free-text escapes (diverged from options).""" return any(step.was_free_text for step in session.steps) async def _check_daily_budget(account_id: UUID, db: AsyncSession) -> bool: """Check if the account has exceeded the daily proposal generation budget.""" today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0) result = await db.execute( select(func.count(FlowProposal.id)) .where( FlowProposal.account_id == account_id, FlowProposal.created_at >= today_start, FlowProposal.status != "auto_reinforced", # Don't count no-LLM proposals ) ) count = result.scalar() or 0 return count < MAX_PROPOSALS_PER_DAY async def _find_similar_pending_proposal( title: str, problem_domain: Optional[str], account_id: UUID, db: AsyncSession, ) -> Optional[FlowProposal]: """Find an existing pending proposal with similar title and domain. Uses simple keyword overlap for now. Phase 4 will add embedding similarity. """ # Build domain filter — match NULL domain proposals if domain is NULL domain_filter = ( FlowProposal.problem_domain == problem_domain if problem_domain else FlowProposal.problem_domain.is_(None) ) result = await db.execute( select(FlowProposal) .where( FlowProposal.account_id == account_id, FlowProposal.status == "pending", domain_filter, ) .limit(20) ) candidates = result.scalars().all() if not candidates: return None # Simple keyword overlap check title_words = set(title.lower().split()) for candidate in candidates: candidate_words = set(candidate.title.lower().split()) if len(title_words) > 0 and len(candidate_words) > 0: overlap = len(title_words & candidate_words) / max(len(title_words), len(candidate_words)) if overlap > 0.6: return candidate return None async def analyze_session(session: AISession, db: AsyncSession) -> None: """Analyze a resolved session and create appropriate flow proposal. Dispatches to one of three outcomes: 1. new_flow — novel resolution, no matching flow 2. enhancement — matched flow but diverged 3. auto_reinforced — followed existing flow closely """ # Re-fetch with eager-loaded steps to avoid async lazy-load errors result = await db.execute( select(AISession) .where(AISession.id == session.id) .options(selectinload(AISession.steps)) ) session = result.scalar_one() # Determine which analysis path to take has_match = session.matched_flow_id is not None match_score = session.match_score or 0.0 has_divergence = _has_free_text_escapes(session) if has_match and match_score > 0.8 and not has_divergence: # Path 3: Auto-reinforcement await _auto_reinforce(session, db) elif has_match and match_score > 0.5 and has_divergence: # Path 2: Enhancement proposal await _propose_enhancement(session, db) elif not has_match or match_score < 0.5: # Path 1: New flow proposal await _propose_new_flow(session, db) else: # Edge case: matched but moderate score, no divergence — reinforce await _auto_reinforce(session, db) async def _auto_reinforce(session: AISession, db: AsyncSession) -> None: """Update the matched flow's stats and create a tracking record.""" if session.matched_flow_id: result = await db.execute( select(Tree).where(Tree.id == session.matched_flow_id) ) flow = result.scalar_one_or_none() if flow: # Update flow stats current_rate = flow.success_rate or 0.0 # Simple moving average flow.success_rate = round(current_rate * 0.9 + 1.0 * 0.1, 4) flow.last_matched_at = datetime.now(timezone.utc) # Create tracking record (no review needed) proposal = FlowProposal( id=uuid.uuid4(), account_id=session.account_id, team_id=session.team_id, source_session_id=session.id, proposal_type="auto_reinforced", title=f"Reinforcement: {session.problem_summary or 'Session'}", description="Session followed existing flow closely. No changes needed.", proposed_flow_data={}, confidence_score=session.confidence_score, supporting_session_ids=[str(session.id)], problem_domain=session.problem_domain, status="auto_reinforced", target_flow_id=session.matched_flow_id, ) db.add(proposal) # auto_reinforced proposals don't need review — no notification logger.info("Auto-reinforced flow %s from session %s", session.matched_flow_id, session.id) async def _propose_new_flow(session: AISession, db: AsyncSession) -> None: """Generate a new flow proposal from a novel session.""" if not await _check_daily_budget(session.account_id, db): logger.warning("Daily proposal budget exceeded for account %s", session.account_id) return session_context = _build_session_context(session) try: provider = get_ai_provider(settings.get_model_for_action("open_chat")) raw_response, _, _ = await provider.generate_json( system_prompt=FLOW_GENERATION_PROMPT, messages=[{"role": "user", "content": session_context}], max_tokens=4096, ) parsed = _parse_llm_json(raw_response) except Exception as e: logger.warning("Knowledge Flywheel LLM call failed for session %s: %s", session.id, e) return title = parsed.get("title", session.problem_summary or "Untitled Flow") domain = parsed.get("problem_domain", session.problem_domain) # Check for similar pending proposals existing = await _find_similar_pending_proposal(title, domain, session.account_id, db) if existing: # Merge into existing proposal existing.supporting_session_count += 1 sids = existing.supporting_session_ids or [] sids.append(str(session.id)) existing.supporting_session_ids = sids existing.confidence_score = min(1.0, existing.confidence_score + 0.1) logger.info( "Merged session %s into existing proposal %s (now %d supporting)", session.id, existing.id, existing.supporting_session_count, ) return proposal = FlowProposal( id=uuid.uuid4(), account_id=session.account_id, team_id=session.team_id, source_session_id=session.id, proposal_type="new_flow", title=title, description=parsed.get("description"), proposed_flow_data={ "tree_structure": parsed.get("tree_structure", {}), "match_keywords": parsed.get("match_keywords", []), }, confidence_score=session.confidence_score, supporting_session_ids=[str(session.id)], problem_domain=domain, status="pending", ) db.add(proposal) await notify("proposal.pending", proposal.account_id, { "title": proposal.title, "proposal_type": proposal.proposal_type, "problem_domain": proposal.problem_domain or "General", "link": "/review-queue", }, db) logger.info("Created new_flow proposal for session %s: %s", session.id, title) async def _propose_enhancement(session: AISession, db: AsyncSession) -> None: """Generate an enhancement proposal for an existing flow.""" if not session.matched_flow_id: # Fallback to new flow if no match await _propose_new_flow(session, db) return if not await _check_daily_budget(session.account_id, db): logger.warning("Daily proposal budget exceeded for account %s", session.account_id) return # Load the matched flow result = await db.execute( select(Tree).where(Tree.id == session.matched_flow_id) ) matched_flow = result.scalar_one_or_none() if not matched_flow: await _propose_new_flow(session, db) return session_context = _build_session_context(session) flow_json = json.dumps(matched_flow.tree_structure, indent=None) if len(flow_json) > 4000: flow_json = flow_json[:4000] + "... [truncated]" prompt_content = ( f"## EXISTING FLOW\n" f"Name: {matched_flow.name}\n" f"Structure:\n{flow_json}\n\n" f"## SESSION THAT DIVERGED\n" f"{session_context}" ) try: provider = get_ai_provider(settings.get_model_for_action("open_chat")) raw_response, _, _ = await provider.generate_json( system_prompt=ENHANCEMENT_PROMPT, messages=[{"role": "user", "content": prompt_content}], max_tokens=4096, ) parsed = _parse_llm_json(raw_response) except Exception as e: logger.warning("Knowledge Flywheel enhancement LLM call failed for session %s: %s", session.id, e) return title = parsed.get("title", f"Enhancement: {session.problem_summary or 'Flow update'}") diff_description = parsed.get("diff_description", "Session diverged from existing flow") proposal = FlowProposal( id=uuid.uuid4(), account_id=session.account_id, team_id=session.team_id, source_session_id=session.id, proposal_type="enhancement", target_flow_id=session.matched_flow_id, title=title, description=diff_description, proposed_flow_data={ "new_nodes": parsed.get("new_nodes", []), "modified_options": parsed.get("modified_options", []), }, proposed_diff={ "diff_description": diff_description, "new_nodes": parsed.get("new_nodes", []), "modified_options": parsed.get("modified_options", []), }, confidence_score=session.confidence_score, supporting_session_ids=[str(session.id)], problem_domain=session.problem_domain, status="pending", ) db.add(proposal) await notify("proposal.pending", proposal.account_id, { "title": proposal.title, "proposal_type": proposal.proposal_type, "problem_domain": proposal.problem_domain or "General", "link": "/review-queue", }, db) logger.info( "Created enhancement proposal for flow %s from session %s: %s", session.matched_flow_id, session.id, title, ) def _parse_llm_json(raw_text: str) -> dict[str, Any]: """Parse JSON from LLM response, handling common quirks.""" text = raw_text.strip() # Strip markdown code fences if present if text.startswith("```"): lines = text.split("\n") lines = [line for line in lines if not line.strip().startswith("```")] text = "\n".join(lines).strip() try: return json.loads(text) except json.JSONDecodeError as e: logger.warning("Knowledge Flywheel JSON parse failed: %s — raw: %.300s", e, text) raise ValueError(f"Invalid JSON from LLM: {e}") from e