"""
Session export generators for ResolutionFlow.

Provides markdown, plain text, HTML, and PSA/ticket note export formatters
for troubleshooting sessions.
"""

import html
import math
import re
from datetime import datetime
from typing import Any

from app.models.session import Session
from app.schemas.session import SessionExport

OUTCOME_LABELS = {
    "resolved": "Resolved",
    "escalated": "Escalated",
    "workaround": "Workaround Applied",
    "unresolved": "Unresolved",
}

_DATETIME_FORMAT = "%Y-%m-%d %H:%M"
_DEFAULT_TREE_NAME = "Troubleshooting Session"
_DEFAULT_PROCEDURE_NAME = "Procedure"
_TEXT_RULE = "-" * 20

# Minimal inline stylesheet so the HTML export is readable as a standalone file.
_HTML_STYLE = (
    "<style>"
    "body { font-family: sans-serif; max-width: 860px; margin: 2em auto; }"
    " .meta { color: #555; margin-bottom: 1.5em; }"
    " .step { border-left: 3px solid #ccc; padding-left: 1em; margin: 1em 0; }"
    " .step.done { border-left-color: #2e7d32; }"
    " .custom-badge { background: #f0ad4e; color: #fff; padding: 0 0.4em;"
    " border-radius: 3px; font-size: 0.75em; margin-right: 0.4em; }"
    " .timestamp { color: #888; font-size: 0.85em; }"
    " .notes { white-space: pre-wrap; }"
    " pre { background: #f5f5f5; padding: 0.75em; overflow-x: auto; }"
    " table { border-collapse: collapse; }"
    " th, td { border: 1px solid #ccc; padding: 0.3em 0.6em; text-align: left; }"
    "</style>"
)


# ---------------------------------------------------------------------------
# Generic helpers
# ---------------------------------------------------------------------------


def _as_text(value: Any) -> str:
    """Coerce a JSONB-sourced value to text; ``None`` becomes an empty string."""
    if value is None:
        return ""
    return value if isinstance(value, str) else str(value)


def _get_text_attr(session: Session, name: str) -> str:
    """Return a string attribute of the session, or "" if missing or not a string."""
    value = getattr(session, name, None)
    return value if isinstance(value, str) else ""


def _tree_name(session: Session, default: str) -> str:
    """Return the tree name from the snapshot, falling back to ``default``."""
    return _as_text(session.tree_snapshot.get("name") or default)


def _format_duration(started_at: datetime, completed_at: datetime | None) -> str:
    """Format duration between two datetimes as human-readable string.

    Returns "In progress" when ``completed_at`` is unset and "0 minutes" when
    the completion time precedes the start time.
    """
    if not completed_at:
        return "In progress"

    total_seconds = int((completed_at - started_at).total_seconds())
    if total_seconds < 0:
        return "0 minutes"

    hours, remainder = divmod(total_seconds, 3600)
    minutes = remainder // 60
    if hours > 0:
        return f"{hours}h {minutes}m"
    return f"{minutes} minutes"


def _format_step_duration(duration_seconds: int) -> str:
    """Format step duration seconds as compact human-readable text.

    Examples: ``45`` -> "45s", ``90`` -> "1m 30s", ``3600`` -> "1h 0m".
    Negative input is clamped to "0s".
    """
    if duration_seconds < 0:
        return "0s"
    if duration_seconds < 60:
        return f"{duration_seconds}s"

    hours, remainder = divmod(duration_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)

    parts = []
    if hours > 0:
        parts.append(f"{hours}h")
    parts.append(f"{minutes}m")
    if seconds > 0:
        parts.append(f"{seconds}s")
    return " ".join(parts)


def _parse_iso_datetime(value: Any) -> datetime | None:
    """Parse ISO datetime strings from JSONB with support for trailing Z.

    Returns ``None`` for anything that is not a datetime or a parseable string.
    """
    if isinstance(value, datetime):
        return value
    if not isinstance(value, str) or not value:
        return None

    text = value.strip()
    # Only a *trailing* Z is a UTC designator; leave any other "Z" untouched.
    if text.endswith(("Z", "z")):
        text = f"{text[:-1]}+00:00"
    try:
        return datetime.fromisoformat(text)
    except ValueError:
        return None


