From 7fe9f4044c780235f3c6b10716afd1532bd07c49 Mon Sep 17 00:00:00 2001 From: chihlasm Date: Wed, 25 Feb 2026 13:35:23 -0500 Subject: [PATCH] feat: add step_sync module with extraction and upsert logic Co-Authored-By: Claude Sonnet 4.6 --- backend/app/core/step_sync.py | 205 ++++++++++++++++++++++++++++++++ backend/tests/test_step_sync.py | 140 ++++++++++++++++++++++ 2 files changed, 345 insertions(+) create mode 100644 backend/app/core/step_sync.py create mode 100644 backend/tests/test_step_sync.py diff --git a/backend/app/core/step_sync.py b/backend/app/core/step_sync.py new file mode 100644 index 00000000..5b92e5ca --- /dev/null +++ b/backend/app/core/step_sync.py @@ -0,0 +1,205 @@ +"""Sync steps from published flows into the step library.""" +from __future__ import annotations +import json +from typing import Any, Generator, Literal, Optional +from uuid import UUID +from datetime import datetime, timezone + +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession + + +StepVisibility = Literal['private', 'team', 'public'] + + +def resolve_step_visibility( + is_public: bool, + account_id: Optional[UUID], + node_override: Optional[str], +) -> StepVisibility: + """Resolve the visibility for a synced step. + + Priority: node-level library_visibility overrides flow visibility. + Flow visibility: 'public' if is_public, otherwise 'team'. + """ + if node_override in ('team', 'public'): + return node_override # type: ignore[return-value] + return 'public' if is_public else 'team' + + +def _normalize_commands(raw: Any) -> list[dict]: + """Normalize the commands field to a list of StepCommand dicts.""" + if not raw: + return [] + if isinstance(raw, str): + return [{"label": "", "command": raw, "command_type": None}] + if isinstance(raw, list): + result = [] + for item in raw: + if isinstance(item, str): + result.append({"label": "", "command": item, "command_type": None}) + elif isinstance(item, dict): + result.append({ + "label": item.get("label", ""), + "command": item.get("code", item.get("command", "")), + "command_type": item.get("language", item.get("command_type")), + }) + return result + return [] + + +def _walk_troubleshooting(node: dict) -> Generator[dict, None, None]: + """Recursively yield action and solution nodes from a troubleshooting tree.""" + if node.get("type") in ("action", "solution"): + yield node + for child in node.get("children", []): + yield from _walk_troubleshooting(child) + + +def extract_steps_for_sync( + tree_structure: dict, + tree_type: str, +) -> Generator[dict, None, None]: + """Extract step dicts ready for upsert from a tree structure. + + Yields dicts with keys: + source_node_id, title, step_type, content (dict), node_visibility_override + """ + if tree_type in ("procedural", "maintenance"): + steps = tree_structure.get("steps", []) + current_section: Optional[str] = None + for node in steps: + node_type = node.get("type") + if node_type == "section_header": + current_section = node.get("title") or node.get("section_header") + continue + if node_type != "procedure_step": + continue + instructions = node.get("description") or node.get("title", "") + commands = _normalize_commands(node.get("commands")) or None + content: dict = {"instructions": instructions} + if node.get("expected_outcome"): + content["help_text"] = node["expected_outcome"] + if commands: + content["commands"] = commands + if current_section: + content["group_label"] = current_section + yield { + "source_node_id": node["id"], + "title": node.get("title", "Untitled step"), + "step_type": "action", + "content": content, + "node_visibility_override": node.get("library_visibility"), + } + + elif tree_type == "troubleshooting": + for node in _walk_troubleshooting(tree_structure): + instructions = node.get("description") or node.get("title", "") + yield { + "source_node_id": node["id"], + "title": node.get("title", "Untitled step"), + "step_type": "action" if node["type"] == "action" else "solution", + "content": {"instructions": instructions}, + "node_visibility_override": None, + } + + +async def sync_steps_from_tree( + db: AsyncSession, + tree_id: UUID, + tree_type: str, + tree_structure: dict, + author_id: UUID, + account_id: Optional[UUID], + is_public: bool, +) -> int: + """Upsert step library entries from a published tree. + + Returns the number of steps synced. + """ + now = datetime.now(timezone.utc) + extracted = list(extract_steps_for_sync(tree_structure, tree_type)) + + for step_data in extracted: + visibility = resolve_step_visibility( + is_public=is_public, + account_id=account_id, + node_override=step_data["node_visibility_override"], + ) + await db.execute( + text(""" + INSERT INTO step_library ( + id, title, step_type, content, created_by, account_id, + visibility, is_flow_synced, source_tree_id, source_node_id, + last_synced_at, tags, is_active, + usage_count, rating_average, rating_count, + helpful_yes, helpful_no, is_featured, is_verified, + created_at, updated_at + ) VALUES ( + gen_random_uuid(), :title, :step_type, :content::jsonb, + :created_by, :account_id, :visibility, true, + :source_tree_id, :source_node_id, :last_synced_at, + '{}', true, + 0, 0, 0, 0, 0, false, false, + :now, :now + ) + ON CONFLICT (source_tree_id, source_node_id) + DO UPDATE SET + title = EXCLUDED.title, + step_type = EXCLUDED.step_type, + content = EXCLUDED.content, + visibility = EXCLUDED.visibility, + last_synced_at = EXCLUDED.last_synced_at, + updated_at = EXCLUDED.updated_at, + is_active = true + """), + { + "title": step_data["title"], + "step_type": step_data["step_type"], + "content": json.dumps(step_data["content"]), + "created_by": str(author_id), + "account_id": str(account_id) if account_id else None, + "visibility": visibility, + "source_tree_id": str(tree_id), + "source_node_id": step_data["source_node_id"], + "last_synced_at": now, + "now": now, + } + ) + + # Soft-delete previously synced steps that no longer exist in the tree + current_node_ids = [s["source_node_id"] for s in extracted] + if current_node_ids: + await db.execute( + text(""" + UPDATE step_library + SET is_active = false, updated_at = :now + WHERE source_tree_id = :tree_id + AND is_flow_synced = true + AND source_node_id != ALL(:node_ids) + """), + {"tree_id": str(tree_id), "node_ids": current_node_ids, "now": now} + ) + else: + await db.execute( + text(""" + UPDATE step_library + SET is_active = false, updated_at = :now + WHERE source_tree_id = :tree_id AND is_flow_synced = true + """), + {"tree_id": str(tree_id), "now": now} + ) + + return len(extracted) + + +async def deactivate_synced_steps_for_tree(db: AsyncSession, tree_id: UUID) -> None: + """Soft-delete all synced library entries for a tree (on tree delete/deactivate).""" + await db.execute( + text(""" + UPDATE step_library + SET is_active = false, updated_at = :now + WHERE source_tree_id = :tree_id AND is_flow_synced = true + """), + {"tree_id": str(tree_id), "now": datetime.now(timezone.utc)} + ) diff --git a/backend/tests/test_step_sync.py b/backend/tests/test_step_sync.py new file mode 100644 index 00000000..8f734f05 --- /dev/null +++ b/backend/tests/test_step_sync.py @@ -0,0 +1,140 @@ +"""Tests for flow-to-library step sync.""" +import pytest +from uuid import uuid4 +from app.core.step_sync import extract_steps_for_sync, resolve_step_visibility + + +class TestResolveStepVisibility: + """Test visibility resolution logic.""" + + def test_public_flow_gives_public_steps(self): + result = resolve_step_visibility(is_public=True, account_id=None, node_override=None) + assert result == 'public' + + def test_team_flow_gives_team_steps(self): + result = resolve_step_visibility(is_public=False, account_id=uuid4(), node_override=None) + assert result == 'team' + + def test_private_flow_gives_team_steps(self): + result = resolve_step_visibility(is_public=False, account_id=None, node_override=None) + assert result == 'team' + + def test_node_override_takes_precedence(self): + result = resolve_step_visibility(is_public=True, account_id=None, node_override='team') + assert result == 'team' + + def test_public_override_on_team_flow(self): + result = resolve_step_visibility(is_public=False, account_id=uuid4(), node_override='public') + assert result == 'public' + + +class TestExtractStepsForSync: + """Test step extraction from tree structures.""" + + def test_extracts_procedure_steps_from_procedural_flow(self): + tree_structure = { + "steps": [ + {"id": "step_1", "type": "procedure_step", "title": "Verify prerequisites", + "description": "Check all prereqs", "content_type": "action"}, + {"id": "end_1", "type": "procedure_end", "title": "Done"}, + ] + } + results = list(extract_steps_for_sync(tree_structure, tree_type='procedural')) + assert len(results) == 1 + assert results[0]['source_node_id'] == 'step_1' + assert results[0]['title'] == 'Verify prerequisites' + assert results[0]['step_type'] == 'action' + assert results[0]['content']['instructions'] == 'Check all prereqs' + + def test_skips_section_header_nodes(self): + tree_structure = { + "steps": [ + {"id": "sec_1", "type": "section_header", "title": "Phase 1"}, + {"id": "step_1", "type": "procedure_step", "title": "First step", + "description": "Do this"}, + ] + } + results = list(extract_steps_for_sync(tree_structure, tree_type='procedural')) + assert len(results) == 1 + assert results[0]['source_node_id'] == 'step_1' + + def test_captures_section_header_as_group_label(self): + tree_structure = { + "steps": [ + {"id": "sec_1", "type": "section_header", "title": "Cable Checks"}, + {"id": "step_1", "type": "procedure_step", "title": "Check cable", + "description": "Verify cable is seated"}, + ] + } + results = list(extract_steps_for_sync(tree_structure, tree_type='procedural')) + assert results[0]['content']['group_label'] == 'Cable Checks' + + def test_normalizes_string_commands(self): + tree_structure = { + "steps": [ + {"id": "step_1", "type": "procedure_step", "title": "Run command", + "description": "Execute this", "commands": "ping 8.8.8.8"}, + ] + } + results = list(extract_steps_for_sync(tree_structure, tree_type='procedural')) + assert results[0]['content']['commands'] == [{"label": "", "command": "ping 8.8.8.8", "command_type": None}] + + def test_normalizes_commandblock_commands(self): + tree_structure = { + "steps": [ + {"id": "step_1", "type": "procedure_step", "title": "Run PS", + "description": "Run powershell", + "commands": [{"code": "Get-Service", "language": "powershell", "label": "Check services"}]}, + ] + } + results = list(extract_steps_for_sync(tree_structure, tree_type='procedural')) + cmds = results[0]['content']['commands'] + assert len(cmds) == 1 + assert cmds[0]['command'] == 'Get-Service' + assert cmds[0]['command_type'] == 'powershell' + assert cmds[0]['label'] == 'Check services' + + def test_extracts_action_and_solution_from_troubleshooting(self): + tree_structure = { + "id": "root", + "type": "decision", + "question": "What is wrong?", + "options": [{"id": "o1", "label": "Thing A", "next_node_id": "act_1"}], + "children": [ + {"id": "act_1", "type": "action", "title": "Fix thing A", + "description": "Do the fix", "next_node_id": "sol_1", + "children": [{"id": "sol_1", "type": "solution", "title": "All fixed", + "description": "Problem resolved"}]}, + ] + } + results = list(extract_steps_for_sync(tree_structure, tree_type='troubleshooting')) + node_ids = {r['source_node_id'] for r in results} + assert 'act_1' in node_ids + assert 'sol_1' in node_ids + types = {r['source_node_id']: r['step_type'] for r in results} + assert types['act_1'] == 'action' + assert types['sol_1'] == 'solution' + + def test_uses_title_as_instructions_fallback(self): + tree_structure = { + "steps": [ + {"id": "step_1", "type": "procedure_step", "title": "Do the thing"}, + ] + } + results = list(extract_steps_for_sync(tree_structure, tree_type='procedural')) + assert results[0]['content']['instructions'] == 'Do the thing' + + def test_empty_steps_list(self): + tree_structure = {"steps": []} + results = list(extract_steps_for_sync(tree_structure, tree_type='procedural')) + assert results == [] + + def test_maintenance_treated_same_as_procedural(self): + tree_structure = { + "steps": [ + {"id": "step_1", "type": "procedure_step", "title": "Maintenance step", + "description": "Do maintenance"}, + ] + } + results = list(extract_steps_for_sync(tree_structure, tree_type='maintenance')) + assert len(results) == 1