Files
resolutionflow/backend/app/services/handoff_manager.py
chihlasm 758cd61621 fix: propagate account_id through all write paths missing NOT NULL coverage
Service layer (production code):
- branch_manager: set account_id on SessionBranch (root + fork) and ForkPoint
  from session.account_id; load session in create_fork for this purpose
- handoff_manager: set account_id on SessionHandoff from session.account_id
- ai_suggestions endpoint: set account_id on AISuggestion from current_user
- steps endpoint (/feedback): set account_id on StepRating from current_user
- ratings endpoint: set account_id on StepRating from current_user

Test infrastructure:
- conftest.py: seed PLATFORM_ACCOUNT_ID (00000000-...-0001) account after
  Base.metadata.create_all so global categories and gallery items have a valid FK
- test_rls_isolation: add _ensure_rls_schema fixture that runs
  'alembic upgrade head' before module tests — previous function-scoped
  test_db fixtures drop the schema, leaving the RLS tests with no tables
- test_branding: create Account before User in helper functions
- test_admin_gallery: set account_id=PLATFORM_ACCOUNT_ID on Tree/ScriptTemplate
- test_public_templates: set account_id=PLATFORM_ACCOUNT_ID on Tree,
  ScriptTemplate, TreeCategory
- test_resolution_outputs: set account_id=session.account_id on
  SessionResolutionOutput
- test_analytics_phase5: set account_id on PsaPostLog
- test_draft_trees: replace account_id=None with PLATFORM_ACCOUNT_ID in
  migration default test (NOT NULL now enforced)
- test_maintenance_schedules: set account_id on other_tree
- test_save_session_as_tree: set account_id on all 5 Session() constructors

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 04:24:36 +00:00

291 lines
10 KiB
Python

