Files
resolutionflow/backend/app/services/psa_documentation_service.py
chihlasm bbe590bfec feat(ai-session): add Phase 2 PSA integration, escalation handoff, and session management
Phase 2 of the FlowPilot-First Pivot connecting AI sessions to ConnectWise PSA:

Slice 1 — PSA Ticket Intake:
- FlowPilotEngine accepts psa_ticket intake with graceful CW API fallback
- Ticket picker on intake screen (refactored TicketPickerModal for dual-mode)
- Ticket context card in session sidebar

Slice 2 — Auto Documentation Push:
- PSA documentation service with resolution/escalation note formatting
- Time entry creation via new ConnectWise provider method
- Automatic retry scheduler (APScheduler, 5min interval, 3 retries)
- PSA push status indicators in frontend with manual retry button
- Member mapping warning when CW member not mapped

Slice 3 — Session Pause/Resume & Escalation Handoff:
- Pause/resume endpoints for same-engineer session bookmarking
- Escalation flow: requesting_escalation status, self-escalation blocked
- Enhanced escalation package with LLM-generated hypotheses/suggestions
- Pickup endpoint with continue/fresh resume modes and briefing step
- Escalation queue (sidebar nav + dedicated page)
- SessionBriefing component with continue/fresh choice UI
- EscalateModal with PSA-aware button text

Slice 4 — Mid-Session Ticket Linking:
- Link ticket retroactively with context injection into system prompt
- Link Ticket button in session sidebar

Slice 5 — FlowPilot PSA Settings:
- Settings tab on IntegrationsPage with 7 configurable options
- Stored as flowpilot_settings JSONB on PsaConnection

Database: 2 migrations (flowpilot_settings, psa_post_log changes, status constraint)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 01:30:05 +00:00

403 lines
15 KiB
Python

"""PSA Documentation Push Service.
Generates structured documentation from FlowPilot AI sessions and pushes
it back to ConnectWise as internal notes + optional time entries.
"""
import logging
import math
import uuid
from datetime import datetime, timezone, timedelta
from typing import Optional, Any
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.ai_session import AISession
from app.models.psa_connection import PsaConnection
from app.models.psa_member_mapping import PsaMemberMapping
from app.models.psa_post_log import PsaPostLog
from app.services.psa.registry import get_provider_for_connection
from app.services.psa.types import NoteType
from app.services.redaction_service import apply_redaction_to_text
logger = logging.getLogger(__name__)

# Fallback values used when a connection's flowpilot_settings does not
# define a given key.
DEFAULT_SETTINGS = {
    # Master switch: when falsy, push_documentation returns "no_psa".
    "auto_push": True,
    # Whether a time entry should accompany the posted note.
    "auto_time_entry": True,
    # One of "15min", "30min", "exact", "none"; "none" disables time entries.
    "time_rounding": "15min",
    # One of "internal", "both"; "internal" posts an internal-analysis note.
    "note_visibility": "internal",
    # Whether the per-step diagnosis/work list is included in the note body.
    "include_diagnostic_steps": True,
}
def _get_setting(connection: PsaConnection, key: str) -> Any:
    """Look up one FlowPilot setting for a PSA connection.

    A value stored under ``key`` in ``connection.flowpilot_settings`` wins;
    otherwise the entry from ``DEFAULT_SETTINGS`` is returned (``None`` when
    the key is unknown there as well).
    """
    overrides = connection.flowpilot_settings
    # A missing/empty settings object means nothing is overridden.
    if overrides and key in overrides:
        return overrides[key]
    return DEFAULT_SETTINGS.get(key)
def _round_hours(hours: float, rounding: str) -> float:
    """Apply the configured time-rounding policy to a duration in hours.

    ``"exact"`` keeps two decimal places. ``"30min"`` rounds up to the next
    half hour. Any other value (including the default ``"15min"``) rounds up
    to the next quarter hour.
    """
    if rounding == "exact":
        return round(hours, 2)
    # Number of billing increments in one hour: 2 halves or 4 quarters.
    increments_per_hour = 2 if rounding == "30min" else 4
    return math.ceil(hours * increments_per_hour) / increments_per_hour
