feat: AI-assisted flow builder with 4-stage wizard (#87)

* feat: AI-assisted flow builder with 4-stage wizard

Implements the complete AI flow builder feature using a guided 4-stage
wizard (Foundation → Scaffold → Branch Detail → Review & Assemble).
AI assists at bounded points using Claude Haiku for cost-efficient
structured JSON generation (~$0.01-0.03/flow).

Backend: new models (ai_conversations, ai_usage), Alembic migration,
quota enforcement with billing anchor, Anthropic API integration with
prompt caching, tree validation, conversation CRUD with 24h TTL,
APScheduler cleanup job, 5 API endpoints, Pydantic schemas.

Frontend: TypeScript types, API client, Zustand store for wizard state,
7 components (modal, step indicator, foundation form, branch selector,
branch detail view, tree preview, quota display), MyTreesPage integration
with "Build with AI" button (hidden when AI not configured).

Tests: 14 validator unit tests + 11 endpoint integration tests with
mocked Anthropic (zero real API spend). All 25 tests passing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* docs: dashboard design doc and implementation plan

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: Phase 1 — pinnedFlowsStore, pagination hook, cached quota hook, sidebar refactor

- Add pin() to pinnedFlowsApi
- Create pinnedFlowsStore (Zustand) — single source of truth for pin state
- Add dashboardMyFlowsView preference to userPreferencesStore
- Create usePaginationParams hook (URL-synced)
- Create useCachedQuota hook (5-min TTL)
- Sidebar uses pinnedFlowsStore instead of local state

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: Phase 2 — pin/favorite buttons on all library view components

- TreeGridView: star in top-right corner of cards
- TreeListView: star at end of each row
- TreeTableView: dedicated leftmost Favorite column
- All with proper a11y (aria-label), event isolation, loading states

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: Phase 3 — Library page create dropdown + AI Builder + pin wiring

- Replace single Create link with dropdown menu (3 flow types + AI Builder)
- Wire pinnedFlowsStore to all view components
- AI Builder modal integration via useCachedQuota hook

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: Phase 4 — Dashboard refactor with Favorites grid + paginated My Flows

- Favorites section: compact grid from pinnedFlowsStore, max 2 rows, expandable
- My Flows: author_id filter, URL-synced pagination (10/25/50/All)
- View toggle (grid/list/table) with independent preference
- Skeleton loaders, empty states with CTAs
- Create dropdown with AI Builder option
- 500-item ceiling for "Show All" mode

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* feat: Phase 5 — Sidebar pinned section dual collapse + show more/less

- Header collapse hides entire section, resets to 5 items on re-expand
- List truncation: show first 5, "Show more (N)" expands to all
- Clicking a flow auto-collapses back to 5
- Smooth max-height CSS transition (250ms ease-out)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: stabilize usePaginationParams to prevent infinite re-render loop

allowedPageSizes array was recreated every render as a useMemo dep,
causing infinite updates. Use useRef to stabilize the reference.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: remove Set-based Zustand selectors causing infinite re-render loop

Zustand selectors returning new Set() on every call fail Object.is
equality check, triggering continuous re-renders. Replaced with
useMemo-derived Sets in consuming components.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: pin route ordering and star icon overlap in grid view

Move GET /pinned and PATCH /pinned/reorder before GET /{tree_id} to
prevent FastAPI from matching "pinned" as a UUID path parameter (422).
Relocate star button from absolute positioning into the header row to
avoid overlapping privacy icons and category badges.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: code review fixes — date calc, input validation, rate limits, shared components

- Fix monthly_reset_at crash when billing anchor day exceeds next month's length
- Add environment_tags sanitization (max 20 tags, 100 chars each) to prevent prompt injection
- Add @limiter.limit("10/minute") rate limiting to all AI endpoints
- Use getTreeNavigatePath() routing helper instead of hardcoded paths
- Extract shared CreateFlowDropdown component from QuickStartPage and TreeLibraryPage
- Clear useCachedQuota on logout to prevent stale data across user sessions
- Add useRef guard to scaffold useEffect to prevent potential double-fire
- Use node.id as React key instead of array index in BranchDetailView
- Remove redundant dead logic in ai_tree_validator

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: correct Anthropic model ID to full dated version

claude-haiku-4-5 is not a valid model alias — Anthropic requires the
full dated model ID claude-haiku-4-5-20251001.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: strip markdown code fences from AI JSON responses

Haiku sometimes wraps its JSON in ```json ... ``` despite the prompt
instructing otherwise. Strip fences before parsing to avoid JSONDecodeError
at char 0.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: increase branch_detail max_tokens to 8192 and add response logging

Truncated output at 4096 tokens produces invalid JSON mid-generation.
Also logs stop_reason and output_tokens per attempt to diagnose failures.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: pass explicit status='draft' when creating AI-generated flow

Tree model defaults to 'published' in the DB schema, but passing status=None
from the constructor overrides that default, causing a nullable=False violation
and a 500 on save.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: auto-advance branch detail and pin navigation bar

- Auto-advance to next undetailed branch after generation completes,
  using a useEffect that watches the count of detailed branches
- Cap tree preview at max-h-48 with internal scroll so the nav bar
  is never pushed off screen
- Make nav bar sticky bottom-0 with bg-card so it stays visible
  regardless of content height

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: increase branch retries to 3 and relax cross-reference validation on final attempt

next_node_id mismatches are a common model hallucination that the retry
prompt doesn't reliably fix. On the final (3rd) attempt, accept the branch
with strict=False so only truly fatal errors (missing fields, dead ends,
bad JSON) cause a hard failure. Cross-reference issues are minor and
fixable in the tree editor.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: strengthen prompt to prevent next_node_id mismatches, keep strict validation

Rather than lowering the validation bar, improve the system prompt:
- Rule 6 now explicitly states next_node_id must match a direct child's id
- Added rule 10: build tree bottom-up to avoid forward-reference errors
- Corrective prompt now calls out the ID mismatch constraint specifically

Reverts the strict=False fallback — flows must be correct before saving.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: persist branch viewing index in store to survive phase remounts

Local useState resets to 0 every time phase transitions from 'generating'
back to 'detailing', causing the view to snap back to branch 1.

Move viewingIndex to store's currentBranchIndex (already existed) and
advance it in generateBranchDetail after success. Component reads from
store so remounts no longer lose position.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: correct publish validation to check title instead of action/solution fields

The publish validator was checking for an 'action' field on action nodes
and a 'solution' field on solution nodes, but the actual node schema
(confirmed from seed data and frontend types) uses 'title'/'description'.
This caused all AI-generated trees to fail publish validation.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: correct action node schema and improve AI flow quality

- Fix action nodes to use next_node_id (not children) for continuation,
  matching how TreeNavigationPage.tsx navigates action nodes
- Validator now requires next_node_id on all action nodes and flags
  missing ones as broken dead ends
- Update _check_branch_termination: action nodes are not dead ends since
  they continue via next_node_id (validated separately)
- Improve scaffold prompt: branch names must describe observable symptoms
  users can self-identify, not internal category names
- Update branch_detail prompt with clearer action node schema, corrected
  few-shot example showing proper next_node_id on action nodes
- Improve assemble_tree root question to be more user-facing

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* docs: add AI flow builder gotchas to CLAUDE.md (#23-25)

- Action nodes use next_node_id (not children) for navigation
- Anthropic model IDs require full dated version string
- Claude API may wrap JSON in markdown fences

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: resolve CI lint errors and httpx dependency conflict

- Fix httpx version conflict: requirements-dev.txt now uses >=0.27.0 to match requirements.txt
- Extract CSAT helper functions to csatUtils.ts to fix react-refresh/only-export-components
- Remove default export from admin/EmptyState.tsx shim (same rule)
- Fix empty catch block in Modal.tsx (no-empty)
- Add eslint-disable comments for intentional setState-in-effect patterns in
  FlowAnalyticsPanel, QuickLaunch, NodeEditorPanel, useCachedQuota,
  MyAnalyticsPage, TeamAnalyticsPage
- Add eslint-disable comments for intentional _children destructure in NodeEditorPanel
- Fix _parentId unused var in useTreeLayout.ts
- Rewrite usePaginationParams.ts to avoid reading refs during render

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: update tests to match action node schema (next_node_id, not children)

- Update _make_valid_tree() in test_ai_tree_validator to use next_node_id
  on action nodes (solution is a sibling, not a child)
- Fix test_dead_end_action_node → test_dead_end_decision_node (action nodes
  don't have child-based dead ends; dead ends are decision nodes with no children)
- Add test_action_missing_next_node_id for the new validation rule
- Update BRANCH_DETAIL_JSON in test_ai_endpoints to use next_node_id pattern
- Update test_draft_trees.py to use "title" field for action/solution nodes
  (tree_validation.py was updated this branch to require "title" not "action"/"solution")

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: update remaining tests and session_to_tree for title field rename

- test_tree_validation.py: replace "action"/"solution" content fields with "title"
- test_procedural_flows.py: update solution node fixtures to use "title"
- test_save_session_as_tree.py: update fixtures and assertions for "title" field
- session_to_tree.py: generate "title" instead of "action"/"solution" on converted nodes;
  fall back to legacy field names when reading from old tree snapshots for compatibility

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit was merged in pull request #87.
This commit is contained in:
chihlasm
2026-02-23 00:03:54 -05:00
committed by GitHub
parent aef40078d0
commit 97cd297f46
72 changed files with 5925 additions and 346 deletions

View File

@@ -0,0 +1,216 @@
"""add ai flow builder tables and columns
Revision ID: a1b2c3d4e5f6
Revises: e65b9f8fd458
Create Date: 2026-02-20 12:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
revision: str = "a1b2c3d4e5f6"
down_revision: Union[str, None] = "e65b9f8fd458"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ── ai_conversations table ──
op.create_table(
"ai_conversations",
sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True),
sa.Column(
"user_id",
postgresql.UUID(as_uuid=True),
sa.ForeignKey("users.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column(
"account_id",
postgresql.UUID(as_uuid=True),
sa.ForeignKey("accounts.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("status", sa.String(20), nullable=False, server_default="foundation"),
sa.Column("messages", postgresql.JSONB(), nullable=False, server_default="[]"),
sa.Column(
"wizard_state", postgresql.JSONB(), nullable=False, server_default="{}"
),
sa.Column("generated_tree", postgresql.JSONB(), nullable=True),
sa.Column("question_rounds", sa.Integer(), nullable=False, server_default="0"),
sa.Column(
"expires_at", sa.DateTime(timezone=True), nullable=False
),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
)
op.create_index(
"ix_ai_conversations_user_id", "ai_conversations", ["user_id"]
)
op.create_index(
"ix_ai_conversations_account_id", "ai_conversations", ["account_id"]
)
op.create_index(
"ix_ai_conversations_user_created",
"ai_conversations",
["user_id", sa.text("created_at DESC")],
)
op.create_index(
"ix_ai_conversations_expires_at", "ai_conversations", ["expires_at"]
)
# ── ai_usage table ──
op.create_table(
"ai_usage",
sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True),
sa.Column(
"user_id",
postgresql.UUID(as_uuid=True),
sa.ForeignKey("users.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column(
"account_id",
postgresql.UUID(as_uuid=True),
sa.ForeignKey("accounts.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column(
"conversation_id",
postgresql.UUID(as_uuid=True),
sa.ForeignKey("ai_conversations.id", ondelete="SET NULL"),
nullable=True,
),
sa.Column("generation_type", sa.String(20), nullable=False),
sa.Column("tier_at_time", sa.String(20), nullable=False),
sa.Column("input_tokens", sa.Integer(), nullable=False, server_default="0"),
sa.Column("output_tokens", sa.Integer(), nullable=False, server_default="0"),
sa.Column(
"estimated_cost_usd",
sa.Numeric(10, 6),
nullable=False,
server_default="0",
),
sa.Column("succeeded", sa.Boolean(), nullable=False, server_default="true"),
sa.Column(
"counts_toward_quota",
sa.Boolean(),
nullable=False,
server_default="false",
),
sa.Column("error_code", sa.String(100), nullable=True),
sa.Column("metadata", postgresql.JSONB(), nullable=False, server_default="{}"),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
)
op.create_index("ix_ai_usage_user_id", "ai_usage", ["user_id"])
op.create_index("ix_ai_usage_account_id", "ai_usage", ["account_id"])
op.create_index("ix_ai_usage_created_at", "ai_usage", ["created_at"])
op.create_index(
"ix_ai_usage_user_created",
"ai_usage",
["user_id", sa.text("created_at DESC")],
)
op.create_index(
"ix_ai_usage_user_type_created",
"ai_usage",
["user_id", "generation_type", sa.text("created_at DESC")],
)
# Prevents double quota decrement from race conditions
op.execute(
"""
CREATE UNIQUE INDEX ix_ai_usage_unique_quota
ON ai_usage (conversation_id)
WHERE counts_toward_quota = true;
"""
)
# ── Schema modifications to existing tables ──
# users: add ai_billing_cycle_anchor_at
op.add_column(
"users",
sa.Column("ai_billing_cycle_anchor_at", sa.DateTime(timezone=True), nullable=True),
)
# Backfill: use created_at as the billing anchor
op.execute(
"UPDATE users SET ai_billing_cycle_anchor_at = created_at WHERE ai_billing_cycle_anchor_at IS NULL"
)
# plan_limits: add AI limit columns
op.add_column(
"plan_limits",
sa.Column("max_ai_builds_per_month", sa.Integer(), nullable=True),
)
op.add_column(
"plan_limits",
sa.Column("max_ai_builds_per_24h", sa.Integer(), nullable=True),
)
# account_limit_overrides: add AI override columns
op.add_column(
"account_limit_overrides",
sa.Column("override_max_ai_builds_per_month", sa.Integer(), nullable=True),
)
op.add_column(
"account_limit_overrides",
sa.Column("override_max_ai_builds_per_24h", sa.Integer(), nullable=True),
)
# Seed plan_limits with AI quota values
op.execute(
"""
UPDATE plan_limits SET max_ai_builds_per_month = 2, max_ai_builds_per_24h = 1
WHERE plan = 'free';
"""
)
op.execute(
"""
UPDATE plan_limits SET max_ai_builds_per_month = 50, max_ai_builds_per_24h = 10
WHERE plan = 'pro';
"""
)
op.execute(
"""
UPDATE plan_limits SET max_ai_builds_per_month = 200, max_ai_builds_per_24h = 20
WHERE plan = 'team';
"""
)
# Enterprise: NULL means unlimited (no update needed as default is NULL)
def downgrade() -> None:
# Drop AI override columns from account_limit_overrides
op.drop_column("account_limit_overrides", "override_max_ai_builds_per_24h")
op.drop_column("account_limit_overrides", "override_max_ai_builds_per_month")
# Drop AI limit columns from plan_limits
op.drop_column("plan_limits", "max_ai_builds_per_24h")
op.drop_column("plan_limits", "max_ai_builds_per_month")
# Drop ai_billing_cycle_anchor_at from users
op.drop_column("users", "ai_billing_cycle_anchor_at")
# Drop ai_usage table (indexes drop automatically)
op.drop_table("ai_usage")
# Drop ai_conversations table (indexes drop automatically)
op.drop_table("ai_conversations")

View File

@@ -0,0 +1,437 @@
"""AI Flow Builder wizard endpoints.
4-stage wizard:
POST /ai/start — Stage 1: create conversation with metadata
POST /ai/scaffold — Stage 2: AI suggests branches
POST /ai/branch-detail — Stage 3: AI generates detail for one branch
POST /ai/assemble — Stage 4: assemble branches into tree (no AI)
GET /ai/quota — quota status
"""
import logging
from typing import Annotated
import anthropic
from fastapi import APIRouter, Depends, HTTPException, Request, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.rate_limit import limiter
from app.api.deps import get_current_active_user, get_db, require_engineer_or_admin
from app.core.config import settings
from app.core.ai_conversation_store import (
create_conversation,
get_conversation,
update_conversation,
)
from app.core.ai_quota_service import check_ai_quota, record_ai_usage, get_user_plan
from app.core.ai_tree_generator_service import (
scaffold_branches,
generate_branch_detail,
assemble_tree,
)
from app.models.user import User
from app.schemas.ai_builder import (
AIStartRequest,
AIStartResponse,
AIScaffoldRequest,
AIScaffoldResponse,
AIBranchDetailRequest,
AIBranchDetailResponse,
AIAssembleRequest,
AIAssembleResponse,
AIQuotaStatusResponse,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/ai", tags=["ai-builder"])
def _require_ai_enabled() -> None:
"""Raise 503 if AI is not configured."""
if not settings.ai_enabled:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="AI flow builder is not configured. Set ANTHROPIC_API_KEY.",
)
@router.get("/quota", response_model=AIQuotaStatusResponse)
async def get_quota(
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
):
"""Get current user's AI quota status."""
if not settings.ai_enabled:
return AIQuotaStatusResponse(
plan="free",
monthly_used=0,
monthly_limit=None,
monthly_reset_at="",
daily_used=0,
daily_limit=None,
daily_reset_at="",
allowed=False,
ai_enabled=False,
)
_, quota_status = await check_ai_quota(
user_id=current_user.id,
account_id=current_user.account_id,
db=db,
billing_anchor=current_user.ai_billing_cycle_anchor_at,
)
return AIQuotaStatusResponse(
**quota_status,
ai_enabled=True,
)
@router.post("/start", response_model=AIStartResponse, status_code=201)
@limiter.limit("10/minute")
async def start_conversation(
request: Request,
data: AIStartRequest,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_engineer_or_admin),
):
"""Stage 1: Create a new AI wizard conversation with foundation metadata."""
_require_ai_enabled()
# Check daily quota (anti-abuse)
allowed, quota_status = await check_ai_quota(
user_id=current_user.id,
account_id=current_user.account_id,
db=db,
billing_anchor=current_user.ai_billing_cycle_anchor_at,
)
if not allowed:
reset_key = (
"daily_reset_at"
if quota_status.get("deny_reason") == "daily"
else "monthly_reset_at"
)
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail={
"message": f"AI build limit exceeded ({quota_status['deny_reason']})",
"reset_at": quota_status.get(reset_key),
"quota": quota_status,
},
)
wizard_state = {
"flow_type": data.flow_type,
"name": data.name,
"description": data.description,
"environment_tags": data.environment_tags,
"category_id": str(data.category_id) if data.category_id else None,
}
conversation = await create_conversation(
user_id=current_user.id,
account_id=current_user.account_id,
wizard_state=wizard_state,
db=db,
)
await db.commit()
return AIStartResponse(
conversation_id=conversation.id,
status=conversation.status,
)
@router.post("/scaffold", response_model=AIScaffoldResponse)
@limiter.limit("10/minute")
async def scaffold(
request: Request,
data: AIScaffoldRequest,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_engineer_or_admin),
):
"""Stage 2: AI suggests top-level branches."""
_require_ai_enabled()
conversation = await get_conversation(
data.conversation_id, current_user.id, db
)
# Check per-flow call limit
if conversation.question_rounds >= settings.AI_MAX_CALLS_PER_FLOW:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Maximum AI calls per flow exceeded",
)
plan = await get_user_plan(current_user.account_id, db)
try:
branches, input_tokens, output_tokens, cost = await scaffold_branches(
conversation.wizard_state,
)
except anthropic.APIError as e:
await record_ai_usage(
user_id=current_user.id,
account_id=current_user.account_id,
conversation_id=conversation.id,
generation_type="scaffold",
tier=plan,
input_tokens=0,
output_tokens=0,
estimated_cost=0,
succeeded=False,
counts_toward_quota=False,
error_code=type(e).__name__,
extra_data={"error": str(e)},
db=db,
)
await db.commit()
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail="AI provider error. Please try again.",
)
except ValueError as e:
await record_ai_usage(
user_id=current_user.id,
account_id=current_user.account_id,
conversation_id=conversation.id,
generation_type="scaffold",
tier=plan,
input_tokens=0,
output_tokens=0,
estimated_cost=0,
succeeded=False,
counts_toward_quota=False,
error_code="invalid_output",
extra_data={"error": str(e)},
db=db,
)
await db.commit()
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"AI returned invalid output: {e}",
)
# Record successful usage
await record_ai_usage(
user_id=current_user.id,
account_id=current_user.account_id,
conversation_id=conversation.id,
generation_type="scaffold",
tier=plan,
input_tokens=input_tokens,
output_tokens=output_tokens,
estimated_cost=cost,
succeeded=True,
counts_toward_quota=False,
error_code=None,
extra_data=None,
db=db,
)
# Update conversation state
wizard_state = dict(conversation.wizard_state)
wizard_state["branches"] = branches
await update_conversation(
conversation.id,
current_user.id,
{
"status": "scaffolding",
"wizard_state": wizard_state,
"question_rounds": conversation.question_rounds + 1,
},
db,
)
await db.commit()
return AIScaffoldResponse(
conversation_id=conversation.id,
branches=branches,
status="scaffolding",
)
@router.post("/branch-detail", response_model=AIBranchDetailResponse)
@limiter.limit("10/minute")
async def branch_detail(
request: Request,
data: AIBranchDetailRequest,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_engineer_or_admin),
):
"""Stage 3: AI generates detailed nodes for one branch."""
_require_ai_enabled()
conversation = await get_conversation(
data.conversation_id, current_user.id, db
)
if conversation.question_rounds >= settings.AI_MAX_CALLS_PER_FLOW:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Maximum AI calls per flow exceeded",
)
wizard_state = conversation.wizard_state
existing_branches = [
b.get("name", "") for b in wizard_state.get("branches", [])
]
plan = await get_user_plan(current_user.account_id, db)
try:
branch_tree, input_tokens, output_tokens, cost = (
await generate_branch_detail(
wizard_state,
data.branch_name,
existing_branches,
)
)
except anthropic.APIError as e:
await record_ai_usage(
user_id=current_user.id,
account_id=current_user.account_id,
conversation_id=conversation.id,
generation_type="branch_detail",
tier=plan,
input_tokens=0,
output_tokens=0,
estimated_cost=0,
succeeded=False,
counts_toward_quota=False,
error_code=type(e).__name__,
extra_data={"error": str(e), "branch_name": data.branch_name},
db=db,
)
await db.commit()
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail="AI provider error. Please try again.",
)
except ValueError as e:
await record_ai_usage(
user_id=current_user.id,
account_id=current_user.account_id,
conversation_id=conversation.id,
generation_type="branch_detail",
tier=plan,
input_tokens=0,
output_tokens=0,
estimated_cost=0,
succeeded=False,
counts_toward_quota=False,
error_code="invalid_output",
extra_data={"error": str(e), "branch_name": data.branch_name},
db=db,
)
await db.commit()
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"AI returned invalid output: {e}",
)
# Record successful usage
await record_ai_usage(
user_id=current_user.id,
account_id=current_user.account_id,
conversation_id=conversation.id,
generation_type="branch_detail",
tier=plan,
input_tokens=input_tokens,
output_tokens=output_tokens,
estimated_cost=cost,
succeeded=True,
counts_toward_quota=False,
error_code=None,
extra_data={"branch_name": data.branch_name},
db=db,
)
# Update conversation
await update_conversation(
conversation.id,
current_user.id,
{
"status": "detailing",
"question_rounds": conversation.question_rounds + 1,
},
db,
)
await db.commit()
return AIBranchDetailResponse(
conversation_id=conversation.id,
branch_name=data.branch_name,
steps=branch_tree,
status="detailing",
)
@router.post("/assemble", response_model=AIAssembleResponse)
@limiter.limit("10/minute")
async def assemble(
request: Request,
data: AIAssembleRequest,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_engineer_or_admin),
):
"""Stage 4: Assemble selected branches into a complete tree (no AI calls)."""
conversation = await get_conversation(
data.conversation_id, current_user.id, db
)
wizard_state = conversation.wizard_state
branches_for_assembly = [b.model_dump() for b in data.selected_branches]
try:
tree_structure, name, description, stats = assemble_tree(
wizard_state, branches_for_assembly
)
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=str(e),
)
# Record quota-consuming usage on successful assembly
plan = await get_user_plan(current_user.account_id, db)
await record_ai_usage(
user_id=current_user.id,
account_id=current_user.account_id,
conversation_id=conversation.id,
generation_type="tree",
tier=plan,
input_tokens=0,
output_tokens=0,
estimated_cost=0,
succeeded=True,
counts_toward_quota=True,
error_code=None,
extra_data={"stats": stats},
db=db,
)
# Update conversation with assembled tree
await update_conversation(
conversation.id,
current_user.id,
{
"status": "completed",
"generated_tree": tree_structure,
},
db,
)
await db.commit()
return AIAssembleResponse(
tree_structure=tree_structure,
suggested_name=name,
suggested_description=description,
summary=stats,
status="completed",
)

