_build_flow_context() was reading node.get('content') which was only
present on old KB-imported steps. Now falls back through title →
question → description → content → label so all node types (decision,
action, solution, procedural step) show correctly in the copilot context.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
211 lines
7.4 KiB
Python
211 lines
7.4 KiB
Python
"""Copilot service — in-session AI assistant with RAG context.
|
|
|
|
Builds system prompts with current flow context and RAG results,
|
|
manages conversation state, and returns AI responses with flow suggestions.
|
|
"""
|
|
import logging
|
|
from datetime import datetime, timezone, timedelta
|
|
from typing import Optional, Any
|
|
from uuid import UUID
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from app.core.ai_provider import get_ai_provider
|
|
from app.models.tree import Tree
|
|
from app.models.copilot_conversation import CopilotConversation
|
|
from app.services.rag_service import search as rag_search, build_rag_context, extract_suggested_flows
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
COPILOT_SYSTEM_PROMPT = """You are a Senior Systems and Network Engineer with 15+ years of experience working in Managed Service Provider (MSP) environments. You specialize in:
|
|
- Windows Server, Active Directory, Group Policy, and Hybrid Identity (Entra ID)
|
|
- Networking (TCP/IP, DNS, DHCP, VPN, firewall troubleshooting, Cisco/Fortinet)
|
|
- Virtualization (VMware, Hyper-V) and cloud platforms (Azure, AWS, M365)
|
|
- Endpoint management, RMM tools, and PSA platforms (ConnectWise, Datto, Kaseya)
|
|
- PowerShell scripting and automation
|
|
|
|
You are acting as an in-session copilot while the user navigates a troubleshooting or procedural flow. You can see the flow context and their current position.
|
|
|
|
When answering:
|
|
- Be direct and actionable — MSP engineers need fast, practical answers
|
|
- Include specific commands, paths, and config values when relevant
|
|
- Mention potential risks or gotchas before suggesting changes
|
|
- If a relevant troubleshooting flow exists in the team's library, reference it
|
|
- Keep responses concise but thorough — prefer bullet points and code blocks
|
|
"""
|
|
|
|
|
|
def _build_flow_context(tree: Tree, current_node_id: Optional[str]) -> str:
|
|
"""Build flow context string for the system prompt."""
|
|
parts = [
|
|
f"\n--- CURRENT FLOW CONTEXT ---",
|
|
f"Flow: {tree.name}",
|
|
f"Type: {tree.tree_type}",
|
|
]
|
|
if tree.description:
|
|
parts.append(f"Description: {tree.description}")
|
|
|
|
if current_node_id and tree.tree_structure:
|
|
node = _find_node(tree.tree_structure, current_node_id)
|
|
if node:
|
|
parts.append(f"Current node type: {node.get('type', 'unknown')}")
|
|
node_label = (
|
|
node.get('title')
|
|
or node.get('question')
|
|
or node.get('description')
|
|
or node.get('content')
|
|
or node.get('label')
|
|
or 'Unknown'
|
|
)
|
|
parts.append(f"Current node: {node_label}")
|
|
# Add options if it's a question/decision node
|
|
children = node.get("children", [])
|
|
if children and isinstance(children, list):
|
|
option_labels = [
|
|
c.get("label", c.get("content", ""))
|
|
for c in children if isinstance(c, dict)
|
|
]
|
|
if option_labels:
|
|
parts.append(f"Available options: {', '.join(option_labels)}")
|
|
|
|
return "\n".join(parts)
|
|
|
|
|
|
def _find_node(structure: dict, node_id: str) -> Optional[dict]:
|
|
"""Recursively find a node by ID in tree structure."""
|
|
if structure.get("id") == node_id:
|
|
return structure
|
|
for child in structure.get("children", []):
|
|
if isinstance(child, dict):
|
|
found = _find_node(child, node_id)
|
|
if found:
|
|
return found
|
|
# Check steps array for procedural flows
|
|
for step in structure.get("steps", []):
|
|
if isinstance(step, dict):
|
|
found = _find_node(step, node_id)
|
|
if found:
|
|
return found
|
|
return None
|
|
|
|
|
|
async def start_conversation(
|
|
user_id: UUID,
|
|
account_id: UUID,
|
|
tree_id: UUID,
|
|
session_id: Optional[UUID],
|
|
current_node_id: Optional[str],
|
|
db: AsyncSession,
|
|
) -> tuple[CopilotConversation, str]:
|
|
"""Start a new copilot conversation.
|
|
|
|
Returns (conversation, greeting_message).
|
|
"""
|
|
# Load tree
|
|
result = await db.execute(
|
|
select(Tree).options(selectinload(Tree.tags)).where(Tree.id == tree_id)
|
|
)
|
|
tree = result.scalar_one_or_none()
|
|
if not tree:
|
|
raise ValueError(f"Tree {tree_id} not found")
|
|
|
|
conversation = CopilotConversation(
|
|
user_id=user_id,
|
|
account_id=account_id,
|
|
tree_id=tree_id,
|
|
session_id=session_id,
|
|
current_node_id=current_node_id,
|
|
messages=[],
|
|
expires_at=datetime.now(timezone.utc) + timedelta(hours=24),
|
|
)
|
|
db.add(conversation)
|
|
await db.flush()
|
|
|
|
greeting = f"I'm your copilot for this **{tree.tree_type}** flow: **{tree.name}**. Ask me anything about the current step, alternative approaches, or related troubleshooting tips."
|
|
|
|
conversation.messages = [{"role": "assistant", "content": greeting}]
|
|
conversation.message_count = 1
|
|
|
|
return conversation, greeting
|
|
|
|
|
|
async def send_message(
|
|
conversation_id: UUID,
|
|
user_id: UUID,
|
|
message: str,
|
|
current_node_id: Optional[str],
|
|
db: AsyncSession,
|
|
) -> tuple[str, list[dict[str, Any]], CopilotConversation]:
|
|
"""Send a user message and get AI response.
|
|
|
|
Returns (ai_content, suggested_flows, conversation).
|
|
"""
|
|
result = await db.execute(
|
|
select(CopilotConversation).where(
|
|
CopilotConversation.id == conversation_id,
|
|
CopilotConversation.user_id == user_id,
|
|
)
|
|
)
|
|
conversation = result.scalar_one_or_none()
|
|
if not conversation:
|
|
raise ValueError("Conversation not found")
|
|
|
|
if conversation.expires_at < datetime.now(timezone.utc):
|
|
raise ValueError("Conversation has expired")
|
|
|
|
# Load tree for context
|
|
tree_result = await db.execute(
|
|
select(Tree).options(selectinload(Tree.tags)).where(Tree.id == conversation.tree_id)
|
|
)
|
|
tree = tree_result.scalar_one_or_none()
|
|
if not tree:
|
|
raise ValueError("Associated flow not found")
|
|
|
|
# Update current node
|
|
if current_node_id:
|
|
conversation.current_node_id = current_node_id
|
|
|
|
# RAG search
|
|
rag_results = await rag_search(
|
|
query=message,
|
|
account_id=conversation.account_id,
|
|
db=db,
|
|
limit=8,
|
|
)
|
|
|
|
# Build system prompt
|
|
system_prompt = COPILOT_SYSTEM_PROMPT
|
|
system_prompt += _build_flow_context(tree, conversation.current_node_id)
|
|
system_prompt += build_rag_context(rag_results)
|
|
|
|
# Build messages for AI
|
|
ai_messages = []
|
|
for msg in conversation.messages:
|
|
if msg["role"] in ("user", "assistant"):
|
|
ai_messages.append({"role": msg["role"], "content": msg["content"]})
|
|
ai_messages.append({"role": "user", "content": message})
|
|
|
|
# Call AI
|
|
provider = get_ai_provider()
|
|
ai_content, input_tokens, output_tokens = await provider.generate_text(
|
|
system_prompt=system_prompt,
|
|
messages=ai_messages,
|
|
max_tokens=2048,
|
|
)
|
|
|
|
# Update conversation
|
|
msgs = list(conversation.messages)
|
|
msgs.append({"role": "user", "content": message})
|
|
msgs.append({"role": "assistant", "content": ai_content})
|
|
conversation.messages = msgs
|
|
conversation.message_count += 2
|
|
conversation.total_input_tokens += input_tokens
|
|
conversation.total_output_tokens += output_tokens
|
|
|
|
# Extract suggested flows
|
|
suggested_flows = extract_suggested_flows(rag_results, exclude_tree_id=tree.id)
|
|
|
|
return ai_content, suggested_flows, conversation
|