Files
resolutionflow/backend/app/services/tree_markdown_parser.py
chihlasm eac6e184ec feat: add dual-mode tree editor with Code Mode, variables, and markdown sync
Implements the full dual-mode tree editor (Plan Phases 1-5):

Backend:
- JSONB↔Markdown bidirectional serializer/parser with mistune
- Markdown validator with line/column error reporting
- 3 API endpoints: export-markdown, import-markdown, validate-markdown
- Variable extraction/resolution service ([USER_INPUT], [VAR], [SAVE_AS])
- Session variables JSONB column (migration 028)
- 39 tree markdown tests + variable service tests (403 total passing)

Frontend:
- Monaco-based Code Mode with custom Monarch tokenizer and dark theme
- Autocomplete for @node_id refs, type values, variable names
- Debounced validation (800ms) with inline Monaco error markers
- Syntax help panel (absolute overlay, toggleable)
- Starter template for new trees with valid cross-references
- Bidirectional metadata sync (name/description/category/tags frontmatter)
- Synchronous tree→markdown serializer (fixes async race condition)
- Pre-save validation blocks save on broken refs or missing tree name
- Mode-aware undo/redo: Monaco native in Code Mode, throttled zundo in Flow Mode
- Variable prompt modal and frontend resolver for session navigation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 09:45:26 -05:00

492 lines
15 KiB
Python

"""
Markdown → JSONB parser for ResolutionFlow tree structures.
Parses ResolutionFlow Markdown format (frontmatter-delimited node blocks)
back into the recursive tree_structure JSONB dict.
"""
import re
from dataclasses import dataclass, field
from typing import Any
@dataclass
class ParseError:
    """One problem found while parsing or validating tree markdown.

    Instances carry a source location plus a human-readable message. The
    parser in this module reports 1-based ``line``/``column`` values.
    """

    # Location of the problem in the markdown source.
    line: int
    column: int
    # Human-readable description of what went wrong.
    message: str
    # Either "error" (the default) or "warning".
    severity: str = "error"
@dataclass
class ParseResult:
    """Outcome of converting markdown into a tree structure."""

    # Root node dict of the reconstructed tree, or None when no usable tree
    # could be produced.
    tree_structure: dict[str, Any] | None
    # Every problem collected along the way (errors and warnings alike).
    errors: list[ParseError] = field(default_factory=list)
    # Tree-level metadata taken from a leading metadata block, if one exists.
    metadata: dict[str, Any] | None = None
# Regex patterns
#
# Apart from FRONTMATTER_RE, each pattern is applied with .match() to a single
# line that the body parsers have already stripped of surrounding whitespace.

