Files
resolutionflow/backend/tests/test_l1_session_service.py
Michael Chihlas e803a78ded feat(l1): l1_session_service record_step + update_notes
record_step appends to walked_path JSONB and advances current_node_id
on flow/proposal walks; refuses adhoc sessions. update_notes replaces
walk_notes (used by adhoc walks for debounced autosave); 256KB size cap
to prevent unbounded JSONB growth. Both reject non-active sessions.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-28 13:20:20 -04:00

422 lines
13 KiB
Python

"""Tests for l1_session_service start_* functions (T12) and record_step/update_notes (T13)."""
import uuid
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.account import Account
from app.models.user import User
from app.models.tree import Tree
from app.models.ai_session import AISession
from app.models.flow_proposal import FlowProposal
from app.services.l1_session_service import (
start_flow_session,
start_proposal_session,
start_adhoc_session,
_resolve_acting_as,
record_step,
update_notes,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def _make_account(db: AsyncSession) -> Account:
s = str(uuid.uuid4())[:8]
account = Account(
id=uuid.uuid4(),
name=f"Test Account {s}",
display_code=s[:8].upper(),
)
db.add(account)
await db.flush()
return account
async def _make_user(
db: AsyncSession,
*,
account_id: uuid.UUID,
account_role: str = "l1_tech",
can_cover_l1: bool = False,
) -> User:
user = User(
id=uuid.uuid4(),
email=f"user-{uuid.uuid4()}@example.com",
name="Test User",
account_id=account_id,
account_role=account_role,
role="engineer",
is_active=True,
can_cover_l1=can_cover_l1,
)
db.add(user)
await db.flush()
return user
async def _make_tree(db: AsyncSession, *, account_id: uuid.UUID, author_id: uuid.UUID) -> Tree:
tree = Tree(
id=uuid.uuid4(),
name="Test Flow",
account_id=account_id,
author_id=author_id,
tree_type="troubleshooting",
tree_structure={"nodes": [], "edges": []},
visibility="team",
status="published",
)
db.add(tree)
await db.flush()
return tree
async def _make_ai_session(db: AsyncSession, *, user_id: uuid.UUID, account_id: uuid.UUID) -> AISession:
ai_session = AISession(
id=uuid.uuid4(),
user_id=user_id,
account_id=account_id,
session_type="chat",
intake_type="free_text",
intake_content={"text": "test"},
status="active",
confidence_tier="discovery",
conversation_messages=[],
)
db.add(ai_session)
await db.flush()
return ai_session
async def _make_proposal(
db: AsyncSession,
*,
account_id: uuid.UUID,
source_session_id: uuid.UUID,
) -> FlowProposal:
proposal = FlowProposal(
id=uuid.uuid4(),
account_id=account_id,
source_session_id=source_session_id,
proposal_type="new_flow",
title="Test Proposal",
proposed_flow_data={"nodes": [], "edges": []},
source="manual_draft",
status="pending",
)
db.add(proposal)
await db.flush()
return proposal
# ---------------------------------------------------------------------------
# Unit tests for _resolve_acting_as (no DB needed)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_resolve_acting_as_for_engineer_returns_coverage_tag():
user = User(account_role="engineer")
assert _resolve_acting_as(user) == "l1_coverage"
@pytest.mark.asyncio
async def test_resolve_acting_as_for_l1_tech_returns_none():
user = User(account_role="l1_tech")
assert _resolve_acting_as(user) is None
@pytest.mark.asyncio
async def test_resolve_acting_as_for_owner_returns_none():
user = User(account_role="owner")
assert _resolve_acting_as(user) is None
# ---------------------------------------------------------------------------
# Integration tests (real DB)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_start_flow_session_creates_active_flow_session(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
tree = await _make_tree(test_db, account_id=account.id, author_id=l1.id)
session = await start_flow_session(
test_db,
account_id=account.id,
user=l1,
flow_id=tree.id,
ticket_id="ticket-abc",
ticket_kind="internal",
)
assert session.session_kind == "flow"
assert session.flow_id == tree.id
assert session.flow_proposal_id is None
assert session.status == "active"
assert session.walked_path == []
assert session.walk_notes == []
assert session.acting_as is None # l1_tech native
assert session.ticket_id == "ticket-abc"
assert session.ticket_kind == "internal"
@pytest.mark.asyncio
async def test_start_proposal_session_creates_active_proposal_session(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
ai_session = await _make_ai_session(test_db, user_id=l1.id, account_id=account.id)
proposal = await _make_proposal(
test_db,
account_id=account.id,
source_session_id=ai_session.id,
)
session = await start_proposal_session(
test_db,
account_id=account.id,
user=l1,
flow_proposal_id=proposal.id,
ticket_id="ticket-xyz",
ticket_kind="psa",
)
assert session.session_kind == "proposal"
assert session.flow_proposal_id == proposal.id
assert session.flow_id is None
assert session.status == "active"
@pytest.mark.asyncio
async def test_start_adhoc_session_no_flow_no_proposal(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
session = await start_adhoc_session(
test_db,
account_id=account.id,
user=l1,
ticket_id="ticket-adhoc",
ticket_kind="internal",
)
assert session.session_kind == "adhoc"
assert session.flow_id is None
assert session.flow_proposal_id is None
assert session.walked_path == []
assert session.walk_notes == []
@pytest.mark.asyncio
async def test_engineer_with_coverage_gets_acting_as_tag(test_db: AsyncSession):
account = await _make_account(test_db)
eng = await _make_user(
test_db,
account_id=account.id,
account_role="engineer",
can_cover_l1=True,
)
session = await start_adhoc_session(
test_db,
account_id=account.id,
user=eng,
ticket_id="t",
ticket_kind="internal",
)
assert session.acting_as == "l1_coverage"
# ---------------------------------------------------------------------------
# T13: record_step and update_notes tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_record_step_appends_to_walked_path(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
tree = await _make_tree(test_db, account_id=account.id, author_id=l1.id)
session = await start_flow_session(
test_db,
account_id=account.id,
user=l1,
flow_id=tree.id,
ticket_id="t1",
ticket_kind="internal",
)
updated = await record_step(
test_db,
session_id=session.id,
node_id="n1",
question="Is the device powered on?",
answer="yes",
)
assert len(updated.walked_path) == 1
assert updated.walked_path[0] == {
"node_id": "n1",
"question": "Is the device powered on?",
"answer": "yes",
"l1_note": None,
}
assert updated.current_node_id == "n1"
assert updated.last_step_at is not None
@pytest.mark.asyncio
async def test_record_step_two_sequential_steps_accumulate(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
tree = await _make_tree(test_db, account_id=account.id, author_id=l1.id)
session = await start_flow_session(
test_db,
account_id=account.id,
user=l1,
flow_id=tree.id,
ticket_id="t2",
ticket_kind="internal",
)
await record_step(
test_db,
session_id=session.id,
node_id="n1",
question="Step 1?",
answer="yes",
)
updated = await record_step(
test_db,
session_id=session.id,
node_id="n2",
question="Step 2?",
answer="no",
)
assert len(updated.walked_path) == 2
assert updated.walked_path[0]["node_id"] == "n1"
assert updated.walked_path[1]["node_id"] == "n2"
assert updated.current_node_id == "n2"
@pytest.mark.asyncio
async def test_record_step_blocks_adhoc(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
session = await start_adhoc_session(
test_db,
account_id=account.id,
user=l1,
ticket_id="t3",
ticket_kind="internal",
)
with pytest.raises(ValueError, match="adhoc"):
await record_step(
test_db,
session_id=session.id,
node_id="n1",
question="Q?",
answer="yes",
)
@pytest.mark.asyncio
async def test_record_step_blocks_inactive_session(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
tree = await _make_tree(test_db, account_id=account.id, author_id=l1.id)
session = await start_flow_session(
test_db,
account_id=account.id,
user=l1,
flow_id=tree.id,
ticket_id="t4",
ticket_kind="internal",
)
# Manually mark resolved to simulate inactive state
session.status = "resolved"
await test_db.flush()
with pytest.raises(ValueError, match="not active"):
await record_step(
test_db,
session_id=session.id,
node_id="n1",
question="Q?",
answer="yes",
)
@pytest.mark.asyncio
async def test_record_step_includes_note_when_provided(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
tree = await _make_tree(test_db, account_id=account.id, author_id=l1.id)
session = await start_flow_session(
test_db,
account_id=account.id,
user=l1,
flow_id=tree.id,
ticket_id="t5",
ticket_kind="internal",
)
updated = await record_step(
test_db,
session_id=session.id,
node_id="n1",
question="Q?",
answer="yes",
note="Customer mentioned it started yesterday",
)
assert updated.walked_path[0]["l1_note"] == "Customer mentioned it started yesterday"
@pytest.mark.asyncio
async def test_update_notes_replaces_walk_notes(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
session = await start_adhoc_session(
test_db,
account_id=account.id,
user=l1,
ticket_id="t6",
ticket_kind="internal",
)
new_notes = [{"timestamp": "2026-05-28T10:00:00Z", "content": "Customer rebooted"}]
updated = await update_notes(test_db, session_id=session.id, notes=new_notes)
assert updated.walk_notes == new_notes
assert updated.last_step_at is not None
@pytest.mark.asyncio
async def test_update_notes_raises_if_session_not_found(test_db: AsyncSession):
missing_id = uuid.uuid4()
with pytest.raises(ValueError, match="not found"):
await update_notes(test_db, session_id=missing_id, notes=[])
@pytest.mark.asyncio
async def test_update_notes_raises_if_session_not_active(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
session = await start_adhoc_session(
test_db,
account_id=account.id,
user=l1,
ticket_id="t7",
ticket_kind="internal",
)
session.status = "escalated"
await test_db.flush()
with pytest.raises(ValueError, match="not active"):
await update_notes(test_db, session_id=session.id, notes=[])
@pytest.mark.asyncio
async def test_update_notes_size_cap(test_db: AsyncSession):
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
session = await start_adhoc_session(
test_db,
account_id=account.id,
user=l1,
ticket_id="t8",
ticket_kind="internal",
)
# Create a notes payload larger than 256KB
big_content = "x" * (256 * 1024 + 100)
notes = [{"timestamp": "2026-05-28T10:00:00Z", "content": big_content}]
with pytest.raises(ValueError, match="256KB"):
await update_notes(test_db, session_id=session.id, notes=notes)