View File

@@ -229,6 +229,94 @@ async def list_categories(
return sorted(categories)
# --- Pinned Flows Endpoints (must be before /{tree_id} to avoid route shadowing) ---
MAX_PINNED_FLOWS = 15
@router.get("/pinned", response_model=PinnedFlowsListResponse)
async def list_pinned_flows(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)]
):
"""List user's pinned flows, ordered by display_order."""
result = await db.execute(
select(UserPinnedTree, Tree)
.join(Tree, UserPinnedTree.tree_id == Tree.id)
.options(selectinload(Tree.category_rel))
.where(
UserPinnedTree.user_id == current_user.id,
Tree.is_active == True,
Tree.deleted_at.is_(None)
)
.order_by(UserPinnedTree.display_order, UserPinnedTree.pinned_at)
)
rows = result.all()
items = []
for pin, tree in rows:
items.append(PinnedFlowResponse(
id=pin.id,
tree_id=tree.id,
tree_name=tree.name,
tree_type=tree.tree_type,
category_emoji=None,
category_name=tree.category_rel.name if tree.category_rel else None,
pinned_at=pin.pinned_at,
display_order=pin.display_order,
))
return PinnedFlowsListResponse(items=items, count=len(items))
@router.patch("/pinned/reorder", response_model=PinnedFlowsListResponse)
async def reorder_pinned_flows(
reorder_data: PinnedFlowReorderRequest,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)]
):
"""Update display_order for all pinned flows."""
for item in reorder_data.order:
await db.execute(
update(UserPinnedTree)
.where(
UserPinnedTree.user_id == current_user.id,
UserPinnedTree.tree_id == item.tree_id
)
.values(display_order=item.display_order)
)
await db.commit()
# Return updated list
result = await db.execute(
select(UserPinnedTree, Tree)
.join(Tree, UserPinnedTree.tree_id == Tree.id)
.options(selectinload(Tree.category_rel))
.where(
UserPinnedTree.user_id == current_user.id,
Tree.is_active == True,
Tree.deleted_at.is_(None)
)
.order_by(UserPinnedTree.display_order, UserPinnedTree.pinned_at)
)
rows = result.all()
items = []
for pin, tree in rows:
items.append(PinnedFlowResponse(
id=pin.id,
tree_id=tree.id,
tree_name=tree.name,
tree_type=tree.tree_type,
category_emoji=None,
category_name=tree.category_rel.name if tree.category_rel else None,
pinned_at=pin.pinned_at,
display_order=pin.display_order,
))
return PinnedFlowsListResponse(items=items, count=len(items))
@router.get("/search", response_model=list[TreeListResponse])
async def search_trees(
db: Annotated[AsyncSession, Depends(get_db)],
@@ -1034,46 +1122,6 @@ async def check_tree_can_publish(
)
# --- Pinned Flows Endpoints ---
MAX_PINNED_FLOWS = 15
@router.get("/pinned", response_model=PinnedFlowsListResponse)
async def list_pinned_flows(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)]
):
"""List user's pinned flows, ordered by display_order."""
result = await db.execute(
select(UserPinnedTree, Tree)
.join(Tree, UserPinnedTree.tree_id == Tree.id)
.options(selectinload(Tree.category_rel))
.where(
UserPinnedTree.user_id == current_user.id,
Tree.is_active == True,
Tree.deleted_at.is_(None)
)
.order_by(UserPinnedTree.display_order, UserPinnedTree.pinned_at)
)
rows = result.all()
items = []
for pin, tree in rows:
items.append(PinnedFlowResponse(
id=pin.id,
tree_id=tree.id,
tree_name=tree.name,
tree_type=tree.tree_type,
category_emoji=None,
category_name=tree.category_rel.name if tree.category_rel else None,
pinned_at=pin.pinned_at,
display_order=pin.display_order,
))
return PinnedFlowsListResponse(items=items, count=len(items))
@router.post("/{tree_id}/pin", response_model=PinnedFlowResponse)
async def pin_flow(
tree_id: UUID,
@@ -1166,51 +1214,3 @@ async def unpin_flow(
await db.delete(pin)
await db.commit()
return {"success": True}
@router.patch("/pinned/reorder", response_model=PinnedFlowsListResponse)
async def reorder_pinned_flows(
reorder_data: PinnedFlowReorderRequest,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)]
):
"""Update display_order for all pinned flows."""
for item in reorder_data.order:
await db.execute(
update(UserPinnedTree)
.where(
UserPinnedTree.user_id == current_user.id,
UserPinnedTree.tree_id == item.tree_id
)
.values(display_order=item.display_order)
)
await db.commit()
# Return updated list
result = await db.execute(
select(UserPinnedTree, Tree)
.join(Tree, UserPinnedTree.tree_id == Tree.id)
.options(selectinload(Tree.category_rel))
.where(
UserPinnedTree.user_id == current_user.id,
Tree.is_active == True,
Tree.deleted_at.is_(None)
)
.order_by(UserPinnedTree.display_order, UserPinnedTree.pinned_at)
)
rows = result.all()
items = []
for pin, tree in rows:
items.append(PinnedFlowResponse(
id=pin.id,
tree_id=tree.id,
tree_name=tree.name,
tree_type=tree.tree_type,
category_emoji=None,
category_name=tree.category_rel.name if tree.category_rel else None,
pinned_at=pin.pinned_at,
display_order=pin.display_order,
))
return PinnedFlowsListResponse(items=items, count=len(items))

