Files
resolutionflow/backend/app/services/assistant_chat_service.py
Michael Chihlas 2007dcb990 feat: add Anthropic prompt caching to assistant chat
Cache the static system prompt and conversation history prefix across
turns, reducing input token costs by ~80% on multi-turn conversations.
RAG context is intentionally uncached since it changes per query.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-05 18:17:55 -05:00

238 lines
7.5 KiB
Python

"""Standalone AI assistant chat service with RAG context.
Provides persistent conversation history for general IT questions
with semantic search over the team's flow library.
Uses Anthropic prompt caching to reduce cost on multi-turn conversations:
- The static system prompt is cached (ephemeral, 5-min TTL)
- The conversation history prefix is cached via a breakpoint on the
last existing message before the new user input
"""
import logging
from typing import Any
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.models.assistant_chat import AssistantChat
from app.services.rag_service import search as rag_search, build_rag_context, extract_suggested_flows
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Static persona and answer-style instructions for the assistant.
# send_message() passes this as the base system prompt on every turn; on the
# Anthropic path it is sent as its own system block marked for caching, so it
# should stay free of per-request content.
ASSISTANT_SYSTEM_PROMPT = """You are a Senior Systems and Network Engineer with 15+ years of experience working in Managed Service Provider (MSP) environments. You specialize in:
- Windows Server, Active Directory, Group Policy, and Hybrid Identity (Entra ID)
- Networking (TCP/IP, DNS, DHCP, VPN, firewall troubleshooting, Cisco/Fortinet)
- Virtualization (VMware, Hyper-V) and cloud platforms (Azure, AWS, M365)
- Endpoint management, RMM tools, and PSA platforms (ConnectWise, Datto, Kaseya)
- PowerShell scripting and automation
When answering:
- Be direct and actionable — MSP engineers need fast, practical answers
- Include specific commands, paths, and config values when relevant
- Mention potential risks or gotchas before suggesting changes
- If a relevant troubleshooting flow exists in the team's library, reference it
- Keep responses concise but thorough — prefer bullet points and code blocks
- Format code with proper markdown code blocks
"""
async def _call_ai(
    system_base: str,
    rag_context: str,
    history: list[dict[str, Any]],
    new_message: str,
    max_tokens: int = 4096,
) -> tuple[str, int, int]:
    """Route one chat turn to the configured AI backend.

    When the Anthropic provider is selected and an API key is configured,
    the request goes through the prompt-caching path:
    - the base system prompt is cached (identical on every turn)
    - the RAG context is left uncached (it differs per query)
    - the existing conversation history is cached as a prefix, so only
      the new user message is uncached

    Any other provider receives a single concatenated system prompt and
    the history with the new user message appended.

    Returns whatever the selected backend returns; on the Anthropic path
    that is (text, input_tokens, output_tokens).
    """
    anthropic_enabled = (
        settings.AI_PROVIDER == "anthropic" and settings.ANTHROPIC_API_KEY
    )
    if not anthropic_enabled:
        # Generic provider (Gemini, etc.): no structured caching available.
        from app.core.ai_provider import get_ai_provider

        combined_prompt = system_base + rag_context
        conversation = [*history, {"role": "user", "content": new_message}]
        generic_provider = get_ai_provider()
        return await generic_provider.generate_text(
            system_prompt=combined_prompt,
            messages=conversation,
            max_tokens=max_tokens,
        )

    return await _call_anthropic_cached(
        system_base, rag_context, history, new_message, max_tokens
    )
