feat(ai): robust response extraction + structured-output foundation

Harden the Anthropic provider and lay the groundwork for schema-constrained
JSON, optimizing the existing claude-sonnet-4-6 / claude-haiku-4-5 usage
(no model changes).

ai_provider.py:
- _extract_text_from_response replaces fragile response.content[0].text:
  skips non-text leading blocks (e.g. thinking), returns the first text
  block, logs an anthropic.stop_reason warning on max_tokens/refusal
  (truncation now observable), and raises ValueError on a no-text response.
- generate_json gains an optional `schema` param. Anthropic wires it to
  output_config.format (structured outputs); schema=None preserves the exact
  prior call for every existing caller. Gemini accepts-and-ignores it.

kb_conversion_service.py:
- TROUBLESHOOTING_SCHEMA / PROCEDURAL_SCHEMA + _schema_for_target_type(),
  modelled as a strict superset of every field the prompts emit.
- convert_document passes the schema only when the new
  AI_KB_CONVERT_STRUCTURED_OUTPUT setting is True (default False). The
  _try_repair_json fallback stays as belt-and-suspenders.

Tests: 14 provider + 7 schema, TDD (red-green). Live constrained-decoding
smoke-test still required before enabling the flag in production.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-28 14:54:30 -04:00
parent b1ee46656e
commit 067574ad6a
5 changed files with 503 additions and 8 deletions

View File