def _get_step_duration_seconds(decision: dict[str, Any]) -> int | None:
    """Get step duration from explicit field or entered/exited timestamps.

    The explicit ``duration_seconds`` field wins when it is a finite,
    non-negative number; otherwise the duration is derived from
    ``entered_at``/``exited_at``. Returns ``None`` when neither is usable.
    """
    explicit = decision.get("duration_seconds")
    if isinstance(explicit, bool):
        # bool is an int subclass; True/False is never a real duration.
        explicit = None
    if isinstance(explicit, float) and not math.isfinite(explicit):
        # int(nan) / int(inf) would raise.
        explicit = None
    if isinstance(explicit, (int, float)):
        duration = int(explicit)
        if duration >= 0:
            return duration

    entered_at = _parse_iso_datetime(decision.get("entered_at"))
    exited_at = _parse_iso_datetime(decision.get("exited_at"))
    if entered_at and exited_at:
        try:
            total_seconds = int((exited_at - entered_at).total_seconds())
        except TypeError:
            # Mixing naive and timezone-aware timestamps cannot be subtracted.
            return None
        if total_seconds >= 0:
            return total_seconds
    return None


def _get_command_output(decision: dict[str, Any]) -> str | None:
    """Extract and normalize command_output from a decision dict."""
    output = decision.get("command_output")
    if not isinstance(output, str):
        return None
    output = output.strip()
    return output or None


def _truncate_command_output(output: str, max_lines: int = 5, fmt: str = "text") -> str:
    """Truncate command output to max_lines for standard detail level.

    Args:
        output: Raw command output.
        max_lines: Maximum number of lines to keep.
        fmt: One of "markdown", "text", "html", "psa" — controls suffix formatting.

    Returns:
        ``output`` unchanged when it fits, otherwise the first ``max_lines``
        lines followed by a note stating the original line count.
    """
    lines = output.splitlines()
    if len(lines) <= max_lines:
        return output

    note = f"(full output omitted — {len(lines)} lines)"
    if fmt == "markdown":
        note = f"*{note}*"
    truncated = "\n".join(lines[:max_lines])
    return f"{truncated}\n{note}"


def _find_node_commands(tree_snapshot: dict[str, Any], node_id: str) -> list[str]:
    """Find the commands list for a node in the tree snapshot.

    Performs a depth-first search starting at ``tree_structure`` (or the
    snapshot itself when that key is absent). Returns an empty list when the
    node is not found, has no commands, or ``node_id`` is empty.
    """
    if not node_id or not isinstance(tree_snapshot, dict):
        return []

    def _search(node: Any) -> list[str] | None:
        if not isinstance(node, dict):
            return None
        if node.get("id") == node_id:
            commands = node.get("commands")
            if not isinstance(commands, list):
                return []
            return [_as_text(command) for command in commands]
        children = node.get("children")
        # "children" may be missing or an explicit JSON null.
        for child in children if isinstance(children, list) else []:
            found = _search(child)
            if found is not None:
                return found
        return None

    structure = tree_snapshot.get("tree_structure") or tree_snapshot
    roots = structure if isinstance(structure, list) else [structure]
    for root in roots:
        found = _search(root)
        if found is not None:
            return found
    return []


def _outcome_key(session: Session) -> Any:
    """Return the stored outcome, unwrapping enum members to their value."""
    outcome = getattr(session, "outcome", None)
    return getattr(outcome, "value", outcome)


def _get_outcome_label(session: Session) -> str | None:
    """Map stored outcome enum to human-friendly label."""
    outcome = _outcome_key(session)
    if not outcome:
        return None
    fallback = str(outcome).replace("_", " ").title()
    return OUTCOME_LABELS.get(outcome, fallback)


def _build_summary_fields(session: Session) -> dict[str, str]:
    """Build auto-populated summary fields from session data.

    Empty fields are left blank — users fill them in via the editable preview.
    """
    tree_name = _tree_name(session, _DEFAULT_TREE_NAME)
    tree_desc = _as_text(session.tree_snapshot.get("description"))
    issue = f"{tree_name}: {tree_desc}" if tree_desc else tree_name

    if session.completed_at:
        if _outcome_key(session) == "resolved":
            status = "Resolved"
        else:
            status = f"Completed — {_get_outcome_label(session) or 'Unknown'}"
    else:
        step_count = len(session.decisions) if session.decisions else 0
        if step_count:
            status = f"In Progress — paused at step {step_count}"
        else:
            status = "In Progress"

    return {
        "issue": issue,
        "impact": "",
        "status": status,
        "resolution": _get_text_attr(session, "outcome_notes").strip(),
        "next_steps": _get_text_attr(session, "next_steps").strip(),
    }


