feat: add dual-mode tree editor with Code Mode, variables, and markdown sync

Implements the full dual-mode tree editor (Plan Phases 1-5):

Backend:
- JSONB↔Markdown bidirectional serializer/parser with mistune
- Markdown validator with line/column error reporting
- 3 API endpoints: export-markdown, import-markdown, validate-markdown
- Variable extraction/resolution service ([USER_INPUT], [VAR], [SAVE_AS])
- Session variables JSONB column (migration 028)
- 39 tree markdown tests + variable service tests (403 total passing)

Frontend:
- Monaco-based Code Mode with custom Monarch tokenizer and dark theme
- Autocomplete for @node_id refs, type values, variable names
- Debounced validation (800ms) with inline Monaco error markers
- Syntax help panel (absolute overlay, toggleable)
- Starter template for new trees with valid cross-references
- Bidirectional metadata sync (name/description/category/tags frontmatter)
- Synchronous tree→markdown serializer (fixes async race condition)
- Pre-save validation blocks save on broken refs or missing tree name
- Mode-aware undo/redo: Monaco native in Code Mode, throttled zundo in Flow Mode
- Variable prompt modal and frontend resolver for session navigation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
chihlasm
2026-02-10 09:45:26 -05:00
parent 2bd47004e7
commit eac6e184ec
32 changed files with 3369 additions and 52 deletions

View File

@@ -0,0 +1,37 @@
"""add session_variables JSONB column to sessions
Revision ID: 028
Revises: 027
Create Date: 2026-02-09
Adds session_variables JSONB column for storing variable values
collected during tree navigation (e.g., hostname, ticket_number).
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = '028'
down_revision: Union[str, None] = '027'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.add_column(
'sessions',
sa.Column(
'session_variables',
postgresql.JSONB(),
server_default=sa.text("'{}'::jsonb"),
nullable=False,
)
)
def downgrade() -> None:
op.drop_column('sessions', 'session_variables')

View File

@@ -295,6 +295,12 @@ async def export_session(
content = generate_text_export(session, export_options)
media_type = "text/plain"
# Resolve variables in export output
session_vars = getattr(session, 'session_variables', None) or {}
if session_vars:
from app.services.variable_service import resolve_variables
content = resolve_variables(content, session_vars)
# Mark as exported
session.exported = True
await db.commit()

View File

@@ -0,0 +1,132 @@
"""API endpoints for tree markdown import/export."""
from typing import Annotated
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from app.core.database import get_db
from app.models.tree import Tree
from app.models.user import User
from app.api.deps import get_current_active_user, require_engineer_or_admin
from app.core.permissions import can_access_tree, can_edit_tree
from app.schemas.tree_markdown import (
TreeMarkdownExportResponse,
TreeMarkdownImportRequest,
TreeMarkdownValidationResponse,
MarkdownValidationError,
)
from app.services.tree_markdown_service import serialize_tree_to_markdown
from app.services.tree_markdown_parser import parse_markdown_to_tree
from app.services.tree_markdown_validator import validate_tree_markdown
router = APIRouter(prefix="/trees", tags=["tree-markdown"])
@router.get("/{tree_id}/export-markdown", response_model=TreeMarkdownExportResponse)
async def export_tree_markdown(
tree_id: UUID,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
):
"""Export a tree's JSONB structure as ResolutionFlow markdown."""
result = await db.execute(
select(Tree).where(Tree.id == tree_id, Tree.deleted_at.is_(None))
)
tree = result.scalar_one_or_none()
if not tree:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Tree not found")
if not can_access_tree(current_user, tree):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
metadata = {
"name": tree.name or "",
"description": tree.description or "",
"category": tree.category or "",
}
markdown = serialize_tree_to_markdown(tree.tree_structure, metadata=metadata)
return TreeMarkdownExportResponse(markdown=markdown)
@router.put("/{tree_id}/import-markdown", response_model=TreeMarkdownValidationResponse)
async def import_tree_markdown(
tree_id: UUID,
body: TreeMarkdownImportRequest,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(require_engineer_or_admin)],
):
"""Parse markdown and update a tree's JSONB structure."""
result = await db.execute(
select(Tree).where(Tree.id == tree_id, Tree.deleted_at.is_(None))
)
tree = result.scalar_one_or_none()
if not tree:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Tree not found")
if not can_edit_tree(current_user, tree):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
parse_result = parse_markdown_to_tree(body.markdown)
has_errors = any(e.severity == "error" for e in parse_result.errors)
if has_errors:
return TreeMarkdownValidationResponse(
valid=False,
errors=[
MarkdownValidationError(
line=e.line, column=e.column, message=e.message, severity=e.severity
)
for e in parse_result.errors
],
tree_structure=None,
)
# Apply the parsed tree structure
tree.tree_structure = parse_result.tree_structure
await db.commit()
return TreeMarkdownValidationResponse(
valid=True,
errors=[
MarkdownValidationError(
line=e.line, column=e.column, message=e.message, severity=e.severity
)
for e in parse_result.errors
],
tree_structure=parse_result.tree_structure,
)
@router.post("/validate-markdown", response_model=TreeMarkdownValidationResponse)
async def validate_markdown(
body: TreeMarkdownImportRequest,
current_user: Annotated[User, Depends(get_current_active_user)],
):
"""Validate markdown without saving. Returns errors and preview JSONB."""
parse_result = parse_markdown_to_tree(body.markdown)
all_errors = validate_tree_markdown(body.markdown)
# Deduplicate errors by message
seen: set[str] = set()
unique_errors: list[MarkdownValidationError] = []
for e in all_errors:
key = f"{e.line}:{e.column}:{e.message}"
if key not in seen:
seen.add(key)
unique_errors.append(
MarkdownValidationError(
line=e.line, column=e.column, message=e.message, severity=e.severity
)
)
has_hard_errors = any(e.severity == "error" for e in unique_errors)
return TreeMarkdownValidationResponse(
valid=not has_hard_errors,
errors=unique_errors,
tree_structure=parse_result.tree_structure,
metadata=parse_result.metadata,
)

View File

@@ -1,5 +1,5 @@
from fastapi import APIRouter
from app.api.endpoints import auth, trees, sessions, invite, categories, tags, folders, step_categories, steps, admin, accounts, webhooks, shares, shared
from app.api.endpoints import auth, trees, sessions, invite, categories, tags, folders, step_categories, steps, admin, accounts, webhooks, shares, shared, tree_markdown
from app.api.endpoints import admin_dashboard, admin_audit, admin_plan_limits, admin_feature_flags, admin_settings, admin_categories
api_router = APIRouter()
@@ -24,3 +24,4 @@ api_router.include_router(accounts.router)
api_router.include_router(webhooks.router)
api_router.include_router(shares.router)
api_router.include_router(shared.router) # Public endpoints (no auth)
api_router.include_router(tree_markdown.router)

View File

