"""
Session export generators for ResolutionFlow.
Provides markdown, plain text, HTML, and PSA/ticket note export formatters
for troubleshooting sessions.
"""
import html
from datetime import datetime
from typing import Any
from app.models.session import Session
from app.schemas.session import SessionExport
# Display labels for the stored session outcome values. Outcomes missing from
# this table are title-cased by _get_outcome_label instead.
OUTCOME_LABELS: dict[str, str] = dict(
    resolved="Resolved",
    escalated="Escalated",
    workaround="Workaround Applied",
    unresolved="Unresolved",
)
def _format_duration(started_at: datetime, completed_at: datetime | None) -> str:
"""Format duration between two datetimes as human-readable string."""
if not completed_at:
return "In progress"
delta = completed_at - started_at
total_seconds = int(delta.total_seconds())
if total_seconds < 0:
return "0 minutes"
hours, remainder = divmod(total_seconds, 3600)
minutes = remainder // 60
if hours > 0:
return f"{hours}h {minutes}m"
return f"{minutes} minutes"
def _format_step_duration(duration_seconds: int) -> str:
"""Format step duration seconds as compact human-readable text."""
if duration_seconds < 0:
return "0s"
if duration_seconds < 60:
return f"{duration_seconds}s"
hours, remainder = divmod(duration_seconds, 3600)
minutes, seconds = divmod(remainder, 60)
if hours > 0:
if seconds > 0:
return f"{hours}h {minutes}m {seconds}s"
return f"{hours}h {minutes}m"
if seconds > 0:
return f"{minutes}m {seconds}s"
return f"{minutes}m"
def _parse_iso_datetime(value: Any) -> datetime | None:
"""Parse ISO datetime strings from JSONB with support for trailing Z."""
if isinstance(value, datetime):
return value
if not isinstance(value, str) or not value:
return None
try:
return datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError:
return None
def _get_step_duration_seconds(decision: dict[str, Any]) -> int | None:
"""Get step duration from explicit field or entered/exited timestamps."""
explicit_duration = decision.get("duration_seconds")
if isinstance(explicit_duration, (int, float)):
duration = int(explicit_duration)
if duration >= 0:
return duration
entered_at = _parse_iso_datetime(decision.get("entered_at"))
exited_at = _parse_iso_datetime(decision.get("exited_at"))
if entered_at and exited_at:
total_seconds = int((exited_at - entered_at).total_seconds())
if total_seconds >= 0:
return total_seconds
return None
def _get_command_output(decision: dict[str, Any]) -> str | None:
"""Extract and normalize command_output from a decision dict."""
output = decision.get("command_output")
if not isinstance(output, str):
return None
output = output.strip()
return output if output else None
def _truncate_command_output(output: str, max_lines: int = 5, fmt: str = "text") -> str:
"""Truncate command output to max_lines for standard detail level.
Args:
fmt: One of "markdown", "text", "html", "psa" — controls suffix formatting.
"""
lines = output.splitlines()
if len(lines) <= max_lines:
return output
truncated = "\n".join(lines[:max_lines])
count = len(lines)
if fmt == "markdown":
suffix = f"*(full output omitted — {count} lines)*"
elif fmt == "html":
suffix = f"(full output omitted — {count} lines)"
else: # text, psa
suffix = f"(full output omitted — {count} lines)"
return f"{truncated}\n{suffix}"
def _find_node_commands(tree_snapshot: dict[str, Any], node_id: str) -> list[str]:
"""Find the commands list for a node in the tree snapshot."""
def _search(node: dict[str, Any]) -> list[str] | None:
if node.get("id") == node_id:
return node.get("commands") or []
for child in node.get("children", []):
result = _search(child)
if result is not None:
return result
return None
structure = tree_snapshot.get("tree_structure") or tree_snapshot
return _search(structure) or []
def _get_outcome_label(session: Session) -> str | None:
    """Translate the session's stored outcome into display text.

    Returns ``None`` when the session has no (or a falsy) ``outcome``. Known
    outcomes come from ``OUTCOME_LABELS``; any other value is shown with
    underscores turned into spaces and title-cased.
    """
    raw_outcome = getattr(session, "outcome", None)
    if raw_outcome:
        fallback = str(raw_outcome).replace("_", " ").title()
        return OUTCOME_LABELS.get(raw_outcome, fallback)
    return None
