Files
resolutionflow/backend/app/services/export_service.py
2026-03-20 03:34:54 +00:00

1292 lines
53 KiB
Python

"""
Session export generators for ResolutionFlow.
Provides markdown, plain text, HTML, PDF, and PSA/ticket note export formatters
for troubleshooting sessions.
"""
import html
import os
from datetime import datetime
from pathlib import Path
from typing import Any
from app.models.session import Session
from app.schemas.session import SessionExport
OUTCOME_LABELS = {
"resolved": "Resolved",
"escalated": "Escalated",
"workaround": "Workaround Applied",
"unresolved": "Unresolved",
}
def _format_duration(started_at: datetime, completed_at: datetime | None) -> str:
"""Format duration between two datetimes as human-readable string."""
if not completed_at:
return "In progress"
delta = completed_at - started_at
total_seconds = int(delta.total_seconds())
if total_seconds < 0:
return "0 minutes"
hours, remainder = divmod(total_seconds, 3600)
minutes = remainder // 60
if hours > 0:
return f"{hours}h {minutes}m"
return f"{minutes} minutes"
def _format_step_duration(duration_seconds: int) -> str:
"""Format step duration seconds as compact human-readable text."""
if duration_seconds < 0:
return "0s"
if duration_seconds < 60:
return f"{duration_seconds}s"
hours, remainder = divmod(duration_seconds, 3600)
minutes, seconds = divmod(remainder, 60)
if hours > 0:
if seconds > 0:
return f"{hours}h {minutes}m {seconds}s"
return f"{hours}h {minutes}m"
if seconds > 0:
return f"{minutes}m {seconds}s"
return f"{minutes}m"
def _parse_iso_datetime(value: Any) -> datetime | None:
"""Parse ISO datetime strings from JSONB with support for trailing Z."""
if isinstance(value, datetime):
return value
if not isinstance(value, str) or not value:
return None
try:
return datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError:
return None
def _get_step_duration_seconds(decision: dict[str, Any]) -> int | None:
"""Get step duration from explicit field or entered/exited timestamps."""
explicit_duration = decision.get("duration_seconds")
if isinstance(explicit_duration, (int, float)):
duration = int(explicit_duration)
if duration >= 0:
return duration
entered_at = _parse_iso_datetime(decision.get("entered_at"))
exited_at = _parse_iso_datetime(decision.get("exited_at"))
if entered_at and exited_at:
total_seconds = int((exited_at - entered_at).total_seconds())
if total_seconds >= 0:
return total_seconds
return None
def _get_command_output(decision: dict[str, Any]) -> str | None:
"""Extract and normalize command_output from a decision dict."""
output = decision.get("command_output")
if not isinstance(output, str):
return None
output = output.strip()
return output if output else None
def _truncate_command_output(output: str, max_lines: int = 5, fmt: str = "text") -> str:
"""Truncate command output to max_lines for standard detail level.
Args:
fmt: One of "markdown", "text", "html", "psa" — controls suffix formatting.
"""
lines = output.splitlines()
if len(lines) <= max_lines:
return output
truncated = "\n".join(lines[:max_lines])
count = len(lines)
if fmt == "markdown":
suffix = f"*(full output omitted — {count} lines)*"
elif fmt == "html":
suffix = f"<em>(full output omitted — {count} lines)</em>"
else: # text, psa
suffix = f"(full output omitted — {count} lines)"
return f"{truncated}\n{suffix}"
def _find_node_commands(tree_snapshot: dict[str, Any], node_id: str) -> list[str]:
"""Find the commands list for a node in the tree snapshot."""
def _search(node: dict[str, Any]) -> list[str] | None:
if node.get("id") == node_id:
return node.get("commands") or []
for child in node.get("children", []):
result = _search(child)
if result is not None:
return result
return None
structure = tree_snapshot.get("tree_structure") or tree_snapshot
return _search(structure) or []
def _get_outcome_label(session: Session) -> str | None:
"""Map stored outcome enum to human-friendly label."""
outcome = getattr(session, "outcome", None)
if not outcome:
return None
return OUTCOME_LABELS.get(outcome, str(outcome).replace("_", " ").title())
def _build_summary_fields(session: Session) -> dict[str, str]:
"""Build auto-populated summary fields from session data.
Empty fields are left blank — users fill them in via the editable preview.
"""
tree_name = session.tree_snapshot.get("name", "Troubleshooting Session")
tree_desc = session.tree_snapshot.get("description", "")
issue = f"{tree_name}: {tree_desc}" if tree_desc else tree_name
if session.completed_at:
status = "Resolved" if getattr(session, "outcome", None) == "resolved" else \
f"Completed — {_get_outcome_label(session) or 'Unknown'}"
else:
step_count = len(session.decisions) if session.decisions else 0
status = f"In Progress — paused at step {step_count}" if step_count else "In Progress"
_raw_notes = getattr(session, 'outcome_notes', None)
resolution = (_raw_notes if isinstance(_raw_notes, str) else '').strip()
_raw_next = getattr(session, 'next_steps', None)
next_steps = (_raw_next if isinstance(_raw_next, str) else '').strip()
return {
"issue": issue,
"impact": "",
"status": status,
"resolution": resolution,
"next_steps": next_steps,
}
def _escape_markdown_table(value: str) -> str:
"""Escape value for use in a markdown table cell."""
return value.replace("|", "\\|").replace("\n", " ")
def generate_markdown_export(session: Session, options: SessionExport, supporting_data: list[dict] | None = None, uploads: list[dict] | None = None) -> str:
"""Generate markdown export."""
if _is_procedural_session(session):
return _generate_procedural_markdown(session, options, uploads=uploads)
lines = []
outcome_label = _get_outcome_label(session)
if options.include_tree_info:
tree_name = session.tree_snapshot.get("name", "Troubleshooting Session")
lines.append(f"# {tree_name}")
lines.append("")
if session.ticket_number:
lines.append(f"**Ticket:** {session.ticket_number}")
if session.client_name:
lines.append(f"**Client:** {session.client_name}")
if options.include_timestamps:
lines.append(f"**Started:** {session.started_at.strftime('%Y-%m-%d %H:%M')}")
if session.completed_at:
lines.append(f"**Completed:** {session.completed_at.strftime('%Y-%m-%d %H:%M')}")
lines.append(f"**Duration:** {_format_duration(session.started_at, session.completed_at)}")
if outcome_label:
lines.append(f"**Outcome:** {outcome_label}")
lines.append("")
lines.append("---")
lines.append("")
if options.include_summary:
summary = _build_summary_fields(session)
esc = _escape_markdown_table
lines.append("## Summary")
lines.append("")
lines.append("| Field | Details |")
lines.append("|-------|---------|")
lines.append(f"| Issue | {esc(summary['issue'])} |")
lines.append(f"| Impact | {esc(summary['impact'])} |")
lines.append(f"| Status | {esc(summary['status'])} |")
lines.append(f"| Resolution | {esc(summary['resolution'])} |")
lines.append(f"| Next Steps | {esc(summary['next_steps'])} |")
lines.append("")
lines.append("---")
lines.append("")
# Scratchpad / Evidence section
scratchpad = getattr(session, 'scratchpad', '') or ''
if scratchpad.strip():
lines.append("## Evidence / Reference")
lines.append("")
lines.append(scratchpad)
lines.append("")
lines.append("---")
lines.append("")
# File upload evidence
if uploads:
lines.append("## Evidence")
lines.append("")
for upload in uploads:
name = upload["filename"]
url = upload["url"]
if upload.get("is_image"):
lines.append(f"- ![{name}]({url})")
else:
lines.append(f"- [{name}]({url})")
lines.append("")
lines.append("---")
lines.append("")
lines.append("## Troubleshooting Steps")
lines.append("")
decisions = session.decisions
if options.max_step_index is not None:
decisions = decisions[:options.max_step_index]
for i, decision in enumerate(decisions, 1):
question = decision.get("question") or decision.get("action_performed", "Step")
answer = decision.get("answer", "")
notes = decision.get("notes", "")
duration_seconds = _get_step_duration_seconds(decision)
is_custom = decision.get("node_id", "").startswith("custom-")
prefix = "[CUSTOM] " if is_custom else ""
lines.append(f"### Step {i}: {prefix}{question}")
if is_custom:
lines.append("*Custom step added by engineer*")
if answer:
lines.append(f"**Answer:** {answer}")
if notes:
lines.append(f"**Notes:** {notes}")
if command_output := _get_command_output(decision):
if options.detail_level == "standard":
command_output = _truncate_command_output(command_output, fmt="markdown")
commands = _find_node_commands(session.tree_snapshot, decision.get("node_id", ""))
if commands:
lines.append(f"**Commands Run:** {', '.join(f'`{c}`' for c in commands)}")
lines.append("**Output:**")
lines.append("```")
lines.append(command_output)
lines.append("```")
if duration_seconds is not None:
lines.append(f"**Duration:** {_format_step_duration(duration_seconds)}")
if options.include_timestamps and decision.get("timestamp"):
lines.append(f"*{decision['timestamp']}*")
lines.append("")
# Supporting Data
if supporting_data:
lines.append("---")
lines.append("")
lines.append("## Supporting Data")
lines.append("")
for sd in supporting_data:
lines.append(f"### {sd['label']}")
if sd["data_type"] == "text_snippet":
lines.append("```")
lines.append(sd["content"])
lines.append("```")
else:
lines.append(f"[Screenshot: {sd['label']}]")
lines.append("")
# Resolution / Outcome Notes
_raw_notes = getattr(session, 'outcome_notes', None)
outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
if outcome_notes.strip() and options.include_outcome_notes:
lines.append("---")
lines.append("")
lines.append("## Resolution")
lines.append("")
lines.append(outcome_notes.strip())
lines.append("")
# Next Steps
_raw_next = getattr(session, 'next_steps', None)
next_steps = _raw_next if isinstance(_raw_next, str) else ''
if next_steps.strip() and options.include_next_steps:
lines.append("---")
lines.append("")
lines.append("## Next Steps")
lines.append("")
lines.append(next_steps.strip())
lines.append("")
# Branding footer
lines.append("---")
lines.append("Generated with ResolutionFlow — https://resolutionflow.com")
return "\n".join(lines)
def generate_text_export(session: Session, options: SessionExport, supporting_data: list[dict] | None = None, uploads: list[dict] | None = None) -> str:
"""Generate plain text export."""
if _is_procedural_session(session):
return _generate_procedural_text(session, options, uploads=uploads)
lines = []
outcome_label = _get_outcome_label(session)
if options.include_tree_info:
tree_name = session.tree_snapshot.get("name", "Troubleshooting Session")
lines.append(tree_name)
lines.append("=" * len(tree_name))
if session.ticket_number:
lines.append(f"Ticket: {session.ticket_number}")
if session.client_name:
lines.append(f"Client: {session.client_name}")
if options.include_timestamps:
lines.append(f"Started: {session.started_at.strftime('%Y-%m-%d %H:%M')}")
if session.completed_at:
lines.append(f"Completed: {session.completed_at.strftime('%Y-%m-%d %H:%M')}")
lines.append(f"Duration: {_format_duration(session.started_at, session.completed_at)}")
if outcome_label:
lines.append(f"Outcome: {outcome_label}")
lines.append("")
if options.include_summary:
summary = _build_summary_fields(session)
lines.append("SUMMARY")
lines.append("-" * 20)
lines.append(f"Issue: {summary['issue']}")
lines.append(f"Impact: {summary['impact']}")
lines.append(f"Status: {summary['status']}")
lines.append(f"Resolution: {summary['resolution']}")
lines.append(f"Next Steps: {summary['next_steps']}")
lines.append("")
# Scratchpad / Evidence section
scratchpad = getattr(session, 'scratchpad', '') or ''
if scratchpad.strip():
lines.append("EVIDENCE / REFERENCE")
lines.append("-" * 20)
lines.append(scratchpad)
lines.append("")
# File upload evidence
if uploads:
lines.append("--- Evidence ---")
for upload in uploads:
lines.append(f"- {upload['filename']}: {upload['url']}")
lines.append("")
lines.append("TROUBLESHOOTING STEPS")
lines.append("-" * 20)
decisions = session.decisions
if options.max_step_index is not None:
decisions = decisions[:options.max_step_index]
for i, decision in enumerate(decisions, 1):
question = decision.get("question") or decision.get("action_performed", "Step")
answer = decision.get("answer", "")
notes = decision.get("notes", "")
duration_seconds = _get_step_duration_seconds(decision)
is_custom = decision.get("node_id", "").startswith("custom-")
prefix = "[CUSTOM] " if is_custom else ""
lines.append(f"\n{i}. {prefix}{question}")
if answer:
lines.append(f" Answer: {answer}")
if notes:
lines.append(f" Notes: {notes}")
if command_output := _get_command_output(decision):
if options.detail_level == "standard":
command_output = _truncate_command_output(command_output, fmt="text")
commands = _find_node_commands(session.tree_snapshot, decision.get("node_id", ""))
if commands:
lines.append(f" Commands Run: {', '.join(commands)}")
lines.append(" Output:")
for output_line in command_output.splitlines():
lines.append(f" {output_line}")
if duration_seconds is not None:
lines.append(f" Duration: {_format_step_duration(duration_seconds)}")
# Supporting Data
if supporting_data:
lines.append("")
lines.append("SUPPORTING DATA")
lines.append("-" * 20)
for sd in supporting_data:
lines.append(f"\n {sd['label']}:")
if sd["data_type"] == "text_snippet":
for content_line in sd["content"].splitlines():
lines.append(f" {content_line}")
else:
lines.append(f" [Screenshot: {sd['label']}]")
# Resolution
_raw_notes = getattr(session, 'outcome_notes', None)
outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
if outcome_notes.strip() and options.include_outcome_notes:
lines.append("")
lines.append("RESOLUTION")
lines.append("-" * 20)
lines.append(outcome_notes.strip())
# Next Steps
_raw_next = getattr(session, 'next_steps', None)
next_steps = _raw_next if isinstance(_raw_next, str) else ''
if next_steps.strip() and options.include_next_steps:
lines.append("")
lines.append("NEXT STEPS")
lines.append("-" * 20)
lines.append(next_steps.strip())
# Branding footer
lines.append("")
lines.append("---")
lines.append("Generated with ResolutionFlow — https://resolutionflow.com")
return "\n".join(lines)
def generate_html_export(session: Session, options: SessionExport, supporting_data: list[dict] | None = None, uploads: list[dict] | None = None) -> str:
"""Generate HTML export."""
if _is_procedural_session(session):
return _generate_procedural_html(session, options, uploads=uploads)
tree_name = html.escape(session.tree_snapshot.get("name", "Troubleshooting Session"))
outcome_label = _get_outcome_label(session)
html_parts = ['<!DOCTYPE html>', '<html>', '<head>',
'<meta charset="UTF-8">',
f'<title>{tree_name}</title>',
'<style>',
'body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }',
'h1 { color: #333; }',
'.meta { color: #666; margin-bottom: 20px; }',
'.step { margin-bottom: 15px; padding: 10px; background: #f5f5f5; border-radius: 5px; }',
'.step h3 { margin: 0 0 10px 0; color: #444; }',
'.answer { font-weight: bold; }',
'.notes { font-style: italic; color: #555; }',
'.duration { color: #444; margin-top: 6px; }',
'.timestamp { font-size: 0.85em; color: #888; }',
'</style>',
'</head>', '<body>']
if options.include_tree_info:
html_parts.append(f'<h1>{tree_name}</h1>')
html_parts.append('<div class="meta">')
if session.ticket_number:
html_parts.append(f'<p><strong>Ticket:</strong> {html.escape(session.ticket_number)}</p>')
if session.client_name:
html_parts.append(f'<p><strong>Client:</strong> {html.escape(session.client_name)}</p>')
if options.include_timestamps:
html_parts.append(f'<p><strong>Started:</strong> {session.started_at.strftime("%Y-%m-%d %H:%M")}</p>')
if session.completed_at:
html_parts.append(f'<p><strong>Completed:</strong> {session.completed_at.strftime("%Y-%m-%d %H:%M")}</p>')
html_parts.append(f'<p><strong>Duration:</strong> {_format_duration(session.started_at, session.completed_at)}</p>')
if outcome_label:
html_parts.append(f'<p><strong>Outcome:</strong> {html.escape(outcome_label)}</p>')
html_parts.append('</div>')
if options.include_summary:
summary = _build_summary_fields(session)
html_parts.append('<h2>Summary</h2>')
html_parts.append('<table style="width: 100%; border-collapse: collapse; margin-bottom: 20px;">')
for label, value in [("Issue", summary["issue"]), ("Impact", summary["impact"]),
("Status", summary["status"]), ("Resolution", summary["resolution"]),
("Next Steps", summary["next_steps"])]:
html_parts.append(f'<tr><td style="padding: 6px 12px; border: 1px solid #ddd; font-weight: bold; width: 120px;">{html.escape(label)}</td>')
html_parts.append(f'<td style="padding: 6px 12px; border: 1px solid #ddd;">{html.escape(value)}</td></tr>')
html_parts.append('</table>')
# Scratchpad / Evidence section
scratchpad = getattr(session, 'scratchpad', '') or ''
if scratchpad.strip():
html_parts.append('<h2>Evidence / Reference</h2>')
html_parts.append(f'<div class="scratchpad" style="white-space: pre-wrap; background: #f9f9f9; padding: 12px; border-radius: 4px; margin-bottom: 20px;">{html.escape(scratchpad)}</div>')
# File upload evidence
if uploads:
html_parts.append('<h3>Evidence</h3>')
html_parts.append('<div class="evidence-grid" style="margin-bottom: 20px;">')
for upload in uploads:
name = html.escape(upload["filename"])
url = html.escape(upload["url"])
if upload.get("is_image"):
html_parts.append(f'<img src="{url}" alt="{name}" style="max-width: 400px; border-radius: 8px; display: block; margin-bottom: 8px;" />')
else:
html_parts.append(f'<p><a href="{url}">{name}</a></p>')
html_parts.append('</div>')
html_parts.append('<h2>Troubleshooting Steps</h2>')
decisions = session.decisions
if options.max_step_index is not None:
decisions = decisions[:options.max_step_index]
for i, decision in enumerate(decisions, 1):
question = html.escape(decision.get("question") or decision.get("action_performed", "Step"))
answer = html.escape(decision.get("answer", ""))
notes = html.escape(decision.get("notes", ""))
duration_seconds = _get_step_duration_seconds(decision)
html_parts.append('<div class="step">')
is_custom = decision.get("node_id", "").startswith("custom-")
custom_badge = '<span style="background: #7c3aed; color: white; padding: 2px 6px; border-radius: 3px; font-size: 0.75em; margin-right: 6px;">CUSTOM</span>' if is_custom else ''
html_parts.append(f'<h3>{custom_badge}Step {i}: {question}</h3>')
if answer:
html_parts.append(f'<p class="answer">Answer: {answer}</p>')
if notes:
html_parts.append(f'<p class="notes">Notes: {notes}</p>')
if command_output := _get_command_output(decision):
if options.detail_level == "standard":
command_output = _truncate_command_output(command_output, fmt="html")
commands = _find_node_commands(session.tree_snapshot, decision.get("node_id", ""))
if commands:
cmd_html = ", ".join(f"<code>{html.escape(c)}</code>" for c in commands)
html_parts.append(f'<p><strong>Commands Run:</strong> {cmd_html}</p>')
html_parts.append(f'<pre><code>{html.escape(command_output)}</code></pre>')
if duration_seconds is not None:
html_parts.append(f'<p class="duration"><strong>Duration:</strong> {_format_step_duration(duration_seconds)}</p>')
if options.include_timestamps and decision.get("timestamp"):
html_parts.append(f'<p class="timestamp">{html.escape(str(decision["timestamp"]))}</p>')
html_parts.append('</div>')
# Supporting Data
if supporting_data:
html_parts.append('<h2>Supporting Data</h2>')
for sd in supporting_data:
if sd["data_type"] == "text_snippet":
html_parts.append(f'<div class="supporting-item"><h3>{html.escape(sd["label"])}</h3><pre>{html.escape(sd["content"])}</pre></div>')
else:
html_parts.append(f'<div class="supporting-item"><h3>{html.escape(sd["label"])}</h3><img src="data:{html.escape(sd.get("content_type") or "image/png")};base64,{sd["content"]}" alt="{html.escape(sd["label"])}"></div>')
# Resolution
_raw_notes = getattr(session, 'outcome_notes', None)
outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
if outcome_notes.strip() and options.include_outcome_notes:
html_parts.append('<h2>Resolution</h2>')
html_parts.append(f'<div style="white-space: pre-wrap; margin-bottom: 20px;">{html.escape(outcome_notes.strip())}</div>')
# Next Steps
_raw_next = getattr(session, 'next_steps', None)
next_steps = _raw_next if isinstance(_raw_next, str) else ''
if next_steps.strip() and options.include_next_steps:
html_parts.append('<h2>Next Steps</h2>')
html_parts.append(f'<div style="white-space: pre-wrap; margin-bottom: 20px;">{html.escape(next_steps.strip())}</div>')
# Branding footer
html_parts.append('<hr style="margin-top: 32px; border: none; border-top: 1px solid #ddd;">')
html_parts.append('<p style="margin-top: 12px; font-size: 0.8em; color: #999; text-align: center;">Generated with <a href="https://resolutionflow.com" style="color: #06b6d4; text-decoration: none;">ResolutionFlow</a> — https://resolutionflow.com</p>')
html_parts.extend(['</body>', '</html>'])
return "\n".join(html_parts)
def generate_psa_export(session: Session, options: SessionExport, supporting_data: list[dict] | None = None, uploads: list[dict] | None = None) -> str:
"""Generate PSA/ticket note export optimized for ConnectWise and similar PSA tools."""
if _is_procedural_session(session):
return _generate_procedural_psa(session, options, uploads=uploads)
lines = []
outcome_label = _get_outcome_label(session)
tree_name = session.tree_snapshot.get("name", "Troubleshooting Session")
tree_description = session.tree_snapshot.get("description", "")
ticket_number = session.ticket_number or "N/A"
client_name = session.client_name or "N/A"
date_str = session.started_at.strftime("%Y-%m-%d %H:%M")
lines.append("=== TROUBLESHOOTING NOTES ===")
lines.append(f"Ticket: {ticket_number} | Client: {client_name}")
lines.append(f"Tree: {tree_name} | Date: {date_str}")
lines.append(f"Duration: {_format_duration(session.started_at, session.completed_at)}")
if outcome_label:
lines.append(f"Outcome: {outcome_label}")
lines.append("")
if options.include_summary:
summary = _build_summary_fields(session)
lines.append("--- SUMMARY ---")
lines.append(f"Issue: {summary['issue']}")
lines.append(f"Impact: {summary['impact']}")
lines.append(f"Status: {summary['status']}")
lines.append(f"Resolution: {summary['resolution']}")
lines.append(f"Next Steps: {summary['next_steps']}")
lines.append("")
# Problem section
lines.append("--- PROBLEM ---")
lines.append(tree_description if tree_description else "No description provided.")
lines.append("")
# Steps taken
lines.append("--- STEPS TAKEN ---")
decisions = session.decisions
if options.max_step_index is not None:
decisions = decisions[:options.max_step_index]
if decisions:
for i, decision in enumerate(decisions, 1):
question = decision.get("question") or decision.get("action_performed", "Step")
answer = decision.get("answer", "")
notes = decision.get("notes", "")
duration_seconds = _get_step_duration_seconds(decision)
is_custom = decision.get("node_id", "").startswith("custom-")
prefix = "[CUSTOM] " if is_custom else ""
line = f"{i}. {prefix}{question}"
if answer:
line += f" -> {answer}"
if duration_seconds is not None:
line += f" ({_format_step_duration(duration_seconds)})"
lines.append(line)
if notes:
lines.append(f" Notes: {notes}")
if command_output := _get_command_output(decision):
if options.detail_level == "standard":
command_output = _truncate_command_output(command_output, fmt="psa")
commands = _find_node_commands(session.tree_snapshot, decision.get("node_id", ""))
if commands:
lines.append(f" Commands: {', '.join(commands)}")
lines.append(" Output:")
for output_line in command_output.splitlines():
lines.append(f" {output_line}")
else:
lines.append("No steps recorded.")
lines.append("")
# Supporting Data
if supporting_data:
lines.append("--- SUPPORTING DATA ---")
for sd in supporting_data:
if sd["data_type"] == "text_snippet":
lines.append(f"## {sd['label']}")
lines.append("```")
lines.append(sd["content"])
lines.append("```")
else:
lines.append(f"[Screenshot: {sd['label']}]")
lines.append("")
# Resolution — only for completed sessions
if session.completed_at:
lines.append("--- RESOLUTION ---")
_raw_notes = getattr(session, 'outcome_notes', None)
outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
if outcome_notes.strip() and options.include_outcome_notes:
lines.append(outcome_notes.strip())
elif session.decisions:
last_decision = session.decisions[-1]
resolution = last_decision.get("answer") or last_decision.get("question", "No resolution recorded")
lines.append(resolution)
else:
lines.append("No resolution recorded.")
if outcome_label:
lines.append(f"Outcome: {outcome_label}")
lines.append("")
# Next Steps
_raw_next = getattr(session, 'next_steps', None)
next_steps = _raw_next if isinstance(_raw_next, str) else ''
if next_steps.strip() and options.include_next_steps:
lines.append("--- NEXT STEPS ---")
lines.append(next_steps.strip())
lines.append("")
# Time spent
lines.append("--- TIME SPENT ---")
duration = _format_duration(session.started_at, session.completed_at)
lines.append(f"Duration: {duration}")
lines.append("")
# Engineer notes (scratchpad)
lines.append("--- ENGINEER NOTES ---")
scratchpad = getattr(session, 'scratchpad', '') or ''
lines.append(scratchpad.strip() if scratchpad.strip() else "None")
# File upload evidence
if uploads:
lines.append("")
lines.append("--- Evidence ---")
for upload in uploads:
lines.append(f"- {upload['filename']} — [{upload['url']}]")
# Branding footer
lines.append("")
lines.append("---")
lines.append("Generated with ResolutionFlow — https://resolutionflow.com")
return "\n".join(lines)
def _is_procedural_session(session: Session) -> bool:
"""Check if session is for a procedural flow."""
return session.tree_snapshot.get("tree_type") == "procedural"
def _get_session_variables(session: Session) -> dict[str, str]:
"""Get session variables (intake form values) from session."""
variables = getattr(session, "session_variables", None)
if isinstance(variables, dict):
return variables
return {}
def _generate_procedural_markdown(session: Session, options: SessionExport, uploads: list[dict] | None = None) -> str:
"""Generate markdown export for procedural sessions."""
lines = []
outcome_label = _get_outcome_label(session)
tree_name = session.tree_snapshot.get("name", "Procedure")
if options.include_tree_info:
lines.append(f"# {tree_name}")
lines.append("")
if session.ticket_number:
lines.append(f"**Ticket:** {session.ticket_number}")
if session.client_name:
lines.append(f"**Client:** {session.client_name}")
if options.include_timestamps:
lines.append(f"**Started:** {session.started_at.strftime('%Y-%m-%d %H:%M')}")
if session.completed_at:
lines.append(f"**Completed:** {session.completed_at.strftime('%Y-%m-%d %H:%M')}")
lines.append(f"**Duration:** {_format_duration(session.started_at, session.completed_at)}")
if outcome_label:
lines.append(f"**Outcome:** {outcome_label}")
lines.append("")
lines.append("---")
lines.append("")
# Project Parameters
variables = _get_session_variables(session)
if variables:
lines.append("## Project Parameters")
lines.append("")
lines.append("| Parameter | Value |")
lines.append("|-----------|-------|")
for key, value in variables.items():
lines.append(f"| `{_escape_markdown_table(key)}` | {_escape_markdown_table(value)} |")
lines.append("")
lines.append("---")
lines.append("")
# Steps
lines.append("## Procedure Steps")
lines.append("")
decisions = session.decisions or []
if options.max_step_index is not None:
decisions = decisions[:options.max_step_index]
for i, decision in enumerate(decisions, 1):
title = decision.get("question") or decision.get("action_performed", "Step")
notes = decision.get("notes", "")
verification = _get_command_output(decision)
completed = decision.get("answer") == "completed"
checkbox = "[x]" if completed else "[ ]"
lines.append(f"- {checkbox} **Step {i}: {title}**")
if notes:
lines.append(f" - Notes: {notes}")
if verification:
lines.append(f" - Verification: {verification}")
duration_seconds = _get_step_duration_seconds(decision)
if duration_seconds is not None:
lines.append(f" - Duration: {_format_step_duration(duration_seconds)}")
lines.append("")
# Resolution
_raw_notes = getattr(session, 'outcome_notes', None)
outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
if outcome_notes.strip() and options.include_outcome_notes:
lines.append("---")
lines.append("")
lines.append("## Notes")
lines.append("")
lines.append(outcome_notes.strip())
lines.append("")
# File upload evidence
if uploads:
lines.append("---")
lines.append("")
lines.append("## Evidence")
lines.append("")
for upload in uploads:
name = upload["filename"]
url = upload["url"]
if upload.get("is_image"):
lines.append(f"- ![{name}]({url})")
else:
lines.append(f"- [{name}]({url})")
lines.append("")
# Branding footer
lines.append("---")
lines.append("Generated with ResolutionFlow — https://resolutionflow.com")
return "\n".join(lines)
def _generate_procedural_text(session: Session, options: SessionExport, uploads: list[dict] | None = None) -> str:
"""Generate plain text export for procedural sessions."""
lines = []
outcome_label = _get_outcome_label(session)
tree_name = session.tree_snapshot.get("name", "Procedure")
if options.include_tree_info:
lines.append(tree_name)
lines.append("=" * len(tree_name))
if session.ticket_number:
lines.append(f"Ticket: {session.ticket_number}")
if session.client_name:
lines.append(f"Client: {session.client_name}")
if options.include_timestamps:
lines.append(f"Started: {session.started_at.strftime('%Y-%m-%d %H:%M')}")
if session.completed_at:
lines.append(f"Completed: {session.completed_at.strftime('%Y-%m-%d %H:%M')}")
lines.append(f"Duration: {_format_duration(session.started_at, session.completed_at)}")
if outcome_label:
lines.append(f"Outcome: {outcome_label}")
lines.append("")
# Project Parameters
variables = _get_session_variables(session)
if variables:
lines.append("PROJECT PARAMETERS")
lines.append("-" * 20)
for key, value in variables.items():
lines.append(f" {key}: {value}")
lines.append("")
lines.append("PROCEDURE STEPS")
lines.append("-" * 20)
decisions = session.decisions or []
if options.max_step_index is not None:
decisions = decisions[:options.max_step_index]
for i, decision in enumerate(decisions, 1):
title = decision.get("question") or decision.get("action_performed", "Step")
notes = decision.get("notes", "")
verification = _get_command_output(decision)
completed = decision.get("answer") == "completed"
marker = "[DONE]" if completed else "[ ]"
lines.append(f"\n{marker} {i}. {title}")
if notes:
lines.append(f" Notes: {notes}")
if verification:
lines.append(f" Verification: {verification}")
duration_seconds = _get_step_duration_seconds(decision)
if duration_seconds is not None:
lines.append(f" Duration: {_format_step_duration(duration_seconds)}")
lines.append("")
_raw_notes = getattr(session, 'outcome_notes', None)
outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
if outcome_notes.strip() and options.include_outcome_notes:
lines.append("NOTES")
lines.append("-" * 20)
lines.append(outcome_notes.strip())
# File upload evidence
if uploads:
lines.append("")
lines.append("--- Evidence ---")
for upload in uploads:
lines.append(f"- {upload['filename']}: {upload['url']}")
# Branding footer
lines.append("")
lines.append("---")
lines.append("Generated with ResolutionFlow — https://resolutionflow.com")
return "\n".join(lines)
def _generate_procedural_html(session: Session, options: SessionExport, uploads: list[dict] | None = None) -> str:
"""Generate HTML export for procedural sessions."""
tree_name = html.escape(session.tree_snapshot.get("name", "Procedure"))
outcome_label = _get_outcome_label(session)
html_parts = ['<!DOCTYPE html>', '<html>', '<head>',
'<meta charset="UTF-8">',
f'<title>{tree_name}</title>',
'<style>',
'body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }',
'h1 { color: #333; }',
'.meta { color: #666; margin-bottom: 20px; }',
'.params { margin-bottom: 20px; }',
'.params table { width: 100%; border-collapse: collapse; }',
'.params td { padding: 6px 12px; border: 1px solid #ddd; }',
'.params td:first-child { font-family: monospace; font-weight: bold; width: 200px; background: #f9f9f9; }',
'.step { margin-bottom: 10px; padding: 10px 15px; background: #f5f5f5; border-radius: 5px; border-left: 4px solid #ccc; }',
'.step.done { border-left-color: #22c55e; }',
'.step h3 { margin: 0 0 5px 0; color: #444; }',
'.notes { font-style: italic; color: #555; font-size: 0.9em; }',
'</style>',
'</head>', '<body>']
if options.include_tree_info:
html_parts.append(f'<h1>{tree_name}</h1>')
html_parts.append('<div class="meta">')
if session.ticket_number:
html_parts.append(f'<p><strong>Ticket:</strong> {html.escape(session.ticket_number)}</p>')
if session.client_name:
html_parts.append(f'<p><strong>Client:</strong> {html.escape(session.client_name)}</p>')
if options.include_timestamps:
html_parts.append(f'<p><strong>Started:</strong> {session.started_at.strftime("%Y-%m-%d %H:%M")}</p>')
if session.completed_at:
html_parts.append(f'<p><strong>Completed:</strong> {session.completed_at.strftime("%Y-%m-%d %H:%M")}</p>')
html_parts.append(f'<p><strong>Duration:</strong> {_format_duration(session.started_at, session.completed_at)}</p>')
if outcome_label:
html_parts.append(f'<p><strong>Outcome:</strong> {html.escape(outcome_label)}</p>')
html_parts.append('</div>')
# Project Parameters
variables = _get_session_variables(session)
if variables:
html_parts.append('<div class="params">')
html_parts.append('<h2>Project Parameters</h2>')
html_parts.append('<table>')
for key, value in variables.items():
html_parts.append(f'<tr><td>{html.escape(key)}</td><td>{html.escape(value)}</td></tr>')
html_parts.append('</table>')
html_parts.append('</div>')
html_parts.append('<h2>Procedure Steps</h2>')
decisions = session.decisions or []
if options.max_step_index is not None:
decisions = decisions[:options.max_step_index]
for i, decision in enumerate(decisions, 1):
title = html.escape(decision.get("question") or decision.get("action_performed", "Step"))
notes = html.escape(decision.get("notes", ""))
verification = _get_command_output(decision)
completed = decision.get("answer") == "completed"
css_class = "step done" if completed else "step"
marker = "&#x2705;" if completed else "&#x2B1C;"
html_parts.append(f'<div class="{css_class}">')
html_parts.append(f'<h3>{marker} Step {i}: {title}</h3>')
if notes:
html_parts.append(f'<p class="notes">Notes: {notes}</p>')
if verification:
html_parts.append(f'<p>Verification: {html.escape(verification)}</p>')
duration_seconds = _get_step_duration_seconds(decision)
if duration_seconds is not None:
html_parts.append(f'<p style="color: #888; font-size: 0.85em;">Duration: {_format_step_duration(duration_seconds)}</p>')
html_parts.append('</div>')
# Resolution
_raw_notes = getattr(session, 'outcome_notes', None)
outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
if outcome_notes.strip() and options.include_outcome_notes:
html_parts.append('<h2>Notes</h2>')
html_parts.append(f'<div style="white-space: pre-wrap;">{html.escape(outcome_notes.strip())}</div>')
# File upload evidence
if uploads:
html_parts.append('<h3>Evidence</h3>')
html_parts.append('<div class="evidence-grid" style="margin-bottom: 20px;">')
for upload in uploads:
name = html.escape(upload["filename"])
url = html.escape(upload["url"])
if upload.get("is_image"):
html_parts.append(f'<img src="{url}" alt="{name}" style="max-width: 400px; border-radius: 8px; display: block; margin-bottom: 8px;" />')
else:
html_parts.append(f'<p><a href="{url}">{name}</a></p>')
html_parts.append('</div>')
# Branding footer
html_parts.append('<hr style="margin-top: 32px; border: none; border-top: 1px solid #ddd;">')
html_parts.append('<p style="margin-top: 12px; font-size: 0.8em; color: #999; text-align: center;">Generated with <a href="https://resolutionflow.com" style="color: #06b6d4; text-decoration: none;">ResolutionFlow</a> — https://resolutionflow.com</p>')
html_parts.extend(['</body>', '</html>'])
return "\n".join(html_parts)
def _generate_procedural_psa(session: Session, options: SessionExport, uploads: list[dict] | None = None) -> str:
"""Generate PSA/ticket export for procedural sessions."""
lines = []
outcome_label = _get_outcome_label(session)
tree_name = session.tree_snapshot.get("name", "Procedure")
ticket_number = session.ticket_number or "N/A"
client_name = session.client_name or "N/A"
date_str = session.started_at.strftime("%Y-%m-%d %H:%M")
lines.append("=== PROCEDURE NOTES ===")
lines.append(f"Ticket: {ticket_number} | Client: {client_name}")
lines.append(f"Procedure: {tree_name} | Date: {date_str}")
lines.append(f"Duration: {_format_duration(session.started_at, session.completed_at)}")
if outcome_label:
lines.append(f"Outcome: {outcome_label}")
lines.append("")
# Project Parameters
variables = _get_session_variables(session)
if variables:
lines.append("--- PROJECT PARAMETERS ---")
for key, value in variables.items():
lines.append(f" {key}: {value}")
lines.append("")
# Steps
lines.append("--- STEPS COMPLETED ---")
decisions = session.decisions or []
if options.max_step_index is not None:
decisions = decisions[:options.max_step_index]
if decisions:
for i, decision in enumerate(decisions, 1):
title = decision.get("question") or decision.get("action_performed", "Step")
notes = decision.get("notes", "")
completed = decision.get("answer") == "completed"
marker = "[DONE]" if completed else "[ ]"
duration_seconds = _get_step_duration_seconds(decision)
line = f"{marker} {i}. {title}"
if duration_seconds is not None:
line += f" ({_format_step_duration(duration_seconds)})"
lines.append(line)
if notes:
lines.append(f" Notes: {notes}")
else:
lines.append("No steps recorded.")
lines.append("")
# Resolution
if session.completed_at:
lines.append("--- RESOLUTION ---")
_raw_notes = getattr(session, 'outcome_notes', None)
outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
if outcome_notes.strip() and options.include_outcome_notes:
lines.append(outcome_notes.strip())
else:
lines.append("Procedure completed successfully.")
if outcome_label:
lines.append(f"Outcome: {outcome_label}")
lines.append("")
# Time spent
lines.append("--- TIME SPENT ---")
lines.append(f"Duration: {_format_duration(session.started_at, session.completed_at)}")
# File upload evidence
if uploads:
lines.append("")
lines.append("--- Evidence ---")
for upload in uploads:
lines.append(f"- {upload['filename']} — [{upload['url']}]")
# Branding footer
lines.append("")
lines.append("---")
lines.append("Generated with ResolutionFlow — https://resolutionflow.com")
return "\n".join(lines)
async def generate_pdf_export(session: Session, options: SessionExport, db) -> bytes:
"""Generate PDF export using WeasyPrint and a Jinja2 HTML template.
Args:
session: The session to export.
options: Export options (redaction_mode, max_step_index, etc.).
db: Async database session for loading supporting data and branding.
Returns:
PDF file contents as bytes.
"""
from jinja2 import Environment, FileSystemLoader
import weasyprint
from sqlalchemy import select as sa_select
# Load Jinja2 template
template_dir = Path(__file__).resolve().parent.parent / "templates"
env = Environment(loader=FileSystemLoader(str(template_dir)), autoescape=True)
template = env.get_template("export_pdf.html")
# Tree snapshot data
tree_snapshot = session.tree_snapshot or {}
flow_title = tree_snapshot.get("name", "Session Export")
tree_type = tree_snapshot.get("tree_type", "troubleshooting")
is_procedural = tree_type == "procedural"
report_type = "Procedure Report" if is_procedural else "Troubleshooting Report"
# Branding — check team first, then user (solo pros)
logo_data = None
logo_content_type = None
company_name = None
from app.models.user import User
user_result = await db.execute(
sa_select(User).where(User.id == session.user_id)
)
user = user_result.scalar_one_or_none()
engineer_name = user.name if user else "Unknown"
if user and user.team_id:
from app.models.team import Team
team_result = await db.execute(
sa_select(Team).where(Team.id == user.team_id)
)
team = team_result.scalar_one_or_none()
if team:
logo_data = team.logo_data
logo_content_type = team.logo_content_type
company_name = team.company_display_name or team.name
elif user:
logo_data = user.logo_data
logo_content_type = user.logo_content_type
company_name = user.company_display_name
has_custom_logo = bool(logo_data)
# Build steps list from decisions
decisions = session.decisions or []
if options.max_step_index is not None:
decisions = decisions[:options.max_step_index]
steps = []
for decision in decisions:
title = decision.get("question") or decision.get("action_performed", "Step")
answer = decision.get("answer", "")
notes = decision.get("notes", "")
duration_seconds = _get_step_duration_seconds(decision)
duration_str = _format_step_duration(duration_seconds) if duration_seconds is not None else None
if is_procedural:
completed = answer == "completed"
decision_text = "Completed" if completed else ("Skipped" if answer else "")
else:
decision_text = answer
steps.append({
"title": title,
"decision": decision_text,
"notes": notes,
"duration": duration_str,
})
# Query supporting data
from app.models.supporting_data import SessionSupportingData
sd_result = await db.execute(
sa_select(SessionSupportingData)
.where(SessionSupportingData.session_id == session.id)
.order_by(SessionSupportingData.sort_order)
)
supporting_data_rows = sd_result.scalars().all()
supporting_data = [
{
"label": sd.label,
"data_type": sd.data_type,
"content": sd.content,
"content_type": sd.content_type,
}
for sd in supporting_data_rows
]
# Query file upload evidence
from app.models.file_upload import FileUpload
from app.services import storage_service
from app.core.config import settings as _settings
uploads_for_export: list[dict] = []
if _settings.STORAGE_ENDPOINT:
try:
uploads_result = await db.execute(
sa_select(FileUpload)
.where(FileUpload.session_id == session.id)
.order_by(FileUpload.created_at)
)
upload_rows = uploads_result.scalars().all()
for u in upload_rows:
try:
url = storage_service.get_presigned_url(u.storage_key)
uploads_for_export.append({
"filename": u.filename,
"url": url,
"is_image": u.content_type.startswith("image/"),
})
except Exception:
pass # Skip individual uploads that fail URL generation
except Exception:
pass # Storage errors should not fail the export
# Calculate duration and format outcome
duration = _format_duration(session.started_at, session.completed_at)
session_date = session.started_at.strftime("%Y-%m-%d %H:%M")
outcome_label = _get_outcome_label(session) or ("In Progress" if not session.completed_at else "Completed")
outcome_raw = getattr(session, "outcome", None) or ""
outcome_class = f"outcome-{outcome_raw}" if outcome_raw else ""
# Build summary text
summary_text = ""
if options.include_summary:
summary_fields = _build_summary_fields(session)
parts = []
for label, value in summary_fields.items():
if value:
parts.append(f"{label.replace('_', ' ').title()}: {value}")
summary_text = "\n".join(parts)
# Resolution / outcome notes as summary fallback
if not summary_text:
_raw_notes = getattr(session, "outcome_notes", None)
if isinstance(_raw_notes, str) and _raw_notes.strip():
summary_text = _raw_notes.strip()
generated_at = datetime.utcnow().strftime("%Y-%m-%d %H:%M UTC")
# Variable resolution
session_vars = getattr(session, "session_variables", None) or {}
if session_vars:
from app.services.variable_service import resolve_variables
flow_title = resolve_variables(flow_title, session_vars)
summary_text = resolve_variables(summary_text, session_vars)
for step in steps:
step["title"] = resolve_variables(step["title"], session_vars)
if step["decision"]:
step["decision"] = resolve_variables(step["decision"], session_vars)
if step["notes"]:
step["notes"] = resolve_variables(step["notes"], session_vars)
for sd in supporting_data:
if sd["data_type"] == "text_snippet":
sd["content"] = resolve_variables(sd["content"], session_vars)
# Apply redaction
if options.redaction_mode == "mask":
from app.services.redaction_service import apply_redaction_to_text
try:
flow_title, _ = apply_redaction_to_text(flow_title)
summary_text, _ = apply_redaction_to_text(summary_text)
for step in steps:
step["title"], _ = apply_redaction_to_text(step["title"])
if step["decision"]:
step["decision"], _ = apply_redaction_to_text(step["decision"])
if step["notes"]:
step["notes"], _ = apply_redaction_to_text(step["notes"])
for sd in supporting_data:
if sd["data_type"] == "text_snippet":
sd["content"], _ = apply_redaction_to_text(sd["content"])
except Exception:
pass # Redaction is best-effort for PDF
# Render HTML
html_content = template.render(
report_type=report_type,
flow_title=flow_title,
logo_data=logo_data,
logo_content_type=logo_content_type or "image/png",
has_custom_logo=has_custom_logo,
company_name=company_name,
engineer_name=engineer_name,
client_name=session.client_name,
ticket_number=session.ticket_number,
session_date=session_date,
duration=duration,
outcome_class=outcome_class,
outcome_display=outcome_label,
summary=summary_text,
steps=steps,
supporting_data=supporting_data,
uploads=uploads_for_export,
generated_at=generated_at,
)
# Convert to PDF
pdf_bytes = weasyprint.HTML(string=html_content).write_pdf()
return pdf_bytes