1292 lines
53 KiB
Python
1292 lines
53 KiB
Python
"""
|
|
Session export generators for ResolutionFlow.
|
|
|
|
Provides markdown, plain text, HTML, PDF, and PSA/ticket note export formatters
|
|
for troubleshooting sessions.
|
|
"""
|
|
import html
|
|
import os
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from app.models.session import Session
|
|
from app.schemas.session import SessionExport
|
|
|
|
|
|
# Display labels for the outcome values stored on a session. Outcomes that are
# not listed here are title-cased by _get_outcome_label instead.
OUTCOME_LABELS: dict[str, str] = {
    "resolved": "Resolved",
    "escalated": "Escalated",
    "workaround": "Workaround Applied",
    "unresolved": "Unresolved",
}
|
|
|
|
|
|
def _format_duration(started_at: datetime, completed_at: datetime | None) -> str:
|
|
"""Format duration between two datetimes as human-readable string."""
|
|
if not completed_at:
|
|
return "In progress"
|
|
delta = completed_at - started_at
|
|
total_seconds = int(delta.total_seconds())
|
|
if total_seconds < 0:
|
|
return "0 minutes"
|
|
hours, remainder = divmod(total_seconds, 3600)
|
|
minutes = remainder // 60
|
|
if hours > 0:
|
|
return f"{hours}h {minutes}m"
|
|
return f"{minutes} minutes"
|
|
|
|
|
|
def _format_step_duration(duration_seconds: int) -> str:
|
|
"""Format step duration seconds as compact human-readable text."""
|
|
if duration_seconds < 0:
|
|
return "0s"
|
|
if duration_seconds < 60:
|
|
return f"{duration_seconds}s"
|
|
hours, remainder = divmod(duration_seconds, 3600)
|
|
minutes, seconds = divmod(remainder, 60)
|
|
if hours > 0:
|
|
if seconds > 0:
|
|
return f"{hours}h {minutes}m {seconds}s"
|
|
return f"{hours}h {minutes}m"
|
|
if seconds > 0:
|
|
return f"{minutes}m {seconds}s"
|
|
return f"{minutes}m"
|
|
|
|
|
|
def _parse_iso_datetime(value: Any) -> datetime | None:
|
|
"""Parse ISO datetime strings from JSONB with support for trailing Z."""
|
|
if isinstance(value, datetime):
|
|
return value
|
|
if not isinstance(value, str) or not value:
|
|
return None
|
|
try:
|
|
return datetime.fromisoformat(value.replace("Z", "+00:00"))
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _get_step_duration_seconds(decision: dict[str, Any]) -> int | None:
|
|
"""Get step duration from explicit field or entered/exited timestamps."""
|
|
explicit_duration = decision.get("duration_seconds")
|
|
if isinstance(explicit_duration, (int, float)):
|
|
duration = int(explicit_duration)
|
|
if duration >= 0:
|
|
return duration
|
|
|
|
entered_at = _parse_iso_datetime(decision.get("entered_at"))
|
|
exited_at = _parse_iso_datetime(decision.get("exited_at"))
|
|
if entered_at and exited_at:
|
|
total_seconds = int((exited_at - entered_at).total_seconds())
|
|
if total_seconds >= 0:
|
|
return total_seconds
|
|
return None
|
|
|
|
|
|
def _get_command_output(decision: dict[str, Any]) -> str | None:
|
|
"""Extract and normalize command_output from a decision dict."""
|
|
output = decision.get("command_output")
|
|
if not isinstance(output, str):
|
|
return None
|
|
output = output.strip()
|
|
return output if output else None
|
|
|
|
|
|
def _truncate_command_output(output: str, max_lines: int = 5, fmt: str = "text") -> str:
|
|
"""Truncate command output to max_lines for standard detail level.
|
|
|
|
Args:
|
|
fmt: One of "markdown", "text", "html", "psa" — controls suffix formatting.
|
|
"""
|
|
lines = output.splitlines()
|
|
if len(lines) <= max_lines:
|
|
return output
|
|
truncated = "\n".join(lines[:max_lines])
|
|
count = len(lines)
|
|
if fmt == "markdown":
|
|
suffix = f"*(full output omitted — {count} lines)*"
|
|
elif fmt == "html":
|
|
suffix = f"<em>(full output omitted — {count} lines)</em>"
|
|
else: # text, psa
|
|
suffix = f"(full output omitted — {count} lines)"
|
|
return f"{truncated}\n{suffix}"
|
|
|
|
|
|
def _find_node_commands(tree_snapshot: dict[str, Any], node_id: str) -> list[str]:
|
|
"""Find the commands list for a node in the tree snapshot."""
|
|
def _search(node: dict[str, Any]) -> list[str] | None:
|
|
if node.get("id") == node_id:
|
|
return node.get("commands") or []
|
|
for child in node.get("children", []):
|
|
result = _search(child)
|
|
if result is not None:
|
|
return result
|
|
return None
|
|
|
|
structure = tree_snapshot.get("tree_structure") or tree_snapshot
|
|
return _search(structure) or []
|
|
|
|
|
|
def _get_outcome_label(session: Session) -> str | None:
    """Translate the session's stored outcome into a display label.

    Returns ``None`` when the session has no (or an empty) ``outcome``. Known
    outcomes come from ``OUTCOME_LABELS``; anything else has its underscores
    replaced with spaces and is title-cased.
    """
    outcome = getattr(session, "outcome", None)
    if outcome:
        fallback_label = str(outcome).replace("_", " ").title()
        return OUTCOME_LABELS.get(outcome, fallback_label)
    return None
|
|
|
|
|
|
def _build_summary_fields(session: Session) -> dict[str, str]:
    """Derive the pre-filled summary table values from a session.

    ``issue`` combines the tree name and description, ``status`` reflects
    completion and outcome, and ``resolution`` / ``next_steps`` come from the
    session's ``outcome_notes`` / ``next_steps`` text. ``impact`` is always
    blank; blank fields are meant to be filled in through the editable preview.
    """

    def _stripped_text(attr_name: str) -> str:
        # Only genuine strings are used; anything else counts as blank.
        raw = getattr(session, attr_name, None)
        return raw.strip() if isinstance(raw, str) else ""

    snapshot = session.tree_snapshot
    tree_name = snapshot.get("name", "Troubleshooting Session")
    tree_desc = snapshot.get("description", "")
    if tree_desc:
        issue = f"{tree_name}: {tree_desc}"
    else:
        issue = tree_name

    if session.completed_at:
        if getattr(session, "outcome", None) == "resolved":
            status = "Resolved"
        else:
            status = f"Completed — {_get_outcome_label(session) or 'Unknown'}"
    else:
        recorded = session.decisions
        step_count = len(recorded) if recorded else 0
        if step_count:
            status = f"In Progress — paused at step {step_count}"
        else:
            status = "In Progress"

    fields = {
        "issue": issue,
        "impact": "",
        "status": status,
        "resolution": _stripped_text("outcome_notes"),
        "next_steps": _stripped_text("next_steps"),
    }
    return fields