def _build_summary_fields(session: Session) -> dict[str, str]:
    """Derive the pre-filled summary fields for an export.

    Returns a dict with the keys ``issue``, ``impact``, ``status``,
    ``resolution`` and ``next_steps``. ``impact`` is always blank, and
    ``resolution`` / ``next_steps`` are blank unless the session carries
    string values for them; users fill the gaps in via the editable preview.
    """
    snapshot = session.tree_snapshot
    tree_name = snapshot.get("name", "Troubleshooting Session")
    tree_desc = snapshot.get("description", "")
    issue = tree_name if not tree_desc else f"{tree_name}: {tree_desc}"

    if not session.completed_at:
        step_count = len(session.decisions) if session.decisions else 0
        if step_count:
            status = f"In Progress — paused at step {step_count}"
        else:
            status = "In Progress"
    elif getattr(session, "outcome", None) == "resolved":
        status = "Resolved"
    else:
        status = f"Completed — {_get_outcome_label(session) or 'Unknown'}"

    def _stripped_text(attr_name: str) -> str:
        # Non-string values (including a missing attribute) count as blank.
        raw = getattr(session, attr_name, None)
        return raw.strip() if isinstance(raw, str) else ""

    resolution = _stripped_text("outcome_notes")
    next_steps = _stripped_text("next_steps")

    return {
        "issue": issue,
        "impact": "",
        "status": status,
        "resolution": resolution,
        "next_steps": next_steps,
    }
def _escape_markdown_table(value: str) -> str:
"""Escape value for use in a markdown table cell."""
return value.replace("|", "\\|").replace("\n", " ")
def generate_markdown_export(session: Session, options: SessionExport) -> str:
    """Generate a Markdown export of a session.

    Procedural sessions are delegated to ``_generate_procedural_markdown``.
    Otherwise the document is built from, in order: an optional header (tree
    name, ticket, client, timestamps, outcome), an optional summary table, the
    scratchpad as "Evidence / Reference", one "### Step N" entry per recorded
    decision, and optional "Resolution" / "Next Steps" sections.

    Args:
        session: Session to export; read for ``tree_snapshot``, ``decisions``,
            ticket/client metadata, timestamps, outcome and free-text notes.
        options: Export switches (``include_*`` flags, ``detail_level``,
            ``max_step_index``).

    Returns:
        The Markdown document as one newline-joined string.

    NOTE(review): the file's indentation was missing when this block was
    rewritten, so the nesting of the header lines is reconstructed from
    context — confirm against version control.
    """
    if _is_procedural_session(session):
        return _generate_procedural_markdown(session, options)

    lines = []
    outcome_label = _get_outcome_label(session)

    if options.include_tree_info:
        tree_name = session.tree_snapshot.get("name", "Troubleshooting Session")
        lines.append(f"# {tree_name}")
        lines.append("")
        if session.ticket_number:
            lines.append(f"**Ticket:** {session.ticket_number}")
        if session.client_name:
            lines.append(f"**Client:** {session.client_name}")
        if options.include_timestamps:
            lines.append(f"**Started:** {session.started_at.strftime('%Y-%m-%d %H:%M')}")
            if session.completed_at:
                lines.append(f"**Completed:** {session.completed_at.strftime('%Y-%m-%d %H:%M')}")
                lines.append(f"**Duration:** {_format_duration(session.started_at, session.completed_at)}")
        if outcome_label:
            lines.append(f"**Outcome:** {outcome_label}")
        lines.append("")
        lines.append("---")
        lines.append("")

    if options.include_summary:
        summary = _build_summary_fields(session)
        esc = _escape_markdown_table
        lines.append("## Summary")
        lines.append("")
        lines.append("| Field | Details |")
        lines.append("|-------|---------|")
        lines.append(f"| Issue | {esc(summary['issue'])} |")
        lines.append(f"| Impact | {esc(summary['impact'])} |")
        lines.append(f"| Status | {esc(summary['status'])} |")
        lines.append(f"| Resolution | {esc(summary['resolution'])} |")
        lines.append(f"| Next Steps | {esc(summary['next_steps'])} |")
        lines.append("")
        lines.append("---")
        lines.append("")

    # Scratchpad / Evidence section
    scratchpad = getattr(session, 'scratchpad', '') or ''
    if scratchpad.strip():
        lines.append("## Evidence / Reference")
        lines.append("")
        lines.append(scratchpad)
        lines.append("")
        lines.append("---")
        lines.append("")

    lines.append("## Troubleshooting Steps")
    lines.append("")

    # A session with no recorded decisions may carry None here; treat it as
    # empty, as the procedural generators do.
    decisions = session.decisions or []
    if options.max_step_index is not None:
        decisions = decisions[:options.max_step_index]

    for i, decision in enumerate(decisions, 1):
        question = decision.get("question") or decision.get("action_performed", "Step")
        answer = decision.get("answer", "")
        notes = decision.get("notes", "")
        duration_seconds = _get_step_duration_seconds(decision)
        # node_id may be stored as null; normalise so the prefix test is safe.
        node_id = decision.get("node_id") or ""
        is_custom = isinstance(node_id, str) and node_id.startswith("custom-")
        prefix = "[CUSTOM] " if is_custom else ""
        lines.append(f"### Step {i}: {prefix}{question}")
        if is_custom:
            lines.append("*Custom step added by engineer*")
        if answer:
            lines.append(f"**Answer:** {answer}")
        if notes:
            lines.append(f"**Notes:** {notes}")
        if command_output := _get_command_output(decision):
            if options.detail_level == "standard":
                command_output = _truncate_command_output(command_output, fmt="markdown")
            commands = _find_node_commands(session.tree_snapshot, node_id)
            if commands:
                lines.append(f"**Commands Run:** {', '.join(f'`{c}`' for c in commands)}")
            lines.append("**Output:**")
            lines.append("```")
            lines.append(command_output)
            lines.append("```")
        if duration_seconds is not None:
            lines.append(f"**Duration:** {_format_step_duration(duration_seconds)}")
        if options.include_timestamps and decision.get("timestamp"):
            lines.append(f"*{decision['timestamp']}*")
        lines.append("")

    # Resolution / Outcome Notes
    _raw_notes = getattr(session, 'outcome_notes', None)
    outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
    if outcome_notes.strip() and options.include_outcome_notes:
        lines.append("---")
        lines.append("")
        lines.append("## Resolution")
        lines.append("")
        lines.append(outcome_notes.strip())
        lines.append("")

    # Next Steps
    _raw_next = getattr(session, 'next_steps', None)
    next_steps = _raw_next if isinstance(_raw_next, str) else ''
    if next_steps.strip() and options.include_next_steps:
        lines.append("---")
        lines.append("")
        lines.append("## Next Steps")
        lines.append("")
        lines.append(next_steps.strip())
        lines.append("")

    return "\n".join(lines)
