When a system/default tree has no author (author_id is None), str(None) produces the literal string 'None' which asyncpg rejects as an invalid UUID for the created_by column. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
215 lines
8.1 KiB
Python
215 lines
8.1 KiB
Python
"""Sync steps from published flows into the step library."""
|
|
from __future__ import annotations
|
|
import json
|
|
from typing import Any, Generator, Literal, Optional
|
|
from uuid import UUID
|
|
from datetime import datetime, timezone
|
|
|
|
from sqlalchemy import text
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
|
|
StepVisibility = Literal['private', 'team', 'public']
|
|
|
|
|
|
def resolve_step_visibility(
|
|
is_public: bool,
|
|
account_id: Optional[UUID],
|
|
node_override: Optional[str],
|
|
) -> StepVisibility:
|
|
"""Resolve the visibility for a synced step.
|
|
|
|
Priority: node-level library_visibility overrides flow visibility.
|
|
Flow visibility: 'public' if is_public, otherwise 'team'.
|
|
"""
|
|
if node_override in ('team', 'public'):
|
|
return node_override # type: ignore[return-value]
|
|
return 'public' if is_public else 'team'
|
|
|
|
|
|
def _normalize_commands(raw: Any) -> list[dict]:
|
|
"""Normalize the commands field to a list of StepCommand dicts."""
|
|
if not raw:
|
|
return []
|
|
if isinstance(raw, str):
|
|
return [{"label": "", "command": raw, "command_type": None}]
|
|
if isinstance(raw, list):
|
|
result = []
|
|
for item in raw:
|
|
if isinstance(item, str):
|
|
result.append({"label": "", "command": item, "command_type": None})
|
|
elif isinstance(item, dict):
|
|
result.append({
|
|
"label": item.get("label", ""),
|
|
"command": item.get("code", item.get("command", "")),
|
|
"command_type": item.get("language", item.get("command_type")),
|
|
})
|
|
return result
|
|
return []
|
|
|
|
|
|
def _walk_troubleshooting(node: dict) -> Generator[dict, None, None]:
|
|
"""Recursively yield action and solution nodes from a troubleshooting tree."""
|
|
if node.get("type") in ("action", "solution"):
|
|
yield node
|
|
for child in node.get("children", []):
|
|
yield from _walk_troubleshooting(child)
|
|
|
|
|
|
def extract_steps_for_sync(
|
|
tree_structure: dict,
|
|
tree_type: str,
|
|
) -> Generator[dict, None, None]:
|
|
"""Extract step dicts ready for upsert from a tree structure.
|
|
|
|
Yields dicts with keys:
|
|
source_node_id, title, step_type, content (dict), node_visibility_override
|
|
"""
|
|
if tree_type in ("procedural", "maintenance"):
|
|
steps = tree_structure.get("steps", [])
|
|
current_section: Optional[str] = None
|
|
for node in steps:
|
|
node_type = node.get("type")
|
|
if node_type == "section_header":
|
|
current_section = node.get("title") or node.get("section_header")
|
|
continue
|
|
if node_type != "procedure_step":
|
|
continue
|
|
instructions = node.get("description") or node.get("title", "")
|
|
commands = _normalize_commands(node.get("commands")) or None
|
|
content: dict = {"instructions": instructions}
|
|
if node.get("expected_outcome"):
|
|
content["help_text"] = node["expected_outcome"]
|
|
if commands:
|
|
content["commands"] = commands
|
|
if current_section:
|
|
content["group_label"] = current_section
|
|
yield {
|
|
"source_node_id": node["id"],
|
|
"title": node.get("title", "Untitled step"),
|
|
"step_type": "action",
|
|
"content": content,
|
|
"node_visibility_override": node.get("library_visibility"),
|
|
}
|
|
|
|
elif tree_type == "troubleshooting":
|
|
for node in _walk_troubleshooting(tree_structure):
|
|
instructions = node.get("description") or node.get("title", "")
|
|
yield {
|
|
"source_node_id": node["id"],
|
|
"title": node.get("title", "Untitled step"),
|
|
"step_type": "action" if node["type"] == "action" else "solution",
|
|
"content": {"instructions": instructions},
|
|
"node_visibility_override": None,
|
|
}
|
|
|
|
|
|
async def sync_steps_from_tree(
|
|
db: AsyncSession,
|
|
tree_id: UUID,
|
|
tree_type: str,
|
|
tree_structure: dict,
|
|
author_id: UUID,
|
|
account_id: Optional[UUID],
|
|
is_public: bool,
|
|
) -> int:
|
|
"""Upsert step library entries from a published tree.
|
|
|
|
Returns the number of steps synced.
|
|
"""
|
|
now = datetime.now(timezone.utc)
|
|
extracted = list(extract_steps_for_sync(tree_structure, tree_type))
|
|
|
|
for step_data in extracted:
|
|
visibility = resolve_step_visibility(
|
|
is_public=is_public,
|
|
account_id=account_id,
|
|
node_override=step_data["node_visibility_override"],
|
|
)
|
|
await db.execute(
|
|
text("""
|
|
INSERT INTO step_library (
|
|
id, title, step_type, content, created_by, account_id,
|
|
visibility, is_flow_synced, source_tree_id, source_node_id,
|
|
last_synced_at, tags, is_active,
|
|
usage_count, rating_average, rating_count,
|
|
helpful_yes, helpful_no, is_featured, is_verified,
|
|
created_at, updated_at
|
|
) VALUES (
|
|
gen_random_uuid(), :title, :step_type, CAST(:content AS jsonb),
|
|
:created_by, :account_id, :visibility, true,
|
|
:source_tree_id, :source_node_id, :last_synced_at,
|
|
'{}', true,
|
|
0, 0, 0, 0, 0, false, false,
|
|
:now, :now
|
|
)
|
|
ON CONFLICT (source_tree_id, source_node_id)
|
|
DO UPDATE SET
|
|
title = EXCLUDED.title,
|
|
step_type = EXCLUDED.step_type,
|
|
content = EXCLUDED.content,
|
|
visibility = EXCLUDED.visibility,
|
|
last_synced_at = EXCLUDED.last_synced_at,
|
|
updated_at = EXCLUDED.updated_at,
|
|
is_active = true
|
|
"""),
|
|
{
|
|
"title": step_data["title"],
|
|
"step_type": step_data["step_type"],
|
|
"content": json.dumps(step_data["content"]),
|
|
"created_by": str(author_id) if author_id else None,
|
|
"account_id": str(account_id) if account_id else None,
|
|
"visibility": visibility,
|
|
"source_tree_id": str(tree_id),
|
|
"source_node_id": step_data["source_node_id"],
|
|
"last_synced_at": now,
|
|
"now": now,
|
|
}
|
|
)
|
|
|
|
# Soft-delete previously synced steps that no longer exist in the tree
|
|
current_node_ids = [s["source_node_id"] for s in extracted]
|
|
if current_node_ids:
|
|
# Build NOT IN using explicit named placeholders — asyncpg does not
|
|
# auto-cast a Python list to a PostgreSQL array in text() queries.
|
|
placeholders = ", ".join(f":id_{i}" for i in range(len(current_node_ids)))
|
|
params = {f"id_{i}": nid for i, nid in enumerate(current_node_ids)}
|
|
params.update({"tree_id": str(tree_id), "now": now})
|
|
await db.execute(
|
|
text(f"""
|
|
UPDATE step_library
|
|
SET is_active = false, updated_at = :now
|
|
WHERE source_tree_id = :tree_id
|
|
AND is_flow_synced = true
|
|
AND source_node_id NOT IN ({placeholders})
|
|
"""),
|
|
params
|
|
)
|
|
else:
|
|
await db.execute(
|
|
text("""
|
|
UPDATE step_library
|
|
SET is_active = false, updated_at = :now
|
|
WHERE source_tree_id = :tree_id AND is_flow_synced = true
|
|
"""),
|
|
{"tree_id": str(tree_id), "now": now}
|
|
)
|
|
|
|
return len(extracted)
|
|
|
|
|
|
async def deactivate_synced_steps_for_tree(db: AsyncSession, tree_id: UUID) -> None:
|
|
"""Soft-delete all synced library entries for a tree (on tree delete/deactivate).
|
|
|
|
Must be called BEFORE deleting the tree row — after deletion the FK ondelete='SET NULL'
|
|
will null source_tree_id, making the WHERE clause match nothing.
|
|
"""
|
|
await db.execute(
|
|
text("""
|
|
UPDATE step_library
|
|
SET is_active = false, updated_at = :now
|
|
WHERE source_tree_id = :tree_id AND is_flow_synced = true
|
|
"""),
|
|
{"tree_id": str(tree_id), "now": datetime.now(timezone.utc)}
|
|
)
|