""" Session export generators for ResolutionFlow. Provides markdown, plain text, HTML, and PSA/ticket note export formatters for troubleshooting sessions. """ import html from datetime import datetime from typing import Any from app.models.session import Session from app.schemas.session import SessionExport OUTCOME_LABELS = { "resolved": "Resolved", "escalated": "Escalated", "workaround": "Workaround Applied", "unresolved": "Unresolved", } def _format_duration(started_at: datetime, completed_at: datetime | None) -> str: """Format duration between two datetimes as human-readable string.""" if not completed_at: return "In progress" delta = completed_at - started_at total_seconds = int(delta.total_seconds()) if total_seconds < 0: return "0 minutes" hours, remainder = divmod(total_seconds, 3600) minutes = remainder // 60 if hours > 0: return f"{hours}h {minutes}m" return f"{minutes} minutes" def _format_step_duration(duration_seconds: int) -> str: """Format step duration seconds as compact human-readable text.""" if duration_seconds < 0: return "0s" if duration_seconds < 60: return f"{duration_seconds}s" hours, remainder = divmod(duration_seconds, 3600) minutes, seconds = divmod(remainder, 60) if hours > 0: if seconds > 0: return f"{hours}h {minutes}m {seconds}s" return f"{hours}h {minutes}m" if seconds > 0: return f"{minutes}m {seconds}s" return f"{minutes}m" def _parse_iso_datetime(value: Any) -> datetime | None: """Parse ISO datetime strings from JSONB with support for trailing Z.""" if isinstance(value, datetime): return value if not isinstance(value, str) or not value: return None try: return datetime.fromisoformat(value.replace("Z", "+00:00")) except ValueError: return None def _get_step_duration_seconds(decision: dict[str, Any]) -> int | None: """Get step duration from explicit field or entered/exited timestamps.""" explicit_duration = decision.get("duration_seconds") if isinstance(explicit_duration, (int, float)): duration = int(explicit_duration) if duration >= 0: return duration entered_at = _parse_iso_datetime(decision.get("entered_at")) exited_at = _parse_iso_datetime(decision.get("exited_at")) if entered_at and exited_at: total_seconds = int((exited_at - entered_at).total_seconds()) if total_seconds >= 0: return total_seconds return None def _get_command_output(decision: dict[str, Any]) -> str | None: """Extract and normalize command_output from a decision dict.""" output = decision.get("command_output") if not isinstance(output, str): return None output = output.strip() return output if output else None def _find_node_commands(tree_snapshot: dict[str, Any], node_id: str) -> list[str]: """Find the commands list for a node in the tree snapshot.""" def _search(node: dict[str, Any]) -> list[str] | None: if node.get("id") == node_id: return node.get("commands") or [] for child in node.get("children", []): result = _search(child) if result is not None: return result return None structure = tree_snapshot.get("tree_structure") or tree_snapshot return _search(structure) or [] def _get_outcome_label(session: Session) -> str | None: """Map stored outcome enum to human-friendly label.""" outcome = getattr(session, "outcome", None) if not outcome: return None return OUTCOME_LABELS.get(outcome, str(outcome).replace("_", " ").title()) def generate_markdown_export(session: Session, options: SessionExport) -> str: """Generate markdown export.""" lines = [] outcome_label = _get_outcome_label(session) if options.include_tree_info: tree_name = session.tree_snapshot.get("name", "Troubleshooting Session") lines.append(f"# {tree_name}") lines.append("") if session.ticket_number: lines.append(f"**Ticket:** {session.ticket_number}") if session.client_name: lines.append(f"**Client:** {session.client_name}") if options.include_timestamps: lines.append(f"**Started:** {session.started_at.strftime('%Y-%m-%d %H:%M')}") if session.completed_at: lines.append(f"**Completed:** {session.completed_at.strftime('%Y-%m-%d %H:%M')}") lines.append(f"**Duration:** {_format_duration(session.started_at, session.completed_at)}") if outcome_label: lines.append(f"**Outcome:** {outcome_label}") lines.append("") lines.append("---") lines.append("") # Scratchpad / Evidence section scratchpad = getattr(session, 'scratchpad', '') or '' if scratchpad.strip(): lines.append("## Evidence / Reference") lines.append("") lines.append(scratchpad) lines.append("") lines.append("---") lines.append("") lines.append("## Troubleshooting Steps") lines.append("") decisions = session.decisions if options.max_step_index is not None: decisions = decisions[:options.max_step_index] for i, decision in enumerate(decisions, 1): question = decision.get("question") or decision.get("action_performed", "Step") answer = decision.get("answer", "") notes = decision.get("notes", "") duration_seconds = _get_step_duration_seconds(decision) lines.append(f"### Step {i}: {question}") if answer: lines.append(f"**Answer:** {answer}") if notes: lines.append(f"**Notes:** {notes}") if command_output := _get_command_output(decision): commands = _find_node_commands(session.tree_snapshot, decision.get("node_id", "")) if commands: lines.append(f"**Commands Run:** {', '.join(f'`{c}`' for c in commands)}") lines.append("**Output:**") lines.append("```") lines.append(command_output) lines.append("```") if duration_seconds is not None: lines.append(f"**Duration:** {_format_step_duration(duration_seconds)}") if options.include_timestamps and decision.get("timestamp"): lines.append(f"*{decision['timestamp']}*") lines.append("") # Resolution / Outcome Notes _raw_notes = getattr(session, 'outcome_notes', None) outcome_notes = _raw_notes if isinstance(_raw_notes, str) else '' if outcome_notes.strip() and options.include_outcome_notes: lines.append("---") lines.append("") lines.append("## Resolution") lines.append("") lines.append(outcome_notes.strip()) lines.append("") # Next Steps _raw_next = getattr(session, 'next_steps', None) next_steps = _raw_next if isinstance(_raw_next, str) else '' if next_steps.strip() and options.include_next_steps: lines.append("---") lines.append("") lines.append("## Next Steps") lines.append("") lines.append(next_steps.strip()) lines.append("") return "\n".join(lines) def generate_text_export(session: Session, options: SessionExport) -> str: """Generate plain text export.""" lines = [] outcome_label = _get_outcome_label(session) if options.include_tree_info: tree_name = session.tree_snapshot.get("name", "Troubleshooting Session") lines.append(tree_name) lines.append("=" * len(tree_name)) if session.ticket_number: lines.append(f"Ticket: {session.ticket_number}") if session.client_name: lines.append(f"Client: {session.client_name}") if options.include_timestamps: lines.append(f"Started: {session.started_at.strftime('%Y-%m-%d %H:%M')}") if session.completed_at: lines.append(f"Completed: {session.completed_at.strftime('%Y-%m-%d %H:%M')}") lines.append(f"Duration: {_format_duration(session.started_at, session.completed_at)}") if outcome_label: lines.append(f"Outcome: {outcome_label}") lines.append("") # Scratchpad / Evidence section scratchpad = getattr(session, 'scratchpad', '') or '' if scratchpad.strip(): lines.append("EVIDENCE / REFERENCE") lines.append("-" * 20) lines.append(scratchpad) lines.append("") lines.append("TROUBLESHOOTING STEPS") lines.append("-" * 20) decisions = session.decisions if options.max_step_index is not None: decisions = decisions[:options.max_step_index] for i, decision in enumerate(decisions, 1): question = decision.get("question") or decision.get("action_performed", "Step") answer = decision.get("answer", "") notes = decision.get("notes", "") duration_seconds = _get_step_duration_seconds(decision) lines.append(f"\n{i}. {question}") if answer: lines.append(f" Answer: {answer}") if notes: lines.append(f" Notes: {notes}") if command_output := _get_command_output(decision): commands = _find_node_commands(session.tree_snapshot, decision.get("node_id", "")) if commands: lines.append(f" Commands Run: {', '.join(commands)}") lines.append(" Output:") for output_line in command_output.splitlines(): lines.append(f" {output_line}") if duration_seconds is not None: lines.append(f" Duration: {_format_step_duration(duration_seconds)}") # Resolution _raw_notes = getattr(session, 'outcome_notes', None) outcome_notes = _raw_notes if isinstance(_raw_notes, str) else '' if outcome_notes.strip() and options.include_outcome_notes: lines.append("") lines.append("RESOLUTION") lines.append("-" * 20) lines.append(outcome_notes.strip()) # Next Steps _raw_next = getattr(session, 'next_steps', None) next_steps = _raw_next if isinstance(_raw_next, str) else '' if next_steps.strip() and options.include_next_steps: lines.append("") lines.append("NEXT STEPS") lines.append("-" * 20) lines.append(next_steps.strip()) return "\n".join(lines) def generate_html_export(session: Session, options: SessionExport) -> str: """Generate HTML export.""" tree_name = html.escape(session.tree_snapshot.get("name", "Troubleshooting Session")) outcome_label = _get_outcome_label(session) html_parts = ['', '', '
', '', f'Answer: {answer}
') if notes: html_parts.append(f'Notes: {notes}
') if command_output := _get_command_output(decision): commands = _find_node_commands(session.tree_snapshot, decision.get("node_id", "")) if commands: cmd_html = ", ".join(f"{html.escape(c)}" for c in commands)
html_parts.append(f'Commands Run: {cmd_html}
') html_parts.append(f'{html.escape(command_output)}')
if duration_seconds is not None:
html_parts.append(f'Duration: {_format_step_duration(duration_seconds)}
') if options.include_timestamps and decision.get("timestamp"): html_parts.append(f'') html_parts.append('