def _escape_markdown_table(value: str) -> str:
    """Escape value for use in a markdown table cell.

    Pipes are backslash-escaped and line breaks are flattened to spaces so the
    value cannot terminate the cell or the row. Non-string values are
    converted to text first.
    """
    text = _as_text(value).replace("|", "\\|")
    return text.replace("\r\n", " ").replace("\r", " ").replace("\n", " ")


def _markdown_fence(text: str) -> str:
    """Return a backtick fence longer than any backtick run inside ``text``."""
    longest_run = max((len(run) for run in re.findall(r"`+", text)), default=0)
    return "`" * max(3, longest_run + 1)


def _is_procedural_session(session: Session) -> bool:
    """Check if session is for a procedural flow."""
    return session.tree_snapshot.get("tree_type") == "procedural"


def _get_session_variables(session: Session) -> dict[str, str]:
    """Get session variables (intake form values) from session."""
    variables = getattr(session, "session_variables", None)
    if isinstance(variables, dict):
        return variables
    return {}


# ---------------------------------------------------------------------------
# Per-step helpers
# ---------------------------------------------------------------------------


def _select_decisions(session: Session, options: SessionExport) -> list[dict[str, Any]]:
    """Return the decisions to export, honouring ``options.max_step_index``."""
    decisions = session.decisions or []
    if options.max_step_index is not None:
        decisions = decisions[: options.max_step_index]
    return decisions


def _step_title(decision: dict[str, Any]) -> str:
    """Return the question/action text of a step, or "Step" when absent."""
    title = decision.get("question") or decision.get("action_performed") or "Step"
    return _as_text(title)


def _is_custom_step(decision: dict[str, Any]) -> bool:
    """Return True when the step was added ad hoc by the engineer."""
    return _as_text(decision.get("node_id")).startswith("custom-")


def _step_commands(session: Session, decision: dict[str, Any]) -> list[str]:
    """Return the commands attached to the tree node behind ``decision``."""
    return _find_node_commands(session.tree_snapshot, _as_text(decision.get("node_id")))


def _step_command_output(
    decision: dict[str, Any], options: SessionExport, fmt: str
) -> str | None:
    """Return the step's command output, truncated at "standard" detail level."""
    output = _get_command_output(decision)
    if output and options.detail_level == "standard":
        output = _truncate_command_output(output, fmt=fmt)
    return output


# ---------------------------------------------------------------------------
# Shared header builders
# ---------------------------------------------------------------------------


def _markdown_header(session: Session, options: SessionExport, tree_name: str) -> list[str]:
    """Build the markdown title and metadata block."""
    lines = [f"# {tree_name}", ""]
    if session.ticket_number:
        lines.append(f"**Ticket:** {session.ticket_number}")
    if session.client_name:
        lines.append(f"**Client:** {session.client_name}")
    if options.include_timestamps:
        lines.append(f"**Started:** {session.started_at.strftime(_DATETIME_FORMAT)}")
        if session.completed_at:
            lines.append(f"**Completed:** {session.completed_at.strftime(_DATETIME_FORMAT)}")
            duration = _format_duration(session.started_at, session.completed_at)
            lines.append(f"**Duration:** {duration}")
    if outcome_label := _get_outcome_label(session):
        lines.append(f"**Outcome:** {outcome_label}")
    lines.extend(["", "---", ""])
    return lines


def _text_header(session: Session, options: SessionExport, tree_name: str) -> list[str]:
    """Build the plain-text title and metadata block."""
    lines = [tree_name, "=" * len(tree_name)]
    if session.ticket_number:
        lines.append(f"Ticket: {session.ticket_number}")
    if session.client_name:
        lines.append(f"Client: {session.client_name}")
    if options.include_timestamps:
        lines.append(f"Started: {session.started_at.strftime(_DATETIME_FORMAT)}")
        if session.completed_at:
            lines.append(f"Completed: {session.completed_at.strftime(_DATETIME_FORMAT)}")
            duration = _format_duration(session.started_at, session.completed_at)
            lines.append(f"Duration: {duration}")
    if outcome_label := _get_outcome_label(session):
        lines.append(f"Outcome: {outcome_label}")
    lines.append("")
    return lines


def _html_document_start(escaped_title: str) -> list[str]:
    """Build the HTML prologue up to and including ``<body>``."""
    return [
        "<!DOCTYPE html>",
        "<html>",
        "<head>",
        '<meta charset="utf-8">',
        f"<title>{escaped_title}</title>",
        _HTML_STYLE,
        "</head>",
        "<body>",
    ]


