From 067574ad6acf0b83ac63c2bd06f8750793f3d3f8 Mon Sep 17 00:00:00 2001 From: Michael Chihlas Date: Thu, 28 May 2026 14:54:30 -0400 Subject: [PATCH] feat(ai): robust response extraction + structured-output foundation Harden the Anthropic provider and lay the groundwork for schema-constrained JSON, optimizing the existing claude-sonnet-4-6 / claude-haiku-4-5 usage (no model changes). ai_provider.py: - _extract_text_from_response replaces fragile response.content[0].text: skips non-text leading blocks (e.g. thinking), returns the first text block, logs an anthropic.stop_reason warning on max_tokens/refusal (truncation now observable), and raises ValueError on a no-text response. - generate_json gains an optional `schema` param. Anthropic wires it to output_config.format (structured outputs); schema=None preserves the exact prior call for every existing caller. Gemini accepts-and-ignores it. kb_conversion_service.py: - TROUBLESHOOTING_SCHEMA / PROCEDURAL_SCHEMA + _schema_for_target_type(), modelled as a strict superset of every field the prompts emit. - convert_document passes the schema only when the new AI_KB_CONVERT_STRUCTURED_OUTPUT setting is True (default False). The _try_repair_json fallback stays as belt-and-suspenders. Tests: 14 provider + 7 schema, TDD (red-green). Live constrained-decoding smoke-test still required before enabling the flag in production. Co-Authored-By: Claude Opus 4.7 --- backend/app/core/ai_provider.py | 72 ++++++- backend/app/core/config.py | 6 + backend/app/core/kb_conversion_service.py | 120 ++++++++++++ backend/tests/test_ai_provider.py | 209 ++++++++++++++++++++- backend/tests/test_kb_conversion_schema.py | 104 ++++++++++ 5 files changed, 503 insertions(+), 8 deletions(-) create mode 100644 backend/tests/test_kb_conversion_schema.py diff --git a/backend/app/core/ai_provider.py b/backend/app/core/ai_provider.py index 1e0112b2..fba3b505 100644 --- a/backend/app/core/ai_provider.py +++ b/backend/app/core/ai_provider.py @@ -147,6 +147,40 @@ def build_anthropic_chat_messages( return messages +def _extract_text_from_response(response: Any, model: str) -> str: + """Return the first text block's text from an Anthropic message response. + + Robustness over the naive ``response.content[0].text``: + - Skips non-text leading blocks (e.g. ``thinking``) and returns the first + block whose ``type == "text"``. Indexing ``content[0]`` blindly throws or + returns garbage the moment a non-text block leads the response. + - Surfaces truncation/refusal: when ``stop_reason`` is ``max_tokens`` or + ``refusal``, emits a structured warning so silent output corruption + (truncated JSON, empty refusals) is observable rather than handed + downstream to be guessed at. + - Raises ``ValueError`` when no text block is present (e.g. a bare refusal) + instead of returning a non-text block's attributes. + """ + stop_reason = getattr(response, "stop_reason", None) + if stop_reason in ("max_tokens", "refusal"): + logger.warning( + "anthropic.stop_reason", + extra={ + "event": "anthropic.stop_reason", + "model": model, + "stop_reason": stop_reason, + }, + ) + + for block in response.content: + if getattr(block, "type", None) == "text": + return block.text + + raise ValueError( + f"Anthropic response contained no text block (stop_reason={stop_reason!r})" + ) + + def _log_anthropic_cache_usage(usage: Any, model: str) -> None: """Emit a structured log line capturing cache_read / cache_creation tokens.""" cache_read = getattr(usage, "cache_read_input_tokens", 0) or 0 @@ -176,6 +210,7 @@ class AIProvider(ABC): system_prompt: str | list[SystemBlock], messages: list[dict[str, Any]], max_tokens: int = 4096, + schema: dict[str, Any] | None = None, ) -> tuple[str, int, int]: """Generate a JSON response from the AI model. @@ -185,6 +220,15 @@ class AIProvider(ABC): Anthropic prompt caching per module-docstring policy. messages: List of message dicts with "role" and "content" keys. max_tokens: Maximum output tokens. + schema: Optional JSON Schema constraining the response shape. + When provided, the Anthropic backend uses structured outputs + (`output_config.format`) to guarantee valid, parseable JSON — + no markdown fences, no truncated-brace repair. Must satisfy the + structured-output schema limits (every object needs + `additionalProperties: false`; no recursion; numeric/string + constraints are stripped). `None` preserves the legacy + prompt-only behavior. The Gemini backend currently ignores this + argument (it already requests `application/json`). Returns: Tuple of (response_text, input_tokens, output_tokens). @@ -231,7 +275,11 @@ class GeminiProvider(AIProvider): system_prompt: str | list[SystemBlock], messages: list[dict[str, Any]], max_tokens: int = 4096, + schema: dict[str, Any] | None = None, ) -> tuple[str, int, int]: + # `schema` is accepted for interface parity but ignored: Gemini already + # constrains output via response_mime_type="application/json" below. + # Mapping JSON Schema -> Gemini response_schema is deferred. from google import genai from google.genai import types as genai_types @@ -362,18 +410,28 @@ class AnthropicProvider(AIProvider): system_prompt: str | list[SystemBlock], messages: list[dict[str, Any]], max_tokens: int = 4096, + schema: dict[str, Any] | None = None, ) -> tuple[str, int, int]: client = _get_anthropic_client(self._api_key, self._timeout) normalized_system = _normalize_system_for_anthropic(system_prompt) - response = await client.messages.create( - model=self._model, - max_tokens=max_tokens, - system=normalized_system, - messages=messages, - ) + create_kwargs: dict[str, Any] = { + "model": self._model, + "max_tokens": max_tokens, + "system": normalized_system, + "messages": messages, + } + if schema is not None: + # Structured outputs: constrain the response to valid JSON matching + # the schema (Sonnet 4.6 / Haiku 4.5). Removes the need for + # markdown-fence stripping and truncated-JSON repair downstream. + create_kwargs["output_config"] = { + "format": {"type": "json_schema", "schema": schema} + } - text = response.content[0].text + response = await client.messages.create(**create_kwargs) + + text = _extract_text_from_response(response, self._model) input_tokens = response.usage.input_tokens output_tokens = response.usage.output_tokens diff --git a/backend/app/core/config.py b/backend/app/core/config.py index d1582265..5f215cda 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -155,6 +155,12 @@ class Settings(BaseSettings): AI_CONVERSATION_TTL_HOURS: int = 24 AI_MAX_CALLS_PER_FLOW: int = 10 AI_REQUEST_TIMEOUT_SECONDS: int = 120 + # When True, KB conversion constrains the Anthropic response with a JSON + # schema (structured outputs) instead of relying on prompt-only JSON + + # downstream fence-stripping / brace-repair. Default OFF: enable in staging + # and smoke-test constrained decoding against the live model before turning + # it on in production. Only affects the Anthropic backend. + AI_KB_CONVERT_STRUCTURED_OUTPUT: bool = False # AI Provider selection AI_PROVIDER: str = "anthropic" # "gemini" or "anthropic" GOOGLE_AI_API_KEY: Optional[str] = None diff --git a/backend/app/core/kb_conversion_service.py b/backend/app/core/kb_conversion_service.py index b5e7f636..1145a4f7 100644 --- a/backend/app/core/kb_conversion_service.py +++ b/backend/app/core/kb_conversion_service.py @@ -202,6 +202,115 @@ the engineer attached, NOT from this schema): 9. Return ONLY valid JSON — no markdown fences, no explanation text.""" +# ── Structured-output schemas ── +# +# These constrain the model's JSON via Anthropic structured outputs +# (output_config.format) so the response is guaranteed valid and parseable — +# no markdown fences, no truncated-brace repair. They must be a SUPERSET of +# every field the corresponding system prompt instructs the model to emit: +# additionalProperties is False everywhere, so any field the prompt asks for +# but the schema omits would be impossible to produce. +# +# `type`/`field_type` are intentionally left as plain strings (no enum): the +# downstream parser already normalizes/tolerates the type values, and an enum +# risks constraining the model away from a value the prompt would yield. + +_TROUBLESHOOTING_OPTION_SCHEMA: dict[str, Any] = { + "type": "object", + "properties": { + "label": {"type": "string"}, + "next_node_id": {"type": "string"}, + }, + "required": ["label", "next_node_id"], + "additionalProperties": False, +} + +_TROUBLESHOOTING_NODE_SCHEMA: dict[str, Any] = { + "type": "object", + "properties": { + "id": {"type": "string"}, + "type": {"type": "string"}, + "question": {"type": "string"}, + "options": {"type": "array", "items": _TROUBLESHOOTING_OPTION_SCHEMA}, + "next_node_id": {"type": "string"}, + "confidence": {"type": "number"}, + "source_excerpt": {"type": "string"}, + }, + # Only the universal fields are required. `question`/`options`/`next_node_id` + # vary by node type and stay optional so a resolution node need not carry + # options and an action node need not carry a question. + "required": ["id", "type", "confidence", "source_excerpt"], + "additionalProperties": False, +} + +TROUBLESHOOTING_SCHEMA: dict[str, Any] = { + "type": "object", + "properties": { + "title": {"type": "string"}, + "description": {"type": "string"}, + "nodes": {"type": "array", "items": _TROUBLESHOOTING_NODE_SCHEMA}, + }, + "required": ["title", "description", "nodes"], + "additionalProperties": False, +} + +_PROCEDURAL_STEP_SCHEMA: dict[str, Any] = { + "type": "object", + "properties": { + "id": {"type": "string"}, + "type": {"type": "string"}, + "content": {"type": "string"}, + "confidence": {"type": "number"}, + "source_excerpt": {"type": "string"}, + }, + "required": ["id", "type", "content", "confidence", "source_excerpt"], + "additionalProperties": False, +} + +_PROCEDURAL_INTAKE_SCHEMA: dict[str, Any] = { + "type": "object", + "properties": { + "variable_name": {"type": "string"}, + "label": {"type": "string"}, + "field_type": {"type": "string"}, + "required": {"type": "boolean"}, + "display_order": {"type": "integer"}, + }, + "required": [ + "variable_name", + "label", + "field_type", + "required", + "display_order", + ], + "additionalProperties": False, +} + +PROCEDURAL_SCHEMA: dict[str, Any] = { + "type": "object", + "properties": { + "title": {"type": "string"}, + "description": {"type": "string"}, + "steps": {"type": "array", "items": _PROCEDURAL_STEP_SCHEMA}, + "intake_form": {"type": "array", "items": _PROCEDURAL_INTAKE_SCHEMA}, + }, + "required": ["title", "description", "steps", "intake_form"], + "additionalProperties": False, +} + + +def _schema_for_target_type(target_type: str) -> dict[str, Any]: + """Return the structured-output schema for a KB conversion target type. + + Mirrors the prompt selection in ``convert_document``: only + ``"troubleshooting"`` uses the decision-tree schema; everything else is + treated as a procedural flow. + """ + if target_type == "troubleshooting": + return TROUBLESHOOTING_SCHEMA + return PROCEDURAL_SCHEMA + + def _build_user_message( source_text: str, source_metadata: dict[str, Any] | None, @@ -404,6 +513,16 @@ async def convert_document( model = settings.get_model_for_action("kb_convert") provider = get_ai_provider(model=model) + # Structured outputs (flagged): constrain the response to a JSON schema so + # the model can't emit fences or truncated JSON. Falls back to prompt-only + # JSON (schema=None) when disabled; the parse path below stays intact either + # way as a belt-and-suspenders fallback. + schema = ( + _schema_for_target_type(kb_import.target_type) + if settings.AI_KB_CONVERT_STRUCTURED_OUTPUT + else None + ) + try: raw_text, input_tokens, output_tokens = await provider.generate_json( system_prompt=[ @@ -414,6 +533,7 @@ async def convert_document( ], messages=[{"role": "user", "content": user_message}], max_tokens=16384, + schema=schema, ) except Exception as e: logger.error("AI conversion failed for kb_import=%s: %s", kb_import.id, e) diff --git a/backend/tests/test_ai_provider.py b/backend/tests/test_ai_provider.py index 611c8e7b..2e9d4368 100644 --- a/backend/tests/test_ai_provider.py +++ b/backend/tests/test_ai_provider.py @@ -96,7 +96,8 @@ class TestAnthropicProvider: ) mock_response = MagicMock() - mock_response.content = [MagicMock(text='{"result": "ok"}')] + mock_response.content = [MagicMock(type="text", text='{"result": "ok"}')] + mock_response.stop_reason = "end_turn" mock_response.usage = MagicMock(input_tokens=100, output_tokens=50) mock_client = AsyncMock() @@ -120,6 +121,170 @@ class TestAnthropicProvider: messages=[{"role": "user", "content": "Hello"}], ) + @pytest.mark.asyncio + async def test_generate_json_skips_non_text_blocks(self): + """A leading non-text block (e.g. thinking) is skipped; the first + text block's text is returned instead of content[0].text.""" + from app.core import ai_provider + + ai_provider._anthropic_clients.clear() + + provider = AnthropicProvider( + api_key="skip-key", model="claude-sonnet-4-6", timeout=31 + ) + + thinking_block = MagicMock(type="thinking", thinking="hmm...") + text_block = MagicMock(type="text", text='{"ok": 1}') + mock_response = MagicMock() + mock_response.content = [thinking_block, text_block] + mock_response.stop_reason = "end_turn" + mock_response.usage = MagicMock(input_tokens=10, output_tokens=5) + + mock_client = AsyncMock() + mock_client.messages.create = AsyncMock(return_value=mock_response) + + with patch("anthropic.AsyncAnthropic", return_value=mock_client): + text, _, _ = await provider.generate_json( + system_prompt="You are a helper.", + messages=[{"role": "user", "content": "Hi"}], + ) + + assert text == '{"ok": 1}' + + @pytest.mark.asyncio + async def test_generate_json_raises_when_no_text_block(self): + """A response with no text block (e.g. a bare refusal) raises a clear + error instead of returning a non-text block's attributes.""" + from app.core import ai_provider + + ai_provider._anthropic_clients.clear() + + provider = AnthropicProvider( + api_key="empty-key", model="claude-sonnet-4-6", timeout=32 + ) + + mock_response = MagicMock() + mock_response.content = [MagicMock(type="thinking", thinking="...")] + mock_response.stop_reason = "refusal" + mock_response.usage = MagicMock(input_tokens=10, output_tokens=0) + + mock_client = AsyncMock() + mock_client.messages.create = AsyncMock(return_value=mock_response) + + with patch("anthropic.AsyncAnthropic", return_value=mock_client): + with pytest.raises(ValueError, match="no text block"): + await provider.generate_json( + system_prompt="You are a helper.", + messages=[{"role": "user", "content": "Hi"}], + ) + + @pytest.mark.asyncio + async def test_generate_json_logs_warning_on_truncation(self, caplog): + """When stop_reason is max_tokens, a warning is logged (truncation + signal) and the partial text is still returned.""" + import logging + + from app.core import ai_provider + + ai_provider._anthropic_clients.clear() + + provider = AnthropicProvider( + api_key="trunc-key", model="claude-sonnet-4-6", timeout=33 + ) + + text_block = MagicMock(type="text", text='{"partial": tr') + mock_response = MagicMock() + mock_response.content = [text_block] + mock_response.stop_reason = "max_tokens" + mock_response.usage = MagicMock(input_tokens=10, output_tokens=4096) + + mock_client = AsyncMock() + mock_client.messages.create = AsyncMock(return_value=mock_response) + + with patch("anthropic.AsyncAnthropic", return_value=mock_client): + with caplog.at_level(logging.WARNING, logger="app.core.ai_provider"): + text, _, _ = await provider.generate_json( + system_prompt="You are a helper.", + messages=[{"role": "user", "content": "Hi"}], + ) + + assert text == '{"partial": tr' + truncation_records = [ + r for r in caplog.records if getattr(r, "stop_reason", None) == "max_tokens" + ] + assert truncation_records, "expected a warning record for max_tokens truncation" + + @pytest.mark.asyncio + async def test_generate_json_passes_output_config_when_schema_given(self): + """When a JSON schema is supplied, it is forwarded as + output_config.format so the API constrains the response shape.""" + from app.core import ai_provider + + ai_provider._anthropic_clients.clear() + + provider = AnthropicProvider( + api_key="schema-key", model="claude-sonnet-4-6", timeout=34 + ) + + mock_response = MagicMock() + mock_response.content = [MagicMock(type="text", text='{"title": "x"}')] + mock_response.stop_reason = "end_turn" + mock_response.usage = MagicMock(input_tokens=10, output_tokens=5) + + mock_client = AsyncMock() + mock_client.messages.create = AsyncMock(return_value=mock_response) + + schema = { + "type": "object", + "properties": {"title": {"type": "string"}}, + "required": ["title"], + "additionalProperties": False, + } + + with patch("anthropic.AsyncAnthropic", return_value=mock_client): + await provider.generate_json( + system_prompt="You are a helper.", + messages=[{"role": "user", "content": "Hi"}], + max_tokens=512, + schema=schema, + ) + + mock_client.messages.create.assert_called_once_with( + model="claude-sonnet-4-6", + max_tokens=512, + system="You are a helper.", + messages=[{"role": "user", "content": "Hi"}], + output_config={"format": {"type": "json_schema", "schema": schema}}, + ) + + @pytest.mark.asyncio + async def test_generate_json_no_output_config_when_schema_none(self): + """With no schema, output_config is not sent (backward compatible).""" + from app.core import ai_provider + + ai_provider._anthropic_clients.clear() + + provider = AnthropicProvider( + api_key="noschema-key", model="claude-sonnet-4-6", timeout=35 + ) + + mock_response = MagicMock() + mock_response.content = [MagicMock(type="text", text="{}")] + mock_response.stop_reason = "end_turn" + mock_response.usage = MagicMock(input_tokens=1, output_tokens=1) + + mock_client = AsyncMock() + mock_client.messages.create = AsyncMock(return_value=mock_response) + + with patch("anthropic.AsyncAnthropic", return_value=mock_client): + await provider.generate_json( + system_prompt="You are a helper.", + messages=[{"role": "user", "content": "Hi"}], + ) + + _, call_kwargs = mock_client.messages.create.call_args + assert "output_config" not in call_kwargs + class TestGeminiProvider: """Tests for GeminiProvider.generate_json.""" @@ -174,6 +339,48 @@ class TestGeminiProvider: mock_client.aio.models.generate_content.assert_called_once() + @pytest.mark.asyncio + async def test_generate_json_accepts_and_ignores_schema(self): + """Gemini accepts the schema kwarg (interface parity) and still + returns JSON; it does not error on the param.""" + provider = GeminiProvider(api_key="test-key", model="gemini-2.5-flash") + + mock_usage = MagicMock() + mock_usage.prompt_token_count = 5 + mock_usage.candidates_token_count = 3 + + mock_response = MagicMock() + mock_response.text = '{"answer": 1}' + mock_response.usage_metadata = mock_usage + + mock_client = MagicMock() + mock_client.aio.models.generate_content = AsyncMock(return_value=mock_response) + + mock_genai_module = MagicMock() + mock_genai_module.Client.return_value = mock_client + + mock_types = MagicMock() + mock_types.Content.side_effect = lambda **kw: kw + mock_types.Part.side_effect = lambda **kw: kw + mock_types.GenerateContentConfig.side_effect = lambda **kw: kw + + mock_google = MagicMock() + mock_google.genai = mock_genai_module + mock_genai_module.types = mock_types + + with patch.dict(sys.modules, { + "google": mock_google, + "google.genai": mock_genai_module, + "google.genai.types": mock_types, + }): + text, _, _ = await provider.generate_json( + system_prompt="Generate JSON.", + messages=[{"role": "user", "content": "data"}], + schema={"type": "object"}, + ) + + assert text == '{"answer": 1}' + @pytest.mark.asyncio async def test_generate_json_handles_none_usage(self): """Token counts default to 0 when usage_metadata attributes are None.""" diff --git a/backend/tests/test_kb_conversion_schema.py b/backend/tests/test_kb_conversion_schema.py new file mode 100644 index 00000000..fae4da3f --- /dev/null +++ b/backend/tests/test_kb_conversion_schema.py @@ -0,0 +1,104 @@ +"""Tests for the structured-output JSON schemas used by KB conversion. + +These validate that the schemas are well-formed against the Anthropic +structured-output limits (every object carries additionalProperties: false, +`required` is a subset of declared properties, no numeric/length constraints) +and that the target_type -> schema selector returns the right shape. They do +NOT exercise the live API — constrained decoding must be smoke-tested against +a real model before AI_KB_CONVERT_STRUCTURED_OUTPUT is enabled in production. +""" + +from app.core.kb_conversion_service import ( + PROCEDURAL_SCHEMA, + TROUBLESHOOTING_SCHEMA, + _schema_for_target_type, +) + +# Constraints disallowed by Anthropic structured outputs (must be absent so the +# API does not reject the schema or silently strip them). +_DISALLOWED_KEYS = { + "minimum", + "maximum", + "multipleOf", + "minLength", + "maxLength", + "minItems", + "maxItems", +} + + +def _assert_well_formed(schema: dict) -> None: + """Recursively assert a JSON schema obeys the structured-output limits.""" + if schema.get("type") == "object": + assert schema.get("additionalProperties") is False, ( + f"object schema missing additionalProperties: false: {schema}" + ) + props = schema.get("properties", {}) + required = set(schema.get("required", [])) + assert required <= set(props), ( + f"required keys not all declared as properties: {required - set(props)}" + ) + for sub in props.values(): + _assert_well_formed(sub) + elif schema.get("type") == "array": + _assert_well_formed(schema["items"]) + + assert not (_DISALLOWED_KEYS & set(schema)), ( + f"schema uses unsupported constraint(s): {_DISALLOWED_KEYS & set(schema)}" + ) + + +class TestStructuredOutputSchemas: + def test_troubleshooting_schema_is_well_formed(self): + _assert_well_formed(TROUBLESHOOTING_SCHEMA) + + def test_procedural_schema_is_well_formed(self): + _assert_well_formed(PROCEDURAL_SCHEMA) + + def test_troubleshooting_schema_top_level_shape(self): + props = TROUBLESHOOTING_SCHEMA["properties"] + assert set(props) >= {"title", "description", "nodes"} + node = props["nodes"]["items"] + # Every field the troubleshooting prompt may emit must be modelled, + # else additionalProperties: false makes them impossible to produce. + assert set(node["properties"]) >= { + "id", + "type", + "question", + "options", + "next_node_id", + "confidence", + "source_excerpt", + } + + def test_procedural_schema_top_level_shape(self): + props = PROCEDURAL_SCHEMA["properties"] + assert set(props) >= {"title", "description", "steps", "intake_form"} + step = props["steps"]["items"] + assert set(step["properties"]) >= { + "id", + "type", + "content", + "confidence", + "source_excerpt", + } + intake = props["intake_form"]["items"] + assert set(intake["properties"]) >= { + "variable_name", + "label", + "field_type", + "required", + "display_order", + } + + +class TestSchemaSelector: + def test_returns_troubleshooting_schema(self): + assert _schema_for_target_type("troubleshooting") is TROUBLESHOOTING_SCHEMA + + def test_returns_procedural_schema_for_procedural(self): + assert _schema_for_target_type("procedural") is PROCEDURAL_SCHEMA + + def test_defaults_to_procedural_for_unknown(self): + # convert_document treats any non-"troubleshooting" target as procedural. + assert _schema_for_target_type("something-else") is PROCEDURAL_SCHEMA