def _format_datetime(dt: datetime | None) -> str:
"""Format a datetime for display in notes."""
if not dt:
return "N/A"
return dt.strftime("%Y-%m-%d %I:%M %p UTC")
def format_resolution_note(session: AISession, include_steps: bool = True) -> str:
    """Format a resolved session as a plain-text note for CW.

    Args:
        session: The session to document. Reads ``id``, ``user``,
            ``created_at``, ``resolved_at``, ``problem_summary``,
            ``problem_domain``, ``steps``, ``resolution_summary``,
            ``resolution_action``, ``confidence_tier`` and
            ``confidence_score``.
        include_steps: When true and the session has steps, a
            "Diagnosis Path" section lists each step with the engineer's
            response and any action result.

    Returns:
        The note body as newline-joined text.
    """
    lines = [
        "═══ FlowPilot Session Documentation ═══",
        f"Session: {session.id}",
    ]

    # The engineer line is only emitted when the session exposes a `user`
    # object that has a display_name.
    engineer = getattr(session, "user", None)
    if engineer and hasattr(engineer, "display_name"):
        lines.append(f"Engineer: {engineer.display_name}")

    started = _format_datetime(session.created_at)
    ended = _format_datetime(session.resolved_at)
    lines.extend([
        f"Date: {ended}",
        f"Started: {started}",
        f"Ended: {ended}",
    ])

    # The elapsed time is computed once and reused by the header and the
    # timing section so both always show the same value.
    duration_text = None
    if session.resolved_at and session.created_at:
        elapsed = session.resolved_at - session.created_at
        minutes = int(elapsed.total_seconds() / 60)
        if minutes < 60:
            duration_text = f"{minutes}m"
        else:
            duration_text = f"{minutes // 60}h {minutes % 60}m"
        lines.append(f"Duration: {duration_text}")

    lines.append("")
    lines.append("── Problem ──")
    lines.append(session.problem_summary or "No summary available")
    if session.problem_domain:
        lines.append(f"Domain: {session.problem_domain}")

    # Diagnostic steps
    if include_steps and session.steps:
        lines.append("")
        lines.append("── Diagnosis Path ──")
        for step in session.steps:
            content = step.content or {}
            step_type = content.get("type", step.step_type).capitalize()
            description = content.get("text", "")

            response_text = ""
            if step.was_skipped:
                response_text = "Skipped"
            elif step.selected_option:
                # Prefer the human-readable label of the chosen option and
                # fall back to the raw value when no option matches.
                response_text = step.selected_option
                for opt in step.options_presented or []:
                    if opt.get("value") == step.selected_option:
                        response_text = opt.get("label", step.selected_option)
                        break
            elif step.free_text_input:
                response_text = step.free_text_input

            lines.append(f"{step.step_order + 1}. [{step_type}] {description}")
            if response_text:
                lines.append(f" → Response: {response_text}")
            if step.action_result:
                result = step.action_result
                outcome = "Succeeded" if result.get("success") else "Did not resolve"
                if details := result.get("details"):
                    # Separate the details from the outcome word; without a
                    # separator the two run together ("Succeeded<details>").
                    outcome += f" — {details}"
                lines.append(f" → Result: {outcome}")

    # Resolution
    lines.append("")
    lines.append("── Resolution ──")
    lines.append(session.resolution_summary or "No resolution summary")
    if session.resolution_action:
        lines.append(session.resolution_action)

    # Confidence
    lines.append("")
    lines.append("── AI Confidence ──")
    score = session.confidence_score
    if score is None:
        # Applying the percent format to None raises TypeError, so a session
        # without a score is reported by tier only.
        lines.append(f"Final confidence: {session.confidence_tier or 'N/A'}")
    else:
        lines.append(f"Final confidence: {session.confidence_tier} ({score:.0%})")

    # Timing section (always present)
    lines.append("")
    lines.append("── Session Timing ──")
    lines.append(f"Start: {started}")
    lines.append(f"End: {ended}")
    if duration_text is not None:
        lines.append(f"Total: {duration_text}")

    lines.append("")
    lines.append("Generated by ResolutionFlow FlowPilot")
    return "\n".join(lines)
