feat: overhaul session documentation, PSA notes, and client communications

- Reformat PSA resolution/escalation notes: clean single-line header,
  steps with engineer responses inline, remove duplicate timing blocks,
  remove AI confidence section, add follow-up recommendations
- Standardize time display to decimal hours (e.g. 0.25 hrs) across all
  note formatters and status update context
- Add follow_up_recommendations to SessionDocumentation schema and
  surface in SessionDocView; extracted from resolution suggestion steps
- Add _build_what_we_know() helper: uses session.evidence_items when
  cockpit branch merges, falls back to deriving findings from steps
- Fix option label lookup in generate_status_update (was passing raw
  machine values to AI instead of human-readable labels)
- Add 'What We Know' section to status update ticket notes prompt
- Improve _build_session_context in resolution_output_generator to
  include intake text and full step details instead of truncated chat
- Add request_info audience type: client-facing information request
  that skips the length step and generates a numbered question list
- Improve client_update and email_draft prompts with per-context
  guidance (status/resolution/escalation) and fix escalation subject
  line from 'Specialist Review' to 'Specialist Assistance'

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
chihlasm
2026-04-05 15:18:31 +00:00
parent 22baad7992
commit f4143e52a1
7 changed files with 352 additions and 172 deletions

View File

@@ -57,181 +57,199 @@ def _format_datetime(dt: datetime | None) -> str:
return dt.strftime("%Y-%m-%d %I:%M %p UTC")
def _get_engineer_response(step) -> str | None:
"""Extract the engineer's response label from a step."""
if step.was_skipped:
return "Skipped"
if step.selected_option and step.options_presented:
for opt in step.options_presented:
if opt.get("value") == step.selected_option:
return opt.get("label", step.selected_option)
return step.selected_option
if step.selected_option:
return step.selected_option
if step.free_text_input:
return step.free_text_input
return None
def format_resolution_note(session: AISession, include_steps: bool = True) -> str:
"""Format a resolved session as a plain-text note for CW."""
lines = [
"═══ FlowPilot Session Documentation ═══",
f"Session: {session.id}",
]
# Engineer name from relationship if loaded, otherwise user_id
engineer_name = getattr(session, 'user', None)
if engineer_name and hasattr(engineer_name, 'name'):
lines.append(f"Engineer: {engineer_name.name}")
engineer_display = engineer_name.name if engineer_name and hasattr(engineer_name, 'name') else "Unknown"
lines.extend([
f"Date: {_format_datetime(session.resolved_at)}",
f"Started: {_format_datetime(session.created_at)}",
f"Ended: {_format_datetime(session.resolved_at)}",
])
# Duration
duration_str = ""
if session.resolved_at and session.created_at:
delta = session.resolved_at - session.created_at
minutes = int(delta.total_seconds() / 60)
if minutes < 60:
lines.append(f"Duration: {minutes}m")
else:
lines.append(f"Duration: {minutes // 60}h {minutes % 60}m")
total_hrs = round(delta.total_seconds() / 3600, 2)
duration_str = f"{total_hrs} hrs"
lines.append("")
lines.append("── Problem ──")
lines.append(session.problem_summary or "No summary available")
if session.problem_domain:
lines.append(f"Domain: {session.problem_domain}")
lines = [
f"FlowPilot Session — {engineer_display}{duration_str}",
f"Problem: {session.problem_summary or 'No summary available'}",
]
# Diagnostic steps
if include_steps and session.steps:
lines.append("")
lines.append("── Diagnosis Path ──")
lines.append("Steps:")
for step in session.steps:
content = step.content or {}
step_type = content.get("type", step.step_type).capitalize()
description = content.get("text", "")
response_text = ""
if step.was_skipped:
response_text = "Skipped"
elif step.selected_option:
# Try to find the label
if step.options_presented:
for opt in step.options_presented:
if opt.get("value") == step.selected_option:
response_text = opt.get("label", step.selected_option)
break
else:
response_text = step.selected_option
else:
response_text = step.selected_option
elif step.free_text_input:
response_text = step.free_text_input
lines.append(f"{step.step_order + 1}. [{step_type}] {description}")
if response_text:
lines.append(f" → Response: {response_text}")
if step.action_result:
result = step.action_result
outcome = "Succeeded" if result.get("success") else "Did not resolve"
if details := result.get("details"):
outcome += f"{details}"
lines.append(f" → Result: {outcome}")
step_type = content.get("type", "")
if step_type == "resolution_suggestion":
continue # Not a diagnostic step
description = content.get("text", "").strip()
if not description:
continue
response = _get_engineer_response(step)
line = f"{step.step_order + 1}. {description}"
if response and response != "Skipped":
line += f"{response}"
elif response == "Skipped":
line += " (skipped)"
lines.append(line)
# Resolution
lines.append("")
lines.append("── Resolution ──")
lines.append(session.resolution_summary or "No resolution summary")
lines.append(f"Resolution: {session.resolution_summary or 'No resolution summary'}")
if session.resolution_action:
lines.append(session.resolution_action)
# Confidence
lines.append("")
lines.append("── AI Confidence ──")
lines.append(f"Final confidence: {session.confidence_tier} ({session.confidence_score:.0%})")
# Follow-up recommendations from resolution suggestion step
follow_ups: list[str] = []
for step in session.steps:
content = step.content or {}
if content.get("type") == "resolution_suggestion":
recs = content.get("follow_up_recommendations", [])
if isinstance(recs, list):
follow_ups.extend(recs)
if follow_ups:
lines.append("")
lines.append("Follow-up:")
for rec in follow_ups:
lines.append(f"- {rec}")
# Timing section (always present)
# Timing
lines.append("")
lines.append("── Session Timing ──")
lines.append(f"Start: {_format_datetime(session.created_at)}")
lines.append(f"End: {_format_datetime(session.resolved_at)}")
if session.resolved_at and session.created_at:
delta = session.resolved_at - session.created_at
minutes = int(delta.total_seconds() / 60)
lines.append(f"Total: {minutes // 60}h {minutes % 60}m" if minutes >= 60 else f"Total: {minutes}m")
total_hrs = round(delta.total_seconds() / 3600, 2)
lines.append(f"Total: {total_hrs} hrs")
lines.append("")
lines.append("Generated by ResolutionFlow FlowPilot")
lines.append("Generated by ResolutionFlow")
return "\n".join(lines)
def _derive_what_we_know(session: AISession) -> tuple[list[str], list[str], list[str]]:
"""Return (confirmed, ruled_out, pending) findings.
Uses session.evidence_items when the cockpit branch is merged; falls back
to deriving from completed diagnostic steps.
"""
evidence_items = getattr(session, 'evidence_items', None)
if evidence_items:
confirmed = [e['text'] for e in evidence_items if e.get('status') == 'confirmed']
ruled_out = [e['text'] for e in evidence_items if e.get('status') == 'ruled_out']
pending = [e['text'] for e in evidence_items if e.get('status') == 'pending']
return confirmed, ruled_out, pending
# Derive from completed steps — all answered steps become findings
findings = []
for step in sorted(session.steps or [], key=lambda s: s.step_order):
content = step.content or {}
if content.get("type") in ("resolution_suggestion", "briefing", "status_update"):
continue
description = content.get("text", "").strip()
if not description or step.was_skipped:
continue
response = _get_engineer_response(step)
if response:
findings.append(f"{description}{response}")
return findings, [], []
def format_escalation_note(session: AISession, include_steps: bool = True) -> str:
"""Format an escalated session as a plain-text note for CW."""
engineer_obj = getattr(session, 'user', None)
engineer_display = engineer_obj.name if engineer_obj and hasattr(engineer_obj, 'name') else "Unknown"
escalated_to_obj = getattr(session, 'escalated_to', None)
escalated_to_display = escalated_to_obj.name if escalated_to_obj and hasattr(escalated_to_obj, 'name') else None
escalated_at = session.resolved_at or datetime.now(timezone.utc)
duration_str = ""
if session.created_at:
delta = escalated_at - session.created_at
total_hrs = round(delta.total_seconds() / 3600, 2)
duration_str = f"{total_hrs} hrs"
header = f"FlowPilot Escalation — {engineer_display}{duration_str}"
if escalated_to_display:
header += f"{escalated_to_display}"
lines = [
"═══ FlowPilot Escalation Documentation ═══",
f"Session: {session.id}",
header,
f"Problem: {session.problem_summary or 'No summary available'}",
]
engineer_name = getattr(session, 'user', None)
if engineer_name and hasattr(engineer_name, 'name'):
lines.append(f"Escalated by: {engineer_name.name}")
escalated_to = getattr(session, 'escalated_to', None)
if escalated_to and hasattr(escalated_to, 'name'):
lines.append(f"Escalated to: {escalated_to.name}")
else:
lines.append("Escalated to: Unassigned")
lines.extend([
f"Date: {_format_datetime(session.resolved_at or datetime.now(timezone.utc))}",
f"Started: {_format_datetime(session.created_at)}",
])
if session.resolved_at and session.created_at:
delta = session.resolved_at - session.created_at
minutes = int(delta.total_seconds() / 60)
lines.append(f"Duration: {minutes // 60}h {minutes % 60}m" if minutes >= 60 else f"Duration: {minutes}m")
lines.append("")
lines.append("── Problem ──")
lines.append(session.problem_summary or "No summary available")
# Work completed
# Work completed with responses
if include_steps and session.steps:
lines.append("")
lines.append("── Work Completed ──")
for step in session.steps:
lines.append("Work completed:")
for step in sorted(session.steps, key=lambda s: s.step_order):
content = step.content or {}
description = content.get("text", "")
lines.append(f"{step.step_order + 1}. {description}")
if content.get("type") in ("resolution_suggestion", "briefing", "status_update"):
continue
description = content.get("text", "").strip()
if not description:
continue
response = _get_engineer_response(step)
line = f"{step.step_order + 1}. {description}"
if response and response != "Skipped":
line += f"{response}"
elif response == "Skipped":
line += " (skipped)"
lines.append(line)
# What We Know
confirmed, ruled_out, pending = _derive_what_we_know(session)
if confirmed or ruled_out or pending:
lines.append("")
lines.append("What we know:")
for f in confirmed:
lines.append(f"{f}")
for f in ruled_out:
lines.append(f"{f}")
for f in pending:
lines.append(f" ? {f}")
# Escalation reason
lines.append("")
lines.append("── Escalation Reason ──")
lines.append(session.escalation_reason or "No reason provided")
lines.append(f"Escalation reason: {session.escalation_reason or 'No reason provided'}")
# Escalation package details
# Suggested next steps from escalation package
pkg = session.escalation_package or {}
if hypotheses := pkg.get("remaining_hypotheses"):
lines.append("")
lines.append("── Remaining Hypotheses ──")
if isinstance(hypotheses, list):
for h in hypotheses:
lines.append(f"- {h}")
else:
lines.append(str(hypotheses))
if suggestions := pkg.get("suggested_next_steps"):
lines.append("")
lines.append("── Suggested Next Steps ──")
if isinstance(suggestions, list):
for s in suggestions:
lines.append(f"- {s}")
else:
lines.append(str(suggestions))
lines.append("Suggested next steps:")
items = suggestions if isinstance(suggestions, list) else [str(suggestions)]
for s in items:
lines.append(f"- {s}")
# Timing
lines.append("")
lines.append("── Session Timing ──")
lines.append(f"Start: {_format_datetime(session.created_at)}")
escalated_at = session.resolved_at or datetime.now(timezone.utc)
lines.append(f"Escalated: {_format_datetime(escalated_at)}")
if session.created_at:
delta = escalated_at - session.created_at
minutes = int(delta.total_seconds() / 60)
lines.append(f"Total: {minutes // 60}h {minutes % 60}m" if minutes >= 60 else f"Total: {minutes}m")
total_hrs = round(delta.total_seconds() / 3600, 2)
lines.append(f"Total: {total_hrs} hrs")
lines.append("")
lines.append("Generated by ResolutionFlow FlowPilot")
lines.append("Generated by ResolutionFlow")
return "\n".join(lines)