From 28f8200b360a3d59577b98ee853ed82dec394940 Mon Sep 17 00:00:00 2001 From: Michael Chihlas Date: Sat, 21 Mar 2026 16:58:26 -0400 Subject: [PATCH] feat: add Script Builder service and API endpoints - Script Builder service with language-specific system prompts (PowerShell, Bash, Python) - AI-powered script generation with code block extraction and filename detection - Context window management (last 20 messages) and session message limits - REST API: CRUD sessions, send messages, save to Script Library - Rate limiting on message endpoint (10/min), max 5 concurrent sessions per user - Registered script_build action in AI model tier routing (standard tier) Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/app/api/endpoints/script_builder.py | 151 +++++++ backend/app/api/router.py | 2 + backend/app/core/config.py | 1 + .../app/services/script_builder_service.py | 367 ++++++++++++++++++ 4 files changed, 521 insertions(+) create mode 100644 backend/app/api/endpoints/script_builder.py create mode 100644 backend/app/services/script_builder_service.py diff --git a/backend/app/api/endpoints/script_builder.py b/backend/app/api/endpoints/script_builder.py new file mode 100644 index 00000000..a477d7a9 --- /dev/null +++ b/backend/app/api/endpoints/script_builder.py @@ -0,0 +1,151 @@ +"""Script Builder API endpoints.""" +from typing import Annotated +from uuid import UUID + +from fastapi import APIRouter, Depends, HTTPException, Request +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.database import get_db +from app.core.rate_limit import limiter +from app.api.deps import get_current_active_user +from app.models.user import User +from app.schemas.script_builder import ( + ScriptBuilderCreateRequest, + ScriptBuilderMessageRequest, + ScriptBuilderMessageResponse, + ScriptBuilderSessionDetail, + ScriptBuilderSessionSummary, + SaveToLibraryRequest, +) +from app.schemas.script_template import ScriptTemplateDetail +from app.services import script_builder_service + +router = APIRouter(prefix="/scripts/builder", tags=["script-builder"]) + +MAX_SESSIONS_PER_USER = 5 + + +@router.post("/sessions", response_model=ScriptBuilderSessionDetail, status_code=201) +async def create_session( + data: ScriptBuilderCreateRequest, + db: Annotated[AsyncSession, Depends(get_db)], + current_user: Annotated[User, Depends(get_current_active_user)], +) -> ScriptBuilderSessionDetail: + """Start a new Script Builder session.""" + # Enforce max concurrent sessions + count = await script_builder_service.count_user_sessions(db, current_user.id) + if count >= MAX_SESSIONS_PER_USER: + raise HTTPException( + status_code=400, + detail=f"Maximum of {MAX_SESSIONS_PER_USER} builder sessions allowed. Delete an old session first.", + ) + + session = await script_builder_service.create_session( + db=db, + user_id=current_user.id, + team_id=current_user.team_id, + language=data.language, + ) + await db.commit() + return ScriptBuilderSessionDetail.model_validate(session) + + +@router.get("/sessions", response_model=list[ScriptBuilderSessionSummary]) +async def list_sessions( + db: Annotated[AsyncSession, Depends(get_db)], + current_user: Annotated[User, Depends(get_current_active_user)], + limit: int = 20, + offset: int = 0, +) -> list[ScriptBuilderSessionSummary]: + """List user's recent builder sessions (lightweight, no messages).""" + sessions = await script_builder_service.list_sessions( + db=db, user_id=current_user.id, limit=limit, offset=offset + ) + return [ScriptBuilderSessionSummary.model_validate(s) for s in sessions] + + +@router.get("/sessions/{session_id}", response_model=ScriptBuilderSessionDetail) +async def get_session( + session_id: UUID, + db: Annotated[AsyncSession, Depends(get_db)], + current_user: Annotated[User, Depends(get_current_active_user)], +) -> ScriptBuilderSessionDetail: + """Get full session detail with message history.""" + session = await script_builder_service.get_session(db, session_id, current_user.id) + if not session: + raise HTTPException(status_code=404, detail="Session not found") + return ScriptBuilderSessionDetail.model_validate(session) + + +@router.post( + "/sessions/{session_id}/messages", + response_model=ScriptBuilderMessageResponse, +) +@limiter.limit("10/minute") +async def send_message( + request: Request, + session_id: UUID, + data: ScriptBuilderMessageRequest, + db: Annotated[AsyncSession, Depends(get_db)], + current_user: Annotated[User, Depends(get_current_active_user)], +) -> ScriptBuilderMessageResponse: + """Send a message and get AI-generated script response.""" + session = await script_builder_service.get_session(db, session_id, current_user.id) + if not session: + raise HTTPException(status_code=404, detail="Session not found") + + try: + response = await script_builder_service.send_message(db, session, data.content) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + await db.commit() + return response + + +@router.delete("/sessions/{session_id}", status_code=204) +async def delete_session( + session_id: UUID, + db: Annotated[AsyncSession, Depends(get_db)], + current_user: Annotated[User, Depends(get_current_active_user)], +) -> None: + """Delete a builder session.""" + deleted = await script_builder_service.delete_session(db, session_id, current_user.id) + if not deleted: + raise HTTPException(status_code=404, detail="Session not found") + await db.commit() + + +@router.post( + "/sessions/{session_id}/save", + response_model=ScriptTemplateDetail, + status_code=201, +) +async def save_to_library( + session_id: UUID, + data: SaveToLibraryRequest, + db: Annotated[AsyncSession, Depends(get_db)], + current_user: Annotated[User, Depends(get_current_active_user)], +) -> ScriptTemplateDetail: + """Save the latest generated script to the Script Library.""" + session = await script_builder_service.get_session(db, session_id, current_user.id) + if not session: + raise HTTPException(status_code=404, detail="Session not found") + + try: + template = await script_builder_service.save_to_library( + db=db, + session=session, + name=data.name, + description=data.description, + category_id=data.category_id, + share_with_team=data.share_with_team, + user_id=current_user.id, + team_id=current_user.team_id, + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + await db.commit() + await db.refresh(template) + return ScriptTemplateDetail.model_validate(template) diff --git a/backend/app/api/router.py b/backend/app/api/router.py index 884111fc..01465810 100644 --- a/backend/app/api/router.py +++ b/backend/app/api/router.py @@ -28,6 +28,7 @@ from app.api.endpoints import notifications from app.api.endpoints import public_templates from app.api.endpoints import admin_gallery from app.api.endpoints import uploads +from app.api.endpoints import script_builder api_router = APIRouter() @@ -81,3 +82,4 @@ api_router.include_router(notifications.router) api_router.include_router(public_templates.router) api_router.include_router(admin_gallery.router) api_router.include_router(uploads.router) +api_router.include_router(script_builder.router) diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 955543ef..7fdf7fb6 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -104,6 +104,7 @@ class Settings(BaseSettings): "open_chat": "standard", "variable_inference": "fast", "kb_convert": "standard", + "script_build": "standard", } def get_model_for_action(self, action_type: str) -> str: diff --git a/backend/app/services/script_builder_service.py b/backend/app/services/script_builder_service.py new file mode 100644 index 00000000..f07c2653 --- /dev/null +++ b/backend/app/services/script_builder_service.py @@ -0,0 +1,367 @@ +"""AI Script Builder service — generates scripts from natural language descriptions.""" +import logging +import re +from datetime import datetime, timezone +from typing import Optional +from uuid import UUID + +from sqlalchemy import select, func +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.ai_provider import get_ai_provider +from app.core.config import settings +from app.models.script_builder_session import ScriptBuilderSession +from app.schemas.script_builder import ( + ScriptBuilderMessageResponse, + ScriptBuilderSessionDetail, + ScriptBuilderSessionSummary, +) + +logger = logging.getLogger(__name__) + +MAX_MESSAGES_PER_SESSION = 30 + +LANGUAGE_PROMPTS = { + "powershell": """\ +You are an expert PowerShell script writer for MSP (Managed Service Provider) environments. + +## Script Standards +- Use Advanced Functions with CmdletBinding and param() blocks +- Include comment-based help (.SYNOPSIS, .DESCRIPTION, .PARAMETER, .EXAMPLE) +- Use try/catch/finally for error handling +- Use Write-Verbose for diagnostic output, Write-Error for failures +- Support pipeline input where appropriate +- Use approved PowerShell verbs (Get-, Set-, New-, Remove-, etc.) +- Import required modules at the top (e.g., Import-Module ActiveDirectory) +- Use [Parameter(Mandatory=$true)] for required params +- Default to UTF-8 output for exports + +## Security +- Never hardcode credentials — use Get-Credential or SecureString params +- Use -WhatIf and -Confirm support via SupportsShouldProcess +- Validate input with ValidateSet, ValidatePattern, ValidateRange +""", + "bash": """\ +You are an expert Bash script writer for Linux/macOS system administration. + +## Script Standards +- Start with #!/bin/bash +- Use set -euo pipefail for safety +- Parse arguments with getopts or positional parameters +- Include a usage() function for --help +- Use functions for logical grouping +- Quote all variable expansions ("$var" not $var) +- Use [[ ]] for conditionals (not [ ]) +- Add comments explaining non-obvious logic +- Use lowercase_with_underscores for variable names +- Exit with meaningful exit codes (0=success, 1=general error, 2=usage error) + +## Security +- Never store passwords in scripts — use environment variables or prompts +- Validate all user inputs +- Use mktemp for temporary files +""", + "python": """\ +You are an expert Python script writer for IT automation and system administration. + +## Script Standards +- Use Python 3.10+ syntax +- Add type hints to all function signatures +- Use argparse for CLI argument parsing +- Include if __name__ == "__main__": guard +- Use logging module (not print) for diagnostic output +- Use docstrings for functions and modules +- Use pathlib.Path instead of os.path +- Handle exceptions with specific exception types +- Use f-strings for string formatting +- Follow PEP 8 naming conventions + +## Security +- Never hardcode secrets — use environment variables or config files +- Validate and sanitize all user inputs +- Use subprocess.run() with shell=False (never shell=True with user input) +""", +} + +SYSTEM_PROMPT_TEMPLATE = """\ +{language_prompt} + +## Response Format +Respond conversationally. When generating a script: +1. Briefly explain what the script does and any assumptions +2. Include the complete script in a single fenced code block with the language tag +3. Suggest a filename (e.g., `Get-LinkedGPOs.ps1`) + +When the user asks for modifications, generate the COMPLETE updated script (not a diff). + +## Context +The user is an MSP engineer using ResolutionFlow. They need scripts for managing client infrastructure. +Keep scripts practical, production-ready, and well-documented.\ +""" + + +def _extract_script_from_response(content: str, language: str) -> tuple[str | None, str | None]: + """Extract code block and filename from AI response. + + Returns (script, filename) tuple. + """ + # Map language to code fence tags + lang_tags = { + "powershell": ["powershell", "ps1", "pwsh"], + "bash": ["bash", "sh", "shell"], + "python": ["python", "py", "python3"], + } + tags = lang_tags.get(language, [language]) + + # Try each language-specific tag, then fall back to generic fence + script = None + for tag in tags: + pattern = rf"```{tag}\s*\n(.*?)```" + match = re.search(pattern, content, re.DOTALL | re.IGNORECASE) + if match: + script = match.group(1).strip() + break + else: + # Try generic code fence + pattern = r"```\s*\n(.*?)```" + match = re.search(pattern, content, re.DOTALL) + script = match.group(1).strip() if match else None + + # Extract filename suggestion + filename = None + ext_map = {"powershell": ".ps1", "bash": ".sh", "python": ".py"} + ext = ext_map.get(language, ".txt") + filename_pattern = rf"`([A-Za-z0-9_\-]+{re.escape(ext)})`" + fname_match = re.search(filename_pattern, content) + if fname_match: + filename = fname_match.group(1) + + return script, filename + + +async def create_session( + db: AsyncSession, + user_id: UUID, + team_id: UUID | None, + language: str, + initial_prompt: str | None = None, +) -> ScriptBuilderSession: + """Create a new Script Builder session.""" + session = ScriptBuilderSession( + user_id=user_id, + team_id=team_id, + language=language, + messages=[], + message_count=0, + ) + db.add(session) + await db.flush() + + # If initial prompt provided (e.g., from FlowPilot), send first message + if initial_prompt: + await send_message(db, session, initial_prompt) + + return session + + +async def send_message( + db: AsyncSession, + session: ScriptBuilderSession, + user_content: str, +) -> ScriptBuilderMessageResponse: + """Send a user message and get AI response with generated script.""" + if session.message_count >= MAX_MESSAGES_PER_SESSION: + raise ValueError(f"Session has reached the maximum of {MAX_MESSAGES_PER_SESSION} messages.") + + now = datetime.now(timezone.utc) + + # Append user message + user_msg = { + "role": "user", + "content": user_content, + "timestamp": now.isoformat(), + } + messages = list(session.messages or []) + messages.append(user_msg) + + # Build system prompt + language_prompt = LANGUAGE_PROMPTS.get(session.language, LANGUAGE_PROMPTS["powershell"]) + system_prompt = SYSTEM_PROMPT_TEMPLATE.format(language_prompt=language_prompt) + + # Build conversation for AI (just role + content) + ai_messages = [{"role": m["role"], "content": m["content"]} for m in messages] + + # Context window management: keep last 20 messages (10 exchanges) + if len(ai_messages) > 20: + ai_messages = ai_messages[-20:] + + # Call AI + model = settings.get_model_for_action("script_build") + provider = get_ai_provider(model=model) + ai_text, input_tokens, output_tokens = await provider.generate_text( + system_prompt=system_prompt, + messages=ai_messages, + max_tokens=8192, + ) + + # Extract script from response + script, filename = _extract_script_from_response(ai_text, session.language) + line_count = len(script.splitlines()) if script else None + + # Build assistant message + assistant_msg = { + "role": "assistant", + "content": ai_text, + "timestamp": datetime.now(timezone.utc).isoformat(), + "input_tokens": input_tokens, + "output_tokens": output_tokens, + } + if script: + assistant_msg["script"] = script + assistant_msg["script_filename"] = filename + assistant_msg["line_count"] = line_count + + messages.append(assistant_msg) + + # Update session + session.messages = messages + session.message_count = len([m for m in messages if m["role"] == "user"]) + if script: + session.latest_script = script + session.latest_script_filename = filename + if not session.title and len(messages) >= 2: + # Auto-generate title from first user message (truncate) + first_user = user_content[:100] + session.title = first_user if len(user_content) <= 100 else first_user + "..." + session.updated_at = datetime.now(timezone.utc) + + await db.flush() + + return ScriptBuilderMessageResponse( + role="assistant", + content=ai_text, + script=script, + script_filename=filename, + line_count=line_count, + timestamp=datetime.now(timezone.utc), + ) + + +async def get_session( + db: AsyncSession, + session_id: UUID, + user_id: UUID, +) -> ScriptBuilderSession | None: + """Get a session by ID, ensuring the user owns it.""" + result = await db.execute( + select(ScriptBuilderSession).where( + ScriptBuilderSession.id == session_id, + ScriptBuilderSession.user_id == user_id, + ) + ) + return result.scalar_one_or_none() + + +async def list_sessions( + db: AsyncSession, + user_id: UUID, + limit: int = 20, + offset: int = 0, +) -> list[ScriptBuilderSession]: + """List user's builder sessions ordered by updated_at desc.""" + result = await db.execute( + select(ScriptBuilderSession) + .where(ScriptBuilderSession.user_id == user_id) + .order_by(ScriptBuilderSession.updated_at.desc()) + .limit(limit) + .offset(offset) + ) + return list(result.scalars().all()) + + +async def delete_session( + db: AsyncSession, + session_id: UUID, + user_id: UUID, +) -> bool: + """Delete a builder session. Returns True if deleted.""" + session = await get_session(db, session_id, user_id) + if not session: + return False + await db.delete(session) + await db.flush() + return True + + +async def count_user_sessions(db: AsyncSession, user_id: UUID) -> int: + """Count active builder sessions for a user.""" + result = await db.execute( + select(func.count(ScriptBuilderSession.id)).where( + ScriptBuilderSession.user_id == user_id + ) + ) + return result.scalar_one() + + +async def save_to_library( + db: AsyncSession, + session: ScriptBuilderSession, + name: str, + description: str | None, + category_id: UUID | None, + share_with_team: bool, + user_id: UUID, + team_id: UUID | None, +) -> "ScriptTemplate": + """Save the latest generated script to the Script Library as a ScriptTemplate.""" + import uuid as uuid_mod + from app.models.script_template import ScriptTemplate, ScriptCategory + + if not session.latest_script: + raise ValueError("No script has been generated in this session yet") + + # Resolve category: use provided, or find "AI Generated" default + resolved_category_id = category_id + if not resolved_category_id: + result = await db.execute( + select(ScriptCategory.id).where(ScriptCategory.slug == "ai-generated") + ) + default_cat = result.scalar_one_or_none() + if not default_cat: + raise ValueError("Default 'AI Generated' category not found. Run migrations.") + resolved_category_id = default_cat + + # Generate unique slug + base_slug = name.lower().replace(" ", "-").replace("_", "-")[:80] + base_slug = re.sub(r"[^a-z0-9\-]", "", base_slug) + slug = base_slug + # Check uniqueness + existing = await db.execute( + select(ScriptTemplate.id).where(ScriptTemplate.slug == slug) + ) + if existing.scalar_one_or_none(): + slug = f"{base_slug}-{uuid_mod.uuid4().hex[:6]}" + + template = ScriptTemplate( + id=uuid_mod.uuid4(), + category_id=resolved_category_id, + created_by=user_id, + team_id=team_id if share_with_team else None, + name=name, + slug=slug, + description=description, + script_body=session.latest_script, + language=session.language, + parameters_schema={"parameters": []}, + default_values={}, + validation_rules={}, + tags=[session.language, "ai-generated"], + complexity="intermediate", + is_verified=False, + is_active=True, + version=1, + usage_count=0, + ) + db.add(template) + await db.flush() + return template