def format_escalation_note(session: AISession, include_steps: bool = True) -> str:
    """Format an escalated session as a plain-text note for CW.

    Args:
        session: The session to document. Reads ``id``, ``user``,
            ``escalated_to``, ``created_at``, ``resolved_at``,
            ``problem_summary``, ``steps``, ``escalation_reason`` and
            ``escalation_package``.
        include_steps: When true and the session has steps, a
            "Work Completed" section lists each step's text.

    Returns:
        The note body as newline-joined text.
    """
    lines = [
        "═══ FlowPilot Escalation Documentation ═══",
        f"Session: {session.id}",
    ]

    engineer = getattr(session, "user", None)
    if engineer and hasattr(engineer, "display_name"):
        lines.append(f"Escalated by: {engineer.display_name}")

    escalated_to = getattr(session, "escalated_to", None)
    if escalated_to and hasattr(escalated_to, "display_name"):
        lines.append(f"Escalated to: {escalated_to.display_name}")
    else:
        lines.append("Escalated to: Unassigned")

    # Take one snapshot of the escalation time. Sampling "now" separately for
    # the header and the timing section could yield two different timestamps,
    # and would leave the header without a duration when resolved_at is unset.
    escalated_at = session.resolved_at or datetime.now(timezone.utc)
    escalated_text = _format_datetime(escalated_at)
    started_text = _format_datetime(session.created_at)

    duration_text = None
    if session.created_at:
        elapsed = escalated_at - session.created_at
        minutes = int(elapsed.total_seconds() / 60)
        if minutes >= 60:
            duration_text = f"{minutes // 60}h {minutes % 60}m"
        else:
            duration_text = f"{minutes}m"

    lines.extend([
        f"Date: {escalated_text}",
        f"Started: {started_text}",
    ])
    if duration_text is not None:
        lines.append(f"Duration: {duration_text}")

    lines.append("")
    lines.append("── Problem ──")
    lines.append(session.problem_summary or "No summary available")

    # Work completed
    if include_steps and session.steps:
        lines.append("")
        lines.append("── Work Completed ──")
        for step in session.steps:
            content = step.content or {}
            description = content.get("text", "")
            lines.append(f"{step.step_order + 1}. {description}")

    # Escalation reason
    lines.append("")
    lines.append("── Escalation Reason ──")
    lines.append(session.escalation_reason or "No reason provided")

    # Escalation package details: each entry is rendered as a bullet list
    # when it is a list, otherwise as a single stringified line.
    pkg = session.escalation_package or {}
    package_sections = (
        ("── Remaining Hypotheses ──", "remaining_hypotheses"),
        ("── Suggested Next Steps ──", "suggested_next_steps"),
    )
    for heading, key in package_sections:
        if entries := pkg.get(key):
            lines.append("")
            lines.append(heading)
            if isinstance(entries, list):
                lines.extend(f"- {entry}" for entry in entries)
            else:
                lines.append(str(entries))

    # Timing
    lines.append("")
    lines.append("── Session Timing ──")
    lines.append(f"Start: {started_text}")
    lines.append(f"Escalated: {escalated_text}")
    if duration_text is not None:
        lines.append(f"Total: {duration_text}")

    lines.append("")
    lines.append("Generated by ResolutionFlow FlowPilot")
    return "\n".join(lines)
