"""API + service tests for the FlowPilot Phase 3 suggested-fix + preview surface. Covers: - /api/v1/ai-sessions/{id}/suggested-fixes/active (200 + 404) - /api/v1/ai-sessions/{id}/suggested-fixes/{fix_id}/decision (one_off, draft_template, build_template, dismissed; 409 on dismissing a superseded fix; state_version bump) - /api/v1/ai-sessions/{id}/resolution-note/preview (LLM mocked; cache hit on same state_version, miss after a fact write) - [SUGGEST_FIX] marker parser shape - _persist_suggested_fix supersession + state_version bump """ from __future__ import annotations import uuid from datetime import datetime, timezone from unittest.mock import AsyncMock, patch import pytest from httpx import AsyncClient from sqlalchemy import select from app.api.endpoints.session_suggested_fixes import _clear_preview_cache_for_tests from app.models.ai_session import AISession from app.models.session_suggested_fix import SessionSuggestedFix from app.services.unified_chat_service import ( _parse_suggest_fix_marker, _persist_suggested_fix, ) @pytest.fixture(autouse=True) def _isolate_preview_cache(): _clear_preview_cache_for_tests() yield _clear_preview_cache_for_tests() async def _make_session(test_db, user) -> AISession: session = AISession( user_id=user["user_data"]["id"], account_id=user["user_data"]["account_id"], session_type="chat", intake_type="free_text", intake_content={"text": "phase 3 test"}, status="active", confidence_tier="discovery", conversation_messages=[], ) test_db.add(session) await test_db.commit() await test_db.refresh(session) return session # ── [SUGGEST_FIX] parser ──────────────────────────────────────────────────── class TestSuggestFixParser: def test_no_marker(self): cleaned, fix = _parse_suggest_fix_marker("just analysis") assert cleaned == "just analysis" assert fix is None def test_well_formed_block(self): text = ( "Analysis sentence.\n\n" '[SUGGEST_FIX]\n' '{"title": "Reset password", "description": "Stale credential.", ' '"confidence": 87, "script_template_slug": "reset-cw"}\n' '[/SUGGEST_FIX]' ) cleaned, fix = _parse_suggest_fix_marker(text) assert cleaned == "Analysis sentence." assert fix is not None assert fix["title"] == "Reset password" assert fix["confidence_pct"] == 87 assert fix["script_template_slug"] == "reset-cw" assert fix["ai_drafted_script"] is None def test_confidence_clamped_and_rounded(self): text = ( '[SUGGEST_FIX]\n{"title":"x","description":"y","confidence":120.7}\n[/SUGGEST_FIX]' ) _, fix = _parse_suggest_fix_marker(text) assert fix is not None and fix["confidence_pct"] == 100 text2 = ( '[SUGGEST_FIX]\n{"title":"x","description":"y","confidence":-3}\n[/SUGGEST_FIX]' ) _, fix2 = _parse_suggest_fix_marker(text2) assert fix2 is not None and fix2["confidence_pct"] == 0 def test_only_last_block_wins(self): # Stale early block plus a final intent — the parser keeps the LAST one. text = ( '[SUGGEST_FIX]\n{"title":"old","description":"o","confidence":50}\n[/SUGGEST_FIX]\n' '[SUGGEST_FIX]\n{"title":"new","description":"n","confidence":80}\n[/SUGGEST_FIX]' ) cleaned, fix = _parse_suggest_fix_marker(text) assert fix is not None and fix["title"] == "new" assert "[SUGGEST_FIX]" not in cleaned def test_missing_required_field_dropped(self): text = '[SUGGEST_FIX]\n{"title":"only title"}\n[/SUGGEST_FIX]' cleaned, fix = _parse_suggest_fix_marker(text) assert fix is None # Marker still stripped from display. assert "[SUGGEST_FIX]" not in cleaned def test_malformed_json_dropped(self): text = "[SUGGEST_FIX]\nnot json\n[/SUGGEST_FIX]" cleaned, fix = _parse_suggest_fix_marker(text) assert fix is None assert "[SUGGEST_FIX]" not in cleaned # ── _persist_suggested_fix ────────────────────────────────────────────────── @pytest.mark.asyncio async def test_persist_supersedes_prior_active_and_bumps_state_version(test_db, test_user): session = await _make_session(test_db, test_user) initial_version = session.state_version # Insert an existing active fix so we can verify supersession. existing = SessionSuggestedFix( session_id=session.id, account_id=session.account_id, title="Old fix", description="prior", confidence_pct=60, ) test_db.add(existing) await test_db.commit() await _persist_suggested_fix( db=test_db, session=session, fix={ "title": "New fix", "description": "current best", "confidence_pct": 88, "script_template_slug": None, "ai_drafted_script": None, "ai_drafted_parameters": None, }, ) await test_db.commit() await test_db.refresh(existing) await test_db.refresh(session) assert existing.superseded_at is not None assert session.state_version == initial_version + 1 # Exactly one active row remains — and it's the new one. result = await test_db.execute( select(SessionSuggestedFix).where( SessionSuggestedFix.session_id == session.id, SessionSuggestedFix.superseded_at.is_(None), ) ) actives = list(result.scalars().all()) assert len(actives) == 1 assert actives[0].title == "New fix" # ── /suggested-fixes/active endpoint ──────────────────────────────────────── @pytest.mark.asyncio async def test_get_active_returns_404_when_none(client: AsyncClient, test_user, auth_headers, test_db): session = await _make_session(test_db, test_user) r = await client.get( f"/api/v1/ai-sessions/{session.id}/suggested-fixes/active", headers=auth_headers, ) assert r.status_code == 404 @pytest.mark.asyncio async def test_get_active_returns_active_fix(client: AsyncClient, test_user, auth_headers, test_db): session = await _make_session(test_db, test_user) fix = SessionSuggestedFix( session_id=session.id, account_id=session.account_id, title="Active fix", description="d", confidence_pct=72, ) test_db.add(fix) await test_db.commit() r = await client.get( f"/api/v1/ai-sessions/{session.id}/suggested-fixes/active", headers=auth_headers, ) assert r.status_code == 200 body = r.json() assert body["title"] == "Active fix" assert body["confidence_pct"] == 72 assert body["superseded_at"] is None # ── /decision endpoint ───────────────────────────────────────────────────── @pytest.mark.asyncio async def test_record_decision_persists_and_bumps_state_version( client: AsyncClient, test_user, auth_headers, test_db ): session = await _make_session(test_db, test_user) initial_version = session.state_version fix = SessionSuggestedFix( session_id=session.id, account_id=session.account_id, title="x", description="y", confidence_pct=50, ai_drafted_script="Write-Output 'ok'", ) test_db.add(fix) await test_db.commit() # The draft_template path calls TemplateExtractionService, which needs an # AI provider configured. CI doesn't set ANTHROPIC_API_KEY/GOOGLE_AI_API_KEY, # and this test isn't exercising the AI integration — patch the extractor # with a minimal valid response so the rest of the decision flow runs. extractor_stub = AsyncMock(return_value={ "templated_body": "Write-Output 'ok'", "parameters": [], }) with patch( "app.api.endpoints.session_suggested_fixes._extract_template_parameters", extractor_stub, ): r = await client.post( f"/api/v1/ai-sessions/{session.id}/suggested-fixes/{fix.id}/decision", headers=auth_headers, json={"decision": "draft_template"}, ) assert r.status_code == 200 assert r.json()["user_decision"] == "draft_template" await test_db.refresh(session) assert session.state_version == initial_version + 1 @pytest.mark.asyncio async def test_dismissed_supersedes_the_fix( client: AsyncClient, test_user, auth_headers, test_db ): session = await _make_session(test_db, test_user) fix = SessionSuggestedFix( session_id=session.id, account_id=session.account_id, title="x", description="y", confidence_pct=50, ) test_db.add(fix) await test_db.commit() r = await client.post( f"/api/v1/ai-sessions/{session.id}/suggested-fixes/{fix.id}/decision", headers=auth_headers, json={"decision": "dismissed"}, ) assert r.status_code == 200 await test_db.refresh(fix) assert fix.superseded_at is not None @pytest.mark.asyncio async def test_dismiss_already_superseded_returns_409( client: AsyncClient, test_user, auth_headers, test_db ): session = await _make_session(test_db, test_user) fix = SessionSuggestedFix( session_id=session.id, account_id=session.account_id, title="x", description="y", confidence_pct=50, superseded_at=datetime.now(timezone.utc), ) test_db.add(fix) await test_db.commit() r = await client.post( f"/api/v1/ai-sessions/{session.id}/suggested-fixes/{fix.id}/decision", headers=auth_headers, json={"decision": "dismissed"}, ) assert r.status_code == 409 # ── /resolution-note/preview endpoint ────────────────────────────────────── @pytest.mark.asyncio async def test_preview_uses_state_version_cache( client: AsyncClient, test_user, auth_headers, test_db ): session = await _make_session(test_db, test_user) fake_provider = AsyncMock() fake_provider.generate_text = AsyncMock(return_value=( "## Problem\nx\n\n## What we confirmed\n(none)\n\n## Root cause\ny\n\n## Resolution\nz", 100, 50, )) with patch( "app.services.resolution_note_generator.get_ai_provider", return_value=fake_provider, ): # First call — cache miss, generates fresh. r1 = await client.post( f"/api/v1/ai-sessions/{session.id}/resolution-note/preview", headers=auth_headers, ) assert r1.status_code == 200 assert r1.json()["from_cache"] is False assert fake_provider.generate_text.await_count == 1 # Second call, no state change — must hit the cache (no extra LLM call). r2 = await client.post( f"/api/v1/ai-sessions/{session.id}/resolution-note/preview", headers=auth_headers, ) assert r2.status_code == 200 assert r2.json()["from_cache"] is True assert r2.json()["markdown"] == r1.json()["markdown"] assert fake_provider.generate_text.await_count == 1 @pytest.mark.asyncio async def test_preview_invalidates_after_fact_write( client: AsyncClient, test_user, auth_headers, test_db ): """A new fact bumps state_version → next preview is a fresh generation, not cached.""" session = await _make_session(test_db, test_user) fake_provider = AsyncMock() fake_provider.generate_text = AsyncMock(return_value=( "## Problem\nx\n\n## What we confirmed\n(none)\n\n## Root cause\ny\n\n## Resolution\nz", 100, 50, )) with patch( "app.services.resolution_note_generator.get_ai_provider", return_value=fake_provider, ): await client.post( f"/api/v1/ai-sessions/{session.id}/resolution-note/preview", headers=auth_headers, ) assert fake_provider.generate_text.await_count == 1 # Add a fact — bumps state_version on the session. await client.post( f"/api/v1/ai-sessions/{session.id}/facts", headers=auth_headers, json={"text": "a confirmed observation"}, ) # Next preview must regenerate (cache key includes state_version). r = await client.post( f"/api/v1/ai-sessions/{session.id}/resolution-note/preview", headers=auth_headers, ) assert r.status_code == 200 assert r.json()["from_cache"] is False assert fake_provider.generate_text.await_count == 2