def _html_header(session: Session, options: SessionExport, escaped_name: str) -> list[str]:
    """Build the HTML title and metadata block; all dynamic text is escaped."""
    parts = [f"<h1>{escaped_name}</h1>", '<div class="meta">']
    if session.ticket_number:
        ticket = html.escape(_as_text(session.ticket_number))
        parts.append(f"<p><strong>Ticket:</strong> {ticket}</p>")
    if session.client_name:
        client = html.escape(_as_text(session.client_name))
        parts.append(f"<p><strong>Client:</strong> {client}</p>")
    if options.include_timestamps:
        started = session.started_at.strftime(_DATETIME_FORMAT)
        parts.append(f"<p><strong>Started:</strong> {started}</p>")
        if session.completed_at:
            completed = session.completed_at.strftime(_DATETIME_FORMAT)
            duration = _format_duration(session.started_at, session.completed_at)
            parts.append(f"<p><strong>Completed:</strong> {completed}</p>")
            parts.append(f"<p><strong>Duration:</strong> {duration}</p>")
    if outcome_label := _get_outcome_label(session):
        parts.append(f"<p><strong>Outcome:</strong> {html.escape(outcome_label)}</p>")
    parts.append("</div>")
    return parts


# ---------------------------------------------------------------------------
# Troubleshooting session exports
# ---------------------------------------------------------------------------


def generate_markdown_export(session: Session, options: SessionExport) -> str:
    """Generate markdown export.

    Procedural sessions are delegated to the procedural markdown generator.
    """
    if _is_procedural_session(session):
        return _generate_procedural_markdown(session, options)

    lines: list[str] = []

    if options.include_tree_info:
        lines.extend(_markdown_header(session, options, _tree_name(session, _DEFAULT_TREE_NAME)))

    if options.include_summary:
        summary = _build_summary_fields(session)
        esc = _escape_markdown_table
        lines.extend(
            [
                "## Summary",
                "",
                "| Field | Details |",
                "|-------|---------|",
                f"| Issue | {esc(summary['issue'])} |",
                f"| Impact | {esc(summary['impact'])} |",
                f"| Status | {esc(summary['status'])} |",
                f"| Resolution | {esc(summary['resolution'])} |",
                f"| Next Steps | {esc(summary['next_steps'])} |",
                "",
                "---",
                "",
            ]
        )

    # Scratchpad / Evidence section
    scratchpad = _get_text_attr(session, "scratchpad")
    if scratchpad.strip():
        lines.extend(["## Evidence / Reference", "", scratchpad, "", "---", ""])

    lines.extend(["## Troubleshooting Steps", ""])

    for i, decision in enumerate(_select_decisions(session, options), 1):
        answer = _as_text(decision.get("answer"))
        notes = _as_text(decision.get("notes"))
        is_custom = _is_custom_step(decision)
        prefix = "[CUSTOM] " if is_custom else ""

        lines.append(f"### Step {i}: {prefix}{_step_title(decision)}")
        if is_custom:
            lines.append("*Custom step added by engineer*")
        if answer:
            lines.append(f"**Answer:** {answer}")
        if notes:
            lines.append(f"**Notes:** {notes}")

        if command_output := _step_command_output(decision, options, "markdown"):
            commands = _step_commands(session, decision)
            if commands:
                rendered = ", ".join(f"`{command}`" for command in commands)
                lines.append(f"**Commands Run:** {rendered}")
            # Size the fence to the content so embedded ``` cannot close it early.
            fence = _markdown_fence(command_output)
            lines.extend(["**Output:**", fence, command_output, fence])

        duration_seconds = _get_step_duration_seconds(decision)
        if duration_seconds is not None:
            lines.append(f"**Duration:** {_format_step_duration(duration_seconds)}")
        if options.include_timestamps and decision.get("timestamp"):
            lines.append(f"*{decision['timestamp']}*")
        lines.append("")

    # Resolution / Outcome Notes
    outcome_notes = _get_text_attr(session, "outcome_notes").strip()
    if outcome_notes and options.include_outcome_notes:
        lines.extend(["---", "", "## Resolution", "", outcome_notes, ""])

    # Next Steps
    next_steps = _get_text_attr(session, "next_steps").strip()
    if next_steps and options.include_next_steps:
        lines.extend(["---", "", "## Next Steps", "", next_steps, ""])

    return "\n".join(lines)