View File

@@ -5,6 +5,7 @@ from app.api.endpoints import ratings, analytics
from app.api.endpoints import target_lists
from app.api.endpoints import maintenance_schedules
from app.api.endpoints import feedback
from app.api.endpoints import ai_builder
api_router = APIRouter()
@@ -34,3 +35,4 @@ api_router.include_router(analytics.router)
api_router.include_router(target_lists.router)
api_router.include_router(maintenance_schedules.router)
api_router.include_router(feedback.router)
api_router.include_router(ai_builder.router)

View File

@@ -0,0 +1,87 @@
"""DB-backed CRUD for AI wizard conversation state.
Conversations have a 24-hour TTL. Every access validates ownership and expiry.
"""
import uuid
from datetime import datetime, timezone, timedelta
from typing import Any, Optional
from uuid import UUID
from fastapi import HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.models.ai_conversation import AIConversation
async def create_conversation(
user_id: UUID,
account_id: UUID,
wizard_state: dict[str, Any],
db: AsyncSession,
) -> AIConversation:
"""Create a new AI wizard conversation."""
conversation = AIConversation(
user_id=user_id,
account_id=account_id,
status="foundation",
wizard_state=wizard_state,
messages=[],
expires_at=datetime.now(timezone.utc)
+ timedelta(hours=settings.AI_CONVERSATION_TTL_HOURS),
)
db.add(conversation)
await db.flush()
return conversation
async def get_conversation(
conversation_id: UUID,
user_id: UUID,
db: AsyncSession,
) -> AIConversation:
"""Get a conversation, validating ownership and expiry.
Raises HTTPException 410 if expired, 404 if not found or wrong owner.
"""
result = await db.execute(
select(AIConversation).where(AIConversation.id == conversation_id)
)
conversation = result.scalar_one_or_none()
if not conversation or conversation.user_id != user_id:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Conversation not found",
)
if conversation.expires_at < datetime.now(timezone.utc):
conversation.status = "expired"
await db.flush()
raise HTTPException(
status_code=status.HTTP_410_GONE,
detail="Conversation expired. Please start a new AI build.",
)
return conversation
async def update_conversation(
conversation_id: UUID,
user_id: UUID,
updates: dict[str, Any],
db: AsyncSession,
) -> AIConversation:
"""Update a conversation's fields.
Validates ownership and expiry before updating.
"""
conversation = await get_conversation(conversation_id, user_id, db)
for key, value in updates.items():
if hasattr(conversation, key):
setattr(conversation, key, value)
await db.flush()
return conversation

View File

@@ -0,0 +1,186 @@
"""AI generation quota management.
Enforces monthly and daily limits on AI flow builder usage.
Monthly quota consumed only on successful tree assembly (counts_toward_quota=True).
Daily limit is an anti-abuse guard consumed on conversation start.
"""
import calendar
from datetime import datetime, timezone, timedelta
from typing import Optional
from uuid import UUID
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.ai_usage import AIUsage
from app.models.plan_limits import PlanLimits
from app.models.account_limit_override import AccountLimitOverride
from app.core.subscriptions import get_account_subscription, get_plan_limits
async def get_user_plan(account_id: Optional[UUID], db: AsyncSession) -> str:
"""Get the plan tier for an account."""
if not account_id:
return "free"
sub = await get_account_subscription(account_id, db)
if sub is None:
return "free"
return sub.plan if sub.plan else "free"
async def _get_effective_limits(
account_id: UUID, plan: str, db: AsyncSession
) -> tuple[Optional[int], Optional[int]]:
"""Get effective AI limits (monthly, daily), applying account overrides.
Returns (monthly_limit, daily_limit). None means unlimited.
"""
limits = await get_plan_limits(plan, db)
monthly = limits.max_ai_builds_per_month if limits else None
daily = limits.max_ai_builds_per_24h if limits else None
# Check for account-level overrides
result = await db.execute(
select(AccountLimitOverride).where(
AccountLimitOverride.account_id == account_id
)
)
override = result.scalar_one_or_none()
if override:
if override.override_max_ai_builds_per_month is not None:
monthly = override.override_max_ai_builds_per_month
if override.override_max_ai_builds_per_24h is not None:
daily = override.override_max_ai_builds_per_24h
return monthly, daily
def _get_billing_anchor_month_start(anchor: Optional[datetime]) -> datetime:
"""Calculate the start of the current billing month from the anchor date.
If the anchor is day 15, the billing month runs from the 15th of each month.
Falls back to calendar month if anchor is None.
"""
now = datetime.now(timezone.utc)
if not anchor:
return now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
anchor_day = min(anchor.day, 28) # Clamp to avoid month overflow
this_month_anchor = now.replace(
day=anchor_day, hour=0, minute=0, second=0, microsecond=0
)
if now >= this_month_anchor:
return this_month_anchor
else:
# We're before the anchor day, so billing month started last month
if now.month == 1:
return this_month_anchor.replace(year=now.year - 1, month=12)
else:
return this_month_anchor.replace(month=now.month - 1)
async def check_ai_quota(
user_id: UUID,
account_id: UUID,
db: AsyncSession,
billing_anchor: Optional[datetime] = None,
) -> tuple[bool, dict]:
"""Check if user can make an AI generation.
Returns (allowed, quota_status_dict).
Monthly counts only rows with counts_toward_quota=True.
Daily counts only rows with generation_type in ('scaffold', 'branch_detail').
"""
plan = await get_user_plan(account_id, db)
monthly_limit, daily_limit = await _get_effective_limits(account_id, plan, db)
now = datetime.now(timezone.utc)
month_start = _get_billing_anchor_month_start(billing_anchor)
day_start = now - timedelta(hours=24)
# Monthly: count successful quota-consuming records
monthly_count = await db.scalar(
select(func.count(AIUsage.id)).where(
AIUsage.user_id == user_id,
AIUsage.counts_toward_quota == True, # noqa: E712
AIUsage.created_at >= month_start,
)
) or 0
# Daily: count all AI API calls (scaffold + branch_detail) in last 24h
daily_count = await db.scalar(
select(func.count(AIUsage.id)).where(
AIUsage.user_id == user_id,
AIUsage.succeeded == True, # noqa: E712
AIUsage.generation_type.in_(["scaffold", "branch_detail"]),
AIUsage.created_at >= day_start,
)
) or 0
allowed = True
deny_reason = None
if monthly_limit is not None and monthly_count >= monthly_limit:
allowed = False
deny_reason = "monthly"
if daily_limit is not None and daily_count >= daily_limit:
allowed = False
deny_reason = "daily"
# Calculate reset timestamps
next_month = month_start.month % 12 + 1
next_year = month_start.year + (1 if month_start.month == 12 else 0)
max_day = calendar.monthrange(next_year, next_month)[1]
monthly_reset_at = month_start.replace(
month=next_month,
year=next_year,
day=min(month_start.day, max_day),
)
daily_reset_at = day_start + timedelta(hours=24)
return allowed, {
"plan": plan,
"monthly_used": monthly_count,
"monthly_limit": monthly_limit,
"monthly_reset_at": monthly_reset_at.isoformat(),
"daily_used": daily_count,
"daily_limit": daily_limit,
"daily_reset_at": daily_reset_at.isoformat(),
"allowed": allowed,
"deny_reason": deny_reason,
}
async def record_ai_usage(
user_id: UUID,
account_id: UUID,
conversation_id: Optional[UUID],
generation_type: str,
tier: str,
input_tokens: int,
output_tokens: int,
estimated_cost: float,
succeeded: bool,
counts_toward_quota: bool,
error_code: Optional[str],
extra_data: Optional[dict],
db: AsyncSession,
) -> AIUsage:
"""Record an AI usage entry."""
usage = AIUsage(
user_id=user_id,
account_id=account_id,
conversation_id=conversation_id,
generation_type=generation_type,
tier_at_time=tier,
input_tokens=input_tokens,
output_tokens=output_tokens,
estimated_cost_usd=estimated_cost,
succeeded=succeeded,
counts_toward_quota=counts_toward_quota,
error_code=error_code,
extra_data=extra_data or {},
)
db.add(usage)
await db.flush()
return usage

