Files
resolutionflow/backend/tests/test_l1_session_service.py
Michael Chihlas 9c34d1e82d fix(l1): answer buttons must match the question — yes_label/no_label end-to-end
Live walk defect: the builder generated alternatives questions ("Is Jane's
account a Microsoft account or a local account?") while the UI could only
offer Yes/No. Root cause: SYSTEM_PROMPT mandated a label-less
'<yes/no question>' shape with no way to express the two answers.

- SYSTEM_PROMPT: question nodes must carry yes_label/no_label — the literal
  button texts; alternatives questions must use the alternatives as labels.
- validate_node: labels hard-floor-scanned, must be distinct non-empty strings.
- _ensure_labels: server defaults missing labels to Yes/No.
- advance_ai_build: records answer_label (and both labels) in walked_path,
  derived from the server-held pending_node — never client-supplied.
- _build_context: LLM context shows the chosen label, not a bare yes/no
  (a raw "-> yes" on an alternatives question degrades the next generation).
- normalize_walked_path: captured flywheel trees keep question labels.
- Frontend: buttons render yes_label/no_label; walk transcript and
  L1EscalationsSection render answer_label.

Phase 2A backend set: 137 passed / 0 failed / 8 deselected. tsc, eslint,
vite build clean.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 15:03:15 -04:00

1247 lines
44 KiB
Python

"""Tests for l1_session_service start_* functions (T12), record_step/update_notes (T13), resolve/escalate (T14)."""
import uuid
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models.account import Account
from app.models.audit_log import AuditLog
from app.models.user import User
from app.models.tree import Tree
from app.models.ai_session import AISession
from app.models.flow_proposal import FlowProposal
from app.models.l1_walk_session import L1WalkSession
from app.services.l1_session_service import (
start_flow_session,
start_proposal_session,
start_adhoc_session,
_resolve_acting_as,
record_step,
update_notes,
resolve,
escalate,
escalate_without_walk,
)
from app.services import internal_ticket_service
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def _make_account(db: AsyncSession) -> Account:
    """Add and flush an Account whose name and display code share a random suffix."""
    suffix = str(uuid.uuid4())[:8]
    new_account = Account(
        id=uuid.uuid4(),
        name=f"Test Account {suffix}",
        # suffix is already at most 8 characters; the slice is retained as-is.
        display_code=suffix[:8].upper(),
    )
    db.add(new_account)
    await db.flush()
    return new_account
async def _make_user(
    db: AsyncSession,
    *,
    account_id: uuid.UUID,
    account_role: str = "l1_tech",
    can_cover_l1: bool = False,
) -> User:
    """Add and flush an active User in the given account.

    The email embeds a fresh UUID so repeated calls never collide. The
    account role and the L1-coverage flag are caller-controlled.
    """
    new_user = User(
        id=uuid.uuid4(),
        email=f"user-{uuid.uuid4()}@example.com",
        name="Test User",
        account_id=account_id,
        account_role=account_role,
        role="engineer",
        is_active=True,
        can_cover_l1=can_cover_l1,
    )
    db.add(new_user)
    await db.flush()
    return new_user
async def _make_tree(db: AsyncSession, *, account_id: uuid.UUID, author_id: uuid.UUID) -> Tree:
    """Add and flush a published, team-visible troubleshooting Tree with an empty structure."""
    empty_structure = {"nodes": [], "edges": []}
    flow = Tree(
        id=uuid.uuid4(),
        name="Test Flow",
        account_id=account_id,
        author_id=author_id,
        tree_type="troubleshooting",
        tree_structure=empty_structure,
        visibility="team",
        status="published",
    )
    db.add(flow)
    await db.flush()
    return flow
async def _make_ai_session(db: AsyncSession, *, user_id: uuid.UUID, account_id: uuid.UUID) -> AISession:
    """Add and flush an active free-text chat AISession with no conversation messages."""
    chat = AISession(
        id=uuid.uuid4(),
        user_id=user_id,
        account_id=account_id,
        session_type="chat",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    db.add(chat)
    await db.flush()
    return chat
async def _make_proposal(
    db: AsyncSession,
    *,
    account_id: uuid.UUID,
    source_session_id: uuid.UUID,
) -> FlowProposal:
    """Add and flush a pending 'new_flow' FlowProposal tied to the given source session."""
    empty_flow = {"nodes": [], "edges": []}
    draft = FlowProposal(
        id=uuid.uuid4(),
        account_id=account_id,
        source_session_id=source_session_id,
        proposal_type="new_flow",
        title="Test Proposal",
        proposed_flow_data=empty_flow,
        source="manual_draft",
        status="pending",
    )
    db.add(draft)
    await db.flush()
    return draft
# ---------------------------------------------------------------------------
# Unit tests for _resolve_acting_as (no DB needed)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_resolve_acting_as_for_engineer_returns_coverage_tag():
    """An engineer-role user resolves to the 'l1_coverage' acting-as tag."""
    engineer = User(account_role="engineer")
    tag = _resolve_acting_as(engineer)
    assert tag == "l1_coverage"
@pytest.mark.asyncio
async def test_resolve_acting_as_for_l1_tech_returns_none():
    """An l1_tech-role user gets no acting-as tag."""
    tech = User(account_role="l1_tech")
    tag = _resolve_acting_as(tech)
    assert tag is None
@pytest.mark.asyncio
async def test_resolve_acting_as_for_owner_returns_none():
    """An owner-role user gets no acting-as tag."""
    owner = User(account_role="owner")
    tag = _resolve_acting_as(owner)
    assert tag is None
# ---------------------------------------------------------------------------
# Integration tests (real DB)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_start_flow_session_creates_active_flow_session(test_db: AsyncSession):
    """start_flow_session returns an active 'flow' session with empty path and notes."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    flow = await _make_tree(test_db, account_id=acct.id, author_id=tech.id)

    walk = await start_flow_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_id=flow.id,
        ticket_id="ticket-abc",
        ticket_kind="internal",
    )

    # Kind and linkage
    assert walk.session_kind == "flow"
    assert walk.flow_id == flow.id
    assert walk.flow_proposal_id is None
    # Fresh-session state
    assert walk.status == "active"
    assert walk.walked_path == []
    assert walk.walk_notes == []
    assert walk.acting_as is None  # l1_tech native
    # Ticket reference is carried through unchanged
    assert walk.ticket_id == "ticket-abc"
    assert walk.ticket_kind == "internal"
@pytest.mark.asyncio
async def test_start_proposal_session_creates_active_proposal_session(test_db: AsyncSession):
    """start_proposal_session returns an active 'proposal' session linked to the proposal only."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    chat = await _make_ai_session(test_db, user_id=tech.id, account_id=acct.id)
    draft = await _make_proposal(
        test_db,
        account_id=acct.id,
        source_session_id=chat.id,
    )

    walk = await start_proposal_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_proposal_id=draft.id,
        ticket_id="ticket-xyz",
        ticket_kind="psa",
    )

    assert walk.session_kind == "proposal"
    assert walk.flow_proposal_id == draft.id
    assert walk.flow_id is None
    assert walk.status == "active"
@pytest.mark.asyncio
async def test_start_adhoc_session_no_flow_no_proposal(test_db: AsyncSession):
    """An adhoc session has neither a flow nor a proposal and starts with empty path/notes."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)

    walk = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id="ticket-adhoc",
        ticket_kind="internal",
    )

    assert walk.session_kind == "adhoc"
    assert walk.flow_id is None
    assert walk.flow_proposal_id is None
    assert walk.walked_path == []
    assert walk.walk_notes == []
@pytest.mark.asyncio
async def test_engineer_with_coverage_gets_acting_as_tag(test_db: AsyncSession):
    """A session started by a coverage-enabled engineer is tagged acting_as='l1_coverage'."""
    acct = await _make_account(test_db)
    engineer = await _make_user(
        test_db,
        account_id=acct.id,
        account_role="engineer",
        can_cover_l1=True,
    )

    walk = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=engineer,
        ticket_id="t",
        ticket_kind="internal",
    )

    assert walk.acting_as == "l1_coverage"
# ---------------------------------------------------------------------------
# T13: record_step and update_notes tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_record_step_appends_to_walked_path(test_db: AsyncSession):
    """record_step appends one entry, sets current_node_id and stamps last_step_at."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    flow = await _make_tree(test_db, account_id=acct.id, author_id=tech.id)
    walk = await start_flow_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_id=flow.id,
        ticket_id="t1",
        ticket_kind="internal",
    )

    after = await record_step(
        test_db,
        session_id=walk.id,
        node_id="n1",
        question="Is the device powered on?",
        answer="yes",
    )

    # No note was supplied, so the stored entry carries l1_note=None.
    expected_entry = {
        "node_id": "n1",
        "question": "Is the device powered on?",
        "answer": "yes",
        "l1_note": None,
    }
    assert len(after.walked_path) == 1
    assert after.walked_path[0] == expected_entry
    assert after.current_node_id == "n1"
    assert after.last_step_at is not None