@@ -48,6 +48,9 @@ class Session(Base):
ticket_number: Mapped[Optional[str]] = mapped_column(String(100), nullable=True)
client_name: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
exported: Mapped[bool] = mapped_column(Boolean, default=False)
session_variables: Mapped[dict[str, Any]] = mapped_column(
JSONB, nullable=False, server_default=sa.text("'{}'::jsonb")
)
scratchpad: Mapped[Optional[str]] = mapped_column(
Text, nullable=True, server_default=sa.text("''")
)

View File

@@ -44,6 +44,7 @@ class SessionUpdate(BaseModel):
ticket_number: Optional[str] = Field(None, max_length=100)
client_name: Optional[str] = Field(None, max_length=255)
scratchpad: Optional[str] = None
session_variables: Optional[dict[str, str]] = None
class SessionResponse(BaseModel):
@@ -60,6 +61,7 @@ class SessionResponse(BaseModel):
client_name: Optional[str] = None
exported: bool
scratchpad: str = ""
session_variables: dict[str, str] = Field(default_factory=dict)
@validator('scratchpad', pre=True, always=True)
def normalize_scratchpad(cls, v):

View File

@@ -0,0 +1,28 @@
"""Pydantic schemas for tree markdown import/export."""
from pydantic import BaseModel
class TreeMarkdownExportResponse(BaseModel):
"""Response for markdown export endpoint."""
markdown: str
class TreeMarkdownImportRequest(BaseModel):
"""Request body for markdown import endpoint."""
markdown: str
class MarkdownValidationError(BaseModel):
"""A single validation error with location info."""
line: int
column: int
message: str
severity: str # 'error' or 'warning'
class TreeMarkdownValidationResponse(BaseModel):
"""Response for markdown validation endpoint."""
valid: bool
errors: list[MarkdownValidationError]
tree_structure: dict | None = None
metadata: dict | None = None

View File