View File

@@ -0,0 +1,322 @@
"""AI-powered tree generation service using Anthropic Claude API.
Implements the 4-stage wizard flow:
Stage 2 (scaffold): AI suggests 4-7 top-level branches
Stage 3 (branch_detail): AI generates detailed nodes per branch
Stage 4 (assemble): Pure assembly logic — zero AI calls
System prompts are static constants to enable Anthropic prompt caching.
"""
import json
import logging
import re
import uuid
from typing import Any
import anthropic
from app.core.config import settings
from app.core.ai_tree_validator import validate_generated_tree, count_tree_stats
logger = logging.getLogger(__name__)
# ── Cost estimation (Haiku 4.5 pricing) ──
COST_PER_INPUT_TOKEN = 1.0 / 1_000_000 # $1.00 per 1M input tokens
COST_PER_OUTPUT_TOKEN = 5.0 / 1_000_000 # $5.00 per 1M output tokens
# ── System Prompts ──
SCAFFOLD_SYSTEM_PROMPT = """You are ResolutionFlow AI, assisting MSP engineers to build troubleshooting and procedural flows for IT service management.
Context: Your audience is technical MSP staff experienced with Windows Server, Active Directory, networking, and common MSP tooling (ConnectWise, Datto, SonicWall, etc.).
Task: Given a flow type, category, name, description, and environment tags, suggest 4-7 top-level branches for the flow.
For TROUBLESHOOTING flows:
- Branches should describe the symptom the user observes — written as what the user sees or reports
- The branch name becomes a selectable option on the first screen, so it must be self-identifying from a user's perspective
- Good: "Drive letter missing after login", "Mapped drive shows as disconnected (red X)", "Access denied when opening files"
- Bad: "Authentication Failures", "GPO Issues", "Connectivity Problems" — too vague for users to self-identify
- Order from most common to least common
For PROCEDURE flows:
- Branches should be phase-based stages (e.g., "Prerequisites", "Configuration", "Verification", "Documentation")
- Each branch represents a major step in the process
- Order in logical execution sequence
Rules:
- Suggest 4-7 branches
- Be specific to the technology/service described — avoid generic internal category names
- Branch names should be concise (3-7 words) and describe the observable symptom or phase
- Each branch needs a brief description (1 sentence) explaining what scenarios it covers
- Return ONLY valid JSON, no markdown, no explanation
Output format:
{"branches": [{"name": "Branch Name", "description": "Brief description of what this covers"}]}"""
BRANCH_DETAIL_SYSTEM_PROMPT = """You are ResolutionFlow AI generating step-by-step detail for one branch of a troubleshooting or procedural flow for MSP engineers.
Context: Your audience is technical MSP staff experienced with Windows Server, Active Directory, networking, and common MSP tooling.
You must return ONLY valid JSON — no markdown, no code fences, no explanation.
Required node schema:
Decision nodes (branching diagnostic questions — choose the right path):
{"id": "unique-slug", "type": "decision", "question": "The diagnostic question", "help_text": "Optional context or command hint", "options": [{"id": "opt-id", "label": "Specific observable answer", "next_node_id": "child-node-id"}], "children": [<all child nodes listed here>]}
Action nodes (a single investigation or remediation step — MUST have next_node_id pointing to the next node):
{"id": "unique-slug", "type": "action", "title": "Short title", "description": "Detailed instructions", "commands": ["PowerShell or CMD commands"], "expected_outcome": "What success looks like", "next_node_id": "id-of-next-sibling-node"}
Solution nodes (leaf nodes — the final resolution, no children):
{"id": "unique-slug", "type": "solution", "title": "Resolution title", "description": "Full resolution description", "resolution_steps": ["Step 1", "Step 2"]}
CRITICAL NAVIGATION RULES:
- Decision node: each option's next_node_id MUST exactly match the "id" of a direct child in that decision node's "children" array
- Action node: next_node_id MUST exactly match the "id" of a sibling node (another child of the same parent decision node)
- Every action node MUST have a next_node_id — action nodes with no next step are broken dead ends
- Solution nodes have no children and no next_node_id — they are the terminus
- Every path through the tree MUST end at a solution node
Additional rules:
1. Generate 4-10 nodes total for this branch
2. Start with a decision node if troubleshooting, action node if procedure
3. Decision nodes must have at least 2 options with specific, observable answer choices
4. Include realistic MSP commands (PowerShell preferred for Windows)
5. Use unique node IDs prefixed with the branch context (e.g., "gpo-check-link")
6. Build the tree bottom-up in your head: create solution/leaf nodes first, then build parent nodes referencing their IDs
Few-shot example showing correct action node next_node_id usage:
{"id": "dns-root", "type": "decision", "question": "Can the client resolve any DNS names?", "help_text": "Run: nslookup google.com", "options": [{"id": "dns-opt-none", "label": "No — nslookup times out or returns 'server failed'", "next_node_id": "dns-check-service"}, {"id": "dns-opt-partial", "label": "Some names resolve but others fail", "next_node_id": "dns-check-specific"}], "children": [{"id": "dns-check-service", "type": "action", "title": "Check DNS Client Service", "description": "Verify the DNS Client service is running on the affected machine", "commands": ["Get-Service -Name Dnscache | Select-Object Status,StartType"], "expected_outcome": "Status should be Running", "next_node_id": "dns-service-solution"}, {"id": "dns-service-solution", "type": "solution", "title": "DNS Service Was Stopped", "description": "The DNS Client service was stopped, preventing all name resolution", "resolution_steps": ["Run: Start-Service Dnscache", "Set startup type: Set-Service Dnscache -StartupType Automatic", "Flush cache: ipconfig /flushdns", "Test: nslookup google.com"]}, {"id": "dns-check-specific", "type": "solution", "title": "Selective DNS Failure — Stale or Missing Records", "description": "Some records resolve correctly, indicating DNS is functional but specific records are stale or missing", "resolution_steps": ["Check DNS server for missing A/CNAME records", "Clear DNS cache on the DNS server: Clear-DnsServerCache", "Flush client cache: ipconfig /flushdns", "Verify with: nslookup <failing-hostname>"]}]}"""
CORRECTIVE_PROMPT_TEMPLATE = """Your previous JSON was invalid for ResolutionFlow's tree schema.
Validation errors:
{error_list}
IMPORTANT: If any error mentions a next_node_id referencing a non-existent child, you must ensure every option's next_node_id exactly matches the "id" field of one of the node's direct children. The child node must exist in the "children" array of the same parent node.
Return a corrected full JSON object only. No markdown, no prose, no code fences.
Fix ALL listed errors while maintaining the same troubleshooting/procedural logic."""
def _strip_markdown_fences(text: str) -> str:
"""Strip markdown code fences if the model wrapped its JSON response."""
text = text.strip()
match = re.match(r"^```(?:json)?\s*([\s\S]*?)```$", text)
if match:
return match.group(1).strip()
return text
def _get_client() -> anthropic.AsyncAnthropic:
"""Get configured async Anthropic client."""
if not settings.ANTHROPIC_API_KEY:
raise RuntimeError("ANTHROPIC_API_KEY not configured")
return anthropic.AsyncAnthropic(
api_key=settings.ANTHROPIC_API_KEY,
timeout=settings.AI_REQUEST_TIMEOUT_SECONDS,
)
def _estimate_cost(input_tokens: int, output_tokens: int) -> float:
"""Estimate USD cost from token counts."""
return (input_tokens * COST_PER_INPUT_TOKEN) + (
output_tokens * COST_PER_OUTPUT_TOKEN
)
async def scaffold_branches(
wizard_state: dict[str, Any],
) -> tuple[list[dict[str, str]], int, int, float]:
"""Stage 2: AI suggests top-level branches.
Returns (branches, input_tokens, output_tokens, estimated_cost).
Raises ValueError on invalid response.
"""
client = _get_client()
flow_type = wizard_state.get("flow_type", "troubleshooting")
name = wizard_state.get("name", "")
description = wizard_state.get("description", "")
tags = wizard_state.get("environment_tags", [])
user_message = (
f"Flow type: {flow_type}\n"
f"Name: {name}\n"
f"Description: {description}\n"
)
if tags:
user_message += f"Environment: {', '.join(tags)}\n"
response = await client.messages.create(
model=settings.AI_MODEL,
max_tokens=1024,
system=SCAFFOLD_SYSTEM_PROMPT,
messages=[{"role": "user", "content": user_message}],
)
raw_text = _strip_markdown_fences(response.content[0].text)
input_tokens = response.usage.input_tokens
output_tokens = response.usage.output_tokens
cost = _estimate_cost(input_tokens, output_tokens)
try:
data = json.loads(raw_text)
except json.JSONDecodeError as e:
raise ValueError(f"AI returned invalid JSON: {e}")
branches = data.get("branches", [])
if not isinstance(branches, list) or len(branches) < 2:
raise ValueError("AI returned fewer than 2 branches")
return branches, input_tokens, output_tokens, cost
async def generate_branch_detail(
wizard_state: dict[str, Any],
branch_name: str,
existing_branches: list[str],
) -> tuple[dict[str, Any], int, int, float]:
"""Stage 3: AI generates detailed nodes for one branch.
Returns (branch_tree, input_tokens, output_tokens, estimated_cost).
On validation failure, retries once with corrective prompt.
Raises ValueError if both attempts fail.
"""
client = _get_client()
flow_type = wizard_state.get("flow_type", "troubleshooting")
name = wizard_state.get("name", "")
description = wizard_state.get("description", "")
user_message = (
f"Flow: {name} ({flow_type})\n"
f"Description: {description}\n"
f"Branch to detail: {branch_name}\n"
)
if existing_branches:
other = [b for b in existing_branches if b != branch_name]
if other:
user_message += f"Other branches (avoid overlap): {', '.join(other)}\n"
messages = [{"role": "user", "content": user_message}]
total_input = 0
total_output = 0
for attempt in range(3):
response = await client.messages.create(
model=settings.AI_MODEL,
max_tokens=8192,
system=BRANCH_DETAIL_SYSTEM_PROMPT,
messages=messages,
)
total_input += response.usage.input_tokens
total_output += response.usage.output_tokens
logger.debug(
"branch_detail attempt=%d stop_reason=%s content_blocks=%d output_tokens=%d",
attempt,
response.stop_reason,
len(response.content),
response.usage.output_tokens,
)
raw_text = _strip_markdown_fences(response.content[0].text) if response.content else ""
if not raw_text:
logger.warning("branch_detail attempt=%d returned empty text, stop_reason=%s", attempt, response.stop_reason)
try:
branch_tree = json.loads(raw_text)
except json.JSONDecodeError as e:
if attempt < 2:
messages.append({"role": "assistant", "content": raw_text})
messages.append({
"role": "user",
"content": CORRECTIVE_PROMPT_TEMPLATE.format(
error_list=f"JSON parse error: {e}"
),
})
continue
raise ValueError(f"AI returned invalid JSON after retry: {e}")
errors = validate_generated_tree(branch_tree)
if not errors:
cost = _estimate_cost(total_input, total_output)
return branch_tree, total_input, total_output, cost
if attempt < 2:
messages.append({"role": "assistant", "content": raw_text})
messages.append({
"role": "user",
"content": CORRECTIVE_PROMPT_TEMPLATE.format(
error_list="\n".join(f"- {e}" for e in errors)
),
})
continue
raise ValueError(
f"AI tree validation failed after retry: {'; '.join(errors)}"
)
# Should not reach here
raise ValueError("Branch detail generation failed")
def assemble_tree(
wizard_state: dict[str, Any],
branches: list[dict[str, Any]],
) -> tuple[dict[str, Any], str, str, dict[str, int]]:
"""Stage 4: Assemble branches into a complete tree.
Zero AI calls — pure assembly logic.
Returns (tree_structure, suggested_name, suggested_description, summary_stats).
"""
flow_type = wizard_state.get("flow_type", "troubleshooting")
name = wizard_state.get("name", "Untitled Flow")
description = wizard_state.get("description", "")
# Build root decision node pointing to each branch
options = []
children = []
for i, branch in enumerate(branches):
branch_name = branch.get("name", f"Branch {i + 1}")
branch_tree = branch.get("steps")
if not branch_tree or not isinstance(branch_tree, dict):
# Skip branches without detail
continue
branch_id = branch_tree.get("id", f"branch_{i}")
options.append({
"id": f"opt_{i + 1}",
"label": branch_name,
"next_node_id": branch_id,
})
children.append(branch_tree)
if len(options) < 2:
raise ValueError("Need at least 2 branches with detail to assemble a tree")
# Determine root question based on flow type
if flow_type == "troubleshooting":
root_question = f"What is the user experiencing? Select the symptom that best matches their report."
root_help = "Choose the option that most closely describes the user's reported problem."
else:
root_question = f"Which phase of {name} are you working on?"
root_help = None
tree_structure = {
"id": "root",
"type": "decision",
"question": root_question,
**({"help_text": root_help} if root_help else {}),
"options": options,
"children": children,
}
stats = count_tree_stats(tree_structure)
return tree_structure, name, description, stats

