Files
resolutionflow/backend/app/services/assistant_chat_service.py
Michael Chihlas e4c5948fbd feat: add Microsoft Learn MCP integration + refine assistant system prompt
- Integrate Microsoft Learn MCP server via Anthropic's MCP connector
  for real-time documentation lookups (docs search, fetch, code samples)
- Refine system prompt: clear persona, structured answer guidelines,
  when to use RAG flows vs Microsoft Learn, guardrails against fabrication
- Add ENABLE_MCP_MICROSOFT_LEARN config toggle (default: True)
- Fix bugs from prior edit: wrong MCP URL, broken indentation, undefined
  usage/token variables, NOT_GIVEN for disabled MCP params
- Log MCP tool usage and cache performance

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-05 19:13:34 -05:00

308 lines
10 KiB
Python

"""Standalone AI assistant chat service with RAG context.
Provides persistent conversation history for general IT questions
with semantic search over the team's flow library.
Uses Anthropic prompt caching to reduce cost on multi-turn conversations:
- The static system prompt is cached (ephemeral, 5-min TTL)
- The conversation history prefix is cached via a breakpoint on the
last existing message before the new user input
Optionally connects to Microsoft Learn via Anthropic's MCP connector
for real-time documentation lookups (controlled by ENABLE_MCP_MICROSOFT_LEARN).
"""
import logging
from typing import Any
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.models.assistant_chat import AssistantChat
from app.services.rag_service import search as rag_search, build_rag_context, extract_suggested_flows
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Static system prompt handed to the model as `system_base` by send_message().
# This is a runtime string, not documentation: a trailing backslash inside the
# literal is a line continuation (no newline is emitted at that point), so
# re-wrapping or reflowing the text changes what the model receives.
# On the Anthropic path this text is sent as the cached system block (see
# _call_anthropic_cached), so any edit here also changes the cached prefix.
ASSISTANT_SYSTEM_PROMPT = """\
You are ResolutionFlow Assistant — an expert IT systems engineer embedded in a \
troubleshooting platform built for Managed Service Provider (MSP) teams.
## Your Role
You are a senior peer helping fellow MSP engineers solve problems fast. You have \
deep expertise across the MSP technology stack:
- Windows Server, Active Directory, Group Policy, Hybrid Identity (Entra ID / Azure AD)
- Networking: TCP/IP, DNS, DHCP, VPN, firewalls (Cisco, Fortinet, Meraki, SonicWall)
- Virtualization: VMware vSphere, Hyper-V, Proxmox
- Cloud platforms: Microsoft 365, Azure, AWS
- Endpoint management, RMM tools, and PSA platforms (ConnectWise, Datto, Kaseya, NinjaRMM)
- PowerShell scripting and automation
- Security: MFA, Conditional Access, EDR, backup/DR
## How to Answer
- **Be direct and actionable.** Engineers are mid-ticket — give them the answer, \
not a lecture. Lead with the fix, then explain why.
- **Include specifics.** Exact commands, registry paths, config values, port numbers. \
Vague advice wastes time.
- **Warn before you wreck.** If a step could cause downtime, data loss, or a lockout, \
say so upfront — before the command.
- **Use structured formatting.** Bullet points for steps, code blocks for commands, \
bold for key terms. Engineers scan, they don't read essays.
- **Say when you're unsure.** If you don't know the exact answer, say so. Suggest \
where to verify (vendor docs, a specific KB article) rather than guessing.
## Using the Team's Flow Library
Your team has built troubleshooting flows in ResolutionFlow. When relevant flows \
appear in the context below, reference them by name so the engineer can launch them \
directly. Prefer the team's proven flows over ad-hoc instructions when they exist.
## Using Microsoft Learn Documentation
You have access to Microsoft's official documentation via Microsoft Learn. Use it when:
- The question involves exact cmdlet syntax, API parameters, or configuration steps
- You need to verify current Microsoft/Azure behavior or requirements
- No team flow covers the topic and vendor-specific detail would help
Do NOT use Microsoft Learn for every question — only when official docs add real value.
## Boundaries
- Stay focused on IT infrastructure, systems administration, and MSP operations.
- If a question is clearly outside your domain, say so briefly and redirect.
- Never fabricate error codes, KB article numbers, or CLI flags. If unsure, say so.
"""
async def _call_ai(
    system_base: str,
    rag_context: str,
    history: list[dict[str, Any]],
    new_message: str,
    max_tokens: int = 4096,
) -> tuple[str, int, int]:
    """Route one chat turn to the configured AI backend.

    When the configured provider is Anthropic and an API key is present, the
    turn goes through ``_call_anthropic_cached`` so prompt caching applies:

    - the base system prompt is cached (identical on every turn)
    - the RAG context is left uncached (it differs per query)
    - the existing conversation is cached up to its last message, leaving
      only the new user message uncached

    Any other configuration uses the generic provider, which receives the
    base prompt and RAG context concatenated into one system prompt and the
    history with the new user message appended.

    Args:
        system_base: Static system prompt text.
        rag_context: Per-query context text appended after the base prompt.
        history: Prior turns as ``{"role": ..., "content": ...}`` dicts.
        new_message: The user's new message for this turn.
        max_tokens: Response token limit forwarded to the backend.

    Returns:
        Whatever the selected backend returns; on the Anthropic path this is
        ``(text, input_tokens, output_tokens)``.
    """
    anthropic_ready = (
        settings.AI_PROVIDER == "anthropic" and settings.ANTHROPIC_API_KEY
    )
    if not anthropic_ready:
        # Generic provider path (Gemini, etc.). The import stays local so it
        # is only resolved when this branch actually runs.
        from app.core.ai_provider import get_ai_provider

        combined_prompt = system_base + rag_context
        turns = [*history, {"role": "user", "content": new_message}]
        backend = get_ai_provider()
        return await backend.generate_text(
            system_prompt=combined_prompt,
            messages=turns,
            max_tokens=max_tokens,
        )

    return await _call_anthropic_cached(
        system_base, rag_context, history, new_message, max_tokens
    )