@pytest.mark.asyncio
async def test_record_step_two_sequential_steps_accumulate(test_db: AsyncSession):
    """Two record_step calls yield two path entries in call order; current node is the last."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    flow = await _make_tree(test_db, account_id=acct.id, author_id=tech.id)
    walk = await start_flow_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_id=flow.id,
        ticket_id="t2",
        ticket_kind="internal",
    )

    # First step: result intentionally discarded.
    await record_step(
        test_db,
        session_id=walk.id,
        node_id="n1",
        question="Step 1?",
        answer="yes",
    )
    # Second step: inspect the session returned by this call.
    after = await record_step(
        test_db,
        session_id=walk.id,
        node_id="n2",
        question="Step 2?",
        answer="no",
    )

    assert len(after.walked_path) == 2
    first_entry, second_entry = after.walked_path[0], after.walked_path[1]
    assert first_entry["node_id"] == "n1"
    assert second_entry["node_id"] == "n2"
    assert after.current_node_id == "n2"
@pytest.mark.asyncio
async def test_record_step_blocks_adhoc(test_db: AsyncSession):
    """record_step on an adhoc session raises ValueError mentioning 'adhoc'."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    walk = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id="t3",
        ticket_kind="internal",
    )

    with pytest.raises(ValueError, match="adhoc"):
        await record_step(
            test_db,
            session_id=walk.id,
            node_id="n1",
            question="Q?",
            answer="yes",
        )