def generate_text_export(session: Session, options: SessionExport) -> str:
    """Generate plain text export.

    Procedural sessions are delegated to the procedural text generator.
    """
    if _is_procedural_session(session):
        return _generate_procedural_text(session, options)

    lines: list[str] = []

    if options.include_tree_info:
        lines.extend(_text_header(session, options, _tree_name(session, _DEFAULT_TREE_NAME)))

    if options.include_summary:
        summary = _build_summary_fields(session)
        lines.extend(
            [
                "SUMMARY",
                _TEXT_RULE,
                f"Issue: {summary['issue']}",
                f"Impact: {summary['impact']}",
                f"Status: {summary['status']}",
                f"Resolution: {summary['resolution']}",
                f"Next Steps: {summary['next_steps']}",
                "",
            ]
        )

    # Scratchpad / Evidence section
    scratchpad = _get_text_attr(session, "scratchpad")
    if scratchpad.strip():
        lines.extend(["EVIDENCE / REFERENCE", _TEXT_RULE, scratchpad, ""])

    lines.extend(["TROUBLESHOOTING STEPS", _TEXT_RULE])

    for i, decision in enumerate(_select_decisions(session, options), 1):
        answer = _as_text(decision.get("answer"))
        notes = _as_text(decision.get("notes"))
        prefix = "[CUSTOM] " if _is_custom_step(decision) else ""

        lines.append(f"\n{i}. {prefix}{_step_title(decision)}")
        if answer:
            lines.append(f"   Answer: {answer}")
        if notes:
            lines.append(f"   Notes: {notes}")

        if command_output := _step_command_output(decision, options, "text"):
            commands = _step_commands(session, decision)
            if commands:
                lines.append(f"   Commands Run: {', '.join(commands)}")
            lines.append("   Output:")
            lines.extend(f"     {output_line}" for output_line in command_output.splitlines())

        duration_seconds = _get_step_duration_seconds(decision)
        if duration_seconds is not None:
            lines.append(f"   Duration: {_format_step_duration(duration_seconds)}")

    # Resolution
    outcome_notes = _get_text_attr(session, "outcome_notes").strip()
    if outcome_notes and options.include_outcome_notes:
        lines.extend(["", "RESOLUTION", _TEXT_RULE, outcome_notes])

    # Next Steps
    next_steps = _get_text_attr(session, "next_steps").strip()
    if next_steps and options.include_next_steps:
        lines.extend(["", "NEXT STEPS", _TEXT_RULE, next_steps])

    return "\n".join(lines)


def generate_html_export(session: Session, options: SessionExport) -> str:
    """Generate HTML export.

    Every piece of session-supplied text is passed through ``html.escape``
    before being embedded. Procedural sessions are delegated to the
    procedural HTML generator.
    """
    if _is_procedural_session(session):
        return _generate_procedural_html(session, options)

    tree_name = html.escape(_tree_name(session, _DEFAULT_TREE_NAME))
    html_parts = _html_document_start(tree_name)

    if options.include_tree_info:
        html_parts.extend(_html_header(session, options, tree_name))

    if options.include_summary:
        summary = _build_summary_fields(session)
        rows = [
            ("Issue", summary["issue"]),
            ("Impact", summary["impact"]),
            ("Status", summary["status"]),
            ("Resolution", summary["resolution"]),
            ("Next Steps", summary["next_steps"]),
        ]
        html_parts.append("<h2>Summary</h2>")
        html_parts.append('<table class="summary">')
        for label, value in rows:
            html_parts.append(f"<tr><th>{html.escape(label)}</th>")
            html_parts.append(f"<td>{html.escape(value)}</td></tr>")
        html_parts.append("</table>")

    # Scratchpad / Evidence section
    scratchpad = _get_text_attr(session, "scratchpad")
    if scratchpad.strip():
        html_parts.append("<h2>Evidence / Reference</h2>")
        html_parts.append(f"<pre>{html.escape(scratchpad)}</pre>")

    html_parts.append("<h2>Troubleshooting Steps</h2>")

    for i, decision in enumerate(_select_decisions(session, options), 1):
        question = html.escape(_step_title(decision))
        answer = html.escape(_as_text(decision.get("answer")))
        notes = html.escape(_as_text(decision.get("notes")))
        custom_badge = (
            '<span class="custom-badge">CUSTOM</span>' if _is_custom_step(decision) else ""
        )

        html_parts.append('<div class="step">')
        html_parts.append(f"<h3>{custom_badge}Step {i}: {question}</h3>")
        if answer:
            html_parts.append(f"<p><strong>Answer:</strong> {answer}</p>")
        if notes:
            html_parts.append(f"<p><strong>Notes:</strong> {notes}</p>")

        if command_output := _step_command_output(decision, options, "html"):
            commands = _step_commands(session, decision)
            if commands:
                cmd_html = ", ".join(f"<code>{html.escape(c)}</code>" for c in commands)
                html_parts.append(f"<p><strong>Commands Run:</strong> {cmd_html}</p>")
            html_parts.append(f"<pre>{html.escape(command_output)}</pre>")

        duration_seconds = _get_step_duration_seconds(decision)
        if duration_seconds is not None:
            duration = _format_step_duration(duration_seconds)
            html_parts.append(f"<p><strong>Duration:</strong> {duration}</p>")
        if options.include_timestamps and decision.get("timestamp"):
            timestamp = html.escape(str(decision["timestamp"]))
            html_parts.append(f'<p class="timestamp">{timestamp}</p>')
        html_parts.append("</div>")

    # Resolution
    outcome_notes = _get_text_attr(session, "outcome_notes").strip()
    if outcome_notes and options.include_outcome_notes:
        html_parts.append("<h2>Resolution</h2>")
        html_parts.append(f'<div class="notes">{html.escape(outcome_notes)}</div>')

    # Next Steps
    next_steps = _get_text_attr(session, "next_steps").strip()
    if next_steps and options.include_next_steps:
        html_parts.append("<h2>Next Steps</h2>")
        html_parts.append(f'<div class="notes">{html.escape(next_steps)}</div>')

    html_parts.extend(["</body>", "</html>"])
    return "\n".join(html_parts)