def generate_text_export(session: Session, options: SessionExport) -> str:
    """Generate a plain text export of a session.

    Procedural sessions are delegated to ``_generate_procedural_text``.
    Otherwise the output holds, in order: an optional underlined header with
    ticket/client/timestamps/outcome, an optional SUMMARY block, the
    scratchpad as EVIDENCE / REFERENCE, the numbered TROUBLESHOOTING STEPS,
    and optional RESOLUTION / NEXT STEPS sections.

    Args:
        session: Session to export.
        options: Export switches (``include_*`` flags, ``detail_level``,
            ``max_step_index``).

    Returns:
        The text document as one newline-joined string.

    NOTE(review): the file's indentation was missing when this block was
    rewritten, so the nesting of the header lines is reconstructed from
    context — confirm against version control.
    """
    if _is_procedural_session(session):
        return _generate_procedural_text(session, options)

    lines = []
    outcome_label = _get_outcome_label(session)

    if options.include_tree_info:
        tree_name = session.tree_snapshot.get("name", "Troubleshooting Session")
        lines.append(tree_name)
        lines.append("=" * len(tree_name))
        if session.ticket_number:
            lines.append(f"Ticket: {session.ticket_number}")
        if session.client_name:
            lines.append(f"Client: {session.client_name}")
        if options.include_timestamps:
            lines.append(f"Started: {session.started_at.strftime('%Y-%m-%d %H:%M')}")
            if session.completed_at:
                lines.append(f"Completed: {session.completed_at.strftime('%Y-%m-%d %H:%M')}")
                lines.append(f"Duration: {_format_duration(session.started_at, session.completed_at)}")
        if outcome_label:
            lines.append(f"Outcome: {outcome_label}")
        lines.append("")

    if options.include_summary:
        summary = _build_summary_fields(session)
        lines.append("SUMMARY")
        lines.append("-" * 20)
        lines.append(f"Issue: {summary['issue']}")
        lines.append(f"Impact: {summary['impact']}")
        lines.append(f"Status: {summary['status']}")
        lines.append(f"Resolution: {summary['resolution']}")
        lines.append(f"Next Steps: {summary['next_steps']}")
        lines.append("")

    # Scratchpad / Evidence section
    scratchpad = getattr(session, 'scratchpad', '') or ''
    if scratchpad.strip():
        lines.append("EVIDENCE / REFERENCE")
        lines.append("-" * 20)
        lines.append(scratchpad)
        lines.append("")

    lines.append("TROUBLESHOOTING STEPS")
    lines.append("-" * 20)

    # A session with no recorded decisions may carry None here; treat it as
    # empty, as the procedural generators do.
    decisions = session.decisions or []
    if options.max_step_index is not None:
        decisions = decisions[:options.max_step_index]

    for i, decision in enumerate(decisions, 1):
        question = decision.get("question") or decision.get("action_performed", "Step")
        answer = decision.get("answer", "")
        notes = decision.get("notes", "")
        duration_seconds = _get_step_duration_seconds(decision)
        # node_id may be stored as null; normalise so the prefix test is safe.
        node_id = decision.get("node_id") or ""
        is_custom = isinstance(node_id, str) and node_id.startswith("custom-")
        prefix = "[CUSTOM] " if is_custom else ""
        lines.append(f"\n{i}. {prefix}{question}")
        if answer:
            lines.append(f" Answer: {answer}")
        if notes:
            lines.append(f" Notes: {notes}")
        if command_output := _get_command_output(decision):
            if options.detail_level == "standard":
                command_output = _truncate_command_output(command_output, fmt="text")
            commands = _find_node_commands(session.tree_snapshot, node_id)
            if commands:
                lines.append(f" Commands Run: {', '.join(commands)}")
            lines.append(" Output:")
            for output_line in command_output.splitlines():
                lines.append(f" {output_line}")
        if duration_seconds is not None:
            lines.append(f" Duration: {_format_step_duration(duration_seconds)}")

    # Resolution
    _raw_notes = getattr(session, 'outcome_notes', None)
    outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
    if outcome_notes.strip() and options.include_outcome_notes:
        lines.append("")
        lines.append("RESOLUTION")
        lines.append("-" * 20)
        lines.append(outcome_notes.strip())

    # Next Steps
    _raw_next = getattr(session, 'next_steps', None)
    next_steps = _raw_next if isinstance(_raw_next, str) else ''
    if next_steps.strip() and options.include_next_steps:
        lines.append("")
        lines.append("NEXT STEPS")
        lines.append("-" * 20)
        lines.append(next_steps.strip())

    return "\n".join(lines)
