Files
resolutionflow/backend/app/services/assistant_chat_service.py
chihlasm df46046c86 fix: resolve MissingGreenlet crash and add MCP fallback in AI Assistant
Capture user_id/account_id before try block so error handler survives
db.rollback() without triggering lazy loads in async context. Add
retry-without-MCP fallback when Anthropic MCP server returns rate limit
or connection errors.

Fixes PYTHON-FASTAPI-3, PYTHON-FASTAPI-4

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 15:43:22 -04:00

444 lines
15 KiB
Python

"""Standalone AI assistant chat service with RAG context.
Provides persistent conversation history for general IT questions
with semantic search over the team's flow library.
Uses Anthropic prompt caching to reduce cost on multi-turn conversations:
- The static system prompt is cached (ephemeral, 5-min TTL)
- The conversation history prefix is cached via a breakpoint on the
last existing message before the new user input
Optionally connects to Microsoft Learn via Anthropic's MCP connector
for real-time documentation lookups (controlled by ENABLE_MCP_MICROSOFT_LEARN).
"""
import logging
from typing import Any
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.models.assistant_chat import AssistantChat
from app.services.rag_service import search as rag_search, build_rag_context, extract_suggested_flows
logger = logging.getLogger(__name__)
# System prompt for the interactive assistant. `send_message` passes it to
# `_call_ai` as `system_base`; on the Anthropic path it becomes the first
# system block, which is marked with `cache_control` (ephemeral).
# A trailing backslash inside the literal is a line continuation: the newline
# is dropped, so each wrapped paragraph/bullet reaches the model as one line.
ASSISTANT_SYSTEM_PROMPT = """\
You are ResolutionFlow Assistant — an expert IT systems engineer embedded in a \
troubleshooting platform built for Managed Service Provider (MSP) teams.
## Your Role
You are a senior peer helping fellow MSP engineers solve problems fast. You have \
deep expertise across the MSP technology stack:
- Windows Server, Active Directory, Group Policy, Hybrid Identity (Entra ID / Azure AD)
- Networking: TCP/IP, DNS, DHCP, VPN, firewalls (Cisco, Fortinet, Meraki, SonicWall)
- Virtualization: VMware vSphere, Hyper-V, Proxmox
- Cloud platforms: Microsoft 365, Azure, AWS
- Endpoint management, RMM tools, and PSA platforms (ConnectWise, Datto, Kaseya, NinjaRMM)
- PowerShell scripting and automation
- Security: MFA, Conditional Access, EDR, backup/DR
## How to Answer
- **Be direct and actionable.** Engineers are mid-ticket — lead with the fix or next \
diagnostic step, then explain why in one sentence if helpful. Skip background unless asked.
- **Include specifics.** Exact commands, registry paths, config values, port numbers. \
Vague advice wastes time.
- **Warn before you wreck.** If a step could cause downtime, data loss, or a lockout, \
say so upfront — before the command.
- **Use structured formatting.** Bullet points for steps, code blocks for commands, \
bold for key terms. Engineers scan, they don't read essays.
- **Say when you're unsure.** If you don't know the exact answer, say so. Suggest \
where to verify (vendor docs, a specific KB article) rather than guessing.
## How to Ask Questions
- **Default to a single focused question.** Ask what you need to know right now to make progress.
- **Use contextual bullets sparingly.** If the question could be ambiguous (e.g., "what error?" \
when there are multiple common patterns), add 2-3 sub-bullets to help the engineer recognize \
what you're asking for — but keep it short.
- **Multiple questions only when blocking.** If you genuinely cannot proceed without knowing \
two things (e.g., both the error message AND which users are affected), preface it clearly: \
"Before continuing troubleshooting, I need to know: 1) [question], 2) [question]." Use this rarely.
- **Avoid interrogation mode.** Don't fire off 5 questions in a row. Get one answer, make \
progress, then ask the next question if needed.
## Using the Team's Flow Library
Your team has built troubleshooting flows in ResolutionFlow. When relevant flows \
appear in the context below, reference them by name so the engineer can launch them \
directly. Prefer the team's proven flows over ad-hoc instructions when they exist.
## Using Microsoft Learn Documentation
You have access to Microsoft's official documentation via Microsoft Learn. Use it when:
- The question involves exact cmdlet syntax, API parameters, or configuration steps
- You need to verify current Microsoft/Azure behavior or requirements
- No team flow covers the topic and vendor-specific detail would help
Do NOT use Microsoft Learn for every question — only when official docs add real value.
## Boundaries
- Stay focused on IT infrastructure, systems administration, and MSP operations.
- If a question is clearly outside your domain, say so briefly and redirect.
- Never fabricate error codes, KB article numbers, or CLI flags. If unsure, say so.
"""
async def _call_ai(
    system_base: str,
    rag_context: str,
    history: list[dict[str, Any]],
    new_message: str,
    max_tokens: int = 4096,
) -> tuple[str, int, int]:
    """Route one chat turn to the configured AI backend.

    When the provider setting is ``"anthropic"`` and an API key is present,
    the turn goes through ``_call_anthropic_cached`` so that the base system
    prompt and the existing conversation prefix carry cache breakpoints
    (the RAG context and the new user message do not).

    Otherwise the generic provider from ``app.core.ai_provider`` is used with
    a single flattened system prompt (``system_base + rag_context``) and the
    history followed by the new user message.

    Args:
        system_base: Static part of the system prompt.
        rag_context: Per-query context appended after the base prompt.
        history: Prior turns as ``{"role": ..., "content": ...}`` dicts.
        new_message: The new user message for this turn.
        max_tokens: Completion token limit forwarded to the provider.

    Returns:
        Whatever the selected backend returns; the Anthropic path yields
        ``(text, input_tokens, output_tokens)``.
    """
    anthropic_ready = (
        settings.AI_PROVIDER == "anthropic" and settings.ANTHROPIC_API_KEY
    )
    if not anthropic_ready:
        # Generic provider path (Gemini, etc.) — no structured caching here.
        # Imported lazily, as in the rest of this module's provider handling.
        from app.core.ai_provider import get_ai_provider

        flat_system_prompt = system_base + rag_context
        conversation = history + [{"role": "user", "content": new_message}]
        return await get_ai_provider().generate_text(
            system_prompt=flat_system_prompt,
            messages=conversation,
            max_tokens=max_tokens,
        )

    return await _call_anthropic_cached(
        system_base, rag_context, history, new_message, max_tokens
    )