View File

@@ -0,0 +1,217 @@
"""Validation for AI-generated tree structures.
Ensures generated trees conform to ResolutionFlow's node schema
before they are saved to the database.
"""
from typing import Any
VALID_NODE_TYPES = {"decision", "action", "solution"}
# Required fields per node type
REQUIRED_FIELDS = {
"decision": {"id", "type", "question", "options", "children"},
"action": {"id", "type", "title", "description"},
"solution": {"id", "type", "title", "description"},
}
class TreeValidationError(Exception):
"""Raised when a generated tree fails validation."""
def __init__(self, errors: list[str]):
self.errors = errors
super().__init__(f"Tree validation failed: {'; '.join(errors)}")
def validate_generated_tree(tree: dict[str, Any]) -> list[str]:
"""Validate an AI-generated tree structure.
Returns a list of error strings. Empty list means valid.
"""
errors: list[str] = []
if not isinstance(tree, dict):
return ["Tree must be a JSON object"]
# Root must be a decision node
if tree.get("type") != "decision":
errors.append("Root node must be type 'decision'")
# Collect all node IDs and validate structure
all_ids: set[str] = set()
all_referenced_ids: set[str] = set()
node_count = 0
solution_count = 0
def _validate_node(node: dict[str, Any], path: str) -> None:
nonlocal node_count, solution_count
if not isinstance(node, dict):
errors.append(f"Node at {path} is not an object")
return
node_count += 1
node_type = node.get("type")
node_id = node.get("id")
# Check node ID
if not node_id:
errors.append(f"Node at {path} missing 'id'")
elif node_id in all_ids:
errors.append(f"Duplicate node ID: '{node_id}'")
else:
all_ids.add(node_id)
# Check node type
if node_type not in VALID_NODE_TYPES:
errors.append(
f"Node '{node_id or path}' has invalid type '{node_type}'. "
f"Must be one of: {', '.join(sorted(VALID_NODE_TYPES))}"
)
return
# Check required fields
required = REQUIRED_FIELDS[node_type]
missing = required - set(node.keys())
if missing:
errors.append(
f"Node '{node_id}' (type={node_type}) missing fields: {', '.join(sorted(missing))}"
)
# Type-specific validation
if node_type == "decision":
options = node.get("options", [])
if not isinstance(options, list) or len(options) < 2:
errors.append(
f"Decision node '{node_id}' must have at least 2 options"
)
else:
children = node.get("children", [])
child_ids = {c.get("id") for c in children if isinstance(c, dict)}
option_ids: set[str] = set()
for opt in options:
if not isinstance(opt, dict):
errors.append(f"Option in node '{node_id}' is not an object")
continue
opt_id = opt.get("id")
if opt_id and opt_id in option_ids:
errors.append(
f"Duplicate option ID '{opt_id}' in node '{node_id}'"
)
if opt_id:
option_ids.add(opt_id)
next_id = opt.get("next_node_id")
if next_id:
all_referenced_ids.add(next_id)
if child_ids and next_id not in child_ids:
errors.append(
f"Option '{opt.get('label', '?')}' in node '{node_id}' "
f"references non-existent child '{next_id}'"
)
elif node_type == "action":
next_id = node.get("next_node_id")
if not next_id:
errors.append(
f"Action node '{node_id}' is missing 'next_node_id'. "
"Every action node must point to the next node (a sibling in the parent's children)."
)
else:
all_referenced_ids.add(next_id)
elif node_type == "solution":
solution_count += 1
# Recurse into children
for i, child in enumerate(node.get("children", [])):
_validate_node(child, f"{path}.children[{i}]")
_validate_node(tree, "root")
# Global checks
if node_count < 5:
errors.append(
f"Tree has only {node_count} nodes. Minimum 5 required for a useful tree."
)
if node_count > 50:
errors.append(
f"Tree has {node_count} nodes. Maximum 50 allowed."
)
if solution_count < 2:
errors.append(
f"Tree has only {solution_count} solution nodes. "
"Need at least 2 to cover different resolution paths."
)
# Check that all leaf (non-solution) nodes have children or are solutions
_check_branch_termination(tree, errors)
return errors
def _check_branch_termination(node: dict[str, Any], errors: list[str]) -> None:
"""Verify every branch eventually reaches a solution node.
Action nodes continue via next_node_id (validated separately).
Only decision nodes with no children are dead ends.
"""
if not isinstance(node, dict):
return
node_type = node.get("type")
node_id = node.get("id", "?")
children = node.get("children", [])
if node_type == "solution":
return # Solution is a valid terminus
if node_type == "action":
# Action nodes continue via next_node_id (a sibling), not children.
# next_node_id presence is validated in _validate_node.
# Recurse into children if present (non-standard but tolerate it).
for child in children:
_check_branch_termination(child, errors)
return
# Decision node: must have children
if not children:
errors.append(
f"Decision node '{node_id}' is a dead end — "
"it has no children"
)
return
for child in children:
_check_branch_termination(child, errors)
def count_tree_stats(tree: dict[str, Any]) -> dict[str, int]:
"""Count node types and calculate depth of a tree."""
stats = {
"node_count": 0,
"decision_count": 0,
"action_count": 0,
"solution_count": 0,
"depth": 0,
}
def _count(node: dict[str, Any], depth: int) -> None:
if not isinstance(node, dict):
return
stats["node_count"] += 1
node_type = node.get("type", "")
if node_type == "decision":
stats["decision_count"] += 1
elif node_type == "action":
stats["action_count"] += 1
elif node_type == "solution":
stats["solution_count"] += 1
stats["depth"] = max(stats["depth"], depth)
for child in node.get("children", []):
_count(child, depth + 1)
_count(tree, 1)
return stats

View File

@@ -72,6 +72,18 @@ class Settings(BaseSettings):
"""Check if Stripe is configured."""
return self.STRIPE_SECRET_KEY is not None and self.STRIPE_WEBHOOK_SECRET is not None
# AI Flow Builder
ANTHROPIC_API_KEY: Optional[str] = None
AI_MODEL: str = "claude-haiku-4-5-20251001"
AI_CONVERSATION_TTL_HOURS: int = 24
AI_MAX_CALLS_PER_FLOW: int = 10
AI_REQUEST_TIMEOUT_SECONDS: int = 45
@property
def ai_enabled(self) -> bool:
"""Check if AI Flow Builder is configured."""
return self.ANTHROPIC_API_KEY is not None
# Deployment auto-seed test data on PR environments
SEED_ON_DEPLOY: bool = False

View File

@@ -1,4 +1,4 @@
"""APScheduler integration for maintenance flow auto-session creation."""
"""APScheduler integration for maintenance flow auto-session creation and AI cleanup."""
import logging
import uuid
from datetime import datetime, timezone
@@ -7,8 +7,9 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.schedulers.base import SchedulerNotRunningError
from apscheduler.jobstores.base import JobLookupError
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
import pytz
from sqlalchemy import select
from sqlalchemy import select, delete
from sqlalchemy.ext.asyncio import AsyncSession
logger = logging.getLogger(__name__)
@@ -114,6 +115,27 @@ async def _fire_maintenance_schedule(schedule_id: str) -> None:
await db.rollback()
async def _cleanup_expired_ai_conversations() -> None:
"""Delete expired AI wizard conversations."""
import app.models # noqa: F401
from app.core.database import async_session_maker
from app.models.ai_conversation import AIConversation
async with async_session_maker() as db:
try:
result = await db.execute(
delete(AIConversation).where(
AIConversation.expires_at < datetime.now(timezone.utc)
)
)
if result.rowcount > 0:
logger.info(f"Cleaned up {result.rowcount} expired AI conversation(s)")
await db.commit()
except Exception:
logger.exception("Error cleaning up expired AI conversations")
await db.rollback()
async def load_all_schedules(db: AsyncSession) -> None:
"""Load all active schedules into APScheduler on startup."""
# Import all models to ensure SQLAlchemy mapper relationships resolve

View File