@@ -0,0 +1,491 @@
"""
Markdown → JSONB parser for ResolutionFlow tree structures.
Parses ResolutionFlow Markdown format (frontmatter-delimited node blocks)
back into the recursive tree_structure JSONB dict.
"""
import re
from dataclasses import dataclass, field
from typing import Any
@dataclass
class ParseError:
"""A validation/parse error with location info."""
line: int
column: int
message: str
severity: str = "error" # 'error' or 'warning'
@dataclass
class ParseResult:
"""Result of parsing markdown into a tree structure."""
tree_structure: dict[str, Any] | None
errors: list[ParseError] = field(default_factory=list)
metadata: dict[str, Any] | None = None
# Regex patterns
FRONTMATTER_RE = re.compile(r"^---\s*$", re.MULTILINE)
OPTION_RE = re.compile(
r"^-\s*\[([A-Za-z0-9]+)\]\s*(.+?)(?:\s*→\s*@(\S+))?\s*$"
)
NEXT_NODE_RE = re.compile(r"^→\s*@(\S+)\s*$")
EXPECTED_RE = re.compile(r"^\*\*Expected:\*\*\s*(.+)$")
HEADING1_RE = re.compile(r"^#\s+(.+)$")
HEADING2_RE = re.compile(r"^##\s+(.+)$")
BLOCKQUOTE_RE = re.compile(r"^>\s*(.*)$")
ORDERED_LIST_RE = re.compile(r"^\d+\.\s+(.+)$")
COMMAND_BLOCK_START = re.compile(r"^```commands\s*$")
COMMAND_BLOCK_END = re.compile(r"^```\s*$")
def parse_markdown_to_tree(markdown: str) -> ParseResult:
"""Parse ResolutionFlow markdown into a tree structure JSONB dict.
Args:
markdown: The markdown string to parse.
Returns:
ParseResult with tree_structure, errors, and optional metadata.
"""
errors: list[ParseError] = []
raw_blocks = _split_into_blocks(markdown)
if not raw_blocks:
errors.append(ParseError(line=1, column=1, message="No node blocks found"))
return ParseResult(tree_structure=None, errors=errors)
# Check if the first block is a metadata block (has 'name' but no 'id'/'type')
metadata = None
node_blocks = raw_blocks
first_block_text, _ = raw_blocks[0]
meta = _try_parse_metadata_block(first_block_text)
if meta is not None:
metadata = meta
node_blocks = raw_blocks[1:]
if not node_blocks:
errors.append(ParseError(line=1, column=1, message="No node blocks found (only metadata)"))
return ParseResult(tree_structure=None, errors=errors, metadata=metadata)
# Parse each block into a flat node dict
flat_nodes: list[dict[str, Any]] = []
for block_text, start_line in node_blocks:
node, block_errors = _parse_block(block_text, start_line)
errors.extend(block_errors)
if node:
flat_nodes.append(node)
if not flat_nodes:
errors.append(ParseError(line=1, column=1, message="No valid nodes parsed"))
return ParseResult(tree_structure=None, errors=errors)
# Check for duplicate IDs
seen_ids: dict[str, int] = {}
for node in flat_nodes:
nid = node.get("id", "")
if nid in seen_ids:
errors.append(ParseError(
line=node.get("_start_line", 1),
column=1,
message=f"Duplicate node ID: '{nid}'"
))
else:
seen_ids[nid] = node.get("_start_line", 1)
# Reconstruct recursive tree from flat nodes
tree, reconstruct_errors = _reconstruct_tree(flat_nodes)
errors.extend(reconstruct_errors)
return ParseResult(tree_structure=tree, errors=errors, metadata=metadata)
def _try_parse_metadata_block(block_text: str) -> dict[str, Any] | None:
"""Try to parse a block as tree metadata (name, description, category, tags).
Returns metadata dict if the block contains 'name' but no 'id'/'type'.
Returns None if it's a regular node block.
"""
lines = block_text.split("\n")
fm_start = None
fm_end = None
for i, line in enumerate(lines):
if line.strip() == "---":
if fm_start is None:
fm_start = i
else:
fm_end = i
break
if fm_start is None or fm_end is None:
return None
fm_data: dict[str, str] = {}
for i in range(fm_start + 1, fm_end):
line = lines[i].strip()
if not line:
continue
if ":" in line:
key, _, value = line.partition(":")
fm_data[key.strip()] = value.strip()
# It's a metadata block if it has 'name' but no 'id' and no 'type'
if "name" in fm_data and "id" not in fm_data and "type" not in fm_data:
metadata: dict[str, Any] = {"name": fm_data["name"]}
if "description" in fm_data:
metadata["description"] = fm_data["description"]
if "category" in fm_data:
metadata["category"] = fm_data["category"]
if "tags" in fm_data:
tags_str = fm_data["tags"].strip("[]")
metadata["tags"] = [t.strip() for t in tags_str.split(",") if t.strip()]
return metadata
return None
def _split_into_blocks(markdown: str) -> list[tuple[str, int]]:
"""Split markdown into blocks delimited by --- frontmatter markers.
Returns list of (block_text, start_line_number) tuples.
"""
lines = markdown.split("\n")
blocks: list[tuple[str, int]] = []
# Find frontmatter boundaries (--- on its own line)
fm_lines: list[int] = []
for i, line in enumerate(lines):
if line.strip() == "---":
fm_lines.append(i)
# Pair up frontmatter markers: each block starts at a `---` and the
# frontmatter ends at the next `---`. The body follows until the
# next block's first `---` (or end of file).
i = 0
while i < len(fm_lines) - 1:
start = fm_lines[i]
end_fm = fm_lines[i + 1]
# Find the next block start (or EOF)
next_block_start = len(lines)
if i + 2 < len(fm_lines):
next_block_start = fm_lines[i + 2]
block_lines = lines[start:next_block_start]
block_text = "\n".join(block_lines)
blocks.append((block_text, start + 1)) # 1-indexed line number
i += 2 # Jump to next frontmatter pair
return blocks
def _parse_block(block_text: str, start_line: int) -> tuple[dict[str, Any] | None, list[ParseError]]:
"""Parse a single frontmatter+body block into a node dict."""
errors: list[ParseError] = []
lines = block_text.split("\n")
# Extract frontmatter (between first and second ---)
fm_start = None
fm_end = None
for i, line in enumerate(lines):
if line.strip() == "---":
if fm_start is None:
fm_start = i
else:
fm_end = i
break
if fm_start is None or fm_end is None:
errors.append(ParseError(
line=start_line, column=1,
message="Block missing valid frontmatter delimiters"
))
return None, errors
# Parse YAML-like frontmatter (simple key: value)
fm_data: dict[str, str] = {}
for i in range(fm_start + 1, fm_end):
line = lines[i].strip()
if not line:
continue
if ":" in line:
key, _, value = line.partition(":")
fm_data[key.strip()] = value.strip()
node_id = fm_data.get("id", "")
node_type = fm_data.get("type", "")
parent_id = fm_data.get("parent")
if not node_id:
errors.append(ParseError(
line=start_line, column=1,
message="Node block missing 'id' in frontmatter"
))
return None, errors
if node_type not in ("decision", "action", "solution"):
errors.append(ParseError(
line=start_line, column=1,
message=f"Invalid node type: '{node_type}' (must be decision, action, or solution)"
))
return None, errors
# Parse body (everything after frontmatter)
body_lines = lines[fm_end + 1:]
body_text = "\n".join(body_lines)
node: dict[str, Any] = {
"id": node_id,
"type": node_type,
"_parent_id": parent_id,
"_start_line": start_line,
}
if node_type == "decision":
_parse_decision_body(body_lines, node, start_line + fm_end + 1, errors)
elif node_type == "action":
_parse_action_body(body_lines, node, start_line + fm_end + 1, errors)
elif node_type == "solution":
_parse_solution_body(body_lines, node, start_line + fm_end + 1, errors)
return node, errors
def _parse_decision_body(
lines: list[str],
node: dict[str, Any],
body_start_line: int,
errors: list[ParseError],
) -> None:
"""Parse the body of a decision node."""
question = ""
help_text_lines: list[str] = []
options: list[dict[str, Any]] = []
for i, line in enumerate(lines):
stripped = line.strip()
if not stripped:
continue
# Check for heading (question)
m = HEADING1_RE.match(stripped)
if m:
question = m.group(1).strip()
continue
# Check for blockquote (help_text)
m = BLOCKQUOTE_RE.match(stripped)
if m:
help_text_lines.append(m.group(1))
continue
# Check for option
m = OPTION_RE.match(stripped)
if m:
opt_label = m.group(2).strip()
opt_next = m.group(3) or ""
options.append({
"id": f"opt_{node['id']}_{len(options)}",
"label": opt_label,
"next_node_id": opt_next,
})
continue
node["question"] = question
node["help_text"] = "\n".join(help_text_lines) if help_text_lines else ""
node["options"] = options
node["children"] = []
def _parse_action_body(
lines: list[str],
node: dict[str, Any],
body_start_line: int,
errors: list[ParseError],
) -> None:
"""Parse the body of an action node."""
title = ""
description_lines: list[str] = []
commands: list[str] = []
expected_outcome = ""
next_node_id = ""
in_command_block = False
for i, line in enumerate(lines):
stripped = line.strip()
# Command block handling
if in_command_block:
if COMMAND_BLOCK_END.match(stripped):
in_command_block = False
else:
commands.append(line.rstrip())
continue
if COMMAND_BLOCK_START.match(stripped):
in_command_block = True
continue
if not stripped:
# Blank lines are part of description
if title and not expected_outcome and not next_node_id:
description_lines.append("")
continue
# Title
m = HEADING2_RE.match(stripped)
if m:
title = m.group(1).strip()
continue
# Expected outcome
m = EXPECTED_RE.match(stripped)
if m:
expected_outcome = m.group(1).strip()
continue
# Next node reference
m = NEXT_NODE_RE.match(stripped)
if m:
next_node_id = m.group(1).strip()
continue
# Everything else is description
description_lines.append(stripped)
# Trim leading and trailing empty lines from description
while description_lines and not description_lines[-1].strip():
description_lines.pop()
while description_lines and not description_lines[0].strip():
description_lines.pop(0)
node["title"] = title
node["description"] = "\n".join(description_lines)
node["commands"] = commands if commands else []
node["expected_outcome"] = expected_outcome
node["next_node_id"] = next_node_id
node["children"] = []
def _parse_solution_body(
lines: list[str],
node: dict[str, Any],
body_start_line: int,
errors: list[ParseError],
) -> None:
"""Parse the body of a solution node."""
title = ""
description_lines: list[str] = []
resolution_steps: list[str] = []
for i, line in enumerate(lines):
stripped = line.strip()
if not stripped:
if title:
description_lines.append("")
continue
# Title
m = HEADING2_RE.match(stripped)
if m:
title = m.group(1).strip()
continue
# Ordered list item (resolution step)
m = ORDERED_LIST_RE.match(stripped)
if m:
resolution_steps.append(m.group(1).strip())
continue
# Everything else is description
description_lines.append(stripped)
# Trim leading and trailing empty lines
while description_lines and not description_lines[-1].strip():
description_lines.pop()
while description_lines and not description_lines[0].strip():
description_lines.pop(0)
node["title"] = title
node["description"] = "\n".join(description_lines)
node["resolution_steps"] = resolution_steps
node["solution"] = title # solution field required for publishing
def _reconstruct_tree(flat_nodes: list[dict[str, Any]]) -> tuple[dict[str, Any] | None, list[ParseError]]:
"""Reconstruct a recursive tree from flat nodes using parent references.
Returns (tree_structure, errors).
"""
errors: list[ParseError] = []
if not flat_nodes:
return None, errors
# Build lookup
node_map: dict[str, dict[str, Any]] = {}
for node in flat_nodes:
nid = node["id"]
# Clean node (remove internal fields)
clean = {k: v for k, v in node.items() if not k.startswith("_")}
if "children" not in clean:
clean["children"] = []
node_map[nid] = clean
# Find root (node with no parent)
root_id = None
for node in flat_nodes:
if node.get("_parent_id") is None:
if root_id is not None:
errors.append(ParseError(
line=node.get("_start_line", 1),
column=1,
message=f"Multiple root nodes found: '{root_id}' and '{node['id']}'",
))
root_id = node["id"]
if root_id is None:
# Fall back to first node
root_id = flat_nodes[0]["id"]
errors.append(ParseError(
line=1, column=1,
message="No root node found (no node without a parent). Using first node as root.",
severity="warning"
))
# Build children relationships
for node in flat_nodes:
parent_id = node.get("_parent_id")
if parent_id and parent_id in node_map:
child = node_map[node["id"]]
node_map[parent_id]["children"].append(child)
elif parent_id and parent_id not in node_map:
errors.append(ParseError(
line=node.get("_start_line", 1),
column=1,
message=f"Node '{node['id']}' references non-existent parent '{parent_id}'"
))
# Validate option references
for nid, node in node_map.items():
if node.get("type") == "decision":
for opt in node.get("options", []):
ref = opt.get("next_node_id", "")
if ref and ref not in node_map:
errors.append(ParseError(
line=1, column=1,
message=f"Option '{opt.get('label', '')}' in node '{nid}' references non-existent node '@{ref}'"
))
elif node.get("type") == "action":
ref = node.get("next_node_id", "")
if ref and ref not in node_map:
errors.append(ParseError(
line=1, column=1,
message=f"Action node '{nid}' references non-existent next node '@{ref}'"
))
root = node_map.get(root_id)
return root, errors