|
|
|
|
|
|
def _escape_markdown_table(value: str) -> str:
|
|
"""Escape value for use in a markdown table cell."""
|
|
return value.replace("|", "\\|").replace("\n", " ")
|
|
|
|
|
|
def generate_markdown_export(
    session: Session,
    options: SessionExport,
    supporting_data: list[dict] | None = None,
    uploads: list[dict] | None = None,
) -> str:
    """Generate the markdown export for a session.

    Procedural sessions are delegated to ``_generate_procedural_markdown``.
    Otherwise the document is assembled in order: header, summary table,
    scratchpad, uploaded evidence, troubleshooting steps, supporting data,
    resolution, next steps and the branding footer.

    Args:
        session: Session being exported.
        options: Export options; the ``include_*`` flags, ``detail_level`` and
            ``max_step_index`` are read here.
        supporting_data: Optional dicts with ``label`` and ``data_type`` keys,
            plus ``content`` for ``"text_snippet"`` entries.
        uploads: Optional dicts with ``filename`` and ``url`` keys and an
            optional ``is_image`` flag.

    Returns:
        The markdown document as a single newline-joined string.
    """
    if _is_procedural_session(session):
        return _generate_procedural_markdown(session, options, uploads=uploads)
    lines: list[str] = []
    outcome_label = _get_outcome_label(session)

    if options.include_tree_info:
        tree_name = session.tree_snapshot.get("name", "Troubleshooting Session")
        lines.append(f"# {tree_name}")
        lines.append("")
        if session.ticket_number:
            lines.append(f"**Ticket:** {session.ticket_number}")
        if session.client_name:
            lines.append(f"**Client:** {session.client_name}")
        if options.include_timestamps:
            lines.append(f"**Started:** {session.started_at.strftime('%Y-%m-%d %H:%M')}")
            if session.completed_at:
                lines.append(f"**Completed:** {session.completed_at.strftime('%Y-%m-%d %H:%M')}")
            # NOTE(review): Duration is grouped with the timestamp fields here;
            # confirm it should be hidden when include_timestamps is off.
            lines.append(f"**Duration:** {_format_duration(session.started_at, session.completed_at)}")
        if outcome_label:
            lines.append(f"**Outcome:** {outcome_label}")
        lines.extend(["", "---", ""])

    if options.include_summary:
        summary = _build_summary_fields(session)
        esc = _escape_markdown_table
        lines.append("## Summary")
        lines.append("")
        lines.append("| Field | Details |")
        lines.append("|-------|---------|")
        lines.append(f"| Issue | {esc(summary['issue'])} |")
        lines.append(f"| Impact | {esc(summary['impact'])} |")
        lines.append(f"| Status | {esc(summary['status'])} |")
        lines.append(f"| Resolution | {esc(summary['resolution'])} |")
        lines.append(f"| Next Steps | {esc(summary['next_steps'])} |")
        lines.extend(["", "---", ""])

    # Scratchpad / Evidence section
    scratchpad = getattr(session, 'scratchpad', '') or ''
    if scratchpad.strip():
        lines.append("## Evidence / Reference")
        lines.append("")
        lines.append(scratchpad)
        lines.extend(["", "---", ""])

    # File upload evidence
    if uploads:
        lines.append("## Evidence")
        lines.append("")
        for upload in uploads:
            name = upload["filename"]
            url = upload["url"]
            if upload.get("is_image"):
                # Images are embedded inline; other files are plain links.
                lines.append(f"- ![{name}]({url})")
            else:
                lines.append(f"- [{name}]({url})")
        lines.extend(["", "---", ""])

    lines.append("## Troubleshooting Steps")
    lines.append("")

    # A session without recorded steps may store None rather than a list.
    decisions = session.decisions or []
    if options.max_step_index is not None:
        decisions = decisions[:options.max_step_index]

    for i, decision in enumerate(decisions, 1):
        question = decision.get("question") or decision.get("action_performed", "Step")
        answer = decision.get("answer", "")
        notes = decision.get("notes", "")
        duration_seconds = _get_step_duration_seconds(decision)

        # node_id can be missing or null; treat both as "not a custom step".
        node_id = decision.get("node_id") or ""
        is_custom = str(node_id).startswith("custom-")
        prefix = "[CUSTOM] " if is_custom else ""
        lines.append(f"### Step {i}: {prefix}{question}")
        if is_custom:
            lines.append("*Custom step added by engineer*")
        if answer:
            lines.append(f"**Answer:** {answer}")
        if notes:
            lines.append(f"**Notes:** {notes}")
        if command_output := _get_command_output(decision):
            if options.detail_level == "standard":
                command_output = _truncate_command_output(command_output, fmt="markdown")
            commands = _find_node_commands(session.tree_snapshot, node_id)
            if commands:
                lines.append(f"**Commands Run:** {', '.join(f'`{c}`' for c in commands)}")
            lines.append("**Output:**")
            lines.append("```")
            lines.append(command_output)
            lines.append("```")
        if duration_seconds is not None:
            lines.append(f"**Duration:** {_format_step_duration(duration_seconds)}")
        if options.include_timestamps and decision.get("timestamp"):
            lines.append(f"*{decision['timestamp']}*")
        lines.append("")

    # Supporting Data
    if supporting_data:
        lines.extend(["---", "", "## Supporting Data", ""])
        for sd in supporting_data:
            lines.append(f"### {sd['label']}")
            if sd["data_type"] == "text_snippet":
                lines.append("```")
                lines.append(sd["content"])
                lines.append("```")
            else:
                lines.append(f"[Screenshot: {sd['label']}]")
            lines.append("")

    # Resolution / Outcome Notes
    _raw_notes = getattr(session, 'outcome_notes', None)
    outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
    if outcome_notes.strip() and options.include_outcome_notes:
        lines.extend(["---", "", "## Resolution", ""])
        lines.append(outcome_notes.strip())
        lines.append("")

    # Next Steps
    _raw_next = getattr(session, 'next_steps', None)
    next_steps = _raw_next if isinstance(_raw_next, str) else ''
    if next_steps.strip() and options.include_next_steps:
        lines.extend(["---", "", "## Next Steps", ""])
        lines.append(next_steps.strip())
        lines.append("")

    # Branding footer
    lines.append("---")
    lines.append("Generated with ResolutionFlow — https://resolutionflow.com")

    return "\n".join(lines)