def generate_psa_export(session: Session, options: SessionExport) -> str:
    """Generate PSA/ticket note export optimized for ConnectWise and similar PSA tools.

    Procedural sessions are delegated to the procedural PSA generator.
    """
    if _is_procedural_session(session):
        return _generate_procedural_psa(session, options)

    outcome_label = _get_outcome_label(session)
    tree_name = _tree_name(session, _DEFAULT_TREE_NAME)
    tree_description = _as_text(session.tree_snapshot.get("description"))
    ticket_number = session.ticket_number or "N/A"
    client_name = session.client_name or "N/A"
    date_str = session.started_at.strftime(_DATETIME_FORMAT)
    duration = _format_duration(session.started_at, session.completed_at)

    lines = [
        "=== TROUBLESHOOTING NOTES ===",
        f"Ticket: {ticket_number} | Client: {client_name}",
        f"Tree: {tree_name} | Date: {date_str}",
        f"Duration: {duration}",
    ]
    if outcome_label:
        lines.append(f"Outcome: {outcome_label}")
    lines.append("")

    if options.include_summary:
        summary = _build_summary_fields(session)
        lines.extend(
            [
                "--- SUMMARY ---",
                f"Issue: {summary['issue']}",
                f"Impact: {summary['impact']}",
                f"Status: {summary['status']}",
                f"Resolution: {summary['resolution']}",
                f"Next Steps: {summary['next_steps']}",
                "",
            ]
        )

    # Problem section
    lines.append("--- PROBLEM ---")
    lines.append(tree_description if tree_description else "No description provided.")
    lines.append("")

    # Steps taken
    lines.append("--- STEPS TAKEN ---")
    decisions = _select_decisions(session, options)
    if decisions:
        for i, decision in enumerate(decisions, 1):
            answer = _as_text(decision.get("answer"))
            notes = _as_text(decision.get("notes"))
            prefix = "[CUSTOM] " if _is_custom_step(decision) else ""

            line = f"{i}. {prefix}{_step_title(decision)}"
            if answer:
                line += f" -> {answer}"
            duration_seconds = _get_step_duration_seconds(decision)
            if duration_seconds is not None:
                line += f" ({_format_step_duration(duration_seconds)})"
            lines.append(line)

            if notes:
                lines.append(f"   Notes: {notes}")
            if command_output := _step_command_output(decision, options, "psa"):
                commands = _step_commands(session, decision)
                if commands:
                    lines.append(f"   Commands: {', '.join(commands)}")
                lines.append("   Output:")
                lines.extend(f"     {output_line}" for output_line in command_output.splitlines())
    else:
        lines.append("No steps recorded.")
    lines.append("")

    # Resolution — only for completed sessions
    if session.completed_at:
        lines.append("--- RESOLUTION ---")
        outcome_notes = _get_text_attr(session, "outcome_notes").strip()
        if outcome_notes and options.include_outcome_notes:
            lines.append(outcome_notes)
        elif decisions:
            # Fall back to the last *exported* step so the note stays
            # consistent with max_step_index.
            last_decision = decisions[-1]
            resolution = (
                last_decision.get("answer")
                or last_decision.get("question")
                or "No resolution recorded"
            )
            lines.append(_as_text(resolution))
        else:
            lines.append("No resolution recorded.")
        if outcome_label:
            lines.append(f"Outcome: {outcome_label}")
        lines.append("")

    # Next Steps
    next_steps = _get_text_attr(session, "next_steps").strip()
    if next_steps and options.include_next_steps:
        lines.extend(["--- NEXT STEPS ---", next_steps, ""])

    # Time spent
    lines.extend(["--- TIME SPENT ---", f"Duration: {duration}", ""])

    # Engineer notes (scratchpad)
    lines.append("--- ENGINEER NOTES ---")
    scratchpad = _get_text_attr(session, "scratchpad").strip()
    lines.append(scratchpad if scratchpad else "None")

    return "\n".join(lines)


