936 lines
30 KiB
Python
936 lines
30 KiB
Python
"""Tests for l1_session_service start_* functions (T12), record_step/update_notes (T13), resolve/escalate (T14)."""
|
|
import uuid
|
|
import pytest
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from sqlalchemy import select
|
|
|
|
from app.models.account import Account
|
|
from app.models.audit_log import AuditLog
|
|
from app.models.user import User
|
|
from app.models.tree import Tree
|
|
from app.models.ai_session import AISession
|
|
from app.models.flow_proposal import FlowProposal
|
|
from app.services.l1_session_service import (
|
|
start_flow_session,
|
|
start_proposal_session,
|
|
start_adhoc_session,
|
|
_resolve_acting_as,
|
|
record_step,
|
|
update_notes,
|
|
resolve,
|
|
escalate,
|
|
escalate_without_walk,
|
|
)
|
|
from app.services import internal_ticket_service
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _make_account(db: AsyncSession) -> Account:
    """Add a fresh ``Account`` to *db*, flush, and return it.

    One random 8-character token is reused for both the name suffix and the
    upper-cased ``display_code``.
    """
    # First 8 hex digits of a random UUID (identical to str(uuid4())[:8]).
    token = uuid.uuid4().hex[:8]
    acct = Account(
        id=uuid.uuid4(),
        name=f"Test Account {token}",
        display_code=token.upper(),
    )
    db.add(acct)
    await db.flush()
    return acct
|
|
|
|
|
|
async def _make_user(
    db: AsyncSession,
    *,
    account_id: uuid.UUID,
    account_role: str = "l1_tech",
    can_cover_l1: bool = False,
) -> User:
    """Add an active ``User`` with a random e-mail address to *db* and flush.

    Args:
        db: Session the user is added to.
        account_id: Account the user belongs to.
        account_role: Value stored in ``account_role`` (defaults to ``"l1_tech"``).
        can_cover_l1: Value stored in ``can_cover_l1``.

    Returns:
        The flushed ``User`` instance.
    """
    columns = {
        "id": uuid.uuid4(),
        "email": f"user-{uuid.uuid4()}@example.com",
        "name": "Test User",
        "account_id": account_id,
        "account_role": account_role,
        "role": "engineer",
        "is_active": True,
        "can_cover_l1": can_cover_l1,
    }
    member = User(**columns)
    db.add(member)
    await db.flush()
    return member
|
|
|
|
|
|
async def _make_tree(db: AsyncSession, *, account_id: uuid.UUID, author_id: uuid.UUID) -> Tree:
    """Add a published, team-visible troubleshooting ``Tree`` with an empty structure and flush."""
    empty_graph = {"nodes": [], "edges": []}
    flow = Tree(
        id=uuid.uuid4(),
        name="Test Flow",
        account_id=account_id,
        author_id=author_id,
        tree_type="troubleshooting",
        tree_structure=empty_graph,
        visibility="team",
        status="published",
    )
    db.add(flow)
    await db.flush()
    return flow
|
|
|
|
|
|
async def _make_ai_session(db: AsyncSession, *, user_id: uuid.UUID, account_id: uuid.UUID) -> AISession:
    """Add an active free-text chat ``AISession`` for the given user/account and flush."""
    columns = {
        "id": uuid.uuid4(),
        "user_id": user_id,
        "account_id": account_id,
        "session_type": "chat",
        "intake_type": "free_text",
        "intake_content": {"text": "test"},
        "status": "active",
        "confidence_tier": "discovery",
        "conversation_messages": [],
    }
    chat = AISession(**columns)
    db.add(chat)
    await db.flush()
    return chat
