"""API + service tests for the FlowPilot Phase 2 "What we know" facts surface. Covers: - /api/v1/ai-sessions/{id}/facts CRUD - Editability rule (403 on PATCH for question/diagnostic_check facts) - /facts/promote with `proposed_text` (no LLM call) and via synthesis (mocked) - state_version increments on every fact write - Stable-UUID assignment for pending_task_lane items - [PROMOTE] marker parser shape """ from __future__ import annotations import uuid from unittest.mock import AsyncMock, patch import pytest from httpx import AsyncClient from sqlalchemy import select from app.models.ai_session import AISession from app.models.session_fact import SessionFact from app.services.fact_synthesis_service import FactSynthesisService from app.services.unified_chat_service import ( _assign_stable_task_lane_ids, _parse_promote_marker, ) # ── Fixtures ──────────────────────────────────────────────────────────────── async def _make_session(test_db, user, *, pending_task_lane=None) -> AISession: session = AISession( user_id=user["user_data"]["id"], account_id=user["user_data"]["account_id"], session_type="chat", intake_type="free_text", intake_content={"text": "test"}, status="active", confidence_tier="discovery", conversation_messages=[], pending_task_lane=pending_task_lane, ) test_db.add(session) await test_db.commit() await test_db.refresh(session) return session # ── [PROMOTE] marker parser ───────────────────────────────────────────────── class TestPromoteMarkerParser: def test_no_marker_returns_unchanged(self): text = "Just an analysis sentence." cleaned, items = _parse_promote_marker(text) assert cleaned == text assert items is None def test_single_block(self): ref = uuid.uuid4() text = ( "Some analysis.\n\n" f'[PROMOTE]\n{{"source_type":"question","source_ref":"{ref}",' '"text":"OWA login confirmed working","summary":"rules out tenant"}\n' "[/PROMOTE]" ) cleaned, items = _parse_promote_marker(text) assert cleaned == "Some analysis." assert items is not None and len(items) == 1 assert items[0]["source_type"] == "question" assert items[0]["source_ref"] == ref assert items[0]["text"] == "OWA login confirmed working" assert items[0]["summary"] == "rules out tenant" def test_multiple_blocks(self): text = ( '[PROMOTE]\n{"source_type":"question","source_ref":null,' '"text":"a","summary":"x"}\n[/PROMOTE]\n' '[PROMOTE]\n{"source_type":"diagnostic_check","source_ref":null,' '"text":"b","summary":"y"}\n[/PROMOTE]' ) cleaned, items = _parse_promote_marker(text) assert items is not None and len(items) == 2 assert items[0]["text"] == "a" assert items[1]["text"] == "b" assert "[PROMOTE]" not in cleaned def test_ai_synthesis_strips_source_ref(self): # The model should not provide source_ref for synthesis facts — # the parser drops it defensively even if the model does. ref = uuid.uuid4() text = ( f'[PROMOTE]\n{{"source_type":"ai_synthesis","source_ref":"{ref}",' '"text":"Combined finding","summary":"synth"}\n[/PROMOTE]' ) _, items = _parse_promote_marker(text) assert items is not None and items[0]["source_ref"] is None def test_invalid_source_type_dropped(self): text = ( '[PROMOTE]\n{"source_type":"bogus","text":"x"}\n[/PROMOTE]\n' '[PROMOTE]\n{"source_type":"question","source_ref":null,"text":"good"}\n[/PROMOTE]' ) _, items = _parse_promote_marker(text) assert items is not None and len(items) == 1 assert items[0]["text"] == "good" def test_missing_text_dropped(self): text = '[PROMOTE]\n{"source_type":"question","source_ref":null,"text":""}\n[/PROMOTE]' _, items = _parse_promote_marker(text) assert items is None # empty list collapses to None def test_invalid_uuid_drops_ref_keeps_item(self): text = '[PROMOTE]\n{"source_type":"question","source_ref":"not-a-uuid","text":"keep"}\n[/PROMOTE]' _, items = _parse_promote_marker(text) assert items is not None and items[0]["source_ref"] is None assert items[0]["text"] == "keep" def test_malformed_json_dropped(self): text = "[PROMOTE]\nnot json at all\n[/PROMOTE]" cleaned, items = _parse_promote_marker(text) assert items is None # Block is still stripped from display so the engineer doesn't see it. assert "[PROMOTE]" not in cleaned # ── Stable-UUID assignment ────────────────────────────────────────────────── class TestAssignStableTaskLaneIds: def test_empty_prev_assigns_fresh_uuids(self): qs, acts = _assign_stable_task_lane_ids( None, [{"text": "Q1", "context": "c1"}], [{"label": "A1", "command": "cmd"}], ) assert len(qs) == 1 and uuid.UUID(qs[0]["id"]) assert len(acts) == 1 and uuid.UUID(acts[0]["id"]) def test_prev_uuid_preserved_on_text_match(self): qid = str(uuid.uuid4()) prev = { "questions": [{"id": qid, "text": "Same text"}], "actions": [], } qs, _ = _assign_stable_task_lane_ids(prev, [{"text": "Same text"}], []) assert qs[0]["id"] == qid def test_prev_uuid_replaced_when_text_changes(self): qid = str(uuid.uuid4()) prev = {"questions": [{"id": qid, "text": "Original"}], "actions": []} qs, _ = _assign_stable_task_lane_ids(prev, [{"text": "Different"}], []) assert qs[0]["id"] != qid def test_action_label_match_preserves_uuid(self): aid = str(uuid.uuid4()) prev = {"questions": [], "actions": [{"id": aid, "label": "Run X"}]} _, acts = _assign_stable_task_lane_ids(prev, [], [{"label": "Run X"}]) assert acts[0]["id"] == aid # ── FactSynthesisService.create_fact validation ───────────────────────────── @pytest.mark.asyncio async def test_create_fact_rejects_source_ref_for_user_note(test_db, test_user): session = await _make_session(test_db, test_user) svc = FactSynthesisService(test_db) with pytest.raises(ValueError, match="source_ref must be None"): await svc.create_fact( session_id=session.id, account_id=session.account_id, user_id=session.user_id, source_type="user_note", text="x", source_ref=uuid.uuid4(), ) @pytest.mark.asyncio async def test_create_fact_rejects_invalid_source_type(test_db, test_user): session = await _make_session(test_db, test_user) svc = FactSynthesisService(test_db) with pytest.raises(ValueError, match="Invalid source_type"): await svc.create_fact( session_id=session.id, account_id=session.account_id, user_id=session.user_id, source_type="not_a_type", text="x", ) @pytest.mark.asyncio async def test_create_fact_bumps_state_version(test_db, test_user): session = await _make_session(test_db, test_user) initial_version = session.state_version svc = FactSynthesisService(test_db) await svc.create_fact( session_id=session.id, account_id=session.account_id, user_id=session.user_id, source_type="user_note", text="A confirmed observation", ) await test_db.commit() await test_db.refresh(session) assert session.state_version == initial_version + 1 # ── Endpoint tests ────────────────────────────────────────────────────────── @pytest.mark.asyncio async def test_list_facts_empty(client: AsyncClient, test_user, auth_headers, test_db): session = await _make_session(test_db, test_user) resp = await client.get( f"/api/v1/ai-sessions/{session.id}/facts", headers=auth_headers, ) assert resp.status_code == 200 assert resp.json()["facts"] == [] @pytest.mark.asyncio async def test_create_user_note_fact(client: AsyncClient, test_user, auth_headers, test_db): session = await _make_session(test_db, test_user) resp = await client.post( f"/api/v1/ai-sessions/{session.id}/facts", headers=auth_headers, json={"text": "Customer is on a laptop", "summary": "endpoint type"}, ) assert resp.status_code == 201 body = resp.json() assert body["source_type"] == "user_note" assert body["editable"] is True assert body["source_ref"] is None assert body["text"] == "Customer is on a laptop" @pytest.mark.asyncio async def test_patch_user_note_succeeds(client: AsyncClient, test_user, auth_headers, test_db): session = await _make_session(test_db, test_user) create = await client.post( f"/api/v1/ai-sessions/{session.id}/facts", headers=auth_headers, json={"text": "original"}, ) fact_id = create.json()["id"] patch_resp = await client.patch( f"/api/v1/ai-sessions/{session.id}/facts/{fact_id}", headers=auth_headers, json={"text": "edited", "summary": "new label"}, ) assert patch_resp.status_code == 200 assert patch_resp.json()["text"] == "edited" assert patch_resp.json()["source_summary"] == "new label" @pytest.mark.asyncio async def test_patch_question_fact_returns_403(client: AsyncClient, test_user, auth_headers, test_db): """Question/check-sourced facts must be edited at the source item, not the card.""" session = await _make_session(test_db, test_user) # Insert a question-sourced fact directly so the editability rule applies. fact = SessionFact( session_id=session.id, account_id=session.account_id, text="Pre-existing question fact", source_type="question", source_ref=uuid.uuid4(), created_by=session.user_id, ) test_db.add(fact) await test_db.commit() await test_db.refresh(fact) resp = await client.patch( f"/api/v1/ai-sessions/{session.id}/facts/{fact.id}", headers=auth_headers, json={"text": "trying to edit"}, ) assert resp.status_code == 403 @pytest.mark.asyncio async def test_delete_fact_soft_deletes(client: AsyncClient, test_user, auth_headers, test_db): session = await _make_session(test_db, test_user) create = await client.post( f"/api/v1/ai-sessions/{session.id}/facts", headers=auth_headers, json={"text": "to be removed"}, ) fact_id = create.json()["id"] del_resp = await client.delete( f"/api/v1/ai-sessions/{session.id}/facts/{fact_id}", headers=auth_headers, ) assert del_resp.status_code == 204 # Listed facts should not include the soft-deleted one. list_resp = await client.get( f"/api/v1/ai-sessions/{session.id}/facts", headers=auth_headers, ) assert list_resp.status_code == 200 assert all(f["id"] != fact_id for f in list_resp.json()["facts"]) # Row still exists in DB (deleted_at set), proving it was soft-deleted. row = ( await test_db.execute( select(SessionFact).where(SessionFact.id == uuid.UUID(fact_id)) ) ).scalar_one() assert row.deleted_at is not None @pytest.mark.asyncio async def test_promote_with_proposed_text(client: AsyncClient, test_user, auth_headers, test_db): qid = uuid.uuid4() session = await _make_session( test_db, test_user, pending_task_lane={ "questions": [{"id": str(qid), "text": "Is OWA working?"}], "actions": [], }, ) resp = await client.post( f"/api/v1/ai-sessions/{session.id}/facts/promote", headers=auth_headers, json={ "source_type": "question", "source_ref": str(qid), "proposed_text": "OWA confirmed working for jsmith", "proposed_summary": "rules out tenant/license", }, ) assert resp.status_code == 201 body = resp.json() assert body["source_type"] == "question" assert body["source_ref"] == str(qid) assert body["editable"] is False # question-sourced facts are read-only at the card @pytest.mark.asyncio async def test_promote_via_synthesis(client: AsyncClient, test_user, auth_headers, test_db): qid = uuid.uuid4() session = await _make_session( test_db, test_user, pending_task_lane={ "questions": [{"id": str(qid), "text": "Is the user on a laptop?"}], "actions": [], }, ) # Mock the LLM call to avoid hitting the network in tests. fake_provider = AsyncMock() fake_provider.generate_json = AsyncMock(return_value=( '{"text": "User confirmed on a laptop", "summary": "endpoint type"}', 50, 20, )) with patch( "app.services.fact_synthesis_service.get_ai_provider", return_value=fake_provider, ): resp = await client.post( f"/api/v1/ai-sessions/{session.id}/facts/promote", headers=auth_headers, json={ "source_type": "question", "source_ref": str(qid), "raw_input": "Yes, it's a Lenovo X1 Carbon", }, ) assert resp.status_code == 201 assert resp.json()["text"] == "User confirmed on a laptop" assert resp.json()["source_summary"] == "endpoint type" @pytest.mark.asyncio async def test_promote_synthesis_returning_null_returns_422( client: AsyncClient, test_user, auth_headers, test_db ): """When the synthesizer judges the input has no fact, the endpoint surfaces 422.""" qid = uuid.uuid4() session = await _make_session( test_db, test_user, pending_task_lane={ "questions": [{"id": str(qid), "text": "Is OWA working?"}], "actions": [], }, ) fake_provider = AsyncMock() fake_provider.generate_json = AsyncMock(return_value=( '{"text": null, "summary": null}', 30, 10, )) with patch( "app.services.fact_synthesis_service.get_ai_provider", return_value=fake_provider, ): resp = await client.post( f"/api/v1/ai-sessions/{session.id}/facts/promote", headers=auth_headers, json={ "source_type": "question", "source_ref": str(qid), "raw_input": "unknown", }, ) assert resp.status_code == 422 @pytest.mark.asyncio async def test_promote_rejects_both_or_neither_inputs( client: AsyncClient, test_user, auth_headers, test_db ): session = await _make_session(test_db, test_user) # Neither resp = await client.post( f"/api/v1/ai-sessions/{session.id}/facts/promote", headers=auth_headers, json={"source_type": "question"}, ) assert resp.status_code == 400 # Both resp2 = await client.post( f"/api/v1/ai-sessions/{session.id}/facts/promote", headers=auth_headers, json={ "source_type": "question", "proposed_text": "x", "raw_input": "y", }, ) assert resp2.status_code == 400 @pytest.mark.asyncio async def test_state_version_bumps_on_create_via_endpoint( client: AsyncClient, test_user, auth_headers, test_db ): session = await _make_session(test_db, test_user) initial = session.state_version await client.post( f"/api/v1/ai-sessions/{session.id}/facts", headers=auth_headers, json={"text": "a"}, ) # Reload — refresh fetches the latest persisted row. await test_db.refresh(session) assert session.state_version == initial + 1