async def push_documentation(
    session: AISession,
    user_id: UUID,
    db: AsyncSession,
) -> dict[str, Any]:
    """Post a session's documentation note to its linked PSA ticket.

    The note is formatted (resolution note when ``session.status`` is
    ``"resolved"``, escalation note otherwise), passed through redaction and
    posted via the connection's provider. When time entries are enabled and
    the user has a member mapping, a time entry is attempted as well; a
    failing time entry is logged and does not fail the push. A ``PsaPostLog``
    row is added to ``db`` for both outcomes; this function does not commit.

    Returns:
        {
            "psa_push_status": "sent" | "pending_retry" | "failed" | "no_psa",
            "psa_push_error": str | None,
            "member_mapping_warning": str | None,
        }
    """

    def _outcome(status, error=None, warning=None):
        # Single place that shapes the dict returned to the caller.
        return {
            "psa_push_status": status,
            "psa_push_error": error,
            "member_mapping_warning": warning,
        }

    ticket_id = session.psa_ticket_id
    connection_id = session.psa_connection_id
    if not (ticket_id and connection_id):
        return _outcome("no_psa")

    # Load the connection; its settings drive everything below.
    connection_rows = await db.execute(
        select(PsaConnection).where(PsaConnection.id == connection_id)
    )
    connection = connection_rows.scalar_one_or_none()
    if not connection:
        return _outcome("failed", error="PSA connection not found")
    if not _get_setting(connection, "auto_push"):
        return _outcome("no_psa")

    # Build and redact the note body.
    include_steps = _get_setting(connection, "include_diagnostic_steps")
    formatter = (
        format_resolution_note if session.status == "resolved" else format_escalation_note
    )
    raw_note = formatter(session, include_steps=include_steps)
    note_text, _redactions = apply_redaction_to_text(raw_note)

    # Map the visibility setting onto a provider note type.
    if _get_setting(connection, "note_visibility") == "internal":
        note_type = NoteType.INTERNAL_ANALYSIS
    else:
        note_type = NoteType.DESCRIPTION

    # A member mapping is only looked up when time entries are wanted.
    member_mapping = None
    member_mapping_warning = None
    wants_time_entry = (
        _get_setting(connection, "auto_time_entry")
        and _get_setting(connection, "time_rounding") != "none"
    )
    if wants_time_entry:
        mapping_rows = await db.execute(
            select(PsaMemberMapping).where(
                PsaMemberMapping.psa_connection_id == connection_id,
                PsaMemberMapping.user_id == user_id,
            )
        )
        member_mapping = mapping_rows.scalar_one_or_none()
        if not member_mapping:
            member_mapping_warning = "Map your CW account in Settings → Integrations to enable auto-logged time entries."

    try:
        provider = await get_provider_for_connection(connection_id, db)
        posted_note = await provider.post_note(
            ticket_id=ticket_id,
            text=note_text,
            note_type=note_type,
        )

        if member_mapping and session.resolved_at and session.created_at:
            try:
                elapsed = session.resolved_at - session.created_at
                rounded_hours = _round_hours(
                    elapsed.total_seconds() / 3600,
                    _get_setting(connection, "time_rounding"),
                )
                if rounded_hours > 0:
                    await provider.create_time_entry(
                        ticket_id=ticket_id,
                        member_id=member_mapping.external_member_id,
                        hours=rounded_hours,
                        notes=f"FlowPilot session: {session.problem_summary or 'Troubleshooting'}",
                    )
            except Exception as time_entry_error:
                # Best effort: the note is already posted, so a time-entry
                # failure is only logged.
                logger.warning(
                    "Failed to create time entry for session %s: %s",
                    session.id,
                    time_entry_error,
                )

        db.add(
            PsaPostLog(
                id=uuid.uuid4(),
                ai_session_id=session.id,
                psa_connection_id=connection_id,
                ticket_id=ticket_id,
                note_type=note_type,
                content_posted=note_text[:10000],  # Truncate for storage
                external_note_id=posted_note.id,
                status="success",
                posted_by=user_id,
            )
        )
        return _outcome("sent", warning=member_mapping_warning)
    except Exception as push_error:
        logger.warning("PSA push failed for session %s: %s", session.id, push_error)
        # Record the failure so the retry path can pick it up in 5 minutes.
        db.add(
            PsaPostLog(
                id=uuid.uuid4(),
                ai_session_id=session.id,
                psa_connection_id=connection_id,
                ticket_id=ticket_id,
                note_type=note_type,
                content_posted=note_text[:10000],
                status="pending_retry",
                error_message=str(push_error)[:500],
                retry_count=0,
                next_retry_at=datetime.now(timezone.utc) + timedelta(minutes=5),
                posted_by=user_id,
            )
        )
        return _outcome(
            "pending_retry",
            error=str(push_error)[:200],
            warning=member_mapping_warning,
        )
async def retry_failed_push(
    log_entry: PsaPostLog,
    db: AsyncSession,
) -> bool:
    """Re-post the note stored on a PSA post-log row.

    On success the row is marked ``"success"`` with the new external note id
    and its error/retry fields cleared. On failure the retry counter is
    incremented and the error recorded; the row becomes ``"failed"`` once the
    counter reaches 3, otherwise the next attempt is scheduled. The row is
    mutated in place; this function does not commit.

    Returns:
        True when the note was posted, False otherwise.
    """
    try:
        provider = await get_provider_for_connection(log_entry.psa_connection_id, db)
        posted_note = await provider.post_note(
            ticket_id=log_entry.ticket_id,
            text=log_entry.content_posted,
            note_type=log_entry.note_type,
        )
        log_entry.status = "success"
        log_entry.external_note_id = posted_note.id
        log_entry.error_message = None
        log_entry.next_retry_at = None
        return True
    except Exception as exc:
        attempts = log_entry.retry_count + 1
        log_entry.retry_count = attempts
        log_entry.error_message = str(exc)[:500]

        if attempts < 3:
            # Exponential backoff of 5 * 3**attempts minutes: 15min, then
            # 45min (the initial 5min delay is set when the row is created).
            delay = timedelta(minutes=5 * (3 ** attempts))
            log_entry.next_retry_at = datetime.now(timezone.utc) + delay
        else:
            # Retries exhausted: stop scheduling further attempts.
            log_entry.status = "failed"
            log_entry.next_retry_at = None

        logger.warning(
            "PSA retry %d failed for log %s: %s",
            log_entry.retry_count, log_entry.id, exc,
        )
        return False