feat: add procedural flows with intake forms, navigation, and seed templates

Adds a new "procedural" tree type for linear step-by-step project workflows
(domain controller setup, M365 onboarding, VPN config, etc). Includes intake
form builder, two-panel step navigation, variable resolution, procedural
exports, 3 seed templates, and UI rename from "Trees" to "Flows".

Also archives 19 implemented plan docs and creates deferred features backlog.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
chihlasm
2026-02-14 04:13:52 -05:00
parent 303570ca2c
commit 350c977eda
58 changed files with 11686 additions and 167 deletions

View File

@@ -0,0 +1,47 @@
"""add tree_type and intake_form to trees
Revision ID: 035
Revises: 034
Create Date: 2026-02-14
Adds tree_type (troubleshooting/procedural) and intake_form (JSONB) columns to trees table.
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB
# revision identifiers, used by Alembic.
revision: str = '035'
down_revision: Union[str, None] = '034'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.add_column('trees', sa.Column(
'tree_type',
sa.String(20),
nullable=False,
server_default='troubleshooting',
))
op.add_column('trees', sa.Column(
'intake_form',
JSONB(),
nullable=True,
))
op.create_check_constraint(
'ck_trees_tree_type',
'trees',
"tree_type IN ('troubleshooting', 'procedural')",
)
op.create_index('ix_trees_tree_type', 'trees', ['tree_type'])
def downgrade() -> None:
op.drop_index('ix_trees_tree_type', table_name='trees')
op.drop_constraint('ck_trees_tree_type', 'trees', type_='check')
op.drop_column('trees', 'intake_form')
op.drop_column('trees', 'tree_type')

View File

@@ -136,13 +136,27 @@ async def start_session(
detail="You don't have access to this tree"
)
# For procedural trees with intake forms, validate required fields
session_variables = session_data.session_variables or {}
if tree.tree_type == 'procedural' and tree.intake_form:
missing_fields = []
for field in tree.intake_form:
if field.get("required") and not session_variables.get(field["variable_name"]):
missing_fields.append(field["label"])
if missing_fields:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Missing required intake form fields: {', '.join(missing_fields)}"
)
# Create session with tree snapshot (includes tree metadata for filtering/export)
tree_snapshot = {
**tree.tree_structure,
"name": tree.name,
"description": tree.description,
"category": tree.category,
"version": tree.version
"version": tree.version,
"tree_type": tree.tree_type,
}
new_session = Session(
@@ -152,7 +166,8 @@ async def start_session(
path_taken=[],
decisions=[],
ticket_number=session_data.ticket_number,
client_name=session_data.client_name
client_name=session_data.client_name,
session_variables=session_variables,
)
# Increment tree usage count
@@ -421,7 +436,9 @@ async def save_session_as_tree(
can_publish, validation_errors = can_publish_tree(
tree_structure,
tree_name,
request_data.description
request_data.description,
tree_type=original_tree.tree_type,
intake_form=original_tree.intake_form,
)
if not can_publish:
raise HTTPException(

View File

@@ -44,6 +44,7 @@ def build_tree_response(tree: Tree) -> TreeListResponse:
id=tree.id,
name=tree.name,
description=tree.description,
tree_type=tree.tree_type,
category=tree.category,
category_id=tree.category_id,
category_info=category_info,
@@ -89,12 +90,14 @@ def build_full_tree_response(tree: Tree, parent_tree: Tree = None) -> TreeRespon
id=tree.id,
name=tree.name,
description=tree.description,
tree_type=tree.tree_type,
category=tree.category,
category_id=tree.category_id,
category_info=category_info,
tags=tree.tag_names,
fork_info=fork_info,
tree_structure=tree.tree_structure,
intake_form=tree.intake_form,
author_id=tree.author_id,
account_id=tree.account_id,
is_active=tree.is_active,
@@ -112,6 +115,7 @@ def build_full_tree_response(tree: Tree, parent_tree: Tree = None) -> TreeRespon
async def list_trees(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
tree_type: Optional[str] = Query(None, description="Filter by tree type: troubleshooting or procedural"),
category: Optional[str] = Query(None, description="Filter by legacy category string"),
category_id: Optional[UUID] = Query(None, description="Filter by category ID"),
tags: Optional[str] = Query(None, description="Comma-separated tag slugs to filter by"),
@@ -137,6 +141,8 @@ async def list_trees(
)
# Apply filters
if tree_type:
query = query.where(Tree.tree_type == tree_type)
if category:
query = query.where(Tree.category == category)
if category_id:
@@ -297,10 +303,16 @@ async def create_tree(
"""
# Validate tree if status is 'published'
if tree_data.status == 'published':
# Convert intake_form to dicts for validation
intake_form_dicts = None
if tree_data.intake_form:
intake_form_dicts = [f.model_dump() for f in tree_data.intake_form]
can_publish, validation_errors = can_publish_tree(
tree_data.tree_structure,
tree_data.name,
tree_data.description
tree_data.description,
tree_type=tree_data.tree_type,
intake_form=intake_form_dicts,
)
if not can_publish:
raise HTTPException(
@@ -332,12 +344,19 @@ async def create_tree(
detail="You don't have access to this category"
)
# Convert intake_form Pydantic models to dicts for JSONB storage
intake_form_data = None
if tree_data.intake_form:
intake_form_data = [f.model_dump(exclude_none=True) for f in tree_data.intake_form]
new_tree = Tree(
name=tree_data.name,
description=tree_data.description,
category=tree_data.category,
category_id=tree_data.category_id,
tree_type=tree_data.tree_type,
tree_structure=tree_data.tree_structure,
intake_form=intake_form_data,
author_id=None if is_default else current_user.id, # Default trees have no author
account_id=None if is_default else current_user.account_id,
is_public=True if is_default else tree_data.is_public, # Default trees are always public
@@ -463,11 +482,15 @@ async def update_tree(
final_tree_structure = update_data.get("tree_structure", tree.tree_structure)
final_name = update_data.get("name", tree.name)
final_description = update_data.get("description", tree.description)
final_tree_type = update_data.get("tree_type", tree.tree_type)
final_intake_form = update_data.get("intake_form", tree.intake_form)
can_publish, validation_errors = can_publish_tree(
final_tree_structure,
final_name,
final_description
final_description,
tree_type=final_tree_type,
intake_form=final_intake_form,
)
if not can_publish:
raise HTTPException(
@@ -673,7 +696,9 @@ async def fork_tree(
description=parent.description,
category=parent.category,
category_id=parent.category_id,
tree_type=parent.tree_type,
tree_structure=parent.tree_structure,
intake_form=parent.intake_form,
author_id=current_user.id,
account_id=current_user.account_id,
is_public=False,
@@ -996,7 +1021,9 @@ async def check_tree_can_publish(
can_publish, validation_errors = can_publish_tree(
tree.tree_structure,
tree.name,
tree.description
tree.description,
tree_type=tree.tree_type,
intake_form=tree.intake_form,
)
return TreeValidationResponse(

View File

@@ -10,8 +10,10 @@ class TreeValidationError(Exception):
super().__init__(f"{field}: {message}")
# --- Troubleshooting Tree Validation ---
def validate_tree_structure(tree_structure: dict[str, Any]) -> tuple[bool, list[dict[str, str]]]:
"""Validate tree structure for publishing.
"""Validate troubleshooting tree structure for publishing.
A valid tree for publishing must have:
- A root node with id, type, and appropriate content fields
@@ -53,13 +55,7 @@ def validate_tree_structure(tree_structure: dict[str, Any]) -> tuple[bool, list[
def _validate_node(node: dict[str, Any], path: str, errors: list[dict[str, str]]) -> None:
"""Validate a single node in the tree structure.
Args:
node: The node dict to validate
path: Current path in the tree (for error messages)
errors: List to append errors to
"""
"""Validate a single node in the tree structure."""
node_type = node.get("type")
if node_type == "decision":
@@ -99,13 +95,7 @@ def _validate_node(node: dict[str, Any], path: str, errors: list[dict[str, str]]
def _validate_children(children: list[dict[str, Any]], path: str, errors: list[dict[str, str]]) -> None:
"""Recursively validate child nodes.
Args:
children: List of child nodes
path: Current path in the tree (for error messages)
errors: List to append errors to
"""
"""Recursively validate child nodes."""
for i, child in enumerate(children):
child_path = f"{path}[{i}]"
@@ -123,17 +113,106 @@ def _validate_children(children: list[dict[str, Any]], path: str, errors: list[d
_validate_children(child["children"], f"{child_path}.children", errors)
def can_publish_tree(tree_structure: dict[str, Any], name: str, description: str | None = None) -> tuple[bool, list[dict[str, str]]]:
# --- Procedural Tree Validation ---
VALID_STEP_TYPES = {"procedure_step", "procedure_end"}
VALID_CONTENT_TYPES = {"action", "informational", "verification", "warning"}
def validate_procedural_structure(tree_structure: dict[str, Any]) -> tuple[bool, list[dict[str, str]]]:
"""Validate procedural tree structure for publishing.
Procedural trees store steps as a flat ordered array in tree_structure["steps"].
Rules:
- Must have a non-empty "steps" array
- Each step must have: id, type, title
- Only procedure_step and procedure_end types allowed
- Must have exactly one procedure_end (as the last step)
- All other steps must be procedure_step
- No duplicate step IDs
- Steps with content_type must use valid values
Args:
tree_structure: Dict with a "steps" key containing the ordered step array
Returns:
Tuple of (is_valid, list of errors)
"""
errors = []
if not tree_structure:
errors.append({"field": "tree_structure", "message": "Tree structure cannot be empty"})
return False, errors
steps = tree_structure.get("steps")
if not steps or not isinstance(steps, list):
errors.append({"field": "tree_structure.steps", "message": "Procedural tree must have a non-empty steps array"})
return False, errors
# Track IDs for uniqueness
seen_ids: set[str] = set()
end_count = 0
for i, step in enumerate(steps):
path = f"steps[{i}]"
# Required fields
step_id = step.get("id")
if not step_id:
errors.append({"field": f"{path}.id", "message": "Step must have an id"})
elif step_id in seen_ids:
errors.append({"field": f"{path}.id", "message": f"Duplicate step id: {step_id}"})
else:
seen_ids.add(step_id)
step_type = step.get("type")
if not step_type:
errors.append({"field": f"{path}.type", "message": "Step must have a type"})
elif step_type not in VALID_STEP_TYPES:
errors.append({"field": f"{path}.type", "message": f"Invalid step type: {step_type}. Must be one of: {', '.join(VALID_STEP_TYPES)}"})
elif step_type == "procedure_end":
end_count += 1
# procedure_end must be last step
if i != len(steps) - 1:
errors.append({"field": f"{path}.type", "message": "procedure_end must be the last step"})
if not step.get("title"):
errors.append({"field": f"{path}.title", "message": "Step must have a non-empty title"})
# Validate content_type if present
content_type = step.get("content_type")
if content_type and content_type not in VALID_CONTENT_TYPES:
errors.append({"field": f"{path}.content_type", "message": f"Invalid content_type: {content_type}. Must be one of: {', '.join(VALID_CONTENT_TYPES)}"})
# Must have exactly one end step
if end_count == 0:
errors.append({"field": "tree_structure.steps", "message": "Procedural tree must have a procedure_end step as the last step"})
elif end_count > 1:
errors.append({"field": "tree_structure.steps", "message": "Procedural tree must have exactly one procedure_end step"})
return len(errors) == 0, errors
# --- Dispatch ---
def can_publish_tree(
tree_structure: dict[str, Any],
name: str,
description: str | None = None,
tree_type: str = "troubleshooting",
intake_form: list[dict[str, Any]] | None = None,
) -> tuple[bool, list[dict[str, str]]]:
"""Check if a tree can be published.
Validates:
- Tree has a name (non-empty)
- Tree structure is valid
Dispatches to the appropriate validator based on tree_type.
Args:
tree_structure: The tree structure to validate
name: The tree name
description: Optional tree description
tree_type: 'troubleshooting' or 'procedural'
intake_form: Optional intake form fields (procedural only)
Returns:
Tuple of (can_publish, list of errors)
@@ -144,8 +223,44 @@ def can_publish_tree(tree_structure: dict[str, Any], name: str, description: str
if not name or not name.strip():
errors.append({"field": "name", "message": "Tree must have a name to be published"})
# Validate tree structure
structure_valid, structure_errors = validate_tree_structure(tree_structure)
# Validate structure based on tree type
if tree_type == "procedural":
structure_valid, structure_errors = validate_procedural_structure(tree_structure)
else:
structure_valid, structure_errors = validate_tree_structure(tree_structure)
errors.extend(structure_errors)
# Validate intake form if present (procedural only)
if intake_form and tree_type == "procedural":
form_valid, form_errors = _validate_intake_form(intake_form)
errors.extend(form_errors)
return len(errors) == 0, errors
def _validate_intake_form(intake_form: list[dict[str, Any]]) -> tuple[bool, list[dict[str, str]]]:
"""Validate intake form field definitions."""
errors = []
variable_names: set[str] = set()
for i, field in enumerate(intake_form):
path = f"intake_form[{i}]"
var_name = field.get("variable_name")
if not var_name:
errors.append({"field": f"{path}.variable_name", "message": "Field must have a variable_name"})
elif var_name in variable_names:
errors.append({"field": f"{path}.variable_name", "message": f"Duplicate variable_name: {var_name}"})
else:
variable_names.add(var_name)
if not field.get("label"):
errors.append({"field": f"{path}.label", "message": "Field must have a label"})
field_type = field.get("field_type")
if field_type in ("select", "multi_select"):
options = field.get("options")
if not options or len(options) == 0:
errors.append({"field": f"{path}.options", "message": f"{field_type} fields must have at least one option"})
return len(errors) == 0, errors

View File

@@ -28,6 +28,10 @@ class Tree(Base):
"status IN ('draft', 'published')",
name='ck_trees_status'
),
CheckConstraint(
"tree_type IN ('troubleshooting', 'procedural')",
name='ck_trees_tree_type'
),
)
id: Mapped[uuid.UUID] = mapped_column(
@@ -47,7 +51,20 @@ class Tree(Base):
nullable=True,
index=True
)
tree_type: Mapped[str] = mapped_column(
String(20),
nullable=False,
default='troubleshooting',
server_default='troubleshooting',
index=True,
comment="Tree type: troubleshooting (branching decision tree) or procedural (linear step-by-step flow)"
)
tree_structure: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False)
intake_form: Mapped[Optional[list[dict[str, Any]]]] = mapped_column(
JSONB,
nullable=True,
comment="Intake form field definitions for procedural flows (JSONB array)"
)
author_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True),
ForeignKey("users.id"),

View File

@@ -41,6 +41,7 @@ class SessionCreate(BaseModel):
tree_id: UUID
ticket_number: Optional[str] = Field(None, max_length=100)
client_name: Optional[str] = Field(None, max_length=255)
session_variables: Optional[dict[str, str]] = Field(None, description="Intake form values for procedural flows")
class SessionUpdate(BaseModel):

View File

@@ -1,7 +1,69 @@
from datetime import datetime
from typing import Optional, Any, Literal
from uuid import UUID
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, field_validator, model_validator
import re
# --- Tree Type ---
TreeType = Literal['troubleshooting', 'procedural']
# --- Intake Form Schemas ---
FIELD_TYPES = Literal[
'text', 'textarea', 'number', 'ip_address', 'email',
'select', 'multi_select', 'checkbox', 'password'
]
VARIABLE_NAME_PATTERN = re.compile(r'^[a-z][a-z0-9_]*$')
class IntakeFieldValidation(BaseModel):
"""Validation config for an intake form field."""
min_length: Optional[int] = None
max_length: Optional[int] = None
pattern: Optional[str] = None
pattern_message: Optional[str] = None
format: Optional[Literal['ipv4', 'email']] = None
min_value: Optional[float] = None
max_value: Optional[float] = None
min_selections: Optional[int] = None
max_selections: Optional[int] = None
class IntakeFormField(BaseModel):
"""Schema for a single intake form field definition."""
variable_name: str = Field(..., min_length=1, max_length=50)
label: str = Field(..., min_length=1, max_length=100)
field_type: FIELD_TYPES
required: bool = True
options: Optional[list[str]] = None
placeholder: Optional[str] = Field(None, max_length=200)
help_text: Optional[str] = Field(None, max_length=500)
default_value: Optional[str] = None
group_name: Optional[str] = Field(None, max_length=100)
display_order: int = Field(..., ge=1)
validation: Optional[IntakeFieldValidation] = None
@field_validator('variable_name')
@classmethod
def validate_variable_name(cls, v: str) -> str:
if not VARIABLE_NAME_PATTERN.match(v):
raise ValueError(
'Variable name must start with a lowercase letter and contain only '
'lowercase letters, numbers, and underscores'
)
return v
@model_validator(mode='after')
def validate_options_for_select(self):
if self.field_type in ('select', 'multi_select'):
if not self.options or len(self.options) == 0:
raise ValueError(
f'{self.field_type} fields must have at least one option'
)
return self
class CategoryInfo(BaseModel):
@@ -22,25 +84,45 @@ class TreeBase(BaseModel):
class TreeCreate(TreeBase):
tree_type: TreeType = Field('troubleshooting', description="Tree type: troubleshooting or procedural")
tree_structure: dict[str, Any] = Field(..., description="The decision tree structure in JSON format")
intake_form: Optional[list[IntakeFormField]] = Field(None, description="Intake form field definitions (procedural flows only)")
is_public: bool = Field(False, description="Make tree visible to all users")
is_default: bool = Field(False, description="Mark as a default/system tree (admin only)")
status: Literal['draft', 'published'] = Field('published', description="Status: draft or published")
category_id: Optional[UUID] = Field(None, description="Category ID from tree_categories table")
tags: Optional[list[str]] = Field(None, max_length=10, description="List of tag names to assign")
@model_validator(mode='after')
def validate_intake_form_unique_variables(self):
if self.intake_form:
names = [f.variable_name for f in self.intake_form]
if len(names) != len(set(names)):
raise ValueError('Intake form variable names must be unique')
return self
class TreeUpdate(BaseModel):
name: Optional[str] = Field(None, min_length=1, max_length=255)
description: Optional[str] = None
category: Optional[str] = Field(None, max_length=100)
category_id: Optional[UUID] = None
tree_type: Optional[TreeType] = None
tree_structure: Optional[dict[str, Any]] = None
intake_form: Optional[list[IntakeFormField]] = None
is_public: Optional[bool] = None
is_active: Optional[bool] = None
status: Optional[Literal['draft', 'published']] = None
tags: Optional[list[str]] = Field(None, max_length=10, description="List of tag names to assign (replaces existing)")
@model_validator(mode='after')
def validate_intake_form_unique_variables(self):
if self.intake_form:
names = [f.variable_name for f in self.intake_form]
if len(names) != len(set(names)):
raise ValueError('Intake form variable names must be unique')
return self
class ForkCreate(BaseModel):
fork_reason: Optional[str] = Field(None, max_length=255, description="Brief reason for forking")
@@ -62,7 +144,9 @@ class ForkInfo(BaseModel):
class TreeResponse(TreeBase):
id: UUID
tree_type: str = 'troubleshooting'
tree_structure: dict[str, Any]
intake_form: Optional[list[dict[str, Any]]] = None
author_id: Optional[UUID] = None
account_id: Optional[UUID] = None
category_id: Optional[UUID] = None
@@ -86,6 +170,7 @@ class TreeListResponse(BaseModel):
id: UUID
name: str
description: Optional[str] = None
tree_type: str = 'troubleshooting'
category: Optional[str] = None
category_id: Optional[UUID] = None
category_info: Optional[CategoryInfo] = None

View File

@@ -171,6 +171,8 @@ def _escape_markdown_table(value: str) -> str:
def generate_markdown_export(session: Session, options: SessionExport) -> str:
"""Generate markdown export."""
if _is_procedural_session(session):
return _generate_procedural_markdown(session, options)
lines = []
outcome_label = _get_outcome_label(session)
@@ -284,6 +286,8 @@ def generate_markdown_export(session: Session, options: SessionExport) -> str:
def generate_text_export(session: Session, options: SessionExport) -> str:
"""Generate plain text export."""
if _is_procedural_session(session):
return _generate_procedural_text(session, options)
lines = []
outcome_label = _get_outcome_label(session)
@@ -378,6 +382,8 @@ def generate_text_export(session: Session, options: SessionExport) -> str:
def generate_html_export(session: Session, options: SessionExport) -> str:
"""Generate HTML export."""
if _is_procedural_session(session):
return _generate_procedural_html(session, options)
tree_name = html.escape(session.tree_snapshot.get("name", "Troubleshooting Session"))
outcome_label = _get_outcome_label(session)
@@ -484,6 +490,8 @@ def generate_html_export(session: Session, options: SessionExport) -> str:
def generate_psa_export(session: Session, options: SessionExport) -> str:
"""Generate PSA/ticket note export optimized for ConnectWise and similar PSA tools."""
if _is_procedural_session(session):
return _generate_procedural_psa(session, options)
lines = []
outcome_label = _get_outcome_label(session)
@@ -588,3 +596,311 @@ def generate_psa_export(session: Session, options: SessionExport) -> str:
lines.append(scratchpad.strip() if scratchpad.strip() else "None")
return "\n".join(lines)
def _is_procedural_session(session: Session) -> bool:
"""Check if session is for a procedural flow."""
return session.tree_snapshot.get("tree_type") == "procedural"
def _get_session_variables(session: Session) -> dict[str, str]:
"""Get session variables (intake form values) from session."""
variables = getattr(session, "session_variables", None)
if isinstance(variables, dict):
return variables
return {}
def _generate_procedural_markdown(session: Session, options: SessionExport) -> str:
"""Generate markdown export for procedural sessions."""
lines = []
outcome_label = _get_outcome_label(session)
tree_name = session.tree_snapshot.get("name", "Procedure")
if options.include_tree_info:
lines.append(f"# {tree_name}")
lines.append("")
if session.ticket_number:
lines.append(f"**Ticket:** {session.ticket_number}")
if session.client_name:
lines.append(f"**Client:** {session.client_name}")
if options.include_timestamps:
lines.append(f"**Started:** {session.started_at.strftime('%Y-%m-%d %H:%M')}")
if session.completed_at:
lines.append(f"**Completed:** {session.completed_at.strftime('%Y-%m-%d %H:%M')}")
lines.append(f"**Duration:** {_format_duration(session.started_at, session.completed_at)}")
if outcome_label:
lines.append(f"**Outcome:** {outcome_label}")
lines.append("")
lines.append("---")
lines.append("")
# Project Parameters
variables = _get_session_variables(session)
if variables:
lines.append("## Project Parameters")
lines.append("")
lines.append("| Parameter | Value |")
lines.append("|-----------|-------|")
for key, value in variables.items():
lines.append(f"| `{_escape_markdown_table(key)}` | {_escape_markdown_table(value)} |")
lines.append("")
lines.append("---")
lines.append("")
# Steps
lines.append("## Procedure Steps")
lines.append("")
decisions = session.decisions or []
if options.max_step_index is not None:
decisions = decisions[:options.max_step_index]
for i, decision in enumerate(decisions, 1):
title = decision.get("question") or decision.get("action_performed", "Step")
notes = decision.get("notes", "")
verification = _get_command_output(decision)
completed = decision.get("answer") == "completed"
checkbox = "[x]" if completed else "[ ]"
lines.append(f"- {checkbox} **Step {i}: {title}**")
if notes:
lines.append(f" - Notes: {notes}")
if verification:
lines.append(f" - Verification: {verification}")
duration_seconds = _get_step_duration_seconds(decision)
if duration_seconds is not None:
lines.append(f" - Duration: {_format_step_duration(duration_seconds)}")
lines.append("")
# Resolution
_raw_notes = getattr(session, 'outcome_notes', None)
outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
if outcome_notes.strip() and options.include_outcome_notes:
lines.append("---")
lines.append("")
lines.append("## Notes")
lines.append("")
lines.append(outcome_notes.strip())
lines.append("")
return "\n".join(lines)
def _generate_procedural_text(session: Session, options: SessionExport) -> str:
"""Generate plain text export for procedural sessions."""
lines = []
outcome_label = _get_outcome_label(session)
tree_name = session.tree_snapshot.get("name", "Procedure")
if options.include_tree_info:
lines.append(tree_name)
lines.append("=" * len(tree_name))
if session.ticket_number:
lines.append(f"Ticket: {session.ticket_number}")
if session.client_name:
lines.append(f"Client: {session.client_name}")
if options.include_timestamps:
lines.append(f"Started: {session.started_at.strftime('%Y-%m-%d %H:%M')}")
if session.completed_at:
lines.append(f"Completed: {session.completed_at.strftime('%Y-%m-%d %H:%M')}")
lines.append(f"Duration: {_format_duration(session.started_at, session.completed_at)}")
if outcome_label:
lines.append(f"Outcome: {outcome_label}")
lines.append("")
# Project Parameters
variables = _get_session_variables(session)
if variables:
lines.append("PROJECT PARAMETERS")
lines.append("-" * 20)
for key, value in variables.items():
lines.append(f" {key}: {value}")
lines.append("")
lines.append("PROCEDURE STEPS")
lines.append("-" * 20)
decisions = session.decisions or []
if options.max_step_index is not None:
decisions = decisions[:options.max_step_index]
for i, decision in enumerate(decisions, 1):
title = decision.get("question") or decision.get("action_performed", "Step")
notes = decision.get("notes", "")
verification = _get_command_output(decision)
completed = decision.get("answer") == "completed"
marker = "[DONE]" if completed else "[ ]"
lines.append(f"\n{marker} {i}. {title}")
if notes:
lines.append(f" Notes: {notes}")
if verification:
lines.append(f" Verification: {verification}")
duration_seconds = _get_step_duration_seconds(decision)
if duration_seconds is not None:
lines.append(f" Duration: {_format_step_duration(duration_seconds)}")
lines.append("")
_raw_notes = getattr(session, 'outcome_notes', None)
outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
if outcome_notes.strip() and options.include_outcome_notes:
lines.append("NOTES")
lines.append("-" * 20)
lines.append(outcome_notes.strip())
return "\n".join(lines)
def _generate_procedural_html(session: Session, options: SessionExport) -> str:
"""Generate HTML export for procedural sessions."""
tree_name = html.escape(session.tree_snapshot.get("name", "Procedure"))
outcome_label = _get_outcome_label(session)
html_parts = ['<!DOCTYPE html>', '<html>', '<head>',
'<meta charset="UTF-8">',
f'<title>{tree_name}</title>',
'<style>',
'body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }',
'h1 { color: #333; }',
'.meta { color: #666; margin-bottom: 20px; }',
'.params { margin-bottom: 20px; }',
'.params table { width: 100%; border-collapse: collapse; }',
'.params td { padding: 6px 12px; border: 1px solid #ddd; }',
'.params td:first-child { font-family: monospace; font-weight: bold; width: 200px; background: #f9f9f9; }',
'.step { margin-bottom: 10px; padding: 10px 15px; background: #f5f5f5; border-radius: 5px; border-left: 4px solid #ccc; }',
'.step.done { border-left-color: #22c55e; }',
'.step h3 { margin: 0 0 5px 0; color: #444; }',
'.notes { font-style: italic; color: #555; font-size: 0.9em; }',
'</style>',
'</head>', '<body>']
if options.include_tree_info:
html_parts.append(f'<h1>{tree_name}</h1>')
html_parts.append('<div class="meta">')
if session.ticket_number:
html_parts.append(f'<p><strong>Ticket:</strong> {html.escape(session.ticket_number)}</p>')
if session.client_name:
html_parts.append(f'<p><strong>Client:</strong> {html.escape(session.client_name)}</p>')
if options.include_timestamps:
html_parts.append(f'<p><strong>Started:</strong> {session.started_at.strftime("%Y-%m-%d %H:%M")}</p>')
if session.completed_at:
html_parts.append(f'<p><strong>Completed:</strong> {session.completed_at.strftime("%Y-%m-%d %H:%M")}</p>')
html_parts.append(f'<p><strong>Duration:</strong> {_format_duration(session.started_at, session.completed_at)}</p>')
if outcome_label:
html_parts.append(f'<p><strong>Outcome:</strong> {html.escape(outcome_label)}</p>')
html_parts.append('</div>')
# Project Parameters
variables = _get_session_variables(session)
if variables:
html_parts.append('<div class="params">')
html_parts.append('<h2>Project Parameters</h2>')
html_parts.append('<table>')
for key, value in variables.items():
html_parts.append(f'<tr><td>{html.escape(key)}</td><td>{html.escape(value)}</td></tr>')
html_parts.append('</table>')
html_parts.append('</div>')
html_parts.append('<h2>Procedure Steps</h2>')
decisions = session.decisions or []
if options.max_step_index is not None:
decisions = decisions[:options.max_step_index]
for i, decision in enumerate(decisions, 1):
title = html.escape(decision.get("question") or decision.get("action_performed", "Step"))
notes = html.escape(decision.get("notes", ""))
verification = _get_command_output(decision)
completed = decision.get("answer") == "completed"
css_class = "step done" if completed else "step"
marker = "&#x2705;" if completed else "&#x2B1C;"
html_parts.append(f'<div class="{css_class}">')
html_parts.append(f'<h3>{marker} Step {i}: {title}</h3>')
if notes:
html_parts.append(f'<p class="notes">Notes: {notes}</p>')
if verification:
html_parts.append(f'<p>Verification: {html.escape(verification)}</p>')
duration_seconds = _get_step_duration_seconds(decision)
if duration_seconds is not None:
html_parts.append(f'<p style="color: #888; font-size: 0.85em;">Duration: {_format_step_duration(duration_seconds)}</p>')
html_parts.append('</div>')
# Resolution
_raw_notes = getattr(session, 'outcome_notes', None)
outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
if outcome_notes.strip() and options.include_outcome_notes:
html_parts.append('<h2>Notes</h2>')
html_parts.append(f'<div style="white-space: pre-wrap;">{html.escape(outcome_notes.strip())}</div>')
html_parts.extend(['</body>', '</html>'])
return "\n".join(html_parts)
def _generate_procedural_psa(session: Session, options: SessionExport) -> str:
"""Generate PSA/ticket export for procedural sessions."""
lines = []
outcome_label = _get_outcome_label(session)
tree_name = session.tree_snapshot.get("name", "Procedure")
ticket_number = session.ticket_number or "N/A"
client_name = session.client_name or "N/A"
date_str = session.started_at.strftime("%Y-%m-%d %H:%M")
lines.append("=== PROCEDURE NOTES ===")
lines.append(f"Ticket: {ticket_number} | Client: {client_name}")
lines.append(f"Procedure: {tree_name} | Date: {date_str}")
lines.append(f"Duration: {_format_duration(session.started_at, session.completed_at)}")
if outcome_label:
lines.append(f"Outcome: {outcome_label}")
lines.append("")
# Project Parameters
variables = _get_session_variables(session)
if variables:
lines.append("--- PROJECT PARAMETERS ---")
for key, value in variables.items():
lines.append(f" {key}: {value}")
lines.append("")
# Steps
lines.append("--- STEPS COMPLETED ---")
decisions = session.decisions or []
if options.max_step_index is not None:
decisions = decisions[:options.max_step_index]
if decisions:
for i, decision in enumerate(decisions, 1):
title = decision.get("question") or decision.get("action_performed", "Step")
notes = decision.get("notes", "")
completed = decision.get("answer") == "completed"
marker = "[DONE]" if completed else "[ ]"
duration_seconds = _get_step_duration_seconds(decision)
line = f"{marker} {i}. {title}"
if duration_seconds is not None:
line += f" ({_format_step_duration(duration_seconds)})"
lines.append(line)
if notes:
lines.append(f" Notes: {notes}")
else:
lines.append("No steps recorded.")
lines.append("")
# Resolution
if session.completed_at:
lines.append("--- RESOLUTION ---")
_raw_notes = getattr(session, 'outcome_notes', None)
outcome_notes = _raw_notes if isinstance(_raw_notes, str) else ''
if outcome_notes.strip() and options.include_outcome_notes:
lines.append(outcome_notes.strip())
else:
lines.append("Procedure completed successfully.")
if outcome_label:
lines.append(f"Outcome: {outcome_label}")
lines.append("")
# Time spent
lines.append("--- TIME SPENT ---")
lines.append(f"Duration: {_format_duration(session.started_at, session.completed_at)}")
return "\n".join(lines)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,710 @@
"""Tests for procedural flows feature (tree_type='procedural')."""
import pytest
from app.core.tree_validation import (
validate_procedural_structure,
can_publish_tree,
)
from app.schemas.tree import IntakeFormField, TreeCreate
# --- Helper Data ---
def make_valid_procedural_steps():
"""Return a valid procedural step array."""
return {
"steps": [
{
"id": "step-1",
"type": "procedure_step",
"title": "Configure Static IP",
"description": "Set the IP to [VAR:ip_address]",
"content_type": "action",
},
{
"id": "step-2",
"type": "procedure_step",
"title": "Install AD DS Role",
"description": "Add the AD DS server role",
"content_type": "action",
},
{
"id": "step-end",
"type": "procedure_end",
"title": "Procedure Complete",
},
]
}
def make_valid_intake_form():
"""Return valid intake form field dicts."""
return [
{
"variable_name": "server_name",
"label": "Server Name",
"field_type": "text",
"required": True,
"display_order": 1,
},
{
"variable_name": "ip_address",
"label": "IP Address",
"field_type": "ip_address",
"required": True,
"display_order": 2,
},
{
"variable_name": "notes",
"label": "Additional Notes",
"field_type": "textarea",
"required": False,
"display_order": 3,
},
]
# --- Procedural Validation Unit Tests ---
class TestValidateProceduralStructure:
"""Unit tests for validate_procedural_structure()."""
def test_valid_procedural_tree(self):
is_valid, errors = validate_procedural_structure(make_valid_procedural_steps())
assert is_valid
assert errors == []
def test_empty_structure(self):
is_valid, errors = validate_procedural_structure({})
assert not is_valid
assert any("empty" in e["message"].lower() for e in errors)
def test_none_structure(self):
is_valid, errors = validate_procedural_structure(None)
assert not is_valid
def test_missing_steps_array(self):
is_valid, errors = validate_procedural_structure({"name": "test"})
assert not is_valid
assert any("steps" in e["message"] for e in errors)
def test_empty_steps_array(self):
is_valid, errors = validate_procedural_structure({"steps": []})
assert not is_valid
def test_step_missing_id(self):
structure = {
"steps": [
{"type": "procedure_step", "title": "Step 1"},
{"id": "end", "type": "procedure_end", "title": "Done"},
]
}
is_valid, errors = validate_procedural_structure(structure)
assert not is_valid
assert any("id" in e["field"] for e in errors)
def test_step_missing_type(self):
structure = {
"steps": [
{"id": "s1", "title": "Step 1"},
{"id": "end", "type": "procedure_end", "title": "Done"},
]
}
is_valid, errors = validate_procedural_structure(structure)
assert not is_valid
assert any("type" in e["field"] for e in errors)
def test_step_missing_title(self):
structure = {
"steps": [
{"id": "s1", "type": "procedure_step"},
{"id": "end", "type": "procedure_end", "title": "Done"},
]
}
is_valid, errors = validate_procedural_structure(structure)
assert not is_valid
assert any("title" in e["field"] for e in errors)
def test_invalid_step_type(self):
structure = {
"steps": [
{"id": "s1", "type": "decision", "title": "Bad Type"},
{"id": "end", "type": "procedure_end", "title": "Done"},
]
}
is_valid, errors = validate_procedural_structure(structure)
assert not is_valid
assert any("Invalid step type" in e["message"] for e in errors)
def test_no_end_step(self):
structure = {
"steps": [
{"id": "s1", "type": "procedure_step", "title": "Step 1"},
{"id": "s2", "type": "procedure_step", "title": "Step 2"},
]
}
is_valid, errors = validate_procedural_structure(structure)
assert not is_valid
assert any("procedure_end" in e["message"] for e in errors)
def test_multiple_end_steps(self):
structure = {
"steps": [
{"id": "s1", "type": "procedure_step", "title": "Step 1"},
{"id": "end1", "type": "procedure_end", "title": "End 1"},
{"id": "end2", "type": "procedure_end", "title": "End 2"},
]
}
is_valid, errors = validate_procedural_structure(structure)
assert not is_valid
def test_end_step_not_last(self):
structure = {
"steps": [
{"id": "end", "type": "procedure_end", "title": "End"},
{"id": "s1", "type": "procedure_step", "title": "Step 1"},
]
}
is_valid, errors = validate_procedural_structure(structure)
assert not is_valid
assert any("last step" in e["message"] for e in errors)
def test_duplicate_step_ids(self):
structure = {
"steps": [
{"id": "s1", "type": "procedure_step", "title": "Step 1"},
{"id": "s1", "type": "procedure_step", "title": "Step 2"},
{"id": "end", "type": "procedure_end", "title": "Done"},
]
}
is_valid, errors = validate_procedural_structure(structure)
assert not is_valid
assert any("Duplicate" in e["message"] for e in errors)
def test_invalid_content_type(self):
structure = {
"steps": [
{"id": "s1", "type": "procedure_step", "title": "Step 1", "content_type": "bad_type"},
{"id": "end", "type": "procedure_end", "title": "Done"},
]
}
is_valid, errors = validate_procedural_structure(structure)
assert not is_valid
assert any("content_type" in e["field"] for e in errors)
def test_valid_content_types(self):
structure = {
"steps": [
{"id": "s1", "type": "procedure_step", "title": "Step 1", "content_type": "action"},
{"id": "s2", "type": "procedure_step", "title": "Step 2", "content_type": "informational"},
{"id": "s3", "type": "procedure_step", "title": "Step 3", "content_type": "verification"},
{"id": "s4", "type": "procedure_step", "title": "Step 4", "content_type": "warning"},
{"id": "end", "type": "procedure_end", "title": "Done"},
]
}
is_valid, errors = validate_procedural_structure(structure)
assert is_valid
def test_single_step_with_end(self):
"""Minimal valid procedural tree: one step + end."""
structure = {
"steps": [
{"id": "s1", "type": "procedure_step", "title": "Only Step"},
{"id": "end", "type": "procedure_end", "title": "Done"},
]
}
is_valid, errors = validate_procedural_structure(structure)
assert is_valid
# --- can_publish_tree dispatch ---
class TestCanPublishTreeDispatch:
"""Test can_publish_tree dispatches correctly by tree_type."""
def test_troubleshooting_uses_tree_validation(self):
"""Default tree_type uses troubleshooting validation."""
structure = {
"id": "root",
"type": "decision",
"question": "Test?",
"children": [
{"id": "y", "type": "solution", "solution": "Yes"},
{"id": "n", "type": "solution", "solution": "No"},
]
}
can, errors = can_publish_tree(structure, "My Tree", tree_type="troubleshooting")
assert can
def test_procedural_uses_procedural_validation(self):
can, errors = can_publish_tree(
make_valid_procedural_steps(),
"DC Build Procedure",
tree_type="procedural",
)
assert can
def test_procedural_rejects_troubleshooting_structure(self):
"""A troubleshooting structure should fail procedural validation."""
ts_structure = {
"id": "root",
"type": "decision",
"question": "Test?",
}
can, errors = can_publish_tree(ts_structure, "My Tree", tree_type="procedural")
assert not can
def test_procedural_validates_intake_form(self):
can, errors = can_publish_tree(
make_valid_procedural_steps(),
"DC Build",
tree_type="procedural",
intake_form=make_valid_intake_form(),
)
assert can
def test_procedural_rejects_duplicate_variable_names(self):
intake = [
{"variable_name": "name", "label": "Name", "field_type": "text", "required": True, "display_order": 1},
{"variable_name": "name", "label": "Name 2", "field_type": "text", "required": False, "display_order": 2},
]
can, errors = can_publish_tree(
make_valid_procedural_steps(),
"DC Build",
tree_type="procedural",
intake_form=intake,
)
assert not can
assert any("Duplicate" in e["message"] for e in errors)
def test_procedural_rejects_select_without_options(self):
intake = [
{"variable_name": "role", "label": "Server Role", "field_type": "select", "required": True, "display_order": 1},
]
can, errors = can_publish_tree(
make_valid_procedural_steps(),
"DC Build",
tree_type="procedural",
intake_form=intake,
)
assert not can
assert any("option" in e["message"].lower() for e in errors)
def test_empty_name_blocks_publish(self):
can, errors = can_publish_tree(
make_valid_procedural_steps(),
"",
tree_type="procedural",
)
assert not can
assert any("name" in e["field"] for e in errors)
# --- IntakeFormField Pydantic Schema Tests ---
class TestIntakeFormFieldSchema:
"""Test IntakeFormField Pydantic validation."""
def test_valid_text_field(self):
field = IntakeFormField(
variable_name="server_name",
label="Server Name",
field_type="text",
required=True,
display_order=1,
)
assert field.variable_name == "server_name"
def test_invalid_variable_name_uppercase(self):
with pytest.raises(Exception):
IntakeFormField(
variable_name="ServerName",
label="Server Name",
field_type="text",
required=True,
display_order=1,
)
def test_invalid_variable_name_starts_with_number(self):
with pytest.raises(Exception):
IntakeFormField(
variable_name="1server",
label="Server Name",
field_type="text",
required=True,
display_order=1,
)
def test_valid_variable_name_with_underscores(self):
field = IntakeFormField(
variable_name="ip_address_v4",
label="IPv4 Address",
field_type="ip_address",
required=True,
display_order=1,
)
assert field.variable_name == "ip_address_v4"
def test_select_requires_options(self):
with pytest.raises(Exception):
IntakeFormField(
variable_name="role",
label="Role",
field_type="select",
required=True,
display_order=1,
)
def test_select_with_options_valid(self):
field = IntakeFormField(
variable_name="role",
label="Role",
field_type="select",
required=True,
options=["AD DS", "DNS", "DHCP"],
display_order=1,
)
assert field.options == ["AD DS", "DNS", "DHCP"]
def test_multi_select_requires_options(self):
with pytest.raises(Exception):
IntakeFormField(
variable_name="roles",
label="Roles",
field_type="multi_select",
required=True,
display_order=1,
)
def test_checkbox_field(self):
field = IntakeFormField(
variable_name="confirm_backup",
label="Backup confirmed?",
field_type="checkbox",
required=False,
display_order=1,
)
assert field.field_type == "checkbox"
# --- TreeCreate Schema with Procedural Fields ---
class TestTreeCreateProceduralSchema:
"""Test TreeCreate schema with tree_type and intake_form."""
def test_defaults_to_troubleshooting(self):
tree = TreeCreate(
name="Test",
tree_structure={"id": "root", "type": "decision", "question": "Test?"},
)
assert tree.tree_type == "troubleshooting"
assert tree.intake_form is None
def test_procedural_with_intake_form(self):
tree = TreeCreate(
name="DC Build",
tree_type="procedural",
tree_structure=make_valid_procedural_steps(),
intake_form=[
IntakeFormField(
variable_name="server_name",
label="Server Name",
field_type="text",
required=True,
display_order=1,
),
],
)
assert tree.tree_type == "procedural"
assert len(tree.intake_form) == 1
def test_duplicate_variable_names_rejected(self):
with pytest.raises(Exception):
TreeCreate(
name="DC Build",
tree_type="procedural",
tree_structure=make_valid_procedural_steps(),
intake_form=[
IntakeFormField(
variable_name="name",
label="Name",
field_type="text",
required=True,
display_order=1,
),
IntakeFormField(
variable_name="name",
label="Name Again",
field_type="text",
required=False,
display_order=2,
),
],
)
# --- API Integration Tests ---
@pytest.mark.asyncio
class TestProceduralFlowsAPI:
"""Integration tests for procedural flow CRUD via API."""
async def test_create_procedural_draft(self, client, auth_headers):
"""Create a procedural flow as draft."""
tree_data = {
"name": "DC Build Procedure",
"description": "Domain Controller setup procedure",
"tree_type": "procedural",
"status": "draft",
"tree_structure": make_valid_procedural_steps(),
"intake_form": make_valid_intake_form(),
}
response = await client.post(
"/api/v1/trees",
json=tree_data,
headers=auth_headers,
)
assert response.status_code == 200 or response.status_code == 201
data = response.json()
assert data["tree_type"] == "procedural"
assert data["intake_form"] is not None
assert len(data["intake_form"]) == 3
async def test_create_procedural_published(self, client, auth_headers):
"""Create a procedural flow and publish it (validation runs)."""
tree_data = {
"name": "M365 User Onboarding",
"tree_type": "procedural",
"status": "published",
"tree_structure": make_valid_procedural_steps(),
}
response = await client.post(
"/api/v1/trees",
json=tree_data,
headers=auth_headers,
)
assert response.status_code == 200 or response.status_code == 201
async def test_create_procedural_published_invalid_fails(self, client, auth_headers):
"""Publish should fail if procedural structure is invalid."""
tree_data = {
"name": "Bad Procedure",
"tree_type": "procedural",
"status": "published",
"tree_structure": {
"steps": [
{"id": "s1", "type": "procedure_step", "title": "Step 1"},
# No end step
]
},
}
response = await client.post(
"/api/v1/trees",
json=tree_data,
headers=auth_headers,
)
assert response.status_code == 422
async def test_list_trees_filter_by_type(self, client, auth_headers):
"""Create both types and filter by tree_type."""
# Create troubleshooting
await client.post(
"/api/v1/trees",
json={
"name": "Troubleshooting Tree",
"status": "draft",
"tree_structure": {"id": "root", "type": "decision", "question": "Test?"},
},
headers=auth_headers,
)
# Create procedural
await client.post(
"/api/v1/trees",
json={
"name": "Procedure Flow",
"tree_type": "procedural",
"status": "draft",
"tree_structure": make_valid_procedural_steps(),
},
headers=auth_headers,
)
# Filter by procedural
response = await client.get(
"/api/v1/trees?tree_type=procedural",
headers=auth_headers,
)
assert response.status_code == 200
data = response.json()
assert all(t["tree_type"] == "procedural" for t in data)
# Filter by troubleshooting
response = await client.get(
"/api/v1/trees?tree_type=troubleshooting",
headers=auth_headers,
)
assert response.status_code == 200
data = response.json()
assert all(t["tree_type"] == "troubleshooting" for t in data)
async def test_update_procedural_tree(self, client, auth_headers):
"""Update a procedural tree's intake form."""
# Create
create_resp = await client.post(
"/api/v1/trees",
json={
"name": "Procedure",
"tree_type": "procedural",
"status": "draft",
"tree_structure": make_valid_procedural_steps(),
},
headers=auth_headers,
)
tree_id = create_resp.json()["id"]
# Update with intake form
update_resp = await client.put(
f"/api/v1/trees/{tree_id}",
json={
"intake_form": make_valid_intake_form(),
},
headers=auth_headers,
)
assert update_resp.status_code == 200
data = update_resp.json()
assert data["intake_form"] is not None
async def test_start_session_procedural_with_variables(self, client, auth_headers):
"""Start a procedural session with intake form variables."""
# Create published procedural tree with intake form
create_resp = await client.post(
"/api/v1/trees",
json={
"name": "DC Build",
"tree_type": "procedural",
"status": "published",
"tree_structure": make_valid_procedural_steps(),
"intake_form": make_valid_intake_form(),
},
headers=auth_headers,
)
assert create_resp.status_code in (200, 201)
tree_id = create_resp.json()["id"]
# Start session with variables
session_resp = await client.post(
"/api/v1/sessions",
json={
"tree_id": tree_id,
"session_variables": {
"server_name": "DC01",
"ip_address": "192.168.1.10",
},
},
headers=auth_headers,
)
assert session_resp.status_code in (200, 201)
session_data = session_resp.json()
assert session_data["session_variables"]["server_name"] == "DC01"
assert session_data["tree_snapshot"]["tree_type"] == "procedural"
async def test_start_session_procedural_missing_required_field(self, client, auth_headers):
"""Starting a procedural session without required intake fields should fail."""
# Create published procedural tree with required intake form
create_resp = await client.post(
"/api/v1/trees",
json={
"name": "DC Build",
"tree_type": "procedural",
"status": "published",
"tree_structure": make_valid_procedural_steps(),
"intake_form": make_valid_intake_form(),
},
headers=auth_headers,
)
tree_id = create_resp.json()["id"]
# Try to start without required fields
session_resp = await client.post(
"/api/v1/sessions",
json={
"tree_id": tree_id,
# Missing server_name and ip_address (both required)
},
headers=auth_headers,
)
assert session_resp.status_code == 422
assert "Missing required" in session_resp.json()["detail"]
async def test_start_session_procedural_optional_fields_ok(self, client, auth_headers):
"""Starting a session with only required fields (optional missing) should work."""
create_resp = await client.post(
"/api/v1/trees",
json={
"name": "DC Build",
"tree_type": "procedural",
"status": "published",
"tree_structure": make_valid_procedural_steps(),
"intake_form": make_valid_intake_form(),
},
headers=auth_headers,
)
tree_id = create_resp.json()["id"]
# Start with only required fields (notes is optional)
session_resp = await client.post(
"/api/v1/sessions",
json={
"tree_id": tree_id,
"session_variables": {
"server_name": "DC01",
"ip_address": "192.168.1.10",
# notes is optional, not provided
},
},
headers=auth_headers,
)
assert session_resp.status_code in (200, 201)
async def test_fork_preserves_tree_type_and_intake_form(self, client, auth_headers):
"""Forking a procedural tree should preserve tree_type and intake_form."""
# Create procedural tree
create_resp = await client.post(
"/api/v1/trees",
json={
"name": "DC Build Original",
"tree_type": "procedural",
"status": "published",
"tree_structure": make_valid_procedural_steps(),
"intake_form": make_valid_intake_form(),
},
headers=auth_headers,
)
tree_id = create_resp.json()["id"]
# Fork it
fork_resp = await client.post(
f"/api/v1/trees/{tree_id}/fork",
json={"fork_reason": "Customized for Client X"},
headers=auth_headers,
)
assert fork_resp.status_code in (200, 201)
fork_data = fork_resp.json()
assert fork_data["tree_type"] == "procedural"
assert fork_data["intake_form"] is not None
assert len(fork_data["intake_form"]) == 3
async def test_existing_trees_default_troubleshooting(self, client, auth_headers):
"""Trees created without tree_type should default to troubleshooting."""
response = await client.post(
"/api/v1/trees",
json={
"name": "Legacy Tree",
"status": "draft",
"tree_structure": {"id": "root", "type": "decision", "question": "Test?"},
},
headers=auth_headers,
)
assert response.status_code in (200, 201)
data = response.json()
assert data["tree_type"] == "troubleshooting"
assert data.get("intake_form") is None