Implements the full dual-mode tree editor (Plan Phases 1-5): Backend: - JSONB↔Markdown bidirectional serializer/parser with mistune - Markdown validator with line/column error reporting - 3 API endpoints: export-markdown, import-markdown, validate-markdown - Variable extraction/resolution service ([USER_INPUT], [VAR], [SAVE_AS]) - Session variables JSONB column (migration 028) - 39 tree markdown tests + variable service tests (403 total passing) Frontend: - Monaco-based Code Mode with custom Monarch tokenizer and dark theme - Autocomplete for @node_id refs, type values, variable names - Debounced validation (800ms) with inline Monaco error markers - Syntax help panel (absolute overlay, toggleable) - Starter template for new trees with valid cross-references - Bidirectional metadata sync (name/description/category/tags frontmatter) - Synchronous tree→markdown serializer (fixes async race condition) - Pre-save validation blocks save on broken refs or missing tree name - Mode-aware undo/redo: Monaco native in Code Mode, throttled zundo in Flow Mode - Variable prompt modal and frontend resolver for session navigation Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
492 lines
15 KiB
Python
492 lines
15 KiB
Python
"""
|
|
Markdown → JSONB parser for ResolutionFlow tree structures.
|
|
|
|
Parses ResolutionFlow Markdown format (frontmatter-delimited node blocks)
|
|
back into the recursive tree_structure JSONB dict.
|
|
"""
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
|
|
@dataclass
|
|
class ParseError:
|
|
"""A validation/parse error with location info."""
|
|
line: int
|
|
column: int
|
|
message: str
|
|
severity: str = "error" # 'error' or 'warning'
|
|
|
|
|
|
@dataclass
|
|
class ParseResult:
|
|
"""Result of parsing markdown into a tree structure."""
|
|
tree_structure: dict[str, Any] | None
|
|
errors: list[ParseError] = field(default_factory=list)
|
|
metadata: dict[str, Any] | None = None
|
|
|
|
|
|
# Regex patterns
|
|
FRONTMATTER_RE = re.compile(r"^---\s*$", re.MULTILINE)
|
|
OPTION_RE = re.compile(
|
|
r"^-\s*\[([A-Za-z0-9]+)\]\s*(.+?)(?:\s*→\s*@(\S+))?\s*$"
|
|
)
|
|
NEXT_NODE_RE = re.compile(r"^→\s*@(\S+)\s*$")
|
|
EXPECTED_RE = re.compile(r"^\*\*Expected:\*\*\s*(.+)$")
|
|
HEADING1_RE = re.compile(r"^#\s+(.+)$")
|
|
HEADING2_RE = re.compile(r"^##\s+(.+)$")
|
|
BLOCKQUOTE_RE = re.compile(r"^>\s*(.*)$")
|
|
ORDERED_LIST_RE = re.compile(r"^\d+\.\s+(.+)$")
|
|
COMMAND_BLOCK_START = re.compile(r"^```commands\s*$")
|
|
COMMAND_BLOCK_END = re.compile(r"^```\s*$")
|
|
|
|
|
|
def parse_markdown_to_tree(markdown: str) -> ParseResult:
|
|
"""Parse ResolutionFlow markdown into a tree structure JSONB dict.
|
|
|
|
Args:
|
|
markdown: The markdown string to parse.
|
|
|
|
Returns:
|
|
ParseResult with tree_structure, errors, and optional metadata.
|
|
"""
|
|
errors: list[ParseError] = []
|
|
raw_blocks = _split_into_blocks(markdown)
|
|
|
|
if not raw_blocks:
|
|
errors.append(ParseError(line=1, column=1, message="No node blocks found"))
|
|
return ParseResult(tree_structure=None, errors=errors)
|
|
|
|
# Check if the first block is a metadata block (has 'name' but no 'id'/'type')
|
|
metadata = None
|
|
node_blocks = raw_blocks
|
|
first_block_text, _ = raw_blocks[0]
|
|
meta = _try_parse_metadata_block(first_block_text)
|
|
if meta is not None:
|
|
metadata = meta
|
|
node_blocks = raw_blocks[1:]
|
|
|
|
if not node_blocks:
|
|
errors.append(ParseError(line=1, column=1, message="No node blocks found (only metadata)"))
|
|
return ParseResult(tree_structure=None, errors=errors, metadata=metadata)
|
|
|
|
# Parse each block into a flat node dict
|
|
flat_nodes: list[dict[str, Any]] = []
|
|
for block_text, start_line in node_blocks:
|
|
node, block_errors = _parse_block(block_text, start_line)
|
|
errors.extend(block_errors)
|
|
if node:
|
|
flat_nodes.append(node)
|
|
|
|
if not flat_nodes:
|
|
errors.append(ParseError(line=1, column=1, message="No valid nodes parsed"))
|
|
return ParseResult(tree_structure=None, errors=errors)
|
|
|
|
# Check for duplicate IDs
|
|
seen_ids: dict[str, int] = {}
|
|
for node in flat_nodes:
|
|
nid = node.get("id", "")
|
|
if nid in seen_ids:
|
|
errors.append(ParseError(
|
|
line=node.get("_start_line", 1),
|
|
column=1,
|
|
message=f"Duplicate node ID: '{nid}'"
|
|
))
|
|
else:
|
|
seen_ids[nid] = node.get("_start_line", 1)
|
|
|
|
# Reconstruct recursive tree from flat nodes
|
|
tree, reconstruct_errors = _reconstruct_tree(flat_nodes)
|
|
errors.extend(reconstruct_errors)
|
|
|
|
return ParseResult(tree_structure=tree, errors=errors, metadata=metadata)
|
|
|
|
|
|
def _try_parse_metadata_block(block_text: str) -> dict[str, Any] | None:
|
|
"""Try to parse a block as tree metadata (name, description, category, tags).
|
|
|
|
Returns metadata dict if the block contains 'name' but no 'id'/'type'.
|
|
Returns None if it's a regular node block.
|
|
"""
|
|
lines = block_text.split("\n")
|
|
fm_start = None
|
|
fm_end = None
|
|
for i, line in enumerate(lines):
|
|
if line.strip() == "---":
|
|
if fm_start is None:
|
|
fm_start = i
|
|
else:
|
|
fm_end = i
|
|
break
|
|
|
|
if fm_start is None or fm_end is None:
|
|
return None
|
|
|
|
fm_data: dict[str, str] = {}
|
|
for i in range(fm_start + 1, fm_end):
|
|
line = lines[i].strip()
|
|
if not line:
|
|
continue
|
|
if ":" in line:
|
|
key, _, value = line.partition(":")
|
|
fm_data[key.strip()] = value.strip()
|
|
|
|
# It's a metadata block if it has 'name' but no 'id' and no 'type'
|
|
if "name" in fm_data and "id" not in fm_data and "type" not in fm_data:
|
|
metadata: dict[str, Any] = {"name": fm_data["name"]}
|
|
if "description" in fm_data:
|
|
metadata["description"] = fm_data["description"]
|
|
if "category" in fm_data:
|
|
metadata["category"] = fm_data["category"]
|
|
if "tags" in fm_data:
|
|
tags_str = fm_data["tags"].strip("[]")
|
|
metadata["tags"] = [t.strip() for t in tags_str.split(",") if t.strip()]
|
|
return metadata
|
|
|
|
return None
|
|
|
|
|
|
def _split_into_blocks(markdown: str) -> list[tuple[str, int]]:
|
|
"""Split markdown into blocks delimited by --- frontmatter markers.
|
|
|
|
Returns list of (block_text, start_line_number) tuples.
|
|
"""
|
|
lines = markdown.split("\n")
|
|
blocks: list[tuple[str, int]] = []
|
|
|
|
# Find frontmatter boundaries (--- on its own line)
|
|
fm_lines: list[int] = []
|
|
for i, line in enumerate(lines):
|
|
if line.strip() == "---":
|
|
fm_lines.append(i)
|
|
|
|
# Pair up frontmatter markers: each block starts at a `---` and the
|
|
# frontmatter ends at the next `---`. The body follows until the
|
|
# next block's first `---` (or end of file).
|
|
i = 0
|
|
while i < len(fm_lines) - 1:
|
|
start = fm_lines[i]
|
|
end_fm = fm_lines[i + 1]
|
|
|
|
# Find the next block start (or EOF)
|
|
next_block_start = len(lines)
|
|
if i + 2 < len(fm_lines):
|
|
next_block_start = fm_lines[i + 2]
|
|
|
|
block_lines = lines[start:next_block_start]
|
|
block_text = "\n".join(block_lines)
|
|
blocks.append((block_text, start + 1)) # 1-indexed line number
|
|
|
|
i += 2 # Jump to next frontmatter pair
|
|
|
|
return blocks
|
|
|
|
|
|
def _parse_block(block_text: str, start_line: int) -> tuple[dict[str, Any] | None, list[ParseError]]:
|
|
"""Parse a single frontmatter+body block into a node dict."""
|
|
errors: list[ParseError] = []
|
|
lines = block_text.split("\n")
|
|
|
|
# Extract frontmatter (between first and second ---)
|
|
fm_start = None
|
|
fm_end = None
|
|
for i, line in enumerate(lines):
|
|
if line.strip() == "---":
|
|
if fm_start is None:
|
|
fm_start = i
|
|
else:
|
|
fm_end = i
|
|
break
|
|
|
|
if fm_start is None or fm_end is None:
|
|
errors.append(ParseError(
|
|
line=start_line, column=1,
|
|
message="Block missing valid frontmatter delimiters"
|
|
))
|
|
return None, errors
|
|
|
|
# Parse YAML-like frontmatter (simple key: value)
|
|
fm_data: dict[str, str] = {}
|
|
for i in range(fm_start + 1, fm_end):
|
|
line = lines[i].strip()
|
|
if not line:
|
|
continue
|
|
if ":" in line:
|
|
key, _, value = line.partition(":")
|
|
fm_data[key.strip()] = value.strip()
|
|
|
|
node_id = fm_data.get("id", "")
|
|
node_type = fm_data.get("type", "")
|
|
parent_id = fm_data.get("parent")
|
|
|
|
if not node_id:
|
|
errors.append(ParseError(
|
|
line=start_line, column=1,
|
|
message="Node block missing 'id' in frontmatter"
|
|
))
|
|
return None, errors
|
|
|
|
if node_type not in ("decision", "action", "solution"):
|
|
errors.append(ParseError(
|
|
line=start_line, column=1,
|
|
message=f"Invalid node type: '{node_type}' (must be decision, action, or solution)"
|
|
))
|
|
return None, errors
|
|
|
|
# Parse body (everything after frontmatter)
|
|
body_lines = lines[fm_end + 1:]
|
|
body_text = "\n".join(body_lines)
|
|
|
|
node: dict[str, Any] = {
|
|
"id": node_id,
|
|
"type": node_type,
|
|
"_parent_id": parent_id,
|
|
"_start_line": start_line,
|
|
}
|
|
|
|
if node_type == "decision":
|
|
_parse_decision_body(body_lines, node, start_line + fm_end + 1, errors)
|
|
elif node_type == "action":
|
|
_parse_action_body(body_lines, node, start_line + fm_end + 1, errors)
|
|
elif node_type == "solution":
|
|
_parse_solution_body(body_lines, node, start_line + fm_end + 1, errors)
|
|
|
|
return node, errors
|
|
|
|
|
|
def _parse_decision_body(
|
|
lines: list[str],
|
|
node: dict[str, Any],
|
|
body_start_line: int,
|
|
errors: list[ParseError],
|
|
) -> None:
|
|
"""Parse the body of a decision node."""
|
|
question = ""
|
|
help_text_lines: list[str] = []
|
|
options: list[dict[str, Any]] = []
|
|
|
|
for i, line in enumerate(lines):
|
|
stripped = line.strip()
|
|
if not stripped:
|
|
continue
|
|
|
|
# Check for heading (question)
|
|
m = HEADING1_RE.match(stripped)
|
|
if m:
|
|
question = m.group(1).strip()
|
|
continue
|
|
|
|
# Check for blockquote (help_text)
|
|
m = BLOCKQUOTE_RE.match(stripped)
|
|
if m:
|
|
help_text_lines.append(m.group(1))
|
|
continue
|
|
|
|
# Check for option
|
|
m = OPTION_RE.match(stripped)
|
|
if m:
|
|
opt_label = m.group(2).strip()
|
|
opt_next = m.group(3) or ""
|
|
options.append({
|
|
"id": f"opt_{node['id']}_{len(options)}",
|
|
"label": opt_label,
|
|
"next_node_id": opt_next,
|
|
})
|
|
continue
|
|
|
|
node["question"] = question
|
|
node["help_text"] = "\n".join(help_text_lines) if help_text_lines else ""
|
|
node["options"] = options
|
|
node["children"] = []
|
|
|
|
|
|
def _parse_action_body(
|
|
lines: list[str],
|
|
node: dict[str, Any],
|
|
body_start_line: int,
|
|
errors: list[ParseError],
|
|
) -> None:
|
|
"""Parse the body of an action node."""
|
|
title = ""
|
|
description_lines: list[str] = []
|
|
commands: list[str] = []
|
|
expected_outcome = ""
|
|
next_node_id = ""
|
|
in_command_block = False
|
|
|
|
for i, line in enumerate(lines):
|
|
stripped = line.strip()
|
|
|
|
# Command block handling
|
|
if in_command_block:
|
|
if COMMAND_BLOCK_END.match(stripped):
|
|
in_command_block = False
|
|
else:
|
|
commands.append(line.rstrip())
|
|
continue
|
|
|
|
if COMMAND_BLOCK_START.match(stripped):
|
|
in_command_block = True
|
|
continue
|
|
|
|
if not stripped:
|
|
# Blank lines are part of description
|
|
if title and not expected_outcome and not next_node_id:
|
|
description_lines.append("")
|
|
continue
|
|
|
|
# Title
|
|
m = HEADING2_RE.match(stripped)
|
|
if m:
|
|
title = m.group(1).strip()
|
|
continue
|
|
|
|
# Expected outcome
|
|
m = EXPECTED_RE.match(stripped)
|
|
if m:
|
|
expected_outcome = m.group(1).strip()
|
|
continue
|
|
|
|
# Next node reference
|
|
m = NEXT_NODE_RE.match(stripped)
|
|
if m:
|
|
next_node_id = m.group(1).strip()
|
|
continue
|
|
|
|
# Everything else is description
|
|
description_lines.append(stripped)
|
|
|
|
# Trim leading and trailing empty lines from description
|
|
while description_lines and not description_lines[-1].strip():
|
|
description_lines.pop()
|
|
while description_lines and not description_lines[0].strip():
|
|
description_lines.pop(0)
|
|
|
|
node["title"] = title
|
|
node["description"] = "\n".join(description_lines)
|
|
node["commands"] = commands if commands else []
|
|
node["expected_outcome"] = expected_outcome
|
|
node["next_node_id"] = next_node_id
|
|
node["children"] = []
|
|
|
|
|
|
def _parse_solution_body(
|
|
lines: list[str],
|
|
node: dict[str, Any],
|
|
body_start_line: int,
|
|
errors: list[ParseError],
|
|
) -> None:
|
|
"""Parse the body of a solution node."""
|
|
title = ""
|
|
description_lines: list[str] = []
|
|
resolution_steps: list[str] = []
|
|
|
|
for i, line in enumerate(lines):
|
|
stripped = line.strip()
|
|
if not stripped:
|
|
if title:
|
|
description_lines.append("")
|
|
continue
|
|
|
|
# Title
|
|
m = HEADING2_RE.match(stripped)
|
|
if m:
|
|
title = m.group(1).strip()
|
|
continue
|
|
|
|
# Ordered list item (resolution step)
|
|
m = ORDERED_LIST_RE.match(stripped)
|
|
if m:
|
|
resolution_steps.append(m.group(1).strip())
|
|
continue
|
|
|
|
# Everything else is description
|
|
description_lines.append(stripped)
|
|
|
|
# Trim leading and trailing empty lines
|
|
while description_lines and not description_lines[-1].strip():
|
|
description_lines.pop()
|
|
while description_lines and not description_lines[0].strip():
|
|
description_lines.pop(0)
|
|
|
|
node["title"] = title
|
|
node["description"] = "\n".join(description_lines)
|
|
node["resolution_steps"] = resolution_steps
|
|
node["solution"] = title # solution field required for publishing
|
|
|
|
|
|
def _reconstruct_tree(flat_nodes: list[dict[str, Any]]) -> tuple[dict[str, Any] | None, list[ParseError]]:
|
|
"""Reconstruct a recursive tree from flat nodes using parent references.
|
|
|
|
Returns (tree_structure, errors).
|
|
"""
|
|
errors: list[ParseError] = []
|
|
|
|
if not flat_nodes:
|
|
return None, errors
|
|
|
|
# Build lookup
|
|
node_map: dict[str, dict[str, Any]] = {}
|
|
for node in flat_nodes:
|
|
nid = node["id"]
|
|
# Clean node (remove internal fields)
|
|
clean = {k: v for k, v in node.items() if not k.startswith("_")}
|
|
if "children" not in clean:
|
|
clean["children"] = []
|
|
node_map[nid] = clean
|
|
|
|
# Find root (node with no parent)
|
|
root_id = None
|
|
for node in flat_nodes:
|
|
if node.get("_parent_id") is None:
|
|
if root_id is not None:
|
|
errors.append(ParseError(
|
|
line=node.get("_start_line", 1),
|
|
column=1,
|
|
message=f"Multiple root nodes found: '{root_id}' and '{node['id']}'",
|
|
))
|
|
root_id = node["id"]
|
|
|
|
if root_id is None:
|
|
# Fall back to first node
|
|
root_id = flat_nodes[0]["id"]
|
|
errors.append(ParseError(
|
|
line=1, column=1,
|
|
message="No root node found (no node without a parent). Using first node as root.",
|
|
severity="warning"
|
|
))
|
|
|
|
# Build children relationships
|
|
for node in flat_nodes:
|
|
parent_id = node.get("_parent_id")
|
|
if parent_id and parent_id in node_map:
|
|
child = node_map[node["id"]]
|
|
node_map[parent_id]["children"].append(child)
|
|
elif parent_id and parent_id not in node_map:
|
|
errors.append(ParseError(
|
|
line=node.get("_start_line", 1),
|
|
column=1,
|
|
message=f"Node '{node['id']}' references non-existent parent '{parent_id}'"
|
|
))
|
|
|
|
# Validate option references
|
|
for nid, node in node_map.items():
|
|
if node.get("type") == "decision":
|
|
for opt in node.get("options", []):
|
|
ref = opt.get("next_node_id", "")
|
|
if ref and ref not in node_map:
|
|
errors.append(ParseError(
|
|
line=1, column=1,
|
|
message=f"Option '{opt.get('label', '')}' in node '{nid}' references non-existent node '@{ref}'"
|
|
))
|
|
elif node.get("type") == "action":
|
|
ref = node.get("next_node_id", "")
|
|
if ref and ref not in node_map:
|
|
errors.append(ParseError(
|
|
line=1, column=1,
|
|
message=f"Action node '{nid}' references non-existent next node '@{ref}'"
|
|
))
|
|
|
|
root = node_map.get(root_id)
|
|
return root, errors
|