diff --git a/backend/app/core/ai_provider.py b/backend/app/core/ai_provider.py index 71d376ad..1e0112b2 100644 --- a/backend/app/core/ai_provider.py +++ b/backend/app/core/ai_provider.py @@ -80,6 +80,73 @@ def _flatten_system_for_gemini( return "\n\n".join(b.get("text", "") for b in system_prompt) +def build_anthropic_chat_messages( + history: list[dict[str, Any]], + new_message: str, + images: list[dict[str, Any]] | None = None, + format_reminder: str | None = None, +) -> list[dict[str, Any]]: + """Construct the Anthropic `messages` payload for a cached multi-turn chat. + + Responsibilities: + - Copy the valid history messages in order. + - Apply `cache_control: ephemeral` to the LAST history message so the entire + conversation prefix is cached across turns. The new user message stays + uncached (it changes each turn). + - Append `format_reminder` to the new user message if provided. The reminder + is invisible to storage (caller's concern) but helps enforce structured + output compliance at generation time. + - If `images` are provided, render the new user message as a multimodal + content block list (images first, then text). Otherwise, render it as + a plain string. + + This helper is Anthropic-specific: the cache-breakpoint pattern, ephemeral + cache_control, and multimodal block shape are all Anthropic conventions. + Do not call it from Gemini code paths. + """ + messages: list[dict[str, Any]] = [] + for msg in history: + messages.append({"role": msg["role"], "content": msg["content"]}) + + # Cache breakpoint on the last existing history message so the entire + # conversation prefix is cached across turns. Safe only when there IS a + # history message; otherwise the new message is the only message. + if messages: + last = messages[-1] + messages[-1] = { + "role": last["role"], + "content": [ + { + "type": "text", + "text": last["content"], + "cache_control": {"type": "ephemeral"}, + } + ], + } + + effective_text = new_message + (format_reminder or "") + + if images: + content_blocks: list[dict[str, Any]] = [] + for img in images: + content_blocks.append( + { + "type": "image", + "source": { + "type": "base64", + "media_type": img["media_type"], + "data": img["data"], + }, + } + ) + content_blocks.append({"type": "text", "text": effective_text}) + messages.append({"role": "user", "content": content_blocks}) + else: + messages.append({"role": "user", "content": effective_text}) + + return messages + + def _log_anthropic_cache_usage(usage: Any, model: str) -> None: """Emit a structured log line capturing cache_read / cache_creation tokens.""" cache_read = getattr(usage, "cache_read_input_tokens", 0) or 0 diff --git a/backend/app/services/assistant_chat_service.py b/backend/app/services/assistant_chat_service.py index 84190e55..23c9bed1 100644 --- a/backend/app/services/assistant_chat_service.py +++ b/backend/app/services/assistant_chat_service.py @@ -10,10 +10,32 @@ Uses Anthropic prompt caching to reduce cost on multi-turn conversations: Optionally connects to Microsoft Learn via Anthropic's MCP connector for real-time documentation lookups (controlled by ENABLE_MCP_MICROSOFT_LEARN). + +## Architectural note — this module is the one MCP/beta chat caller + +`chat_call_cached` below is the ONLY caller in the codebase that uses +Anthropic's `client.beta.messages.create` endpoint, MCP servers, multimodal +user messages, and the retry-without-MCP fallback. It is deliberately NOT +routed through `AnthropicProvider` — MCP/beta/images are features of exactly +one optional Anthropic beta endpoint and do not belong in a provider-agnostic +abstraction that also serves Gemini. + +If a new caller needs the same (MCP, beta, images, history caching) bundle, +call `chat_call_cached` directly rather than pushing those concerns into +`AnthropicProvider`. Cached-system-block plumbing is shared with the provider +via `_normalize_system_for_anthropic` / `build_anthropic_chat_messages` / +`_log_anthropic_cache_usage` in `app.core.ai_provider` — cache primitives are +reusable, but the MCP/beta orchestration stays here. """ import logging from typing import Any +from app.core.ai_provider import ( + _get_anthropic_client, + _log_anthropic_cache_usage, + _normalize_system_for_anthropic, + build_anthropic_chat_messages, +) from app.core.config import settings logger = logging.getLogger(__name__) @@ -184,7 +206,7 @@ async def _call_ai( to include alongside the new_message as vision content. """ if settings.AI_PROVIDER == "anthropic" and settings.ANTHROPIC_API_KEY: - return await _call_anthropic_cached( + return await chat_call_cached( system_base, rag_context, history, new_message, max_tokens, images=images, ) @@ -202,7 +224,18 @@ async def _call_ai( ) -async def _call_anthropic_cached( +# Appended to every chat turn's user message immediately before generation. +# Invisible to storage (unified_chat_service strips markers before persisting), +# but critical for structured output compliance — the model emits invalid +# responses often enough without it that removing this reminder regresses UX. +_CHAT_FORMAT_REMINDER = ( + "\n\n[SYSTEM: Remember — your response MUST end with [QUESTIONS] " + "and/or [ACTIONS] markers containing valid JSON arrays. " + "Responses without markers break the UI.]" +) + + +async def chat_call_cached( system_base: str, rag_context: str, history: list[dict[str, Any]], @@ -210,79 +243,56 @@ async def _call_anthropic_cached( max_tokens: int, images: list[dict[str, Any]] | None = None, ) -> tuple[str, int, int]: - """Call Anthropic with prompt caching on system prompt and history. + """Call Anthropic's chat surface with caching, MCP, images, and retry-without-MCP. - Uses structured system blocks so the static base prompt is cached - independently from the per-query RAG context. Optionally connects - to Microsoft Learn via MCP for real-time documentation lookups. + This is the ONE MCP/beta/multimodal chat caller. It is deliberately NOT + routed through `AnthropicProvider`. See module docstring for rationale. + + Responsibilities unique to this function (not in the provider): + - Anthropic beta endpoint (`client.beta.messages.create`) + - Microsoft Learn MCP connector wiring (optional via ENABLE_MCP_MICROSOFT_LEARN) + - Retry-without-MCP fallback when the MCP server misbehaves + - Multimodal image blocks in the user message + - Format-reminder append for structured-output compliance + - Telemetry (`mcp.turn`, `mcp.fallback`) for Phase 0.5 MCP usage signal + + Cache plumbing is shared with the provider via helpers in `ai_provider`: + `_normalize_system_for_anthropic` (policy α — ephemeral on first block if + none specified), `build_anthropic_chat_messages` (history cache breakpoint + + multimodal user message + format reminder), `_log_anthropic_cache_usage`. """ import anthropic - client = anthropic.AsyncAnthropic( - api_key=settings.ANTHROPIC_API_KEY, + client = _get_anthropic_client( + settings.ANTHROPIC_API_KEY, timeout=settings.AI_REQUEST_TIMEOUT_SECONDS, ) - # System prompt as structured blocks: - # Block 1: static base prompt (cached) - # Block 2: RAG context (changes per query, not cached) + # System prompt as structured blocks. The static base is cacheable; the + # RAG context changes per query and must NOT be cached — so we mark the + # base explicitly and leave the RAG block unmarked. `_normalize_system` + # honors caller-authored cache_control verbatim (policy α). system_blocks: list[dict[str, Any]] = [ { "type": "text", "text": system_base, "cache_control": {"type": "ephemeral"}, + # cacheable: static system prompt, stable across all turns of all sessions }, ] if rag_context: - system_blocks.append({"type": "text", "text": rag_context}) + system_blocks.append( + {"type": "text", "text": rag_context} + # uncached: RAG retrieval varies per query + ) + normalized_system = _normalize_system_for_anthropic(system_blocks) - # Build messages with cache breakpoint on conversation history - messages: list[dict[str, Any]] = [] - for msg in history: - messages.append({"role": msg["role"], "content": msg["content"]}) - - # Place cache breakpoint on the last history message so the entire - # conversation prefix is cached across turns - if messages: - last = messages[-1] - messages[-1] = { - "role": last["role"], - "content": [ - { - "type": "text", - "text": last["content"], - "cache_control": {"type": "ephemeral"}, - } - ], - } - - # Add the new user message (uncached — it's new each turn) - # Append a format reminder to the user message so the model sees it - # immediately before generating. This is invisible to the user (stripped - # before storage) but critical for structured output compliance. - format_reminder = ( - "\n\n[SYSTEM: Remember — your response MUST end with [QUESTIONS] " - "and/or [ACTIONS] markers containing valid JSON arrays. " - "Responses without markers break the UI.]" + messages = build_anthropic_chat_messages( + history=history, + new_message=new_message, + images=images, + format_reminder=_CHAT_FORMAT_REMINDER, ) - reminded_message = new_message + format_reminder - - # If images are attached, build multimodal content blocks - if images: - content_blocks: list[dict[str, Any]] = [] - for img in images: - content_blocks.append({ - "type": "image", - "source": { - "type": "base64", - "media_type": img["media_type"], - "data": img["data"], - }, - }) - content_blocks.append({"type": "text", "text": reminded_message}) - messages.append({"role": "user", "content": content_blocks}) - else: - messages.append({"role": "user", "content": reminded_message}) # MCP server config (optional — controlled by settings) mcp_servers = anthropic.NOT_GIVEN @@ -310,7 +320,7 @@ async def _call_anthropic_cached( response = await client.beta.messages.create( model=settings.AI_MODEL_ANTHROPIC, max_tokens=max_tokens, - system=system_blocks, + system=normalized_system, messages=messages, mcp_servers=mcp_servers, tools=tools, @@ -344,7 +354,7 @@ async def _call_anthropic_cached( response = await client.messages.create( model=settings.AI_MODEL_ANTHROPIC, max_tokens=max_tokens, - system=system_blocks, + system=normalized_system, messages=messages, ) else: @@ -386,14 +396,7 @@ async def _call_anthropic_cached( if mcp_tools_used: logger.info("MCP tools used: %s", ", ".join(mcp_tools_used)) - # Log cache performance - cache_read = getattr(usage, "cache_read_input_tokens", 0) or 0 - cache_creation = getattr(usage, "cache_creation_input_tokens", 0) or 0 - if cache_read or cache_creation: - logger.info( - "Anthropic cache: read=%d creation=%d input=%d output=%d", - cache_read, cache_creation, input_tokens, output_tokens, - ) + _log_anthropic_cache_usage(usage, settings.AI_MODEL_ANTHROPIC) return text, input_tokens, output_tokens