|
|
|
|
|
|
async def _make_proposal(
    db: AsyncSession,
    *,
    account_id: uuid.UUID,
    source_session_id: uuid.UUID,
) -> FlowProposal:
    """Add a pending ``new_flow`` ``FlowProposal`` with empty flow data and flush.

    Args:
        db: Session the proposal is added to.
        account_id: Owning account.
        source_session_id: Id stored in ``source_session_id``.

    Returns:
        The flushed ``FlowProposal`` instance.
    """
    draft = FlowProposal(
        id=uuid.uuid4(),
        account_id=account_id,
        source_session_id=source_session_id,
        proposal_type="new_flow",
        title="Test Proposal",
        proposed_flow_data={"nodes": [], "edges": []},
        source="manual_draft",
        status="pending",
    )
    db.add(draft)
    await db.flush()
    return draft
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Unit tests for _resolve_acting_as (no DB needed)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_resolve_acting_as_for_engineer_returns_coverage_tag():
    """An ``engineer`` account role resolves to the ``"l1_coverage"`` tag.

    Deliberately synchronous: ``_resolve_acting_as`` is called without
    ``await`` and no database is touched, so no event loop is required.
    """
    engineer = User(account_role="engineer")
    assert _resolve_acting_as(engineer) == "l1_coverage"
|
|
|
|
|
|
def test_resolve_acting_as_for_l1_tech_returns_none():
    """An ``l1_tech`` account role resolves to no acting-as tag (``None``).

    Deliberately synchronous: ``_resolve_acting_as`` is called without
    ``await`` and no database is touched, so no event loop is required.
    """
    tech = User(account_role="l1_tech")
    assert _resolve_acting_as(tech) is None