async def _call_anthropic_cached(
    system_base: str,
    rag_context: str,
    history: list[dict[str, Any]],
    new_message: str,
    max_tokens: int,
) -> tuple[str, int, int]:
    """Call Anthropic with prompt caching on the system prompt and history.

    The system prompt is sent as structured blocks so the static base prompt
    is cached independently of the per-query RAG context. A second cache
    breakpoint is placed on the last history message so the conversation
    prefix is reused across turns. When ``settings.ENABLE_MCP_MICROSOFT_LEARN``
    is truthy, the Microsoft Learn MCP server is attached for documentation
    lookups; otherwise every MCP-related parameter (including the beta flag)
    is omitted from the request.

    Args:
        system_base: Static system prompt text (cached block).
        rag_context: Per-query context; sent as a second, uncached system
            block when non-empty.
        history: Prior turns as ``{"role": ..., "content": ...}`` dicts.
            Turns whose content is an empty or whitespace-only string are
            skipped, because the API rejects empty text content.
        new_message: The user's new message (never cached).
        max_tokens: Response token limit.

    Returns:
        ``(text, input_tokens, output_tokens)`` where ``text`` is all text
        blocks of the response joined with newlines (``""`` if there are
        none) and the token counts are taken from ``response.usage``.

        NOTE(review): per Anthropic's prompt-caching docs,
        ``usage.input_tokens`` does not include cache-read / cache-creation
        tokens, so the returned input count is the uncached portion only.
        Confirm that is what callers accumulating totals expect.
    """
    import anthropic

    # System prompt as structured blocks:
    #   Block 1: static base prompt (cached)
    #   Block 2: RAG context (changes per query, not cached)
    system_blocks: list[dict[str, Any]] = [
        {
            "type": "text",
            "text": system_base,
            "cache_control": {"type": "ephemeral"},
        },
    ]
    if rag_context:
        system_blocks.append({"type": "text", "text": rag_context})

    # Rebuild the history, dropping turns with no usable text. Such turns can
    # exist in stored history (e.g. a previous response that produced no text
    # blocks was saved as ""), and forwarding them makes the request fail.
    messages: list[dict[str, Any]] = []
    for msg in history:
        content = msg["content"]
        if isinstance(content, str) and not content.strip():
            continue
        messages.append({"role": msg["role"], "content": content})

    # Place a cache breakpoint on the last history message so the entire
    # conversation prefix is cached across turns. Only plain-string content
    # is wrapped; anything already structured is passed through untouched
    # rather than being nested inside a text block.
    if messages and isinstance(messages[-1]["content"], str):
        last = messages[-1]
        messages[-1] = {
            "role": last["role"],
            "content": [
                {
                    "type": "text",
                    "text": last["content"],
                    "cache_control": {"type": "ephemeral"},
                }
            ],
        }

    # Add the new user message (uncached — it's new each turn)
    messages.append({"role": "user", "content": new_message})

    # MCP server config (optional — controlled by settings). All three
    # MCP-related parameters are omitted together when the toggle is off, so
    # disabling the feature removes the beta flag from the request as well.
    mcp_servers: Any = anthropic.NOT_GIVEN
    tools: Any = anthropic.NOT_GIVEN
    betas: Any = anthropic.NOT_GIVEN
    if settings.ENABLE_MCP_MICROSOFT_LEARN:
        mcp_servers = [
            {
                "type": "url",
                "url": "https://learn.microsoft.com/api/mcp",
                "name": "microsoft-learn",
            }
        ]
        tools = [
            {
                "type": "mcp_toolset",
                "mcp_server_name": "microsoft-learn",
            }
        ]
        betas = ["mcp-client-2025-11-20"]

    # A client is created per call; the context manager closes its HTTP
    # resources deterministically instead of leaving that to garbage
    # collection.
    async with anthropic.AsyncAnthropic(
        api_key=settings.ANTHROPIC_API_KEY,
        timeout=settings.AI_REQUEST_TIMEOUT_SECONDS,
    ) as client:
        response = await client.beta.messages.create(
            model=settings.AI_MODEL_ANTHROPIC,
            max_tokens=max_tokens,
            system=system_blocks,
            messages=messages,
            mcp_servers=mcp_servers,
            tools=tools,
            betas=betas,
        )

    # MCP responses can mix block types (text, mcp_tool_use,
    # mcp_tool_result). Collect the text blocks and note which MCP tools ran.
    text_parts: list[str] = []
    mcp_tools_used: list[str] = []
    for block in response.content:
        block_type = getattr(block, "type", None)
        if block_type == "text":
            text_parts.append(block.text)
        elif block_type == "mcp_tool_use":
            mcp_tools_used.append(getattr(block, "name", "unknown"))
    text = "\n".join(text_parts)

    usage = response.usage
    input_tokens = usage.input_tokens
    output_tokens = usage.output_tokens

    # Log MCP tool usage
    if mcp_tools_used:
        logger.info("MCP tools used: %s", ", ".join(mcp_tools_used))

    # Surface responses with no usable text so they are visible in logs
    # instead of silently becoming an empty assistant message.
    if not text.strip():
        logger.warning(
            "Anthropic response contained no text (stop_reason=%s)",
            getattr(response, "stop_reason", None),
        )

    # Log cache performance
    cache_read = getattr(usage, "cache_read_input_tokens", 0) or 0
    cache_creation = getattr(usage, "cache_creation_input_tokens", 0) or 0
    if cache_read or cache_creation:
        logger.info(
            "Anthropic cache: read=%d creation=%d input=%d output=%d",
            cache_read, cache_creation, input_tokens, output_tokens,
        )

    return text, input_tokens, output_tokens