@pytest.mark.asyncio
async def test_record_step_blocks_inactive_session(test_db: AsyncSession):
    """record_step on a session whose status is no longer active raises 'not active'."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    flow = await _make_tree(test_db, account_id=acct.id, author_id=tech.id)
    walk = await start_flow_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_id=flow.id,
        ticket_id="t4",
        ticket_kind="internal",
    )

    # Force the row into a terminal state directly, bypassing resolve().
    walk.status = "resolved"
    await test_db.flush()

    with pytest.raises(ValueError, match="not active"):
        await record_step(
            test_db,
            session_id=walk.id,
            node_id="n1",
            question="Q?",
            answer="yes",
        )
@pytest.mark.asyncio
async def test_record_step_includes_note_when_provided(test_db: AsyncSession):
    """A note passed to record_step is stored under 'l1_note' on the path entry."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    flow = await _make_tree(test_db, account_id=acct.id, author_id=tech.id)
    walk = await start_flow_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_id=flow.id,
        ticket_id="t5",
        ticket_kind="internal",
    )

    after = await record_step(
        test_db,
        session_id=walk.id,
        node_id="n1",
        question="Q?",
        answer="yes",
        note="Customer mentioned it started yesterday",
    )

    stored_note = after.walked_path[0]["l1_note"]
    assert stored_note == "Customer mentioned it started yesterday"
@pytest.mark.asyncio
async def test_update_notes_replaces_walk_notes(test_db: AsyncSession):
    """update_notes stores the supplied list as walk_notes and stamps last_step_at."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    walk = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id="t6",
        ticket_kind="internal",
    )

    replacement = [{"timestamp": "2026-05-28T10:00:00Z", "content": "Customer rebooted"}]
    after = await update_notes(test_db, session_id=walk.id, notes=replacement)

    assert after.walk_notes == replacement
    assert after.last_step_at is not None
@pytest.mark.asyncio
async def test_update_notes_raises_if_session_not_found(test_db: AsyncSession):
    """update_notes with a random, never-inserted session id raises 'not found'."""
    unknown_session_id = uuid.uuid4()
    with pytest.raises(ValueError, match="not found"):
        await update_notes(test_db, session_id=unknown_session_id, notes=[])
@pytest.mark.asyncio
async def test_update_notes_raises_if_session_not_active(test_db: AsyncSession):
    """update_notes on a session flipped to 'escalated' raises 'not active'."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    walk = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id="t7",
        ticket_kind="internal",
    )

    # Put the session into a non-active state by hand.
    walk.status = "escalated"
    await test_db.flush()

    with pytest.raises(ValueError, match="not active"):
        await update_notes(test_db, session_id=walk.id, notes=[])
@pytest.mark.asyncio
async def test_update_notes_size_cap(test_db: AsyncSession):
    """A notes payload whose content exceeds 256 KiB is rejected with a '256KB' error."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    walk = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id="t8",
        ticket_kind="internal",
    )

    # 100 characters past the 256 * 1024 mark, in a single note.
    oversized_text = "x" * (256 * 1024 + 100)
    oversized_notes = [{"timestamp": "2026-05-28T10:00:00Z", "content": oversized_text}]

    with pytest.raises(ValueError, match="256KB"):
        await update_notes(test_db, session_id=walk.id, notes=oversized_notes)
# ---------------------------------------------------------------------------
# T14: resolve, escalate, escalate_without_walk tests
# ---------------------------------------------------------------------------
async def _make_internal_ticket(db: AsyncSession, *, account_id: uuid.UUID, user_id: uuid.UUID) -> object:
    """Create an internal ticket via internal_ticket_service and return it.

    The ticket is created on behalf of ``user_id`` with a fixed problem
    statement.
    """
    ticket = await internal_ticket_service.create_ticket(
        db,
        account_id=account_id,
        created_by_user_id=user_id,
        problem_statement="Customer cannot log in",
    )
    return ticket
@pytest.mark.asyncio
async def test_resolve_proposal_helpful_flips_validated_by_outcome(test_db: AsyncSession):
    """A helpful resolve on a proposal session validates the proposal and resolves the ticket."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    internal_ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)
    chat = await _make_ai_session(test_db, user_id=tech.id, account_id=acct.id)
    draft = await _make_proposal(test_db, account_id=acct.id, source_session_id=chat.id)
    walk = await start_proposal_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_proposal_id=draft.id,
        ticket_id=str(internal_ticket.id),
        ticket_kind="internal",
    )

    outcome = await resolve(
        test_db,
        session_id=walk.id,
        helpful=True,
        resolution_notes="Issue fixed by proposal walk",
    )

    # Session-level outcome
    assert outcome.status == "resolved"
    assert outcome.helpful is True
    assert outcome.resolution_notes == "Issue fixed by proposal walk"
    assert outcome.resolved_at is not None

    # The proposal is flagged as validated once reloaded.
    await test_db.refresh(draft)
    assert draft.validated_by_outcome is True

    # The internal ticket reads as resolved once reloaded.
    await test_db.refresh(internal_ticket)
    assert internal_ticket.status == "resolved"
@pytest.mark.asyncio
async def test_resolve_proposal_not_helpful_leaves_validated_by_outcome_false(test_db: AsyncSession):
    """An unhelpful resolve on a proposal session leaves validated_by_outcome False."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    internal_ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)
    chat = await _make_ai_session(test_db, user_id=tech.id, account_id=acct.id)
    draft = await _make_proposal(test_db, account_id=acct.id, source_session_id=chat.id)
    walk = await start_proposal_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_proposal_id=draft.id,
        ticket_id=str(internal_ticket.id),
        ticket_kind="internal",
    )

    outcome = await resolve(
        test_db,
        session_id=walk.id,
        helpful=False,
        resolution_notes="Proposal did not help",
    )

    assert outcome.helpful is False
    await test_db.refresh(draft)
    assert draft.validated_by_outcome is False
@pytest.mark.asyncio
async def test_resolve_flow_session_closes_ticket_no_proposal_update(test_db: AsyncSession):
    """Resolving a flow session resolves the ticket; the session has no proposal link."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    internal_ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)
    flow = await _make_tree(test_db, account_id=acct.id, author_id=tech.id)
    walk = await start_flow_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_id=flow.id,
        ticket_id=str(internal_ticket.id),
        ticket_kind="internal",
    )

    outcome = await resolve(
        test_db,
        session_id=walk.id,
        helpful=True,
        resolution_notes="Flow resolved the issue",
    )

    assert outcome.status == "resolved"
    assert outcome.session_kind == "flow"
    assert outcome.flow_proposal_id is None

    await test_db.refresh(internal_ticket)
    assert internal_ticket.status == "resolved"
