Files
resolutionflow/backend/app/core/step_sync.py
2026-02-25 13:35:23 -05:00

206 lines
7.6 KiB
Python

"""Sync steps from published flows into the step library."""
from __future__ import annotations
import json
from typing import Any, Generator, Literal, Optional
from uuid import UUID
from datetime import datetime, timezone
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
StepVisibility = Literal['private', 'team', 'public']
def resolve_step_visibility(
is_public: bool,
account_id: Optional[UUID],
node_override: Optional[str],
) -> StepVisibility:
"""Resolve the visibility for a synced step.
Priority: node-level library_visibility overrides flow visibility.
Flow visibility: 'public' if is_public, otherwise 'team'.
"""
if node_override in ('team', 'public'):
return node_override # type: ignore[return-value]
return 'public' if is_public else 'team'
def _normalize_commands(raw: Any) -> list[dict]:
"""Normalize the commands field to a list of StepCommand dicts."""
if not raw:
return []
if isinstance(raw, str):
return [{"label": "", "command": raw, "command_type": None}]
if isinstance(raw, list):
result = []
for item in raw:
if isinstance(item, str):
result.append({"label": "", "command": item, "command_type": None})
elif isinstance(item, dict):
result.append({
"label": item.get("label", ""),
"command": item.get("code", item.get("command", "")),
"command_type": item.get("language", item.get("command_type")),
})
return result
return []
def _walk_troubleshooting(node: dict) -> Generator[dict, None, None]:
"""Recursively yield action and solution nodes from a troubleshooting tree."""
if node.get("type") in ("action", "solution"):
yield node
for child in node.get("children", []):
yield from _walk_troubleshooting(child)
def extract_steps_for_sync(
tree_structure: dict,
tree_type: str,
) -> Generator[dict, None, None]:
"""Extract step dicts ready for upsert from a tree structure.
Yields dicts with keys:
source_node_id, title, step_type, content (dict), node_visibility_override
"""
if tree_type in ("procedural", "maintenance"):
steps = tree_structure.get("steps", [])
current_section: Optional[str] = None
for node in steps:
node_type = node.get("type")
if node_type == "section_header":
current_section = node.get("title") or node.get("section_header")
continue
if node_type != "procedure_step":
continue
instructions = node.get("description") or node.get("title", "")
commands = _normalize_commands(node.get("commands")) or None
content: dict = {"instructions": instructions}
if node.get("expected_outcome"):
content["help_text"] = node["expected_outcome"]
if commands:
content["commands"] = commands
if current_section:
content["group_label"] = current_section
yield {
"source_node_id": node["id"],
"title": node.get("title", "Untitled step"),
"step_type": "action",
"content": content,
"node_visibility_override": node.get("library_visibility"),
}
elif tree_type == "troubleshooting":
for node in _walk_troubleshooting(tree_structure):
instructions = node.get("description") or node.get("title", "")
yield {
"source_node_id": node["id"],
"title": node.get("title", "Untitled step"),
"step_type": "action" if node["type"] == "action" else "solution",
"content": {"instructions": instructions},
"node_visibility_override": None,
}
async def sync_steps_from_tree(
db: AsyncSession,
tree_id: UUID,
tree_type: str,
tree_structure: dict,
author_id: UUID,
account_id: Optional[UUID],
is_public: bool,
) -> int:
"""Upsert step library entries from a published tree.
Returns the number of steps synced.
"""
now = datetime.now(timezone.utc)
extracted = list(extract_steps_for_sync(tree_structure, tree_type))
for step_data in extracted:
visibility = resolve_step_visibility(
is_public=is_public,
account_id=account_id,
node_override=step_data["node_visibility_override"],
)
await db.execute(
text("""
INSERT INTO step_library (
id, title, step_type, content, created_by, account_id,
visibility, is_flow_synced, source_tree_id, source_node_id,
last_synced_at, tags, is_active,
usage_count, rating_average, rating_count,
helpful_yes, helpful_no, is_featured, is_verified,
created_at, updated_at
) VALUES (
gen_random_uuid(), :title, :step_type, :content::jsonb,
:created_by, :account_id, :visibility, true,
:source_tree_id, :source_node_id, :last_synced_at,
'{}', true,
0, 0, 0, 0, 0, false, false,
:now, :now
)
ON CONFLICT (source_tree_id, source_node_id)
DO UPDATE SET
title = EXCLUDED.title,
step_type = EXCLUDED.step_type,
content = EXCLUDED.content,
visibility = EXCLUDED.visibility,
last_synced_at = EXCLUDED.last_synced_at,
updated_at = EXCLUDED.updated_at,
is_active = true
"""),
{
"title": step_data["title"],
"step_type": step_data["step_type"],
"content": json.dumps(step_data["content"]),
"created_by": str(author_id),
"account_id": str(account_id) if account_id else None,
"visibility": visibility,
"source_tree_id": str(tree_id),
"source_node_id": step_data["source_node_id"],
"last_synced_at": now,
"now": now,
}
)
# Soft-delete previously synced steps that no longer exist in the tree
current_node_ids = [s["source_node_id"] for s in extracted]
if current_node_ids:
await db.execute(
text("""
UPDATE step_library
SET is_active = false, updated_at = :now
WHERE source_tree_id = :tree_id
AND is_flow_synced = true
AND source_node_id != ALL(:node_ids)
"""),
{"tree_id": str(tree_id), "node_ids": current_node_ids, "now": now}
)
else:
await db.execute(
text("""
UPDATE step_library
SET is_active = false, updated_at = :now
WHERE source_tree_id = :tree_id AND is_flow_synced = true
"""),
{"tree_id": str(tree_id), "now": now}
)
return len(extracted)
async def deactivate_synced_steps_for_tree(db: AsyncSession, tree_id: UUID) -> None:
"""Soft-delete all synced library entries for a tree (on tree delete/deactivate)."""
await db.execute(
text("""
UPDATE step_library
SET is_active = false, updated_at = :now
WHERE source_tree_id = :tree_id AND is_flow_synced = true
"""),
{"tree_id": str(tree_id), "now": datetime.now(timezone.utc)}
)