|
|
|
|
|
|
def test_resolve_acting_as_for_owner_returns_none():
    """An ``owner`` account role resolves to no acting-as tag (``None``).

    Deliberately synchronous: ``_resolve_acting_as`` is called without
    ``await`` and no database is touched, so no event loop is required.
    """
    owner = User(account_role="owner")
    assert _resolve_acting_as(owner) is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Integration tests (real DB)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_start_flow_session_creates_active_flow_session(test_db: AsyncSession):
    """start_flow_session returns an active ``flow`` session tied to the tree and ticket."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    flow = await _make_tree(test_db, account_id=acct.id, author_id=tech.id)

    started = await start_flow_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_id=flow.id,
        ticket_id="ticket-abc",
        ticket_kind="internal",
    )

    # Kind and linkage
    assert started.session_kind == "flow"
    assert started.flow_id == flow.id
    assert started.flow_proposal_id is None
    # Fresh-session state
    assert started.status == "active"
    assert started.walked_path == []
    assert started.walk_notes == []
    # The default helper user is an l1_tech, so no coverage tag is expected.
    assert started.acting_as is None
    # Ticket reference is stored as given
    assert started.ticket_id == "ticket-abc"
    assert started.ticket_kind == "internal"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_start_proposal_session_creates_active_proposal_session(test_db: AsyncSession):
    """start_proposal_session returns an active ``proposal`` session with no flow attached."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    source_chat = await _make_ai_session(test_db, user_id=tech.id, account_id=acct.id)
    draft = await _make_proposal(
        test_db,
        account_id=acct.id,
        source_session_id=source_chat.id,
    )

    started = await start_proposal_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_proposal_id=draft.id,
        ticket_id="ticket-xyz",
        ticket_kind="psa",
    )

    assert started.session_kind == "proposal"
    assert started.flow_proposal_id == draft.id
    assert started.flow_id is None
    assert started.status == "active"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_start_adhoc_session_no_flow_no_proposal(test_db: AsyncSession):
    """start_adhoc_session returns an ``adhoc`` session with no flow, no proposal and empty walk data."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)

    started = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id="ticket-adhoc",
        ticket_kind="internal",
    )

    assert started.session_kind == "adhoc"
    assert started.flow_id is None
    assert started.flow_proposal_id is None
    assert started.walked_path == []
    assert started.walk_notes == []
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_engineer_with_coverage_gets_acting_as_tag(test_db: AsyncSession):
    """A session started by an engineer with ``can_cover_l1`` carries ``acting_as == "l1_coverage"``."""
    acct = await _make_account(test_db)
    covering_engineer = await _make_user(
        test_db,
        account_id=acct.id,
        account_role="engineer",
        can_cover_l1=True,
    )

    started = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=covering_engineer,
        ticket_id="t",
        ticket_kind="internal",
    )

    assert started.acting_as == "l1_coverage"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# T13: record_step and update_notes tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_record_step_appends_to_walked_path(test_db: AsyncSession):
    """A single record_step call adds one entry to walked_path and updates the cursor fields."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    flow = await _make_tree(test_db, account_id=acct.id, author_id=tech.id)
    walk = await start_flow_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_id=flow.id,
        ticket_id="t1",
        ticket_kind="internal",
    )

    after_step = await record_step(
        test_db,
        session_id=walk.id,
        node_id="n1",
        question="Is the device powered on?",
        answer="yes",
    )

    # No note was passed, so the stored entry is expected to carry l1_note=None.
    expected_entry = {
        "node_id": "n1",
        "question": "Is the device powered on?",
        "answer": "yes",
        "l1_note": None,
    }
    assert len(after_step.walked_path) == 1
    assert after_step.walked_path[0] == expected_entry
    assert after_step.current_node_id == "n1"
    assert after_step.last_step_at is not None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_record_step_two_sequential_steps_accumulate(test_db: AsyncSession):
    """Two consecutive record_step calls leave both entries, in order, in walked_path."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    flow = await _make_tree(test_db, account_id=acct.id, author_id=tech.id)
    walk = await start_flow_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_id=flow.id,
        ticket_id="t2",
        ticket_kind="internal",
    )

    steps = (
        ("n1", "Step 1?", "yes"),
        ("n2", "Step 2?", "no"),
    )
    latest = None
    for node, asked, answered in steps:
        # Only the result of the final call is inspected below.
        latest = await record_step(
            test_db,
            session_id=walk.id,
            node_id=node,
            question=asked,
            answer=answered,
        )

    assert len(latest.walked_path) == 2
    assert latest.walked_path[0]["node_id"] == "n1"
    assert latest.walked_path[1]["node_id"] == "n2"
    assert latest.current_node_id == "n2"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_record_step_blocks_adhoc(test_db: AsyncSession):
    """record_step on an adhoc session raises ValueError whose message matches ``adhoc``."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    adhoc = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id="t3",
        ticket_kind="internal",
    )

    step_args = {"session_id": adhoc.id, "node_id": "n1", "question": "Q?", "answer": "yes"}
    with pytest.raises(ValueError, match="adhoc"):
        await record_step(test_db, **step_args)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_record_step_blocks_inactive_session(test_db: AsyncSession):
    """record_step on a session whose status is no longer active raises ValueError (``not active``)."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    flow = await _make_tree(test_db, account_id=acct.id, author_id=tech.id)
    walk = await start_flow_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_id=flow.id,
        ticket_id="t4",
        ticket_kind="internal",
    )

    # Flip the status by hand (instead of calling resolve) to fake an inactive session.
    walk.status = "resolved"
    await test_db.flush()

    step_args = {"session_id": walk.id, "node_id": "n1", "question": "Q?", "answer": "yes"}
    with pytest.raises(ValueError, match="not active"):
        await record_step(test_db, **step_args)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_record_step_includes_note_when_provided(test_db: AsyncSession):
    """A ``note`` passed to record_step is stored under ``l1_note`` in the walked_path entry."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    flow = await _make_tree(test_db, account_id=acct.id, author_id=tech.id)
    walk = await start_flow_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_id=flow.id,
        ticket_id="t5",
        ticket_kind="internal",
    )

    remark = "Customer mentioned it started yesterday"
    after_step = await record_step(
        test_db,
        session_id=walk.id,
        node_id="n1",
        question="Q?",
        answer="yes",
        note=remark,
    )

    first_entry = after_step.walked_path[0]
    assert first_entry["l1_note"] == remark
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_notes_replaces_walk_notes(test_db: AsyncSession):
    """update_notes stores the supplied list as walk_notes and sets last_step_at."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    adhoc = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id="t6",
        ticket_kind="internal",
    )

    replacement = [
        {"timestamp": "2026-05-28T10:00:00Z", "content": "Customer rebooted"},
    ]
    refreshed = await update_notes(test_db, session_id=adhoc.id, notes=replacement)

    assert refreshed.walk_notes == replacement
    assert refreshed.last_step_at is not None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_notes_raises_if_session_not_found(test_db: AsyncSession):
    """update_notes with a random, never-created session id raises ValueError (``not found``)."""
    unknown_session_id = uuid.uuid4()

    with pytest.raises(ValueError, match="not found"):
        await update_notes(test_db, session_id=unknown_session_id, notes=[])
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_notes_raises_if_session_not_active(test_db: AsyncSession):
    """update_notes on a session manually marked ``escalated`` raises ValueError (``not active``)."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    adhoc = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id="t7",
        ticket_kind="internal",
    )

    # Set the status directly so the session is no longer active.
    adhoc.status = "escalated"
    await test_db.flush()

    with pytest.raises(ValueError, match="not active"):
        await update_notes(test_db, session_id=adhoc.id, notes=[])
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_notes_size_cap(test_db: AsyncSession):
    """update_notes with a note body just over 256 KiB raises ValueError mentioning ``256KB``."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    adhoc = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id="t8",
        ticket_kind="internal",
    )

    # 256 * 1024 characters plus 100 extra, so the content alone exceeds 256 KiB.
    oversized_text = "x" * (256 * 1024 + 100)
    oversized_notes = [{"timestamp": "2026-05-28T10:00:00Z", "content": oversized_text}]

    with pytest.raises(ValueError, match="256KB"):
        await update_notes(test_db, session_id=adhoc.id, notes=oversized_notes)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# T14: resolve, escalate, escalate_without_walk tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _make_internal_ticket(db: AsyncSession, *, account_id: uuid.UUID, user_id: uuid.UUID) -> object:
    """Create an internal ticket through ``internal_ticket_service.create_ticket`` and return it.

    The ticket is created with the fixed problem statement
    ``"Customer cannot log in"`` and *user_id* as its creator.
    """
    # NOTE(review): callers read ``.id`` and ``.status`` on the result, so the
    # ``object`` return annotation is looser than the real ticket type.
    ticket = await internal_ticket_service.create_ticket(
        db,
        account_id=account_id,
        created_by_user_id=user_id,
        problem_statement="Customer cannot log in",
    )
    return ticket
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_resolve_proposal_helpful_flips_validated_by_outcome(test_db: AsyncSession):
    """Resolving a proposal session with helpful=True validates the proposal and resolves the ticket."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)
    source_chat = await _make_ai_session(test_db, user_id=tech.id, account_id=acct.id)
    draft = await _make_proposal(test_db, account_id=acct.id, source_session_id=source_chat.id)

    walk = await start_proposal_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_proposal_id=draft.id,
        ticket_id=str(ticket.id),
        ticket_kind="internal",
    )
    outcome = await resolve(
        test_db,
        session_id=walk.id,
        helpful=True,
        resolution_notes="Issue fixed by proposal walk",
    )

    # Session-level outcome
    assert outcome.status == "resolved"
    assert outcome.helpful is True
    assert outcome.resolution_notes == "Issue fixed by proposal walk"
    assert outcome.resolved_at is not None

    # Reload the proposal: a helpful resolution should have validated it.
    await test_db.refresh(draft)
    assert draft.validated_by_outcome is True

    # Reload the ticket: it should have been closed as resolved.
    await test_db.refresh(ticket)
    assert ticket.status == "resolved"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_resolve_proposal_not_helpful_leaves_validated_by_outcome_false(test_db: AsyncSession):
    """Resolving a proposal session with helpful=False leaves validated_by_outcome at False."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)
    source_chat = await _make_ai_session(test_db, user_id=tech.id, account_id=acct.id)
    draft = await _make_proposal(test_db, account_id=acct.id, source_session_id=source_chat.id)

    walk = await start_proposal_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_proposal_id=draft.id,
        ticket_id=str(ticket.id),
        ticket_kind="internal",
    )
    outcome = await resolve(
        test_db,
        session_id=walk.id,
        helpful=False,
        resolution_notes="Proposal did not help",
    )

    assert outcome.helpful is False
    # Reload before checking so the value comes from the database row.
    await test_db.refresh(draft)
    assert draft.validated_by_outcome is False
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_resolve_flow_session_closes_ticket_no_proposal_update(test_db: AsyncSession):
    """Resolving a flow session resolves its internal ticket; the session has no proposal link."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)
    flow = await _make_tree(test_db, account_id=acct.id, author_id=tech.id)

    walk = await start_flow_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_id=flow.id,
        ticket_id=str(ticket.id),
        ticket_kind="internal",
    )
    outcome = await resolve(
        test_db,
        session_id=walk.id,
        helpful=True,
        resolution_notes="Flow resolved the issue",
    )

    assert outcome.status == "resolved"
    assert outcome.session_kind == "flow"
    assert outcome.flow_proposal_id is None

    await test_db.refresh(ticket)
    assert ticket.status == "resolved"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_resolve_adhoc_session_closes_ticket(test_db: AsyncSession):
    """Resolving an adhoc session marks both the session and its internal ticket as resolved."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)

    adhoc = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id=str(ticket.id),
        ticket_kind="internal",
    )
    outcome = await resolve(
        test_db,
        session_id=adhoc.id,
        helpful=True,
        resolution_notes="Adhoc resolved",
    )

    assert outcome.status == "resolved"
    await test_db.refresh(ticket)
    assert ticket.status == "resolved"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_resolve_psa_session_no_ticket_update(test_db: AsyncSession):
    """Resolving a PSA-backed flow session succeeds even though no internal ticket exists.

    NOTE(review): only the returned session is asserted on; nothing here checks
    directly that internal tickets were left untouched.
    """
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    flow = await _make_tree(test_db, account_id=acct.id, author_id=tech.id)

    # The ticket id is an external PSA reference, not an internal ticket's id.
    walk = await start_flow_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_id=flow.id,
        ticket_id="psa-external-id-123",
        ticket_kind="psa",
    )
    outcome = await resolve(
        test_db,
        session_id=walk.id,
        helpful=True,
        resolution_notes="PSA ticket resolved externally",
    )

    assert outcome.status == "resolved"
    assert outcome.ticket_kind == "psa"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_resolve_raises_on_missing_session(test_db: AsyncSession):
    """resolve with a random, never-created session id raises ValueError (``not found``)."""
    unknown_session_id = uuid.uuid4()

    with pytest.raises(ValueError, match="not found"):
        await resolve(
            test_db,
            session_id=unknown_session_id,
            helpful=True,
            resolution_notes="N/A",
        )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_resolve_raises_on_inactive_session(test_db: AsyncSession):
    """resolve on a session manually marked ``escalated`` raises ValueError (``not active``)."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)

    adhoc = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id=str(ticket.id),
        ticket_kind="internal",
    )
    # Set the status directly so the session is no longer active.
    adhoc.status = "escalated"
    await test_db.flush()

    with pytest.raises(ValueError, match="not active"):
        await resolve(
            test_db,
            session_id=adhoc.id,
            helpful=False,
            resolution_notes="N/A",
        )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_escalate_marks_session_and_ticket_as_escalated(test_db: AsyncSession):
    """escalate stores the reason fields, stamps timestamps, and escalates the internal ticket."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)
    flow = await _make_tree(test_db, account_id=acct.id, author_id=tech.id)

    walk = await start_flow_session(
        test_db,
        account_id=acct.id,
        user=tech,
        flow_id=flow.id,
        ticket_id=str(ticket.id),
        ticket_kind="internal",
    )

    why = "Customer reported intermittent failure not covered by flow"
    outcome = await escalate(
        test_db,
        session_id=walk.id,
        reason=why,
        reason_category="out_of_scope",
    )

    assert outcome.status == "escalated"
    assert outcome.escalation_reason == why
    assert outcome.escalation_reason_category == "out_of_scope"
    assert outcome.resolved_at is not None
    assert outcome.last_step_at is not None

    await test_db.refresh(ticket)
    assert ticket.status == "escalated"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_escalate_raises_on_missing_session(test_db: AsyncSession):
    """escalate with a random, never-created session id raises ValueError (``not found``)."""
    unknown_session_id = uuid.uuid4()

    with pytest.raises(ValueError, match="not found"):
        await escalate(
            test_db,
            session_id=unknown_session_id,
            reason="Some reason",
            reason_category="unknown",
        )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_escalate_raises_on_inactive_session(test_db: AsyncSession):
    """escalate on a session manually marked ``resolved`` raises ValueError (``not active``)."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)

    adhoc = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id=str(ticket.id),
        ticket_kind="internal",
    )
    # Set the status directly so the session is no longer active.
    adhoc.status = "resolved"
    await test_db.flush()

    with pytest.raises(ValueError, match="not active"):
        await escalate(
            test_db,
            session_id=adhoc.id,
            reason="Too late",
            reason_category="other",
        )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_escalate_without_walk_creates_escalated_adhoc_session(test_db: AsyncSession):
    """escalate_without_walk returns an already-escalated adhoc session with an empty walked_path."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)

    why = "No KB article matched this issue"
    created = await escalate_without_walk(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id=str(ticket.id),
        ticket_kind="internal",
        reason_category="no_kb_content",
        reason=why,
    )

    # Shape of the session
    assert created.status == "escalated"
    assert created.session_kind == "adhoc"
    assert created.walked_path == []
    # Escalation details
    assert created.escalation_reason == why
    assert created.escalation_reason_category == "no_kb_content"
    # Timestamps
    assert created.resolved_at is not None
    assert created.last_step_at is not None
    # Ownership
    assert created.account_id == acct.id
    assert created.created_by_user_id == tech.id
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_escalate_without_walk_escalates_internal_ticket(test_db: AsyncSession):
    """After escalate_without_walk, the referenced internal ticket has status ``escalated``."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)

    # The returned session is not needed; only the ticket side effect is checked.
    await escalate_without_walk(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id=str(ticket.id),
        ticket_kind="internal",
        reason_category="no_kb_content",
    )

    await test_db.refresh(ticket)
    assert ticket.status == "escalated"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_escalate_without_walk_psa_does_not_touch_internal_ticket(test_db: AsyncSession):
    """escalate_without_walk with ``ticket_kind="psa"`` succeeds without any internal ticket existing.

    NOTE(review): only the returned session is asserted on; nothing here checks
    directly that internal tickets were left untouched.
    """
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)

    # The ticket id is an external PSA reference, not an internal ticket's id.
    created = await escalate_without_walk(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id="psa-ticket-999",
        ticket_kind="psa",
        reason_category="no_kb_content",
    )

    assert created.status == "escalated"
    assert created.ticket_kind == "psa"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_escalate_without_walk_reason_is_optional(test_db: AsyncSession):
    """Omitting ``reason`` leaves escalation_reason as None while the category is still stored."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)

    created = await escalate_without_walk(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id=str(ticket.id),
        ticket_kind="internal",
        reason_category="no_kb_content",
    )

    assert created.escalation_reason is None
    assert created.escalation_reason_category == "no_kb_content"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# T7: start_ai_build_session
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_start_ai_build_session(test_db: AsyncSession):
    """start_ai_build_session returns an active ``ai_build`` session with no flow or proposal."""
    # Imported locally: start_ai_build_session is not among the names imported at module level.
    from app.services import l1_session_service as svc

    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)

    built = await svc.start_ai_build_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id="t-ai",
        ticket_kind="internal",
    )

    assert built.session_kind == "ai_build"
    # Checked separately so a failure names the offending attribute.
    assert built.flow_id is None
    assert built.flow_proposal_id is None
    assert built.status == "active"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# T14 audit log tests (spec §5.6.1)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_resolve_writes_audit_log_with_acting_as(test_db: AsyncSession):
    """resolve by a covering engineer writes one audit row tagged ``acting_as="l1_coverage"``."""
    acct = await _make_account(test_db)
    covering_engineer = await _make_user(
        test_db,
        account_id=acct.id,
        account_role="engineer",
        can_cover_l1=True,
    )
    ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=covering_engineer.id)
    adhoc = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=covering_engineer,
        ticket_id=str(ticket.id),
        ticket_kind="internal",
    )
    await resolve(
        test_db,
        session_id=adhoc.id,
        helpful=True,
        resolution_notes="Coverage engineer resolved",
    )

    audit_query = select(AuditLog).where(
        AuditLog.action == "l1.session.resolve",
        AuditLog.resource_id == adhoc.id,
    )
    # scalar_one() also enforces that exactly one matching row exists.
    entry = (await test_db.execute(audit_query)).scalar_one()

    assert entry.acting_as == "l1_coverage"
    assert entry.user_id == covering_engineer.id
    assert entry.account_id == acct.id
    assert entry.details["helpful"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_resolve_writes_audit_log_native_l1_acting_as_null(
    test_db: AsyncSession,
):
    """resolve by a native ``l1_tech`` writes one audit row whose ``acting_as`` is None."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id, account_role="l1_tech")
    ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)
    adhoc = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id=str(ticket.id),
        ticket_kind="internal",
    )
    await resolve(
        test_db,
        session_id=adhoc.id,
        helpful=False,
        resolution_notes="Native L1 resolved",
    )

    audit_query = select(AuditLog).where(
        AuditLog.action == "l1.session.resolve",
        AuditLog.resource_id == adhoc.id,
    )
    # scalar_one() also enforces that exactly one matching row exists.
    entry = (await test_db.execute(audit_query)).scalar_one()

    assert entry.acting_as is None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_escalate_writes_audit_log(test_db: AsyncSession):
    """escalate writes one ``l1.session.escalate`` audit row carrying the reason category."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)
    adhoc = await start_adhoc_session(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id=str(ticket.id),
        ticket_kind="internal",
    )
    await escalate(
        test_db,
        session_id=adhoc.id,
        reason="Beyond scope",
        reason_category="out_of_scope",
    )

    audit_query = select(AuditLog).where(
        AuditLog.action == "l1.session.escalate",
        AuditLog.resource_id == adhoc.id,
    )
    # scalar_one() also enforces that exactly one matching row exists.
    entry = (await test_db.execute(audit_query)).scalar_one()

    assert entry.details["escalation_reason_category"] == "out_of_scope"
    assert entry.account_id == acct.id
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_escalate_without_walk_writes_audit_log(test_db: AsyncSession):
    """escalate_without_walk writes one ``l1.session.escalate_no_walk`` audit row for the new session."""
    acct = await _make_account(test_db)
    tech = await _make_user(test_db, account_id=acct.id)
    ticket = await _make_internal_ticket(test_db, account_id=acct.id, user_id=tech.id)
    created = await escalate_without_walk(
        test_db,
        account_id=acct.id,
        user=tech,
        ticket_id=str(ticket.id),
        ticket_kind="internal",
        reason_category="no_kb_content",
    )

    audit_query = select(AuditLog).where(
        AuditLog.action == "l1.session.escalate_no_walk",
        AuditLog.resource_id == created.id,
    )
    # scalar_one() also enforces that exactly one matching row exists.
    entry = (await test_db.execute(audit_query)).scalar_one()

    assert entry.account_id == acct.id
    assert entry.details["escalation_reason_category"] == "no_kb_content"
|