Files
resolutionflow/backend/app/services/copilot_service.py
Michael Chihlas 9495b09e72 fix: copilot node context — use title/question/description over legacy content field
_build_flow_context() was reading node.get('content') which was only
present on old KB-imported steps. Now falls back through title →
question → description → content → label so all node types (decision,
action, solution, procedural step) show correctly in the copilot context.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-11 17:55:26 -04:00

211 lines
7.4 KiB
Python

"""Copilot service — in-session AI assistant with RAG context.
Builds system prompts with current flow context and RAG results,
manages conversation state, and returns AI responses with flow suggestions.
"""
import logging
from datetime import datetime, timezone, timedelta
from typing import Optional, Any
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.core.ai_provider import get_ai_provider
from app.models.tree import Tree
from app.models.copilot_conversation import CopilotConversation
from app.services.rag_service import search as rag_search, build_rag_context, extract_suggested_flows
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Base system prompt for the copilot persona. send_message() starts from this
# text and appends the current flow context and the RAG context on every call,
# so anything written here is sent to the AI provider with each request.
COPILOT_SYSTEM_PROMPT = """You are a Senior Systems and Network Engineer with 15+ years of experience working in Managed Service Provider (MSP) environments. You specialize in:
- Windows Server, Active Directory, Group Policy, and Hybrid Identity (Entra ID)
- Networking (TCP/IP, DNS, DHCP, VPN, firewall troubleshooting, Cisco/Fortinet)
- Virtualization (VMware, Hyper-V) and cloud platforms (Azure, AWS, M365)
- Endpoint management, RMM tools, and PSA platforms (ConnectWise, Datto, Kaseya)
- PowerShell scripting and automation
You are acting as an in-session copilot while the user navigates a troubleshooting or procedural flow. You can see the flow context and their current position.
When answering:
- Be direct and actionable — MSP engineers need fast, practical answers
- Include specific commands, paths, and config values when relevant
- Mention potential risks or gotchas before suggesting changes
- If a relevant troubleshooting flow exists in the team's library, reference it
- Keep responses concise but thorough — prefer bullet points and code blocks
"""
def _build_flow_context(tree: Tree, current_node_id: Optional[str]) -> str:
    """Build the flow-context section that is appended to the system prompt.

    The section always names the flow and its type, adds the description when
    one is set, and, when ``current_node_id`` resolves to a node inside
    ``tree.tree_structure``, describes that node and the options below it.

    Args:
        tree: Flow being navigated. ``name``, ``tree_type``, ``description``
            and ``tree_structure`` are read.
        current_node_id: ID of the node the user is currently on, or ``None``.

    Returns:
        The context lines joined with newlines. The first line starts with a
        newline so the result can be concatenated directly onto a prompt.
    """
    parts = [
        "\n--- CURRENT FLOW CONTEXT ---",
        f"Flow: {tree.name}",
        f"Type: {tree.tree_type}",
    ]
    if tree.description:
        parts.append(f"Description: {tree.description}")

    node = None
    if current_node_id and tree.tree_structure:
        node = _find_node(tree.tree_structure, current_node_id)
    if not node:
        return "\n".join(parts)

    parts.append(f"Current node type: {node.get('type', 'unknown')}")
    # Different node kinds store their text under different keys; take the
    # first non-empty one.
    node_label = (
        node.get('title')
        or node.get('question')
        or node.get('description')
        or node.get('content')
        or node.get('label')
        or 'Unknown'
    )
    parts.append(f"Current node: {node_label}")

    # Add options if it's a question/decision node.
    children = node.get("children")
    if isinstance(children, list):
        # "label" and "content" stay first so existing output is unchanged;
        # the remaining keys cover children that only carry node-style text.
        option_keys = ("label", "content", "title", "question", "description")
        option_labels = []
        for child in children:
            if not isinstance(child, dict):
                continue
            label = next(
                (child[key] for key in option_keys if child.get(key)), None
            )
            # Skip children without any usable text instead of emitting empty
            # entries, and coerce to str so join() cannot raise TypeError.
            if label is not None:
                option_labels.append(str(label))
        if option_labels:
            parts.append(f"Available options: {', '.join(option_labels)}")
    return "\n".join(parts)
def _find_node(structure: dict, node_id: str) -> Optional[dict]:
    """Recursively find a node by ID in a tree structure.

    The node itself is checked first, then every entry under ``"children"``
    (depth-first), then every entry under ``"steps"`` (used by procedural
    flows).

    Args:
        structure: Node to search from. Anything that is not a dict is
            treated as "no match" rather than raising.
        node_id: Value to compare against each node's ``"id"`` key.

    Returns:
        The first matching node dict, or ``None`` when nothing matches.
    """
    if not isinstance(structure, dict):
        return None
    if structure.get("id") == node_id:
        return structure
    for key in ("children", "steps"):
        branch = structure.get(key)
        # The key may be missing, null, or hold a non-sequence value in stored
        # JSON; none of those can contain nodes, so skip them.
        if not isinstance(branch, (list, tuple)):
            continue
        for entry in branch:
            found = _find_node(entry, node_id)
            if found is not None:
                return found
    return None