def _auto_title(message: str) -> str:
    """Generate a short chat title from the first user message.

    Surrounding whitespace is stripped first. If the stripped text is at most
    100 characters it is returned unchanged; otherwise the first 100
    characters are cut back to the last space (when there is one) and
    ``"..."`` is appended.

    The truncation decision is made on the *stripped* text, so a short
    message padded with whitespace is not mistakenly shortened and given an
    ellipsis.

    Args:
        message: The raw user message.

    Returns:
        The title text; at most 103 characters including the ellipsis.
    """
    cleaned = message.strip()
    if len(cleaned) <= 100:
        return cleaned
    head = cleaned[:100]
    # Drop the (possibly partial) last word; rstrip avoids "word ..." when
    # the cut lands on consecutive spaces.
    return head.rsplit(" ", 1)[0].rstrip() + "..."
async def create_chat(
    user_id: UUID,
    account_id: UUID,
    db: AsyncSession,
) -> AssistantChat:
    """Create a new chat with no messages for the given user and account.

    The new row is added to the session and flushed; this function does not
    commit.

    Args:
        user_id: Owner of the chat.
        account_id: Account the chat belongs to.
        db: Async database session to add the row to.

    Returns:
        The newly created ``AssistantChat`` instance.
    """
    new_chat = AssistantChat(
        messages=[],
        account_id=account_id,
        user_id=user_id,
    )
    db.add(new_chat)
    await db.flush()
    return new_chat
