"""Sync steps from published flows into the step library.""" from __future__ import annotations import json from typing import Any, Generator, Literal, Optional from uuid import UUID from datetime import datetime, timezone from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession StepVisibility = Literal['private', 'team', 'public'] def resolve_step_visibility( is_public: bool, account_id: Optional[UUID], node_override: Optional[str], ) -> StepVisibility: """Resolve the visibility for a synced step. Priority: node-level library_visibility overrides flow visibility. Flow visibility: 'public' if is_public, otherwise 'team'. """ if node_override in ('team', 'public'): return node_override # type: ignore[return-value] return 'public' if is_public else 'team' def _normalize_commands(raw: Any) -> list[dict]: """Normalize the commands field to a list of StepCommand dicts.""" if not raw: return [] if isinstance(raw, str): return [{"label": "", "command": raw, "command_type": None}] if isinstance(raw, list): result = [] for item in raw: if isinstance(item, str): result.append({"label": "", "command": item, "command_type": None}) elif isinstance(item, dict): result.append({ "label": item.get("label", ""), "command": item.get("code", item.get("command", "")), "command_type": item.get("language", item.get("command_type")), }) return result return [] def _walk_troubleshooting(node: dict) -> Generator[dict, None, None]: """Recursively yield action and solution nodes from a troubleshooting tree.""" if node.get("type") in ("action", "solution"): yield node for child in node.get("children", []): yield from _walk_troubleshooting(child) def extract_steps_for_sync( tree_structure: dict, tree_type: str, ) -> Generator[dict, None, None]: """Extract step dicts ready for upsert from a tree structure. Yields dicts with keys: source_node_id, title, step_type, content (dict), node_visibility_override """ if tree_type in ("procedural", "maintenance"): steps = tree_structure.get("steps", []) current_section: Optional[str] = None for node in steps: node_type = node.get("type") if node_type == "section_header": current_section = node.get("title") or node.get("section_header") continue if node_type != "procedure_step": continue instructions = node.get("description") or node.get("title", "") commands = _normalize_commands(node.get("commands")) or None content: dict = {"instructions": instructions} if node.get("expected_outcome"): content["help_text"] = node["expected_outcome"] if commands: content["commands"] = commands if current_section: content["group_label"] = current_section yield { "source_node_id": node["id"], "title": node.get("title", "Untitled step"), "step_type": "action", "content": content, "node_visibility_override": node.get("library_visibility"), } elif tree_type == "troubleshooting": for node in _walk_troubleshooting(tree_structure): instructions = node.get("description") or node.get("title", "") yield { "source_node_id": node["id"], "title": node.get("title", "Untitled step"), "step_type": "action" if node["type"] == "action" else "solution", "content": {"instructions": instructions}, "node_visibility_override": None, } async def sync_steps_from_tree( db: AsyncSession, tree_id: UUID, tree_type: str, tree_structure: dict, author_id: Optional[UUID], account_id: Optional[UUID], is_public: bool, service_account_id: Optional[UUID] = None, ) -> int: """Upsert step library entries from a published tree. Returns the number of steps synced. For default/system trees that have no author_id, pass service_account_id so that created_by is set to the ResolutionFlow service account. """ resolved_author_id = author_id or service_account_id if not resolved_author_id: return 0 now = datetime.now(timezone.utc) extracted = list(extract_steps_for_sync(tree_structure, tree_type)) for step_data in extracted: visibility = resolve_step_visibility( is_public=is_public, account_id=account_id, node_override=step_data["node_visibility_override"], ) await db.execute( text(""" INSERT INTO step_library ( id, title, step_type, content, created_by, account_id, visibility, is_flow_synced, source_tree_id, source_node_id, last_synced_at, tags, is_active, usage_count, rating_average, rating_count, helpful_yes, helpful_no, is_featured, is_verified, created_at, updated_at ) VALUES ( gen_random_uuid(), :title, :step_type, CAST(:content AS jsonb), :created_by, :account_id, :visibility, true, :source_tree_id, :source_node_id, :last_synced_at, '{}', true, 0, 0, 0, 0, 0, false, false, :now, :now ) ON CONFLICT (source_tree_id, source_node_id) DO UPDATE SET title = EXCLUDED.title, step_type = EXCLUDED.step_type, content = EXCLUDED.content, visibility = EXCLUDED.visibility, last_synced_at = EXCLUDED.last_synced_at, updated_at = EXCLUDED.updated_at, is_active = true """), { "title": step_data["title"][:255], "step_type": step_data["step_type"], "content": json.dumps(step_data["content"]), "created_by": str(resolved_author_id), "account_id": str(account_id) if account_id else None, "visibility": visibility, "source_tree_id": str(tree_id), "source_node_id": step_data["source_node_id"], "last_synced_at": now, "now": now, } ) # Soft-delete previously synced steps that no longer exist in the tree current_node_ids = [s["source_node_id"] for s in extracted] if current_node_ids: # Build NOT IN using explicit named placeholders — asyncpg does not # auto-cast a Python list to a PostgreSQL array in text() queries. placeholders = ", ".join(f":id_{i}" for i in range(len(current_node_ids))) params = {f"id_{i}": nid for i, nid in enumerate(current_node_ids)} params.update({"tree_id": str(tree_id), "now": now}) await db.execute( text(f""" UPDATE step_library SET is_active = false, updated_at = :now WHERE source_tree_id = :tree_id AND is_flow_synced = true AND source_node_id NOT IN ({placeholders}) """), params ) else: await db.execute( text(""" UPDATE step_library SET is_active = false, updated_at = :now WHERE source_tree_id = :tree_id AND is_flow_synced = true """), {"tree_id": str(tree_id), "now": now} ) return len(extracted) async def deactivate_synced_steps_for_tree(db: AsyncSession, tree_id: UUID) -> None: """Soft-delete all synced library entries for a tree (on tree delete/deactivate). Must be called BEFORE deleting the tree row — after deletion the FK ondelete='SET NULL' will null source_tree_id, making the WHERE clause match nothing. """ await db.execute( text(""" UPDATE step_library SET is_active = false, updated_at = :now WHERE source_tree_id = :tree_id AND is_flow_synced = true """), {"tree_id": str(tree_id), "now": datetime.now(timezone.utc)} )