async def _call_anthropic_cached(
    system_base: str,
    rag_context: str,
    history: list[dict[str, Any]],
    new_message: str,
    max_tokens: int,
) -> tuple[str, int, int]:
    """Send one chat turn to Anthropic with cache breakpoints and optional MCP.

    Request layout:
      * system block 1 — ``system_base`` with an ephemeral ``cache_control``
      * system block 2 — ``rag_context`` (only when non-empty, no cache mark)
      * messages — ``history`` copied role/content-wise, with the last history
        entry rewritten as a text block carrying an ephemeral ``cache_control``,
        followed by the new user message as a plain string.

    When ``settings.ENABLE_MCP_MICROSOFT_LEARN`` is truthy, the Microsoft Learn
    MCP server and its toolset are attached. If the call then fails with a
    ``BadRequestError`` whose text contains ``"MCP server"``, the request is
    retried once without MCP; any other ``BadRequestError`` propagates.

    Returns:
        ``(text, input_tokens, output_tokens)`` where ``text`` is every
        response block exposing a ``text`` attribute, joined by newlines.
    """
    import anthropic

    client = anthropic.AsyncAnthropic(
        api_key=settings.ANTHROPIC_API_KEY,
        timeout=settings.AI_REQUEST_TIMEOUT_SECONDS,
    )

    # --- system prompt: cached static base, then uncached per-query context
    cached_base_block: dict[str, Any] = {
        "type": "text",
        "text": system_base,
        "cache_control": {"type": "ephemeral"},
    }
    system_blocks: list[dict[str, Any]] = [cached_base_block]
    if rag_context:
        system_blocks.append({"type": "text", "text": rag_context})

    # --- conversation: copy only role/content from each prior turn
    messages: list[dict[str, Any]] = [
        {"role": turn["role"], "content": turn["content"]} for turn in history
    ]
    if messages:
        # Mark the final prior turn as the cache breakpoint so the whole
        # conversation prefix can be reused on the next turn.
        tail = messages[-1]
        messages[-1] = {
            "role": tail["role"],
            "content": [
                {
                    "type": "text",
                    "text": tail["content"],
                    "cache_control": {"type": "ephemeral"},
                }
            ],
        }
    # The new user message is different every turn, so it is left unmarked.
    messages.append({"role": "user", "content": new_message})

    # --- optional MCP connector (Microsoft Learn)
    if settings.ENABLE_MCP_MICROSOFT_LEARN:
        mcp_servers = [
            {
                "type": "url",
                "url": "https://learn.microsoft.com/api/mcp",
                "name": "microsoft-learn",
            }
        ]
        tools = [
            {
                "type": "mcp_toolset",
                "mcp_server_name": "microsoft-learn",
            }
        ]
    else:
        mcp_servers = anthropic.NOT_GIVEN
        tools = anthropic.NOT_GIVEN

    # Arguments shared by the primary call and the no-MCP retry.
    base_request: dict[str, Any] = {
        "model": settings.AI_MODEL_ANTHROPIC,
        "max_tokens": max_tokens,
        "system": system_blocks,
        "messages": messages,
    }

    try:
        response = await client.beta.messages.create(
            **base_request,
            mcp_servers=mcp_servers,
            tools=tools,
            betas=["mcp-client-2025-11-20"],
        )
    except anthropic.BadRequestError as e:
        # Only MCP-related failures on an MCP-enabled request are retried;
        # everything else is a genuine bad request and must surface.
        if "MCP server" not in str(e) or mcp_servers is anthropic.NOT_GIVEN:
            raise
        logger.warning("MCP server error, retrying without MCP: %s", e)
        response = await client.beta.messages.create(**base_request)

    # --- response: MCP replies may mix text, mcp_tool_use and
    # mcp_tool_result blocks; keep the text and note which tools ran.
    blocks = response.content
    text_parts = [blk.text for blk in blocks if hasattr(blk, "text")]
    mcp_tools_used = [
        getattr(blk, "name", "unknown")
        for blk in blocks
        if getattr(blk, "type", None) == "mcp_tool_use"
    ]
    text = "\n".join(text_parts)

    usage = response.usage
    input_tokens, output_tokens = usage.input_tokens, usage.output_tokens

    if mcp_tools_used:
        logger.info("MCP tools used: %s", ", ".join(mcp_tools_used))

    # Cache counters may be absent or None on the usage object.
    cache_read = getattr(usage, "cache_read_input_tokens", 0) or 0
    cache_creation = getattr(usage, "cache_creation_input_tokens", 0) or 0
    if cache_read or cache_creation:
        logger.info(
            "Anthropic cache: read=%d creation=%d input=%d output=%d",
            cache_read, cache_creation, input_tokens, output_tokens,
        )

    return text, input_tokens, output_tokens