# ---------------------------------------------------------------------------
# Procedural session exports
# ---------------------------------------------------------------------------


def _generate_procedural_markdown(session: Session, options: SessionExport) -> str:
    """Generate markdown export for procedural sessions."""
    lines: list[str] = []

    if options.include_tree_info:
        tree_name = _tree_name(session, _DEFAULT_PROCEDURE_NAME)
        lines.extend(_markdown_header(session, options, tree_name))

    # Project Parameters
    variables = _get_session_variables(session)
    if variables:
        lines.extend(
            [
                "## Project Parameters",
                "",
                "| Parameter | Value |",
                "|-----------|-------|",
            ]
        )
        for key, value in variables.items():
            lines.append(f"| `{_escape_markdown_table(key)}` | {_escape_markdown_table(value)} |")
        lines.extend(["", "---", ""])

    # Steps
    lines.extend(["## Procedure Steps", ""])

    for i, decision in enumerate(_select_decisions(session, options), 1):
        notes = _as_text(decision.get("notes"))
        verification = _get_command_output(decision)
        checkbox = "[x]" if decision.get("answer") == "completed" else "[ ]"

        lines.append(f"- {checkbox} **Step {i}: {_step_title(decision)}**")
        if notes:
            lines.append(f"  - Notes: {notes}")
        if verification:
            lines.append(f"  - Verification: {verification}")
        duration_seconds = _get_step_duration_seconds(decision)
        if duration_seconds is not None:
            lines.append(f"  - Duration: {_format_step_duration(duration_seconds)}")

    lines.append("")

    # Resolution
    outcome_notes = _get_text_attr(session, "outcome_notes").strip()
    if outcome_notes and options.include_outcome_notes:
        lines.extend(["---", "", "## Notes", "", outcome_notes, ""])

    return "\n".join(lines)


def _generate_procedural_text(session: Session, options: SessionExport) -> str:
    """Generate plain text export for procedural sessions."""
    lines: list[str] = []

    if options.include_tree_info:
        tree_name = _tree_name(session, _DEFAULT_PROCEDURE_NAME)
        lines.extend(_text_header(session, options, tree_name))

    # Project Parameters
    variables = _get_session_variables(session)
    if variables:
        lines.extend(["PROJECT PARAMETERS", _TEXT_RULE])
        lines.extend(f"  {key}: {value}" for key, value in variables.items())
        lines.append("")

    lines.extend(["PROCEDURE STEPS", _TEXT_RULE])

    for i, decision in enumerate(_select_decisions(session, options), 1):
        notes = _as_text(decision.get("notes"))
        verification = _get_command_output(decision)
        marker = "[DONE]" if decision.get("answer") == "completed" else "[ ]"

        lines.append(f"\n{marker} {i}. {_step_title(decision)}")
        if notes:
            lines.append(f"   Notes: {notes}")
        if verification:
            lines.append(f"   Verification: {verification}")
        duration_seconds = _get_step_duration_seconds(decision)
        if duration_seconds is not None:
            lines.append(f"   Duration: {_format_step_duration(duration_seconds)}")

    lines.append("")

    outcome_notes = _get_text_attr(session, "outcome_notes").strip()
    if outcome_notes and options.include_outcome_notes:
        lines.extend(["NOTES", _TEXT_RULE, outcome_notes])

    return "\n".join(lines)


