"""Tests for l1_session_service start_* functions (T12), record_step/update_notes (T13), resolve/escalate (T14).""" import uuid import pytest from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.models.account import Account from app.models.audit_log import AuditLog from app.models.user import User from app.models.tree import Tree from app.models.ai_session import AISession from app.models.flow_proposal import FlowProposal from app.models.l1_walk_session import L1WalkSession from app.services.l1_session_service import ( start_flow_session, start_proposal_session, start_adhoc_session, _resolve_acting_as, record_step, update_notes, resolve, escalate, escalate_without_walk, ) from app.services import internal_ticket_service # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- async def _make_account(db: AsyncSession) -> Account: s = str(uuid.uuid4())[:8] account = Account( id=uuid.uuid4(), name=f"Test Account {s}", display_code=s[:8].upper(), ) db.add(account) await db.flush() return account async def _make_user( db: AsyncSession, *, account_id: uuid.UUID, account_role: str = "l1_tech", can_cover_l1: bool = False, ) -> User: user = User( id=uuid.uuid4(), email=f"user-{uuid.uuid4()}@example.com", name="Test User", account_id=account_id, account_role=account_role, role="engineer", is_active=True, can_cover_l1=can_cover_l1, ) db.add(user) await db.flush() return user async def _make_tree(db: AsyncSession, *, account_id: uuid.UUID, author_id: uuid.UUID) -> Tree: tree = Tree( id=uuid.uuid4(), name="Test Flow", account_id=account_id, author_id=author_id, tree_type="troubleshooting", tree_structure={"nodes": [], "edges": []}, visibility="team", status="published", ) db.add(tree) await db.flush() return tree async def _make_ai_session(db: AsyncSession, *, user_id: uuid.UUID, account_id: uuid.UUID) -> AISession: ai_session = AISession( id=uuid.uuid4(), user_id=user_id, account_id=account_id, session_type="chat", intake_type="free_text", intake_content={"text": "test"}, status="active", confidence_tier="discovery", conversation_messages=[], ) db.add(ai_session) await db.flush() return ai_session async def _make_proposal( db: AsyncSession, *, account_id: uuid.UUID, source_session_id: uuid.UUID, ) -> FlowProposal: proposal = FlowProposal( id=uuid.uuid4(), account_id=account_id, source_session_id=source_session_id, proposal_type="new_flow", title="Test Proposal", proposed_flow_data={"nodes": [], "edges": []}, source="manual_draft", status="pending", ) db.add(proposal) await db.flush() return proposal # --------------------------------------------------------------------------- # Unit tests for _resolve_acting_as (no DB needed) # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_resolve_acting_as_for_engineer_returns_coverage_tag(): user = User(account_role="engineer") assert _resolve_acting_as(user) == "l1_coverage" @pytest.mark.asyncio async def test_resolve_acting_as_for_l1_tech_returns_none(): user = User(account_role="l1_tech") assert _resolve_acting_as(user) is None @pytest.mark.asyncio async def test_resolve_acting_as_for_owner_returns_none(): user = User(account_role="owner") assert _resolve_acting_as(user) is None # --------------------------------------------------------------------------- # Integration tests (real DB) # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_start_flow_session_creates_active_flow_session(test_db: AsyncSession): account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) tree = await _make_tree(test_db, account_id=account.id, author_id=l1.id) session = await start_flow_session( test_db, account_id=account.id, user=l1, flow_id=tree.id, ticket_id="ticket-abc", ticket_kind="internal", ) assert session.session_kind == "flow" assert session.flow_id == tree.id assert session.flow_proposal_id is None assert session.status == "active" assert session.walked_path == [] assert session.walk_notes == [] assert session.acting_as is None # l1_tech native assert session.ticket_id == "ticket-abc" assert session.ticket_kind == "internal" @pytest.mark.asyncio async def test_start_proposal_session_creates_active_proposal_session(test_db: AsyncSession): account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) ai_session = await _make_ai_session(test_db, user_id=l1.id, account_id=account.id) proposal = await _make_proposal( test_db, account_id=account.id, source_session_id=ai_session.id, ) session = await start_proposal_session( test_db, account_id=account.id, user=l1, flow_proposal_id=proposal.id, ticket_id="ticket-xyz", ticket_kind="psa", ) assert session.session_kind == "proposal" assert session.flow_proposal_id == proposal.id assert session.flow_id is None assert session.status == "active" @pytest.mark.asyncio async def test_start_adhoc_session_no_flow_no_proposal(test_db: AsyncSession): account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) session = await start_adhoc_session( test_db, account_id=account.id, user=l1, ticket_id="ticket-adhoc", ticket_kind="internal", ) assert session.session_kind == "adhoc" assert session.flow_id is None assert session.flow_proposal_id is None assert session.walked_path == [] assert session.walk_notes == [] @pytest.mark.asyncio async def test_engineer_with_coverage_gets_acting_as_tag(test_db: AsyncSession): account = await _make_account(test_db) eng = await _make_user( test_db, account_id=account.id, account_role="engineer", can_cover_l1=True, ) session = await start_adhoc_session( test_db, account_id=account.id, user=eng, ticket_id="t", ticket_kind="internal", ) assert session.acting_as == "l1_coverage" # --------------------------------------------------------------------------- # T13: record_step and update_notes tests # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_record_step_appends_to_walked_path(test_db: AsyncSession): account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) tree = await _make_tree(test_db, account_id=account.id, author_id=l1.id) session = await start_flow_session( test_db, account_id=account.id, user=l1, flow_id=tree.id, ticket_id="t1", ticket_kind="internal", ) updated = await record_step( test_db, session_id=session.id, node_id="n1", question="Is the device powered on?", answer="yes", ) assert len(updated.walked_path) == 1 assert updated.walked_path[0] == { "node_id": "n1", "question": "Is the device powered on?", "answer": "yes", "l1_note": None, } assert updated.current_node_id == "n1" assert updated.last_step_at is not None @pytest.mark.asyncio async def test_record_step_two_sequential_steps_accumulate(test_db: AsyncSession): account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) tree = await _make_tree(test_db, account_id=account.id, author_id=l1.id) session = await start_flow_session( test_db, account_id=account.id, user=l1, flow_id=tree.id, ticket_id="t2", ticket_kind="internal", ) await record_step( test_db, session_id=session.id, node_id="n1", question="Step 1?", answer="yes", ) updated = await record_step( test_db, session_id=session.id, node_id="n2", question="Step 2?", answer="no", ) assert len(updated.walked_path) == 2 assert updated.walked_path[0]["node_id"] == "n1" assert updated.walked_path[1]["node_id"] == "n2" assert updated.current_node_id == "n2" @pytest.mark.asyncio async def test_record_step_blocks_adhoc(test_db: AsyncSession): account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) session = await start_adhoc_session( test_db, account_id=account.id, user=l1, ticket_id="t3", ticket_kind="internal", ) with pytest.raises(ValueError, match="adhoc"): await record_step( test_db, session_id=session.id, node_id="n1", question="Q?", answer="yes", ) @pytest.mark.asyncio async def test_record_step_blocks_inactive_session(test_db: AsyncSession): account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) tree = await _make_tree(test_db, account_id=account.id, author_id=l1.id) session = await start_flow_session( test_db, account_id=account.id, user=l1, flow_id=tree.id, ticket_id="t4", ticket_kind="internal", ) # Manually mark resolved to simulate inactive state session.status = "resolved" await test_db.flush() with pytest.raises(ValueError, match="not active"): await record_step( test_db, session_id=session.id, node_id="n1", question="Q?", answer="yes", ) @pytest.mark.asyncio async def test_record_step_includes_note_when_provided(test_db: AsyncSession): account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) tree = await _make_tree(test_db, account_id=account.id, author_id=l1.id) session = await start_flow_session( test_db, account_id=account.id, user=l1, flow_id=tree.id, ticket_id="t5", ticket_kind="internal", ) updated = await record_step( test_db, session_id=session.id, node_id="n1", question="Q?", answer="yes", note="Customer mentioned it started yesterday", ) assert updated.walked_path[0]["l1_note"] == "Customer mentioned it started yesterday" @pytest.mark.asyncio async def test_update_notes_replaces_walk_notes(test_db: AsyncSession): account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) session = await start_adhoc_session( test_db, account_id=account.id, user=l1, ticket_id="t6", ticket_kind="internal", ) new_notes = [{"timestamp": "2026-05-28T10:00:00Z", "content": "Customer rebooted"}] updated = await update_notes(test_db, session_id=session.id, notes=new_notes) assert updated.walk_notes == new_notes assert updated.last_step_at is not None @pytest.mark.asyncio async def test_update_notes_raises_if_session_not_found(test_db: AsyncSession): missing_id = uuid.uuid4() with pytest.raises(ValueError, match="not found"): await update_notes(test_db, session_id=missing_id, notes=[]) @pytest.mark.asyncio async def test_update_notes_raises_if_session_not_active(test_db: AsyncSession): account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) session = await start_adhoc_session( test_db, account_id=account.id, user=l1, ticket_id="t7", ticket_kind="internal", ) session.status = "escalated" await test_db.flush() with pytest.raises(ValueError, match="not active"): await update_notes(test_db, session_id=session.id, notes=[]) @pytest.mark.asyncio async def test_update_notes_size_cap(test_db: AsyncSession): account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) session = await start_adhoc_session( test_db, account_id=account.id, user=l1, ticket_id="t8", ticket_kind="internal", ) # Create a notes payload larger than 256KB big_content = "x" * (256 * 1024 + 100) notes = [{"timestamp": "2026-05-28T10:00:00Z", "content": big_content}] with pytest.raises(ValueError, match="256KB"): await update_notes(test_db, session_id=session.id, notes=notes) # --------------------------------------------------------------------------- # T14: resolve, escalate, escalate_without_walk tests # --------------------------------------------------------------------------- async def _make_internal_ticket(db: AsyncSession, *, account_id: uuid.UUID, user_id: uuid.UUID) -> object: """Create an internal ticket and return it.""" return await internal_ticket_service.create_ticket( db, account_id=account_id, created_by_user_id=user_id, problem_statement="Customer cannot log in", ) @pytest.mark.asyncio async def test_resolve_proposal_helpful_flips_validated_by_outcome(test_db: AsyncSession): """resolve(helpful=True) on a proposal session sets validated_by_outcome=True.""" account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1.id) ai_session = await _make_ai_session(test_db, user_id=l1.id, account_id=account.id) proposal = await _make_proposal(test_db, account_id=account.id, source_session_id=ai_session.id) session = await start_proposal_session( test_db, account_id=account.id, user=l1, flow_proposal_id=proposal.id, ticket_id=str(ticket.id), ticket_kind="internal", ) resolved = await resolve( test_db, session_id=session.id, helpful=True, resolution_notes="Issue fixed by proposal walk", ) assert resolved.status == "resolved" assert resolved.helpful is True assert resolved.resolution_notes == "Issue fixed by proposal walk" assert resolved.resolved_at is not None # Proposal should now be validated await test_db.refresh(proposal) assert proposal.validated_by_outcome is True # Internal ticket should be closed await test_db.refresh(ticket) assert ticket.status == "resolved" @pytest.mark.asyncio async def test_resolve_proposal_not_helpful_leaves_validated_by_outcome_false(test_db: AsyncSession): """resolve(helpful=False) on a proposal session does NOT flip validated_by_outcome.""" account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1.id) ai_session = await _make_ai_session(test_db, user_id=l1.id, account_id=account.id) proposal = await _make_proposal(test_db, account_id=account.id, source_session_id=ai_session.id) session = await start_proposal_session( test_db, account_id=account.id, user=l1, flow_proposal_id=proposal.id, ticket_id=str(ticket.id), ticket_kind="internal", ) resolved = await resolve( test_db, session_id=session.id, helpful=False, resolution_notes="Proposal did not help", ) assert resolved.helpful is False await test_db.refresh(proposal) assert proposal.validated_by_outcome is False @pytest.mark.asyncio async def test_resolve_flow_session_closes_ticket_no_proposal_update(test_db: AsyncSession): """resolve on a flow session closes the ticket and does not touch proposals.""" account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1.id) tree = await _make_tree(test_db, account_id=account.id, author_id=l1.id) session = await start_flow_session( test_db, account_id=account.id, user=l1, flow_id=tree.id, ticket_id=str(ticket.id), ticket_kind="internal", ) resolved = await resolve( test_db, session_id=session.id, helpful=True, resolution_notes="Flow resolved the issue", ) assert resolved.status == "resolved" assert resolved.session_kind == "flow" assert resolved.flow_proposal_id is None await test_db.refresh(ticket) assert ticket.status == "resolved" @pytest.mark.asyncio async def test_resolve_adhoc_session_closes_ticket(test_db: AsyncSession): """resolve on an adhoc session closes the ticket with no proposal interaction.""" account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1.id) session = await start_adhoc_session( test_db, account_id=account.id, user=l1, ticket_id=str(ticket.id), ticket_kind="internal", ) resolved = await resolve( test_db, session_id=session.id, helpful=True, resolution_notes="Adhoc resolved", ) assert resolved.status == "resolved" await test_db.refresh(ticket) assert ticket.status == "resolved" @pytest.mark.asyncio async def test_resolve_psa_session_no_ticket_update(test_db: AsyncSession): """resolve on a PSA-backed session does not attempt to update an internal ticket.""" account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) tree = await _make_tree(test_db, account_id=account.id, author_id=l1.id) session = await start_flow_session( test_db, account_id=account.id, user=l1, flow_id=tree.id, ticket_id="psa-external-id-123", ticket_kind="psa", ) resolved = await resolve( test_db, session_id=session.id, helpful=True, resolution_notes="PSA ticket resolved externally", ) assert resolved.status == "resolved" assert resolved.ticket_kind == "psa" @pytest.mark.asyncio async def test_resolve_raises_on_missing_session(test_db: AsyncSession): """resolve raises ValueError when session does not exist.""" with pytest.raises(ValueError, match="not found"): await resolve( test_db, session_id=uuid.uuid4(), helpful=True, resolution_notes="N/A", ) @pytest.mark.asyncio async def test_resolve_raises_on_inactive_session(test_db: AsyncSession): """resolve raises ValueError when session is not active.""" account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1.id) session = await start_adhoc_session( test_db, account_id=account.id, user=l1, ticket_id=str(ticket.id), ticket_kind="internal", ) session.status = "escalated" await test_db.flush() with pytest.raises(ValueError, match="not active"): await resolve( test_db, session_id=session.id, helpful=False, resolution_notes="N/A", ) @pytest.mark.asyncio async def test_escalate_marks_session_and_ticket_as_escalated(test_db: AsyncSession): """escalate sets session status=escalated and closes the internal ticket as escalated.""" account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1.id) tree = await _make_tree(test_db, account_id=account.id, author_id=l1.id) session = await start_flow_session( test_db, account_id=account.id, user=l1, flow_id=tree.id, ticket_id=str(ticket.id), ticket_kind="internal", ) escalated = await escalate( test_db, session_id=session.id, reason="Customer reported intermittent failure not covered by flow", reason_category="out_of_scope", ) assert escalated.status == "escalated" assert escalated.escalation_reason == "Customer reported intermittent failure not covered by flow" assert escalated.escalation_reason_category == "out_of_scope" assert escalated.resolved_at is not None assert escalated.last_step_at is not None await test_db.refresh(ticket) assert ticket.status == "escalated" @pytest.mark.asyncio async def test_escalate_raises_on_missing_session(test_db: AsyncSession): """escalate raises ValueError when session does not exist.""" with pytest.raises(ValueError, match="not found"): await escalate( test_db, session_id=uuid.uuid4(), reason="Some reason", reason_category="unknown", ) @pytest.mark.asyncio async def test_escalate_raises_on_inactive_session(test_db: AsyncSession): """escalate raises ValueError when session is already inactive.""" account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1.id) session = await start_adhoc_session( test_db, account_id=account.id, user=l1, ticket_id=str(ticket.id), ticket_kind="internal", ) session.status = "resolved" await test_db.flush() with pytest.raises(ValueError, match="not active"): await escalate( test_db, session_id=session.id, reason="Too late", reason_category="other", ) @pytest.mark.asyncio async def test_escalate_without_walk_creates_escalated_adhoc_session(test_db: AsyncSession): """escalate_without_walk creates an immediately-escalated session with empty walked_path.""" account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1.id) session = await escalate_without_walk( test_db, account_id=account.id, user=l1, ticket_id=str(ticket.id), ticket_kind="internal", reason_category="no_kb_content", reason="No KB article matched this issue", ) assert session.status == "escalated" assert session.session_kind == "adhoc" assert session.walked_path == [] assert session.escalation_reason == "No KB article matched this issue" assert session.escalation_reason_category == "no_kb_content" assert session.resolved_at is not None assert session.last_step_at is not None assert session.account_id == account.id assert session.created_by_user_id == l1.id @pytest.mark.asyncio async def test_escalate_without_walk_escalates_internal_ticket(test_db: AsyncSession): """escalate_without_walk marks the internal ticket as escalated.""" account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1.id) await escalate_without_walk( test_db, account_id=account.id, user=l1, ticket_id=str(ticket.id), ticket_kind="internal", reason_category="no_kb_content", ) await test_db.refresh(ticket) assert ticket.status == "escalated" @pytest.mark.asyncio async def test_escalate_without_walk_psa_does_not_touch_internal_ticket(test_db: AsyncSession): """escalate_without_walk with ticket_kind='psa' does not update internal tickets.""" account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) session = await escalate_without_walk( test_db, account_id=account.id, user=l1, ticket_id="psa-ticket-999", ticket_kind="psa", reason_category="no_kb_content", ) assert session.status == "escalated" assert session.ticket_kind == "psa" @pytest.mark.asyncio async def test_escalate_without_walk_reason_is_optional(test_db: AsyncSession): """escalate_without_walk works without a reason string.""" account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1.id) session = await escalate_without_walk( test_db, account_id=account.id, user=l1, ticket_id=str(ticket.id), ticket_kind="internal", reason_category="no_kb_content", ) assert session.escalation_reason is None assert session.escalation_reason_category == "no_kb_content" # --------------------------------------------------------------------------- # T7: start_ai_build_session # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_start_ai_build_session(test_db: AsyncSession): from app.services import l1_session_service as svc account = await _make_account(test_db) l1_user = await _make_user(test_db, account_id=account.id) s = await svc.start_ai_build_session( test_db, account_id=account.id, user=l1_user, ticket_id="t-ai", ticket_kind="internal", ) assert s.session_kind == "ai_build" assert s.flow_id is None and s.flow_proposal_id is None assert s.status == "active" # --------------------------------------------------------------------------- # T8: advance_ai_build # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_advance_ai_build_appends_and_returns_next(test_db: AsyncSession, monkeypatch): from app.services import l1_session_service as svc from app.services import ai_tree_builder account = await _make_account(test_db) l1_user = await _make_user(test_db, account_id=account.id) s = await svc.start_ai_build_session( test_db, account_id=account.id, user=l1_user, ticket_id="t-ai", ticket_kind="internal") async def fake_next(problem, category, walked): return {"node_type": "resolved", "id": "done", "text": "Fixed."} monkeypatch.setattr(ai_tree_builder, "generate_next_node", fake_next) next_node = await svc.advance_ai_build( test_db, session_id=s.id, problem_text="printer", category="printer", node_id="n1", node_text="Powered on?", answer="no", note=None) assert next_node["node_type"] == "resolved" refreshed = await test_db.get(type(s), s.id) assert len(refreshed.walked_path) == 1 assert refreshed.walked_path[0]["answer"] == "no" assert refreshed.walked_path[0]["text"] == "Powered on?" @pytest.mark.asyncio async def test_advance_ai_build_first_call_does_not_append(test_db: AsyncSession, monkeypatch): from app.services import l1_session_service as svc from app.services import ai_tree_builder account = await _make_account(test_db) l1_user = await _make_user(test_db, account_id=account.id) s = await svc.start_ai_build_session( test_db, account_id=account.id, user=l1_user, ticket_id="t-ai-first", ticket_kind="internal") async def fake_next(problem, category, walked): return {"node_type": "question", "id": "q1", "text": "Is it plugged in?"} monkeypatch.setattr(ai_tree_builder, "generate_next_node", fake_next) # First call: node_id=None — nothing should be appended next_node = await svc.advance_ai_build( test_db, session_id=s.id, problem_text="printer", category="printer", node_id=None) assert next_node["node_type"] == "question" assert next_node["id"] == "q1" refreshed = await test_db.get(type(s), s.id) assert len(refreshed.walked_path) == 0 assert refreshed.current_node_id == "q1" @pytest.mark.asyncio async def test_advance_ai_build_wrong_session_kind_raises(test_db: AsyncSession, monkeypatch): from app.services import l1_session_service as svc from app.services import ai_tree_builder account = await _make_account(test_db) l1_user = await _make_user(test_db, account_id=account.id) # start an adhoc session (not ai_build) s = await svc.start_adhoc_session( test_db, account_id=account.id, user=l1_user, ticket_id="t-adhoc-guard", ticket_kind="internal") async def fake_next(problem, category, walked): # pragma: no cover return {"node_type": "question", "id": "q1", "text": "?"} monkeypatch.setattr(ai_tree_builder, "generate_next_node", fake_next) with pytest.raises(ValueError, match="ai_build"): await svc.advance_ai_build( test_db, session_id=s.id, problem_text="printer", category="printer") # --------------------------------------------------------------------------- # T9: flywheel capture on resolve + engineer notification on escalate # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_resolve_ai_build_creates_outcome_validated_proposal(test_db: AsyncSession, monkeypatch): """resolve(helpful=True) on an ai_build session creates a FlowProposal with validated_by_outcome=True.""" from app.services import l1_session_service as svc account = await _make_account(test_db) l1_user = await _make_user(test_db, account_id=account.id) ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1_user.id) s = await svc.start_ai_build_session( test_db, account_id=account.id, user=l1_user, ticket_id=str(ticket.id), ticket_kind="internal", ) # Populate walked_path with at least one node (needed for normalize_walked_path) s.walked_path = [ {"node_type": "question", "id": "n1", "text": "On?", "answer": "no"}, {"node_type": "resolved", "id": "n2", "text": "Fixed."}, ] await test_db.flush() await svc.resolve(test_db, session_id=s.id, helpful=True, resolution_notes="ok") props = (await test_db.execute( select(FlowProposal).where(FlowProposal.l1_session_id == s.id) )).scalars().all() assert len(props) == 1 assert props[0].source == "ai_realtime_l1" assert props[0].validated_by_outcome is True assert props[0].source_session_id is None assert props[0].proposed_flow_data["tree_structure"]["id"] == "n1" assert props[0].proposal_type == "new_flow" assert props[0].proposed_flow_data["match_keywords"] == [] @pytest.mark.asyncio async def test_escalate_notifies_engineers(test_db: AsyncSession, monkeypatch): """escalate() calls notify with event='l1.session.escalated' and explicit engineer recipients.""" from app.services import l1_session_service as svc calls = {} async def fake_notify(event, account_id, payload, db, target_user_ids=None): calls["event"] = event calls["target_user_ids"] = target_user_ids monkeypatch.setattr(svc, "notify", fake_notify) account = await _make_account(test_db) # l1_user is the session owner (account_role="l1_tech" by default — NOT in the recipient query) l1_user = await _make_user(test_db, account_id=account.id) # Seed an eligible recipient: account_role="engineer" matches the production query # (owner/admin/engineer). Without this user, target_ids would be [] and the # eng.id assertion below would fail, proving the assertion is non-vacuous. eng = await _make_user(test_db, account_id=account.id, account_role="engineer") ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1_user.id) s = await svc.start_ai_build_session( test_db, account_id=account.id, user=l1_user, ticket_id=str(ticket.id), ticket_kind="internal", ) await svc.escalate(test_db, session_id=s.id, reason="stuck", reason_category="exhausted_safe_steps") assert calls["event"] == "l1.session.escalated" assert isinstance(calls["target_user_ids"], list) and len(calls["target_user_ids"]) >= 1 assert eng.id in calls["target_user_ids"] # the eligible engineer is a recipient # --------------------------------------------------------------------------- # T14 audit log tests (spec §5.6.1) # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_resolve_writes_audit_log_with_acting_as(test_db: AsyncSession): """resolve() writes an audit_logs row with acting_as='l1_coverage' for engineers.""" account = await _make_account(test_db) eng = await _make_user( test_db, account_id=account.id, account_role="engineer", can_cover_l1=True, ) ticket = await _make_internal_ticket( test_db, account_id=account.id, user_id=eng.id ) session = await start_adhoc_session( test_db, account_id=account.id, user=eng, ticket_id=str(ticket.id), ticket_kind="internal", ) await resolve( test_db, session_id=session.id, helpful=True, resolution_notes="Coverage engineer resolved", ) result = await test_db.execute( select(AuditLog).where( AuditLog.action == "l1.session.resolve", AuditLog.resource_id == session.id, ) ) row = result.scalar_one() assert row.acting_as == "l1_coverage" assert row.user_id == eng.id assert row.account_id == account.id assert row.details["helpful"] is True @pytest.mark.asyncio async def test_resolve_writes_audit_log_native_l1_acting_as_null( test_db: AsyncSession, ): """resolve() writes an audit_logs row with acting_as=None for native l1_tech.""" account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id, account_role="l1_tech") ticket = await _make_internal_ticket( test_db, account_id=account.id, user_id=l1.id ) session = await start_adhoc_session( test_db, account_id=account.id, user=l1, ticket_id=str(ticket.id), ticket_kind="internal", ) await resolve( test_db, session_id=session.id, helpful=False, resolution_notes="Native L1 resolved", ) result = await test_db.execute( select(AuditLog).where( AuditLog.action == "l1.session.resolve", AuditLog.resource_id == session.id, ) ) row = result.scalar_one() assert row.acting_as is None @pytest.mark.asyncio async def test_escalate_writes_audit_log(test_db: AsyncSession): """escalate() writes an audit_logs row with action='l1.session.escalate'.""" account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) ticket = await _make_internal_ticket( test_db, account_id=account.id, user_id=l1.id ) session = await start_adhoc_session( test_db, account_id=account.id, user=l1, ticket_id=str(ticket.id), ticket_kind="internal", ) await escalate( test_db, session_id=session.id, reason="Beyond scope", reason_category="out_of_scope", ) result = await test_db.execute( select(AuditLog).where( AuditLog.action == "l1.session.escalate", AuditLog.resource_id == session.id, ) ) row = result.scalar_one() assert row.details["escalation_reason_category"] == "out_of_scope" assert row.account_id == account.id @pytest.mark.asyncio async def test_escalate_without_walk_writes_audit_log(test_db: AsyncSession): """escalate_without_walk() writes an audit_logs row.""" account = await _make_account(test_db) l1 = await _make_user(test_db, account_id=account.id) ticket = await _make_internal_ticket( test_db, account_id=account.id, user_id=l1.id ) session = await escalate_without_walk( test_db, account_id=account.id, user=l1, ticket_id=str(ticket.id), ticket_kind="internal", reason_category="no_kb_content", ) result = await test_db.execute( select(AuditLog).where( AuditLog.action == "l1.session.escalate_no_walk", AuditLog.resource_id == session.id, ) ) row = result.scalar_one() assert row.account_id == account.id # Audit coverage: the reason category must be recorded (restored — a prior # edit dropped this assertion, weakening the audit guarantee). assert row.details["escalation_reason_category"] == "no_kb_content" # --------------------------------------------------------------------------- # Finding 1 (server-assigned node ids) + Finding 8 (pending-node replay) # --------------------------------------------------------------------------- class _FakeProvider: def __init__(self, raw): self._raw = raw async def generate_json(self, *, system_prompt, messages, max_tokens): return self._raw, None, None @pytest.mark.asyncio async def test_ai_build_first_node_carries_id_and_advance_grows_walk( test_db: AsyncSession, monkeypatch, ): """Finding 1 contract: the SYSTEM_PROMPT never asks for an id, yet the first generated node must carry one — and advancing with that id must grow walked_path (the original showstopper: node_id was always None, so the walk never advanced).""" from app.services import l1_session_service as svc from app.services import ai_tree_builder account = await _make_account(test_db) l1_user = await _make_user(test_db, account_id=account.id) s = await svc.start_ai_build_session( test_db, account_id=account.id, user=l1_user, ticket_id="t-contract", ticket_kind="internal", category="printer", problem_text="printer offline") # Real generator + a provider that omits id (the shape the model produces). monkeypatch.setattr( ai_tree_builder, "get_ai_provider", lambda *a, **k: _FakeProvider('{"node_type":"question","text":"Plugged in?"}')) first = await svc.advance_ai_build( test_db, session_id=s.id, problem_text="printer offline", category="printer", node_id=None) assert first.get("id"), "first node must carry a server-assigned id" # Answer it with the id we were handed; walked_path must grow by one. await svc.advance_ai_build( test_db, session_id=s.id, problem_text="printer offline", category="printer", node_id=first["id"], node_text=first["text"], answer="no") refreshed = await test_db.get(L1WalkSession, s.id) assert len(refreshed.walked_path) == 1 assert refreshed.walked_path[0]["id"] == first["id"] @pytest.mark.asyncio async def test_advance_ai_build_replays_pending_node_without_regenerating( test_db: AsyncSession, monkeypatch, ): """Finding 8: a re-mount (node_id=None) replays the served-but-unanswered node instead of firing a fresh paid LLM call (which could also swap the question).""" from app.services import l1_session_service as svc from app.services import ai_tree_builder account = await _make_account(test_db) l1_user = await _make_user(test_db, account_id=account.id) s = await svc.start_ai_build_session( test_db, account_id=account.id, user=l1_user, ticket_id="t-replay", ticket_kind="internal", category="printer", problem_text="printer offline") calls = {"n": 0} async def fake_next(problem, category, walked): calls["n"] += 1 return {"node_type": "question", "id": f"q{calls['n']}", "text": "?"} monkeypatch.setattr(ai_tree_builder, "generate_next_node", fake_next) first = await svc.advance_ai_build( test_db, session_id=s.id, problem_text="p", category="printer", node_id=None) # Re-mount without answering — must NOT regenerate. replay = await svc.advance_ai_build( test_db, session_id=s.id, problem_text="p", category="printer", node_id=None) assert calls["n"] == 1 assert replay["id"] == first["id"] @pytest.mark.asyncio async def test_advance_ai_build_records_answer_label_from_pending_node( test_db: AsyncSession, monkeypatch, ): """When the served question carried yes_label/no_label, answering it must record the chosen label (answer_label) in walked_path — derived server-side from pending_node, never trusted from the client. 'Microsoft account or local account? -> yes' is meaningless in the transcript and the LLM context.""" from app.services import l1_session_service as svc from app.services import ai_tree_builder account = await _make_account(test_db) l1_user = await _make_user(test_db, account_id=account.id) s = await svc.start_ai_build_session( test_db, account_id=account.id, user=l1_user, ticket_id="t-label", ticket_kind="internal", category="account_login", problem_text="login issue") async def fake_next(problem, category, walked): return {"node_type": "question", "id": "q-acct", "text": "Is the account a Microsoft account or a local account?", "yes_label": "Microsoft account", "no_label": "Local account"} monkeypatch.setattr(ai_tree_builder, "generate_next_node", fake_next) first = await svc.advance_ai_build( test_db, session_id=s.id, problem_text="login issue", category="account_login", node_id=None) await svc.advance_ai_build( test_db, session_id=s.id, problem_text="login issue", category="account_login", node_id=first["id"], node_text=first["text"], answer="yes") refreshed = await test_db.get(L1WalkSession, s.id) assert refreshed.walked_path[0]["answer"] == "yes" assert refreshed.walked_path[0]["answer_label"] == "Microsoft account" # --------------------------------------------------------------------------- # Finding 10: escalation recipient resolution # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_escalate_skips_soft_deleted_engineer(test_db: AsyncSession, monkeypatch): """A soft-deleted engineer must not be paged (is_active alone misses them).""" from datetime import datetime, timezone from app.services import l1_session_service as svc calls = {} async def fake_notify(event, account_id, payload, db, target_user_ids=None): calls["target_user_ids"] = target_user_ids monkeypatch.setattr(svc, "notify", fake_notify) account = await _make_account(test_db) l1_user = await _make_user(test_db, account_id=account.id) live_eng = await _make_user(test_db, account_id=account.id, account_role="engineer") dead_eng = await _make_user(test_db, account_id=account.id, account_role="engineer") dead_eng.deleted_at = datetime.now(timezone.utc) await test_db.flush() ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1_user.id) s = await svc.start_ai_build_session( test_db, account_id=account.id, user=l1_user, ticket_id=str(ticket.id), ticket_kind="internal") await svc.escalate(test_db, session_id=s.id, reason="x", reason_category="exhausted_safe_steps") assert live_eng.id in calls["target_user_ids"] assert dead_eng.id not in calls["target_user_ids"] @pytest.mark.asyncio async def test_escalate_with_no_engineers_falls_back_to_default_recipients( test_db: AsyncSession, monkeypatch, ): """Finding 10: when no eligible engineer exists, pass None (not []) so notify() falls back to the default owner/admin set instead of silently dropping it.""" from app.services import l1_session_service as svc calls = {} async def fake_notify(event, account_id, payload, db, target_user_ids=None): calls["target_user_ids"] = target_user_ids monkeypatch.setattr(svc, "notify", fake_notify) account = await _make_account(test_db) # Only an l1_tech exists — not in the owner/admin/engineer recipient query. l1_user = await _make_user(test_db, account_id=account.id) ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1_user.id) s = await svc.start_ai_build_session( test_db, account_id=account.id, user=l1_user, ticket_id=str(ticket.id), ticket_kind="internal") await svc.escalate(test_db, session_id=s.id, reason="x", reason_category="exhausted_safe_steps") assert calls["target_user_ids"] is None