def generate_html_export(session: Session, options: SessionExport) -> str:
    """Generate HTML export.

    Procedural sessions are delegated to ``_generate_procedural_html``.
    Otherwise fragments are collected in ``html_parts`` in this order: page
    preamble, optional tree-name heading, optional summary rows, scratchpad
    ("Evidence / Reference"), one fragment group per decision
    ("Troubleshooting Steps"), optional "Resolution" and "Next Steps", then
    the closing fragments. The fragments are joined with newlines.

    Session-derived text (tree name, summary values, scratchpad, question,
    answer, notes, command names, command output, timestamps, outcome notes,
    next steps) is passed through ``html.escape`` before interpolation.

    NOTE(review): the string literals in this function look like they have
    lost their HTML markup (several are empty or span lines without a closing
    quote), and the file's indentation is missing. The code is kept verbatim
    and the nesting shown is reconstructed from context — restore the literals
    from version control before relying on this block.
    """
    if _is_procedural_session(session):
        return _generate_procedural_html(session, options)
    tree_name = html.escape(session.tree_snapshot.get("name", "Troubleshooting Session"))
    # NOTE(review): outcome_label is not referenced again in the visible body.
    outcome_label = _get_outcome_label(session)
    html_parts = ['', '', '
',
    '',
    f'{tree_name}',
    '',
    '', '']
    if options.include_tree_info:
        html_parts.append(f'{tree_name}
')
        html_parts.append('')
    if options.include_summary:
        summary = _build_summary_fields(session)
        html_parts.append('Summary
')
        html_parts.append('')
        # One label/value pair per summary field, both escaped.
        for label, value in [("Issue", summary["issue"]), ("Impact", summary["impact"]),
                             ("Status", summary["status"]), ("Resolution", summary["resolution"]),
                             ("Next Steps", summary["next_steps"])]:
            html_parts.append(f'| {html.escape(label)} | ')
            html_parts.append(f'{html.escape(value)} |
')
        html_parts.append('
')
    # Scratchpad / Evidence section
    scratchpad = getattr(session, 'scratchpad', '') or ''
    if scratchpad.strip():
        html_parts.append('Evidence / Reference
')
        html_parts.append(f'{html.escape(scratchpad)}
')
    html_parts.append('Troubleshooting Steps
')
    decisions = session.decisions
    # Optionally limit the export to the first max_step_index decisions.
    if options.max_step_index is not None:
        decisions = decisions[:options.max_step_index]
    for i, decision in enumerate(decisions, 1):
        # NOTE(review): html.escape requires str; a null answer/notes value
        # stored in the decision would raise here — confirm the stored schema.
        question = html.escape(decision.get("question") or decision.get("action_performed", "Step"))
        answer = html.escape(decision.get("answer", ""))
        notes = html.escape(decision.get("notes", ""))
        duration_seconds = _get_step_duration_seconds(decision)
        html_parts.append('')
        # Steps whose node_id starts with "custom-" get a CUSTOM badge.
        is_custom = decision.get("node_id", "").startswith("custom-")
        custom_badge = '
CUSTOM' if is_custom else ''
        html_parts.append(f'
{custom_badge}Step {i}: {question}
')
        if answer:
            html_parts.append(f'
Answer: {answer}
')
        if notes:
            html_parts.append(f'
Notes: {notes}
')
        if command_output := _get_command_output(decision):
            # "standard" detail level shows only the first lines of the output.
            if options.detail_level == "standard":
                command_output = _truncate_command_output(command_output, fmt="html")
            commands = _find_node_commands(session.tree_snapshot, decision.get("node_id", ""))
            if commands:
                cmd_html = ", ".join(f"
{html.escape(c)}" for c in commands)
                html_parts.append(f'
Commands Run: {cmd_html}
')
            html_parts.append(f'
{html.escape(command_output)}
')
        if duration_seconds is not None:
            html_parts.append(f'
Duration: {_format_step_duration(duration_seconds)}
')
        if options.include_timestamps and decision.get("timestamp"):
            html_parts.append(f'
{html.escape(str(decision["timestamp"]))}
')
        html_parts.append('
')
    # Resolution
    _raw_notes = getattr(session, 'outcome_notes', None)
    outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
    if outcome_notes.strip() and options.include_outcome_notes:
        html_parts.append('Resolution
')
        html_parts.append(f'{html.escape(outcome_notes.strip())}
')
    # Next Steps
    _raw_next = getattr(session, 'next_steps', None)
    next_steps = _raw_next if isinstance(_raw_next, str) else ''
    if next_steps.strip() and options.include_next_steps:
        html_parts.append('Next Steps
')
        html_parts.append(f'{html.escape(next_steps.strip())}
')
    html_parts.extend(['', ''])
    return "\n".join(html_parts)
def generate_psa_export(session: Session, options: SessionExport) -> str:
    """Generate PSA/ticket note export optimized for ConnectWise and similar PSA tools.

    Procedural sessions are delegated to ``_generate_procedural_psa``.
    Otherwise the note holds a fixed header (ticket, client, tree, date,
    duration, outcome) followed by the sections SUMMARY (optional), PROBLEM,
    STEPS TAKEN, RESOLUTION (completed sessions only), NEXT STEPS (optional),
    TIME SPENT and ENGINEER NOTES.

    Args:
        session: Session to export.
        options: Export switches (``include_summary``,
            ``include_outcome_notes``, ``include_next_steps``,
            ``detail_level``, ``max_step_index``).

    Returns:
        The note as one newline-joined string.

    NOTE(review): the file's indentation was missing when this block was
    rewritten, so block nesting is reconstructed from context — confirm
    against version control.
    """
    if _is_procedural_session(session):
        return _generate_procedural_psa(session, options)

    lines = []
    outcome_label = _get_outcome_label(session)
    tree_name = session.tree_snapshot.get("name", "Troubleshooting Session")
    tree_description = session.tree_snapshot.get("description", "")
    ticket_number = session.ticket_number or "N/A"
    client_name = session.client_name or "N/A"
    date_str = session.started_at.strftime("%Y-%m-%d %H:%M")

    lines.append("=== TROUBLESHOOTING NOTES ===")
    lines.append(f"Ticket: {ticket_number} | Client: {client_name}")
    lines.append(f"Tree: {tree_name} | Date: {date_str}")
    lines.append(f"Duration: {_format_duration(session.started_at, session.completed_at)}")
    if outcome_label:
        lines.append(f"Outcome: {outcome_label}")
    lines.append("")

    if options.include_summary:
        summary = _build_summary_fields(session)
        lines.append("--- SUMMARY ---")
        lines.append(f"Issue: {summary['issue']}")
        lines.append(f"Impact: {summary['impact']}")
        lines.append(f"Status: {summary['status']}")
        lines.append(f"Resolution: {summary['resolution']}")
        lines.append(f"Next Steps: {summary['next_steps']}")
        lines.append("")

    # Problem section
    lines.append("--- PROBLEM ---")
    lines.append(tree_description if tree_description else "No description provided.")
    lines.append("")

    # Steps taken
    lines.append("--- STEPS TAKEN ---")
    # A session with no recorded decisions may carry None here; normalise it
    # before slicing so max_step_index cannot crash the export.
    decisions = session.decisions or []
    if options.max_step_index is not None:
        decisions = decisions[:options.max_step_index]
    if decisions:
        for i, decision in enumerate(decisions, 1):
            question = decision.get("question") or decision.get("action_performed", "Step")
            answer = decision.get("answer", "")
            notes = decision.get("notes", "")
            duration_seconds = _get_step_duration_seconds(decision)
            # node_id may be stored as null; normalise so the prefix test is safe.
            node_id = decision.get("node_id") or ""
            is_custom = isinstance(node_id, str) and node_id.startswith("custom-")
            prefix = "[CUSTOM] " if is_custom else ""
            line = f"{i}. {prefix}{question}"
            if answer:
                line += f" -> {answer}"
            if duration_seconds is not None:
                line += f" ({_format_step_duration(duration_seconds)})"
            lines.append(line)
            if notes:
                lines.append(f" Notes: {notes}")
            if command_output := _get_command_output(decision):
                if options.detail_level == "standard":
                    command_output = _truncate_command_output(command_output, fmt="psa")
                commands = _find_node_commands(session.tree_snapshot, node_id)
                if commands:
                    lines.append(f" Commands: {', '.join(commands)}")
                lines.append(" Output:")
                for output_line in command_output.splitlines():
                    lines.append(f" {output_line}")
    else:
        lines.append("No steps recorded.")
    lines.append("")

    # Resolution — only for completed sessions
    if session.completed_at:
        lines.append("--- RESOLUTION ---")
        _raw_notes = getattr(session, 'outcome_notes', None)
        outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
        if outcome_notes.strip() and options.include_outcome_notes:
            lines.append(outcome_notes.strip())
        elif session.decisions:
            # No notes to show: fall back to the last recorded decision of the
            # full session (not the max_step_index-limited list).
            last_decision = session.decisions[-1]
            resolution = last_decision.get("answer") or last_decision.get("question", "No resolution recorded")
            lines.append(resolution)
        else:
            lines.append("No resolution recorded.")
        if outcome_label:
            lines.append(f"Outcome: {outcome_label}")
        lines.append("")

    # Next Steps
    _raw_next = getattr(session, 'next_steps', None)
    next_steps = _raw_next if isinstance(_raw_next, str) else ''
    if next_steps.strip() and options.include_next_steps:
        lines.append("--- NEXT STEPS ---")
        lines.append(next_steps.strip())
        lines.append("")

    # Time spent
    lines.append("--- TIME SPENT ---")
    duration = _format_duration(session.started_at, session.completed_at)
    lines.append(f"Duration: {duration}")
    lines.append("")

    # Engineer notes (scratchpad)
    lines.append("--- ENGINEER NOTES ---")
    scratchpad = getattr(session, 'scratchpad', '') or ''
    lines.append(scratchpad.strip() if scratchpad.strip() else "None")

    return "\n".join(lines)
def _is_procedural_session(session: Session) -> bool:
    """Tell whether the session's tree snapshot is marked ``tree_type == "procedural"``."""
    tree_type = session.tree_snapshot.get("tree_type")
    return tree_type == "procedural"
def _get_session_variables(session: Session) -> dict[str, str]:
    """Return the session's ``session_variables`` (intake form values).

    Falls back to an empty dict when the attribute is missing or is not a
    dict.
    """
    candidate = getattr(session, "session_variables", None)
    return candidate if isinstance(candidate, dict) else {}
def _generate_procedural_markdown(session: Session, options: SessionExport) -> str:
    """Render a procedural session as a Markdown checklist document.

    Sections, in order: optional header (name, ticket, client, timestamps,
    outcome), a "Project Parameters" table when session variables exist, the
    "Procedure Steps" checklist (a step is ticked when its answer is
    "completed"), and an optional "Notes" section from the outcome notes.

    NOTE(review): the file's indentation was missing when this block was
    rewritten, so block nesting is reconstructed from context — confirm
    against version control.
    """
    stamp_format = '%Y-%m-%d %H:%M'
    out: list[str] = []
    emit = out.append

    outcome = _get_outcome_label(session)
    proc_name = session.tree_snapshot.get("name", "Procedure")

    if options.include_tree_info:
        out += [f"# {proc_name}", ""]
        if session.ticket_number:
            emit(f"**Ticket:** {session.ticket_number}")
        if session.client_name:
            emit(f"**Client:** {session.client_name}")
        if options.include_timestamps:
            emit(f"**Started:** {session.started_at.strftime(stamp_format)}")
            if session.completed_at:
                emit(f"**Completed:** {session.completed_at.strftime(stamp_format)}")
                emit(f"**Duration:** {_format_duration(session.started_at, session.completed_at)}")
        if outcome:
            emit(f"**Outcome:** {outcome}")
        out += ["", "---", ""]

    # Project Parameters
    params = _get_session_variables(session)
    if params:
        out += ["## Project Parameters", "", "| Parameter | Value |", "|-----------|-------|"]
        for name, val in params.items():
            emit(f"| `{_escape_markdown_table(name)}` | {_escape_markdown_table(val)} |")
        out += ["", "---", ""]

    # Steps
    out += ["## Procedure Steps", ""]
    steps = session.decisions or []
    if options.max_step_index is not None:
        steps = steps[:options.max_step_index]

    for number, step in enumerate(steps, 1):
        heading = step.get("question") or step.get("action_performed", "Step")
        step_notes = step.get("notes", "")
        evidence = _get_command_output(step)
        box = "[x]" if step.get("answer") == "completed" else "[ ]"
        emit(f"- {box} **Step {number}: {heading}**")
        if step_notes:
            emit(f" - Notes: {step_notes}")
        if evidence:
            emit(f" - Verification: {evidence}")
        elapsed = _get_step_duration_seconds(step)
        if elapsed is not None:
            emit(f" - Duration: {_format_step_duration(elapsed)}")
    emit("")

    # Resolution
    closing = getattr(session, 'outcome_notes', None)
    if not isinstance(closing, str):
        closing = ''
    if closing.strip() and options.include_outcome_notes:
        out += ["---", "", "## Notes", "", closing.strip(), ""]

    return "\n".join(out)
def _generate_procedural_text(session: Session, options: SessionExport) -> str:
    """Render a procedural session as plain text.

    Sections, in order: optional underlined header (name, ticket, client,
    timestamps, outcome), PROJECT PARAMETERS when session variables exist,
    PROCEDURE STEPS with a "[DONE]" / "[ ]" marker per step, and an optional
    NOTES section from the outcome notes.

    NOTE(review): the file's indentation was missing when this block was
    rewritten, so block nesting is reconstructed from context — confirm
    against version control.
    """
    stamp_format = '%Y-%m-%d %H:%M'
    rule = "-" * 20
    buf: list[str] = []
    push = buf.append

    outcome = _get_outcome_label(session)
    proc_name = session.tree_snapshot.get("name", "Procedure")

    if options.include_tree_info:
        buf.extend((proc_name, "=" * len(proc_name)))
        if session.ticket_number:
            push(f"Ticket: {session.ticket_number}")
        if session.client_name:
            push(f"Client: {session.client_name}")
        if options.include_timestamps:
            push(f"Started: {session.started_at.strftime(stamp_format)}")
            if session.completed_at:
                push(f"Completed: {session.completed_at.strftime(stamp_format)}")
                push(f"Duration: {_format_duration(session.started_at, session.completed_at)}")
        if outcome:
            push(f"Outcome: {outcome}")
        push("")

    # Project Parameters
    params = _get_session_variables(session)
    if params:
        buf.extend(("PROJECT PARAMETERS", rule))
        buf.extend(f" {name}: {val}" for name, val in params.items())
        push("")

    buf.extend(("PROCEDURE STEPS", rule))
    steps = session.decisions or []
    if options.max_step_index is not None:
        steps = steps[:options.max_step_index]

    for number, step in enumerate(steps, 1):
        heading = step.get("question") or step.get("action_performed", "Step")
        step_notes = step.get("notes", "")
        evidence = _get_command_output(step)
        flag = "[DONE]" if step.get("answer") == "completed" else "[ ]"
        push(f"\n{flag} {number}. {heading}")
        if step_notes:
            push(f" Notes: {step_notes}")
        if evidence:
            push(f" Verification: {evidence}")
        elapsed = _get_step_duration_seconds(step)
        if elapsed is not None:
            push(f" Duration: {_format_step_duration(elapsed)}")
    push("")

    closing = getattr(session, 'outcome_notes', None)
    if not isinstance(closing, str):
        closing = ''
    if closing.strip() and options.include_outcome_notes:
        buf.extend(("NOTES", rule, closing.strip()))

    return "\n".join(buf)
def _generate_procedural_html(session: Session, options: SessionExport) -> str:
    """Generate HTML export for procedural sessions.

    Fragments are collected in ``html_parts`` in this order: page preamble,
    optional procedure-name heading, "Project Parameters" rows when session
    variables exist, one fragment group per decision ("Procedure Steps"), an
    optional "Notes" section from the outcome notes, then the closing
    fragments. The fragments are joined with newlines.

    Session-derived text (tree name, variable keys and values, step title,
    notes, verification output, outcome notes) is passed through
    ``html.escape`` before interpolation.

    NOTE(review): the string literals in this function look like they have
    lost their HTML markup (several are empty or span lines without a closing
    quote), and the file's indentation is missing. The code is kept verbatim
    and the nesting shown is reconstructed from context — restore the literals
    from version control before relying on this block.
    """
    tree_name = html.escape(session.tree_snapshot.get("name", "Procedure"))
    # NOTE(review): outcome_label is not referenced again in the visible body.
    outcome_label = _get_outcome_label(session)
    html_parts = ['', '', '',
    '',
    f'{tree_name}',
    '',
    '', '']
    if options.include_tree_info:
        html_parts.append(f'{tree_name}
')
        html_parts.append('')
    # Project Parameters
    variables = _get_session_variables(session)
    if variables:
        html_parts.append('')
        html_parts.append('
Project Parameters
')
        html_parts.append('
')
        # NOTE(review): html.escape requires str; confirm variable values are
        # always stored as strings.
        for key, value in variables.items():
            html_parts.append(f'| {html.escape(key)} | {html.escape(value)} |
')
        html_parts.append('
')
        html_parts.append('
')
    html_parts.append('Procedure Steps
')
    decisions = session.decisions or []
    # Optionally limit the export to the first max_step_index decisions.
    if options.max_step_index is not None:
        decisions = decisions[:options.max_step_index]
    for i, decision in enumerate(decisions, 1):
        title = html.escape(decision.get("question") or decision.get("action_performed", "Step"))
        notes = html.escape(decision.get("notes", ""))
        verification = _get_command_output(decision)
        # A step counts as done when its recorded answer is "completed".
        completed = decision.get("answer") == "completed"
        # NOTE(review): css_class is not interpolated in the visible literal
        # below — presumably it was part of the lost markup.
        css_class = "step done" if completed else "step"
        marker = "✅" if completed else "⬜"
        html_parts.append(f'')
        html_parts.append(f'
{marker} Step {i}: {title}
')
        if notes:
            html_parts.append(f'
Notes: {notes}
')
        if verification:
            html_parts.append(f'
Verification: {html.escape(verification)}
')
        duration_seconds = _get_step_duration_seconds(decision)
        if duration_seconds is not None:
            html_parts.append(f'
Duration: {_format_step_duration(duration_seconds)}
')
        html_parts.append('
')
    # Resolution
    _raw_notes = getattr(session, 'outcome_notes', None)
    outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
    if outcome_notes.strip() and options.include_outcome_notes:
        html_parts.append('Notes
')
        html_parts.append(f'{html.escape(outcome_notes.strip())}
')
    html_parts.extend(['', ''])
    return "\n".join(html_parts)
def _generate_procedural_psa(session: Session, options: SessionExport) -> str:
    """Render a procedural session as a PSA/ticket note.

    The note holds a fixed header (ticket, client, procedure, date, duration,
    outcome) followed by PROJECT PARAMETERS (when session variables exist),
    STEPS COMPLETED, RESOLUTION (completed sessions only) and TIME SPENT.

    NOTE(review): the file's indentation was missing when this block was
    rewritten, so block nesting is reconstructed from context — confirm
    against version control.
    """
    note: list[str] = []
    emit = note.append

    outcome = _get_outcome_label(session)
    proc_name = session.tree_snapshot.get("name", "Procedure")
    ticket = session.ticket_number or "N/A"
    client = session.client_name or "N/A"
    started = session.started_at.strftime("%Y-%m-%d %H:%M")

    note += [
        "=== PROCEDURE NOTES ===",
        f"Ticket: {ticket} | Client: {client}",
        f"Procedure: {proc_name} | Date: {started}",
        f"Duration: {_format_duration(session.started_at, session.completed_at)}",
    ]
    if outcome:
        emit(f"Outcome: {outcome}")
    emit("")

    # Project Parameters
    params = _get_session_variables(session)
    if params:
        emit("--- PROJECT PARAMETERS ---")
        note.extend(f" {name}: {val}" for name, val in params.items())
        emit("")

    # Steps
    emit("--- STEPS COMPLETED ---")
    steps = session.decisions or []
    if options.max_step_index is not None:
        steps = steps[:options.max_step_index]

    if not steps:
        emit("No steps recorded.")
    for number, step in enumerate(steps, 1):
        heading = step.get("question") or step.get("action_performed", "Step")
        step_notes = step.get("notes", "")
        flag = "[DONE]" if step.get("answer") == "completed" else "[ ]"
        elapsed = _get_step_duration_seconds(step)
        entry = f"{flag} {number}. {heading}"
        if elapsed is not None:
            entry = f"{entry} ({_format_step_duration(elapsed)})"
        emit(entry)
        if step_notes:
            emit(f" Notes: {step_notes}")
    emit("")

    # Resolution
    if session.completed_at:
        emit("--- RESOLUTION ---")
        closing = getattr(session, 'outcome_notes', None)
        if not isinstance(closing, str):
            closing = ''
        if closing.strip() and options.include_outcome_notes:
            emit(closing.strip())
        else:
            emit("Procedure completed successfully.")
        if outcome:
            emit(f"Outcome: {outcome}")
        emit("")

    # Time spent
    emit("--- TIME SPENT ---")
    emit(f"Duration: {_format_duration(session.started_at, session.completed_at)}")

    return "\n".join(note)