Merge feat/ai-structured-outputs into integration branch
This commit is contained in:
@@ -13,6 +13,32 @@
|
||||
|
||||
---
|
||||
|
||||
## 2026-05-28 — Scope Anthropic structured outputs to flat-array JSON only
|
||||
|
||||
**Context:** Optimizing the existing Claude API usage (no model change). The Anthropic path in `generate_json` (`ai_provider.py`) had no equivalent to the Gemini path's `response_mime_type="application/json"` — it prompted for JSON and relied on downstream defenses: `_strip_markdown_fences` (ai_fix), `parse_llm_json` (knowledge_flywheel), and `_try_repair_json` (kb_conversion, which balances unclosed braces on truncated output). Anthropic structured outputs (`output_config.format` with a JSON schema) guarantee valid, parseable JSON and would eliminate those band-aids. The question was which of the four `generate_json` call sites can adopt it.
|
||||
|
||||
Structured outputs has hard schema limits: **no recursive schemas**, and **every object must set `additionalProperties: false`** (so the schema must enumerate exactly the fields the model emits — a superset is impossible, an omission makes a field unproducible). Tracing the call sites against those limits:
|
||||
|
||||
- **kb_conversion** → output is `{title, description, nodes: [...]}` / `{...steps[], intake_form[]}` — **flat arrays**, references by `next_node_id`/id, no nesting. Expressible.
|
||||
- **ai_fix** → returns a fixed *node that is itself a subtree*; `_find_node_by_id` recurses `node["children"]` and the prompt requires decision nodes to have ≥2 children. **Recursive, arbitrary depth.**
|
||||
- **knowledge_flywheel flow-gen** → emits `tree_structure`, a decision-tree root with nested `children`/`options`, persisted as an opaque blob.
|
||||
- **knowledge_flywheel enhancement** → flat `new_nodes[] + modified_options[]`; expressible but low-frequency and only fence-stripped.
|
||||
|
||||
**Decision:** Apply structured outputs to **flat-array outputs only** — i.e. `kb_conversion`. Wired via an optional `schema=` param on `AIProvider.generate_json` (`None` = legacy prompt-only behavior; Anthropic maps it to `output_config.format`, Gemini ignores it), with the two KB schemas + `_schema_for_target_type()` in `kb_conversion_service.py`, gated behind `settings.AI_KB_CONVERT_STRUCTURED_OUTPUT` (default **False**) pending a live constrained-decoding smoke-test in staging. The robustness fixes that motivated the work — `_extract_text_from_response` (skip non-text blocks, log `max_tokens`/`refusal`, raise on no-text) — live in the shared provider, so **all four** callers already benefit regardless of schema adoption.
|
||||
|
||||
**Rejected:**
|
||||
- **Forcing schemas on ai_fix / flow-gen.** Their outputs are recursive/nested decision trees; a bounded-depth schema would reject valid deeper trees and break generation. Wrong architecture for marginal/zero benefit (flow-gen's tree is stored as a blob, never schema-validated downstream).
|
||||
- **Wiring the flywheel enhancement site.** Flat and technically expressible, but low call frequency and only fence-stripping today — marginal benefit against the risk of a blind (un-live-tested) `additionalProperties: false` schema.
|
||||
- **Deleting the fence-strip / repair helpers now.** `_strip_markdown_fences` / `parse_llm_json` must stay — they protect the recursive paths that can't use schemas. Only `_try_repair_json` (kb-only) becomes removable, and only *after* the flag is validated in staging.
|
||||
|
||||
**Consequences:**
|
||||
- Structured outputs is the tool for flat JSON; recursive decision-tree outputs are excluded by design. New flat-JSON `generate_json` callers can opt in via `schema=`; recursive ones should not.
|
||||
- `AI_KB_CONVERT_STRUCTURED_OUTPUT` must be smoke-tested against the live model (both target types) before production enablement. Open risk: whether Anthropic accepts optional (non-`required`) fields — if not, the schemas need every field in `required` with nullable types. The flag makes this fully reversible.
|
||||
- Deferred cleanup: once the flag is validated, remove only `_try_repair_json` from the kb_conversion Anthropic path; leave the fence-strippers.
|
||||
- Work lives on branch `feat/ai-structured-outputs` (commits `84a02a5`, `1388357`), based on `design/l1-workspace`.
|
||||
|
||||
---
|
||||
|
||||
## 2026-05-13 — Session expiration policy: 3d idle / 14d absolute defaults + per-account override
|
||||
|
||||
**Context:** User report: "I login to ResolutionFlow and never have to log back in." Investigation found refresh tokens at `REFRESH_TOKEN_EXPIRE_DAYS=7` with JTI rotation (`security.py:36`) — every `/auth/refresh` minted a fresh 7-day window. Net effect: a sliding 7-day session with no absolute cap. Visit once a week, logged in forever. Acceptable for pilot but not for MSP buyers whose SOC2 / cyber-insurance auditors require enforced session timeouts. Required for the same Phase O launch readiness as the other gates already in flight.
|
||||
|
||||
4
.gitignore
vendored
4
.gitignore
vendored
@@ -237,6 +237,10 @@ package.json
|
||||
package-lock.json
|
||||
.worktrees/
|
||||
.gstack/
|
||||
|
||||
# Core dumps from crashed processes (e.g. core.12345)
|
||||
core.[0-9]*
|
||||
**/core.[0-9]*
|
||||
.gitnexus
|
||||
|
||||
# graphify knowledge graph outputs
|
||||
|
||||
@@ -147,6 +147,40 @@ def build_anthropic_chat_messages(
|
||||
return messages
|
||||
|
||||
|
||||
def _extract_text_from_response(response: Any, model: str) -> str:
|
||||
"""Return the first text block's text from an Anthropic message response.
|
||||
|
||||
Robustness over the naive ``response.content[0].text``:
|
||||
- Skips non-text leading blocks (e.g. ``thinking``) and returns the first
|
||||
block whose ``type == "text"``. Indexing ``content[0]`` blindly throws or
|
||||
returns garbage the moment a non-text block leads the response.
|
||||
- Surfaces truncation/refusal: when ``stop_reason`` is ``max_tokens`` or
|
||||
``refusal``, emits a structured warning so silent output corruption
|
||||
(truncated JSON, empty refusals) is observable rather than handed
|
||||
downstream to be guessed at.
|
||||
- Raises ``ValueError`` when no text block is present (e.g. a bare refusal)
|
||||
instead of returning a non-text block's attributes.
|
||||
"""
|
||||
stop_reason = getattr(response, "stop_reason", None)
|
||||
if stop_reason in ("max_tokens", "refusal"):
|
||||
logger.warning(
|
||||
"anthropic.stop_reason",
|
||||
extra={
|
||||
"event": "anthropic.stop_reason",
|
||||
"model": model,
|
||||
"stop_reason": stop_reason,
|
||||
},
|
||||
)
|
||||
|
||||
for block in response.content:
|
||||
if getattr(block, "type", None) == "text":
|
||||
return block.text
|
||||
|
||||
raise ValueError(
|
||||
f"Anthropic response contained no text block (stop_reason={stop_reason!r})"
|
||||
)
|
||||
|
||||
|
||||
def _log_anthropic_cache_usage(usage: Any, model: str) -> None:
|
||||
"""Emit a structured log line capturing cache_read / cache_creation tokens."""
|
||||
cache_read = getattr(usage, "cache_read_input_tokens", 0) or 0
|
||||
@@ -176,6 +210,7 @@ class AIProvider(ABC):
|
||||
system_prompt: str | list[SystemBlock],
|
||||
messages: list[dict[str, Any]],
|
||||
max_tokens: int = 4096,
|
||||
schema: dict[str, Any] | None = None,
|
||||
) -> tuple[str, int, int]:
|
||||
"""Generate a JSON response from the AI model.
|
||||
|
||||
@@ -185,6 +220,15 @@ class AIProvider(ABC):
|
||||
Anthropic prompt caching per module-docstring policy.
|
||||
messages: List of message dicts with "role" and "content" keys.
|
||||
max_tokens: Maximum output tokens.
|
||||
schema: Optional JSON Schema constraining the response shape.
|
||||
When provided, the Anthropic backend uses structured outputs
|
||||
(`output_config.format`) to guarantee valid, parseable JSON —
|
||||
no markdown fences, no truncated-brace repair. Must satisfy the
|
||||
structured-output schema limits (every object needs
|
||||
`additionalProperties: false`; no recursion; numeric/string
|
||||
constraints are stripped). `None` preserves the legacy
|
||||
prompt-only behavior. The Gemini backend currently ignores this
|
||||
argument (it already requests `application/json`).
|
||||
|
||||
Returns:
|
||||
Tuple of (response_text, input_tokens, output_tokens).
|
||||
@@ -231,7 +275,11 @@ class GeminiProvider(AIProvider):
|
||||
system_prompt: str | list[SystemBlock],
|
||||
messages: list[dict[str, Any]],
|
||||
max_tokens: int = 4096,
|
||||
schema: dict[str, Any] | None = None,
|
||||
) -> tuple[str, int, int]:
|
||||
# `schema` is accepted for interface parity but ignored: Gemini already
|
||||
# constrains output via response_mime_type="application/json" below.
|
||||
# Mapping JSON Schema -> Gemini response_schema is deferred.
|
||||
from google import genai
|
||||
from google.genai import types as genai_types
|
||||
|
||||
@@ -362,18 +410,28 @@ class AnthropicProvider(AIProvider):
|
||||
system_prompt: str | list[SystemBlock],
|
||||
messages: list[dict[str, Any]],
|
||||
max_tokens: int = 4096,
|
||||
schema: dict[str, Any] | None = None,
|
||||
) -> tuple[str, int, int]:
|
||||
client = _get_anthropic_client(self._api_key, self._timeout)
|
||||
normalized_system = _normalize_system_for_anthropic(system_prompt)
|
||||
|
||||
response = await client.messages.create(
|
||||
model=self._model,
|
||||
max_tokens=max_tokens,
|
||||
system=normalized_system,
|
||||
messages=messages,
|
||||
)
|
||||
create_kwargs: dict[str, Any] = {
|
||||
"model": self._model,
|
||||
"max_tokens": max_tokens,
|
||||
"system": normalized_system,
|
||||
"messages": messages,
|
||||
}
|
||||
if schema is not None:
|
||||
# Structured outputs: constrain the response to valid JSON matching
|
||||
# the schema (Sonnet 4.6 / Haiku 4.5). Removes the need for
|
||||
# markdown-fence stripping and truncated-JSON repair downstream.
|
||||
create_kwargs["output_config"] = {
|
||||
"format": {"type": "json_schema", "schema": schema}
|
||||
}
|
||||
|
||||
text = response.content[0].text
|
||||
response = await client.messages.create(**create_kwargs)
|
||||
|
||||
text = _extract_text_from_response(response, self._model)
|
||||
input_tokens = response.usage.input_tokens
|
||||
output_tokens = response.usage.output_tokens
|
||||
|
||||
|
||||
@@ -155,6 +155,12 @@ class Settings(BaseSettings):
|
||||
AI_CONVERSATION_TTL_HOURS: int = 24
|
||||
AI_MAX_CALLS_PER_FLOW: int = 10
|
||||
AI_REQUEST_TIMEOUT_SECONDS: int = 120
|
||||
# When True, KB conversion constrains the Anthropic response with a JSON
|
||||
# schema (structured outputs) instead of relying on prompt-only JSON +
|
||||
# downstream fence-stripping / brace-repair. Default OFF: enable in staging
|
||||
# and smoke-test constrained decoding against the live model before turning
|
||||
# it on in production. Only affects the Anthropic backend.
|
||||
AI_KB_CONVERT_STRUCTURED_OUTPUT: bool = False
|
||||
# AI Provider selection
|
||||
AI_PROVIDER: str = "anthropic" # "gemini" or "anthropic"
|
||||
GOOGLE_AI_API_KEY: Optional[str] = None
|
||||
|
||||
@@ -202,6 +202,115 @@ the engineer attached, NOT from this schema):
|
||||
9. Return ONLY valid JSON — no markdown fences, no explanation text."""
|
||||
|
||||
|
||||
# ── Structured-output schemas ──
|
||||
#
|
||||
# These constrain the model's JSON via Anthropic structured outputs
|
||||
# (output_config.format) so the response is guaranteed valid and parseable —
|
||||
# no markdown fences, no truncated-brace repair. They must be a SUPERSET of
|
||||
# every field the corresponding system prompt instructs the model to emit:
|
||||
# additionalProperties is False everywhere, so any field the prompt asks for
|
||||
# but the schema omits would be impossible to produce.
|
||||
#
|
||||
# `type`/`field_type` are intentionally left as plain strings (no enum): the
|
||||
# downstream parser already normalizes/tolerates the type values, and an enum
|
||||
# risks constraining the model away from a value the prompt would yield.
|
||||
|
||||
_TROUBLESHOOTING_OPTION_SCHEMA: dict[str, Any] = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"label": {"type": "string"},
|
||||
"next_node_id": {"type": "string"},
|
||||
},
|
||||
"required": ["label", "next_node_id"],
|
||||
"additionalProperties": False,
|
||||
}
|
||||
|
||||
_TROUBLESHOOTING_NODE_SCHEMA: dict[str, Any] = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"id": {"type": "string"},
|
||||
"type": {"type": "string"},
|
||||
"question": {"type": "string"},
|
||||
"options": {"type": "array", "items": _TROUBLESHOOTING_OPTION_SCHEMA},
|
||||
"next_node_id": {"type": "string"},
|
||||
"confidence": {"type": "number"},
|
||||
"source_excerpt": {"type": "string"},
|
||||
},
|
||||
# Only the universal fields are required. `question`/`options`/`next_node_id`
|
||||
# vary by node type and stay optional so a resolution node need not carry
|
||||
# options and an action node need not carry a question.
|
||||
"required": ["id", "type", "confidence", "source_excerpt"],
|
||||
"additionalProperties": False,
|
||||
}
|
||||
|
||||
TROUBLESHOOTING_SCHEMA: dict[str, Any] = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"title": {"type": "string"},
|
||||
"description": {"type": "string"},
|
||||
"nodes": {"type": "array", "items": _TROUBLESHOOTING_NODE_SCHEMA},
|
||||
},
|
||||
"required": ["title", "description", "nodes"],
|
||||
"additionalProperties": False,
|
||||
}
|
||||
|
||||
_PROCEDURAL_STEP_SCHEMA: dict[str, Any] = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"id": {"type": "string"},
|
||||
"type": {"type": "string"},
|
||||
"content": {"type": "string"},
|
||||
"confidence": {"type": "number"},
|
||||
"source_excerpt": {"type": "string"},
|
||||
},
|
||||
"required": ["id", "type", "content", "confidence", "source_excerpt"],
|
||||
"additionalProperties": False,
|
||||
}
|
||||
|
||||
_PROCEDURAL_INTAKE_SCHEMA: dict[str, Any] = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"variable_name": {"type": "string"},
|
||||
"label": {"type": "string"},
|
||||
"field_type": {"type": "string"},
|
||||
"required": {"type": "boolean"},
|
||||
"display_order": {"type": "integer"},
|
||||
},
|
||||
"required": [
|
||||
"variable_name",
|
||||
"label",
|
||||
"field_type",
|
||||
"required",
|
||||
"display_order",
|
||||
],
|
||||
"additionalProperties": False,
|
||||
}
|
||||
|
||||
PROCEDURAL_SCHEMA: dict[str, Any] = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"title": {"type": "string"},
|
||||
"description": {"type": "string"},
|
||||
"steps": {"type": "array", "items": _PROCEDURAL_STEP_SCHEMA},
|
||||
"intake_form": {"type": "array", "items": _PROCEDURAL_INTAKE_SCHEMA},
|
||||
},
|
||||
"required": ["title", "description", "steps", "intake_form"],
|
||||
"additionalProperties": False,
|
||||
}
|
||||
|
||||
|
||||
def _schema_for_target_type(target_type: str) -> dict[str, Any]:
|
||||
"""Return the structured-output schema for a KB conversion target type.
|
||||
|
||||
Mirrors the prompt selection in ``convert_document``: only
|
||||
``"troubleshooting"`` uses the decision-tree schema; everything else is
|
||||
treated as a procedural flow.
|
||||
"""
|
||||
if target_type == "troubleshooting":
|
||||
return TROUBLESHOOTING_SCHEMA
|
||||
return PROCEDURAL_SCHEMA
|
||||
|
||||
|
||||
def _build_user_message(
|
||||
source_text: str,
|
||||
source_metadata: dict[str, Any] | None,
|
||||
@@ -404,6 +513,16 @@ async def convert_document(
|
||||
model = settings.get_model_for_action("kb_convert")
|
||||
provider = get_ai_provider(model=model)
|
||||
|
||||
# Structured outputs (flagged): constrain the response to a JSON schema so
|
||||
# the model can't emit fences or truncated JSON. Falls back to prompt-only
|
||||
# JSON (schema=None) when disabled; the parse path below stays intact either
|
||||
# way as a belt-and-suspenders fallback.
|
||||
schema = (
|
||||
_schema_for_target_type(kb_import.target_type)
|
||||
if settings.AI_KB_CONVERT_STRUCTURED_OUTPUT
|
||||
else None
|
||||
)
|
||||
|
||||
try:
|
||||
raw_text, input_tokens, output_tokens = await provider.generate_json(
|
||||
system_prompt=[
|
||||
@@ -414,6 +533,7 @@ async def convert_document(
|
||||
],
|
||||
messages=[{"role": "user", "content": user_message}],
|
||||
max_tokens=16384,
|
||||
schema=schema,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("AI conversion failed for kb_import=%s: %s", kb_import.id, e)
|
||||
|
||||
@@ -96,7 +96,8 @@ class TestAnthropicProvider:
|
||||
)
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.content = [MagicMock(text='{"result": "ok"}')]
|
||||
mock_response.content = [MagicMock(type="text", text='{"result": "ok"}')]
|
||||
mock_response.stop_reason = "end_turn"
|
||||
mock_response.usage = MagicMock(input_tokens=100, output_tokens=50)
|
||||
|
||||
mock_client = AsyncMock()
|
||||
@@ -120,6 +121,170 @@ class TestAnthropicProvider:
|
||||
messages=[{"role": "user", "content": "Hello"}],
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_json_skips_non_text_blocks(self):
|
||||
"""A leading non-text block (e.g. thinking) is skipped; the first
|
||||
text block's text is returned instead of content[0].text."""
|
||||
from app.core import ai_provider
|
||||
|
||||
ai_provider._anthropic_clients.clear()
|
||||
|
||||
provider = AnthropicProvider(
|
||||
api_key="skip-key", model="claude-sonnet-4-6", timeout=31
|
||||
)
|
||||
|
||||
thinking_block = MagicMock(type="thinking", thinking="hmm...")
|
||||
text_block = MagicMock(type="text", text='{"ok": 1}')
|
||||
mock_response = MagicMock()
|
||||
mock_response.content = [thinking_block, text_block]
|
||||
mock_response.stop_reason = "end_turn"
|
||||
mock_response.usage = MagicMock(input_tokens=10, output_tokens=5)
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.messages.create = AsyncMock(return_value=mock_response)
|
||||
|
||||
with patch("anthropic.AsyncAnthropic", return_value=mock_client):
|
||||
text, _, _ = await provider.generate_json(
|
||||
system_prompt="You are a helper.",
|
||||
messages=[{"role": "user", "content": "Hi"}],
|
||||
)
|
||||
|
||||
assert text == '{"ok": 1}'
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_json_raises_when_no_text_block(self):
|
||||
"""A response with no text block (e.g. a bare refusal) raises a clear
|
||||
error instead of returning a non-text block's attributes."""
|
||||
from app.core import ai_provider
|
||||
|
||||
ai_provider._anthropic_clients.clear()
|
||||
|
||||
provider = AnthropicProvider(
|
||||
api_key="empty-key", model="claude-sonnet-4-6", timeout=32
|
||||
)
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.content = [MagicMock(type="thinking", thinking="...")]
|
||||
mock_response.stop_reason = "refusal"
|
||||
mock_response.usage = MagicMock(input_tokens=10, output_tokens=0)
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.messages.create = AsyncMock(return_value=mock_response)
|
||||
|
||||
with patch("anthropic.AsyncAnthropic", return_value=mock_client):
|
||||
with pytest.raises(ValueError, match="no text block"):
|
||||
await provider.generate_json(
|
||||
system_prompt="You are a helper.",
|
||||
messages=[{"role": "user", "content": "Hi"}],
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_json_logs_warning_on_truncation(self, caplog):
|
||||
"""When stop_reason is max_tokens, a warning is logged (truncation
|
||||
signal) and the partial text is still returned."""
|
||||
import logging
|
||||
|
||||
from app.core import ai_provider
|
||||
|
||||
ai_provider._anthropic_clients.clear()
|
||||
|
||||
provider = AnthropicProvider(
|
||||
api_key="trunc-key", model="claude-sonnet-4-6", timeout=33
|
||||
)
|
||||
|
||||
text_block = MagicMock(type="text", text='{"partial": tr')
|
||||
mock_response = MagicMock()
|
||||
mock_response.content = [text_block]
|
||||
mock_response.stop_reason = "max_tokens"
|
||||
mock_response.usage = MagicMock(input_tokens=10, output_tokens=4096)
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.messages.create = AsyncMock(return_value=mock_response)
|
||||
|
||||
with patch("anthropic.AsyncAnthropic", return_value=mock_client):
|
||||
with caplog.at_level(logging.WARNING, logger="app.core.ai_provider"):
|
||||
text, _, _ = await provider.generate_json(
|
||||
system_prompt="You are a helper.",
|
||||
messages=[{"role": "user", "content": "Hi"}],
|
||||
)
|
||||
|
||||
assert text == '{"partial": tr'
|
||||
truncation_records = [
|
||||
r for r in caplog.records if getattr(r, "stop_reason", None) == "max_tokens"
|
||||
]
|
||||
assert truncation_records, "expected a warning record for max_tokens truncation"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_json_passes_output_config_when_schema_given(self):
|
||||
"""When a JSON schema is supplied, it is forwarded as
|
||||
output_config.format so the API constrains the response shape."""
|
||||
from app.core import ai_provider
|
||||
|
||||
ai_provider._anthropic_clients.clear()
|
||||
|
||||
provider = AnthropicProvider(
|
||||
api_key="schema-key", model="claude-sonnet-4-6", timeout=34
|
||||
)
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.content = [MagicMock(type="text", text='{"title": "x"}')]
|
||||
mock_response.stop_reason = "end_turn"
|
||||
mock_response.usage = MagicMock(input_tokens=10, output_tokens=5)
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.messages.create = AsyncMock(return_value=mock_response)
|
||||
|
||||
schema = {
|
||||
"type": "object",
|
||||
"properties": {"title": {"type": "string"}},
|
||||
"required": ["title"],
|
||||
"additionalProperties": False,
|
||||
}
|
||||
|
||||
with patch("anthropic.AsyncAnthropic", return_value=mock_client):
|
||||
await provider.generate_json(
|
||||
system_prompt="You are a helper.",
|
||||
messages=[{"role": "user", "content": "Hi"}],
|
||||
max_tokens=512,
|
||||
schema=schema,
|
||||
)
|
||||
|
||||
mock_client.messages.create.assert_called_once_with(
|
||||
model="claude-sonnet-4-6",
|
||||
max_tokens=512,
|
||||
system="You are a helper.",
|
||||
messages=[{"role": "user", "content": "Hi"}],
|
||||
output_config={"format": {"type": "json_schema", "schema": schema}},
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_json_no_output_config_when_schema_none(self):
|
||||
"""With no schema, output_config is not sent (backward compatible)."""
|
||||
from app.core import ai_provider
|
||||
|
||||
ai_provider._anthropic_clients.clear()
|
||||
|
||||
provider = AnthropicProvider(
|
||||
api_key="noschema-key", model="claude-sonnet-4-6", timeout=35
|
||||
)
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.content = [MagicMock(type="text", text="{}")]
|
||||
mock_response.stop_reason = "end_turn"
|
||||
mock_response.usage = MagicMock(input_tokens=1, output_tokens=1)
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.messages.create = AsyncMock(return_value=mock_response)
|
||||
|
||||
with patch("anthropic.AsyncAnthropic", return_value=mock_client):
|
||||
await provider.generate_json(
|
||||
system_prompt="You are a helper.",
|
||||
messages=[{"role": "user", "content": "Hi"}],
|
||||
)
|
||||
|
||||
_, call_kwargs = mock_client.messages.create.call_args
|
||||
assert "output_config" not in call_kwargs
|
||||
|
||||
|
||||
class TestGeminiProvider:
|
||||
"""Tests for GeminiProvider.generate_json."""
|
||||
@@ -174,6 +339,48 @@ class TestGeminiProvider:
|
||||
|
||||
mock_client.aio.models.generate_content.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_json_accepts_and_ignores_schema(self):
|
||||
"""Gemini accepts the schema kwarg (interface parity) and still
|
||||
returns JSON; it does not error on the param."""
|
||||
provider = GeminiProvider(api_key="test-key", model="gemini-2.5-flash")
|
||||
|
||||
mock_usage = MagicMock()
|
||||
mock_usage.prompt_token_count = 5
|
||||
mock_usage.candidates_token_count = 3
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.text = '{"answer": 1}'
|
||||
mock_response.usage_metadata = mock_usage
|
||||
|
||||
mock_client = MagicMock()
|
||||
mock_client.aio.models.generate_content = AsyncMock(return_value=mock_response)
|
||||
|
||||
mock_genai_module = MagicMock()
|
||||
mock_genai_module.Client.return_value = mock_client
|
||||
|
||||
mock_types = MagicMock()
|
||||
mock_types.Content.side_effect = lambda **kw: kw
|
||||
mock_types.Part.side_effect = lambda **kw: kw
|
||||
mock_types.GenerateContentConfig.side_effect = lambda **kw: kw
|
||||
|
||||
mock_google = MagicMock()
|
||||
mock_google.genai = mock_genai_module
|
||||
mock_genai_module.types = mock_types
|
||||
|
||||
with patch.dict(sys.modules, {
|
||||
"google": mock_google,
|
||||
"google.genai": mock_genai_module,
|
||||
"google.genai.types": mock_types,
|
||||
}):
|
||||
text, _, _ = await provider.generate_json(
|
||||
system_prompt="Generate JSON.",
|
||||
messages=[{"role": "user", "content": "data"}],
|
||||
schema={"type": "object"},
|
||||
)
|
||||
|
||||
assert text == '{"answer": 1}'
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_json_handles_none_usage(self):
|
||||
"""Token counts default to 0 when usage_metadata attributes are None."""
|
||||
|
||||
104
backend/tests/test_kb_conversion_schema.py
Normal file
104
backend/tests/test_kb_conversion_schema.py
Normal file
@@ -0,0 +1,104 @@
|
||||
"""Tests for the structured-output JSON schemas used by KB conversion.
|
||||
|
||||
These validate that the schemas are well-formed against the Anthropic
|
||||
structured-output limits (every object carries additionalProperties: false,
|
||||
`required` is a subset of declared properties, no numeric/length constraints)
|
||||
and that the target_type -> schema selector returns the right shape. They do
|
||||
NOT exercise the live API — constrained decoding must be smoke-tested against
|
||||
a real model before AI_KB_CONVERT_STRUCTURED_OUTPUT is enabled in production.
|
||||
"""
|
||||
|
||||
from app.core.kb_conversion_service import (
|
||||
PROCEDURAL_SCHEMA,
|
||||
TROUBLESHOOTING_SCHEMA,
|
||||
_schema_for_target_type,
|
||||
)
|
||||
|
||||
# Constraints disallowed by Anthropic structured outputs (must be absent so the
|
||||
# API does not reject the schema or silently strip them).
|
||||
_DISALLOWED_KEYS = {
|
||||
"minimum",
|
||||
"maximum",
|
||||
"multipleOf",
|
||||
"minLength",
|
||||
"maxLength",
|
||||
"minItems",
|
||||
"maxItems",
|
||||
}
|
||||
|
||||
|
||||
def _assert_well_formed(schema: dict) -> None:
|
||||
"""Recursively assert a JSON schema obeys the structured-output limits."""
|
||||
if schema.get("type") == "object":
|
||||
assert schema.get("additionalProperties") is False, (
|
||||
f"object schema missing additionalProperties: false: {schema}"
|
||||
)
|
||||
props = schema.get("properties", {})
|
||||
required = set(schema.get("required", []))
|
||||
assert required <= set(props), (
|
||||
f"required keys not all declared as properties: {required - set(props)}"
|
||||
)
|
||||
for sub in props.values():
|
||||
_assert_well_formed(sub)
|
||||
elif schema.get("type") == "array":
|
||||
_assert_well_formed(schema["items"])
|
||||
|
||||
assert not (_DISALLOWED_KEYS & set(schema)), (
|
||||
f"schema uses unsupported constraint(s): {_DISALLOWED_KEYS & set(schema)}"
|
||||
)
|
||||
|
||||
|
||||
class TestStructuredOutputSchemas:
|
||||
def test_troubleshooting_schema_is_well_formed(self):
|
||||
_assert_well_formed(TROUBLESHOOTING_SCHEMA)
|
||||
|
||||
def test_procedural_schema_is_well_formed(self):
|
||||
_assert_well_formed(PROCEDURAL_SCHEMA)
|
||||
|
||||
def test_troubleshooting_schema_top_level_shape(self):
|
||||
props = TROUBLESHOOTING_SCHEMA["properties"]
|
||||
assert set(props) >= {"title", "description", "nodes"}
|
||||
node = props["nodes"]["items"]
|
||||
# Every field the troubleshooting prompt may emit must be modelled,
|
||||
# else additionalProperties: false makes them impossible to produce.
|
||||
assert set(node["properties"]) >= {
|
||||
"id",
|
||||
"type",
|
||||
"question",
|
||||
"options",
|
||||
"next_node_id",
|
||||
"confidence",
|
||||
"source_excerpt",
|
||||
}
|
||||
|
||||
def test_procedural_schema_top_level_shape(self):
|
||||
props = PROCEDURAL_SCHEMA["properties"]
|
||||
assert set(props) >= {"title", "description", "steps", "intake_form"}
|
||||
step = props["steps"]["items"]
|
||||
assert set(step["properties"]) >= {
|
||||
"id",
|
||||
"type",
|
||||
"content",
|
||||
"confidence",
|
||||
"source_excerpt",
|
||||
}
|
||||
intake = props["intake_form"]["items"]
|
||||
assert set(intake["properties"]) >= {
|
||||
"variable_name",
|
||||
"label",
|
||||
"field_type",
|
||||
"required",
|
||||
"display_order",
|
||||
}
|
||||
|
||||
|
||||
class TestSchemaSelector:
|
||||
def test_returns_troubleshooting_schema(self):
|
||||
assert _schema_for_target_type("troubleshooting") is TROUBLESHOOTING_SCHEMA
|
||||
|
||||
def test_returns_procedural_schema_for_procedural(self):
|
||||
assert _schema_for_target_type("procedural") is PROCEDURAL_SCHEMA
|
||||
|
||||
def test_defaults_to_procedural_for_unknown(self):
|
||||
# convert_document treats any non-"troubleshooting" target as procedural.
|
||||
assert _schema_for_target_type("something-else") is PROCEDURAL_SCHEMA
|
||||
Reference in New Issue
Block a user