async def _call_anthropic_cached(
    system_base: str,
    rag_context: str,
    history: list[dict[str, Any]],
    new_message: str,
    max_tokens: int,
) -> tuple[str, int, int]:
    """Call Anthropic with prompt caching on system prompt and history.

    Uses structured system blocks so the static base prompt is cached
    independently from the per-query RAG context, and places a second
    cache breakpoint on the last pre-existing history message so the
    whole conversation prefix can be reused on the next turn.

    Args:
        system_base: Static system prompt; sent as a cached system block.
        rag_context: Per-query context; appended as an uncached system
            block when non-empty.
        history: Prior turns as dicts with "role" and "content" keys.
        new_message: The new user message for this turn (uncached).
        max_tokens: Upper bound on generated tokens.

    Returns:
        (text, input_tokens, output_tokens) where the token counts are
        taken from the response's usage object.

    Raises:
        RuntimeError: If the response contains no text content blocks.
    """
    import anthropic

    # System prompt as structured blocks:
    #   Block 1: static base prompt (cached)
    #   Block 2: RAG context (changes per query, not cached)
    system_blocks: list[dict[str, Any]] = [
        {
            "type": "text",
            "text": system_base,
            "cache_control": {"type": "ephemeral"},
        },
    ]
    if rag_context:
        system_blocks.append({"type": "text", "text": rag_context})

    # Copy only the fields the API needs from each stored history entry.
    messages: list[dict[str, Any]] = [
        {"role": msg["role"], "content": msg["content"]} for msg in history
    ]

    # Place the cache breakpoint on the last history message so the entire
    # conversation prefix is cached across turns. cache_control can only be
    # attached to a content block, hence the switch to block form.
    if messages:
        last = messages[-1]
        messages[-1] = {
            "role": last["role"],
            "content": [
                {
                    "type": "text",
                    "text": last["content"],
                    "cache_control": {"type": "ephemeral"},
                }
            ],
        }

    # Add the new user message (uncached — it's new each turn)
    messages.append({"role": "user", "content": new_message})

    # A client is created per call; scope it with `async with` so its
    # underlying HTTP connections are closed deterministically instead of
    # being left for garbage collection.
    async with anthropic.AsyncAnthropic(
        api_key=settings.ANTHROPIC_API_KEY,
        timeout=settings.AI_REQUEST_TIMEOUT_SECONDS,
    ) as client:
        response = await client.messages.create(
            model=settings.AI_MODEL_ANTHROPIC,
            max_tokens=max_tokens,
            system=system_blocks,
            messages=messages,
        )

    # Collect every text block rather than assuming the first block exists
    # and is text; fail with a clear message when there is nothing to return.
    text_parts = [
        block.text
        for block in response.content
        if getattr(block, "type", None) == "text"
    ]
    if not text_parts:
        raise RuntimeError("Anthropic response contained no text content")
    text = "".join(text_parts)

    usage = response.usage
    input_tokens = usage.input_tokens
    output_tokens = usage.output_tokens

    # Log cache performance
    # NOTE(review): the returned input_tokens is usage.input_tokens only; the
    # cache read/creation counts below are reported separately by the API and
    # are not added to it — confirm this is the intended accounting.
    cache_read = getattr(usage, "cache_read_input_tokens", 0) or 0
    cache_creation = getattr(usage, "cache_creation_input_tokens", 0) or 0
    if cache_read or cache_creation:
        logger.info(
            "Anthropic cache: read=%d creation=%d input=%d output=%d",
            cache_read, cache_creation, input_tokens, output_tokens,
        )

    return text, input_tokens, output_tokens
def _auto_title(message: str) -> str:
"""Generate a short title from the first user message."""
title = message.strip()[:100]
if len(message) > 100:
title = title.rsplit(" ", 1)[0] + "..."
return title
async def create_chat(
    user_id: UUID,
    account_id: UUID,
    db: AsyncSession,
) -> AssistantChat:
    """Start a fresh conversation with an empty message list.

    The new row is added to the session and flushed; this function does
    not commit.
    """
    new_chat = AssistantChat(
        messages=[],
        account_id=account_id,
        user_id=user_id,
    )
    db.add(new_chat)
    await db.flush()
    return new_chat
async def send_message(
    chat_id: UUID,
    user_id: UUID,
    account_id: UUID,
    message: str,
    db: AsyncSession,
) -> tuple[str, list[dict[str, Any]], AssistantChat]:
    """Process one user turn: retrieve context, ask the AI, record the result.

    Returns (ai_content, suggested_flows, chat).

    Raises:
        ValueError: If no chat with this id exists for this user.
    """
    lookup = select(AssistantChat).where(
        AssistantChat.id == chat_id,
        AssistantChat.user_id == user_id,
    )
    chat = (await db.execute(lookup)).scalar_one_or_none()
    if not chat:
        raise ValueError("Chat not found")

    # The very first message of a chat doubles as its title.
    if chat.message_count == 0:
        chat.title = _auto_title(message)

    # Semantic search scoped to the account, based on the new message.
    rag_results = await rag_search(
        query=message,
        account_id=account_id,
        db=db,
        limit=8,
    )
    rag_context = build_rag_context(rag_results)

    # Only user/assistant turns are replayed to the model, reduced to the
    # role and content fields.
    prior_turns: list[dict[str, Any]] = [
        {"role": entry["role"], "content": entry["content"]}
        for entry in chat.messages
        if entry["role"] in ("user", "assistant")
    ]

    # Prompt-cached Anthropic call, or the generic provider fallback.
    ai_content, input_tokens, output_tokens = await _call_ai(
        system_base=ASSISTANT_SYSTEM_PROMPT,
        rag_context=rag_context,
        history=prior_turns,
        new_message=message,
    )

    # Assign a brand-new list instead of appending to the stored one in place.
    chat.messages = [
        *chat.messages,
        {"role": "user", "content": message},
        {"role": "assistant", "content": ai_content},
    ]
    chat.message_count += 2
    chat.total_input_tokens += input_tokens
    chat.total_output_tokens += output_tokens

    suggested_flows = extract_suggested_flows(rag_results)
    return ai_content, suggested_flows, chat