async def send_message(
    chat_id: UUID,
    user_id: UUID,
    account_id: UUID,
    message: str,
    db: AsyncSession,
) -> tuple[str, list[dict[str, Any]], AssistantChat]:
    """Append a user message to a chat, get the AI reply, and record both.

    Steps, in order: load the chat (matched on chat id and user id), title it
    from this message if it has no messages yet, run a RAG search for the
    message, call the AI with the stored user/assistant turns as history,
    then append the new exchange and add the token counts to the chat's
    running totals.

    Args:
        chat_id: Chat to post into.
        user_id: Must match the chat's ``user_id`` for the lookup to succeed.
        account_id: Account scope passed to the RAG search.
        message: The user's message text.
        db: Async database session.

    Returns:
        ``(ai_content, suggested_flows, chat)``.

    Raises:
        ValueError: If no chat matches ``chat_id`` and ``user_id``.
    """
    lookup = select(AssistantChat).where(
        AssistantChat.id == chat_id,
        AssistantChat.user_id == user_id,
    )
    chat = (await db.execute(lookup)).scalar_one_or_none()
    if not chat:
        raise ValueError("Chat not found")

    # The first message of a chat doubles as its title.
    if chat.message_count == 0:
        chat.title = _auto_title(message)

    # Retrieve related flows for this query and render them as prompt context.
    rag_results = await rag_search(
        query=message,
        account_id=account_id,
        db=db,
        limit=8,
    )
    rag_context = build_rag_context(rag_results)

    # Only user/assistant turns are forwarded as history; other roles are
    # dropped, and each turn is reduced to its role and content.
    prior_turns: list[dict[str, Any]] = [
        {"role": turn["role"], "content": turn["content"]}
        for turn in chat.messages
        if turn["role"] in ("user", "assistant")
    ]

    # Prompt-cached Anthropic call, or the generic provider as fallback.
    ai_content, input_tokens, output_tokens = await _call_ai(
        system_base=ASSISTANT_SYSTEM_PROMPT,
        rag_context=rag_context,
        history=prior_turns,
        new_message=message,
    )

    # Assign a new list rather than mutating the stored one in place.
    chat.messages = [
        *chat.messages,
        {"role": "user", "content": message},
        {"role": "assistant", "content": ai_content},
    ]
    chat.message_count += 2
    chat.total_input_tokens += input_tokens
    chat.total_output_tokens += output_tokens

    return ai_content, extract_suggested_flows(rag_results), chat