"""Handoff management — unified park/escalate with dual-write backward compat.
Creates handoff snapshots, AI assessments (for escalations), claim workflow,
and queue queries. Dual-writes to ai_sessions.escalation_package for
backward compatibility with the existing escalation queue.
"""
import logging
from datetime import datetime, timezone
from typing import Any
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.ai_session import AISession
from app.models.session_branch import SessionBranch
from app.models.session_handoff import SessionHandoff
# Module-level logger named after this module (via ``__name__``).
logger = logging.getLogger(__name__)
class HandoffManager:
    """Unified park/escalate handoff management.

    Wraps an ``AsyncSession`` and provides handoff creation (state snapshot
    plus, for escalations, an AI assessment), the claim workflow, briefing
    generation, PSA note push, and the queue of unclaimed handoffs.

    Every write path here calls ``flush()`` only; no method in this class
    commits the transaction.
    """

    def __init__(self, db: AsyncSession):
        """Store the async database session used by all methods."""
        self.db = db

    async def create_handoff(
        self,
        session_id: UUID,
        intent: str,
        engineer_notes: str | None,
        user_id: UUID,
        priority: str = "normal",
    ) -> SessionHandoff:
        """Create a handoff (park or escalate).

        Generates a snapshot, updates the session status, and dual-writes to
        ``escalation_package`` for backward compat.

        Args:
            session_id: ID of the AI session being handed off.
            intent: ``"park"`` sets the session to ``"paused"``;
                ``"escalate"`` sets it to ``"escalated"`` and also triggers
                an AI assessment. Any other value leaves the status as-is.
            engineer_notes: Optional free-text notes stored on the handoff.
            user_id: ID of the user performing the handoff.
            priority: Priority stored on the handoff.

        Returns:
            The new, flushed ``SessionHandoff``.

        Raises:
            ValueError: If no session with ``session_id`` exists.
        """
        result = await self.db.execute(
            select(AISession).where(AISession.id == session_id)
        )
        session = result.scalar_one_or_none()
        if session is None:
            raise ValueError(f"Session {session_id} not found")

        # Snapshot is taken BEFORE the status change below, so it records the
        # session's pre-handoff status.
        snapshot = await self._generate_snapshot(session)

        # AI assessment is only generated for escalations.
        ai_assessment = None
        ai_assessment_data = None
        if intent == "escalate":
            ai_assessment, ai_assessment_data = await self._generate_ai_assessment(session)

        handoff = SessionHandoff(
            session_id=session_id,
            account_id=session.account_id,
            handed_off_by=user_id,
            intent=intent,
            source_branch_id=session.active_branch_id,
            snapshot=snapshot,
            ai_assessment=ai_assessment,
            ai_assessment_data=ai_assessment_data,
            engineer_notes=engineer_notes,
            priority=priority,
        )
        self.db.add(handoff)
        # Flush now so the primary key is populated before it is copied into
        # the dual-write payload. Column-level defaults are applied at INSERT
        # time, so reading handoff.id before a flush can yield None (and the
        # payload would then carry the string "None").
        await self.db.flush()

        # NOTE(review): intents other than "park"/"escalate" are accepted and
        # leave the session status untouched — confirm callers validate this.
        if intent == "park":
            session.status = "paused"
        elif intent == "escalate":
            session.status = "escalated"
        session.handoff_count = (session.handoff_count or 0) + 1

        # Dual-write for backward compat with the existing escalation queue.
        session.escalation_package = {
            "snapshot": snapshot,
            "intent": intent,
            "engineer_notes": engineer_notes,
            "handoff_id": str(handoff.id),
        }
        await self.db.flush()
        return handoff

    @staticmethod
    def _branch_entry(branch: Any) -> dict[str, Any]:
        """Serialize one session branch into its snapshot ``branch_map`` entry."""
        parent_id = branch.parent_branch_id
        return {
            "id": str(branch.id),
            "label": branch.label,
            "status": branch.status,
            "status_reason": branch.status_reason,
            "parent_branch_id": str(parent_id) if parent_id else None,
        }

    async def _generate_snapshot(self, session: AISession) -> dict[str, Any]:
        """Generate a snapshot of the session state at handoff time.

        Always includes the problem summary/domain, status, step count and
        confidence tier. When ``session.is_branching`` is truthy, also adds
        ``branch_map`` (branches ordered by ``branch_order``) and
        ``active_branch_id``.
        """
        snapshot: dict[str, Any] = {
            "problem_summary": session.problem_summary,
            "problem_domain": session.problem_domain,
            "status": session.status,
            "step_count": session.step_count,
            "confidence_tier": session.confidence_tier,
        }

        # Add branch map if branching is active.
        if session.is_branching:
            branches_result = await self.db.execute(
                select(SessionBranch)
                .where(SessionBranch.session_id == session.id)
                .order_by(SessionBranch.branch_order)
            )
            snapshot["branch_map"] = [
                self._branch_entry(branch)
                for branch in branches_result.scalars().all()
            ]
            snapshot["active_branch_id"] = (
                str(session.active_branch_id) if session.active_branch_id else None
            )
        return snapshot

    async def claim_session(
        self,
        handoff_id: UUID,
        claiming_user_id: UUID,
    ) -> SessionHandoff:
        """Claim a handed-off session.

        Stamps the handoff with the claimer and a UTC timestamp, sets the
        session back to ``"active"``, and dual-writes ``escalated_to_id``.

        Args:
            handoff_id: ID of the handoff to claim.
            claiming_user_id: ID of the user taking over the session.

        Returns:
            The updated, flushed ``SessionHandoff``.

        Raises:
            ValueError: If no handoff with ``handoff_id`` exists.
        """
        result = await self.db.execute(
            select(SessionHandoff).where(SessionHandoff.id == handoff_id)
        )
        handoff = result.scalar_one_or_none()
        if handoff is None:
            raise ValueError(f"Handoff {handoff_id} not found")

        # NOTE(review): an already-claimed handoff is silently re-assigned
        # here; confirm whether re-claiming is intended or should be rejected.
        handoff.claimed_by = claiming_user_id
        handoff.claimed_at = datetime.now(timezone.utc)

        # Reactivate session. scalar_one() raises if the session row is gone.
        session_result = await self.db.execute(
            select(AISession).where(AISession.id == handoff.session_id)
        )
        session = session_result.scalar_one()
        session.status = "active"
        # Dual-write for the legacy escalation fields.
        session.escalated_to_id = claiming_user_id

        await self.db.flush()
        return handoff

    @staticmethod
    def _format_recent_messages(
        messages: list[dict[str, Any]] | None,
        limit: int = 10,
        max_chars: int = 200,
    ) -> str:
        """Render the last ``limit`` messages as ``[role]: content`` lines.

        Content is coerced to ``str`` and truncated to ``max_chars``. A
        missing or ``None`` content renders as an empty string instead of
        raising, and a missing role renders as ``?``.
        """
        if limit <= 0:
            # Guard: a [-0:] slice would return the whole list, not nothing.
            return ""
        recent = (messages or [])[-limit:]
        return "\n".join(
            f"[{m.get('role', '?')}]: {str(m.get('content') or '')[:max_chars]}"
            for m in recent
        )

    async def _generate_ai_assessment(
        self, session: AISession
    ) -> tuple[str | None, dict[str, Any] | None]:
        """Generate AI diagnostic assessment for escalation handoffs.

        Best-effort: any failure (including the lazy import) is logged and
        reported as ``(None, None)`` so that handoff creation still succeeds.

        Returns:
            ``(assessment_text, assessment_data)``. ``assessment_data`` is
            currently a fixed placeholder dict, not parsed from the AI text.
        """
        try:
            # Imported lazily inside the try so an import failure is also
            # treated as "no assessment" rather than breaking the handoff.
            from app.services.assistant_chat_service import _call_ai

            context = f"Problem: {session.problem_summary or 'Unknown'}\nDomain: {session.problem_domain or 'Unknown'}"
            # Include last 10 messages for context.
            recent = self._format_recent_messages(session.conversation_messages)

            assessment_text, _, _ = await _call_ai(
                system_base="You are a diagnostic assessment generator for MSP escalations.",
                rag_context="",
                history=[],
                new_message=(
                    f"Generate a brief diagnostic assessment for this escalation.\n"
                    f"{context}\n\nRecent conversation:\n{recent}\n\n"
                    f"Return: 1) Most likely cause, 2) Suggested next steps, 3) Confidence (low/medium/high)"
                ),
                max_tokens=500,
            )
            assessment_data = {
                "likely_cause": "See assessment text",
                "suggested_steps": [],
                "confidence": "medium",
            }
            return assessment_text, assessment_data
        except Exception:
            logger.exception("Failed to generate AI assessment")
            return None, None

    async def generate_briefing(
        self, handoff_id: UUID, claiming_user_id: UUID
    ) -> str:
        """Generate a natural-language briefing for the engineer claiming the session.

        Args:
            handoff_id: ID of the handoff to brief on.
            claiming_user_id: Accepted for interface compatibility; not used
                by the current implementation.

        Returns:
            The briefing text produced by the AI call.

        Raises:
            ValueError: If no handoff with ``handoff_id`` exists.
        """
        result = await self.db.execute(
            select(SessionHandoff).where(SessionHandoff.id == handoff_id)
        )
        handoff = result.scalar_one_or_none()
        if handoff is None:
            raise ValueError(f"Handoff {handoff_id} not found")

        session_result = await self.db.execute(
            select(AISession).where(AISession.id == handoff.session_id)
        )
        session = session_result.scalar_one()

        from app.services.assistant_chat_service import _call_ai

        # Cap the stringified snapshot so the prompt stays bounded.
        snapshot_text = str(handoff.snapshot)[:2000]
        briefing, _, _ = await _call_ai(
            system_base="You are a handoff briefing generator for MSP teams.",
            rag_context="",
            history=[],
            new_message=(
                f"Generate a concise briefing for an engineer picking up this session.\n"
                f"Problem: {session.problem_summary}\n"
                f"Intent: {handoff.intent}\n"
                f"Engineer notes: {handoff.engineer_notes or 'None'}\n"
                f"Snapshot: {snapshot_text}\n"
                f"AI Assessment: {handoff.ai_assessment or 'None'}"
            ),
            max_tokens=500,
        )
        return briefing

    async def push_to_psa(self, handoff_id: UUID) -> SessionHandoff:
        """Push handoff notes to PSA via existing psa_documentation_service.

        Best-effort: failures during the push are logged and swallowed. The
        note body is the AI assessment when present, otherwise the
        stringified snapshot. Nothing is pushed unless the session has both
        ``psa_ticket_id`` and ``psa_connection_id``.

        Returns:
            The handoff, with ``psa_note_pushed``/``psa_note_id`` set when
            the push call returned.

        Raises:
            ValueError: If no handoff with ``handoff_id`` exists.
        """
        result = await self.db.execute(
            select(SessionHandoff).where(SessionHandoff.id == handoff_id)
        )
        handoff = result.scalar_one_or_none()
        if handoff is None:
            raise ValueError(f"Handoff {handoff_id} not found")

        try:
            from app.services.psa_documentation_service import push_session_notes

            session_result = await self.db.execute(
                select(AISession).where(AISession.id == handoff.session_id)
            )
            session = session_result.scalar_one()
            if session.psa_ticket_id and session.psa_connection_id:
                note_id = await push_session_notes(
                    session=session,
                    notes_content=handoff.ai_assessment or str(handoff.snapshot),
                    db=self.db,
                )
                handoff.psa_note_pushed = True
                handoff.psa_note_id = note_id
        except Exception:
            # Lazy %-formatting: the message is only built if it is emitted.
            logger.exception("Failed to push handoff %s to PSA", handoff_id)

        await self.db.flush()
        return handoff

    @staticmethod
    def _queue_item(handoff: Any, session: Any) -> dict[str, Any]:
        """Build one queue row from a ``(handoff, session)`` pair."""
        return {
            "handoff_id": handoff.id,
            "session_id": session.id,
            "intent": handoff.intent,
            "problem_summary": session.problem_summary,
            "problem_domain": session.problem_domain,
            "priority": handoff.priority,
            "engineer_notes": handoff.engineer_notes,
            "created_at": handoff.created_at,
            "claimed_by": handoff.claimed_by,
            "claimed_at": handoff.claimed_at,
        }

    async def get_queue(
        self,
        team_id: UUID | None = None,
        account_id: UUID | None = None,
    ) -> list[dict[str, Any]]:
        """Get team queue of parked + escalated sessions.

        Returns unclaimed handoffs (``claimed_by IS NULL``), newest first,
        joined with their sessions.

        Args:
            team_id: When given, restrict to sessions of this team.
            account_id: Used only when ``team_id`` is not given; restricts to
                sessions of this account.

        Returns:
            One dict per unclaimed handoff (see ``_queue_item`` for keys).
        """
        query = (
            select(SessionHandoff, AISession)
            .join(AISession, SessionHandoff.session_id == AISession.id)
            .where(SessionHandoff.claimed_by.is_(None))
            .order_by(SessionHandoff.created_at.desc())
        )
        # NOTE(review): team_id takes precedence and account_id is ignored
        # when both are passed; with neither, no tenant filter is applied at
        # this layer — confirm isolation is enforced elsewhere (e.g. RLS).
        if team_id:
            query = query.where(AISession.team_id == team_id)
        elif account_id:
            query = query.where(AISession.account_id == account_id)

        result = await self.db.execute(query)
        return [
            self._queue_item(handoff, session)
            for handoff, session in result.all()
        ]