@pytest.mark.asyncio
async def test_resolve_adhoc_session_closes_ticket(test_db: AsyncSession):
    """Resolving an adhoc session marks both the session and its internal ticket resolved."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    internal_ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)
    walk = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id=str(internal_ticket.id),
        ticket_kind="internal",
    )

    outcome = await resolve(
        test_db,
        session_id=walk.id,
        helpful=True,
        resolution_notes="Adhoc resolved",
    )

    assert outcome.status == "resolved"
    await test_db.refresh(internal_ticket)
    assert internal_ticket.status == "resolved"
@pytest.mark.asyncio
async def test_resolve_psa_session_no_ticket_update(test_db: AsyncSession):
    """Resolving a PSA-backed session succeeds with a non-internal ticket id.

    NOTE(review): this only checks that resolve() completes and the session
    stays 'psa'; it never asserts that internal tickets are left untouched.
    Confirm whether that is the intended coverage.
    """
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    flow = await _make_tree(test_db, account_id=acct.id, author_id=tech.id)
    walk = await start_flow_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_id=flow.id,
        ticket_id="psa-external-id-123",
        ticket_kind="psa",
    )

    outcome = await resolve(
        test_db,
        session_id=walk.id,
        helpful=True,
        resolution_notes="PSA ticket resolved externally",
    )

    assert outcome.status == "resolved"
    assert outcome.ticket_kind == "psa"
@pytest.mark.asyncio
async def test_resolve_raises_on_missing_session(test_db: AsyncSession):
    """resolve() with a random, never-inserted session id raises 'not found'."""
    unknown_session_id = uuid.uuid4()
    with pytest.raises(ValueError, match="not found"):
        await resolve(
            test_db,
            session_id=unknown_session_id,
            helpful=True,
            resolution_notes="N/A",
        )
@pytest.mark.asyncio
async def test_resolve_raises_on_inactive_session(test_db: AsyncSession):
    """resolve() on a session already marked 'escalated' raises 'not active'."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    internal_ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)
    walk = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id=str(internal_ticket.id),
        ticket_kind="internal",
    )

    # Flip the status by hand so the session is no longer active.
    walk.status = "escalated"
    await test_db.flush()

    with pytest.raises(ValueError, match="not active"):
        await resolve(
            test_db,
            session_id=walk.id,
            helpful=False,
            resolution_notes="N/A",
        )
@pytest.mark.asyncio
async def test_escalate_marks_session_and_ticket_as_escalated(test_db: AsyncSession):
    """escalate() records reason and category on the session and escalates the internal ticket."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    internal_ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)
    flow = await _make_tree(test_db, account_id=acct.id, author_id=tech.id)
    walk = await start_flow_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_id=flow.id,
        ticket_id=str(internal_ticket.id),
        ticket_kind="internal",
    )

    outcome = await escalate(
        test_db,
        session_id=walk.id,
        reason="Customer reported intermittent failure not covered by flow",
        reason_category="out_of_scope",
    )

    assert outcome.status == "escalated"
    assert outcome.escalation_reason == "Customer reported intermittent failure not covered by flow"
    assert outcome.escalation_reason_category == "out_of_scope"
    # Both timestamps are populated on escalation.
    assert outcome.resolved_at is not None
    assert outcome.last_step_at is not None

    await test_db.refresh(internal_ticket)
    assert internal_ticket.status == "escalated"
@pytest.mark.asyncio
async def test_escalate_raises_on_missing_session(test_db: AsyncSession):
    """escalate() with a random, never-inserted session id raises 'not found'."""
    unknown_session_id = uuid.uuid4()
    with pytest.raises(ValueError, match="not found"):
        await escalate(
            test_db,
            session_id=unknown_session_id,
            reason="Some reason",
            reason_category="unknown",
        )
@pytest.mark.asyncio
async def test_escalate_raises_on_inactive_session(test_db: AsyncSession):
    """escalate() on a session already marked 'resolved' raises 'not active'."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    internal_ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)
    walk = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id=str(internal_ticket.id),
        ticket_kind="internal",
    )

    # Flip the status by hand so the session is no longer active.
    walk.status = "resolved"
    await test_db.flush()

    with pytest.raises(ValueError, match="not active"):
        await escalate(
            test_db,
            session_id=walk.id,
            reason="Too late",
            reason_category="other",
        )