# A line made up of `---` plus optional trailing whitespace. The functions
# visible in this module compare `line.strip() == "---"` directly instead of
# using this pattern.
FRONTMATTER_RE = re.compile(r"^---\s*$", re.MULTILINE)
# Decision option: `- [KEY] label` with an optional ` → @target` suffix.
# group(1) = alphanumeric key, group(2) = label, group(3) = target node id
# (None when the arrow part is absent).
OPTION_RE = re.compile(
    r"^-\s*\[([A-Za-z0-9]+)\]\s*(.+?)(?:\s*→\s*@(\S+))?\s*$"
)
# Stand-alone `→ @node_id` line; group(1) = referenced node id.
NEXT_NODE_RE = re.compile(r"^→\s*@(\S+)\s*$")
# `**Expected:** text` line; group(1) = the expected-outcome text.
EXPECTED_RE = re.compile(r"^\*\*Expected:\*\*\s*(.+)$")
# Level-1 heading (`# text`); used for a decision node's question.
HEADING1_RE = re.compile(r"^#\s+(.+)$")
# Level-2 heading (`## text`); used for action/solution titles.
HEADING2_RE = re.compile(r"^##\s+(.+)$")
# Blockquote line (`> text`); group(1) may be empty. Used for help text.
BLOCKQUOTE_RE = re.compile(r"^>\s*(.*)$")
# Ordered list item (`1. text`); used for a solution's resolution steps.
ORDERED_LIST_RE = re.compile(r"^\d+\.\s+(.+)$")
# Opening fence of a commands block: ```commands
COMMAND_BLOCK_START = re.compile(r"^```commands\s*$")
# Closing fence of a commands block: a bare ```
COMMAND_BLOCK_END = re.compile(r"^```\s*$")
def parse_markdown_to_tree(markdown: str) -> ParseResult:
    """Parse ResolutionFlow markdown into a tree structure JSONB dict.

    The markdown is split into frontmatter-delimited blocks. An optional
    leading metadata block (has ``name`` but no ``id``/``type``) is peeled
    off, every remaining block is parsed into a flat node, and the flat
    nodes are finally linked into a recursive tree via their parent
    references.

    Args:
        markdown: The markdown string to parse.

    Returns:
        ParseResult with tree_structure, errors, and optional metadata.
        ``tree_structure`` is ``None`` when no blocks or no valid nodes were
        found. Metadata that was parsed successfully is returned on every
        path that got far enough to read it.
    """
    errors: list[ParseError] = []

    raw_blocks = _split_into_blocks(markdown)
    if not raw_blocks:
        errors.append(ParseError(line=1, column=1, message="No node blocks found"))
        return ParseResult(tree_structure=None, errors=errors)

    # The first block may describe the tree itself rather than a node.
    metadata = _try_parse_metadata_block(raw_blocks[0][0])
    node_blocks = raw_blocks if metadata is None else raw_blocks[1:]
    if not node_blocks:
        errors.append(ParseError(line=1, column=1, message="No node blocks found (only metadata)"))
        return ParseResult(tree_structure=None, errors=errors, metadata=metadata)

    # Parse each block into a flat node dict
    flat_nodes: list[dict[str, Any]] = []
    for block_text, start_line in node_blocks:
        node, block_errors = _parse_block(block_text, start_line)
        errors.extend(block_errors)
        if node:
            flat_nodes.append(node)

    if not flat_nodes:
        errors.append(ParseError(line=1, column=1, message="No valid nodes parsed"))
        # Keep the metadata: it was parsed fine even though no node was, and
        # the "only metadata" return above already behaves this way.
        return ParseResult(tree_structure=None, errors=errors, metadata=metadata)

    # Check for duplicate IDs (reported at the later occurrence)
    seen_ids: set[str] = set()
    for node in flat_nodes:
        nid = node.get("id", "")
        if nid in seen_ids:
            errors.append(ParseError(
                line=node.get("_start_line", 1),
                column=1,
                message=f"Duplicate node ID: '{nid}'"
            ))
        else:
            seen_ids.add(nid)

    # Reconstruct recursive tree from flat nodes
    tree, reconstruct_errors = _reconstruct_tree(flat_nodes)
    errors.extend(reconstruct_errors)
    return ParseResult(tree_structure=tree, errors=errors, metadata=metadata)
