fix(l1): write audit_logs rows at resolve/escalate with acting_as

Per spec §5.6.1, audit rows are written at session terminal events
(resolve, escalate, escalate_without_walk). log_audit gains an optional
acting_as parameter that propagates the session's acting_as tag
('l1_coverage' for engineer coverers, null for native L1 users).
Final code review flagged this as Important — column existed but was
never populated. Four new integration tests cover all three paths.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-28 16:19:26 -04:00
parent 10b5d4e9b0
commit 7882b4723b
3 changed files with 198 additions and 3 deletions

View File

@@ -13,13 +13,20 @@ async def log_audit(
resource_id: Optional[UUID] = None,
details: Optional[dict] = None,
account_id: Optional[UUID] = None,
acting_as: Optional[str] = None,
) -> None:
"""Record an audit log entry. Does not commit — piggybacks on the caller's commit."""
"""Record an audit log entry. Does not commit — caller's commit picks it up.
acting_as: optional tag from the session (e.g. 'l1_coverage' for engineers
on the L1 surface, None for native l1_tech users).
"""
if account_id is None:
# Derive from the acting user's account as a fallback (one extra query).
from sqlalchemy import select
from app.models.user import User
result = await db.execute(select(User.account_id).where(User.id == user_id))
result = await db.execute(
select(User.account_id).where(User.id == user_id)
)
account_id = result.scalar_one()
entry = AuditLog(
@@ -29,5 +36,6 @@ async def log_audit(
resource_type=resource_type,
resource_id=resource_id,
details=details,
acting_as=acting_as,
)
db.add(entry)

View File

@@ -9,6 +9,7 @@ from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.audit import log_audit
from app.models.flow_proposal import FlowProposal
from app.models.l1_walk_session import L1WalkSession
from app.models.user import User
@@ -194,6 +195,21 @@ async def resolve(
)
# PSA close deferred to Phase 2 — no-op for now
await log_audit(
db,
user_id=session.created_by_user_id,
action="l1.session.resolve",
resource_type="l1_walk_session",
resource_id=session.id,
details={
"session_kind": session.session_kind,
"helpful": helpful,
"ticket_id": session.ticket_id,
"ticket_kind": session.ticket_kind,
},
account_id=session.account_id,
acting_as=session.acting_as,
)
await db.flush()
return session
@@ -231,6 +247,21 @@ async def escalate(
)
# PSA reassign deferred to Phase 2
await log_audit(
db,
user_id=session.created_by_user_id,
action="l1.session.escalate",
resource_type="l1_walk_session",
resource_id=session.id,
details={
"session_kind": session.session_kind,
"escalation_reason_category": reason_category,
"ticket_id": session.ticket_id,
"ticket_kind": session.ticket_kind,
},
account_id=session.account_id,
acting_as=session.acting_as,
)
await db.flush()
return session
@@ -272,5 +303,19 @@ async def escalate_without_walk(
ticket_id=UUID(ticket_id),
status="escalated",
)
await db.flush()
await db.flush() # flush first so session.id is populated
await log_audit(
db,
user_id=session.created_by_user_id,
action="l1.session.escalate_no_walk",
resource_type="l1_walk_session",
resource_id=session.id,
details={
"escalation_reason_category": reason_category,
"ticket_id": ticket_id,
"ticket_kind": ticket_kind,
},
account_id=session.account_id,
acting_as=session.acting_as,
)
return session

View File

@@ -3,7 +3,10 @@ import uuid
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models.account import Account
from app.models.audit_log import AuditLog
from app.models.user import User
from app.models.tree import Tree
from app.models.ai_session import AISession
@@ -773,3 +776,142 @@ async def test_escalate_without_walk_reason_is_optional(test_db: AsyncSession):
)
assert session.escalation_reason is None
assert session.escalation_reason_category == "no_kb_content"
# ---------------------------------------------------------------------------
# T14 audit log tests (spec §5.6.1)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_resolve_writes_audit_log_with_acting_as(test_db: AsyncSession):
"""resolve() writes an audit_logs row with acting_as='l1_coverage' for engineers."""
account = await _make_account(test_db)
eng = await _make_user(
test_db,
account_id=account.id,
account_role="engineer",
can_cover_l1=True,
)
ticket = await _make_internal_ticket(
test_db, account_id=account.id, user_id=eng.id
)
session = await start_adhoc_session(
test_db,
account_id=account.id,
user=eng,
ticket_id=str(ticket.id),
ticket_kind="internal",
)
await resolve(
test_db,
session_id=session.id,
helpful=True,
resolution_notes="Coverage engineer resolved",
)
result = await test_db.execute(
select(AuditLog).where(
AuditLog.action == "l1.session.resolve",
AuditLog.resource_id == session.id,
)
)
row = result.scalar_one()
assert row.acting_as == "l1_coverage"
assert row.user_id == eng.id
assert row.account_id == account.id
assert row.details["helpful"] is True
@pytest.mark.asyncio
async def test_resolve_writes_audit_log_native_l1_acting_as_null(
test_db: AsyncSession,
):
"""resolve() writes an audit_logs row with acting_as=None for native l1_tech."""
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id, account_role="l1_tech")
ticket = await _make_internal_ticket(
test_db, account_id=account.id, user_id=l1.id
)
session = await start_adhoc_session(
test_db,
account_id=account.id,
user=l1,
ticket_id=str(ticket.id),
ticket_kind="internal",
)
await resolve(
test_db,
session_id=session.id,
helpful=False,
resolution_notes="Native L1 resolved",
)
result = await test_db.execute(
select(AuditLog).where(
AuditLog.action == "l1.session.resolve",
AuditLog.resource_id == session.id,
)
)
row = result.scalar_one()
assert row.acting_as is None
@pytest.mark.asyncio
async def test_escalate_writes_audit_log(test_db: AsyncSession):
"""escalate() writes an audit_logs row with action='l1.session.escalate'."""
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
ticket = await _make_internal_ticket(
test_db, account_id=account.id, user_id=l1.id
)
session = await start_adhoc_session(
test_db,
account_id=account.id,
user=l1,
ticket_id=str(ticket.id),
ticket_kind="internal",
)
await escalate(
test_db,
session_id=session.id,
reason="Beyond scope",
reason_category="out_of_scope",
)
result = await test_db.execute(
select(AuditLog).where(
AuditLog.action == "l1.session.escalate",
AuditLog.resource_id == session.id,
)
)
row = result.scalar_one()
assert row.details["escalation_reason_category"] == "out_of_scope"
assert row.account_id == account.id
@pytest.mark.asyncio
async def test_escalate_without_walk_writes_audit_log(test_db: AsyncSession):
"""escalate_without_walk() writes an audit_logs row."""
account = await _make_account(test_db)
l1 = await _make_user(test_db, account_id=account.id)
ticket = await _make_internal_ticket(
test_db, account_id=account.id, user_id=l1.id
)
session = await escalate_without_walk(
test_db,
account_id=account.id,
user=l1,
ticket_id=str(ticket.id),
ticket_kind="internal",
reason_category="no_kb_content",
)
result = await test_db.execute(
select(AuditLog).where(
AuditLog.action == "l1.session.escalate_no_walk",
AuditLog.resource_id == session.id,
)
)
row = result.scalar_one()
assert row.account_id == account.id
assert row.details["escalation_reason_category"] == "no_kb_content"