@@ -28,7 +28,7 @@ def convert_session_to_tree(
return {
"id": str(uuid.uuid4()),
"type": "solution",
"solution": "Session had no recorded path",
"title": "Session had no recorded path",
"children": []
}
@@ -63,7 +63,7 @@ def convert_session_to_tree(
new_node = {
"id": node_id,
"type": "action",
"action": f"Step from original tree (node {node_id})",
"title": f"Step from original tree (node {node_id})",
"children": []
}
@@ -130,15 +130,15 @@ def _create_node_from_original(
if decision and decision.get("answer"):
new_node["question"] += f"\n\nAnswer: {decision['answer']}"
elif node_type == "action":
new_node["action"] = original_node.get("action", "")
new_node["title"] = original_node.get("title", original_node.get("action", ""))
if decision and decision.get("action_performed"):
new_node["action"] = decision["action_performed"]
new_node["title"] = decision["action_performed"]
if decision and decision.get("command_output"):
output = decision["command_output"].strip()
if output:
new_node["action"] += f"\n\nCommand Output:\n{output}"
new_node["title"] += f"\n\nCommand Output:\n{output}"
elif node_type == "solution":
new_node["solution"] = original_node.get("solution", "")
new_node["title"] = original_node.get("title", original_node.get("solution", ""))
return new_node
@@ -169,18 +169,18 @@ def _create_node_from_custom_step(
if step_type == "decision":
new_node["question"] = content
elif step_type == "action":
new_node["action"] = content
new_node["title"] = content
elif step_type == "solution":
new_node["solution"] = content
new_node["title"] = content
# Add notes if present
if custom_step.get("notes"):
if step_type == "decision":
new_node["question"] += f"\n\nNotes: {custom_step['notes']}"
elif step_type == "action":
new_node["action"] += f"\n\nNotes: {custom_step['notes']}"
new_node["title"] += f"\n\nNotes: {custom_step['notes']}"
elif step_type == "solution":
new_node["solution"] += f"\n\nNotes: {custom_step['notes']}"
new_node["title"] += f"\n\nNotes: {custom_step['notes']}"
return new_node

View File

@@ -83,17 +83,17 @@ def _validate_node(node: dict[str, Any], path: str, errors: list[dict[str, str]]
})
elif node_type == "action":
if "action" not in node or not node["action"]:
if "title" not in node or not node["title"]:
errors.append({
"field": f"{path}.action",
"message": "Action nodes must have a non-empty action"
"field": f"{path}.title",
"message": "Action nodes must have a non-empty title"
})
elif node_type == "solution":
if "solution" not in node or not node["solution"]:
if "title" not in node or not node["title"]:
errors.append({
"field": f"{path}.solution",
"message": "Solution nodes must have a non-empty solution"
"field": f"{path}.title",
"message": "Solution nodes must have a non-empty title"
})
elif node_type == "answer":

View File

@@ -13,7 +13,7 @@ from app.core.logging_config import setup_logging
from app.core.middleware import RequestLoggingMiddleware, ErrorLoggingMiddleware
from app.core.rate_limit import limiter
from app.api.router import api_router
from app.core.scheduler import scheduler, load_all_schedules
from app.core.scheduler import scheduler, load_all_schedules, _cleanup_expired_ai_conversations
# Initialize logging configuration
setup_logging()
@@ -103,10 +103,17 @@ async def lifespan(app: FastAPI):
# Note: In production, use Alembic migrations instead of init_db
# await init_db()
# Start maintenance schedule runner
# Start maintenance schedule runner + AI conversation cleanup
scheduler.start()
async with async_session_maker() as db:
await load_all_schedules(db)
scheduler.add_job(
_cleanup_expired_ai_conversations,
trigger="interval",
hours=1,
id="cleanup_ai_conversations",
replace_existing=True,
)
# Auto-seed trees in background on PR environments
seed_task = None

View File

@@ -26,6 +26,8 @@ from .user_pinned_tree import UserPinnedTree
from .target_list import TargetList
from .maintenance_schedule import MaintenanceSchedule
from .feedback import Feedback
from .ai_conversation import AIConversation
from .ai_usage import AIUsage
__all__ = [
"User",
@@ -63,4 +65,6 @@ __all__ = [
"TargetList",
"MaintenanceSchedule",
"Feedback",
"AIConversation",
"AIUsage",
]

View File

@@ -24,6 +24,8 @@ class AccountLimitOverride(Base):
override_max_trees: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
override_max_sessions_per_month: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
override_max_users: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
override_max_ai_builds_per_month: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
override_max_ai_builds_per_24h: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
note: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
created_by_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),

View File

@@ -0,0 +1,67 @@
"""AI Flow Builder conversation tracking.
Stores wizard session state across the 4-stage flow builder process.
Conversations expire after 24 hours and are cleaned up by the scheduler.
"""
import uuid
from datetime import datetime, timezone
from typing import Optional, Any
from sqlalchemy import String, DateTime, ForeignKey, Integer, Text
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.core.database import Base
class AIConversation(Base):
__tablename__ = "ai_conversations"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
user_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("users.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
account_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("accounts.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
status: Mapped[str] = mapped_column(
String(20),
nullable=False,
default="foundation",
comment="foundation | scaffolding | detailing | reviewing | completed | expired",
)
# Conversation history across all wizard stages
messages: Mapped[list[dict[str, Any]]] = mapped_column(
JSONB, nullable=False, default=list
)
# Wizard state: Stage 1 metadata, Stage 2 branches, Stage 3 detail
wizard_state: Mapped[dict[str, Any]] = mapped_column(
JSONB, nullable=False, default=dict
)
# Assembled tree from Stage 4 (null until assembly)
generated_tree: Mapped[Optional[dict[str, Any]]] = mapped_column(
JSONB, nullable=True
)
# Tracks AI call count for per-flow limits
question_rounds: Mapped[int] = mapped_column(
Integer, nullable=False, default=0
)
expires_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc),
)

View File

@@ -0,0 +1,69 @@
"""AI usage tracking for quota enforcement and cost visibility.
Every AI API call is recorded here. Only rows with counts_toward_quota=True
and succeeded=True are counted against the user's monthly quota.
"""
import uuid
from datetime import datetime, timezone
from typing import Optional, Any
from sqlalchemy import String, DateTime, ForeignKey, Integer, Boolean, Numeric
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.core.database import Base
class AIUsage(Base):
__tablename__ = "ai_usage"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
user_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("users.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
account_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("accounts.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
conversation_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True),
ForeignKey("ai_conversations.id", ondelete="SET NULL"),
nullable=True,
)
generation_type: Mapped[str] = mapped_column(
String(20),
nullable=False,
comment="scaffold | branch_detail | branch_suggest",
)
tier_at_time: Mapped[str] = mapped_column(
String(20),
nullable=False,
comment="free | pro | team | enterprise",
)
input_tokens: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
output_tokens: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
estimated_cost_usd: Mapped[float] = mapped_column(
Numeric(10, 6), nullable=False, default=0
)
succeeded: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
counts_toward_quota: Mapped[bool] = mapped_column(
Boolean, nullable=False, default=False
)
error_code: Mapped[Optional[str]] = mapped_column(
String(100), nullable=True
)
extra_data: Mapped[dict[str, Any]] = mapped_column(
"metadata", JSONB, nullable=False, default=dict
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc),
index=True,
)

View File

@@ -14,3 +14,7 @@ class PlanLimits(Base):
custom_branding: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
priority_support: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
export_formats: Mapped[list] = mapped_column(JSONB, nullable=False, default=lambda: ["markdown", "text"])
# AI Flow Builder limits
max_ai_builds_per_month: Mapped[int | None] = mapped_column(Integer, nullable=True)
max_ai_builds_per_24h: Mapped[int | None] = mapped_column(Integer, nullable=True)

View File

@@ -67,6 +67,11 @@ class User(Base):
)
last_login: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True)
# AI billing cycle anchor (for quota reset calculation)
ai_billing_cycle_anchor_at: Mapped[Optional[datetime]] = mapped_column(
DateTime(timezone=True), nullable=True
)
# Soft delete
deleted_at: Mapped[Optional[datetime]] = mapped_column(
DateTime(timezone=True),

View File

@@ -5,6 +5,11 @@ from .session import SessionCreate, SessionUpdate, SessionResponse, SessionExpor
from .category import CategoryCreate, CategoryUpdate, CategoryResponse, CategoryListResponse
from .tag import TagCreate, TagResponse, TagListResponse, TagAssignment
from .folder import FolderCreate, FolderUpdate, FolderResponse, FolderListResponse, FolderReorderRequest, FolderTreeRequest
from .ai_builder import (
AIStartRequest, AIScaffoldRequest, AIBranchDetailRequest, AIAssembleRequest,
AIStartResponse, AIScaffoldResponse, AIBranchDetailResponse, AIAssembleResponse,
AIQuotaStatusResponse,
)
__all__ = [
# User
@@ -21,4 +26,8 @@ __all__ = [
"TagCreate", "TagResponse", "TagListResponse", "TagAssignment",
# Folder
"FolderCreate", "FolderUpdate", "FolderResponse", "FolderListResponse", "FolderReorderRequest", "FolderTreeRequest",
# AI Builder
"AIStartRequest", "AIScaffoldRequest", "AIBranchDetailRequest", "AIAssembleRequest",
"AIStartResponse", "AIScaffoldResponse", "AIBranchDetailResponse", "AIAssembleResponse",
"AIQuotaStatusResponse",
]

View File

@@ -0,0 +1,126 @@
"""Pydantic schemas for the AI Flow Builder wizard."""
from typing import Any, Literal, Optional
from uuid import UUID
from pydantic import BaseModel, Field, field_validator
# ── Requests ──
class AIStartRequest(BaseModel):
"""Stage 1: Foundation — engineer provides flow metadata."""
flow_type: Literal["troubleshooting", "procedural"] = Field(
..., description="Type of flow to generate"
)
category_id: Optional[UUID] = None
name: str = Field(..., min_length=1, max_length=255)
description: str = Field("", max_length=2000)
environment_tags: list[str] = Field(default_factory=list, max_length=20)
@field_validator("environment_tags")
@classmethod
def validate_tags(cls, v: list[str]) -> list[str]:
for tag in v:
if len(tag) > 100:
raise ValueError("Each environment tag must be 100 characters or fewer")
if not tag.strip():
raise ValueError("Environment tags must not be empty")
return v
class AIScaffoldRequest(BaseModel):
"""Stage 2: Request AI-generated branch suggestions."""
conversation_id: UUID
class AIBranchDetailRequest(BaseModel):
"""Stage 3: Request AI-generated detail for one branch."""
conversation_id: UUID
branch_name: str = Field(..., min_length=1, max_length=255)
class AIBranchUpdate(BaseModel):
"""A branch with optional user edits for assembly."""
name: str
description: str = ""
steps: Optional[dict[str, Any]] = None
class AIAssembleRequest(BaseModel):
"""Stage 4: Assemble selected branches into a complete tree."""
conversation_id: UUID
selected_branches: list[AIBranchUpdate] = Field(..., min_length=2)
# ── Responses ──
class AIStartResponse(BaseModel):
"""Response after creating a conversation."""
conversation_id: UUID
status: str
class AIBranchSuggestion(BaseModel):
"""A single branch suggestion from the AI."""
name: str
description: str
class AIScaffoldResponse(BaseModel):
"""Response with AI-suggested branches."""
conversation_id: UUID
branches: list[AIBranchSuggestion]
status: str
class AIBranchDetailResponse(BaseModel):
"""Response with AI-generated detail for one branch."""
conversation_id: UUID
branch_name: str
steps: dict[str, Any]
status: str
class AITreeSummary(BaseModel):
"""Summary statistics for an assembled tree."""
node_count: int
decision_count: int
action_count: int
solution_count: int
depth: int
class AIAssembleResponse(BaseModel):
"""Response with the fully assembled tree."""
tree_structure: dict[str, Any]
suggested_name: str
suggested_description: str
summary: AITreeSummary
status: str
class AIQuotaStatusResponse(BaseModel):
"""Current user's AI quota status."""
plan: str
monthly_used: int
monthly_limit: Optional[int]
monthly_reset_at: str
daily_used: int
daily_limit: Optional[int]
daily_reset_at: str
allowed: bool
ai_enabled: bool

View File

@@ -4,7 +4,7 @@
# Testing
pytest==7.4.3
pytest-asyncio==0.23.0
httpx==0.26.0
httpx>=0.27.0
pytest-cov==4.1.0
# Code quality

View File

@@ -31,6 +31,9 @@ resend==2.21.0
# HTTP client (seed scripts, internal API calls)
httpx>=0.27.0
# AI Flow Builder
anthropic>=0.40.0
# Utilities
python-dotenv==1.0.1
croniter>=2.0.0

View File

@@ -0,0 +1,358 @@
"""Integration tests for AI Flow Builder endpoints.
All Anthropic API calls are mocked — zero real API spend.
"""
import json
from unittest.mock import AsyncMock, patch, MagicMock
import pytest
from app.core.config import settings
# ── Sample AI responses ──
SCAFFOLD_RESPONSE_JSON = json.dumps({
"branches": [
{"name": "Service Not Running", "description": "The target service is stopped or crashed."},
{"name": "Authentication Failures", "description": "Users cannot authenticate against the service."},
{"name": "Network Connectivity", "description": "Network-level issues preventing access."},
{"name": "Configuration Errors", "description": "Misconfiguration of the service or its dependencies."},
]
})
BRANCH_DETAIL_JSON = json.dumps({
"id": "svc-root",
"type": "decision",
"question": "Is the service running?",
"options": [
{"id": "opt-yes", "label": "Yes", "next_node_id": "svc-check-logs"},
{"id": "opt-no", "label": "No", "next_node_id": "svc-restart"},
],
"children": [
{
"id": "svc-check-logs",
"type": "action",
"title": "Check Event Logs",
"description": "Check Windows Event Viewer for errors.",
"commands": ["Get-EventLog -LogName Application -Newest 20"],
"next_node_id": "svc-logs-resolved",
},
{
"id": "svc-logs-resolved",
"type": "solution",
"title": "Issue Found in Logs",
"description": "Error identified and resolved.",
"resolution_steps": ["Fix the error", "Restart service"],
},
{
"id": "svc-restart",
"type": "action",
"title": "Restart Service",
"description": "Attempt to restart the service.",
"commands": ["Restart-Service -Name 'TestService'"],
"next_node_id": "svc-restart-ok",
},
{
"id": "svc-restart-ok",
"type": "solution",
"title": "Service Restored",
"description": "Service is running after restart.",
"resolution_steps": ["Verify connectivity", "Document in ticket"],
},
],
})
def _mock_anthropic_response(text: str, input_tokens: int = 100, output_tokens: int = 200):
"""Create a mock Anthropic API response."""
response = MagicMock()
response.content = [MagicMock(text=text)]
response.usage = MagicMock(input_tokens=input_tokens, output_tokens=output_tokens)
return response
@pytest.fixture
def enable_ai():
"""Temporarily enable AI by setting a fake API key."""
original = settings.ANTHROPIC_API_KEY
settings.ANTHROPIC_API_KEY = "test-key-fake"
yield
settings.ANTHROPIC_API_KEY = original
@pytest.fixture
def disable_ai():
"""Ensure AI is disabled."""
original = settings.ANTHROPIC_API_KEY
settings.ANTHROPIC_API_KEY = None
yield
settings.ANTHROPIC_API_KEY = original
# ── Quota endpoint ──
@pytest.mark.asyncio
async def test_quota_returns_disabled_when_no_key(client, auth_headers, disable_ai):
"""GET /ai/quota returns ai_enabled=false when no API key."""
response = await client.get("/api/v1/ai/quota", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["ai_enabled"] is False
assert data["allowed"] is False
@pytest.mark.asyncio
async def test_quota_returns_enabled_with_key(client, auth_headers, enable_ai):
"""GET /ai/quota returns ai_enabled=true with API key configured."""
response = await client.get("/api/v1/ai/quota", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["ai_enabled"] is True
assert data["allowed"] is True
# ── Start endpoint ──
@pytest.mark.asyncio
async def test_start_requires_auth(client, enable_ai):
"""POST /ai/start requires authentication."""
response = await client.post("/api/v1/ai/start", json={
"flow_type": "troubleshooting",
"name": "Test Flow",
"description": "Test",
})
assert response.status_code == 401
@pytest.mark.asyncio
async def test_start_returns_503_when_disabled(client, auth_headers, disable_ai):
"""POST /ai/start returns 503 when AI is not configured."""
response = await client.post(
"/api/v1/ai/start",
json={
"flow_type": "troubleshooting",
"name": "Test Flow",
"description": "Test description",
},
headers=auth_headers,
)
assert response.status_code == 503
@pytest.mark.asyncio
async def test_start_creates_conversation(client, auth_headers, enable_ai):
"""POST /ai/start creates a conversation and returns conversation_id."""
response = await client.post(
"/api/v1/ai/start",
json={
"flow_type": "troubleshooting",
"name": "DNS Issues",
"description": "Troubleshooting DNS resolution failures",
"environment_tags": ["Windows Server", "Active Directory"],
},
headers=auth_headers,
)
assert response.status_code == 201
data = response.json()
assert "conversation_id" in data
assert data["status"] == "foundation"
@pytest.mark.asyncio
async def test_start_validates_input(client, auth_headers, enable_ai):
"""POST /ai/start rejects invalid input."""
response = await client.post(
"/api/v1/ai/start",
json={
"flow_type": "troubleshooting",
"name": "", # Empty name
"description": "Test",
},
headers=auth_headers,
)
assert response.status_code == 422
# ── Scaffold endpoint ──
@pytest.mark.asyncio
async def test_scaffold_success(client, auth_headers, enable_ai):
"""POST /ai/scaffold returns AI-generated branches."""
# Create conversation first
start_resp = await client.post(
"/api/v1/ai/start",
json={
"flow_type": "troubleshooting",
"name": "DNS Issues",
"description": "DNS resolution failures",
},
headers=auth_headers,
)
conversation_id = start_resp.json()["conversation_id"]
# Mock Anthropic
mock_response = _mock_anthropic_response(SCAFFOLD_RESPONSE_JSON)
with patch("app.core.ai_tree_generator_service._get_client") as mock_client:
mock_client.return_value.messages.create = AsyncMock(return_value=mock_response)
response = await client.post(
"/api/v1/ai/scaffold",
json={"conversation_id": conversation_id},
headers=auth_headers,
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "scaffolding"
assert len(data["branches"]) == 4
assert data["branches"][0]["name"] == "Service Not Running"
@pytest.mark.asyncio
async def test_scaffold_invalid_conversation(client, auth_headers, enable_ai):
"""POST /ai/scaffold returns 404 for nonexistent conversation."""
response = await client.post(
"/api/v1/ai/scaffold",
json={"conversation_id": "00000000-0000-0000-0000-000000000000"},
headers=auth_headers,
)
assert response.status_code == 404
# ── Branch detail endpoint ──
@pytest.mark.asyncio
async def test_branch_detail_success(client, auth_headers, enable_ai):
"""POST /ai/branch-detail returns AI-generated branch nodes."""
# Create and scaffold first
start_resp = await client.post(
"/api/v1/ai/start",
json={
"flow_type": "troubleshooting",
"name": "Service Issues",
"description": "Service troubleshooting",
},
headers=auth_headers,
)
conversation_id = start_resp.json()["conversation_id"]
scaffold_mock = _mock_anthropic_response(SCAFFOLD_RESPONSE_JSON)
with patch("app.core.ai_tree_generator_service._get_client") as mock_client:
mock_client.return_value.messages.create = AsyncMock(return_value=scaffold_mock)
await client.post(
"/api/v1/ai/scaffold",
json={"conversation_id": conversation_id},
headers=auth_headers,
)
# Now generate branch detail
detail_mock = _mock_anthropic_response(BRANCH_DETAIL_JSON)
with patch("app.core.ai_tree_generator_service._get_client") as mock_client:
mock_client.return_value.messages.create = AsyncMock(return_value=detail_mock)
response = await client.post(
"/api/v1/ai/branch-detail",
json={
"conversation_id": conversation_id,
"branch_name": "Service Not Running",
},
headers=auth_headers,
)
assert response.status_code == 200
data = response.json()
assert data["branch_name"] == "Service Not Running"
assert data["steps"]["id"] == "svc-root"
assert data["steps"]["type"] == "decision"
# ── Assemble endpoint ──
@pytest.mark.asyncio
async def test_assemble_success(client, auth_headers, enable_ai):
"""POST /ai/assemble returns assembled tree from branches with detail."""
# Create conversation
start_resp = await client.post(
"/api/v1/ai/start",
json={
"flow_type": "troubleshooting",
"name": "Service Issues",
"description": "Service troubleshooting",
},
headers=auth_headers,
)
conversation_id = start_resp.json()["conversation_id"]
# Scaffold
scaffold_mock = _mock_anthropic_response(SCAFFOLD_RESPONSE_JSON)
with patch("app.core.ai_tree_generator_service._get_client") as mock_client:
mock_client.return_value.messages.create = AsyncMock(return_value=scaffold_mock)
await client.post(
"/api/v1/ai/scaffold",
json={"conversation_id": conversation_id},
headers=auth_headers,
)
# Assemble with branch detail included
branch_tree = json.loads(BRANCH_DETAIL_JSON)
response = await client.post(
"/api/v1/ai/assemble",
json={
"conversation_id": conversation_id,
"selected_branches": [
{
"name": "Service Not Running",
"description": "The target service is stopped.",
"steps": branch_tree,
},
{
"name": "Authentication Failures",
"description": "Users cannot authenticate.",
"steps": branch_tree,
},
],
},
headers=auth_headers,
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "completed"
assert data["suggested_name"] == "Service Issues"
assert "tree_structure" in data
assert data["tree_structure"]["type"] == "decision"
assert data["summary"]["node_count"] > 0
assert data["summary"]["solution_count"] >= 2
@pytest.mark.asyncio
async def test_assemble_requires_min_2_branches(client, auth_headers, enable_ai):
"""POST /ai/assemble rejects fewer than 2 branches."""
start_resp = await client.post(
"/api/v1/ai/start",
json={
"flow_type": "troubleshooting",
"name": "Test",
"description": "Test",
},
headers=auth_headers,
)
conversation_id = start_resp.json()["conversation_id"]
response = await client.post(
"/api/v1/ai/assemble",
json={
"conversation_id": conversation_id,
"selected_branches": [
{"name": "Only Branch", "description": "Just one"},
],
},
headers=auth_headers,
)
assert response.status_code == 422

View File

@@ -0,0 +1,192 @@
"""Tests for AI-generated tree structure validation."""
import pytest
from app.core.ai_tree_validator import validate_generated_tree, count_tree_stats
def _make_valid_tree():
"""Helper: minimal valid tree for testing.
Action nodes use next_node_id to point to a sibling (not children).
The solution following an action is a sibling under the parent decision.
"""
return {
"id": "root",
"type": "decision",
"question": "Is the service running?",
"options": [
{"id": "opt-yes", "label": "Yes", "next_node_id": "check-logs"},
{"id": "opt-no", "label": "No", "next_node_id": "restart-service"},
],
"children": [
{
"id": "check-logs",
"type": "decision",
"question": "Are there errors in the logs?",
"options": [
{"id": "opt-errors", "label": "Yes", "next_node_id": "fix-errors"},
{"id": "opt-clean", "label": "No", "next_node_id": "escalate"},
],
"children": [
{
"id": "fix-errors",
"type": "solution",
"title": "Fix Errors",
"description": "Apply the fix for the errors found.",
},
{
"id": "escalate",
"type": "solution",
"title": "Escalate",
"description": "No errors found; escalate to Tier 2.",
},
],
},
{
"id": "restart-service",
"type": "action",
"title": "Restart the Service",
"description": "Restart the service and verify.",
"commands": ["Restart-Service -Name 'TestService'"],
"next_node_id": "service-resolved",
},
{
"id": "service-resolved",
"type": "solution",
"title": "Service Restored",
"description": "Service is running after restart.",
},
],
}
class TestValidTree:
def test_valid_tree_passes(self):
errors = validate_generated_tree(_make_valid_tree())
assert errors == []
def test_not_a_dict(self):
errors = validate_generated_tree("not a dict")
assert any("must be a JSON object" in e for e in errors)
def test_root_not_decision(self):
tree = _make_valid_tree()
tree["type"] = "action"
tree["title"] = "Fake"
errors = validate_generated_tree(tree)
assert any("Root node must be type 'decision'" in e for e in errors)
class TestNodeValidation:
def test_missing_id(self):
tree = _make_valid_tree()
del tree["children"][0]["id"]
errors = validate_generated_tree(tree)
assert any("missing 'id'" in e for e in errors)
def test_duplicate_ids(self):
tree = _make_valid_tree()
tree["children"][1]["id"] = "check-logs" # same as sibling
errors = validate_generated_tree(tree)
assert any("Duplicate node ID" in e for e in errors)
def test_invalid_node_type(self):
tree = _make_valid_tree()
tree["children"][0]["type"] = "unknown"
errors = validate_generated_tree(tree)
assert any("invalid type" in e for e in errors)
def test_decision_missing_options(self):
tree = _make_valid_tree()
del tree["children"][0]["options"]
errors = validate_generated_tree(tree)
assert any("missing fields" in e for e in errors)
def test_decision_less_than_2_options(self):
tree = _make_valid_tree()
tree["children"][0]["options"] = [
{"id": "opt-1", "label": "Only", "next_node_id": "fix-errors"}
]
errors = validate_generated_tree(tree)
assert any("at least 2 options" in e for e in errors)
def test_action_missing_next_node_id(self):
tree = _make_valid_tree()
del tree["children"][1]["next_node_id"]
errors = validate_generated_tree(tree)
assert any("missing 'next_node_id'" in e for e in errors)
class TestReferenceIntegrity:
def test_option_references_nonexistent_child(self):
tree = _make_valid_tree()
tree["options"][0]["next_node_id"] = "nonexistent"
errors = validate_generated_tree(tree)
assert any("non-existent child" in e for e in errors)
def test_duplicate_option_ids(self):
tree = _make_valid_tree()
tree["options"][0]["id"] = "same"
tree["options"][1]["id"] = "same"
errors = validate_generated_tree(tree)
assert any("Duplicate option ID" in e for e in errors)
class TestGlobalChecks:
def test_too_few_nodes(self):
tree = {
"id": "root",
"type": "decision",
"question": "Test?",
"options": [
{"id": "o1", "label": "A", "next_node_id": "s1"},
{"id": "o2", "label": "B", "next_node_id": "s2"},
],
"children": [
{"id": "s1", "type": "solution", "title": "S1", "description": "D1"},
{"id": "s2", "type": "solution", "title": "S2", "description": "D2"},
],
}
errors = validate_generated_tree(tree)
assert any("Minimum 5 required" in e for e in errors)
def test_too_few_solutions(self):
tree = _make_valid_tree()
# Remove all solutions except one — replace children of check-logs
tree["children"][0]["children"] = [
{
"id": "only-solution",
"type": "solution",
"title": "Only",
"description": "Only solution",
}
]
tree["children"][0]["options"] = [
{"id": "o1", "label": "A", "next_node_id": "only-solution"},
{"id": "o2", "label": "B", "next_node_id": "only-solution"},
]
# Remove the solution that restart-service points to
tree["children"].pop(2) # remove service-resolved
errors = validate_generated_tree(tree)
assert any("solution" in e.lower() for e in errors)
class TestDeadEndDetection:
def test_dead_end_decision_node(self):
"""A decision node with no children is a dead end."""
tree = _make_valid_tree()
# Remove children from check-logs decision node — becomes dead end
tree["children"][0]["children"] = []
errors = validate_generated_tree(tree)
assert any("dead end" in e for e in errors)
class TestCountTreeStats:
def test_stats_correct(self):
tree = _make_valid_tree()
stats = count_tree_stats(tree)
assert stats["node_count"] == 6
assert stats["decision_count"] == 2
assert stats["action_count"] == 1
assert stats["solution_count"] == 3
assert stats["depth"] >= 3

View File

@@ -20,13 +20,13 @@ class TestTreeValidation:
{
"id": "yes",
"type": "solution",
"solution": "Server is healthy",
"title": "Server is healthy",
"children": []
},
{
"id": "no",
"type": "action",
"action": "Restart the server",
"title": "Restart the server",
"children": []
}
]
@@ -70,15 +70,15 @@ class TestTreeValidation:
"type": "decision",
"question": "Test?",
"children": [
{"id": "child1", "type": "solution", "solution": "Fix"}
{"id": "child1", "type": "solution", "title": "Fix"}
]
}
is_valid, errors = validate_tree_structure(tree_structure)
assert not is_valid
assert any("at least 2" in error["message"] for error in errors)
def test_action_node_missing_action(self):
"""Test validation when action node has no action."""
def test_action_node_missing_title(self):
"""Test validation when action node has no title."""
tree_structure = {
"id": "root",
"type": "action",
@@ -86,10 +86,10 @@ class TestTreeValidation:
}
is_valid, errors = validate_tree_structure(tree_structure)
assert not is_valid
assert any("action" in error["field"] for error in errors)
assert any("title" in error["field"] for error in errors)
def test_solution_node_missing_solution(self):
"""Test validation when solution node has no solution."""
def test_solution_node_missing_title(self):
"""Test validation when solution node has no title."""
tree_structure = {
"id": "root",
"type": "solution",
@@ -97,7 +97,7 @@ class TestTreeValidation:
}
is_valid, errors = validate_tree_structure(tree_structure)
assert not is_valid
assert any("solution" in error["field"] for error in errors)
assert any("title" in error["field"] for error in errors)
def test_unknown_node_type(self):
"""Test validation with unknown node type."""
@@ -112,14 +112,14 @@ class TestTreeValidation:
def test_can_publish_with_empty_name(self):
"""Test can_publish with empty name."""
tree_structure = {"id": "root", "type": "solution", "solution": "Fix"}
tree_structure = {"id": "root", "type": "solution", "title": "Fix"}
can_publish, errors = can_publish_tree(tree_structure, "", None)
assert not can_publish
assert any("name" in error["field"] for error in errors)
def test_can_publish_valid_tree(self):
"""Test can_publish with valid tree and name."""
tree_structure = {"id": "root", "type": "solution", "solution": "Fix"}
tree_structure = {"id": "root", "type": "solution", "title": "Fix"}
can_publish, errors = can_publish_tree(tree_structure, "Valid Tree", "Description")
assert can_publish
assert len(errors) == 0
@@ -157,8 +157,8 @@ class TestDraftTreesAPI:
"type": "decision",
"question": "Is it working?",
"children": [
{"id": "yes", "type": "solution", "solution": "Great!"},
{"id": "no", "type": "action", "action": "Fix it"}
{"id": "yes", "type": "solution", "title": "Great!"},
{"id": "no", "type": "action", "title": "Fix it"}
]
},
"status": "published"
@@ -193,8 +193,8 @@ class TestDraftTreesAPI:
name="Draft to Published",
description="Test tree",
tree_structure={"id": "root", "type": "decision", "question": "Test?", "children": [
{"id": "yes", "type": "solution", "solution": "Yes"},
{"id": "no", "type": "solution", "solution": "No"}
{"id": "yes", "type": "solution", "title": "Yes"},
{"id": "no", "type": "solution", "title": "No"}
]},
author_id=UUID(test_user["user_data"]["id"]),
account_id=UUID(test_user["user_data"]["account_id"]),
@@ -252,8 +252,8 @@ class TestDraftTreesAPI:
"type": "decision",
"question": "Is it working?",
"children": [
{"id": "yes", "type": "solution", "solution": "Great!"},
{"id": "no", "type": "action", "action": "Fix it"}
{"id": "yes", "type": "solution", "title": "Great!"},
{"id": "no", "type": "action", "title": "Fix it"}
]
},
author_id=UUID(test_user["user_data"]["id"]),
@@ -315,7 +315,7 @@ class TestDraftTreesAPI:
tree = Tree(
name="Test Tree",
description="Test",
tree_structure={"id": "root", "type": "solution", "solution": "Fix"},
tree_structure={"id": "root", "type": "solution", "title": "Fix"},
author_id=UUID(test_user["user_data"]["id"]),
account_id=UUID(test_user["user_data"]["account_id"]),
status='published'
@@ -337,7 +337,7 @@ class TestDraftTreesAPI:
tree = Tree(
name="Legacy Tree",
description="Created before status field",
tree_structure={"id": "root", "type": "solution", "solution": "Fix"},
tree_structure={"id": "root", "type": "solution", "title": "Fix"},
author_id=None,
account_id=None
)

View File

@@ -228,8 +228,8 @@ class TestCanPublishTreeDispatch:
"type": "decision",
"question": "Test?",
"children": [
{"id": "y", "type": "solution", "solution": "Yes"},
{"id": "n", "type": "solution", "solution": "No"},
{"id": "y", "type": "solution", "title": "Yes"},
{"id": "n", "type": "solution", "title": "No"},
]
}
can, errors = can_publish_tree(structure, "My Tree", tree_type="troubleshooting")

View File

@@ -20,7 +20,7 @@ class TestSessionToTreeConversion:
"""Test converting a session with no path."""
tree_structure = convert_session_to_tree([], {}, [], [])
assert tree_structure["type"] == "solution"
assert "no recorded path" in tree_structure["solution"].lower()
assert "no recorded path" in tree_structure["title"].lower()
def test_convert_simple_linear_path(self):
"""Test converting a simple linear path."""
@@ -29,8 +29,8 @@ class TestSessionToTreeConversion:
"type": "decision",
"question": "Is it working?",
"children": [
{"id": "yes", "type": "solution", "solution": "Great!"},
{"id": "no", "type": "action", "action": "Fix it"}
{"id": "yes", "type": "solution", "title": "Great!"},
{"id": "no", "type": "action", "title": "Fix it"}
]
}
path_taken = ["root", "no"]
@@ -51,7 +51,7 @@ class TestSessionToTreeConversion:
tree_snapshot = {
"id": "root",
"type": "solution",
"solution": "Done"
"title": "Done"
}
custom_step_id = "custom-123"
path_taken = ["root", custom_step_id]
@@ -70,7 +70,7 @@ class TestSessionToTreeConversion:
assert len(result["children"]) == 1
custom_node = result["children"][0]
assert custom_node["type"] == "action"
assert "Custom troubleshooting step" in custom_node["action"]
assert "Custom troubleshooting step" in custom_node["title"]
def test_find_node_in_tree(self):
"""Test finding a node in nested tree structure."""
@@ -142,7 +142,7 @@ class TestSaveSessionAsTreeAPI:
tree = Tree(
name="Test Tree",
description="Test",
tree_structure={"id": "root", "type": "solution", "solution": "Fix"},
tree_structure={"id": "root", "type": "solution", "title": "Fix"},
author_id=UUID(test_user["user_data"]["id"]),
account_id=UUID(test_user["user_data"]["account_id"]),
status='published'
@@ -187,7 +187,7 @@ class TestSaveSessionAsTreeAPI:
tree = Tree(
name="Original Tree",
tree_structure={"id": "root", "type": "solution", "solution": "Fix"},
tree_structure={"id": "root", "type": "solution", "title": "Fix"},
author_id=UUID(test_user["user_data"]["id"]),
account_id=UUID(test_user["user_data"]["account_id"]),
status='published'
@@ -227,7 +227,7 @@ class TestSaveSessionAsTreeAPI:
# Create a simple tree with just a solution (will convert to valid linear tree)
tree = Tree(
name="Test Tree",
tree_structure={"id": "root", "type": "solution", "solution": "Fixed"},
tree_structure={"id": "root", "type": "solution", "title": "Fixed"},
author_id=UUID(test_user["user_data"]["id"]),
account_id=UUID(test_user["user_data"]["account_id"]),
status='published'
@@ -267,7 +267,7 @@ class TestSaveSessionAsTreeAPI:
tree = Tree(
name="Original Tree",
tree_structure={"id": "root", "type": "solution", "solution": "Fix"},
tree_structure={"id": "root", "type": "solution", "title": "Fix"},
author_id=UUID(test_user["user_data"]["id"]),
account_id=UUID(test_user["user_data"]["account_id"]),
status='published'
@@ -326,7 +326,7 @@ class TestSaveSessionAsTreeAPI:
# Create a tree
tree = Tree(
name="Test Tree",
tree_structure={"id": "root", "type": "solution", "solution": "Fix"},
tree_structure={"id": "root", "type": "solution", "title": "Fix"},
author_id=UUID(test_user["user_data"]["id"]),
account_id=UUID(test_user["user_data"]["account_id"]),
status='published'

View File

@@ -15,7 +15,7 @@ class TestValidateTreeStructure:
def test_valid_solution_tree(self):
valid, errors = validate_tree_structure({
"id": "root", "type": "solution", "solution": "Done"
"id": "root", "type": "solution", "title": "Done"
})
assert valid
assert errors == []
@@ -26,8 +26,8 @@ class TestValidateTreeStructure:
"type": "decision",
"question": "Is it on?",
"children": [
{"id": "yes", "type": "solution", "solution": "Great"},
{"id": "no", "type": "action", "action": "Turn it on"},
{"id": "yes", "type": "solution", "title": "Great"},
{"id": "no", "type": "action", "title": "Turn it on"},
],
})
assert valid
@@ -39,7 +39,7 @@ class TestValidateTreeStructure:
assert any("empty" in e["message"].lower() for e in errors)
def test_missing_id_on_root(self):
valid, errors = validate_tree_structure({"type": "solution", "solution": "X"})
valid, errors = validate_tree_structure({"type": "solution", "title": "X"})
assert not valid
assert any("id" in e["field"] for e in errors)
@@ -67,7 +67,7 @@ class TestValidateTreeStructure:
"type": "decision",
"question": "Q?",
"children": [
{"id": "only", "type": "solution", "solution": "S"},
{"id": "only", "type": "solution", "title": "S"},
],
})
assert not valid
@@ -80,29 +80,29 @@ class TestValidateTreeStructure:
})
assert valid
def test_action_missing_action_field(self):
def test_action_missing_title_field(self):
valid, errors = validate_tree_structure({
"id": "root", "type": "action"
})
assert not valid
assert any("action" in e["message"].lower() for e in errors)
assert any("title" in e["message"].lower() for e in errors)
def test_action_with_empty_action(self):
def test_action_with_empty_title(self):
valid, errors = validate_tree_structure({
"id": "root", "type": "action", "action": ""
"id": "root", "type": "action", "title": ""
})
assert not valid
def test_solution_missing_solution_field(self):
def test_solution_missing_title_field(self):
valid, errors = validate_tree_structure({
"id": "root", "type": "solution"
})
assert not valid
assert any("solution" in e["message"].lower() for e in errors)
assert any("title" in e["message"].lower() for e in errors)
def test_solution_with_empty_solution(self):
def test_solution_with_empty_title(self):
valid, errors = validate_tree_structure({
"id": "root", "type": "solution", "solution": ""
"id": "root", "type": "solution", "title": ""
})
assert not valid
@@ -119,8 +119,8 @@ class TestValidateTreeStructure:
"type": "decision",
"question": "Q?",
"children": [
{"type": "solution", "solution": "S1"},
{"id": "c2", "type": "solution", "solution": "S2"},
{"type": "solution", "title": "S1"},
{"id": "c2", "type": "solution", "title": "S2"},
],
})
assert not valid
@@ -133,7 +133,7 @@ class TestValidateTreeStructure:
"question": "Q?",
"children": [
{"id": "c1"},
{"id": "c2", "type": "solution", "solution": "S2"},
{"id": "c2", "type": "solution", "title": "S2"},
],
})
assert not valid
@@ -151,11 +151,11 @@ class TestValidateTreeStructure:
"type": "decision",
"question": "Level 2?",
"children": [
{"id": "l3a", "type": "solution", "solution": "Deep"},
{"id": "l3b", "type": "solution"}, # Missing solution
{"id": "l3a", "type": "solution", "title": "Deep"},
{"id": "l3b", "type": "solution"}, # Missing title
],
},
{"id": "l2b", "type": "solution", "solution": "Shallow"},
{"id": "l2b", "type": "solution", "title": "Shallow"},
],
})
assert not valid
@@ -167,8 +167,8 @@ class TestValidateTreeStructure:
"type": "decision",
"question": "Q?",
"children": [
{"id": "c1", "type": "solution"}, # missing solution
{"id": "c2", "type": "action"}, # missing action
{"id": "c1", "type": "solution"}, # missing title
{"id": "c2", "type": "action"}, # missing title
],
})
assert not valid
@@ -179,7 +179,7 @@ class TestCanPublishTree:
def test_valid_tree_can_publish(self):
can, errors = can_publish_tree(
{"id": "root", "type": "solution", "solution": "Done"},
{"id": "root", "type": "solution", "title": "Done"},
"My Tree"
)
assert can
@@ -187,7 +187,7 @@ class TestCanPublishTree:
def test_empty_name_cannot_publish(self):
can, errors = can_publish_tree(
{"id": "root", "type": "solution", "solution": "Done"},
{"id": "root", "type": "solution", "title": "Done"},
""
)
assert not can
@@ -195,14 +195,14 @@ class TestCanPublishTree:
def test_whitespace_name_cannot_publish(self):
can, errors = can_publish_tree(
{"id": "root", "type": "solution", "solution": "Done"},
{"id": "root", "type": "solution", "title": "Done"},
" "
)
assert not can
def test_none_name_cannot_publish(self):
can, errors = can_publish_tree(
{"id": "root", "type": "solution", "solution": "Done"},
{"id": "root", "type": "solution", "title": "Done"},
None
)
assert not can