View File

@@ -0,0 +1,157 @@
"""
JSONB → Markdown serializer for ResolutionFlow tree structures.
Converts a tree_structure JSONB dict into the ResolutionFlow Markdown format
where each node is a block delimited by YAML frontmatter.
"""
from typing import Any
def serialize_tree_to_markdown(
tree_structure: dict[str, Any],
metadata: dict[str, Any] | None = None,
) -> str:
"""Convert a tree structure JSONB dict to ResolutionFlow Markdown format.
Args:
tree_structure: The recursive tree_structure dict from the database.
metadata: Optional tree metadata (name, description, category, tags)
to include as a frontmatter block at the top.
Returns:
Markdown string with YAML frontmatter blocks for each node.
"""
blocks: list[str] = []
if metadata:
meta_lines = ["---"]
if metadata.get("name"):
meta_lines.append(f"name: {metadata['name']}")
if metadata.get("description"):
meta_lines.append(f"description: {metadata['description']}")
if metadata.get("category"):
meta_lines.append(f"category: {metadata['category']}")
tags = metadata.get("tags")
if tags:
tags_str = ", ".join(tags) if isinstance(tags, list) else str(tags)
meta_lines.append(f"tags: [{tags_str}]")
meta_lines.append("---")
blocks.append("\n".join(meta_lines))
_serialize_node(tree_structure, parent_id=None, blocks=blocks)
return "\n\n".join(blocks) + "\n"
def _serialize_node(
node: dict[str, Any],
parent_id: str | None,
blocks: list[str],
) -> None:
"""Recursively serialize a single node and its children."""
node_type = node.get("type", "decision")
node_id = node.get("id", "unknown")
# Build frontmatter
frontmatter_lines = [
"---",
f"id: {node_id}",
f"type: {node_type}",
]
if parent_id is not None:
frontmatter_lines.append(f"parent: {parent_id}")
frontmatter_lines.append("---")
# Build body based on node type
body_lines: list[str] = []
if node_type == "decision":
_serialize_decision(node, body_lines)
elif node_type == "action":
_serialize_action(node, body_lines)
elif node_type == "solution":
_serialize_solution(node, body_lines)
block = "\n".join(frontmatter_lines) + "\n" + "\n".join(body_lines)
blocks.append(block)
# Recurse into children
children = node.get("children", [])
if children:
for child in children:
_serialize_node(child, parent_id=node_id, blocks=blocks)
def _serialize_decision(node: dict[str, Any], lines: list[str]) -> None:
"""Serialize a decision node body."""
question = node.get("question", "")
if question:
lines.append(f"# {question}")
lines.append("")
help_text = node.get("help_text", "")
if help_text:
for ht_line in help_text.split("\n"):
lines.append(f"> {ht_line}")
lines.append("")
options = node.get("options", [])
for i, opt in enumerate(options):
label = opt.get("label", "")
next_id = opt.get("next_node_id", "")
letter = chr(ord("A") + i) if i < 26 else str(i + 1)
if next_id:
lines.append(f"- [{letter}] {label} → @{next_id}")
else:
lines.append(f"- [{letter}] {label}")
def _serialize_action(node: dict[str, Any], lines: list[str]) -> None:
"""Serialize an action node body."""
title = node.get("title", "")
if title:
lines.append(f"## {title}")
lines.append("")
description = node.get("description", "")
if description:
lines.append(description)
lines.append("")
commands = node.get("commands", [])
if commands:
lines.append("```commands")
for cmd in commands:
lines.append(cmd)
lines.append("```")
lines.append("")
expected = node.get("expected_outcome", "")
if expected:
lines.append(f"**Expected:** {expected}")
lines.append("")
next_id = node.get("next_node_id", "")
if next_id:
lines.append(f"→ @{next_id}")
def _serialize_solution(node: dict[str, Any], lines: list[str]) -> None:
"""Serialize a solution node body."""
title = node.get("title", "")
if title:
lines.append(f"## {title}")
lines.append("")
description = node.get("description", "")
if description:
lines.append(description)
lines.append("")
steps = node.get("resolution_steps", [])
if steps:
for i, step in enumerate(steps, 1):
lines.append(f"{i}. {step}")
solution = node.get("solution", "")
if solution and not description and not steps:
lines.append(solution)

View File

@@ -0,0 +1,93 @@
"""
Validation for ResolutionFlow tree markdown.
Validates markdown without saving, returning detailed errors with line numbers.
"""
from app.services.tree_markdown_parser import parse_markdown_to_tree, ParseError
def validate_tree_markdown(markdown: str) -> list[ParseError]:
"""Validate tree markdown and return all errors/warnings.
This wraps the parser and adds additional semantic checks.
Args:
markdown: The markdown string to validate.
Returns:
List of ParseError objects with line, column, message, severity.
"""
result = parse_markdown_to_tree(markdown)
errors = list(result.errors)
# If parsing completely failed, return early
if result.tree_structure is None:
return errors
# Additional semantic validation on the parsed tree
_validate_tree_semantics(result.tree_structure, errors)
return errors
def _validate_tree_semantics(tree: dict, errors: list[ParseError]) -> None:
"""Run semantic checks on a parsed tree structure."""
all_ids: set[str] = set()
has_solution = False
def _collect_ids(node: dict) -> None:
nonlocal has_solution
all_ids.add(node.get("id", ""))
if node.get("type") == "solution":
has_solution = True
for child in node.get("children", []):
_collect_ids(child)
_collect_ids(tree)
if not has_solution:
errors.append(ParseError(
line=1, column=1,
message="Tree must have at least one solution node",
severity="warning"
))
# Check for empty required fields
def _validate_node(node: dict) -> None:
ntype = node.get("type", "")
nid = node.get("id", "")
if ntype == "decision":
if not node.get("question", "").strip():
errors.append(ParseError(
line=1, column=1,
message=f"Decision node '{nid}' has an empty question",
severity="warning"
))
if not node.get("options"):
errors.append(ParseError(
line=1, column=1,
message=f"Decision node '{nid}' has no options",
severity="warning"
))
elif ntype == "action":
if not node.get("title", "").strip():
errors.append(ParseError(
line=1, column=1,
message=f"Action node '{nid}' has an empty title",
severity="warning"
))
elif ntype == "solution":
if not node.get("title", "").strip():
errors.append(ParseError(
line=1, column=1,
message=f"Solution node '{nid}' has an empty title",
severity="warning"
))
for child in node.get("children", []):
_validate_node(child)
_validate_node(tree)