def _try_parse_metadata_block(block_text: str) -> dict[str, Any] | None:
"""Try to parse a block as tree metadata (name, description, category, tags).
Returns metadata dict if the block contains 'name' but no 'id'/'type'.
Returns None if it's a regular node block.
"""
lines = block_text.split("\n")
fm_start = None
fm_end = None
for i, line in enumerate(lines):
if line.strip() == "---":
if fm_start is None:
fm_start = i
else:
fm_end = i
break
if fm_start is None or fm_end is None:
return None
fm_data: dict[str, str] = {}
for i in range(fm_start + 1, fm_end):
line = lines[i].strip()
if not line:
continue
if ":" in line:
key, _, value = line.partition(":")
fm_data[key.strip()] = value.strip()
# It's a metadata block if it has 'name' but no 'id' and no 'type'
if "name" in fm_data and "id" not in fm_data and "type" not in fm_data:
metadata: dict[str, Any] = {"name": fm_data["name"]}
if "description" in fm_data:
metadata["description"] = fm_data["description"]
if "category" in fm_data:
metadata["category"] = fm_data["category"]
if "tags" in fm_data:
tags_str = fm_data["tags"].strip("[]")
metadata["tags"] = [t.strip() for t in tags_str.split(",") if t.strip()]
return metadata
return None
def _split_into_blocks(markdown: str) -> list[tuple[str, int]]:
"""Split markdown into blocks delimited by --- frontmatter markers.
Returns list of (block_text, start_line_number) tuples.
"""
lines = markdown.split("\n")
blocks: list[tuple[str, int]] = []
# Find frontmatter boundaries (--- on its own line)
fm_lines: list[int] = []
for i, line in enumerate(lines):
if line.strip() == "---":
fm_lines.append(i)
# Pair up frontmatter markers: each block starts at a `---` and the
# frontmatter ends at the next `---`. The body follows until the
# next block's first `---` (or end of file).
i = 0
while i < len(fm_lines) - 1:
start = fm_lines[i]
end_fm = fm_lines[i + 1]
# Find the next block start (or EOF)
next_block_start = len(lines)
if i + 2 < len(fm_lines):
next_block_start = fm_lines[i + 2]
block_lines = lines[start:next_block_start]
block_text = "\n".join(block_lines)
blocks.append((block_text, start + 1)) # 1-indexed line number
i += 2 # Jump to next frontmatter pair
return blocks
def _parse_block(block_text: str, start_line: int) -> tuple[dict[str, Any] | None, list[ParseError]]:
    """Parse a single frontmatter+body block into a node dict.

    Args:
        block_text: Text of one block; its first line is assumed to sit at
            ``start_line`` in the full document.
        start_line: 1-indexed line number used for error locations.

    Returns:
        ``(node, errors)``. ``node`` is ``None`` when the frontmatter
        delimiters, the ``id``, or a valid ``type`` are missing. Otherwise it
        holds ``id``, ``type``, the type-specific fields filled in by the
        body parser, and two internal keys: ``_parent_id`` (``None`` when the
        node declares no parent) and ``_start_line``.
    """
    errors: list[ParseError] = []
    lines = block_text.split("\n")

    # Extract frontmatter (between first and second ---)
    fm_start = None
    fm_end = None
    for i, line in enumerate(lines):
        if line.strip() != "---":
            continue
        if fm_start is None:
            fm_start = i
        else:
            fm_end = i
            break

    if fm_start is None or fm_end is None:
        errors.append(ParseError(
            line=start_line, column=1,
            message="Block missing valid frontmatter delimiters"
        ))
        return None, errors

    # Parse YAML-like frontmatter (simple key: value)
    fm_data: dict[str, str] = {}
    for raw in lines[fm_start + 1:fm_end]:
        entry = raw.strip()
        if ":" not in entry:
            continue
        key, _, value = entry.partition(":")
        fm_data[key.strip()] = value.strip()

    node_id = fm_data.get("id", "")
    node_type = fm_data.get("type", "")
    # An empty `parent:` line means "no parent". Keeping "" here would make
    # the node neither a root nor anyone's child during reconstruction.
    parent_id = fm_data.get("parent") or None

    if not node_id:
        errors.append(ParseError(
            line=start_line, column=1,
            message="Node block missing 'id' in frontmatter"
        ))
        return None, errors

    # One table both defines the valid types and selects the body parser.
    body_parsers = {
        "decision": _parse_decision_body,
        "action": _parse_action_body,
        "solution": _parse_solution_body,
    }
    parse_body = body_parsers.get(node_type)
    if parse_body is None:
        errors.append(ParseError(
            line=start_line, column=1,
            message=f"Invalid node type: '{node_type}' (must be decision, action, or solution)"
        ))
        return None, errors

    node: dict[str, Any] = {
        "id": node_id,
        "type": node_type,
        "_parent_id": parent_id,
        "_start_line": start_line,
    }

    # Body is everything after the closing frontmatter fence; its first line
    # is at document line start_line + fm_end + 1.
    body_lines = lines[fm_end + 1:]
    parse_body(body_lines, node, start_line + fm_end + 1, errors)
    return node, errors
def _parse_decision_body(
    lines: list[str],
    node: dict[str, Any],
    body_start_line: int,
    errors: list[ParseError],
) -> None:
    """Fill ``node`` with the fields of a decision node read from its body.

    Recognised lines (after stripping): a level-1 heading sets the question
    (a later heading replaces an earlier one), blockquote lines accumulate
    into the help text, and option lines become entries of ``options``.
    Blank and unrecognised lines are skipped. ``body_start_line`` and
    ``errors`` are accepted for signature parity with the other body parsers
    and are not used here.

    Sets ``question``, ``help_text``, ``options`` and ``children`` on
    ``node``.
    """
    prompt = ""
    help_lines: list[str] = []
    choices: list[dict[str, Any]] = []

    for raw in lines:
        text = raw.strip()
        if not text:
            continue

        if heading := HEADING1_RE.match(text):
            prompt = heading.group(1).strip()
        elif quote := BLOCKQUOTE_RE.match(text):
            help_lines.append(quote.group(1))
        elif option := OPTION_RE.match(text):
            # Option ids are positional: opt_<node id>_<index within node>.
            choices.append({
                "id": f"opt_{node['id']}_{len(choices)}",
                "label": option.group(2).strip(),
                "next_node_id": option.group(3) or "",
            })

    node["question"] = prompt
    node["help_text"] = "\n".join(help_lines)
    node["options"] = choices
    node["children"] = []