|
|
|
|
|
|
def generate_text_export(
    session: Session,
    options: SessionExport,
    supporting_data: list[dict] | None = None,
    uploads: list[dict] | None = None,
) -> str:
    """Generate the plain text export for a session.

    Procedural sessions are delegated to ``_generate_procedural_text``.
    Otherwise the text is assembled in order: header, summary, scratchpad,
    uploaded evidence, troubleshooting steps, supporting data, resolution,
    next steps and the branding footer.

    Args:
        session: Session being exported.
        options: Export options; the ``include_*`` flags, ``detail_level`` and
            ``max_step_index`` are read here.
        supporting_data: Optional dicts with ``label`` and ``data_type`` keys,
            plus ``content`` for ``"text_snippet"`` entries.
        uploads: Optional dicts with ``filename`` and ``url`` keys.

    Returns:
        The export as a single newline-joined string.
    """
    if _is_procedural_session(session):
        return _generate_procedural_text(session, options, uploads=uploads)
    lines: list[str] = []
    outcome_label = _get_outcome_label(session)

    if options.include_tree_info:
        tree_name = session.tree_snapshot.get("name", "Troubleshooting Session")
        lines.append(tree_name)
        lines.append("=" * len(tree_name))
        if session.ticket_number:
            lines.append(f"Ticket: {session.ticket_number}")
        if session.client_name:
            lines.append(f"Client: {session.client_name}")
        if options.include_timestamps:
            lines.append(f"Started: {session.started_at.strftime('%Y-%m-%d %H:%M')}")
            if session.completed_at:
                lines.append(f"Completed: {session.completed_at.strftime('%Y-%m-%d %H:%M')}")
            # NOTE(review): Duration is grouped with the timestamp fields here;
            # confirm it should be hidden when include_timestamps is off.
            lines.append(f"Duration: {_format_duration(session.started_at, session.completed_at)}")
        if outcome_label:
            lines.append(f"Outcome: {outcome_label}")
        lines.append("")

    if options.include_summary:
        summary = _build_summary_fields(session)
        lines.append("SUMMARY")
        lines.append("-" * 20)
        lines.append(f"Issue: {summary['issue']}")
        lines.append(f"Impact: {summary['impact']}")
        lines.append(f"Status: {summary['status']}")
        lines.append(f"Resolution: {summary['resolution']}")
        lines.append(f"Next Steps: {summary['next_steps']}")
        lines.append("")

    # Scratchpad / Evidence section
    scratchpad = getattr(session, 'scratchpad', '') or ''
    if scratchpad.strip():
        lines.append("EVIDENCE / REFERENCE")
        lines.append("-" * 20)
        lines.append(scratchpad)
        lines.append("")

    # File upload evidence
    if uploads:
        lines.append("--- Evidence ---")
        for upload in uploads:
            lines.append(f"- {upload['filename']}: {upload['url']}")
        lines.append("")

    lines.append("TROUBLESHOOTING STEPS")
    lines.append("-" * 20)

    # A session without recorded steps may store None rather than a list.
    decisions = session.decisions or []
    if options.max_step_index is not None:
        decisions = decisions[:options.max_step_index]

    for i, decision in enumerate(decisions, 1):
        question = decision.get("question") or decision.get("action_performed", "Step")
        answer = decision.get("answer", "")
        notes = decision.get("notes", "")
        duration_seconds = _get_step_duration_seconds(decision)

        # node_id can be missing or null; treat both as "not a custom step".
        node_id = decision.get("node_id") or ""
        is_custom = str(node_id).startswith("custom-")
        prefix = "[CUSTOM] " if is_custom else ""
        lines.append(f"\n{i}. {prefix}{question}")
        if answer:
            lines.append(f" Answer: {answer}")
        if notes:
            lines.append(f" Notes: {notes}")
        if command_output := _get_command_output(decision):
            if options.detail_level == "standard":
                command_output = _truncate_command_output(command_output, fmt="text")
            commands = _find_node_commands(session.tree_snapshot, node_id)
            if commands:
                lines.append(f" Commands Run: {', '.join(commands)}")
            lines.append(" Output:")
            for output_line in command_output.splitlines():
                lines.append(f" {output_line}")
        if duration_seconds is not None:
            lines.append(f" Duration: {_format_step_duration(duration_seconds)}")

    # Supporting Data
    if supporting_data:
        lines.append("")
        lines.append("SUPPORTING DATA")
        lines.append("-" * 20)
        for sd in supporting_data:
            lines.append(f"\n {sd['label']}:")
            if sd["data_type"] == "text_snippet":
                for content_line in sd["content"].splitlines():
                    lines.append(f" {content_line}")
            else:
                lines.append(f" [Screenshot: {sd['label']}]")

    # Resolution
    _raw_notes = getattr(session, 'outcome_notes', None)
    outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
    if outcome_notes.strip() and options.include_outcome_notes:
        lines.append("")
        lines.append("RESOLUTION")
        lines.append("-" * 20)
        lines.append(outcome_notes.strip())

    # Next Steps
    _raw_next = getattr(session, 'next_steps', None)
    next_steps = _raw_next if isinstance(_raw_next, str) else ''
    if next_steps.strip() and options.include_next_steps:
        lines.append("")
        lines.append("NEXT STEPS")
        lines.append("-" * 20)
        lines.append(next_steps.strip())

    # Branding footer
    lines.append("")
    lines.append("---")
    lines.append("Generated with ResolutionFlow — https://resolutionflow.com")

    return "\n".join(lines)
