"""L1 session lifecycle: start (flow/proposal/adhoc), step, notes, resolve, escalate. start_* functions live in T12; step/notes are T13; resolve/escalate are T14. """ import json from datetime import datetime, timezone from typing import Optional from uuid import UUID from sqlalchemy.ext.asyncio import AsyncSession from app.models.flow_proposal import FlowProposal from app.models.l1_walk_session import L1WalkSession from app.models.user import User from app.services import internal_ticket_service def _resolve_acting_as(user: User) -> Optional[str]: """An engineer (whether covering or not) gets tagged for audit when using L1 surface. Returns 'l1_coverage' for engineers (only engineers WITH the coverage flag should reach this code path — the require_l1_or_coverage dep gates that). For native l1_tech users, returns None (no special tag — they ARE l1). """ if user.account_role == "engineer": return "l1_coverage" return None async def start_flow_session( db: AsyncSession, *, account_id: UUID, user: User, flow_id: UUID, ticket_id: str, ticket_kind: str, # 'psa' | 'internal' ) -> L1WalkSession: """Start a session walking an authored flow.""" session = L1WalkSession( account_id=account_id, created_by_user_id=user.id, acting_as=_resolve_acting_as(user), ticket_id=ticket_id, ticket_kind=ticket_kind, session_kind="flow", flow_id=flow_id, ) db.add(session) await db.flush() return session async def start_proposal_session( db: AsyncSession, *, account_id: UUID, user: User, flow_proposal_id: UUID, ticket_id: str, ticket_kind: str, ) -> L1WalkSession: """Start a session walking an AI-built FlowProposal.""" session = L1WalkSession( account_id=account_id, created_by_user_id=user.id, acting_as=_resolve_acting_as(user), ticket_id=ticket_id, ticket_kind=ticket_kind, session_kind="proposal", flow_proposal_id=flow_proposal_id, ) db.add(session) await db.flush() return session async def start_adhoc_session( db: AsyncSession, *, account_id: UUID, user: User, ticket_id: str, ticket_kind: str, ) -> L1WalkSession: """Start an ad-hoc session with no tree (free-form note-taking only).""" session = L1WalkSession( account_id=account_id, created_by_user_id=user.id, acting_as=_resolve_acting_as(user), ticket_id=ticket_id, ticket_kind=ticket_kind, session_kind="adhoc", ) db.add(session) await db.flush() return session async def record_step( db: AsyncSession, *, session_id: UUID, node_id: str, question: str, answer: str, note: Optional[str] = None, ) -> L1WalkSession: """Record an answered step in a tree walk. Appends to walked_path JSONB and advances current_node_id. Raises ValueError on adhoc sessions or inactive sessions. Updates last_step_at.""" session = await db.get(L1WalkSession, session_id) if not session: raise ValueError(f"L1WalkSession {session_id} not found") if session.session_kind == "adhoc": raise ValueError("Cannot record step on adhoc session — use update_notes") if session.status != "active": raise ValueError(f"Session {session_id} is not active (status={session.status})") entry = { "node_id": node_id, "question": question, "answer": answer, "l1_note": note, } # JSONB requires assigning a new list — in-place mutation isn't tracked session.walked_path = [*session.walked_path, entry] session.current_node_id = node_id session.last_step_at = datetime.now(timezone.utc) await db.flush() return session async def update_notes( db: AsyncSession, *, session_id: UUID, notes: list[dict], ) -> L1WalkSession: """Replace walk_notes on an active session. Used by adhoc walks for debounced autosave. Raises ValueError if missing or inactive. Caps notes payload at 256KB to prevent unbounded growth.""" session = await db.get(L1WalkSession, session_id) if not session: raise ValueError(f"L1WalkSession {session_id} not found") if session.status != "active": raise ValueError(f"Session {session_id} is not active (status={session.status})") encoded_size = len(json.dumps(notes).encode("utf-8")) if encoded_size > 256 * 1024: raise ValueError("walk_notes exceeds 256KB cap — consider escalating") session.walk_notes = notes session.last_step_at = datetime.now(timezone.utc) await db.flush() return session async def resolve( db: AsyncSession, *, session_id: UUID, helpful: bool, resolution_notes: str, ) -> L1WalkSession: """Close a session as resolved. - Sets status='resolved', helpful, resolution_notes, resolved_at. - On helpful=True AND session_kind='proposal': flips flow_proposal.validated_by_outcome=True (one-bit aggregate signal). - Closes the linked internal ticket (PSA close stubbed for Phase 2). - Raises ValueError on missing or non-active session. """ session = await db.get(L1WalkSession, session_id) if not session: raise ValueError(f"L1WalkSession {session_id} not found") if session.status != "active": raise ValueError(f"Session not active (status={session.status})") now = datetime.now(timezone.utc) session.status = "resolved" session.helpful = helpful session.resolution_notes = resolution_notes session.resolved_at = now session.last_step_at = now if helpful and session.session_kind == "proposal" and session.flow_proposal_id: proposal = await db.get(FlowProposal, session.flow_proposal_id) if proposal: proposal.validated_by_outcome = True if session.ticket_kind == "internal": await internal_ticket_service.update_status( db, ticket_id=UUID(session.ticket_id), status="resolved", resolution_notes=resolution_notes, ) # PSA close deferred to Phase 2 — no-op for now await db.flush() return session async def escalate( db: AsyncSession, *, session_id: UUID, reason: str, reason_category: str, ) -> L1WalkSession: """Escalate an active session to engineering. - Sets status='escalated', escalation_reason, escalation_reason_category, resolved_at. - Marks the linked internal ticket as escalated (PSA reassign deferred to Phase 2). - Raises ValueError on missing or non-active session. """ session = await db.get(L1WalkSession, session_id) if not session: raise ValueError(f"L1WalkSession {session_id} not found") if session.status != "active": raise ValueError(f"Session not active (status={session.status})") now = datetime.now(timezone.utc) session.status = "escalated" session.escalation_reason = reason session.escalation_reason_category = reason_category session.resolved_at = now session.last_step_at = now if session.ticket_kind == "internal": await internal_ticket_service.update_status( db, ticket_id=UUID(session.ticket_id), status="escalated", ) # PSA reassign deferred to Phase 2 await db.flush() return session async def escalate_without_walk( db: AsyncSession, *, account_id: UUID, user: User, ticket_id: str, ticket_kind: str, reason_category: str, reason: Optional[str] = None, ) -> L1WalkSession: """Create an immediately-escalated session with no walked_path. Used from the BuildAbortedNoKB screen (no KB content available to walk a tree). Captures the call as an audit record + escalates the ticket without requiring a walker session in between. """ now = datetime.now(timezone.utc) session = L1WalkSession( account_id=account_id, created_by_user_id=user.id, acting_as=_resolve_acting_as(user), ticket_id=ticket_id, ticket_kind=ticket_kind, session_kind="adhoc", status="escalated", escalation_reason=reason, escalation_reason_category=reason_category, resolved_at=now, last_step_at=now, ) db.add(session) if ticket_kind == "internal": await internal_ticket_service.update_status( db, ticket_id=UUID(ticket_id), status="escalated", ) await db.flush() return session