View File

@@ -0,0 +1,134 @@
"""
Variable extraction and resolution for ResolutionFlow tree structures.
Supports three variable tokens:
- [USER_INPUT:prompt] — prompts user for input during session navigation
- [VAR:name] — references a previously saved variable value
- [SAVE_AS:name] — saves the current context as a named variable
"""
import re
from dataclasses import dataclass
from typing import Any
@dataclass
class VariableDefinition:
"""A variable found in a tree structure."""
name: str
kind: str # 'user_input', 'reference', 'save_as'
prompt: str # For user_input: the prompt text; for others: empty
node_id: str # The node where this variable appears
# Regex patterns for variable tokens
USER_INPUT_RE = re.compile(r'\[USER_INPUT:([^\]]+)\]')
VAR_REF_RE = re.compile(r'\[VAR:([^\]]+)\]')
SAVE_AS_RE = re.compile(r'\[SAVE_AS:([^\]]+)\]')
def extract_variables(tree_structure: dict[str, Any]) -> list[VariableDefinition]:
"""Extract all variable definitions from a tree structure.
Traverses the tree recursively and finds all [USER_INPUT:...],
[VAR:...], and [SAVE_AS:...] tokens.
Args:
tree_structure: The recursive tree_structure dict.
Returns:
List of VariableDefinition objects found in the tree.
"""
variables: list[VariableDefinition] = []
_scan_node(tree_structure, variables)
return variables
def _scan_node(node: dict[str, Any], variables: list[VariableDefinition]) -> None:
"""Scan a node and its children for variable tokens."""
node_id = node.get("id", "unknown")
# Collect all text fields to scan
text_fields = [
node.get("question", ""),
node.get("title", ""),
node.get("description", ""),
node.get("help_text", ""),
node.get("expected_outcome", ""),
node.get("solution", ""),
]
# Include option labels
for opt in node.get("options", []):
text_fields.append(opt.get("label", ""))
# Include resolution steps
for step in node.get("resolution_steps", []):
text_fields.append(step)
# Include commands
for cmd in node.get("commands", []):
text_fields.append(cmd)
# Scan all text fields
for text in text_fields:
if not text:
continue
for match in USER_INPUT_RE.finditer(text):
variables.append(VariableDefinition(
name=match.group(1).strip(),
kind="user_input",
prompt=match.group(1).strip(),
node_id=node_id,
))
for match in VAR_REF_RE.finditer(text):
variables.append(VariableDefinition(
name=match.group(1).strip(),
kind="reference",
prompt="",
node_id=node_id,
))
for match in SAVE_AS_RE.finditer(text):
variables.append(VariableDefinition(
name=match.group(1).strip(),
kind="save_as",
prompt="",
node_id=node_id,
))
# Recurse into children
for child in node.get("children", []):
_scan_node(child, variables)
def resolve_variables(text: str, variables: dict[str, str]) -> str:
"""Replace variable tokens in text with their resolved values.
- [VAR:name] → replaced with variables[name] if present
- [USER_INPUT:prompt] → replaced with variables[prompt] if present
- [SAVE_AS:name] → removed (save directives don't appear in output)
Args:
text: The text containing variable tokens.
variables: Dict mapping variable names to their values.
Returns:
Text with variable tokens replaced.
"""
def _replace_var(match: re.Match) -> str:
name = match.group(1).strip()
return variables.get(name, f"[VAR:{name}]")
def _replace_input(match: re.Match) -> str:
prompt = match.group(1).strip()
return variables.get(prompt, f"[USER_INPUT:{prompt}]")
def _replace_save(match: re.Match) -> str:
return "" # Save directives are removed from output
result = VAR_REF_RE.sub(_replace_var, text)
result = USER_INPUT_RE.sub(_replace_input, result)
result = SAVE_AS_RE.sub(_replace_save, result)
return result

View File

