User model has 'name', not 'display_name'. Fixed in flowpilot_engine (escalate notify + pickup briefing) and psa_documentation_service (engineer name in exported docs). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
433 lines
16 KiB
Python
433 lines
16 KiB
Python
"""PSA Documentation Push Service.
|
|
|
|
Generates structured documentation from FlowPilot AI sessions and pushes
|
|
it back to ConnectWise as internal notes + optional time entries.
|
|
"""
|
|
import logging
|
|
import math
|
|
import uuid
|
|
from datetime import datetime, timezone, timedelta
|
|
from typing import Optional, Any
|
|
from uuid import UUID
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.models.ai_session import AISession
|
|
from app.models.psa_activity_log import PsaActivityLog
|
|
from app.models.psa_connection import PsaConnection
|
|
from app.models.psa_member_mapping import PsaMemberMapping
|
|
from app.models.psa_post_log import PsaPostLog
|
|
from app.services.psa.registry import get_provider_for_connection
|
|
from app.services.psa.types import NoteType
|
|
from app.services.redaction_service import apply_redaction_to_text
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Default flowpilot_settings values
|
|
DEFAULT_SETTINGS = {
|
|
"auto_push": True,
|
|
"auto_time_entry": True,
|
|
"time_rounding": "15min", # "15min", "30min", "exact", "none"
|
|
"note_visibility": "internal", # "internal", "both"
|
|
"include_diagnostic_steps": True,
|
|
}
|
|
|
|
|
|
def _get_setting(connection: PsaConnection, key: str) -> Any:
|
|
"""Get a flowpilot setting with default fallback."""
|
|
settings = connection.flowpilot_settings or {}
|
|
return settings.get(key, DEFAULT_SETTINGS.get(key))
|
|
|
|
|
|
def _round_hours(hours: float, rounding: str) -> float:
|
|
"""Round hours according to the rounding setting."""
|
|
if rounding == "exact":
|
|
return round(hours, 2)
|
|
elif rounding == "30min":
|
|
return math.ceil(hours * 2) / 2
|
|
else: # default 15min
|
|
return math.ceil(hours * 4) / 4
|
|
|
|
|
|
def _format_datetime(dt: datetime | None) -> str:
|
|
"""Format a datetime for display in notes."""
|
|
if not dt:
|
|
return "N/A"
|
|
return dt.strftime("%Y-%m-%d %I:%M %p UTC")
|
|
|
|
|
|
def format_resolution_note(session: AISession, include_steps: bool = True) -> str:
|
|
"""Format a resolved session as a plain-text note for CW."""
|
|
lines = [
|
|
"═══ FlowPilot Session Documentation ═══",
|
|
f"Session: {session.id}",
|
|
]
|
|
|
|
# Engineer name from relationship if loaded, otherwise user_id
|
|
engineer_name = getattr(session, 'user', None)
|
|
if engineer_name and hasattr(engineer_name, 'name'):
|
|
lines.append(f"Engineer: {engineer_name.name}")
|
|
|
|
lines.extend([
|
|
f"Date: {_format_datetime(session.resolved_at)}",
|
|
f"Started: {_format_datetime(session.created_at)}",
|
|
f"Ended: {_format_datetime(session.resolved_at)}",
|
|
])
|
|
|
|
# Duration
|
|
if session.resolved_at and session.created_at:
|
|
delta = session.resolved_at - session.created_at
|
|
minutes = int(delta.total_seconds() / 60)
|
|
if minutes < 60:
|
|
lines.append(f"Duration: {minutes}m")
|
|
else:
|
|
lines.append(f"Duration: {minutes // 60}h {minutes % 60}m")
|
|
|
|
lines.append("")
|
|
lines.append("── Problem ──")
|
|
lines.append(session.problem_summary or "No summary available")
|
|
if session.problem_domain:
|
|
lines.append(f"Domain: {session.problem_domain}")
|
|
|
|
# Diagnostic steps
|
|
if include_steps and session.steps:
|
|
lines.append("")
|
|
lines.append("── Diagnosis Path ──")
|
|
for step in session.steps:
|
|
content = step.content or {}
|
|
step_type = content.get("type", step.step_type).capitalize()
|
|
description = content.get("text", "")
|
|
|
|
response_text = ""
|
|
if step.was_skipped:
|
|
response_text = "Skipped"
|
|
elif step.selected_option:
|
|
# Try to find the label
|
|
if step.options_presented:
|
|
for opt in step.options_presented:
|
|
if opt.get("value") == step.selected_option:
|
|
response_text = opt.get("label", step.selected_option)
|
|
break
|
|
else:
|
|
response_text = step.selected_option
|
|
else:
|
|
response_text = step.selected_option
|
|
elif step.free_text_input:
|
|
response_text = step.free_text_input
|
|
|
|
lines.append(f"{step.step_order + 1}. [{step_type}] {description}")
|
|
if response_text:
|
|
lines.append(f" → Response: {response_text}")
|
|
if step.action_result:
|
|
result = step.action_result
|
|
outcome = "Succeeded" if result.get("success") else "Did not resolve"
|
|
if details := result.get("details"):
|
|
outcome += f" — {details}"
|
|
lines.append(f" → Result: {outcome}")
|
|
|
|
# Resolution
|
|
lines.append("")
|
|
lines.append("── Resolution ──")
|
|
lines.append(session.resolution_summary or "No resolution summary")
|
|
if session.resolution_action:
|
|
lines.append(session.resolution_action)
|
|
|
|
# Confidence
|
|
lines.append("")
|
|
lines.append("── AI Confidence ──")
|
|
lines.append(f"Final confidence: {session.confidence_tier} ({session.confidence_score:.0%})")
|
|
|
|
# Timing section (always present)
|
|
lines.append("")
|
|
lines.append("── Session Timing ──")
|
|
lines.append(f"Start: {_format_datetime(session.created_at)}")
|
|
lines.append(f"End: {_format_datetime(session.resolved_at)}")
|
|
if session.resolved_at and session.created_at:
|
|
delta = session.resolved_at - session.created_at
|
|
minutes = int(delta.total_seconds() / 60)
|
|
lines.append(f"Total: {minutes // 60}h {minutes % 60}m" if minutes >= 60 else f"Total: {minutes}m")
|
|
|
|
lines.append("")
|
|
lines.append("Generated by ResolutionFlow FlowPilot")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def format_escalation_note(session: AISession, include_steps: bool = True) -> str:
|
|
"""Format an escalated session as a plain-text note for CW."""
|
|
lines = [
|
|
"═══ FlowPilot Escalation Documentation ═══",
|
|
f"Session: {session.id}",
|
|
]
|
|
|
|
engineer_name = getattr(session, 'user', None)
|
|
if engineer_name and hasattr(engineer_name, 'name'):
|
|
lines.append(f"Escalated by: {engineer_name.name}")
|
|
|
|
escalated_to = getattr(session, 'escalated_to', None)
|
|
if escalated_to and hasattr(escalated_to, 'name'):
|
|
lines.append(f"Escalated to: {escalated_to.name}")
|
|
else:
|
|
lines.append("Escalated to: Unassigned")
|
|
|
|
lines.extend([
|
|
f"Date: {_format_datetime(session.resolved_at or datetime.now(timezone.utc))}",
|
|
f"Started: {_format_datetime(session.created_at)}",
|
|
])
|
|
|
|
if session.resolved_at and session.created_at:
|
|
delta = session.resolved_at - session.created_at
|
|
minutes = int(delta.total_seconds() / 60)
|
|
lines.append(f"Duration: {minutes // 60}h {minutes % 60}m" if minutes >= 60 else f"Duration: {minutes}m")
|
|
|
|
lines.append("")
|
|
lines.append("── Problem ──")
|
|
lines.append(session.problem_summary or "No summary available")
|
|
|
|
# Work completed
|
|
if include_steps and session.steps:
|
|
lines.append("")
|
|
lines.append("── Work Completed ──")
|
|
for step in session.steps:
|
|
content = step.content or {}
|
|
description = content.get("text", "")
|
|
lines.append(f"{step.step_order + 1}. {description}")
|
|
|
|
# Escalation reason
|
|
lines.append("")
|
|
lines.append("── Escalation Reason ──")
|
|
lines.append(session.escalation_reason or "No reason provided")
|
|
|
|
# Escalation package details
|
|
pkg = session.escalation_package or {}
|
|
if hypotheses := pkg.get("remaining_hypotheses"):
|
|
lines.append("")
|
|
lines.append("── Remaining Hypotheses ──")
|
|
if isinstance(hypotheses, list):
|
|
for h in hypotheses:
|
|
lines.append(f"- {h}")
|
|
else:
|
|
lines.append(str(hypotheses))
|
|
|
|
if suggestions := pkg.get("suggested_next_steps"):
|
|
lines.append("")
|
|
lines.append("── Suggested Next Steps ──")
|
|
if isinstance(suggestions, list):
|
|
for s in suggestions:
|
|
lines.append(f"- {s}")
|
|
else:
|
|
lines.append(str(suggestions))
|
|
|
|
# Timing
|
|
lines.append("")
|
|
lines.append("── Session Timing ──")
|
|
lines.append(f"Start: {_format_datetime(session.created_at)}")
|
|
escalated_at = session.resolved_at or datetime.now(timezone.utc)
|
|
lines.append(f"Escalated: {_format_datetime(escalated_at)}")
|
|
if session.created_at:
|
|
delta = escalated_at - session.created_at
|
|
minutes = int(delta.total_seconds() / 60)
|
|
lines.append(f"Total: {minutes // 60}h {minutes % 60}m" if minutes >= 60 else f"Total: {minutes}m")
|
|
|
|
lines.append("")
|
|
lines.append("Generated by ResolutionFlow FlowPilot")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
async def push_documentation(
|
|
session: AISession,
|
|
user_id: UUID,
|
|
db: AsyncSession,
|
|
) -> dict[str, Any]:
|
|
"""Push session documentation to PSA ticket.
|
|
|
|
Returns:
|
|
{
|
|
"psa_push_status": "sent" | "pending_retry" | "failed" | "no_psa",
|
|
"psa_push_error": str | None,
|
|
"member_mapping_warning": str | None,
|
|
}
|
|
"""
|
|
if not session.psa_ticket_id or not session.psa_connection_id:
|
|
return {"psa_push_status": "no_psa", "psa_push_error": None, "member_mapping_warning": None}
|
|
|
|
# Load connection and check settings
|
|
result = await db.execute(
|
|
select(PsaConnection).where(PsaConnection.id == session.psa_connection_id)
|
|
)
|
|
connection = result.scalar_one_or_none()
|
|
if not connection:
|
|
return {"psa_push_status": "failed", "psa_push_error": "PSA connection not found", "member_mapping_warning": None}
|
|
|
|
if not _get_setting(connection, "auto_push"):
|
|
return {"psa_push_status": "no_psa", "psa_push_error": None, "member_mapping_warning": None}
|
|
|
|
# Format the note
|
|
include_steps = _get_setting(connection, "include_diagnostic_steps")
|
|
if session.status == "resolved":
|
|
note_text = format_resolution_note(session, include_steps=include_steps)
|
|
else:
|
|
note_text = format_escalation_note(session, include_steps=include_steps)
|
|
|
|
# Redact sensitive data
|
|
note_text, _ = apply_redaction_to_text(note_text)
|
|
|
|
# Determine note type
|
|
visibility = _get_setting(connection, "note_visibility")
|
|
note_type = NoteType.INTERNAL_ANALYSIS if visibility == "internal" else NoteType.DESCRIPTION
|
|
|
|
# Check member mapping for time entry
|
|
member_mapping_warning = None
|
|
member_mapping = None
|
|
if _get_setting(connection, "auto_time_entry") and _get_setting(connection, "time_rounding") != "none":
|
|
mapping_result = await db.execute(
|
|
select(PsaMemberMapping).where(
|
|
PsaMemberMapping.psa_connection_id == session.psa_connection_id,
|
|
PsaMemberMapping.user_id == user_id,
|
|
)
|
|
)
|
|
member_mapping = mapping_result.scalar_one_or_none()
|
|
if not member_mapping:
|
|
member_mapping_warning = "Map your CW account in Settings → Integrations to enable auto-logged time entries."
|
|
|
|
# Push to PSA
|
|
try:
|
|
provider = await get_provider_for_connection(session.psa_connection_id, db)
|
|
|
|
# Post the note
|
|
posted_note = await provider.post_note(
|
|
ticket_id=session.psa_ticket_id,
|
|
text=note_text,
|
|
note_type=note_type,
|
|
)
|
|
|
|
# Create time entry if member mapping exists
|
|
time_entry_hours: Optional[float] = None
|
|
if member_mapping and session.resolved_at and session.created_at:
|
|
try:
|
|
delta = session.resolved_at - session.created_at
|
|
hours = delta.total_seconds() / 3600
|
|
rounding = _get_setting(connection, "time_rounding")
|
|
rounded_hours = _round_hours(hours, rounding)
|
|
if rounded_hours > 0:
|
|
await provider.create_time_entry(
|
|
ticket_id=session.psa_ticket_id,
|
|
member_id=member_mapping.external_member_id,
|
|
hours=rounded_hours,
|
|
notes=f"FlowPilot session: {session.problem_summary or 'Troubleshooting'}",
|
|
)
|
|
time_entry_hours = rounded_hours
|
|
except Exception as e:
|
|
logger.warning("Failed to create time entry for session %s: %s", session.id, e)
|
|
# Don't fail the note push just because time entry failed
|
|
|
|
# Log PSA activity — note posted
|
|
try:
|
|
note_activity = PsaActivityLog(
|
|
account_id=session.account_id,
|
|
session_id=session.id,
|
|
activity_type="note_posted",
|
|
hours_logged=None,
|
|
psa_ticket_id=session.psa_ticket_id,
|
|
)
|
|
db.add(note_activity)
|
|
except Exception as e:
|
|
logger.warning("Failed to log PSA note activity for session %s: %s", session.id, e)
|
|
|
|
# Log time entry activity if one was created
|
|
if time_entry_hours is not None:
|
|
try:
|
|
time_activity = PsaActivityLog(
|
|
account_id=session.account_id,
|
|
session_id=session.id,
|
|
activity_type="time_entry_posted",
|
|
hours_logged=time_entry_hours,
|
|
psa_ticket_id=session.psa_ticket_id,
|
|
)
|
|
db.add(time_activity)
|
|
except Exception as e:
|
|
logger.warning("Failed to log PSA time entry activity for session %s: %s", session.id, e)
|
|
|
|
# Log success
|
|
log_entry = PsaPostLog(
|
|
id=uuid.uuid4(),
|
|
ai_session_id=session.id,
|
|
psa_connection_id=session.psa_connection_id,
|
|
ticket_id=session.psa_ticket_id,
|
|
note_type=note_type,
|
|
content_posted=note_text[:10000], # Truncate for storage
|
|
external_note_id=posted_note.id,
|
|
status="success",
|
|
posted_by=user_id,
|
|
)
|
|
db.add(log_entry)
|
|
|
|
return {
|
|
"psa_push_status": "sent",
|
|
"psa_push_error": None,
|
|
"member_mapping_warning": member_mapping_warning,
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.warning("PSA push failed for session %s: %s", session.id, e)
|
|
|
|
# Log failure with retry scheduling
|
|
log_entry = PsaPostLog(
|
|
id=uuid.uuid4(),
|
|
ai_session_id=session.id,
|
|
psa_connection_id=session.psa_connection_id,
|
|
ticket_id=session.psa_ticket_id,
|
|
note_type=note_type,
|
|
content_posted=note_text[:10000],
|
|
status="pending_retry",
|
|
error_message=str(e)[:500],
|
|
retry_count=0,
|
|
next_retry_at=datetime.now(timezone.utc) + timedelta(minutes=5),
|
|
posted_by=user_id,
|
|
)
|
|
db.add(log_entry)
|
|
|
|
return {
|
|
"psa_push_status": "pending_retry",
|
|
"psa_push_error": str(e)[:200],
|
|
"member_mapping_warning": member_mapping_warning,
|
|
}
|
|
|
|
|
|
async def retry_failed_push(
|
|
log_entry: PsaPostLog,
|
|
db: AsyncSession,
|
|
) -> bool:
|
|
"""Retry a failed PSA push. Returns True on success."""
|
|
try:
|
|
provider = await get_provider_for_connection(log_entry.psa_connection_id, db)
|
|
posted_note = await provider.post_note(
|
|
ticket_id=log_entry.ticket_id,
|
|
text=log_entry.content_posted,
|
|
note_type=log_entry.note_type,
|
|
)
|
|
log_entry.status = "success"
|
|
log_entry.external_note_id = posted_note.id
|
|
log_entry.error_message = None
|
|
log_entry.next_retry_at = None
|
|
return True
|
|
except Exception as e:
|
|
log_entry.retry_count += 1
|
|
log_entry.error_message = str(e)[:500]
|
|
|
|
if log_entry.retry_count >= 3:
|
|
log_entry.status = "failed"
|
|
log_entry.next_retry_at = None
|
|
else:
|
|
# Exponential backoff: 5min, 15min, 45min
|
|
backoff_minutes = 5 * (3 ** log_entry.retry_count)
|
|
log_entry.next_retry_at = datetime.now(timezone.utc) + timedelta(minutes=backoff_minutes)
|
|
|
|
logger.warning(
|
|
"PSA retry %d failed for log %s: %s",
|
|
log_entry.retry_count, log_entry.id, e,
|
|
)
|
|
return False
|