|
|
|
|
|
|
def generate_html_export(
    session: Session,
    options: SessionExport,
    supporting_data: list[dict] | None = None,
    uploads: list[dict] | None = None,
) -> str:
    """Generate the standalone HTML export for a session.

    Procedural sessions are delegated to ``_generate_procedural_html``.
    Otherwise a complete HTML document is assembled in order: header, summary
    table, scratchpad, uploaded evidence, troubleshooting steps, supporting
    data, resolution, next steps and the branding footer. All session-derived
    text is HTML-escaped before it is inserted.

    Args:
        session: Session being exported.
        options: Export options; the ``include_*`` flags, ``detail_level`` and
            ``max_step_index`` are read here.
        supporting_data: Optional dicts with ``label``, ``data_type`` and
            ``content`` keys; non-text entries are embedded as a data-URI
            image using ``content_type`` (default ``image/png``).
        uploads: Optional dicts with ``filename`` and ``url`` keys and an
            optional ``is_image`` flag.

    Returns:
        The HTML document as a single newline-joined string.
    """
    if _is_procedural_session(session):
        return _generate_procedural_html(session, options, uploads=uploads)

    def _esc(value: Any) -> str:
        """HTML-escape a decision field; a null (None) value becomes empty text."""
        return "" if value is None else html.escape(str(value))

    tree_name = html.escape(session.tree_snapshot.get("name", "Troubleshooting Session"))
    outcome_label = _get_outcome_label(session)

    html_parts = [
        '<!DOCTYPE html>', '<html>', '<head>',
        '<meta charset="UTF-8">',
        f'<title>{tree_name}</title>',
        '<style>',
        'body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }',
        'h1 { color: #333; }',
        '.meta { color: #666; margin-bottom: 20px; }',
        '.step { margin-bottom: 15px; padding: 10px; background: #f5f5f5; border-radius: 5px; }',
        '.step h3 { margin: 0 0 10px 0; color: #444; }',
        '.answer { font-weight: bold; }',
        '.notes { font-style: italic; color: #555; }',
        '.duration { color: #444; margin-top: 6px; }',
        '.timestamp { font-size: 0.85em; color: #888; }',
        '</style>',
        '</head>', '<body>',
    ]

    if options.include_tree_info:
        html_parts.append(f'<h1>{tree_name}</h1>')
        html_parts.append('<div class="meta">')
        if session.ticket_number:
            html_parts.append(f'<p><strong>Ticket:</strong> {html.escape(session.ticket_number)}</p>')
        if session.client_name:
            html_parts.append(f'<p><strong>Client:</strong> {html.escape(session.client_name)}</p>')
        if options.include_timestamps:
            html_parts.append(f'<p><strong>Started:</strong> {session.started_at.strftime("%Y-%m-%d %H:%M")}</p>')
            if session.completed_at:
                html_parts.append(f'<p><strong>Completed:</strong> {session.completed_at.strftime("%Y-%m-%d %H:%M")}</p>')
            # NOTE(review): Duration is grouped with the timestamp fields here;
            # confirm it should be hidden when include_timestamps is off.
            html_parts.append(f'<p><strong>Duration:</strong> {_format_duration(session.started_at, session.completed_at)}</p>')
        if outcome_label:
            html_parts.append(f'<p><strong>Outcome:</strong> {html.escape(outcome_label)}</p>')
        html_parts.append('</div>')

    if options.include_summary:
        summary = _build_summary_fields(session)
        html_parts.append('<h2>Summary</h2>')
        html_parts.append('<table style="width: 100%; border-collapse: collapse; margin-bottom: 20px;">')
        summary_rows = [
            ("Issue", summary["issue"]),
            ("Impact", summary["impact"]),
            ("Status", summary["status"]),
            ("Resolution", summary["resolution"]),
            ("Next Steps", summary["next_steps"]),
        ]
        for label, value in summary_rows:
            html_parts.append(f'<tr><td style="padding: 6px 12px; border: 1px solid #ddd; font-weight: bold; width: 120px;">{html.escape(label)}</td>')
            html_parts.append(f'<td style="padding: 6px 12px; border: 1px solid #ddd;">{html.escape(value)}</td></tr>')
        html_parts.append('</table>')

    # Scratchpad / Evidence section
    scratchpad = getattr(session, 'scratchpad', '') or ''
    if scratchpad.strip():
        html_parts.append('<h2>Evidence / Reference</h2>')
        html_parts.append(f'<div class="scratchpad" style="white-space: pre-wrap; background: #f9f9f9; padding: 12px; border-radius: 4px; margin-bottom: 20px;">{html.escape(scratchpad)}</div>')

    # File upload evidence
    if uploads:
        html_parts.append('<h3>Evidence</h3>')
        html_parts.append('<div class="evidence-grid" style="margin-bottom: 20px;">')
        for upload in uploads:
            name = html.escape(upload["filename"])
            url = html.escape(upload["url"])
            if upload.get("is_image"):
                html_parts.append(f'<img src="{url}" alt="{name}" style="max-width: 400px; border-radius: 8px; display: block; margin-bottom: 8px;" />')
            else:
                html_parts.append(f'<p><a href="{url}">{name}</a></p>')
        html_parts.append('</div>')

    html_parts.append('<h2>Troubleshooting Steps</h2>')

    # A session without recorded steps may store None rather than a list.
    decisions = session.decisions or []
    if options.max_step_index is not None:
        decisions = decisions[:options.max_step_index]

    for i, decision in enumerate(decisions, 1):
        # Decision fields can be stored as null; _esc keeps that from raising.
        question = _esc(decision.get("question") or decision.get("action_performed", "Step"))
        answer = _esc(decision.get("answer", ""))
        notes = _esc(decision.get("notes", ""))
        duration_seconds = _get_step_duration_seconds(decision)

        html_parts.append('<div class="step">')
        # node_id can be missing or null; treat both as "not a custom step".
        node_id = decision.get("node_id") or ""
        is_custom = str(node_id).startswith("custom-")
        custom_badge = '<span style="background: #7c3aed; color: white; padding: 2px 6px; border-radius: 3px; font-size: 0.75em; margin-right: 6px;">CUSTOM</span>' if is_custom else ''
        html_parts.append(f'<h3>{custom_badge}Step {i}: {question}</h3>')
        if answer:
            html_parts.append(f'<p class="answer">Answer: {answer}</p>')
        if notes:
            html_parts.append(f'<p class="notes">Notes: {notes}</p>')
        if command_output := _get_command_output(decision):
            if options.detail_level == "standard":
                command_output = _truncate_command_output(command_output, fmt="html")
            commands = _find_node_commands(session.tree_snapshot, node_id)
            if commands:
                cmd_html = ", ".join(f"<code>{html.escape(c)}</code>" for c in commands)
                html_parts.append(f'<p><strong>Commands Run:</strong> {cmd_html}</p>')
            html_parts.append(f'<pre><code>{html.escape(command_output)}</code></pre>')
        if duration_seconds is not None:
            html_parts.append(f'<p class="duration"><strong>Duration:</strong> {_format_step_duration(duration_seconds)}</p>')
        if options.include_timestamps and decision.get("timestamp"):
            html_parts.append(f'<p class="timestamp">{html.escape(str(decision["timestamp"]))}</p>')
        html_parts.append('</div>')

    # Supporting Data
    if supporting_data:
        html_parts.append('<h2>Supporting Data</h2>')
        for sd in supporting_data:
            label = html.escape(sd["label"])
            if sd["data_type"] == "text_snippet":
                html_parts.append(f'<div class="supporting-item"><h3>{label}</h3><pre>{html.escape(sd["content"])}</pre></div>')
            else:
                mime = html.escape(sd.get("content_type") or "image/png")
                # Escaping is a no-op for well-formed base64 but stops a stray
                # quote from breaking out of the src attribute.
                payload = html.escape(str(sd["content"]))
                html_parts.append(f'<div class="supporting-item"><h3>{label}</h3><img src="data:{mime};base64,{payload}" alt="{label}"></div>')

    # Resolution
    _raw_notes = getattr(session, 'outcome_notes', None)
    outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
    if outcome_notes.strip() and options.include_outcome_notes:
        html_parts.append('<h2>Resolution</h2>')
        html_parts.append(f'<div style="white-space: pre-wrap; margin-bottom: 20px;">{html.escape(outcome_notes.strip())}</div>')

    # Next Steps
    _raw_next = getattr(session, 'next_steps', None)
    next_steps = _raw_next if isinstance(_raw_next, str) else ''
    if next_steps.strip() and options.include_next_steps:
        html_parts.append('<h2>Next Steps</h2>')
        html_parts.append(f'<div style="white-space: pre-wrap; margin-bottom: 20px;">{html.escape(next_steps.strip())}</div>')

    # Branding footer
    html_parts.append('<hr style="margin-top: 32px; border: none; border-top: 1px solid #ddd;">')
    html_parts.append('<p style="margin-top: 12px; font-size: 0.8em; color: #999; text-align: center;">Generated with <a href="https://resolutionflow.com" style="color: #06b6d4; text-decoration: none;">ResolutionFlow</a> — https://resolutionflow.com</p>')

    html_parts.extend(['</body>', '</html>'])
    return "\n".join(html_parts)