def _auto_title(message: str) -> str:
"""Generate a short title from the first user message."""
title = message.strip()[:100]
if len(message) > 100:
title = title.rsplit(" ", 1)[0] + "..."
return title
# System prompt template for ticket-note generation. The literal tokens
# {outcome_label}, {notes_section} and {resume_section} are filled in by
# `generate_conclusion_summary` using chained `str.replace` calls (not
# `str.format`), so no other brace in this text needs escaping.
# A trailing backslash inside the literal is a line continuation (the newline
# is dropped).
CONCLUSION_SYSTEM_PROMPT = """\
You are a ticket documentation specialist for MSP (Managed Service Provider) teams. \
Your job is to transform an AI troubleshooting conversation into clean, professional \
ticket notes that can be pasted directly into a PSA/ticketing system (ConnectWise, \
Autotask, HaloPSA, etc.).
## Output Format
Generate a structured summary using this exact format:
**Subject:** [One-line summary of the issue]
**Outcome:** {outcome_label}
**Problem Description:**
[2-3 sentence summary of the original problem]
**Steps Taken:**
1. [Step] — [Result/finding]
2. [Step] — [Result/finding]
(list all troubleshooting steps from the conversation)
**Current Status:**
[Where things stand now — what was resolved, what remains]
{notes_section}
**Key Findings:**
- [Important discovery or configuration detail]
- [Any relevant error codes, settings, or values identified]
{resume_section}
## Rules
- Be concise but thorough — these notes will be read by another engineer
- Include specific technical details (commands run, error messages, config values)
- Use plain text formatting (no HTML) — bold with ** is fine
- Do NOT include conversational filler, greetings, or meta-commentary
- Extract ALL actionable steps from the conversation, in chronological order
- If the conversation identified root cause, state it clearly
"""
async def generate_conclusion_summary(
    chat: "AssistantChat",
    outcome: str,
    notes: str | None = None,
) -> str:
    """Turn a finished chat into ticket-ready notes via the AI backend.

    Args:
        chat: Chat whose ``messages`` (dicts with ``role`` and ``content``)
            form the transcript.
        outcome: Outcome key. ``"resolved"``, ``"escalated"`` and ``"paused"``
            map to display labels; any other value is used verbatim as the
            label. ``"paused"`` and ``"escalated"`` also add an extra section
            to the system prompt template.
        notes: Optional engineer notes, included in both the system prompt
            and the user prompt when truthy.

    Returns:
        The text returned by ``_call_ai`` (token counts are discarded).
    """
    outcome_label = {
        "resolved": "Resolved",
        "escalated": "Escalated",
        "paused": "Paused — To Be Continued",
    }.get(outcome, outcome)

    notes_section = f"\n**Engineer Notes:**\n{notes}\n" if notes else ""

    # Extra template section that only some outcomes need.
    outcome_specific_sections = {
        "paused": (
            "\n**Next Steps (for resumption):**\n"
            "- [What needs to happen next]\n"
            "- [Any pending actions or follow-ups]\n"
        ),
        "escalated": (
            "\n**Escalation Details:**\n"
            "- [Reason for escalation]\n"
            "- [Recommended next steps for receiving team/tier]\n"
        ),
    }
    resume_section = outcome_specific_sections.get(outcome, "")

    # Flatten the chat into a labelled transcript; every non-"user" role is
    # presented as the assistant.
    transcript = "\n\n".join(
        "[{}]: {}".format(
            "ENGINEER" if entry["role"] == "user" else "AI ASSISTANT",
            entry["content"],
        )
        for entry in chat.messages
    )

    notes_line = "Engineer Notes: " + notes if notes else "(No additional notes)"
    notes_hint = notes_section or "(omit this section)"
    prompt = "".join(
        [
            f"Outcome: {outcome_label}\n\n",
            f"{notes_line}\n\n",
            f"--- CONVERSATION TRANSCRIPT ---\n\n{transcript}\n\n",
            "--- END TRANSCRIPT ---\n\n",
            "Generate the ticket notes now. Replace all placeholder brackets with actual content from the conversation. ",
            f"The notes_section placeholder should be: {notes_hint}\n",
            "The resume_section placeholder should be filled based on the conversation context.",
        ]
    )

    # Fill the template tokens one after another, in this fixed order.
    system_with_vars = CONCLUSION_SYSTEM_PROMPT
    for token, replacement in (
        ("{outcome_label}", outcome_label),
        ("{notes_section}", notes_section or ""),
        ("{resume_section}", resume_section),
    ):
        system_with_vars = system_with_vars.replace(token, replacement)

    summary, _, _ = await _call_ai(
        system_base=system_with_vars,
        rag_context="",
        history=[],
        new_message=prompt,
        max_tokens=2048,
    )
    return summary