@@ -0,0 +1,595 @@
"""
Tests for tree markdown serialization, parsing, and round-trip fidelity.
"""
import pytest
from app.services.tree_markdown_service import serialize_tree_to_markdown
from app.services.tree_markdown_parser import parse_markdown_to_tree
from app.services.tree_markdown_validator import validate_tree_markdown
# --- Fixtures: Tree structures ---
SIMPLE_TREE = {
"id": "root",
"type": "decision",
"question": "Is this a test?",
"help_text": "Check if this is a test scenario",
"options": [
{"id": "yes", "label": "Yes", "next_node_id": "solution1"},
{"id": "no", "label": "No", "next_node_id": "solution2"},
],
"children": [
{
"id": "solution1",
"type": "solution",
"title": "Test Confirmed",
"description": "This is indeed a test.",
"resolution_steps": ["Step 1", "Step 2"],
"solution": "Test confirmed",
},
{
"id": "solution2",
"type": "solution",
"title": "Not a Test",
"description": "This is not a test.",
"solution": "Not a test",
},
],
}
ACTION_TREE = {
"id": "root",
"type": "decision",
"question": "What type of issue?",
"help_text": "Identify the primary issue",
"options": [
{"id": "opt_net", "label": "Network issue", "next_node_id": "check_net"},
],
"children": [
{
"id": "check_net",
"type": "action",
"title": "Run Network Diagnostics",
"description": "Check basic connectivity.",
"commands": ["ping 8.8.8.8", "tracert gateway.local"],
"expected_outcome": "Ping replies or timeout",
"next_node_id": "resolved",
"children": [
{
"id": "resolved",
"type": "solution",
"title": "Issue Resolved",
"description": "Network is working now.",
"solution": "Issue resolved",
}
],
}
],
}
DEEPLY_NESTED_TREE = {
"id": "root",
"type": "decision",
"question": "Level 1?",
"options": [
{"id": "o1", "label": "Go deeper", "next_node_id": "level2"},
],
"children": [
{
"id": "level2",
"type": "decision",
"question": "Level 2?",
"options": [
{"id": "o2", "label": "Go deeper", "next_node_id": "level3"},
],
"children": [
{
"id": "level3",
"type": "decision",
"question": "Level 3?",
"options": [
{"id": "o3", "label": "Done", "next_node_id": "final"},
],
"children": [
{
"id": "final",
"type": "solution",
"title": "Deep Solution",
"description": "Found it at level 3.",
"solution": "Deep solution found",
}
],
}
],
}
],
}
SINGLE_NODE_TREE = {
"id": "root",
"type": "solution",
"title": "Quick Fix",
"description": "Just restart the computer.",
"solution": "Restart the computer",
}
# --- Serialization Tests ---
class TestSerializeTreeToMarkdown:
def test_simple_tree(self):
md = serialize_tree_to_markdown(SIMPLE_TREE)
assert "id: root" in md
assert "type: decision" in md
assert "# Is this a test?" in md
assert "> Check if this is a test scenario" in md
assert "- [A] Yes → @solution1" in md
assert "- [B] No → @solution2" in md
assert "id: solution1" in md
assert "## Test Confirmed" in md
assert "1. Step 1" in md
assert "2. Step 2" in md
def test_action_tree(self):
md = serialize_tree_to_markdown(ACTION_TREE)
assert "## Run Network Diagnostics" in md
assert "```commands" in md
assert "ping 8.8.8.8" in md
assert "**Expected:** Ping replies or timeout" in md
assert "→ @resolved" in md
def test_deeply_nested(self):
md = serialize_tree_to_markdown(DEEPLY_NESTED_TREE)
assert "id: level2" in md
assert "parent: root" in md
assert "id: level3" in md
assert "parent: level2" in md
assert "id: final" in md
assert "parent: level3" in md
def test_single_node(self):
md = serialize_tree_to_markdown(SINGLE_NODE_TREE)
assert "id: root" in md
assert "type: solution" in md
assert "## Quick Fix" in md
def test_root_has_no_parent(self):
md = serialize_tree_to_markdown(SIMPLE_TREE)
lines = md.split("\n")
# Find the first frontmatter block
in_first_block = False
for line in lines:
if line.strip() == "---":
if not in_first_block:
in_first_block = True
continue
else:
break
if in_first_block:
assert not line.startswith("parent:"), "Root node should not have parent"
# --- Parsing Tests ---
class TestParseMarkdownToTree:
def test_simple_roundtrip(self):
md = serialize_tree_to_markdown(SIMPLE_TREE)
result = parse_markdown_to_tree(md)
assert result.tree_structure is not None
tree = result.tree_structure
assert tree["id"] == "root"
assert tree["type"] == "decision"
assert tree["question"] == "Is this a test?"
assert len(tree["options"]) == 2
assert tree["options"][0]["label"] == "Yes"
assert tree["options"][0]["next_node_id"] == "solution1"
assert len(tree["children"]) == 2
def test_action_roundtrip(self):
md = serialize_tree_to_markdown(ACTION_TREE)
result = parse_markdown_to_tree(md)
assert result.tree_structure is not None
tree = result.tree_structure
# Find the action node
action = tree["children"][0]
assert action["type"] == "action"
assert action["title"] == "Run Network Diagnostics"
assert action["commands"] == ["ping 8.8.8.8", "tracert gateway.local"]
assert action["expected_outcome"] == "Ping replies or timeout"
assert action["next_node_id"] == "resolved"
# Action should have children
assert len(action["children"]) == 1
assert action["children"][0]["type"] == "solution"
def test_deeply_nested_roundtrip(self):
md = serialize_tree_to_markdown(DEEPLY_NESTED_TREE)
result = parse_markdown_to_tree(md)
assert result.tree_structure is not None
tree = result.tree_structure
assert tree["id"] == "root"
level2 = tree["children"][0]
assert level2["id"] == "level2"
level3 = level2["children"][0]
assert level3["id"] == "level3"
final = level3["children"][0]
assert final["id"] == "final"
assert final["type"] == "solution"
def test_single_node_roundtrip(self):
md = serialize_tree_to_markdown(SINGLE_NODE_TREE)
result = parse_markdown_to_tree(md)
assert result.tree_structure is not None
tree = result.tree_structure
assert tree["id"] == "root"
assert tree["type"] == "solution"
assert tree["title"] == "Quick Fix"
def test_empty_markdown(self):
result = parse_markdown_to_tree("")
assert result.tree_structure is None
assert len(result.errors) > 0
def test_malformed_frontmatter(self):
md = "---\nno_id_here: true\n---\n# Some question"
result = parse_markdown_to_tree(md)
assert any("missing 'id'" in e.message.lower() or "missing" in e.message.lower()
for e in result.errors)
def test_invalid_type(self):
md = "---\nid: root\ntype: invalid_type\n---\n# Question"
result = parse_markdown_to_tree(md)
assert any("invalid node type" in e.message.lower() for e in result.errors)
def test_duplicate_ids(self):
md = (
"---\nid: root\ntype: decision\n---\n# Q1\n- [A] Opt → @dup\n\n"
"---\nid: dup\ntype: solution\nparent: root\n---\n## Sol1\n\n"
"---\nid: dup\ntype: solution\nparent: root\n---\n## Sol2\n"
)
result = parse_markdown_to_tree(md)
assert any("duplicate" in e.message.lower() for e in result.errors)
def test_broken_reference(self):
md = (
"---\nid: root\ntype: decision\n---\n"
"# Which issue?\n"
"- [A] Network → @nonexistent\n"
)
result = parse_markdown_to_tree(md)
assert any("non-existent" in e.message.lower() for e in result.errors)
def test_orphaned_parent_reference(self):
md = (
"---\nid: root\ntype: decision\n---\n# Q\n- [A] Opt → @child\n\n"
"---\nid: child\ntype: solution\nparent: nonexistent_parent\n---\n## Sol\n"
)
result = parse_markdown_to_tree(md)
assert any("non-existent parent" in e.message.lower() for e in result.errors)
def test_resolution_steps_parsed(self):
md = serialize_tree_to_markdown(SIMPLE_TREE)
result = parse_markdown_to_tree(md)
tree = result.tree_structure
# solution1 should have resolution_steps
sol1 = tree["children"][0]
assert sol1["id"] == "solution1"
assert sol1["resolution_steps"] == ["Step 1", "Step 2"]
def test_help_text_preserved(self):
md = serialize_tree_to_markdown(SIMPLE_TREE)
result = parse_markdown_to_tree(md)
tree = result.tree_structure
assert tree["help_text"] == "Check if this is a test scenario"
def test_description_with_markdown(self):
"""Test that markdown content in descriptions is preserved."""
tree = {
"id": "root",
"type": "solution",
"title": "Complex Solution",
"description": "Use **bold** and *italic* and `code`.\n\nMultiple paragraphs too.",
"solution": "Complex solution",
}
md = serialize_tree_to_markdown(tree)
result = parse_markdown_to_tree(md)
assert result.tree_structure is not None
assert "**bold**" in result.tree_structure["description"]
assert "`code`" in result.tree_structure["description"]
# --- Validation Tests ---
class TestValidateTreeMarkdown:
def test_valid_tree_no_errors(self):
md = serialize_tree_to_markdown(SIMPLE_TREE)
errors = validate_tree_markdown(md)
hard_errors = [e for e in errors if e.severity == "error"]
assert len(hard_errors) == 0
def test_empty_markdown_errors(self):
errors = validate_tree_markdown("")
assert len(errors) > 0
def test_missing_solution_warning(self):
tree = {
"id": "root",
"type": "decision",
"question": "Question?",
"options": [],
"children": [],
}
md = serialize_tree_to_markdown(tree)
errors = validate_tree_markdown(md)
warnings = [e for e in errors if e.severity == "warning"]
assert any("solution" in e.message.lower() for e in warnings)
def test_empty_question_warning(self):
tree = {
"id": "root",
"type": "decision",
"question": "",
"options": [],
"children": [],
}
md = serialize_tree_to_markdown(tree)
errors = validate_tree_markdown(md)
assert any("empty question" in e.message.lower() for e in errors)
# --- Metadata Tests ---
class TestMetadataBlocks:
def test_serialize_with_metadata(self):
metadata = {"name": "Test Tree", "description": "A test", "category": "Help Desk", "tags": ["dns", "network"]}
md = serialize_tree_to_markdown(SINGLE_NODE_TREE, metadata=metadata)
assert "name: Test Tree" in md
assert "description: A test" in md
assert "category: Help Desk" in md
assert "tags: [dns, network]" in md
# Node should still be present
assert "id: root" in md
def test_parse_with_metadata(self):
md = (
"---\nname: My Tree\ndescription: desc here\ncategory: Help Desk\ntags: [dns, vpn]\n---\n\n"
"---\nid: root\ntype: solution\n---\n## Quick Fix\n"
)
result = parse_markdown_to_tree(md)
assert result.metadata is not None
assert result.metadata["name"] == "My Tree"
assert result.metadata["description"] == "desc here"
assert result.metadata["category"] == "Help Desk"
assert result.metadata["tags"] == ["dns", "vpn"]
assert result.tree_structure is not None
assert result.tree_structure["id"] == "root"
def test_parse_without_metadata(self):
md = "---\nid: root\ntype: solution\n---\n## Quick Fix\n"
result = parse_markdown_to_tree(md)
assert result.metadata is None
assert result.tree_structure is not None
def test_metadata_roundtrip(self):
metadata = {"name": "Roundtrip Tree", "description": "Tests roundtrip", "tags": ["test"]}
md = serialize_tree_to_markdown(SIMPLE_TREE, metadata=metadata)
result = parse_markdown_to_tree(md)
assert result.metadata is not None
assert result.metadata["name"] == "Roundtrip Tree"
assert result.metadata["description"] == "Tests roundtrip"
assert result.metadata["tags"] == ["test"]
assert result.tree_structure is not None
assert result.tree_structure["id"] == "root"
def test_metadata_only_no_nodes(self):
md = "---\nname: Just Metadata\n---\n"
result = parse_markdown_to_tree(md)
assert result.metadata is not None
assert result.metadata["name"] == "Just Metadata"
assert result.tree_structure is None
assert any("no node blocks" in e.message.lower() for e in result.errors)
# --- Round-Trip Fidelity Tests ---
class TestRoundTripFidelity:
"""Ensure serialize → parse produces semantically identical trees."""
def _assert_node_equal(self, original: dict, parsed: dict, path: str = "root"):
"""Deep compare two node dicts, ignoring internal fields and option IDs."""
assert original["id"] == parsed["id"], f"{path}: id mismatch"
assert original["type"] == parsed["type"], f"{path}: type mismatch"
if original["type"] == "decision":
assert original.get("question", "") == parsed.get("question", ""), \
f"{path}: question mismatch"
# Compare option labels and next_node_ids (not auto-generated option IDs)
orig_opts = original.get("options", [])
parsed_opts = parsed.get("options", [])
assert len(orig_opts) == len(parsed_opts), \
f"{path}: options count mismatch ({len(orig_opts)} vs {len(parsed_opts)})"
for j, (oo, po) in enumerate(zip(orig_opts, parsed_opts)):
assert oo["label"] == po["label"], \
f"{path}.options[{j}]: label mismatch"
assert oo.get("next_node_id", "") == po.get("next_node_id", ""), \
f"{path}.options[{j}]: next_node_id mismatch"
elif original["type"] == "action":
assert original.get("title", "") == parsed.get("title", ""), \
f"{path}: title mismatch"
assert original.get("commands", []) == parsed.get("commands", []), \
f"{path}: commands mismatch"
assert original.get("expected_outcome", "") == parsed.get("expected_outcome", ""), \
f"{path}: expected_outcome mismatch"
assert original.get("next_node_id", "") == parsed.get("next_node_id", ""), \
f"{path}: next_node_id mismatch"
elif original["type"] == "solution":
assert original.get("title", "") == parsed.get("title", ""), \
f"{path}: title mismatch"
assert original.get("resolution_steps", []) == parsed.get("resolution_steps", []), \
f"{path}: resolution_steps mismatch"
# Compare children recursively
orig_children = original.get("children", [])
parsed_children = parsed.get("children", [])
assert len(orig_children) == len(parsed_children), \
f"{path}: children count mismatch ({len(orig_children)} vs {len(parsed_children)})"
for i, (oc, pc) in enumerate(zip(orig_children, parsed_children)):
self._assert_node_equal(oc, pc, f"{path}.children[{i}]")
def test_simple_tree_roundtrip(self):
md = serialize_tree_to_markdown(SIMPLE_TREE)
result = parse_markdown_to_tree(md)
assert result.tree_structure is not None
self._assert_node_equal(SIMPLE_TREE, result.tree_structure)
def test_action_tree_roundtrip(self):
md = serialize_tree_to_markdown(ACTION_TREE)
result = parse_markdown_to_tree(md)
assert result.tree_structure is not None
self._assert_node_equal(ACTION_TREE, result.tree_structure)
def test_deeply_nested_roundtrip(self):
md = serialize_tree_to_markdown(DEEPLY_NESTED_TREE)
result = parse_markdown_to_tree(md)
assert result.tree_structure is not None
self._assert_node_equal(DEEPLY_NESTED_TREE, result.tree_structure)
def test_double_roundtrip(self):
"""serialize → parse → serialize should produce same markdown."""
md1 = serialize_tree_to_markdown(SIMPLE_TREE)
result1 = parse_markdown_to_tree(md1)
assert result1.tree_structure is not None
md2 = serialize_tree_to_markdown(result1.tree_structure)
result2 = parse_markdown_to_tree(md2)
assert result2.tree_structure is not None
self._assert_node_equal(result1.tree_structure, result2.tree_structure)
# --- API Integration Tests ---
@pytest.mark.asyncio
class TestTreeMarkdownAPI:
async def test_export_markdown(self, client, auth_headers, test_tree):
tree_id = test_tree["id"]
response = await client.get(
f"/api/v1/trees/{tree_id}/export-markdown",
headers=auth_headers,
)
assert response.status_code == 200
data = response.json()
assert "markdown" in data
# Should include metadata block
assert "name:" in data["markdown"]
# Should include node content
assert "id: root" in data["markdown"]
assert "# Is this a test?" in data["markdown"]
async def test_export_not_found(self, client, auth_headers):
import uuid
fake_id = str(uuid.uuid4())
response = await client.get(
f"/api/v1/trees/{fake_id}/export-markdown",
headers=auth_headers,
)
assert response.status_code == 404
async def test_export_requires_auth(self, client, test_tree):
tree_id = test_tree["id"]
response = await client.get(
f"/api/v1/trees/{tree_id}/export-markdown",
)
assert response.status_code == 401
async def test_import_markdown(self, client, auth_headers, test_tree):
tree_id = test_tree["id"]
# Export first
export_resp = await client.get(
f"/api/v1/trees/{tree_id}/export-markdown",
headers=auth_headers,
)
markdown = export_resp.json()["markdown"]
# Import back
response = await client.put(
f"/api/v1/trees/{tree_id}/import-markdown",
json={"markdown": markdown},
headers=auth_headers,
)
assert response.status_code == 200
data = response.json()
assert data["valid"] is True
assert data["tree_structure"] is not None
async def test_import_invalid_markdown(self, client, auth_headers, test_tree):
tree_id = test_tree["id"]
response = await client.put(
f"/api/v1/trees/{tree_id}/import-markdown",
json={"markdown": "this is not valid tree markdown"},
headers=auth_headers,
)
assert response.status_code == 200
data = response.json()
assert data["valid"] is False
assert len(data["errors"]) > 0
async def test_validate_markdown(self, client, auth_headers, test_tree):
tree_id = test_tree["id"]
# Export first
export_resp = await client.get(
f"/api/v1/trees/{tree_id}/export-markdown",
headers=auth_headers,
)
markdown = export_resp.json()["markdown"]
# Validate
response = await client.post(
"/api/v1/trees/validate-markdown",
json={"markdown": markdown},
headers=auth_headers,
)
assert response.status_code == 200
data = response.json()
assert data["valid"] is True
assert data["tree_structure"] is not None
# Should return parsed metadata from export
assert data["metadata"] is not None
async def test_validate_requires_auth(self, client):
response = await client.post(
"/api/v1/trees/validate-markdown",
json={"markdown": "---\nid: root\ntype: solution\n---\n## Fix\n"},
)
assert response.status_code == 401
async def test_roundtrip_via_api(self, client, auth_headers, test_tree):
"""Export tree → import same markdown → verify tree unchanged."""
tree_id = test_tree["id"]
# Export
export_resp = await client.get(
f"/api/v1/trees/{tree_id}/export-markdown",
headers=auth_headers,
)
markdown = export_resp.json()["markdown"]
# Import
import_resp = await client.put(
f"/api/v1/trees/{tree_id}/import-markdown",
json={"markdown": markdown},
headers=auth_headers,
)
assert import_resp.json()["valid"] is True
# Re-export and compare
export_resp2 = await client.get(
f"/api/v1/trees/{tree_id}/export-markdown",
headers=auth_headers,
)
markdown2 = export_resp2.json()["markdown"]
# Should be identical
assert markdown == markdown2