def _parse_action_body(
    lines: list[str],
    node: dict[str, Any],
    body_start_line: int,
    errors: list[ParseError],
) -> None:
    """Parse the body of an action node.

    Recognised content: a level-2 heading (title), a fenced ```commands
    block (one command per line, trailing whitespace removed), a
    ``**Expected:**`` line, and a ``→ @node_id`` line. Every other non-blank
    line is description text. Blank lines are kept inside the description
    only after the title and before the expected outcome / next-node lines.

    Args:
        lines: Body lines of the block (everything after the frontmatter).
        node: Node dict to populate; receives ``title``, ``description``,
            ``commands``, ``expected_outcome``, ``next_node_id`` and
            ``children``.
        body_start_line: Document line number of ``lines[0]``.
        errors: Collector; a warning is appended when a commands fence is
            never closed.
    """
    title = ""
    description_lines: list[str] = []
    commands: list[str] = []
    expected_outcome = ""
    next_node_id = ""
    # Index (into `lines`) of the currently open ```commands fence, if any.
    fence_opened_at: int | None = None

    for i, line in enumerate(lines):
        stripped = line.strip()

        # Command block handling: everything up to the closing fence is a
        # command, including blank lines and lines that look like markup.
        if fence_opened_at is not None:
            if COMMAND_BLOCK_END.match(stripped):
                fence_opened_at = None
            else:
                commands.append(line.rstrip())
            continue
        if COMMAND_BLOCK_START.match(stripped):
            fence_opened_at = i
            continue

        if not stripped:
            # Blank lines are part of description
            if title and not expected_outcome and not next_node_id:
                description_lines.append("")
            continue

        if m := HEADING2_RE.match(stripped):
            title = m.group(1).strip()
        elif m := EXPECTED_RE.match(stripped):
            expected_outcome = m.group(1).strip()
        elif m := NEXT_NODE_RE.match(stripped):
            next_node_id = m.group(1).strip()
        else:
            # Everything else is description
            description_lines.append(stripped)

    if fence_opened_at is not None:
        # Without a closing fence the rest of the body (expected outcome,
        # next-node reference, ...) was consumed as commands. Surface that
        # instead of silently producing a malformed node.
        errors.append(ParseError(
            line=body_start_line + fence_opened_at,
            column=1,
            message="Unclosed ```commands block: the rest of the node body was read as commands",
            severity="warning",
        ))

    node["title"] = title
    # Entries are either "" or stripped non-blank text, so stripping newlines
    # from the joined string drops exactly the leading/trailing blank entries.
    node["description"] = "\n".join(description_lines).strip("\n")
    node["commands"] = commands
    node["expected_outcome"] = expected_outcome
    node["next_node_id"] = next_node_id
    node["children"] = []
def _parse_solution_body(
    lines: list[str],
    node: dict[str, Any],
    body_start_line: int,
    errors: list[ParseError],
) -> None:
    """Fill ``node`` with the fields of a solution node read from its body.

    A level-2 heading gives the title, ordered-list items become resolution
    steps, and all other non-blank lines form the description. Blank lines
    are kept in the description only once a title has been seen; leading and
    trailing blank lines are dropped. ``body_start_line`` and ``errors`` are
    accepted for signature parity with the other body parsers and are not
    used here.

    Sets ``title``, ``description``, ``resolution_steps`` and ``solution``
    on ``node``.
    """
    heading_text = ""
    prose: list[str] = []
    steps: list[str] = []

    for raw in lines:
        text = raw.strip()
        if not text:
            if heading_text:
                prose.append("")
            continue

        if heading := HEADING2_RE.match(text):
            heading_text = heading.group(1).strip()
        elif step := ORDERED_LIST_RE.match(text):
            steps.append(step.group(1).strip())
        else:
            prose.append(text)

    # Entries are "" or stripped non-blank text, so trimming newlines from the
    # joined string removes exactly the blank entries at either end.
    node["title"] = heading_text
    node["description"] = "\n".join(prose).strip("\n")
    node["resolution_steps"] = steps
    node["solution"] = heading_text  # solution field required for publishing