|
|
|
|
|
|
def generate_psa_export(
    session: Session,
    options: SessionExport,
    supporting_data: list[dict] | None = None,
    uploads: list[dict] | None = None,
) -> str:
    """Generate PSA/ticket note export optimized for ConnectWise and similar PSA tools.

    Procedural sessions are delegated to ``_generate_procedural_psa``.
    Otherwise the note is assembled in order: header, summary, problem, steps
    taken, supporting data, resolution (completed sessions only), next steps,
    time spent, engineer notes, uploaded evidence and the branding footer.

    Args:
        session: Session being exported.
        options: Export options; ``include_summary``, ``include_outcome_notes``,
            ``include_next_steps``, ``detail_level`` and ``max_step_index``
            are read here.
        supporting_data: Optional dicts with ``label`` and ``data_type`` keys,
            plus ``content`` for ``"text_snippet"`` entries.
        uploads: Optional dicts with ``filename`` and ``url`` keys.

    Returns:
        The note as a single newline-joined string.
    """
    if _is_procedural_session(session):
        return _generate_procedural_psa(session, options, uploads=uploads)
    lines: list[str] = []
    outcome_label = _get_outcome_label(session)

    tree_name = session.tree_snapshot.get("name", "Troubleshooting Session")
    tree_description = session.tree_snapshot.get("description", "")
    ticket_number = session.ticket_number or "N/A"
    client_name = session.client_name or "N/A"
    date_str = session.started_at.strftime("%Y-%m-%d %H:%M")

    lines.append("=== TROUBLESHOOTING NOTES ===")
    lines.append(f"Ticket: {ticket_number} | Client: {client_name}")
    lines.append(f"Tree: {tree_name} | Date: {date_str}")
    lines.append(f"Duration: {_format_duration(session.started_at, session.completed_at)}")
    if outcome_label:
        lines.append(f"Outcome: {outcome_label}")
    lines.append("")

    if options.include_summary:
        summary = _build_summary_fields(session)
        lines.append("--- SUMMARY ---")
        lines.append(f"Issue: {summary['issue']}")
        lines.append(f"Impact: {summary['impact']}")
        lines.append(f"Status: {summary['status']}")
        lines.append(f"Resolution: {summary['resolution']}")
        lines.append(f"Next Steps: {summary['next_steps']}")
        lines.append("")

    # Problem section
    lines.append("--- PROBLEM ---")
    lines.append(tree_description if tree_description else "No description provided.")
    lines.append("")

    # Steps taken
    lines.append("--- STEPS TAKEN ---")
    # A session without recorded steps may store None rather than a list.
    all_decisions = session.decisions or []
    decisions = all_decisions
    if options.max_step_index is not None:
        decisions = decisions[:options.max_step_index]
    if decisions:
        for i, decision in enumerate(decisions, 1):
            question = decision.get("question") or decision.get("action_performed", "Step")
            answer = decision.get("answer", "")
            notes = decision.get("notes", "")
            duration_seconds = _get_step_duration_seconds(decision)

            # node_id can be missing or null; treat both as "not a custom step".
            node_id = decision.get("node_id") or ""
            is_custom = str(node_id).startswith("custom-")
            prefix = "[CUSTOM] " if is_custom else ""
            line = f"{i}. {prefix}{question}"
            if answer:
                line += f" -> {answer}"
            if duration_seconds is not None:
                line += f" ({_format_step_duration(duration_seconds)})"
            lines.append(line)
            if notes:
                lines.append(f" Notes: {notes}")
            if command_output := _get_command_output(decision):
                if options.detail_level == "standard":
                    command_output = _truncate_command_output(command_output, fmt="psa")
                commands = _find_node_commands(session.tree_snapshot, node_id)
                if commands:
                    lines.append(f" Commands: {', '.join(commands)}")
                lines.append(" Output:")
                for output_line in command_output.splitlines():
                    lines.append(f" {output_line}")
    else:
        lines.append("No steps recorded.")
    lines.append("")

    # Supporting Data
    if supporting_data:
        lines.append("--- SUPPORTING DATA ---")
        for sd in supporting_data:
            if sd["data_type"] == "text_snippet":
                lines.append(f"## {sd['label']}")
                lines.append("```")
                lines.append(sd["content"])
                lines.append("```")
            else:
                lines.append(f"[Screenshot: {sd['label']}]")
            # NOTE(review): blank line emitted after every item; confirm it is
            # not meant to appear only once after the whole section.
            lines.append("")

    # Resolution — only for completed sessions
    if session.completed_at:
        lines.append("--- RESOLUTION ---")
        _raw_notes = getattr(session, 'outcome_notes', None)
        outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
        if outcome_notes.strip() and options.include_outcome_notes:
            lines.append(outcome_notes.strip())
        elif all_decisions:
            # Fall back to the final recorded step of the whole session.
            last_decision = all_decisions[-1]
            resolution = (
                last_decision.get("answer")
                or last_decision.get("question")
                or "No resolution recorded"
            )
            # str() guards the final join against non-string answers.
            lines.append(str(resolution))
        else:
            lines.append("No resolution recorded.")
        if outcome_label:
            lines.append(f"Outcome: {outcome_label}")
        lines.append("")

    # Next Steps
    _raw_next = getattr(session, 'next_steps', None)
    next_steps = _raw_next if isinstance(_raw_next, str) else ''
    if next_steps.strip() and options.include_next_steps:
        lines.append("--- NEXT STEPS ---")
        lines.append(next_steps.strip())
        lines.append("")

    # Time spent
    lines.append("--- TIME SPENT ---")
    duration = _format_duration(session.started_at, session.completed_at)
    lines.append(f"Duration: {duration}")
    lines.append("")

    # Engineer notes (scratchpad)
    lines.append("--- ENGINEER NOTES ---")
    scratchpad = getattr(session, 'scratchpad', '') or ''
    lines.append(scratchpad.strip() if scratchpad.strip() else "None")

    # File upload evidence
    if uploads:
        lines.append("")
        lines.append("--- Evidence ---")
        for upload in uploads:
            lines.append(f"- {upload['filename']} — [{upload['url']}]")

    # Branding footer
    lines.append("")
    lines.append("---")
    lines.append("Generated with ResolutionFlow — https://resolutionflow.com")

    return "\n".join(lines)
|
|
|
|
|
|
def _is_procedural_session(session: Session) -> bool:
    """Return True when the session's tree snapshot has ``tree_type == "procedural"``."""
    tree_type = session.tree_snapshot.get("tree_type")
    return tree_type == "procedural"
|
|
|
|
|
|
def _get_session_variables(session: Session) -> dict[str, str]:
    """Return the session's ``session_variables`` (intake form values).

    An empty dict is returned when the attribute is missing or is not a dict.
    """
    candidate = getattr(session, "session_variables", None)
    return candidate if isinstance(candidate, dict) else {}
|
|
|
|
|
|
def _generate_procedural_markdown(
    session: Session,
    options: SessionExport,
    uploads: list[dict] | None = None,
) -> str:
    """Generate markdown export for procedural sessions.

    The document is assembled in order: header, project parameters table,
    procedure steps as a task list, notes, uploaded evidence and the branding
    footer.

    Args:
        session: Session being exported.
        options: Export options; ``include_tree_info``, ``include_timestamps``,
            ``include_outcome_notes`` and ``max_step_index`` are read here.
        uploads: Optional dicts with ``filename`` and ``url`` keys and an
            optional ``is_image`` flag.

    Returns:
        The markdown document as a single newline-joined string.
    """
    lines: list[str] = []
    outcome_label = _get_outcome_label(session)
    tree_name = session.tree_snapshot.get("name", "Procedure")

    if options.include_tree_info:
        lines.append(f"# {tree_name}")
        lines.append("")
        if session.ticket_number:
            lines.append(f"**Ticket:** {session.ticket_number}")
        if session.client_name:
            lines.append(f"**Client:** {session.client_name}")
        if options.include_timestamps:
            lines.append(f"**Started:** {session.started_at.strftime('%Y-%m-%d %H:%M')}")
            if session.completed_at:
                lines.append(f"**Completed:** {session.completed_at.strftime('%Y-%m-%d %H:%M')}")
            # NOTE(review): Duration is grouped with the timestamp fields here;
            # confirm it should be hidden when include_timestamps is off.
            lines.append(f"**Duration:** {_format_duration(session.started_at, session.completed_at)}")
        if outcome_label:
            lines.append(f"**Outcome:** {outcome_label}")
        lines.extend(["", "---", ""])

    # Project Parameters
    variables = _get_session_variables(session)
    if variables:
        lines.append("## Project Parameters")
        lines.append("")
        lines.append("| Parameter | Value |")
        lines.append("|-----------|-------|")
        for key, value in variables.items():
            # Intake values are not guaranteed to be strings (numbers, booleans,
            # null), so normalise before escaping.
            key_text = str(key)
            value_text = "" if value is None else str(value)
            lines.append(f"| `{_escape_markdown_table(key_text)}` | {_escape_markdown_table(value_text)} |")
        lines.extend(["", "---", ""])

    # Steps
    lines.append("## Procedure Steps")
    lines.append("")

    decisions = session.decisions or []
    if options.max_step_index is not None:
        decisions = decisions[:options.max_step_index]

    for i, decision in enumerate(decisions, 1):
        title = decision.get("question") or decision.get("action_performed", "Step")
        notes = decision.get("notes", "")
        verification = _get_command_output(decision)
        completed = decision.get("answer") == "completed"
        checkbox = "[x]" if completed else "[ ]"
        lines.append(f"- {checkbox} **Step {i}: {title}**")
        # Sub-bullets need at least two spaces of indent to nest under the
        # "- " task item; with one space they render as sibling items.
        if notes:
            lines.append(f"  - Notes: {notes}")
        if verification:
            lines.append(f"  - Verification: {verification}")
        duration_seconds = _get_step_duration_seconds(decision)
        if duration_seconds is not None:
            lines.append(f"  - Duration: {_format_step_duration(duration_seconds)}")

    lines.append("")

    # Resolution
    _raw_notes = getattr(session, 'outcome_notes', None)
    outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
    if outcome_notes.strip() and options.include_outcome_notes:
        lines.extend(["---", "", "## Notes", ""])
        lines.append(outcome_notes.strip())
        lines.append("")

    # File upload evidence
    if uploads:
        lines.extend(["---", "", "## Evidence", ""])
        for upload in uploads:
            name = upload["filename"]
            url = upload["url"]
            if upload.get("is_image"):
                # Images are embedded inline; other files are plain links.
                lines.append(f"- ![{name}]({url})")
            else:
                lines.append(f"- [{name}]({url})")
        lines.append("")

    # Branding footer
    lines.append("---")
    lines.append("Generated with ResolutionFlow — https://resolutionflow.com")

    return "\n".join(lines)