View File

@@ -0,0 +1,119 @@
"""Tests for variable extraction and resolution service."""
import pytest
from app.services.variable_service import extract_variables, resolve_variables
TREE_WITH_VARIABLES = {
"id": "root",
"type": "decision",
"question": "What is the hostname of [USER_INPUT:hostname]?",
"help_text": "Enter the server hostname",
"options": [
{"id": "opt1", "label": "Check [VAR:hostname]", "next_node_id": "action1"},
],
"children": [
{
"id": "action1",
"type": "action",
"title": "Ping [VAR:hostname]",
"description": "Run diagnostics on [VAR:hostname].\n\n[SAVE_AS:test_results]",
"commands": ["ping [VAR:hostname]"],
"expected_outcome": "Replies from [VAR:hostname]",
"next_node_id": "solution1",
"children": [
{
"id": "solution1",
"type": "solution",
"title": "Resolved",
"description": "Issue on [VAR:hostname] resolved.",
"resolution_steps": [
"Document results for [VAR:hostname]",
"Save as [SAVE_AS:final_result]"
],
"solution": "Resolved",
}
],
}
],
}
TREE_WITHOUT_VARIABLES = {
"id": "root",
"type": "solution",
"title": "Simple Fix",
"description": "Just restart it.",
"solution": "Restart",
}
class TestExtractVariables:
def test_extracts_user_input(self):
variables = extract_variables(TREE_WITH_VARIABLES)
user_inputs = [v for v in variables if v.kind == "user_input"]
assert len(user_inputs) == 1
assert user_inputs[0].name == "hostname"
assert user_inputs[0].node_id == "root"
def test_extracts_var_references(self):
variables = extract_variables(TREE_WITH_VARIABLES)
refs = [v for v in variables if v.kind == "reference"]
assert len(refs) >= 4 # hostname used in multiple places
def test_extracts_save_as(self):
variables = extract_variables(TREE_WITH_VARIABLES)
saves = [v for v in variables if v.kind == "save_as"]
assert len(saves) == 2
names = {v.name for v in saves}
assert "test_results" in names
assert "final_result" in names
def test_no_variables(self):
variables = extract_variables(TREE_WITHOUT_VARIABLES)
assert len(variables) == 0
def test_extracts_from_commands(self):
variables = extract_variables(TREE_WITH_VARIABLES)
cmd_vars = [v for v in variables if v.node_id == "action1" and v.kind == "reference"]
# hostname in title, description, commands, expected_outcome
assert len(cmd_vars) >= 3
class TestResolveVariables:
def test_resolves_var_reference(self):
text = "Ping [VAR:hostname] now"
result = resolve_variables(text, {"hostname": "server01"})
assert result == "Ping server01 now"
def test_resolves_user_input(self):
text = "Server: [USER_INPUT:hostname]"
result = resolve_variables(text, {"hostname": "server01"})
assert result == "Server: server01"
def test_removes_save_as(self):
text = "Done [SAVE_AS:result] here"
result = resolve_variables(text, {})
assert result == "Done here"
def test_unresolved_var_preserved(self):
text = "Server: [VAR:unknown_var]"
result = resolve_variables(text, {})
assert result == "Server: [VAR:unknown_var]"
def test_multiple_variables(self):
text = "[VAR:hostname] ([VAR:ip]) - [USER_INPUT:ticket]"
result = resolve_variables(text, {
"hostname": "server01",
"ip": "10.0.0.1",
"ticket": "INC001",
})
assert result == "server01 (10.0.0.1) - INC001"
def test_empty_variables_dict(self):
text = "No vars here"
result = resolve_variables(text, {})
assert result == "No vars here"
def test_empty_text(self):
result = resolve_variables("", {"hostname": "server01"})
assert result == ""