resolutionflow/backend/app/api/endpoints/trees.py
chihlasm e6a0c0549b feat: Step Library sync + service account for default tree ownership
* feat: maintenance flow UX redesign — batch status hub, context strip, detail page upgrades (#85)

- Add BatchStatusPage (/flows/:id/batches/:batchId): per-target Start/Resume/View cards, progress bar, 5s polling while in-progress, completion outcome summary
- Add BatchStatusCard: handles not-started/in-progress/complete states with step progress for in-progress targets
- Add ActiveBatchBanner: amber banner on detail page when a batch is running, links to BatchStatusPage
- Add MaintenanceContextStrip: amber strip in ProceduralNavigationPage for maintenance flows showing target name, batch progress (X/Y complete), and Back to Batch nav
- Update MaintenanceFlowDetailPage: active batch banner, clickable run history rows with mini progress dots and outcome summaries, Run button loading state, post-launch navigates to BatchStatusPage
- Update ProceduralNavigationPage: renders MaintenanceContextStrip between top bar and content when tree_type === 'maintenance'; fetches batch progress once on mount
- Add batch_id filter to GET /sessions backend endpoint and SessionListParams frontend type
- Add /flows/:id/batches/:batchId route to router

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: session detail page — completion action + outcome summary card

- In-progress sessions: amber banner with "Complete Session" button opens
  SessionOutcomeModal to set outcome/notes/next-steps and finalize
- Completed sessions: colored outcome summary card (icon + outcome label +
  duration + notes + next steps) replaces dense header metadata; "Copy for
  Ticket" promoted to primary action inside the card
- Export toolbar de-emphasized to secondary row of smaller controls below
  the summary card

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: add library-page action props to StepCard (edit/delete/save)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: pass library-page action props through StepLibraryBrowser + refreshKey

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: StepFormModal wrapper + submitLabel/isSubmitting props on StepForm

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: Step Library page — create, edit, delete, save-to-library

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: add RuntimeStep union type for procedural custom steps

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: StepChecklist accepts RuntimeStep[], renders amber Custom badge

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: StepDetail accepts RuntimeStep, renders Custom Step badge for custom steps

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: custom step insertion in procedural flow sessions

Engineers can add custom steps inline during execution. Steps are
persisted to session.custom_steps and restored on resume.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: suppress StepFeedback on custom steps, fix resume stepState seeding, functional updater for step index

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* docs: add tree forking UI design doc

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* docs: add tree fork UI implementation plan

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: add ForkInfo type and fork fields to Tree/TreeListItem

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: align ForkInfo type with backend schema, remove redundant fork fields

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: ForkInfo placement, required fork_info field, add JSDoc

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: add ForkModal component with name and reason fields

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: ForkModal accessibility and UX (escape, click-outside, labels, maxLength)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: open ForkModal on fork action in TreeLibraryPage

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: add ForkModal to MyTreesPage

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: show Fork chip badge on forked tree cards

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* docs: add flow-to-library step sync design doc

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* docs: add flow-to-library sync implementation plan

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: add sync tracking columns to step_library

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: add sync columns and source_tree relationship to StepLibrary model

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: add group_label to StepContent, is_flow_synced/source_tree_name to StepLibraryResponse

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: include is_flow_synced and source_tree_name in step list/detail responses

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: add is_flow_synced and source_tree_name to step list response

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: add selectinload and sync fields to search and get_step endpoints

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: add step_sync module with extraction and upsert logic

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: safe NOT IN placeholders for asyncpg, add deactivate docstring

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: trigger step library sync on tree publish and deactivate on delete

- Call sync_steps_from_tree in update_tree whenever the tree is published
  (status transitions to 'published' or is already published and structure changes)
- Call deactivate_synced_steps_for_tree in delete_tree before db.commit()
  so the FK SET NULL does not nullify source_tree_id before the WHERE clause runs
- Fix ::jsonb cast syntax in step_sync.py (asyncpg rejects :: operator in text()
  queries; replaced with CAST(:content AS jsonb))
- Add UniqueConstraint('source_tree_id','source_node_id') to StepLibrary model
  so Base.metadata.create_all (used by tests) creates the constraint that the
  ON CONFLICT clause in sync_steps_from_tree depends on

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: add is_flow_synced and source_tree_name to Step types

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: show From Flow badge and lock icon on flow-synced StepCard

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: show source flow name in StepDetailModal for synced steps

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: add Library Visibility select to procedural StepEditor

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: address code review issues in flow-to-library sync

- Fix sync trigger: only fire on publish transition, not every PUT
- Add TestSyncOnPublish integration tests (2 tests, 16 total passing)
- Add group_label to frontend StepContent interface
- Guard Library Visibility select to procedure_step nodes only
- Block API edits to flow-synced steps (400 read-only guard)
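
The first fix above narrows the sync trigger to a publish transition. A minimal sketch of such a check, assuming the handler can see both the stored status and the requested one; `is_publish_transition` is a hypothetical helper for illustration, not a function from this repository:

```python
from typing import Optional


def is_publish_transition(current_status: str, requested_status: Optional[str]) -> bool:
    """Return True only when an update moves a tree into 'published'.

    Hypothetical helper: `current_status` is the value stored before the
    update, `requested_status` is the value in the request (None if unset).
    """
    return requested_status == "published" and current_status != "published"


print(is_publish_transition("draft", "published"))      # True
print(is_publish_transition("published", "published"))  # False
print(is_publish_transition("draft", None))             # False
```

Whether re-saving an already published tree should re-sync its steps is a design choice; this sketch treats that case as a no-op.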

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: handle None author_id in step sync to avoid invalid UUID error

When a system/default tree has no author (author_id is None),
str(None) produces the literal string 'None' which asyncpg
rejects as an invalid UUID for the created_by column.
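
The failure mode described above can be reproduced with the standard library alone. The sketch below assumes the value is stringified before being bound as a query parameter; `to_uuid_param` is a hypothetical helper name, not part of the repository:

```python
from typing import Optional
from uuid import UUID


def to_uuid_param(value: Optional[UUID]) -> Optional[str]:
    """Stringify a UUID for a query parameter, keeping None as None (SQL NULL)."""
    return None if value is None else str(value)


# str(None) is the four-character string 'None', which is not a valid UUID.
assert str(None) == "None"
try:
    UUID(str(None))
except ValueError:
    print("'None' is not a valid UUID")

print(to_uuid_param(None))  # None
print(to_uuid_param(UUID("12345678-1234-5678-1234-567812345678")))
```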

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: add ResolutionFlow service account to own default tree steps in library

Default/system trees had no author_id (NULL), causing a NOT NULL violation
when syncing steps to step_library.created_by on publish.

- Add is_service_account flag to users table (migration 4f4137ce)
- Add service_account.py: idempotent ensure_service_account() creates
  noreply@resolutionflow.com with unusable password on startup
- Cache service account ID on app.state at lifespan startup
- Add get_service_account_id() FastAPI dep (returns None in tests)
- sync_steps_from_tree: resolve author_id or service_account_id as created_by
- create_tree: set author_id=service_account_id for is_default trees
- Migration 1490781700bc: backfill author_id on 31 existing default trees
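
The `created_by` fallback listed above can be sketched as follows. This is an illustration under stated assumptions, not the code in `step_sync.py`; `resolve_created_by` is a hypothetical name:

```python
from typing import Optional
from uuid import UUID


def resolve_created_by(
    author_id: Optional[UUID],
    service_account_id: Optional[UUID],
) -> Optional[UUID]:
    """Prefer the tree's author and fall back to the service account.

    Returns None only when both are missing, for example in tests where
    no service account ID has been cached.
    """
    return author_id if author_id is not None else service_account_id


author = UUID(int=1)
service = UUID(int=2)
print(resolve_created_by(author, service))  # the author's ID
print(resolve_created_by(None, service))    # the service account's ID
```

A caller writing to a NOT NULL column would still need to handle the case where both IDs are None.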

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-25 23:17:29 -05:00

1265 lines
42 KiB
Python

from datetime import datetime, timezone
from typing import Annotated, Optional
from uuid import UUID
import secrets

from fastapi import APIRouter, Depends, HTTPException, status, Query, Request
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, or_, update
from sqlalchemy.orm import selectinload

from app.core.database import get_db
from app.models.tree import Tree
from app.models.tree_share import TreeShare
from app.models.user import User
from app.models.category import TreeCategory
from app.models.tag import TreeTag, tree_tag_assignments
from app.models.folder import UserFolder, user_folder_trees
from app.schemas.tree import (
    TreeCreate, TreeUpdate, TreeResponse, TreeListResponse, CategoryInfo,
    ForkCreate, ForkInfo, TreeShareCreate, TreeShareResponse,
    TreeVisibilityUpdate, SharedTreeResponse, TreeValidationResponse, ValidationError,
    PinnedFlowResponse, PinnedFlowsListResponse, PinnedFlowReorderRequest
)
from app.models.user_pinned_tree import UserPinnedTree
from app.api.deps import get_current_active_user, require_engineer_or_admin, require_admin, get_service_account_id
from app.core.permissions import can_edit_tree, can_access_tree
from app.core.filters import build_tree_access_filter
from app.core.subscriptions import check_tree_limit
from app.core.audit import log_audit
from app.core.config import settings
from app.core.tree_validation import can_publish_tree
from app.core.step_sync import sync_steps_from_tree, deactivate_synced_steps_for_tree

router = APIRouter(prefix="/trees", tags=["trees"])


def build_tree_response(tree: Tree, author_map: dict | None = None) -> TreeListResponse:
    """Build TreeListResponse with category_info and tags."""
    category_info = None
    if tree.category_rel:
        category_info = CategoryInfo(
            id=tree.category_rel.id,
            name=tree.category_rel.name,
            slug=tree.category_rel.slug
        )
    author_name = (author_map or {}).get(tree.author_id)
    return TreeListResponse(
        id=tree.id,
        name=tree.name,
        description=tree.description,
        tree_type=tree.tree_type,
        category=tree.category,
        category_id=tree.category_id,
        category_info=category_info,
        tags=tree.tag_names,
        author_id=tree.author_id,
        author_name=author_name,
        account_id=tree.account_id,
        is_active=tree.is_active,
        is_public=tree.is_public,
        is_default=tree.is_default,
        visibility=tree.visibility,
        status=tree.status,
        version=tree.version,
        usage_count=tree.usage_count,
        created_at=tree.created_at,
        updated_at=tree.updated_at
    )


def build_full_tree_response(tree: Tree, parent_tree: Optional[Tree] = None) -> TreeResponse:
    """Build TreeResponse with all details including category_info, tags, and fork_info."""
    category_info = None
    if tree.category_rel:
        category_info = CategoryInfo(
            id=tree.category_rel.id,
            name=tree.category_rel.name,
            slug=tree.category_rel.slug
        )
    fork_info = None
    if tree.parent_tree_id or tree.fork_depth > 0:
        # The parent has updates if it changed after the fork last recorded it
        has_updates = False
        if parent_tree and tree.parent_updated_at:
            has_updates = parent_tree.updated_at > tree.parent_updated_at
        fork_info = ForkInfo(
            parent_tree_id=tree.parent_tree_id,
            root_tree_id=tree.root_tree_id,
            fork_reason=tree.fork_reason,
            fork_depth=tree.fork_depth,
            parent_updated_at=tree.parent_updated_at,
            has_parent_updates=has_updates
        )
    return TreeResponse(
        id=tree.id,
        name=tree.name,
        description=tree.description,
        tree_type=tree.tree_type,
        category=tree.category,
        category_id=tree.category_id,
        category_info=category_info,
        tags=tree.tag_names,
        fork_info=fork_info,
        tree_structure=tree.tree_structure,
        intake_form=tree.intake_form,
        author_id=tree.author_id,
        account_id=tree.account_id,
        is_active=tree.is_active,
        is_public=tree.is_public,
        is_default=tree.is_default,
        status=tree.status,
        version=tree.version,
        usage_count=tree.usage_count,
        created_at=tree.created_at,
        updated_at=tree.updated_at
    )


@router.get("", response_model=list[TreeListResponse])
async def list_trees(
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)],
    tree_type: Optional[str] = Query(None, description="Filter by tree type: troubleshooting or procedural"),
    category: Optional[str] = Query(None, description="Filter by legacy category string"),
    category_id: Optional[UUID] = Query(None, description="Filter by category ID"),
    tags: Optional[str] = Query(None, description="Comma-separated tag slugs to filter by"),
    folder_id: Optional[UUID] = Query(None, description="Filter by folder ID (user's folders only)"),
    is_active: Optional[bool] = Query(None, description="Filter by active status"),
    author_id: Optional[UUID] = Query(None, description="Filter by author ID"),
    is_public: Optional[bool] = Query(None, description="Filter by public status"),
    visibility: Optional[str] = Query(None, description="Filter by visibility: private, team, link, public"),
    sort_by: Optional[str] = Query("usage_count", description="Sort order: usage_count, updated_at, created_at, name, name_desc, version"),
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=100)
):
    """List all trees with optional filters.

    New filters:
    - category_id: Filter by category (from tree_categories table)
    - tags: Comma-separated tag slugs (e.g., "citrix,networking")
    - folder_id: Show only trees in a specific folder
    - sort_by: Sort order (usage_count [default], updated_at, created_at, name, name_desc, version)
    """
    query = select(Tree).options(
        selectinload(Tree.category_rel),
        selectinload(Tree.tags)
    )

    # Apply filters
    if tree_type:
        query = query.where(Tree.tree_type == tree_type)
    if category:
        query = query.where(Tree.category == category)
    if category_id:
        query = query.where(Tree.category_id == category_id)
    if is_active is not None:
        query = query.where(Tree.is_active == is_active)
    else:
        # Default to only showing active trees
        query = query.where(Tree.is_active == True)
    if author_id:
        query = query.where(Tree.author_id == author_id)
    if is_public is not None:
        query = query.where(Tree.is_public == is_public)
    if visibility:
        query = query.where(Tree.visibility == visibility)

    # Filter by tags (all specified tags must be present)
    if tags:
        tag_slugs = [t.strip() for t in tags.split(",") if t.strip()]
        for tag_slug in tag_slugs:
            query = query.where(
                Tree.tags.any(TreeTag.slug == tag_slug)
            )

    # Filter by folder
    if folder_id:
        # Verify folder belongs to user
        folder_result = await db.execute(
            select(UserFolder).where(
                UserFolder.id == folder_id,
                UserFolder.user_id == current_user.id
            )
        )
        folder = folder_result.scalar_one_or_none()
        if not folder:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Folder not found"
            )
        query = query.where(Tree.folders.any(UserFolder.id == folder_id))

    # Apply access filter
    query = query.where(build_tree_access_filter(current_user))

    # Apply sorting
    if sort_by == "updated_at":
        query = query.order_by(Tree.updated_at.desc())
    elif sort_by == "created_at":
        query = query.order_by(Tree.created_at.desc())
    elif sort_by == "name":
        query = query.order_by(Tree.name.asc())
    elif sort_by == "name_desc":
        query = query.order_by(Tree.name.desc())
    elif sort_by == "version":
        query = query.order_by(Tree.version.desc(), Tree.updated_at.desc())
    else:  # Default to usage_count
        query = query.order_by(Tree.usage_count.desc(), Tree.updated_at.desc())

    query = query.offset(skip).limit(limit)
    result = await db.execute(query)
    trees = result.scalars().unique().all()

    # Fetch author names in one query (avoids N+1)
    author_ids = {t.author_id for t in trees if t.author_id}
    author_map: dict = {}
    if author_ids:
        authors_result = await db.execute(
            select(User.id, User.name, User.email).where(User.id.in_(author_ids))
        )
        for row in authors_result:
            author_map[row.id] = row.name or row.email

    return [build_tree_response(tree, author_map) for tree in trees]


@router.get("/categories", response_model=list[str])
async def list_categories(
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """List all unique categories from trees the user can access.

    Note: This returns legacy string categories. For the new category system,
    use the /categories endpoint.
    """
    query = select(Tree.category).where(
        Tree.category.isnot(None),
        Tree.is_active == True,
        build_tree_access_filter(current_user)
    ).distinct()
    result = await db.execute(query)
    categories = [row[0] for row in result.all() if row[0]]
    return sorted(categories)


# --- Pinned Flows Endpoints (must be before /{tree_id} to avoid route shadowing) ---

MAX_PINNED_FLOWS = 15


@router.get("/pinned", response_model=PinnedFlowsListResponse)
async def list_pinned_flows(
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """List user's pinned flows, ordered by display_order."""
    result = await db.execute(
        select(UserPinnedTree, Tree)
        .join(Tree, UserPinnedTree.tree_id == Tree.id)
        .options(selectinload(Tree.category_rel))
        .where(
            UserPinnedTree.user_id == current_user.id,
            Tree.is_active == True,
            Tree.deleted_at.is_(None)
        )
        .order_by(UserPinnedTree.display_order, UserPinnedTree.pinned_at)
    )
    rows = result.all()
    items = []
    for pin, tree in rows:
        items.append(PinnedFlowResponse(
            id=pin.id,
            tree_id=tree.id,
            tree_name=tree.name,
            tree_type=tree.tree_type,
            category_emoji=None,
            category_name=tree.category_rel.name if tree.category_rel else None,
            pinned_at=pin.pinned_at,
            display_order=pin.display_order,
        ))
    return PinnedFlowsListResponse(items=items, count=len(items))


@router.patch("/pinned/reorder", response_model=PinnedFlowsListResponse)
async def reorder_pinned_flows(
    reorder_data: PinnedFlowReorderRequest,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Update display_order for all pinned flows."""
    for item in reorder_data.order:
        await db.execute(
            update(UserPinnedTree)
            .where(
                UserPinnedTree.user_id == current_user.id,
                UserPinnedTree.tree_id == item.tree_id
            )
            .values(display_order=item.display_order)
        )
    await db.commit()

    # Return updated list
    result = await db.execute(
        select(UserPinnedTree, Tree)
        .join(Tree, UserPinnedTree.tree_id == Tree.id)
        .options(selectinload(Tree.category_rel))
        .where(
            UserPinnedTree.user_id == current_user.id,
            Tree.is_active == True,
            Tree.deleted_at.is_(None)
        )
        .order_by(UserPinnedTree.display_order, UserPinnedTree.pinned_at)
    )
    rows = result.all()
    items = []
    for pin, tree in rows:
        items.append(PinnedFlowResponse(
            id=pin.id,
            tree_id=tree.id,
            tree_name=tree.name,
            tree_type=tree.tree_type,
            category_emoji=None,
            category_name=tree.category_rel.name if tree.category_rel else None,
            pinned_at=pin.pinned_at,
            display_order=pin.display_order,
        ))
    return PinnedFlowsListResponse(items=items, count=len(items))


@router.get("/search", response_model=list[TreeListResponse])
async def search_trees(
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)],
    q: str = Query(..., min_length=2, description="Search query"),
    limit: int = Query(20, ge=1, le=50)
):
    """Full-text search trees by name and description."""
    # Using PostgreSQL full-text search
    search_vector = func.to_tsvector('english', func.coalesce(Tree.name, '') + ' ' + func.coalesce(Tree.description, ''))
    search_query = func.plainto_tsquery('english', q)
    query = select(Tree).options(
        selectinload(Tree.category_rel),
        selectinload(Tree.tags)
    ).where(
        Tree.is_active == True,
        build_tree_access_filter(current_user),
        search_vector.op('@@')(search_query)
    ).order_by(
        func.ts_rank(search_vector, search_query).desc()
    ).limit(limit)
    result = await db.execute(query)
    trees = result.scalars().unique().all()
    return [build_tree_response(tree) for tree in trees]


@router.get("/{tree_id}", response_model=TreeResponse)
async def get_tree(
    tree_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Get a specific tree by ID."""
    result = await db.execute(
        select(Tree)
        .options(
            selectinload(Tree.category_rel),
            selectinload(Tree.tags)
        )
        .where(Tree.id == tree_id)
    )
    tree = result.scalar_one_or_none()
    if not tree:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Tree not found"
        )
    if not tree.is_active or not can_access_tree(current_user, tree):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to this tree"
        )
    return build_full_tree_response(tree)


@router.post("", response_model=TreeResponse, status_code=status.HTTP_201_CREATED)
async def create_tree(
    tree_data: TreeCreate,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_engineer_or_admin)],
    service_account_id: Annotated[Optional[UUID], Depends(get_service_account_id)],
):
    """Create a new tree (engineers and admins only).

    Supports:
    - category_id: Assign to a category from tree_categories
    - tags: List of tag names to assign (creates new tags if needed)
    - status: draft or published (published requires validation)
    """
    # Validate tree if status is 'published'
    if tree_data.status == 'published':
        # Convert intake_form to dicts for validation
        intake_form_dicts = None
        if tree_data.intake_form:
            intake_form_dicts = [f.model_dump() for f in tree_data.intake_form]
        can_publish, validation_errors = can_publish_tree(
            tree_data.tree_structure,
            tree_data.name,
            tree_data.description,
            tree_type=tree_data.tree_type,
            intake_form=intake_form_dicts,
        )
        if not can_publish:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail={
                    "message": "Cannot publish tree with validation errors",
                    "errors": validation_errors
                }
            )

    # Only super admins can create default/system trees
    is_default = tree_data.is_default and current_user.is_super_admin

    # Verify category exists if provided
    if tree_data.category_id:
        cat_result = await db.execute(
            select(TreeCategory).where(TreeCategory.id == tree_data.category_id)
        )
        category = cat_result.scalar_one_or_none()
        if not category:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Category not found"
            )
        # Check category access
        if category.account_id and category.account_id != current_user.account_id and not current_user.is_super_admin:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't have access to this category"
            )

    # Convert intake_form Pydantic models to dicts for JSONB storage
    intake_form_data = None
    if tree_data.intake_form:
        intake_form_data = [f.model_dump(exclude_none=True) for f in tree_data.intake_form]

    new_tree = Tree(
        name=tree_data.name,
        description=tree_data.description,
        category=tree_data.category,
        category_id=tree_data.category_id,
        tree_type=tree_data.tree_type,
        tree_structure=tree_data.tree_structure,
        intake_form=intake_form_data,
        # Default trees are owned by the service account, not the requesting user
        author_id=service_account_id if is_default else current_user.id,
        account_id=None if is_default else current_user.account_id,
        is_public=True if is_default else tree_data.is_public,  # Default trees are always public
        is_default=is_default,
        status=tree_data.status
    )

    # Check subscription tree limit
    if not is_default and current_user.account_id:
        can_create, limit, count = await check_tree_limit(current_user.account_id, db)
        if not can_create:
            raise HTTPException(
                status_code=status.HTTP_402_PAYMENT_REQUIRED,
                detail=f"Tree limit reached ({count}/{limit}). Upgrade your plan to create more trees."
            )

    db.add(new_tree)
    await db.flush()  # Get the ID

    # Handle tags
    if tree_data.tags:
        tree_account_id = new_tree.account_id or (current_user.account_id if not current_user.is_super_admin else None)
        # Collect tags to add
        tags_to_add = []
        for tag_name in tree_data.tags:
            slug = TreeTag.slugify(tag_name)
            # Try to find existing tag
            tag_query = select(TreeTag).where(
                TreeTag.slug == slug,
                or_(
                    TreeTag.account_id.is_(None),
                    TreeTag.account_id == tree_account_id
                )
            )
            tag_result = await db.execute(tag_query)
            tag = tag_result.scalar_one_or_none()
            if not tag:
                # Create new tag
                tag = TreeTag(
                    name=tag_name,
                    slug=slug,
                    account_id=tree_account_id,
                    created_by=current_user.id
                )
                db.add(tag)
                await db.flush()
            tags_to_add.append(tag)
            tag.usage_count += 1
        # Use direct SQL insert for the junction table to avoid lazy load issues
        # (tree_tag_assignments is imported at module level)
        for tag in tags_to_add:
            await db.execute(
                tree_tag_assignments.insert().values(
                    tree_id=new_tree.id,
                    tag_id=tag.id,
                    assigned_by=current_user.id
                )
            )

    await db.commit()

    # Reload with relationships
    result = await db.execute(
        select(Tree)
        .options(
            selectinload(Tree.category_rel),
            selectinload(Tree.tags)
        )
        .where(Tree.id == new_tree.id)
    )
    tree = result.scalar_one()
    return build_full_tree_response(tree)


@router.put("/{tree_id}", response_model=TreeResponse)
async def update_tree(
    tree_id: UUID,
    tree_data: TreeUpdate,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_engineer_or_admin)],
    service_account_id: Annotated[Optional[UUID], Depends(get_service_account_id)],
):
    """Update an existing tree (engineers and admins only).

    Supports:
    - category_id: Change category assignment
    - tags: Replace all tags on the tree
    - status: Update status (requires validation when publishing)
    """
    result = await db.execute(
        select(Tree)
        .options(
            selectinload(Tree.category_rel),
            selectinload(Tree.tags)
        )
        .where(Tree.id == tree_id)
    )
    tree = result.scalar_one_or_none()
    if not tree:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Tree not found"
        )
    if not can_edit_tree(current_user, tree):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You can only edit your own trees"
        )

    # Extract tags for separate handling
    update_data = tree_data.model_dump(exclude_unset=True)
    tags_data = update_data.pop("tags", None)

    # Validate if the request sets status to published
    if "status" in update_data and update_data["status"] == 'published':
        # Get the final tree structure and name after update
        final_tree_structure = update_data.get("tree_structure", tree.tree_structure)
        final_name = update_data.get("name", tree.name)
        final_description = update_data.get("description", tree.description)
        final_tree_type = update_data.get("tree_type", tree.tree_type)
        final_intake_form = update_data.get("intake_form", tree.intake_form)
        can_publish, validation_errors = can_publish_tree(
            final_tree_structure,
            final_name,
            final_description,
            tree_type=final_tree_type,
            intake_form=final_intake_form,
        )
        if not can_publish:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail={
                    "message": "Cannot publish tree with validation errors",
                    "errors": validation_errors
                }
            )

    # Verify new category if provided
    if "category_id" in update_data and update_data["category_id"]:
        cat_result = await db.execute(
            select(TreeCategory).where(TreeCategory.id == update_data["category_id"])
        )
        category = cat_result.scalar_one_or_none()
        if not category:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Category not found"
            )
        if category.account_id and category.account_id != current_user.account_id and not current_user.is_super_admin:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="You don't have access to this category"
            )

    # Update basic fields
    for field, value in update_data.items():
        setattr(tree, field, value)

    # Keep visibility and is_public in sync
    if tree_data.is_public is not None:
        if tree_data.is_public and tree.visibility != 'public':
            tree.visibility = 'public'
        elif not tree_data.is_public and tree.visibility == 'public':
            tree.visibility = 'team'  # downgrade from public to team

    # Increment version if tree structure changed
    if "tree_structure" in update_data:
        tree.version += 1

    # Sync steps to the step library only when this request sets status to
    # 'published'; updates that leave status unset do not trigger a sync
    if update_data.get("status") == 'published':
        _structure = update_data.get("tree_structure", tree.tree_structure)
        _type = update_data.get("tree_type", tree.tree_type)
        _is_public = update_data.get("is_public", tree.is_public)
        await sync_steps_from_tree(
            db=db,
            tree_id=tree.id,
            tree_type=_type,
            tree_structure=_structure,
            author_id=tree.author_id,
            account_id=tree.account_id,
            is_public=_is_public,
            service_account_id=service_account_id,
        )

    # Handle tags replacement
    if tags_data is not None:
        # Decrement usage count for old tags (already eagerly loaded)
        for tag in tree.tags:
            tag.usage_count = max(0, tag.usage_count - 1)
        # Delete existing tag assignments using direct SQL
        await db.execute(
            tree_tag_assignments.delete().where(
                tree_tag_assignments.c.tree_id == tree.id
            )
        )
        # Add new tags
        tree_account_id = tree.account_id or (current_user.account_id if not current_user.is_super_admin else None)
        added_tag_ids = set()
        for tag_name in tags_data:
            slug = TreeTag.slugify(tag_name)
            tag_query = select(TreeTag).where(
                TreeTag.slug == slug,
                or_(
                    TreeTag.account_id.is_(None),
                    TreeTag.account_id == tree_account_id
                )
            )
            tag_result = await db.execute(tag_query)
            tag = tag_result.scalar_one_or_none()
            if not tag:
                tag = TreeTag(
                    name=tag_name,
                    slug=slug,
                    account_id=tree_account_id,
                    created_by=current_user.id
                )
                db.add(tag)
                await db.flush()
            # Skip duplicates so each tag is assigned and counted once
            if tag.id not in added_tag_ids:
                await db.execute(
                    tree_tag_assignments.insert().values(
                        tree_id=tree.id,
                        tag_id=tag.id,
                        assigned_by=current_user.id
                    )
                )
                added_tag_ids.add(tag.id)
                tag.usage_count += 1

    await db.commit()

    # Reload with relationships
    result = await db.execute(
        select(Tree)
        .options(
            selectinload(Tree.category_rel),
            selectinload(Tree.tags)
        )
        .where(Tree.id == tree_id)
    )
    tree = result.scalar_one()
    return build_full_tree_response(tree)


@router.delete("/{tree_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_tree(
    tree_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_admin)]
):
    """Soft delete a tree (admin only). Sets deleted_at timestamp and is_active=False."""
    result = await db.execute(select(Tree).where(Tree.id == tree_id))
    tree = result.scalar_one_or_none()
    if not tree:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Tree not found"
        )
    tree.is_active = False
    tree.deleted_at = datetime.now(timezone.utc)
    tree.deleted_by = current_user.id

    # Clean up folder assignments
    await db.execute(
        user_folder_trees.delete().where(user_folder_trees.c.tree_id == tree.id)
    )

    # Decrement usage_count on associated tags (floor at 0)
    tag_ids_result = await db.execute(
        select(tree_tag_assignments.c.tag_id).where(
            tree_tag_assignments.c.tree_id == tree.id
        )
    )
    tag_ids = [row[0] for row in tag_ids_result.fetchall()]
    if tag_ids:
        await db.execute(
            update(TreeTag)
            .where(TreeTag.id.in_(tag_ids))
            .values(usage_count=func.greatest(TreeTag.usage_count - 1, 0))
        )

    # Clean up tag assignments
    await db.execute(
        tree_tag_assignments.delete().where(tree_tag_assignments.c.tree_id == tree.id)
    )

    # Deactivate any synced step library entries before committing.
    # This must run while source_tree_id still points at the tree: an FK
    # SET NULL would clear the reference that the WHERE clause matches on.
    await deactivate_synced_steps_for_tree(db, tree.id)

    await log_audit(db, current_user.id, "tree.delete", "tree", tree.id,
                    {"tree_name": tree.name})
    await db.commit()
    return None


# --- Fork Endpoints ---

@router.post("/{tree_id}/fork", response_model=TreeResponse, status_code=status.HTTP_201_CREATED)
async def fork_tree(
    tree_id: UUID,
    fork_data: ForkCreate,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_engineer_or_admin)]
):
    """Fork a tree to create a personal copy.

    Engineers can fork any tree they can access (public, account, or default).
    Fork inherits tree_structure but gets new ownership.
    """
    # Load parent tree
    result = await db.execute(
        select(Tree)
        .options(selectinload(Tree.category_rel), selectinload(Tree.tags))
        .where(Tree.id == tree_id)
    )
    parent = result.scalar_one_or_none()
    if not parent:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Tree not found"
        )
    if not can_access_tree(current_user, parent):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to this tree"
        )

    # Check subscription tree limit
    if current_user.account_id:
        can_create, limit, count = await check_tree_limit(current_user.account_id, db)
        if not can_create:
            raise HTTPException(
                status_code=status.HTTP_402_PAYMENT_REQUIRED,
                detail=f"Tree limit reached ({count}/{limit}). Upgrade your plan to create more trees."
            )

    # Build fork
    fork_name = fork_data.name or f"Fork of {parent.name}"
    fork = Tree(
        name=fork_name,
        description=parent.description,
        category=parent.category,
        category_id=parent.category_id,
        tree_type=parent.tree_type,
        tree_structure=parent.tree_structure,
        intake_form=parent.intake_form,
        author_id=current_user.id,
        account_id=current_user.account_id,
        is_public=False,
        is_default=False,
        version=1,
        # Fork tracking
        parent_tree_id=parent.id,
        fork_reason=fork_data.fork_reason,
        parent_updated_at=parent.updated_at,
        # Lineage tracking
        root_tree_id=parent.root_tree_id if parent.root_tree_id else parent.id,
        fork_depth=parent.fork_depth + 1,
    )
    db.add(fork)
    await db.flush()

    await log_audit(db, current_user.id, "tree.fork", "tree", fork.id,
                    {"parent_tree_id": str(parent.id), "parent_name": parent.name,
                     "fork_reason": fork_data.fork_reason})
    await db.commit()

    # Reload with relationships
    result = await db.execute(
        select(Tree)
        .options(selectinload(Tree.category_rel), selectinload(Tree.tags))
        .where(Tree.id == fork.id)
    )
    fork = result.scalar_one()
    return build_full_tree_response(fork, parent_tree=parent)
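

# --- Illustrative sketch (not part of the API) ---
# The lineage bookkeeping in fork_tree can be pictured without a database.
# This is a minimal sketch under stated assumptions: ForkNodeSketch and
# fork_node_sketch are hypothetical names, and an original (unforked) tree is
# assumed to have root_tree_id=None and fork_depth=0.
from dataclasses import dataclass
from typing import Optional


@dataclass
class ForkNodeSketch:
    id: str
    parent_tree_id: Optional[str] = None
    root_tree_id: Optional[str] = None
    fork_depth: int = 0


def fork_node_sketch(parent: ForkNodeSketch, new_id: str) -> ForkNodeSketch:
    """Derive the fork's lineage fields the same way fork_tree does."""
    return ForkNodeSketch(
        id=new_id,
        parent_tree_id=parent.id,
        # A fork of a fork keeps pointing at the original root; a fork of an
        # original tree uses that tree as its root.
        root_tree_id=parent.root_tree_id if parent.root_tree_id else parent.id,
        fork_depth=parent.fork_depth + 1,
    )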


@router.get("/{tree_id}/forks", response_model=list[TreeListResponse])
async def list_forks(
    tree_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)],
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=100)
):
    """List all direct forks of a tree."""
    # Verify parent exists and user can access it
    parent_result = await db.execute(select(Tree).where(Tree.id == tree_id))
    parent = parent_result.scalar_one_or_none()
    if not parent:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Tree not found"
        )
    if not can_access_tree(current_user, parent):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to this tree"
        )

    # Query direct forks, filtered by access
    query = select(Tree).options(
        selectinload(Tree.category_rel),
        selectinload(Tree.tags)
    ).where(
        Tree.parent_tree_id == tree_id,
        Tree.is_active == True,
        build_tree_access_filter(current_user)
    ).order_by(Tree.created_at.desc()).offset(skip).limit(limit)

    result = await db.execute(query)
    forks = result.scalars().unique().all()
    return [build_tree_response(tree) for tree in forks]


@router.get("/{tree_id}/lineage", response_model=list[TreeListResponse])
async def get_tree_lineage(
    tree_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Get the fork lineage chain from current tree back to root.

    Returns ordered list: [current tree, parent, grandparent, ..., root]
    Limited to 10 levels to prevent infinite loops.
    """
    lineage = []
    current_id = tree_id
    visited = set()
    max_depth = 10

    for _ in range(max_depth):
        if current_id is None or current_id in visited:
            break
        visited.add(current_id)

        result = await db.execute(
            select(Tree)
            .options(selectinload(Tree.category_rel), selectinload(Tree.tags))
            .where(Tree.id == current_id)
        )
        tree = result.scalar_one_or_none()
        if not tree:
            break

        lineage.append(build_tree_response(tree))
        current_id = tree.parent_tree_id

    return lineage
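

# --- Illustrative sketch (not part of the API) ---
# The lineage walk above can be pictured without a database. This is a minimal
# sketch under stated assumptions: `parents` is a hypothetical in-memory mapping
# of tree id -> parent id (None for a root) standing in for Tree.parent_tree_id,
# and walk_lineage_sketch is a hypothetical helper name.
def walk_lineage_sketch(start_id, parents: dict, max_depth: int = 10) -> list:
    """Return [start, parent, grandparent, ...].

    Stops at the root, at an unknown id, on a cycle, or after max_depth entries.
    """
    lineage = []
    visited = set()
    current_id = start_id
    for _ in range(max_depth):
        if current_id is None or current_id in visited:
            break  # walked past the root (None) or looped back on a cycle
        if current_id not in parents:
            break  # unknown id, mirrors the "tree not found" branch above
        visited.add(current_id)
        lineage.append(current_id)
        current_id = parents[current_id]
    return lineage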


# --- Tree Sharing Endpoints ---

@router.post("/{tree_id}/share", response_model=TreeShareResponse, status_code=status.HTTP_201_CREATED)
async def create_tree_share(
    tree_id: UUID,
    share_data: TreeShareCreate,
    request: Request,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Generate a share token for a tree.

    Requirements:
    - Tree author can always create shares
    - Account members can share trees with visibility 'team', 'link', or 'public'
    - Super admins can share any tree
    """
    # Load tree
    result = await db.execute(
        select(Tree).where(Tree.id == tree_id, Tree.is_active == True)
    )
    tree = result.scalar_one_or_none()
    if not tree:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Tree not found"
        )

    # Check permissions
    if not can_access_tree(current_user, tree):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to this tree"
        )

    # Generate unique share token
    share_token = secrets.token_urlsafe(48)  # 48 bytes -> 64 base64 chars

    # Create share
    tree_share = TreeShare(
        tree_id=tree.id,
        share_token=share_token,
        created_by=current_user.id,
        allow_forking=share_data.allow_forking,
        expires_at=share_data.expires_at
    )
    db.add(tree_share)
    # Flush so tree_share.id is populated before it is written to the audit log
    # (same pattern as fork_tree)
    await db.flush()

    await log_audit(db, current_user.id, "tree.share.create", "tree_share", tree_share.id,
                    {"tree_id": str(tree.id), "tree_name": tree.name, "allow_forking": share_data.allow_forking})
    await db.commit()
    await db.refresh(tree_share)

    # Build share URL
    base_url = str(request.base_url).rstrip('/')
    share_url = f"{base_url}/shared/{share_token}"

    return TreeShareResponse(
        id=tree_share.id,
        tree_id=tree_share.tree_id,
        share_token=tree_share.share_token,
        share_url=share_url,
        allow_forking=tree_share.allow_forking,
        created_by=tree_share.created_by,
        created_at=tree_share.created_at,
        expires_at=tree_share.expires_at
    )
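

# --- Illustrative sketch (not part of the API) ---
# Why 48 bytes gives a 64-character token: secrets.token_urlsafe(n) draws n
# random bytes and encodes them as URL-safe base64 with padding stripped.
# Base64 maps every 3 bytes to 4 characters, so 48 bytes become exactly 64
# characters with no padding. The helper names below are hypothetical; the URL
# shape simply mirrors the f-string used in create_tree_share.
import secrets


def make_share_token_sketch(nbytes: int = 48) -> str:
    """Return a random URL-safe token built from nbytes of entropy."""
    return secrets.token_urlsafe(nbytes)


def build_share_url_sketch(base_url: str, token: str) -> str:
    """Join a base URL and a token without doubling the slash."""
    return f"{base_url.rstrip('/')}/shared/{token}"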


@router.get("/{tree_id}/shares", response_model=list[TreeShareResponse])
async def list_tree_shares(
    tree_id: UUID,
    request: Request,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """List all active shares for a tree."""
    # Verify tree exists and user can access it
    result = await db.execute(
        select(Tree).where(Tree.id == tree_id)
    )
    tree = result.scalar_one_or_none()
    if not tree:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Tree not found"
        )
    if not can_access_tree(current_user, tree):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to this tree"
        )

    # Query shares
    shares_result = await db.execute(
        select(TreeShare)
        .where(TreeShare.tree_id == tree_id)
        .order_by(TreeShare.created_at.desc())
    )
    shares = shares_result.scalars().all()

    # Build responses with share URLs
    base_url = str(request.base_url).rstrip('/')
    return [
        TreeShareResponse(
            id=share.id,
            tree_id=share.tree_id,
            share_token=share.share_token,
            share_url=f"{base_url}/shared/{share.share_token}",
            allow_forking=share.allow_forking,
            created_by=share.created_by,
            created_at=share.created_at,
            expires_at=share.expires_at
        )
        for share in shares
    ]


@router.patch("/{tree_id}/visibility", response_model=TreeResponse)
async def update_tree_visibility(
    tree_id: UUID,
    visibility_data: TreeVisibilityUpdate,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(require_engineer_or_admin)]
):
    """Update tree visibility level.

    Visibility levels:
    - private: Only tree author can access
    - team: Account members can access
    - link: Anyone with a valid share token can access
    - public: All authenticated users can access
    """
    result = await db.execute(
        select(Tree).where(Tree.id == tree_id)
    )
    tree = result.scalar_one_or_none()
    if not tree:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Tree not found"
        )
    if not can_edit_tree(current_user, tree):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You can only edit your own trees"
        )

    # Update visibility
    old_visibility = tree.visibility
    tree.visibility = visibility_data.visibility
    tree.is_public = (visibility_data.visibility == 'public')

    await log_audit(db, current_user.id, "tree.visibility.update", "tree", tree.id,
                    {"tree_name": tree.name, "old_visibility": old_visibility,
                     "new_visibility": visibility_data.visibility})
    await db.commit()

    # Reload with relationships
    result = await db.execute(
        select(Tree)
        .options(
            selectinload(Tree.category_rel),
            selectinload(Tree.tags)
        )
        .where(Tree.id == tree_id)
    )
    tree = result.scalar_one()
    return build_full_tree_response(tree)


# --- Tree Validation Endpoint ---

@router.post("/{tree_id}/can-publish", response_model=TreeValidationResponse)
async def check_tree_can_publish(
    tree_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Check if a tree can be published (validation endpoint).

    Returns validation status and any errors that would prevent publishing.
    Useful for providing real-time feedback in the UI without attempting to publish.
    """
    result = await db.execute(
        select(Tree).where(Tree.id == tree_id)
    )
    tree = result.scalar_one_or_none()
    if not tree:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Tree not found"
        )
    if not can_access_tree(current_user, tree):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have access to this tree"
        )

    # Validate the tree
    can_publish, validation_errors = can_publish_tree(
        tree.tree_structure,
        tree.name,
        tree.description,
        tree_type=tree.tree_type,
        intake_form=tree.intake_form,
    )
    return TreeValidationResponse(
        can_publish=can_publish,
        errors=[ValidationError(**error) for error in validation_errors]
    )


@router.post("/{tree_id}/pin", response_model=PinnedFlowResponse)
async def pin_flow(
    tree_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Pin a flow to the user's sidebar."""
    # Check tree exists and user can access it
    tree_result = await db.execute(
        select(Tree)
        .options(selectinload(Tree.category_rel))
        .where(Tree.id == tree_id, Tree.is_active == True)
    )
    tree = tree_result.scalar_one_or_none()
    if not tree:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Tree not found")
    if not can_access_tree(current_user, tree):
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="You don't have access to this tree")

    # Check if already pinned (idempotent)
    existing = await db.execute(
        select(UserPinnedTree).where(
            UserPinnedTree.user_id == current_user.id,
            UserPinnedTree.tree_id == tree_id
        )
    )
    pin = existing.scalar_one_or_none()
    if pin:
        return PinnedFlowResponse(
            id=pin.id,
            tree_id=tree.id,
            tree_name=tree.name,
            tree_type=tree.tree_type,
            category_emoji=None,
            category_name=tree.category_rel.name if tree.category_rel else None,
            pinned_at=pin.pinned_at,
            display_order=pin.display_order,
        )

    # Check max pins
    count_result = await db.execute(
        select(func.count(UserPinnedTree.id)).where(
            UserPinnedTree.user_id == current_user.id
        )
    )
    count = count_result.scalar() or 0
    if count >= MAX_PINNED_FLOWS:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"Maximum of {MAX_PINNED_FLOWS} pinned flows reached"
        )

    # Create pin
    pin = UserPinnedTree(
        user_id=current_user.id,
        tree_id=tree_id,
        display_order=count,  # Append at end
    )
    db.add(pin)
    await db.commit()
    await db.refresh(pin)

    return PinnedFlowResponse(
        id=pin.id,
        tree_id=tree.id,
        tree_name=tree.name,
        tree_type=tree.tree_type,
        category_emoji=None,
        category_name=tree.category_rel.name if tree.category_rel else None,
        pinned_at=pin.pinned_at,
        display_order=pin.display_order,
    )
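

# --- Illustrative sketch (not part of the API) ---
# The pin rules above (idempotent re-pin, a hard cap, append at the end) can be
# pictured with a plain list. This is a minimal sketch under stated
# assumptions: add_pin_sketch is a hypothetical helper, `pins` stands in for
# one user's UserPinnedTree rows, and ValueError stands in for the HTTP 409.
# The value of MAX_PINNED_FLOWS is not shown here, so the cap is a parameter.
def add_pin_sketch(pins: list, tree_id, max_pins: int) -> int:
    """Pin tree_id and return its position in the list."""
    if tree_id in pins:
        # Already pinned: return the existing position and change nothing
        return pins.index(tree_id)
    if len(pins) >= max_pins:
        raise ValueError(f"Maximum of {max_pins} pinned flows reached")
    pins.append(tree_id)  # Append at end
    return len(pins) - 1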


@router.delete("/{tree_id}/pin")
async def unpin_flow(
    tree_id: UUID,
    db: Annotated[AsyncSession, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    """Unpin a flow from the user's sidebar."""
    result = await db.execute(
        select(UserPinnedTree).where(
            UserPinnedTree.user_id == current_user.id,
            UserPinnedTree.tree_id == tree_id
        )
    )
    pin = result.scalar_one_or_none()
    if pin:
        await db.delete(pin)
        await db.commit()
    return {"success": True}