def _parent_chain_reaches(
    start: str | None,
    target: str,
    parent_of: dict[str, str | None],
) -> bool:
    """Return True if walking parent links upward from ``start`` hits ``target``."""
    visited: set[str] = set()
    current = start
    # `visited` stops the walk if it enters a cycle that excludes `target`.
    while current and current not in visited:
        if current == target:
            return True
        visited.add(current)
        current = parent_of.get(current)
    return False


def _reconstruct_tree(flat_nodes: list[dict[str, Any]]) -> tuple[dict[str, Any] | None, list[ParseError]]:
    """Reconstruct a recursive tree from flat nodes using parent references.

    Each flat node carries the internal keys ``_parent_id`` and
    ``_start_line``. They are stripped from the output nodes, which are
    nested through their ``children`` lists instead.

    Reported problems:
        * more than one node without a parent (the last one becomes root);
        * no node without a parent (warning; the first node becomes root);
        * a parent id that does not exist;
        * a parent link that would make a node its own ancestor. Such links
          are not created, so the returned structure is always acyclic and
          safe to serialise;
        * decision options / action ``next_node_id`` values that point at
          unknown nodes, reported at the owning node's start line.

    Returns:
        ``(tree_structure, errors)``; ``tree_structure`` is ``None`` only for
        an empty input.
    """
    errors: list[ParseError] = []
    if not flat_nodes:
        return None, errors

    # Build lookups. With duplicate ids the last definition wins.
    node_map: dict[str, dict[str, Any]] = {}
    start_lines: dict[str, int] = {}
    parent_of: dict[str, str | None] = {}
    for node in flat_nodes:
        nid = node["id"]
        # Clean node (remove internal fields)
        clean = {k: v for k, v in node.items() if not k.startswith("_")}
        clean.setdefault("children", [])
        node_map[nid] = clean
        start_lines[nid] = node.get("_start_line", 1)
        parent_of[nid] = node.get("_parent_id") or None

    # Find root (node with no parent; an empty parent string counts as none)
    root_id = None
    for node in flat_nodes:
        if node.get("_parent_id"):
            continue
        if root_id is not None:
            errors.append(ParseError(
                line=node.get("_start_line", 1),
                column=1,
                message=f"Multiple root nodes found: '{root_id}' and '{node['id']}'",
            ))
        root_id = node["id"]

    if root_id is None:
        # Fall back to first node
        root_id = flat_nodes[0]["id"]
        errors.append(ParseError(
            line=1, column=1,
            message="No root node found (no node without a parent). Using first node as root.",
            severity="warning"
        ))

    # Build children relationships
    for node in flat_nodes:
        nid = node["id"]
        parent_id = node.get("_parent_id")
        if not parent_id:
            continue
        if parent_id not in node_map:
            errors.append(ParseError(
                line=node.get("_start_line", 1),
                column=1,
                message=f"Node '{nid}' references non-existent parent '{parent_id}'"
            ))
            continue
        if _parent_chain_reaches(parent_id, nid, parent_of):
            # Linking would nest the node inside itself, producing a dict
            # that cannot be serialised.
            errors.append(ParseError(
                line=node.get("_start_line", 1),
                column=1,
                message=f"Node '{nid}' is part of a circular parent chain (parent '{parent_id}')"
            ))
            continue
        node_map[parent_id]["children"].append(node_map[nid])

    # Validate option / next-node references
    for nid, node in node_map.items():
        node_line = start_lines.get(nid, 1)
        if node.get("type") == "decision":
            for opt in node.get("options", []):
                ref = opt.get("next_node_id", "")
                if ref and ref not in node_map:
                    errors.append(ParseError(
                        line=node_line, column=1,
                        message=f"Option '{opt.get('label', '')}' in node '{nid}' references non-existent node '@{ref}'"
                    ))
        elif node.get("type") == "action":
            ref = node.get("next_node_id", "")
            if ref and ref not in node_map:
                errors.append(ParseError(
                    line=node_line, column=1,
                    message=f"Action node '{nid}' references non-existent next node '@{ref}'"
                ))

    return node_map.get(root_id), errors