@pytest.mark.asyncio
async def test_escalate_without_walk_creates_escalated_adhoc_session(test_db: AsyncSession):
    """escalate_without_walk returns an already-escalated adhoc session with an empty path."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    internal_ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)

    walk = await escalate_without_walk(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id=str(internal_ticket.id),
        ticket_kind="internal",
        reason_category="no_kb_content",
        reason="No KB article matched this issue",
    )

    # Session shape
    assert walk.status == "escalated"
    assert walk.session_kind == "adhoc"
    assert walk.walked_path == []
    # Escalation details
    assert walk.escalation_reason == "No KB article matched this issue"
    assert walk.escalation_reason_category == "no_kb_content"
    assert walk.resolved_at is not None
    assert walk.last_step_at is not None
    # Ownership
    assert walk.account_id == acct.id
    assert walk.created_by_user_id == tech.id
@pytest.mark.asyncio
async def test_escalate_without_walk_escalates_internal_ticket(test_db: AsyncSession):
    """After escalate_without_walk, the reloaded internal ticket has status 'escalated'."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    internal_ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)

    # Only the effect on the ticket matters here; the returned session is unused.
    await escalate_without_walk(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id=str(internal_ticket.id),
        ticket_kind="internal",
        reason_category="no_kb_content",
    )

    await test_db.refresh(internal_ticket)
    assert internal_ticket.status == "escalated"
@pytest.mark.asyncio
async def test_escalate_without_walk_psa_does_not_touch_internal_ticket(test_db: AsyncSession):
    """escalate_without_walk with ticket_kind='psa' succeeds when no internal ticket exists."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)

    walk = await escalate_without_walk(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id="psa-ticket-999",
        ticket_kind="psa",
        reason_category="no_kb_content",
    )

    assert walk.status == "escalated"
    assert walk.ticket_kind == "psa"
@pytest.mark.asyncio
async def test_escalate_without_walk_reason_is_optional(test_db: AsyncSession):
    """Omitting ``reason`` leaves escalation_reason None while the category is still stored."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    internal_ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)

    walk = await escalate_without_walk(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id=str(internal_ticket.id),
        ticket_kind="internal",
        reason_category="no_kb_content",
    )

    assert walk.escalation_reason is None
    assert walk.escalation_reason_category == "no_kb_content"
# ---------------------------------------------------------------------------
# T7: start_ai_build_session
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_start_ai_build_session(test_db: AsyncSession):
    """start_ai_build_session returns an active 'ai_build' session with no flow or proposal."""
    from app.services import l1_session_service as svc

    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)

    built = await svc.start_ai_build_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id="t-ai",
        ticket_kind="internal",
    )

    assert built.session_kind == "ai_build"
    assert built.flow_id is None
    assert built.flow_proposal_id is None
    assert built.status == "active"
# ---------------------------------------------------------------------------
# T8: advance_ai_build
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_advance_ai_build_appends_and_returns_next(test_db: AsyncSession, monkeypatch):
    """Answering a node appends it to walked_path and returns the stubbed next node."""
    from app.services import l1_session_service as svc
    from app.services import ai_tree_builder

    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    built = await svc.start_ai_build_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id="t-ai",
        ticket_kind="internal",
    )

    # Stub the generator so the test is deterministic.
    async def fake_next(problem, category, walked):
        return {"node_type": "resolved", "id": "done", "text": "Fixed."}

    monkeypatch.setattr(ai_tree_builder, "generate_next_node", fake_next)

    upcoming = await svc.advance_ai_build(
        test_db,
        session_id=built.id,
        problem_text="printer",
        category="printer",
        node_id="n1",
        node_text="Powered on?",
        answer="no",
        note=None,
    )
    assert upcoming["node_type"] == "resolved"

    # NOTE(review): Session.get may hand back the identity-mapped instance rather
    # than re-reading the row -- confirm this checks persisted state as intended.
    reloaded = await test_db.get(type(built), built.id)
    assert len(reloaded.walked_path) == 1
    answered = reloaded.walked_path[0]
    assert answered["answer"] == "no"
    assert answered["text"] == "Powered on?"
@pytest.mark.asyncio
async def test_advance_ai_build_first_call_does_not_append(test_db: AsyncSession, monkeypatch):
    """The opening advance call (node_id=None) appends nothing but tracks the returned node id."""
    from app.services import l1_session_service as svc
    from app.services import ai_tree_builder

    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    built = await svc.start_ai_build_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id="t-ai-first",
        ticket_kind="internal",
    )

    # Stub the generator so the test is deterministic.
    async def fake_next(problem, category, walked):
        return {"node_type": "question", "id": "q1", "text": "Is it plugged in?"}

    monkeypatch.setattr(ai_tree_builder, "generate_next_node", fake_next)

    # No answered node is supplied on the first call, so the path must stay empty.
    upcoming = await svc.advance_ai_build(
        test_db,
        session_id=built.id,
        problem_text="printer",
        category="printer",
        node_id=None,
    )
    assert upcoming["node_type"] == "question"
    assert upcoming["id"] == "q1"

    # NOTE(review): Session.get may hand back the identity-mapped instance rather
    # than re-reading the row -- confirm this checks persisted state as intended.
    reloaded = await test_db.get(type(built), built.id)
    assert len(reloaded.walked_path) == 0
    assert reloaded.current_node_id == "q1"
@pytest.mark.asyncio
async def test_advance_ai_build_wrong_session_kind_raises(test_db: AsyncSession, monkeypatch):
    """advance_ai_build on a non-ai_build (adhoc) session raises ValueError mentioning 'ai_build'."""
    from app.services import l1_session_service as svc
    from app.services import ai_tree_builder

    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    # Deliberately the wrong kind: adhoc instead of ai_build.
    adhoc = await svc.start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id="t-adhoc-guard",
        ticket_kind="internal",
    )

    async def fake_next(problem, category, walked):  # pragma: no cover
        return {"node_type": "question", "id": "q1", "text": "?"}

    monkeypatch.setattr(ai_tree_builder, "generate_next_node", fake_next)

    with pytest.raises(ValueError, match="ai_build"):
        await svc.advance_ai_build(
            test_db,
            session_id=adhoc.id,
            problem_text="printer",
            category="printer",
        )