@@ -147,6 +147,40 @@ def build_anthropic_chat_messages(
return messages
def _extract_text_from_response(response: Any, model: str) -> str:
"""Return the first text block's text from an Anthropic message response.
Robustness over the naive ``response.content[0].text``:
- Skips non-text leading blocks (e.g. ``thinking``) and returns the first
block whose ``type == "text"``. Indexing ``content[0]`` blindly throws or
returns garbage the moment a non-text block leads the response.
- Surfaces truncation/refusal: when ``stop_reason`` is ``max_tokens`` or
``refusal``, emits a structured warning so silent output corruption
(truncated JSON, empty refusals) is observable rather than handed
downstream to be guessed at.
- Raises ``ValueError`` when no text block is present (e.g. a bare refusal)
instead of returning a non-text block's attributes.
"""
stop_reason = getattr(response, "stop_reason", None)
if stop_reason in ("max_tokens", "refusal"):
logger.warning(
"anthropic.stop_reason",
extra={
"event": "anthropic.stop_reason",
"model": model,
"stop_reason": stop_reason,
},
)
for block in response.content:
if getattr(block, "type", None) == "text":
return block.text
raise ValueError(
f"Anthropic response contained no text block (stop_reason={stop_reason!r})"
)
def _log_anthropic_cache_usage(usage: Any, model: str) -> None:
"""Emit a structured log line capturing cache_read / cache_creation tokens."""
cache_read = getattr(usage, "cache_read_input_tokens", 0) or 0
@@ -176,6 +210,7 @@ class AIProvider(ABC):
system_prompt: str | list[SystemBlock],
messages: list[dict[str, Any]],
max_tokens: int = 4096,
schema: dict[str, Any] | None = None,
) -> tuple[str, int, int]:
"""Generate a JSON response from the AI model.
@@ -185,6 +220,15 @@ class AIProvider(ABC):
Anthropic prompt caching per module-docstring policy.
messages: List of message dicts with "role" and "content" keys.
max_tokens: Maximum output tokens.
schema: Optional JSON Schema constraining the response shape.
When provided, the Anthropic backend uses structured outputs
(`output_config.format`) to guarantee valid, parseable JSON —
no markdown fences, no truncated-brace repair. Must satisfy the
structured-output schema limits (every object needs
`additionalProperties: false`; no recursion; numeric/string
constraints are stripped). `None` preserves the legacy
prompt-only behavior. The Gemini backend currently ignores this
argument (it already requests `application/json`).
Returns:
Tuple of (response_text, input_tokens, output_tokens).
@@ -231,7 +275,11 @@ class GeminiProvider(AIProvider):
system_prompt: str | list[SystemBlock],
messages: list[dict[str, Any]],
max_tokens: int = 4096,
schema: dict[str, Any] | None = None,
) -> tuple[str, int, int]:
# `schema` is accepted for interface parity but ignored: Gemini already
# constrains output via response_mime_type="application/json" below.
# Mapping JSON Schema -> Gemini response_schema is deferred.
from google import genai
from google.genai import types as genai_types
@@ -362,18 +410,28 @@ class AnthropicProvider(AIProvider):
system_prompt: str | list[SystemBlock],
messages: list[dict[str, Any]],
max_tokens: int = 4096,
schema: dict[str, Any] | None = None,
) -> tuple[str, int, int]:
client = _get_anthropic_client(self._api_key, self._timeout)
normalized_system = _normalize_system_for_anthropic(system_prompt)
response = await client.messages.create(
model=self._model,
max_tokens=max_tokens,
system=normalized_system,
messages=messages,
)
create_kwargs: dict[str, Any] = {
"model": self._model,
"max_tokens": max_tokens,
"system": normalized_system,
"messages": messages,
}
if schema is not None:
# Structured outputs: constrain the response to valid JSON matching
# the schema (Sonnet 4.6 / Haiku 4.5). Removes the need for
# markdown-fence stripping and truncated-JSON repair downstream.
create_kwargs["output_config"] = {
"format": {"type": "json_schema", "schema": schema}
}
text = response.content[0].text
response = await client.messages.create(**create_kwargs)
text = _extract_text_from_response(response, self._model)
input_tokens = response.usage.input_tokens
output_tokens = response.usage.output_tokens

View File

@@ -155,6 +155,12 @@ class Settings(BaseSettings):
AI_CONVERSATION_TTL_HOURS: int = 24
AI_MAX_CALLS_PER_FLOW: int = 10
AI_REQUEST_TIMEOUT_SECONDS: int = 120
# When True, KB conversion constrains the Anthropic response with a JSON
# schema (structured outputs) instead of relying on prompt-only JSON +
# downstream fence-stripping / brace-repair. Default OFF: enable in staging
# and smoke-test constrained decoding against the live model before turning
# it on in production. Only affects the Anthropic backend.
AI_KB_CONVERT_STRUCTURED_OUTPUT: bool = False
# AI Provider selection
AI_PROVIDER: str = "anthropic" # "gemini" or "anthropic"
GOOGLE_AI_API_KEY: Optional[str] = None

View File

@@ -202,6 +202,115 @@ the engineer attached, NOT from this schema):
9. Return ONLY valid JSON — no markdown fences, no explanation text."""
# ── Structured-output schemas ──
#
# These constrain the model's JSON via Anthropic structured outputs
# (output_config.format) so the response is guaranteed valid and parseable —
# no markdown fences, no truncated-brace repair. They must be a SUPERSET of
# every field the corresponding system prompt instructs the model to emit:
# additionalProperties is False everywhere, so any field the prompt asks for
# but the schema omits would be impossible to produce.
#
# `type`/`field_type` are intentionally left as plain strings (no enum): the
# downstream parser already normalizes/tolerates the type values, and an enum
# risks constraining the model away from a value the prompt would yield.
_TROUBLESHOOTING_OPTION_SCHEMA: dict[str, Any] = {
"type": "object",
"properties": {
"label": {"type": "string"},
"next_node_id": {"type": "string"},
},
"required": ["label", "next_node_id"],
"additionalProperties": False,
}
_TROUBLESHOOTING_NODE_SCHEMA: dict[str, Any] = {
"type": "object",
"properties": {
"id": {"type": "string"},
"type": {"type": "string"},
"question": {"type": "string"},
"options": {"type": "array", "items": _TROUBLESHOOTING_OPTION_SCHEMA},
"next_node_id": {"type": "string"},
"confidence": {"type": "number"},
"source_excerpt": {"type": "string"},
},
# Only the universal fields are required. `question`/`options`/`next_node_id`
# vary by node type and stay optional so a resolution node need not carry
# options and an action node need not carry a question.
"required": ["id", "type", "confidence", "source_excerpt"],
"additionalProperties": False,
}
TROUBLESHOOTING_SCHEMA: dict[str, Any] = {
"type": "object",
"properties": {
"title": {"type": "string"},
"description": {"type": "string"},
"nodes": {"type": "array", "items": _TROUBLESHOOTING_NODE_SCHEMA},
},
"required": ["title", "description", "nodes"],
"additionalProperties": False,
}
_PROCEDURAL_STEP_SCHEMA: dict[str, Any] = {
"type": "object",
"properties": {
"id": {"type": "string"},
"type": {"type": "string"},
"content": {"type": "string"},
"confidence": {"type": "number"},
"source_excerpt": {"type": "string"},
},
"required": ["id", "type", "content", "confidence", "source_excerpt"],
"additionalProperties": False,
}
_PROCEDURAL_INTAKE_SCHEMA: dict[str, Any] = {
"type": "object",
"properties": {
"variable_name": {"type": "string"},
"label": {"type": "string"},
"field_type": {"type": "string"},
"required": {"type": "boolean"},
"display_order": {"type": "integer"},
},
"required": [
"variable_name",
"label",
"field_type",
"required",
"display_order",
],
"additionalProperties": False,
}
PROCEDURAL_SCHEMA: dict[str, Any] = {
"type": "object",
"properties": {
"title": {"type": "string"},
"description": {"type": "string"},
"steps": {"type": "array", "items": _PROCEDURAL_STEP_SCHEMA},
"intake_form": {"type": "array", "items": _PROCEDURAL_INTAKE_SCHEMA},
},
"required": ["title", "description", "steps", "intake_form"],
"additionalProperties": False,
}
def _schema_for_target_type(target_type: str) -> dict[str, Any]:
"""Return the structured-output schema for a KB conversion target type.
Mirrors the prompt selection in ``convert_document``: only
``"troubleshooting"`` uses the decision-tree schema; everything else is
treated as a procedural flow.
"""
if target_type == "troubleshooting":
return TROUBLESHOOTING_SCHEMA
return PROCEDURAL_SCHEMA
def _build_user_message(
source_text: str,
source_metadata: dict[str, Any] | None,
@@ -404,6 +513,16 @@ async def convert_document(
model = settings.get_model_for_action("kb_convert")
provider = get_ai_provider(model=model)
# Structured outputs (flagged): constrain the response to a JSON schema so
# the model can't emit fences or truncated JSON. Falls back to prompt-only
# JSON (schema=None) when disabled; the parse path below stays intact either
# way as a belt-and-suspenders fallback.
schema = (
_schema_for_target_type(kb_import.target_type)
if settings.AI_KB_CONVERT_STRUCTURED_OUTPUT
else None
)
try:
raw_text, input_tokens, output_tokens = await provider.generate_json(
system_prompt=[
@@ -414,6 +533,7 @@ async def convert_document(
],
messages=[{"role": "user", "content": user_message}],
max_tokens=16384,
schema=schema,
)
except Exception as e:
logger.error("AI conversion failed for kb_import=%s: %s", kb_import.id, e)

View File

@@ -96,7 +96,8 @@ class TestAnthropicProvider:
)
mock_response = MagicMock()
mock_response.content = [MagicMock(text='{"result": "ok"}')]
mock_response.content = [MagicMock(type="text", text='{"result": "ok"}')]
mock_response.stop_reason = "end_turn"
mock_response.usage = MagicMock(input_tokens=100, output_tokens=50)
mock_client = AsyncMock()
@@ -120,6 +121,170 @@ class TestAnthropicProvider:
messages=[{"role": "user", "content": "Hello"}],
)
@pytest.mark.asyncio
async def test_generate_json_skips_non_text_blocks(self):
"""A leading non-text block (e.g. thinking) is skipped; the first
text block's text is returned instead of content[0].text."""
from app.core import ai_provider
ai_provider._anthropic_clients.clear()
provider = AnthropicProvider(
api_key="skip-key", model="claude-sonnet-4-6", timeout=31
)
thinking_block = MagicMock(type="thinking", thinking="hmm...")
text_block = MagicMock(type="text", text='{"ok": 1}')
mock_response = MagicMock()
mock_response.content = [thinking_block, text_block]
mock_response.stop_reason = "end_turn"
mock_response.usage = MagicMock(input_tokens=10, output_tokens=5)
mock_client = AsyncMock()
mock_client.messages.create = AsyncMock(return_value=mock_response)
with patch("anthropic.AsyncAnthropic", return_value=mock_client):
text, _, _ = await provider.generate_json(
system_prompt="You are a helper.",
messages=[{"role": "user", "content": "Hi"}],
)
assert text == '{"ok": 1}'
@pytest.mark.asyncio
async def test_generate_json_raises_when_no_text_block(self):
"""A response with no text block (e.g. a bare refusal) raises a clear
error instead of returning a non-text block's attributes."""
from app.core import ai_provider
ai_provider._anthropic_clients.clear()
provider = AnthropicProvider(
api_key="empty-key", model="claude-sonnet-4-6", timeout=32
)
mock_response = MagicMock()
mock_response.content = [MagicMock(type="thinking", thinking="...")]
mock_response.stop_reason = "refusal"
mock_response.usage = MagicMock(input_tokens=10, output_tokens=0)
mock_client = AsyncMock()
mock_client.messages.create = AsyncMock(return_value=mock_response)
with patch("anthropic.AsyncAnthropic", return_value=mock_client):
with pytest.raises(ValueError, match="no text block"):
await provider.generate_json(
system_prompt="You are a helper.",
messages=[{"role": "user", "content": "Hi"}],
)
@pytest.mark.asyncio
async def test_generate_json_logs_warning_on_truncation(self, caplog):
"""When stop_reason is max_tokens, a warning is logged (truncation
signal) and the partial text is still returned."""
import logging
from app.core import ai_provider
ai_provider._anthropic_clients.clear()
provider = AnthropicProvider(
api_key="trunc-key", model="claude-sonnet-4-6", timeout=33
)
text_block = MagicMock(type="text", text='{"partial": tr')
mock_response = MagicMock()
mock_response.content = [text_block]
mock_response.stop_reason = "max_tokens"
mock_response.usage = MagicMock(input_tokens=10, output_tokens=4096)
mock_client = AsyncMock()
mock_client.messages.create = AsyncMock(return_value=mock_response)
with patch("anthropic.AsyncAnthropic", return_value=mock_client):
with caplog.at_level(logging.WARNING, logger="app.core.ai_provider"):
text, _, _ = await provider.generate_json(
system_prompt="You are a helper.",
messages=[{"role": "user", "content": "Hi"}],
)
assert text == '{"partial": tr'
truncation_records = [
r for r in caplog.records if getattr(r, "stop_reason", None) == "max_tokens"
]
assert truncation_records, "expected a warning record for max_tokens truncation"
@pytest.mark.asyncio
async def test_generate_json_passes_output_config_when_schema_given(self):
"""When a JSON schema is supplied, it is forwarded as
output_config.format so the API constrains the response shape."""
from app.core import ai_provider
ai_provider._anthropic_clients.clear()
provider = AnthropicProvider(
api_key="schema-key", model="claude-sonnet-4-6", timeout=34
)
mock_response = MagicMock()
mock_response.content = [MagicMock(type="text", text='{"title": "x"}')]
mock_response.stop_reason = "end_turn"
mock_response.usage = MagicMock(input_tokens=10, output_tokens=5)
mock_client = AsyncMock()
mock_client.messages.create = AsyncMock(return_value=mock_response)
schema = {
"type": "object",
"properties": {"title": {"type": "string"}},
"required": ["title"],
"additionalProperties": False,
}
with patch("anthropic.AsyncAnthropic", return_value=mock_client):
await provider.generate_json(
system_prompt="You are a helper.",
messages=[{"role": "user", "content": "Hi"}],
max_tokens=512,
schema=schema,
)
mock_client.messages.create.assert_called_once_with(
model="claude-sonnet-4-6",
max_tokens=512,
system="You are a helper.",
messages=[{"role": "user", "content": "Hi"}],
output_config={"format": {"type": "json_schema", "schema": schema}},
)
@pytest.mark.asyncio
async def test_generate_json_no_output_config_when_schema_none(self):
"""With no schema, output_config is not sent (backward compatible)."""
from app.core import ai_provider
ai_provider._anthropic_clients.clear()
provider = AnthropicProvider(
api_key="noschema-key", model="claude-sonnet-4-6", timeout=35
)
mock_response = MagicMock()
mock_response.content = [MagicMock(type="text", text="{}")]
mock_response.stop_reason = "end_turn"
mock_response.usage = MagicMock(input_tokens=1, output_tokens=1)
mock_client = AsyncMock()
mock_client.messages.create = AsyncMock(return_value=mock_response)
with patch("anthropic.AsyncAnthropic", return_value=mock_client):
await provider.generate_json(
system_prompt="You are a helper.",
messages=[{"role": "user", "content": "Hi"}],
)
_, call_kwargs = mock_client.messages.create.call_args
assert "output_config" not in call_kwargs
class TestGeminiProvider:
"""Tests for GeminiProvider.generate_json."""
@@ -174,6 +339,48 @@ class TestGeminiProvider:
mock_client.aio.models.generate_content.assert_called_once()
@pytest.mark.asyncio
async def test_generate_json_accepts_and_ignores_schema(self):
"""Gemini accepts the schema kwarg (interface parity) and still
returns JSON; it does not error on the param."""
provider = GeminiProvider(api_key="test-key", model="gemini-2.5-flash")
mock_usage = MagicMock()
mock_usage.prompt_token_count = 5
mock_usage.candidates_token_count = 3
mock_response = MagicMock()
mock_response.text = '{"answer": 1}'
mock_response.usage_metadata = mock_usage
mock_client = MagicMock()
mock_client.aio.models.generate_content = AsyncMock(return_value=mock_response)
mock_genai_module = MagicMock()
mock_genai_module.Client.return_value = mock_client
mock_types = MagicMock()
mock_types.Content.side_effect = lambda **kw: kw
mock_types.Part.side_effect = lambda **kw: kw
mock_types.GenerateContentConfig.side_effect = lambda **kw: kw
mock_google = MagicMock()
mock_google.genai = mock_genai_module
mock_genai_module.types = mock_types
with patch.dict(sys.modules, {
"google": mock_google,
"google.genai": mock_genai_module,
"google.genai.types": mock_types,
}):
text, _, _ = await provider.generate_json(
system_prompt="Generate JSON.",
messages=[{"role": "user", "content": "data"}],
schema={"type": "object"},
)
assert text == '{"answer": 1}'
@pytest.mark.asyncio
async def test_generate_json_handles_none_usage(self):
"""Token counts default to 0 when usage_metadata attributes are None."""

View File

@@ -0,0 +1,104 @@
"""Tests for the structured-output JSON schemas used by KB conversion.
These validate that the schemas are well-formed against the Anthropic
structured-output limits (every object carries additionalProperties: false,
`required` is a subset of declared properties, no numeric/length constraints)
and that the target_type -> schema selector returns the right shape. They do
NOT exercise the live API — constrained decoding must be smoke-tested against
a real model before AI_KB_CONVERT_STRUCTURED_OUTPUT is enabled in production.
"""
from app.core.kb_conversion_service import (
PROCEDURAL_SCHEMA,
TROUBLESHOOTING_SCHEMA,
_schema_for_target_type,
)
# Constraints disallowed by Anthropic structured outputs (must be absent so the
# API does not reject the schema or silently strip them).
_DISALLOWED_KEYS = {
"minimum",
"maximum",
"multipleOf",
"minLength",
"maxLength",
"minItems",
"maxItems",
}
def _assert_well_formed(schema: dict) -> None:
"""Recursively assert a JSON schema obeys the structured-output limits."""
if schema.get("type") == "object":
assert schema.get("additionalProperties") is False, (
f"object schema missing additionalProperties: false: {schema}"
)
props = schema.get("properties", {})
required = set(schema.get("required", []))
assert required <= set(props), (
f"required keys not all declared as properties: {required - set(props)}"
)
for sub in props.values():
_assert_well_formed(sub)
elif schema.get("type") == "array":
_assert_well_formed(schema["items"])
assert not (_DISALLOWED_KEYS & set(schema)), (
f"schema uses unsupported constraint(s): {_DISALLOWED_KEYS & set(schema)}"
)
class TestStructuredOutputSchemas:
def test_troubleshooting_schema_is_well_formed(self):
_assert_well_formed(TROUBLESHOOTING_SCHEMA)
def test_procedural_schema_is_well_formed(self):
_assert_well_formed(PROCEDURAL_SCHEMA)
def test_troubleshooting_schema_top_level_shape(self):
props = TROUBLESHOOTING_SCHEMA["properties"]
assert set(props) >= {"title", "description", "nodes"}
node = props["nodes"]["items"]
# Every field the troubleshooting prompt may emit must be modelled,
# else additionalProperties: false makes them impossible to produce.
assert set(node["properties"]) >= {
"id",
"type",
"question",
"options",
"next_node_id",
"confidence",
"source_excerpt",
}
def test_procedural_schema_top_level_shape(self):
props = PROCEDURAL_SCHEMA["properties"]
assert set(props) >= {"title", "description", "steps", "intake_form"}
step = props["steps"]["items"]
assert set(step["properties"]) >= {
"id",
"type",
"content",
"confidence",
"source_excerpt",
}
intake = props["intake_form"]["items"]
assert set(intake["properties"]) >= {
"variable_name",
"label",
"field_type",
"required",
"display_order",
}
class TestSchemaSelector:
def test_returns_troubleshooting_schema(self):
assert _schema_for_target_type("troubleshooting") is TROUBLESHOOTING_SCHEMA
def test_returns_procedural_schema_for_procedural(self):
assert _schema_for_target_type("procedural") is PROCEDURAL_SCHEMA
def test_defaults_to_procedural_for_unknown(self):
# convert_document treats any non-"troubleshooting" target as procedural.
assert _schema_for_target_type("something-else") is PROCEDURAL_SCHEMA