|
|
|
|
|
|
def _generate_procedural_text(session: Session, options: SessionExport, uploads: list[dict] | None = None) -> str:
    """Generate plain text export for procedural sessions.

    Args:
        session: Session whose tree snapshot, decisions and outcome are rendered.
        options: Export options. This function reads ``include_tree_info``,
            ``include_timestamps``, ``max_step_index`` and
            ``include_outcome_notes``.
        uploads: Optional evidence entries; each is a dict providing
            ``filename`` and ``url``.

    Returns:
        The export as one newline-joined string.
    """
    lines: list[str] = []
    outcome_label = _get_outcome_label(session)
    # Tolerate a missing snapshot or a null/empty name instead of raising.
    tree_name = (session.tree_snapshot or {}).get("name") or "Procedure"

    if options.include_tree_info:
        lines.append(tree_name)
        lines.append("=" * len(tree_name))
        if session.ticket_number:
            lines.append(f"Ticket: {session.ticket_number}")
        if session.client_name:
            lines.append(f"Client: {session.client_name}")
        if options.include_timestamps:
            lines.append(f"Started: {session.started_at.strftime('%Y-%m-%d %H:%M')}")
            if session.completed_at:
                lines.append(f"Completed: {session.completed_at.strftime('%Y-%m-%d %H:%M')}")
            lines.append(f"Duration: {_format_duration(session.started_at, session.completed_at)}")
        if outcome_label:
            lines.append(f"Outcome: {outcome_label}")
        lines.append("")

    # Project Parameters
    variables = _get_session_variables(session)
    if variables:
        lines.append("PROJECT PARAMETERS")
        lines.append("-" * 20)
        for key, value in variables.items():
            lines.append(f" {key}: {value}")
        lines.append("")

    lines.append("PROCEDURE STEPS")
    lines.append("-" * 20)

    decisions = session.decisions or []
    if options.max_step_index is not None:
        decisions = decisions[:options.max_step_index]

    for i, decision in enumerate(decisions, 1):
        # Chain with `or` so a key that is present but null still falls back to "Step".
        title = decision.get("question") or decision.get("action_performed") or "Step"
        notes = decision.get("notes", "")
        verification = _get_command_output(decision)
        completed = decision.get("answer") == "completed"
        marker = "[DONE]" if completed else "[ ]"
        lines.append(f"\n{marker} {i}. {title}")
        if notes:
            lines.append(f" Notes: {notes}")
        if verification:
            lines.append(f" Verification: {verification}")
        duration_seconds = _get_step_duration_seconds(decision)
        if duration_seconds is not None:
            lines.append(f" Duration: {_format_step_duration(duration_seconds)}")

    lines.append("")

    # Outcome notes are only emitted when they are a non-blank string.
    _raw_notes = getattr(session, 'outcome_notes', None)
    outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
    if outcome_notes.strip() and options.include_outcome_notes:
        lines.append("NOTES")
        lines.append("-" * 20)
        lines.append(outcome_notes.strip())

    # File upload evidence
    if uploads:
        lines.append("")
        lines.append("--- Evidence ---")
        for upload in uploads:
            lines.append(f"- {upload['filename']}: {upload['url']}")

    # Branding footer
    lines.append("")
    lines.append("---")
    lines.append("Generated with ResolutionFlow — https://resolutionflow.com")

    return "\n".join(lines)