# ---------------------------------------------------------------------------
# T9: flywheel capture on resolve + engineer notification on escalate
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_resolve_ai_build_creates_outcome_validated_proposal(test_db: AsyncSession, monkeypatch):
    """A helpful resolve on an ai_build session yields one outcome-validated FlowProposal."""
    from app.services import l1_session_service as svc

    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    internal_ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)
    built = await svc.start_ai_build_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id=str(internal_ticket.id),
        ticket_kind="internal",
    )

    # Seed a non-empty walk (needed for normalize_walked_path): one answered
    # question followed by a resolved node.
    question_step = {"node_type": "question", "id": "n1", "text": "On?", "answer": "no"}
    resolved_step = {"node_type": "resolved", "id": "n2", "text": "Fixed."}
    built.walked_path = [question_step, resolved_step]
    await test_db.flush()

    await svc.resolve(test_db, session_id=built.id, helpful=True, resolution_notes="ok")

    query = select(FlowProposal).where(FlowProposal.l1_session_id == built.id)
    captured = (await test_db.execute(query)).scalars().all()

    assert len(captured) == 1
    proposal = captured[0]
    assert proposal.source == "ai_realtime_l1"
    assert proposal.validated_by_outcome is True
    assert proposal.source_session_id is None
    assert proposal.proposed_flow_data["tree_structure"]["id"] == "n1"
    assert proposal.proposal_type == "new_flow"
    assert proposal.proposed_flow_data["match_keywords"] == []
@pytest.mark.asyncio
async def test_escalate_notifies_engineers(test_db: AsyncSession, monkeypatch):
    """escalate() calls notify with event='l1.session.escalated' and explicit engineer recipients."""
    from app.services import l1_session_service as svc

    # Captures the arguments of the notify() call made by escalate().
    calls = {}

    async def fake_notify(event, account_id, payload, db, target_user_ids=None):
        calls["event"] = event
        calls["target_user_ids"] = target_user_ids

    monkeypatch.setattr(svc, "notify", fake_notify)

    account = await _make_account(test_db)
    # l1_user is the session owner (account_role="l1_tech" by default — NOT in the recipient query)
    l1_user = await _make_user(test_db, account_id=account.id)
    # Seed an eligible recipient: account_role="engineer" matches the production query
    # (owner/admin/engineer). Without this user, target_ids would be [] and the
    # eng.id assertion below would fail, proving the assertion is non-vacuous.
    eng = await _make_user(test_db, account_id=account.id, account_role="engineer")
    ticket = await _make_internal_ticket(test_db, account_id=account.id, user_id=l1_user.id)
    s = await svc.start_ai_build_session(
        test_db,
        account_id=account.id,
        user=l1_user,
        ticket_id=str(ticket.id),
        ticket_kind="internal",
    )

    await svc.escalate(test_db, session_id=s.id, reason="stuck", reason_category="exhausted_safe_steps")

    # Fail with a readable message (instead of a KeyError on the lookups below)
    # when escalate() never invoked notify() at all.
    assert calls, "escalate() did not call notify()"
    assert calls["event"] == "l1.session.escalated"
    assert isinstance(calls["target_user_ids"], list) and len(calls["target_user_ids"]) >= 1
    assert eng.id in calls["target_user_ids"]  # the eligible engineer is a recipient
# ---------------------------------------------------------------------------
# T14 audit log tests (spec §5.6.1)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_resolve_writes_audit_log_with_acting_as(test_db: AsyncSession):
    """resolve() by a coverage engineer writes exactly one audit row tagged 'l1_coverage'."""
    acct = await _make_account(test_db)
    engineer = await _make_user(
        test_db,
        account_id=acct.id,
        account_role="engineer",
        can_cover_l1=True,
    )
    internal_ticket = await _make_internal_ticket(
        test_db, account_id=acct.id, user_id=engineer.id
    )
    walk = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=engineer,
        ticket_id=str(internal_ticket.id),
        ticket_kind="internal",
    )

    await resolve(
        test_db,
        session_id=walk.id,
        helpful=True,
        resolution_notes="Coverage engineer resolved",
    )

    audit_query = select(AuditLog).where(
        AuditLog.action == "l1.session.resolve",
        AuditLog.resource_id == walk.id,
    )
    # scalar_one() also enforces that exactly one matching row exists.
    entry = (await test_db.execute(audit_query)).scalar_one()

    assert entry.acting_as == "l1_coverage"
    assert entry.user_id == engineer.id
    assert entry.account_id == acct.id
    assert entry.details["helpful"] is True