async def create_chat(
    user_id: UUID,
    account_id: UUID,
    db: AsyncSession,
) -> AssistantChat:
    """Add a new chat with an empty message list to the session and flush it.

    Only ``flush`` is awaited here; no commit is issued in this function.

    Args:
        user_id: Owner of the chat.
        account_id: Account the chat belongs to.
        db: Async session the new row is added to.

    Returns:
        The newly added ``AssistantChat`` instance.
    """
    new_chat = AssistantChat(user_id=user_id, account_id=account_id, messages=[])
    db.add(new_chat)
    await db.flush()
    return new_chat
async def send_message(
    chat_id: UUID,
    user_id: UUID,
    account_id: UUID,
    message: str,
    db: AsyncSession,
) -> tuple[str, list[dict[str, Any]], AssistantChat]:
    """Process one user message in an existing chat and record the AI reply.

    Steps: load the chat (matched on ``chat_id`` and ``user_id``), set the
    title from the message if the chat has no messages yet, run a RAG search
    scoped by ``account_id``, call the AI with the prior user/assistant turns,
    then append both the user message and the reply to the chat and bump its
    message and token counters. No commit or flush is issued here.

    Args:
        chat_id: Chat to post into.
        user_id: Must match the chat's owner for the lookup to succeed.
        account_id: Passed to the RAG search.
        message: The user's message text.
        db: Async session used for the lookup and the RAG search.

    Returns:
        ``(ai_content, suggested_flows, chat)``.

    Raises:
        ValueError: If no chat matches ``chat_id`` and ``user_id``.
    """
    lookup = select(AssistantChat).where(
        AssistantChat.id == chat_id,
        AssistantChat.user_id == user_id,
    )
    chat = (await db.execute(lookup)).scalar_one_or_none()
    if not chat:
        raise ValueError("Chat not found")

    # First message in the chat doubles as its title.
    if chat.message_count == 0:
        chat.title = _auto_title(message)

    # Retrieve context for this query.
    rag_results = await rag_search(
        query=message,
        account_id=account_id,
        db=db,
        limit=8,
    )
    rag_context = build_rag_context(rag_results)

    # Only user/assistant turns are replayed to the model.
    prior_turns: list[dict[str, Any]] = [
        {"role": entry["role"], "content": entry["content"]}
        for entry in chat.messages
        if entry["role"] in ("user", "assistant")
    ]

    ai_content, input_tokens, output_tokens = await _call_ai(
        system_base=ASSISTANT_SYSTEM_PROMPT,
        rag_context=rag_context,
        history=prior_turns,
        new_message=message,
    )

    # Assign a fresh list rather than mutating the stored one in place.
    chat.messages = [
        *chat.messages,
        {"role": "user", "content": message},
        {"role": "assistant", "content": ai_content},
    ]
    chat.message_count += 2
    chat.total_input_tokens += input_tokens
    chat.total_output_tokens += output_tokens

    return ai_content, extract_suggested_flows(rag_results), chat