""" Markdown → JSONB parser for ResolutionFlow tree structures. Parses ResolutionFlow Markdown format (frontmatter-delimited node blocks) back into the recursive tree_structure JSONB dict. """ import re from dataclasses import dataclass, field from typing import Any @dataclass class ParseError: """A validation/parse error with location info.""" line: int column: int message: str severity: str = "error" # 'error' or 'warning' @dataclass class ParseResult: """Result of parsing markdown into a tree structure.""" tree_structure: dict[str, Any] | None errors: list[ParseError] = field(default_factory=list) metadata: dict[str, Any] | None = None # Regex patterns FRONTMATTER_RE = re.compile(r"^---\s*$", re.MULTILINE) OPTION_RE = re.compile( r"^-\s*\[([A-Za-z0-9]+)\]\s*(.+?)(?:\s*→\s*@(\S+))?\s*$" ) NEXT_NODE_RE = re.compile(r"^→\s*@(\S+)\s*$") EXPECTED_RE = re.compile(r"^\*\*Expected:\*\*\s*(.+)$") HEADING1_RE = re.compile(r"^#\s+(.+)$") HEADING2_RE = re.compile(r"^##\s+(.+)$") BLOCKQUOTE_RE = re.compile(r"^>\s*(.*)$") ORDERED_LIST_RE = re.compile(r"^\d+\.\s+(.+)$") COMMAND_BLOCK_START = re.compile(r"^```commands\s*$") COMMAND_BLOCK_END = re.compile(r"^```\s*$") def parse_markdown_to_tree(markdown: str) -> ParseResult: """Parse ResolutionFlow markdown into a tree structure JSONB dict. Args: markdown: The markdown string to parse. Returns: ParseResult with tree_structure, errors, and optional metadata. """ errors: list[ParseError] = [] raw_blocks = _split_into_blocks(markdown) if not raw_blocks: errors.append(ParseError(line=1, column=1, message="No node blocks found")) return ParseResult(tree_structure=None, errors=errors) # Check if the first block is a metadata block (has 'name' but no 'id'/'type') metadata = None node_blocks = raw_blocks first_block_text, _ = raw_blocks[0] meta = _try_parse_metadata_block(first_block_text) if meta is not None: metadata = meta node_blocks = raw_blocks[1:] if not node_blocks: errors.append(ParseError(line=1, column=1, message="No node blocks found (only metadata)")) return ParseResult(tree_structure=None, errors=errors, metadata=metadata) # Parse each block into a flat node dict flat_nodes: list[dict[str, Any]] = [] for block_text, start_line in node_blocks: node, block_errors = _parse_block(block_text, start_line) errors.extend(block_errors) if node: flat_nodes.append(node) if not flat_nodes: errors.append(ParseError(line=1, column=1, message="No valid nodes parsed")) return ParseResult(tree_structure=None, errors=errors) # Check for duplicate IDs seen_ids: dict[str, int] = {} for node in flat_nodes: nid = node.get("id", "") if nid in seen_ids: errors.append(ParseError( line=node.get("_start_line", 1), column=1, message=f"Duplicate node ID: '{nid}'" )) else: seen_ids[nid] = node.get("_start_line", 1) # Reconstruct recursive tree from flat nodes tree, reconstruct_errors = _reconstruct_tree(flat_nodes) errors.extend(reconstruct_errors) return ParseResult(tree_structure=tree, errors=errors, metadata=metadata) def _try_parse_metadata_block(block_text: str) -> dict[str, Any] | None: """Try to parse a block as tree metadata (name, description, category, tags). Returns metadata dict if the block contains 'name' but no 'id'/'type'. Returns None if it's a regular node block. """ lines = block_text.split("\n") fm_start = None fm_end = None for i, line in enumerate(lines): if line.strip() == "---": if fm_start is None: fm_start = i else: fm_end = i break if fm_start is None or fm_end is None: return None fm_data: dict[str, str] = {} for i in range(fm_start + 1, fm_end): line = lines[i].strip() if not line: continue if ":" in line: key, _, value = line.partition(":") fm_data[key.strip()] = value.strip() # It's a metadata block if it has 'name' but no 'id' and no 'type' if "name" in fm_data and "id" not in fm_data and "type" not in fm_data: metadata: dict[str, Any] = {"name": fm_data["name"]} if "description" in fm_data: metadata["description"] = fm_data["description"] if "category" in fm_data: metadata["category"] = fm_data["category"] if "tags" in fm_data: tags_str = fm_data["tags"].strip("[]") metadata["tags"] = [t.strip() for t in tags_str.split(",") if t.strip()] return metadata return None def _split_into_blocks(markdown: str) -> list[tuple[str, int]]: """Split markdown into blocks delimited by --- frontmatter markers. Returns list of (block_text, start_line_number) tuples. """ lines = markdown.split("\n") blocks: list[tuple[str, int]] = [] # Find frontmatter boundaries (--- on its own line) fm_lines: list[int] = [] for i, line in enumerate(lines): if line.strip() == "---": fm_lines.append(i) # Pair up frontmatter markers: each block starts at a `---` and the # frontmatter ends at the next `---`. The body follows until the # next block's first `---` (or end of file). i = 0 while i < len(fm_lines) - 1: start = fm_lines[i] end_fm = fm_lines[i + 1] # Find the next block start (or EOF) next_block_start = len(lines) if i + 2 < len(fm_lines): next_block_start = fm_lines[i + 2] block_lines = lines[start:next_block_start] block_text = "\n".join(block_lines) blocks.append((block_text, start + 1)) # 1-indexed line number i += 2 # Jump to next frontmatter pair return blocks def _parse_block(block_text: str, start_line: int) -> tuple[dict[str, Any] | None, list[ParseError]]: """Parse a single frontmatter+body block into a node dict.""" errors: list[ParseError] = [] lines = block_text.split("\n") # Extract frontmatter (between first and second ---) fm_start = None fm_end = None for i, line in enumerate(lines): if line.strip() == "---": if fm_start is None: fm_start = i else: fm_end = i break if fm_start is None or fm_end is None: errors.append(ParseError( line=start_line, column=1, message="Block missing valid frontmatter delimiters" )) return None, errors # Parse YAML-like frontmatter (simple key: value) fm_data: dict[str, str] = {} for i in range(fm_start + 1, fm_end): line = lines[i].strip() if not line: continue if ":" in line: key, _, value = line.partition(":") fm_data[key.strip()] = value.strip() node_id = fm_data.get("id", "") node_type = fm_data.get("type", "") parent_id = fm_data.get("parent") if not node_id: errors.append(ParseError( line=start_line, column=1, message="Node block missing 'id' in frontmatter" )) return None, errors if node_type not in ("decision", "action", "solution"): errors.append(ParseError( line=start_line, column=1, message=f"Invalid node type: '{node_type}' (must be decision, action, or solution)" )) return None, errors # Parse body (everything after frontmatter) body_lines = lines[fm_end + 1:] body_text = "\n".join(body_lines) node: dict[str, Any] = { "id": node_id, "type": node_type, "_parent_id": parent_id, "_start_line": start_line, } if node_type == "decision": _parse_decision_body(body_lines, node, start_line + fm_end + 1, errors) elif node_type == "action": _parse_action_body(body_lines, node, start_line + fm_end + 1, errors) elif node_type == "solution": _parse_solution_body(body_lines, node, start_line + fm_end + 1, errors) return node, errors def _parse_decision_body( lines: list[str], node: dict[str, Any], body_start_line: int, errors: list[ParseError], ) -> None: """Parse the body of a decision node.""" question = "" help_text_lines: list[str] = [] options: list[dict[str, Any]] = [] for i, line in enumerate(lines): stripped = line.strip() if not stripped: continue # Check for heading (question) m = HEADING1_RE.match(stripped) if m: question = m.group(1).strip() continue # Check for blockquote (help_text) m = BLOCKQUOTE_RE.match(stripped) if m: help_text_lines.append(m.group(1)) continue # Check for option m = OPTION_RE.match(stripped) if m: opt_label = m.group(2).strip() opt_next = m.group(3) or "" options.append({ "id": f"opt_{node['id']}_{len(options)}", "label": opt_label, "next_node_id": opt_next, }) continue node["question"] = question node["help_text"] = "\n".join(help_text_lines) if help_text_lines else "" node["options"] = options node["children"] = [] def _parse_action_body( lines: list[str], node: dict[str, Any], body_start_line: int, errors: list[ParseError], ) -> None: """Parse the body of an action node.""" title = "" description_lines: list[str] = [] commands: list[str] = [] expected_outcome = "" next_node_id = "" in_command_block = False for i, line in enumerate(lines): stripped = line.strip() # Command block handling if in_command_block: if COMMAND_BLOCK_END.match(stripped): in_command_block = False else: commands.append(line.rstrip()) continue if COMMAND_BLOCK_START.match(stripped): in_command_block = True continue if not stripped: # Blank lines are part of description if title and not expected_outcome and not next_node_id: description_lines.append("") continue # Title m = HEADING2_RE.match(stripped) if m: title = m.group(1).strip() continue # Expected outcome m = EXPECTED_RE.match(stripped) if m: expected_outcome = m.group(1).strip() continue # Next node reference m = NEXT_NODE_RE.match(stripped) if m: next_node_id = m.group(1).strip() continue # Everything else is description description_lines.append(stripped) # Trim leading and trailing empty lines from description while description_lines and not description_lines[-1].strip(): description_lines.pop() while description_lines and not description_lines[0].strip(): description_lines.pop(0) node["title"] = title node["description"] = "\n".join(description_lines) node["commands"] = commands if commands else [] node["expected_outcome"] = expected_outcome node["next_node_id"] = next_node_id node["children"] = [] def _parse_solution_body( lines: list[str], node: dict[str, Any], body_start_line: int, errors: list[ParseError], ) -> None: """Parse the body of a solution node.""" title = "" description_lines: list[str] = [] resolution_steps: list[str] = [] for i, line in enumerate(lines): stripped = line.strip() if not stripped: if title: description_lines.append("") continue # Title m = HEADING2_RE.match(stripped) if m: title = m.group(1).strip() continue # Ordered list item (resolution step) m = ORDERED_LIST_RE.match(stripped) if m: resolution_steps.append(m.group(1).strip()) continue # Everything else is description description_lines.append(stripped) # Trim leading and trailing empty lines while description_lines and not description_lines[-1].strip(): description_lines.pop() while description_lines and not description_lines[0].strip(): description_lines.pop(0) node["title"] = title node["description"] = "\n".join(description_lines) node["resolution_steps"] = resolution_steps node["solution"] = title # solution field required for publishing def _reconstruct_tree(flat_nodes: list[dict[str, Any]]) -> tuple[dict[str, Any] | None, list[ParseError]]: """Reconstruct a recursive tree from flat nodes using parent references. Returns (tree_structure, errors). """ errors: list[ParseError] = [] if not flat_nodes: return None, errors # Build lookup node_map: dict[str, dict[str, Any]] = {} for node in flat_nodes: nid = node["id"] # Clean node (remove internal fields) clean = {k: v for k, v in node.items() if not k.startswith("_")} if "children" not in clean: clean["children"] = [] node_map[nid] = clean # Find root (node with no parent) root_id = None for node in flat_nodes: if node.get("_parent_id") is None: if root_id is not None: errors.append(ParseError( line=node.get("_start_line", 1), column=1, message=f"Multiple root nodes found: '{root_id}' and '{node['id']}'", )) root_id = node["id"] if root_id is None: # Fall back to first node root_id = flat_nodes[0]["id"] errors.append(ParseError( line=1, column=1, message="No root node found (no node without a parent). Using first node as root.", severity="warning" )) # Build children relationships for node in flat_nodes: parent_id = node.get("_parent_id") if parent_id and parent_id in node_map: child = node_map[node["id"]] node_map[parent_id]["children"].append(child) elif parent_id and parent_id not in node_map: errors.append(ParseError( line=node.get("_start_line", 1), column=1, message=f"Node '{node['id']}' references non-existent parent '{parent_id}'" )) # Validate option references for nid, node in node_map.items(): if node.get("type") == "decision": for opt in node.get("options", []): ref = opt.get("next_node_id", "") if ref and ref not in node_map: errors.append(ParseError( line=1, column=1, message=f"Option '{opt.get('label', '')}' in node '{nid}' references non-existent node '@{ref}'" )) elif node.get("type") == "action": ref = node.get("next_node_id", "") if ref and ref not in node_map: errors.append(ParseError( line=1, column=1, message=f"Action node '{nid}' references non-existent next node '@{ref}'" )) root = node_map.get(root_id) return root, errors