@pytest.mark.asyncio
async def test_resolve_writes_audit_log_native_l1_acting_as_null(
    test_db: AsyncSession,
):
    """resolve() by a native l1_tech writes exactly one audit row with acting_as None."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id, account_role="l1_tech")
    internal_ticket = await _make_internal_ticket(
        test_db, account_id=acct.id, user_id=tech.id
    )
    walk = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id=str(internal_ticket.id),
        ticket_kind="internal",
    )

    await resolve(
        test_db,
        session_id=walk.id,
        helpful=False,
        resolution_notes="Native L1 resolved",
    )

    audit_query = select(AuditLog).where(
        AuditLog.action == "l1.session.resolve",
        AuditLog.resource_id == walk.id,
    )
    # scalar_one() also enforces that exactly one matching row exists.
    entry = (await test_db.execute(audit_query)).scalar_one()

    assert entry.acting_as is None
@pytest.mark.asyncio
async def test_escalate_writes_audit_log(test_db: AsyncSession):
    """Escalating a session records one audit row under 'l1.session.escalate'
    that carries the reason category and the account id."""
    # Arrange: a user (helper defaults) with an internal ticket and a session.
    acct = await _make_account(test_db)
    walker = await _make_user(test_db, account_id=acct.id)
    internal_ticket = await _make_internal_ticket(
        test_db, account_id=acct.id, user_id=walker.id
    )
    walk = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=walker,
        ticket_id=str(internal_ticket.id),
        ticket_kind="internal",
    )

    # Act.
    await escalate(
        test_db,
        session_id=walk.id,
        reason="Beyond scope",
        reason_category="out_of_scope",
    )

    # Assert: scalar_one enforces that exactly one matching row was written.
    audit_stmt = select(AuditLog).where(
        AuditLog.action == "l1.session.escalate",
        AuditLog.resource_id == walk.id,
    )
    entry = (await test_db.execute(audit_stmt)).scalar_one()
    assert entry.details["escalation_reason_category"] == "out_of_scope"
    assert entry.account_id == acct.id
@pytest.mark.asyncio
async def test_escalate_without_walk_writes_audit_log(test_db: AsyncSession):
    """escalate_without_walk() records one 'l1.session.escalate_no_walk' audit
    row for the session it returns, including the reason category."""
    # Arrange.
    acct = await _make_account(test_db)
    walker = await _make_user(test_db, account_id=acct.id)
    internal_ticket = await _make_internal_ticket(
        test_db, account_id=acct.id, user_id=walker.id
    )

    # Act: escalate directly, with no prior walk session.
    walk = await escalate_without_walk(
        test_db,
        account_id=acct.id,
        user=walker,
        ticket_id=str(internal_ticket.id),
        ticket_kind="internal",
        reason_category="no_kb_content",
    )

    # Assert: exactly one audit row, keyed by the returned session's id.
    audit_stmt = select(AuditLog).where(
        AuditLog.action == "l1.session.escalate_no_walk",
        AuditLog.resource_id == walk.id,
    )
    entry = (await test_db.execute(audit_stmt)).scalar_one()
    assert entry.account_id == acct.id
    # The reason category is part of the audit guarantee. This assertion was
    # dropped by an earlier edit and has been restored; keep it.
    assert entry.details["escalation_reason_category"] == "no_kb_content"
# ---------------------------------------------------------------------------
# Finding 1 (server-assigned node ids) + Finding 8 (pending-node replay)
# ---------------------------------------------------------------------------
class _FakeProvider:
def __init__(self, raw):
self._raw = raw
async def generate_json(self, *, system_prompt, messages, max_tokens):
return self._raw, None, None
@pytest.mark.asyncio
async def test_ai_build_first_node_carries_id_and_advance_grows_walk(
    test_db: AsyncSession, monkeypatch,
):
    """Finding 1 contract: the provider payload contains no id, yet the first
    node returned by advance_ai_build must have one, and answering with that id
    must append exactly one entry to walked_path (the original showstopper was
    a node_id that stayed None, so the walk never advanced)."""
    from app.services import l1_session_service as svc
    from app.services import ai_tree_builder

    def _provider_factory(*args, **kwargs):
        # Payload deliberately omits "id" (the shape the model produces).
        return _FakeProvider('{"node_type":"question","text":"Plugged in?"}')

    acct = await _make_account(test_db)
    walker = await _make_user(test_db, account_id=acct.id)
    build = await svc.start_ai_build_session(
        test_db,
        account_id=acct.id,
        user=walker,
        ticket_id="t-contract",
        ticket_kind="internal",
        category="printer",
        problem_text="printer offline",
    )

    # Keep the real generator; only the provider lookup is swapped out.
    monkeypatch.setattr(ai_tree_builder, "get_ai_provider", _provider_factory)

    first = await svc.advance_ai_build(
        test_db,
        session_id=build.id,
        problem_text="printer offline",
        category="printer",
        node_id=None,
    )
    assert first.get("id"), "first node must carry a server-assigned id"

    # Answer using the id we were just handed; the walk must grow by one.
    await svc.advance_ai_build(
        test_db,
        session_id=build.id,
        problem_text="printer offline",
        category="printer",
        node_id=first["id"],
        node_text=first["text"],
        answer="no",
    )
    reloaded = await test_db.get(L1WalkSession, build.id)
    assert len(reloaded.walked_path) == 1
    assert reloaded.walked_path[0]["id"] == first["id"]
@pytest.mark.asyncio
async def test_advance_ai_build_replays_pending_node_without_regenerating(
    test_db: AsyncSession, monkeypatch,
):
    """Finding 8: calling advance_ai_build again with node_id=None (a re-mount)
    must hand back the already-served, unanswered node rather than invoking the
    generator a second time (a fresh paid LLM call that could also swap the
    question)."""
    from app.services import l1_session_service as svc
    from app.services import ai_tree_builder

    # Counts generator invocations; each call would yield a distinct id.
    calls = {"n": 0}

    async def fake_next(problem, category, walked):
        calls["n"] += 1
        node = {"node_type": "question", "id": f"q{calls['n']}", "text": "?"}
        return node

    acct = await _make_account(test_db)
    walker = await _make_user(test_db, account_id=acct.id)
    build = await svc.start_ai_build_session(
        test_db,
        account_id=acct.id,
        user=walker,
        ticket_id="t-replay",
        ticket_kind="internal",
        category="printer",
        problem_text="printer offline",
    )
    monkeypatch.setattr(ai_tree_builder, "generate_next_node", fake_next)

    served = await svc.advance_ai_build(
        test_db,
        session_id=build.id,
        problem_text="p",
        category="printer",
        node_id=None,
    )
    # Second call without an answer: the generator must not run again.
    replayed = await svc.advance_ai_build(
        test_db,
        session_id=build.id,
        problem_text="p",
        category="printer",
        node_id=None,
    )
    assert calls["n"] == 1
    assert replayed["id"] == served["id"]
@pytest.mark.asyncio
async def test_advance_ai_build_records_answer_label_from_pending_node(
    test_db: AsyncSession, monkeypatch,
):
    """If the served question has yes_label/no_label, answering it must store
    the chosen label as answer_label in walked_path. The label comes from the
    server-held pending node, never from the client. A bare 'yes' on
    'Microsoft account or local account?' is meaningless in the transcript and
    in the LLM context."""
    from app.services import l1_session_service as svc
    from app.services import ai_tree_builder

    async def fake_next(problem, category, walked):
        # An "alternatives" question: the two labels are the real answers.
        node = {"node_type": "question", "id": "q-acct",
                "text": "Is the account a Microsoft account or a local account?",
                "yes_label": "Microsoft account", "no_label": "Local account"}
        return node

    acct = await _make_account(test_db)
    walker = await _make_user(test_db, account_id=acct.id)
    build = await svc.start_ai_build_session(
        test_db,
        account_id=acct.id,
        user=walker,
        ticket_id="t-label",
        ticket_kind="internal",
        category="account_login",
        problem_text="login issue",
    )
    monkeypatch.setattr(ai_tree_builder, "generate_next_node", fake_next)

    served = await svc.advance_ai_build(
        test_db,
        session_id=build.id,
        problem_text="login issue",
        category="account_login",
        node_id=None,
    )
    # Note that no label is passed here: only the raw "yes" answer.
    await svc.advance_ai_build(
        test_db,
        session_id=build.id,
        problem_text="login issue",
        category="account_login",
        node_id=served["id"],
        node_text=served["text"],
        answer="yes",
    )

    reloaded = await test_db.get(L1WalkSession, build.id)
    assert reloaded.walked_path[0]["answer"] == "yes"
    assert reloaded.walked_path[0]["answer_label"] == "Microsoft account"
# ---------------------------------------------------------------------------
# Finding 10: escalation recipient resolution
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_escalate_skips_soft_deleted_engineer(test_db: AsyncSession, monkeypatch):
    """An engineer with deleted_at set must be left out of the escalation
    recipients (checking is_active alone would miss them)."""
    from datetime import datetime, timezone
    from app.services import l1_session_service as svc

    # Records the recipient list that escalate() hands to notify().
    captured = {}

    async def fake_notify(event, account_id, payload, db, target_user_ids=None):
        captured["target_user_ids"] = target_user_ids

    monkeypatch.setattr(svc, "notify", fake_notify)

    acct = await _make_account(test_db)
    walker = await _make_user(test_db, account_id=acct.id)
    live_eng = await _make_user(
        test_db, account_id=acct.id, account_role="engineer"
    )
    dead_eng = await _make_user(
        test_db, account_id=acct.id, account_role="engineer"
    )
    # Soft-delete the second engineer.
    dead_eng.deleted_at = datetime.now(timezone.utc)
    await test_db.flush()

    internal_ticket = await _make_internal_ticket(
        test_db, account_id=acct.id, user_id=walker.id
    )
    build = await svc.start_ai_build_session(
        test_db,
        account_id=acct.id,
        user=walker,
        ticket_id=str(internal_ticket.id),
        ticket_kind="internal",
    )
    await svc.escalate(
        test_db,
        session_id=build.id,
        reason="x",
        reason_category="exhausted_safe_steps",
    )

    assert live_eng.id in captured["target_user_ids"]
    assert dead_eng.id not in captured["target_user_ids"]
@pytest.mark.asyncio
async def test_escalate_with_no_engineers_falls_back_to_default_recipients(
    test_db: AsyncSession, monkeypatch,
):
    """Finding 10: with no eligible engineer, escalate() must pass
    target_user_ids=None (not []) so notify() uses its default owner/admin
    recipients instead of silently dropping the notification."""
    from app.services import l1_session_service as svc

    # Records the recipient argument that escalate() hands to notify().
    captured = {}

    async def fake_notify(event, account_id, payload, db, target_user_ids=None):
        captured["target_user_ids"] = target_user_ids

    monkeypatch.setattr(svc, "notify", fake_notify)

    acct = await _make_account(test_db)
    # The only user is an l1_tech, which the owner/admin/engineer recipient
    # query does not match.
    walker = await _make_user(test_db, account_id=acct.id)
    internal_ticket = await _make_internal_ticket(
        test_db, account_id=acct.id, user_id=walker.id
    )
    build = await svc.start_ai_build_session(
        test_db,
        account_id=acct.id,
        user=walker,
        ticket_id=str(internal_ticket.id),
        ticket_kind="internal",
    )
    await svc.escalate(
        test_db,
        session_id=build.id,
        reason="x",
        reason_category="exhausted_safe_steps",
    )

    # A KeyError here means notify() was never called, which is also a failure.
    assert captured["target_user_ids"] is None