|
|
|
|
|
|
def _generate_procedural_html(session: Session, options: SessionExport, uploads: list[dict] | None = None) -> str:
    """Generate HTML export for procedural sessions.

    All session-derived text is passed through ``html.escape`` before being
    placed in the document.

    Args:
        session: Session whose tree snapshot, decisions and outcome are rendered.
        options: Export options. This function reads ``include_tree_info``,
            ``include_timestamps``, ``max_step_index`` and
            ``include_outcome_notes``.
        uploads: Optional evidence entries; each is a dict providing
            ``filename`` and ``url`` and optionally ``is_image``.

    Returns:
        A complete HTML document as one newline-joined string.
    """
    # Tolerate a missing snapshot or a null name; html.escape() rejects None.
    tree_name = html.escape((session.tree_snapshot or {}).get("name") or "Procedure")
    outcome_label = _get_outcome_label(session)

    html_parts = [
        '<!DOCTYPE html>', '<html>', '<head>',
        '<meta charset="UTF-8">',
        f'<title>{tree_name}</title>',
        '<style>',
        'body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }',
        'h1 { color: #333; }',
        '.meta { color: #666; margin-bottom: 20px; }',
        '.params { margin-bottom: 20px; }',
        '.params table { width: 100%; border-collapse: collapse; }',
        '.params td { padding: 6px 12px; border: 1px solid #ddd; }',
        '.params td:first-child { font-family: monospace; font-weight: bold; width: 200px; background: #f9f9f9; }',
        '.step { margin-bottom: 10px; padding: 10px 15px; background: #f5f5f5; border-radius: 5px; border-left: 4px solid #ccc; }',
        '.step.done { border-left-color: #22c55e; }',
        '.step h3 { margin: 0 0 5px 0; color: #444; }',
        '.notes { font-style: italic; color: #555; font-size: 0.9em; }',
        '</style>',
        '</head>', '<body>',
    ]

    if options.include_tree_info:
        html_parts.append(f'<h1>{tree_name}</h1>')
        html_parts.append('<div class="meta">')
        if session.ticket_number:
            html_parts.append(f'<p><strong>Ticket:</strong> {html.escape(session.ticket_number)}</p>')
        if session.client_name:
            html_parts.append(f'<p><strong>Client:</strong> {html.escape(session.client_name)}</p>')
        if options.include_timestamps:
            html_parts.append(f'<p><strong>Started:</strong> {session.started_at.strftime("%Y-%m-%d %H:%M")}</p>')
            if session.completed_at:
                html_parts.append(f'<p><strong>Completed:</strong> {session.completed_at.strftime("%Y-%m-%d %H:%M")}</p>')
            html_parts.append(f'<p><strong>Duration:</strong> {_format_duration(session.started_at, session.completed_at)}</p>')
        if outcome_label:
            html_parts.append(f'<p><strong>Outcome:</strong> {html.escape(outcome_label)}</p>')
        html_parts.append('</div>')

    # Project Parameters
    variables = _get_session_variables(session)
    if variables:
        html_parts.append('<div class="params">')
        html_parts.append('<h2>Project Parameters</h2>')
        html_parts.append('<table>')
        for key, value in variables.items():
            # str() keeps non-string parameter values from breaking html.escape().
            html_parts.append(f'<tr><td>{html.escape(str(key))}</td><td>{html.escape(str(value))}</td></tr>')
        html_parts.append('</table>')
        html_parts.append('</div>')

    html_parts.append('<h2>Procedure Steps</h2>')

    decisions = session.decisions or []
    if options.max_step_index is not None:
        decisions = decisions[:options.max_step_index]

    for i, decision in enumerate(decisions, 1):
        # Keys may be present with a null value; fall back before escaping so
        # html.escape() never receives None.
        title = html.escape(str(decision.get("question") or decision.get("action_performed") or "Step"))
        notes = html.escape(str(decision.get("notes") or ""))
        verification = _get_command_output(decision)
        completed = decision.get("answer") == "completed"
        css_class = "step done" if completed else "step"
        marker = "✅" if completed else "⬜"

        html_parts.append(f'<div class="{css_class}">')
        html_parts.append(f'<h3>{marker} Step {i}: {title}</h3>')
        if notes:
            html_parts.append(f'<p class="notes">Notes: {notes}</p>')
        if verification:
            html_parts.append(f'<p>Verification: {html.escape(str(verification))}</p>')
        duration_seconds = _get_step_duration_seconds(decision)
        if duration_seconds is not None:
            html_parts.append(f'<p style="color: #888; font-size: 0.85em;">Duration: {_format_step_duration(duration_seconds)}</p>')
        html_parts.append('</div>')

    # Resolution: outcome notes are only emitted when they are a non-blank string.
    _raw_notes = getattr(session, 'outcome_notes', None)
    outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
    if outcome_notes.strip() and options.include_outcome_notes:
        html_parts.append('<h2>Notes</h2>')
        html_parts.append(f'<div style="white-space: pre-wrap;">{html.escape(outcome_notes.strip())}</div>')

    # File upload evidence
    if uploads:
        html_parts.append('<h3>Evidence</h3>')
        html_parts.append('<div class="evidence-grid" style="margin-bottom: 20px;">')
        for upload in uploads:
            name = html.escape(upload["filename"])
            url = html.escape(upload["url"])
            if upload.get("is_image"):
                html_parts.append(f'<img src="{url}" alt="{name}" style="max-width: 400px; border-radius: 8px; display: block; margin-bottom: 8px;" />')
            else:
                html_parts.append(f'<p><a href="{url}">{name}</a></p>')
        html_parts.append('</div>')

    # Branding footer
    html_parts.append('<hr style="margin-top: 32px; border: none; border-top: 1px solid #ddd;">')
    html_parts.append('<p style="margin-top: 12px; font-size: 0.8em; color: #999; text-align: center;">Generated with <a href="https://resolutionflow.com" style="color: #06b6d4; text-decoration: none;">ResolutionFlow</a> — https://resolutionflow.com</p>')

    html_parts.extend(['</body>', '</html>'])
    return "\n".join(html_parts)
|
|
|
|
|
|
def _generate_procedural_psa(session: Session, options: SessionExport, uploads: list[dict] | None = None) -> str:
    """Generate PSA/ticket export for procedural sessions.

    Args:
        session: Session whose tree snapshot, decisions and outcome are rendered.
        options: Export options. This function reads ``max_step_index`` and
            ``include_outcome_notes``.
        uploads: Optional evidence entries; each is a dict providing
            ``filename`` and ``url``.

    Returns:
        The ticket note as one newline-joined string.
    """
    lines: list[str] = []
    outcome_label = _get_outcome_label(session)
    # Tolerate a missing snapshot or a null/empty name instead of raising.
    tree_name = (session.tree_snapshot or {}).get("name") or "Procedure"
    ticket_number = session.ticket_number or "N/A"
    client_name = session.client_name or "N/A"
    date_str = session.started_at.strftime("%Y-%m-%d %H:%M")

    lines.append("=== PROCEDURE NOTES ===")
    lines.append(f"Ticket: {ticket_number} | Client: {client_name}")
    lines.append(f"Procedure: {tree_name} | Date: {date_str}")
    lines.append(f"Duration: {_format_duration(session.started_at, session.completed_at)}")
    if outcome_label:
        lines.append(f"Outcome: {outcome_label}")
    lines.append("")

    # Project Parameters
    variables = _get_session_variables(session)
    if variables:
        lines.append("--- PROJECT PARAMETERS ---")
        for key, value in variables.items():
            lines.append(f" {key}: {value}")
        lines.append("")

    # Steps
    lines.append("--- STEPS COMPLETED ---")
    decisions = session.decisions or []
    if options.max_step_index is not None:
        decisions = decisions[:options.max_step_index]

    if decisions:
        for i, decision in enumerate(decisions, 1):
            # Chain with `or` so a key that is present but null still falls back to "Step".
            title = decision.get("question") or decision.get("action_performed") or "Step"
            notes = decision.get("notes", "")
            completed = decision.get("answer") == "completed"
            marker = "[DONE]" if completed else "[ ]"
            duration_seconds = _get_step_duration_seconds(decision)
            line = f"{marker} {i}. {title}"
            if duration_seconds is not None:
                line += f" ({_format_step_duration(duration_seconds)})"
            lines.append(line)
            if notes:
                lines.append(f" Notes: {notes}")
    else:
        lines.append("No steps recorded.")
    lines.append("")

    # Resolution (only for sessions that have a completion timestamp)
    if session.completed_at:
        lines.append("--- RESOLUTION ---")
        _raw_notes = getattr(session, 'outcome_notes', None)
        outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
        if outcome_notes.strip() and options.include_outcome_notes:
            lines.append(outcome_notes.strip())
        else:
            # NOTE(review): this default text is emitted regardless of the
            # outcome label printed below — confirm that is intended for
            # non-"resolved" outcomes.
            lines.append("Procedure completed successfully.")
        if outcome_label:
            lines.append(f"Outcome: {outcome_label}")
        lines.append("")

    # Time spent
    lines.append("--- TIME SPENT ---")
    lines.append(f"Duration: {_format_duration(session.started_at, session.completed_at)}")

    # File upload evidence
    if uploads:
        lines.append("")
        lines.append("--- Evidence ---")
        for upload in uploads:
            lines.append(f"- {upload['filename']} — [{upload['url']}]")

    # Branding footer
    lines.append("")
    lines.append("---")
    lines.append("Generated with ResolutionFlow — https://resolutionflow.com")

    return "\n".join(lines)