def _generate_procedural_html(session: Session, options: SessionExport) -> str:
    """Generate HTML export for procedural sessions.

    Every piece of session-supplied text is passed through ``html.escape``
    before being embedded.
    """
    tree_name = html.escape(_tree_name(session, _DEFAULT_PROCEDURE_NAME))
    html_parts = _html_document_start(tree_name)

    if options.include_tree_info:
        html_parts.extend(_html_header(session, options, tree_name))

    # Project Parameters
    variables = _get_session_variables(session)
    if variables:
        html_parts.append('<div class="parameters">')
        html_parts.append("<h2>Project Parameters</h2>")
        html_parts.append("<table>")
        for key, value in variables.items():
            # Values come from an intake form and may not be strings.
            key_html = html.escape(_as_text(key))
            value_html = html.escape(_as_text(value))
            html_parts.append(f"<tr><td><code>{key_html}</code></td><td>{value_html}</td></tr>")
        html_parts.append("</table>")
        html_parts.append("</div>")

    html_parts.append("<h2>Procedure Steps</h2>")

    for i, decision in enumerate(_select_decisions(session, options), 1):
        title = html.escape(_step_title(decision))
        notes = html.escape(_as_text(decision.get("notes")))
        verification = _get_command_output(decision)
        completed = decision.get("answer") == "completed"
        css_class = "step done" if completed else "step"
        marker = "✅" if completed else "⬜"

        html_parts.append(f'<div class="{css_class}">')
        html_parts.append(f"<h3>{marker} Step {i}: {title}</h3>")
        if notes:
            html_parts.append(f"<p><strong>Notes:</strong> {notes}</p>")
        if verification:
            html_parts.append(
                f"<p><strong>Verification:</strong> {html.escape(verification)}</p>"
            )
        duration_seconds = _get_step_duration_seconds(decision)
        if duration_seconds is not None:
            duration = _format_step_duration(duration_seconds)
            html_parts.append(f"<p><strong>Duration:</strong> {duration}</p>")
        html_parts.append("</div>")

    # Resolution
    outcome_notes = _get_text_attr(session, "outcome_notes").strip()
    if outcome_notes and options.include_outcome_notes:
        html_parts.append("<h2>Notes</h2>")
        html_parts.append(f'<div class="notes">{html.escape(outcome_notes)}</div>')

    html_parts.extend(["</body>", "</html>"])
    return "\n".join(html_parts)


def _generate_procedural_psa(session: Session, options: SessionExport) -> str:
    """Generate PSA/ticket export for procedural sessions."""
    outcome_label = _get_outcome_label(session)
    tree_name = _tree_name(session, _DEFAULT_PROCEDURE_NAME)
    ticket_number = session.ticket_number or "N/A"
    client_name = session.client_name or "N/A"
    date_str = session.started_at.strftime(_DATETIME_FORMAT)
    duration = _format_duration(session.started_at, session.completed_at)

    lines = [
        "=== PROCEDURE NOTES ===",
        f"Ticket: {ticket_number} | Client: {client_name}",
        f"Procedure: {tree_name} | Date: {date_str}",
        f"Duration: {duration}",
    ]
    if outcome_label:
        lines.append(f"Outcome: {outcome_label}")
    lines.append("")

    # Project Parameters
    variables = _get_session_variables(session)
    if variables:
        lines.append("--- PROJECT PARAMETERS ---")
        lines.extend(f"  {key}: {value}" for key, value in variables.items())
        lines.append("")

    # Steps
    lines.append("--- STEPS COMPLETED ---")
    decisions = _select_decisions(session, options)
    if decisions:
        for i, decision in enumerate(decisions, 1):
            notes = _as_text(decision.get("notes"))
            marker = "[DONE]" if decision.get("answer") == "completed" else "[ ]"

            line = f"{marker} {i}. {_step_title(decision)}"
            duration_seconds = _get_step_duration_seconds(decision)
            if duration_seconds is not None:
                line += f" ({_format_step_duration(duration_seconds)})"
            lines.append(line)
            if notes:
                lines.append(f"   Notes: {notes}")
    else:
        lines.append("No steps recorded.")
    lines.append("")

    # Resolution
    if session.completed_at:
        lines.append("--- RESOLUTION ---")
        outcome_notes = _get_text_attr(session, "outcome_notes").strip()
        if outcome_notes and options.include_outcome_notes:
            lines.append(outcome_notes)
        else:
            lines.append("Procedure completed successfully.")
        if outcome_label:
            lines.append(f"Outcome: {outcome_label}")
        lines.append("")

    # Time spent
    lines.append("--- TIME SPENT ---")
    lines.append(f"Duration: {duration}")

    return "\n".join(lines)