feat: add Script Builder service and API endpoints

- Script Builder service with language-specific system prompts (PowerShell, Bash, Python)
- AI-powered script generation with code block extraction and filename detection
- Context window management (last 20 messages) and session message limits
- REST API: CRUD sessions, send messages, save to Script Library
- Rate limiting on message endpoint (10/min), max 5 concurrent sessions per user
- Registered script_build action in AI model tier routing (standard tier)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Michael Chihlas
2026-03-21 16:58:26 -04:00
parent 35c0c67da3
commit 28f8200b36
4 changed files with 521 additions and 0 deletions

View File

@@ -0,0 +1,151 @@
"""Script Builder API endpoints."""
from typing import Annotated
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Request
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.core.rate_limit import limiter
from app.api.deps import get_current_active_user
from app.models.user import User
from app.schemas.script_builder import (
ScriptBuilderCreateRequest,
ScriptBuilderMessageRequest,
ScriptBuilderMessageResponse,
ScriptBuilderSessionDetail,
ScriptBuilderSessionSummary,
SaveToLibraryRequest,
)
from app.schemas.script_template import ScriptTemplateDetail
from app.services import script_builder_service
router = APIRouter(prefix="/scripts/builder", tags=["script-builder"])
MAX_SESSIONS_PER_USER = 5
@router.post("/sessions", response_model=ScriptBuilderSessionDetail, status_code=201)
async def create_session(
data: ScriptBuilderCreateRequest,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
) -> ScriptBuilderSessionDetail:
"""Start a new Script Builder session."""
# Enforce max concurrent sessions
count = await script_builder_service.count_user_sessions(db, current_user.id)
if count >= MAX_SESSIONS_PER_USER:
raise HTTPException(
status_code=400,
detail=f"Maximum of {MAX_SESSIONS_PER_USER} builder sessions allowed. Delete an old session first.",
)
session = await script_builder_service.create_session(
db=db,
user_id=current_user.id,
team_id=current_user.team_id,
language=data.language,
)
await db.commit()
return ScriptBuilderSessionDetail.model_validate(session)
@router.get("/sessions", response_model=list[ScriptBuilderSessionSummary])
async def list_sessions(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
limit: int = 20,
offset: int = 0,
) -> list[ScriptBuilderSessionSummary]:
"""List user's recent builder sessions (lightweight, no messages)."""
sessions = await script_builder_service.list_sessions(
db=db, user_id=current_user.id, limit=limit, offset=offset
)
return [ScriptBuilderSessionSummary.model_validate(s) for s in sessions]
@router.get("/sessions/{session_id}", response_model=ScriptBuilderSessionDetail)
async def get_session(
session_id: UUID,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
) -> ScriptBuilderSessionDetail:
"""Get full session detail with message history."""
session = await script_builder_service.get_session(db, session_id, current_user.id)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
return ScriptBuilderSessionDetail.model_validate(session)
@router.post(
"/sessions/{session_id}/messages",
response_model=ScriptBuilderMessageResponse,
)
@limiter.limit("10/minute")
async def send_message(
request: Request,
session_id: UUID,
data: ScriptBuilderMessageRequest,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
) -> ScriptBuilderMessageResponse:
"""Send a message and get AI-generated script response."""
session = await script_builder_service.get_session(db, session_id, current_user.id)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
try:
response = await script_builder_service.send_message(db, session, data.content)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
await db.commit()
return response
@router.delete("/sessions/{session_id}", status_code=204)
async def delete_session(
session_id: UUID,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
) -> None:
"""Delete a builder session."""
deleted = await script_builder_service.delete_session(db, session_id, current_user.id)
if not deleted:
raise HTTPException(status_code=404, detail="Session not found")
await db.commit()
@router.post(
"/sessions/{session_id}/save",
response_model=ScriptTemplateDetail,
status_code=201,
)
async def save_to_library(
session_id: UUID,
data: SaveToLibraryRequest,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
) -> ScriptTemplateDetail:
"""Save the latest generated script to the Script Library."""
session = await script_builder_service.get_session(db, session_id, current_user.id)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
try:
template = await script_builder_service.save_to_library(
db=db,
session=session,
name=data.name,
description=data.description,
category_id=data.category_id,
share_with_team=data.share_with_team,
user_id=current_user.id,
team_id=current_user.team_id,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
await db.commit()
await db.refresh(template)
return ScriptTemplateDetail.model_validate(template)

View File

@@ -28,6 +28,7 @@ from app.api.endpoints import notifications
from app.api.endpoints import public_templates
from app.api.endpoints import admin_gallery
from app.api.endpoints import uploads
from app.api.endpoints import script_builder
api_router = APIRouter()
@@ -81,3 +82,4 @@ api_router.include_router(notifications.router)
api_router.include_router(public_templates.router)
api_router.include_router(admin_gallery.router)
api_router.include_router(uploads.router)
api_router.include_router(script_builder.router)

View File

@@ -104,6 +104,7 @@ class Settings(BaseSettings):
"open_chat": "standard",
"variable_inference": "fast",
"kb_convert": "standard",
"script_build": "standard",
}
def get_model_for_action(self, action_type: str) -> str:

View File

@@ -0,0 +1,367 @@
"""AI Script Builder service — generates scripts from natural language descriptions."""
import logging
import re
from datetime import datetime, timezone
from typing import Optional
from uuid import UUID
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.ai_provider import get_ai_provider
from app.core.config import settings
from app.models.script_builder_session import ScriptBuilderSession
from app.schemas.script_builder import (
ScriptBuilderMessageResponse,
ScriptBuilderSessionDetail,
ScriptBuilderSessionSummary,
)
logger = logging.getLogger(__name__)
MAX_MESSAGES_PER_SESSION = 30
LANGUAGE_PROMPTS = {
"powershell": """\
You are an expert PowerShell script writer for MSP (Managed Service Provider) environments.
## Script Standards
- Use Advanced Functions with CmdletBinding and param() blocks
- Include comment-based help (.SYNOPSIS, .DESCRIPTION, .PARAMETER, .EXAMPLE)
- Use try/catch/finally for error handling
- Use Write-Verbose for diagnostic output, Write-Error for failures
- Support pipeline input where appropriate
- Use approved PowerShell verbs (Get-, Set-, New-, Remove-, etc.)
- Import required modules at the top (e.g., Import-Module ActiveDirectory)
- Use [Parameter(Mandatory=$true)] for required params
- Default to UTF-8 output for exports
## Security
- Never hardcode credentials — use Get-Credential or SecureString params
- Use -WhatIf and -Confirm support via SupportsShouldProcess
- Validate input with ValidateSet, ValidatePattern, ValidateRange
""",
"bash": """\
You are an expert Bash script writer for Linux/macOS system administration.
## Script Standards
- Start with #!/bin/bash
- Use set -euo pipefail for safety
- Parse arguments with getopts or positional parameters
- Include a usage() function for --help
- Use functions for logical grouping
- Quote all variable expansions ("$var" not $var)
- Use [[ ]] for conditionals (not [ ])
- Add comments explaining non-obvious logic
- Use lowercase_with_underscores for variable names
- Exit with meaningful exit codes (0=success, 1=general error, 2=usage error)
## Security
- Never store passwords in scripts — use environment variables or prompts
- Validate all user inputs
- Use mktemp for temporary files
""",
"python": """\
You are an expert Python script writer for IT automation and system administration.
## Script Standards
- Use Python 3.10+ syntax
- Add type hints to all function signatures
- Use argparse for CLI argument parsing
- Include if __name__ == "__main__": guard
- Use logging module (not print) for diagnostic output
- Use docstrings for functions and modules
- Use pathlib.Path instead of os.path
- Handle exceptions with specific exception types
- Use f-strings for string formatting
- Follow PEP 8 naming conventions
## Security
- Never hardcode secrets — use environment variables or config files
- Validate and sanitize all user inputs
- Use subprocess.run() with shell=False (never shell=True with user input)
""",
}
SYSTEM_PROMPT_TEMPLATE = """\
{language_prompt}
## Response Format
Respond conversationally. When generating a script:
1. Briefly explain what the script does and any assumptions
2. Include the complete script in a single fenced code block with the language tag
3. Suggest a filename (e.g., `Get-LinkedGPOs.ps1`)
When the user asks for modifications, generate the COMPLETE updated script (not a diff).
## Context
The user is an MSP engineer using ResolutionFlow. They need scripts for managing client infrastructure.
Keep scripts practical, production-ready, and well-documented.\
"""
def _extract_script_from_response(content: str, language: str) -> tuple[str | None, str | None]:
"""Extract code block and filename from AI response.
Returns (script, filename) tuple.
"""
# Map language to code fence tags
lang_tags = {
"powershell": ["powershell", "ps1", "pwsh"],
"bash": ["bash", "sh", "shell"],
"python": ["python", "py", "python3"],
}
tags = lang_tags.get(language, [language])
# Try each language-specific tag, then fall back to generic fence
script = None
for tag in tags:
pattern = rf"```{tag}\s*\n(.*?)```"
match = re.search(pattern, content, re.DOTALL | re.IGNORECASE)
if match:
script = match.group(1).strip()
break
else:
# Try generic code fence
pattern = r"```\s*\n(.*?)```"
match = re.search(pattern, content, re.DOTALL)
script = match.group(1).strip() if match else None
# Extract filename suggestion
filename = None
ext_map = {"powershell": ".ps1", "bash": ".sh", "python": ".py"}
ext = ext_map.get(language, ".txt")
filename_pattern = rf"`([A-Za-z0-9_\-]+{re.escape(ext)})`"
fname_match = re.search(filename_pattern, content)
if fname_match:
filename = fname_match.group(1)
return script, filename
async def create_session(
db: AsyncSession,
user_id: UUID,
team_id: UUID | None,
language: str,
initial_prompt: str | None = None,
) -> ScriptBuilderSession:
"""Create a new Script Builder session."""
session = ScriptBuilderSession(
user_id=user_id,
team_id=team_id,
language=language,
messages=[],
message_count=0,
)
db.add(session)
await db.flush()
# If initial prompt provided (e.g., from FlowPilot), send first message
if initial_prompt:
await send_message(db, session, initial_prompt)
return session
async def send_message(
db: AsyncSession,
session: ScriptBuilderSession,
user_content: str,
) -> ScriptBuilderMessageResponse:
"""Send a user message and get AI response with generated script."""
if session.message_count >= MAX_MESSAGES_PER_SESSION:
raise ValueError(f"Session has reached the maximum of {MAX_MESSAGES_PER_SESSION} messages.")
now = datetime.now(timezone.utc)
# Append user message
user_msg = {
"role": "user",
"content": user_content,
"timestamp": now.isoformat(),
}
messages = list(session.messages or [])
messages.append(user_msg)
# Build system prompt
language_prompt = LANGUAGE_PROMPTS.get(session.language, LANGUAGE_PROMPTS["powershell"])
system_prompt = SYSTEM_PROMPT_TEMPLATE.format(language_prompt=language_prompt)
# Build conversation for AI (just role + content)
ai_messages = [{"role": m["role"], "content": m["content"]} for m in messages]
# Context window management: keep last 20 messages (10 exchanges)
if len(ai_messages) > 20:
ai_messages = ai_messages[-20:]
# Call AI
model = settings.get_model_for_action("script_build")
provider = get_ai_provider(model=model)
ai_text, input_tokens, output_tokens = await provider.generate_text(
system_prompt=system_prompt,
messages=ai_messages,
max_tokens=8192,
)
# Extract script from response
script, filename = _extract_script_from_response(ai_text, session.language)
line_count = len(script.splitlines()) if script else None
# Build assistant message
assistant_msg = {
"role": "assistant",
"content": ai_text,
"timestamp": datetime.now(timezone.utc).isoformat(),
"input_tokens": input_tokens,
"output_tokens": output_tokens,
}
if script:
assistant_msg["script"] = script
assistant_msg["script_filename"] = filename
assistant_msg["line_count"] = line_count
messages.append(assistant_msg)
# Update session
session.messages = messages
session.message_count = len([m for m in messages if m["role"] == "user"])
if script:
session.latest_script = script
session.latest_script_filename = filename
if not session.title and len(messages) >= 2:
# Auto-generate title from first user message (truncate)
first_user = user_content[:100]
session.title = first_user if len(user_content) <= 100 else first_user + "..."
session.updated_at = datetime.now(timezone.utc)
await db.flush()
return ScriptBuilderMessageResponse(
role="assistant",
content=ai_text,
script=script,
script_filename=filename,
line_count=line_count,
timestamp=datetime.now(timezone.utc),
)
async def get_session(
db: AsyncSession,
session_id: UUID,
user_id: UUID,
) -> ScriptBuilderSession | None:
"""Get a session by ID, ensuring the user owns it."""
result = await db.execute(
select(ScriptBuilderSession).where(
ScriptBuilderSession.id == session_id,
ScriptBuilderSession.user_id == user_id,
)
)
return result.scalar_one_or_none()
async def list_sessions(
db: AsyncSession,
user_id: UUID,
limit: int = 20,
offset: int = 0,
) -> list[ScriptBuilderSession]:
"""List user's builder sessions ordered by updated_at desc."""
result = await db.execute(
select(ScriptBuilderSession)
.where(ScriptBuilderSession.user_id == user_id)
.order_by(ScriptBuilderSession.updated_at.desc())
.limit(limit)
.offset(offset)
)
return list(result.scalars().all())
async def delete_session(
db: AsyncSession,
session_id: UUID,
user_id: UUID,
) -> bool:
"""Delete a builder session. Returns True if deleted."""
session = await get_session(db, session_id, user_id)
if not session:
return False
await db.delete(session)
await db.flush()
return True
async def count_user_sessions(db: AsyncSession, user_id: UUID) -> int:
"""Count active builder sessions for a user."""
result = await db.execute(
select(func.count(ScriptBuilderSession.id)).where(
ScriptBuilderSession.user_id == user_id
)
)
return result.scalar_one()
async def save_to_library(
db: AsyncSession,
session: ScriptBuilderSession,
name: str,
description: str | None,
category_id: UUID | None,
share_with_team: bool,
user_id: UUID,
team_id: UUID | None,
) -> "ScriptTemplate":
"""Save the latest generated script to the Script Library as a ScriptTemplate."""
import uuid as uuid_mod
from app.models.script_template import ScriptTemplate, ScriptCategory
if not session.latest_script:
raise ValueError("No script has been generated in this session yet")
# Resolve category: use provided, or find "AI Generated" default
resolved_category_id = category_id
if not resolved_category_id:
result = await db.execute(
select(ScriptCategory.id).where(ScriptCategory.slug == "ai-generated")
)
default_cat = result.scalar_one_or_none()
if not default_cat:
raise ValueError("Default 'AI Generated' category not found. Run migrations.")
resolved_category_id = default_cat
# Generate unique slug
base_slug = name.lower().replace(" ", "-").replace("_", "-")[:80]
base_slug = re.sub(r"[^a-z0-9\-]", "", base_slug)
slug = base_slug
# Check uniqueness
existing = await db.execute(
select(ScriptTemplate.id).where(ScriptTemplate.slug == slug)
)
if existing.scalar_one_or_none():
slug = f"{base_slug}-{uuid_mod.uuid4().hex[:6]}"
template = ScriptTemplate(
id=uuid_mod.uuid4(),
category_id=resolved_category_id,
created_by=user_id,
team_id=team_id if share_with_team else None,
name=name,
slug=slug,
description=description,
script_body=session.latest_script,
language=session.language,
parameters_schema={"parameters": []},
default_values={},
validation_rules={},
tags=[session.language, "ai-generated"],
complexity="intermediate",
is_verified=False,
is_active=True,
version=1,
usage_count=0,
)
db.add(template)
await db.flush()
return template