|
|
|
|
|
|
async def generate_pdf_export(session: Session, options: SessionExport, db) -> bytes:
    """Generate PDF export using WeasyPrint and a Jinja2 HTML template.

    The function loads branding (team first, then user), builds the step list
    from the session's decisions, loads supporting data and file uploads,
    resolves session variables, optionally masks text via the redaction
    service, renders ``export_pdf.html`` and converts the result to PDF.

    Storage lookups and redaction are best-effort: failures are logged and the
    export continues.

    Args:
        session: The session to export.
        options: Export options (redaction_mode, max_step_index, etc.).
        db: Async database session for loading supporting data and branding.

    Returns:
        PDF file contents as bytes.
    """
    import logging
    from datetime import timezone

    from jinja2 import Environment, FileSystemLoader
    import weasyprint
    from sqlalchemy import select as sa_select

    logger = logging.getLogger(__name__)

    # Load Jinja2 template
    template_dir = Path(__file__).resolve().parent.parent / "templates"
    env = Environment(loader=FileSystemLoader(str(template_dir)), autoescape=True)
    template = env.get_template("export_pdf.html")

    # Tree snapshot data
    tree_snapshot = session.tree_snapshot or {}
    # `or` (not a .get default) so a null name does not reach the variable resolver/template.
    flow_title = tree_snapshot.get("name") or "Session Export"
    tree_type = tree_snapshot.get("tree_type", "troubleshooting")
    is_procedural = tree_type == "procedural"
    report_type = "Procedure Report" if is_procedural else "Troubleshooting Report"

    # Branding — check team first, then user (solo pros)
    logo_data = None
    logo_content_type = None
    company_name = None

    from app.models.user import User
    user_result = await db.execute(
        sa_select(User).where(User.id == session.user_id)
    )
    user = user_result.scalar_one_or_none()
    engineer_name = user.name if user else "Unknown"

    if user and user.team_id:
        from app.models.team import Team
        team_result = await db.execute(
            sa_select(Team).where(Team.id == user.team_id)
        )
        team = team_result.scalar_one_or_none()
        if team:
            logo_data = team.logo_data
            logo_content_type = team.logo_content_type
            company_name = team.company_display_name or team.name
    elif user:
        logo_data = user.logo_data
        logo_content_type = user.logo_content_type
        company_name = user.company_display_name

    has_custom_logo = bool(logo_data)

    # Build steps list from decisions
    decisions = session.decisions or []
    if options.max_step_index is not None:
        decisions = decisions[:options.max_step_index]

    steps = []
    for decision in decisions:
        # Keys may be present with a null value; normalise to strings so the
        # template and the variable/redaction passes never see None.
        title = decision.get("question") or decision.get("action_performed") or "Step"
        answer = decision.get("answer") or ""
        notes = decision.get("notes") or ""
        duration_seconds = _get_step_duration_seconds(decision)
        duration_str = _format_step_duration(duration_seconds) if duration_seconds is not None else None

        if is_procedural:
            completed = answer == "completed"
            decision_text = "Completed" if completed else ("Skipped" if answer else "")
        else:
            decision_text = answer

        steps.append({
            "title": title,
            "decision": decision_text,
            "notes": notes,
            "duration": duration_str,
        })

    # Query supporting data
    from app.models.supporting_data import SessionSupportingData
    sd_result = await db.execute(
        sa_select(SessionSupportingData)
        .where(SessionSupportingData.session_id == session.id)
        .order_by(SessionSupportingData.sort_order)
    )
    supporting_data_rows = sd_result.scalars().all()
    supporting_data = [
        {
            "label": sd.label,
            "data_type": sd.data_type,
            "content": sd.content,
            "content_type": sd.content_type,
        }
        for sd in supporting_data_rows
    ]

    # Query file upload evidence
    from app.models.file_upload import FileUpload
    from app.services import storage_service
    from app.core.config import settings as _settings
    uploads_for_export: list[dict] = []
    if _settings.STORAGE_ENDPOINT:
        try:
            uploads_result = await db.execute(
                sa_select(FileUpload)
                .where(FileUpload.session_id == session.id)
                .order_by(FileUpload.created_at)
            )
            upload_rows = uploads_result.scalars().all()
            for u in upload_rows:
                try:
                    url = storage_service.get_presigned_url(u.storage_key)
                    uploads_for_export.append({
                        "filename": u.filename,
                        "url": url,
                        # A null content type means "not an image", not "drop the upload".
                        "is_image": (u.content_type or "").startswith("image/"),
                    })
                except Exception:
                    # Skip individual uploads that fail URL generation, but leave a trace.
                    logger.warning("Skipping upload in PDF export: presigned URL generation failed", exc_info=True)
        except Exception:
            # Storage errors should not fail the export.
            logger.warning("Could not load file uploads for PDF export", exc_info=True)

    # Calculate duration and format outcome
    duration = _format_duration(session.started_at, session.completed_at)
    session_date = session.started_at.strftime("%Y-%m-%d %H:%M")
    outcome_label = _get_outcome_label(session) or ("In Progress" if not session.completed_at else "Completed")
    outcome_raw = getattr(session, "outcome", None) or ""
    outcome_class = f"outcome-{outcome_raw}" if outcome_raw else ""

    # Build summary text
    summary_text = ""
    if options.include_summary:
        summary_fields = _build_summary_fields(session)
        parts = [
            f"{label.replace('_', ' ').title()}: {value}"
            for label, value in summary_fields.items()
            if value
        ]
        summary_text = "\n".join(parts)

    # Resolution / outcome notes as summary fallback. Honour the same
    # include_outcome_notes switch the other exporters use so notes the caller
    # opted out of do not appear in the PDF.
    if not summary_text and options.include_outcome_notes:
        _raw_notes = getattr(session, "outcome_notes", None)
        if isinstance(_raw_notes, str) and _raw_notes.strip():
            summary_text = _raw_notes.strip()

    # Aware "now" in UTC; datetime.utcnow() is deprecated. Output format is unchanged.
    generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")

    # Variable resolution
    session_vars = getattr(session, "session_variables", None) or {}
    if session_vars:
        from app.services.variable_service import resolve_variables
        flow_title = resolve_variables(flow_title, session_vars)
        summary_text = resolve_variables(summary_text, session_vars)
        for step in steps:
            step["title"] = resolve_variables(step["title"], session_vars)
            if step["decision"]:
                step["decision"] = resolve_variables(step["decision"], session_vars)
            if step["notes"]:
                step["notes"] = resolve_variables(step["notes"], session_vars)
        for sd in supporting_data:
            if sd["data_type"] == "text_snippet":
                sd["content"] = resolve_variables(sd["content"], session_vars)

    # Apply redaction
    if options.redaction_mode == "mask":
        from app.services.redaction_service import apply_redaction_to_text
        try:
            flow_title, _ = apply_redaction_to_text(flow_title)
            summary_text, _ = apply_redaction_to_text(summary_text)
            for step in steps:
                step["title"], _ = apply_redaction_to_text(step["title"])
                if step["decision"]:
                    step["decision"], _ = apply_redaction_to_text(step["decision"])
                if step["notes"]:
                    step["notes"], _ = apply_redaction_to_text(step["notes"])
            for sd in supporting_data:
                if sd["data_type"] == "text_snippet":
                    sd["content"], _ = apply_redaction_to_text(sd["content"])
        except Exception:
            # Redaction is best-effort for PDF, but a failure means some fields
            # may be left unmasked — record it rather than hiding it.
            logger.warning("Redaction failed during PDF export; output may be only partially masked", exc_info=True)

    # Render HTML
    html_content = template.render(
        report_type=report_type,
        flow_title=flow_title,
        logo_data=logo_data,
        logo_content_type=logo_content_type or "image/png",
        has_custom_logo=has_custom_logo,
        company_name=company_name,
        engineer_name=engineer_name,
        client_name=session.client_name,
        ticket_number=session.ticket_number,
        session_date=session_date,
        duration=duration,
        outcome_class=outcome_class,
        outcome_display=outcome_label,
        summary=summary_text,
        steps=steps,
        supporting_data=supporting_data,
        uploads=uploads_for_export,
        generated_at=generated_at,
    )

    # Convert to PDF.
    # NOTE(review): write_pdf() is called synchronously inside this coroutine.
    pdf_bytes = weasyprint.HTML(string=html_content).write_pdf()
    return pdf_bytes
|