async def start_conversation(
    user_id: UUID,
    account_id: UUID,
    tree_id: UUID,
    session_id: Optional[UUID],
    current_node_id: Optional[str],
    db: AsyncSession,
) -> tuple[CopilotConversation, str]:
    """Create a copilot conversation for a flow and seed it with a greeting.

    The conversation is added to the session and flushed, then its message
    list is set to the single assistant greeting. This function does not call
    commit.

    Args:
        user_id: Owner of the conversation.
        account_id: Account the conversation is recorded under.
        tree_id: Flow the conversation is attached to.
        session_id: Optional session to associate with the conversation.
        current_node_id: Node the user is on when the conversation starts.
        db: Async database session used for the lookup and the insert.

    Returns:
        ``(conversation, greeting_message)``.

    Raises:
        ValueError: If no tree with ``tree_id`` exists.
    """
    tree_query = (
        select(Tree).options(selectinload(Tree.tags)).where(Tree.id == tree_id)
    )
    tree = (await db.execute(tree_query)).scalar_one_or_none()
    if not tree:
        raise ValueError(f"Tree {tree_id} not found")

    # Conversations are created with a 24-hour expiry.
    conversation = CopilotConversation(
        user_id=user_id,
        account_id=account_id,
        tree_id=tree_id,
        session_id=session_id,
        current_node_id=current_node_id,
        messages=[],
        expires_at=datetime.now(timezone.utc) + timedelta(hours=24),
    )
    db.add(conversation)
    await db.flush()

    greeting = f"I'm your copilot for this **{tree.tree_type}** flow: **{tree.name}**. Ask me anything about the current step, alternative approaches, or related troubleshooting tips."
    opening_turn = {"role": "assistant", "content": greeting}
    conversation.messages = [opening_turn]
    conversation.message_count = 1
    return conversation, greeting
async def send_message(
    conversation_id: UUID,
    user_id: UUID,
    message: str,
    current_node_id: Optional[str],
    db: AsyncSession,
) -> tuple[str, list[dict[str, Any]], CopilotConversation]:
    """Relay one user message to the AI provider and record the exchange.

    The system prompt is rebuilt on every call from the base copilot prompt,
    the current flow context and the RAG results for ``message``. The user
    message and the AI reply are then appended to the conversation and its
    message/token counters are updated. This function does not call commit.

    Args:
        conversation_id: Conversation to continue.
        user_id: Must match the conversation's ``user_id``.
        message: The user's message text, also used as the RAG query.
        current_node_id: When truthy, replaces the conversation's stored
            current node before the flow context is built.
        db: Async database session.

    Returns:
        ``(ai_content, suggested_flows, conversation)``.

    Raises:
        ValueError: If no conversation with this ID belongs to ``user_id``,
            the conversation has expired, or its flow cannot be found.
    """
    conversation_query = select(CopilotConversation).where(
        CopilotConversation.id == conversation_id,
        CopilotConversation.user_id == user_id,
    )
    conversation = (await db.execute(conversation_query)).scalar_one_or_none()
    if not conversation:
        raise ValueError("Conversation not found")
    if conversation.expires_at < datetime.now(timezone.utc):
        raise ValueError("Conversation has expired")

    # The flow is needed to describe the user's position in the prompt.
    tree_query = (
        select(Tree)
        .options(selectinload(Tree.tags))
        .where(Tree.id == conversation.tree_id)
    )
    tree = (await db.execute(tree_query)).scalar_one_or_none()
    if not tree:
        raise ValueError("Associated flow not found")

    # Keep the stored node when the caller did not supply a new one.
    if current_node_id:
        conversation.current_node_id = current_node_id

    rag_results = await rag_search(
        query=message,
        account_id=conversation.account_id,
        db=db,
        limit=8,
    )

    system_prompt = "".join(
        [
            COPILOT_SYSTEM_PROMPT,
            _build_flow_context(tree, conversation.current_node_id),
            build_rag_context(rag_results),
        ]
    )

    # Only user/assistant turns are forwarded; any other role is dropped.
    history = [
        {"role": entry["role"], "content": entry["content"]}
        for entry in conversation.messages
        if entry["role"] in ("user", "assistant")
    ]
    ai_messages = [*history, {"role": "user", "content": message}]

    provider = get_ai_provider()
    ai_content, input_tokens, output_tokens = await provider.generate_text(
        system_prompt=system_prompt,
        messages=ai_messages,
        max_tokens=2048,
    )

    # Assign a new list rather than appending to the existing one in place.
    conversation.messages = [
        *conversation.messages,
        {"role": "user", "content": message},
        {"role": "assistant", "content": ai_content},
    ]
    conversation.message_count += 2
    conversation.total_input_tokens += input_tokens
    conversation.total_output_tokens += output_tokens

    # The flow being navigated is excluded from its own suggestions.
    suggested_flows = extract_suggested_flows(rag_results, exclude_tree_id=tree.id)
    return ai_content, suggested_flows, conversation