feat: Script Generator Phase 1+2 — backend, engine, API, frontend, template editor, parameter detector

Complete Script Generator feature including:

Backend:
- ScriptCategory, ScriptTemplate, ScriptGeneration models
- ScriptTemplateEngine with substitution, filters, sanitization
- CRUD + share API endpoints with permission checks
- Integration tests for permissions and sharing
- Migration 057 with AD User Management seed templates

Frontend — Script Library:
- Browse templates with category tabs and search
- Configure pane with parameter form and script generation
- Script preview with live substitution and copy/download
- scriptGeneratorStore Zustand store

Frontend — Template Editor:
- Full CRUD form with metadata, script body (Monaco Editor), parameters
- ParameterSchemaBuilder with visual builder + JSON toggle
- ScriptManagePage with routing and nav link

Frontend — Parameter Detector:
- Client-side PowerShell parameter detection engine
- Detects script-level param() blocks and variable assignments
- Type inference from PS type annotations and value patterns
- ParameterDetectorStepper one-by-one review UI with accept/skip

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit was merged in pull request #105.
This commit is contained in:
chihlasm
2026-03-14 20:18:59 -04:00
committed by GitHub
parent 83b13fcd26
commit d4dbf44781
50 changed files with 11916 additions and 11 deletions

View File

@@ -0,0 +1,413 @@
"""Script Generator API endpoints."""
from typing import Annotated, Optional
from uuid import UUID
import re
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, or_
from app.core.database import get_db
from app.api.deps import get_current_active_user
from app.core.permissions import can_manage_script_template, can_create_content
from app.models.user import User
from app.models.script_template import ScriptCategory, ScriptTemplate, ScriptGeneration
from app.schemas.script_template import (
ScriptCategoryResponse,
ScriptTemplateCreate,
ScriptTemplateUpdate,
ScriptTemplateListItem,
ScriptTemplateDetail,
ScriptGenerateRequest,
ScriptGenerateResponse,
ScriptGenerationRecord,
)
from app.services.script_template_engine import ScriptTemplateEngine, ScriptRenderError
router = APIRouter(prefix="/scripts", tags=["scripts"])
_engine = ScriptTemplateEngine()
# ── Categories ────────────────────────────────────────────────────────────
@router.get("/categories", response_model=list[ScriptCategoryResponse])
async def list_categories(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
) -> list[ScriptCategoryResponse]:
result = await db.execute(
select(ScriptCategory)
.where(ScriptCategory.is_active == True) # noqa: E712
.order_by(ScriptCategory.sort_order)
)
categories = result.scalars().all()
count_result = await db.execute(
select(ScriptTemplate.category_id, func.count(ScriptTemplate.id))
.where(ScriptTemplate.is_active == True) # noqa: E712
.group_by(ScriptTemplate.category_id)
)
counts = dict(count_result.all())
return [
ScriptCategoryResponse(
id=cat.id,
name=cat.name,
slug=cat.slug,
description=cat.description,
icon=cat.icon,
sort_order=cat.sort_order,
template_count=counts.get(cat.id, 0),
)
for cat in categories
]
# ── Templates ─────────────────────────────────────────────────────────────
@router.get("/templates", response_model=list[ScriptTemplateListItem])
async def list_templates(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
category_slug: Optional[str] = Query(None),
search: Optional[str] = Query(None),
tags: Optional[str] = Query(None, description="Comma-separated tags"),
managed: Optional[bool] = Query(None, description="If true, return only templates this user can edit"),
) -> list[ScriptTemplateListItem]:
query = (
select(ScriptTemplate)
.join(ScriptCategory, ScriptTemplate.category_id == ScriptCategory.id)
.where(ScriptTemplate.is_active == True) # noqa: E712
.where(
or_(
ScriptTemplate.team_id == None, # noqa: E711
ScriptTemplate.team_id == current_user.team_id,
)
)
)
if category_slug:
query = query.where(ScriptCategory.slug == category_slug)
if search:
term = f"%{search.lower()}%"
query = query.where(
or_(
func.lower(ScriptTemplate.name).like(term),
func.lower(ScriptTemplate.description).like(term),
func.lower(ScriptTemplate.slug).like(term),
)
)
if managed:
if current_user.is_super_admin:
pass # super admin can edit all
elif current_user.account_role == "owner":
query = query.where(
or_(
ScriptTemplate.created_by == current_user.id,
ScriptTemplate.team_id != None, # noqa: E711
)
)
else:
# engineers see only their own
query = query.where(ScriptTemplate.created_by == current_user.id)
result = await db.execute(query.order_by(ScriptTemplate.name))
templates = result.scalars().all()
if tags:
tag_list = [t.strip().lower() for t in tags.split(",")]
templates = [
t
for t in templates
if any(tag in [tg.lower() for tg in (t.tags or [])] for tag in tag_list)
]
return [ScriptTemplateListItem.model_validate(t) for t in templates]
@router.get("/templates/{template_id}", response_model=ScriptTemplateDetail)
async def get_template(
template_id: UUID,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
) -> ScriptTemplateDetail:
result = await db.execute(
select(ScriptTemplate).where(
ScriptTemplate.id == template_id,
ScriptTemplate.is_active == True, # noqa: E712
or_(
ScriptTemplate.team_id == None, # noqa: E711
ScriptTemplate.team_id == current_user.team_id,
),
)
)
template = result.scalar_one_or_none()
if not template:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Template not found"
)
return ScriptTemplateDetail.model_validate(template)
@router.post(
"/templates",
response_model=ScriptTemplateDetail,
status_code=status.HTTP_201_CREATED,
)
async def create_template(
data: ScriptTemplateCreate,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
) -> ScriptTemplateDetail:
if not can_create_content(current_user):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Engineer access required to create templates",
)
cat_result = await db.execute(
select(ScriptCategory).where(
ScriptCategory.id == data.category_id,
ScriptCategory.is_active == True, # noqa: E712
)
)
if not cat_result.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Category not found"
)
slug = re.sub(r"[^a-z0-9]+", "-", data.name.lower()).strip("-")
template = ScriptTemplate(
category_id=data.category_id,
team_id=current_user.team_id,
created_by=current_user.id,
name=data.name,
slug=slug,
description=data.description,
use_case=data.use_case,
script_body=data.script_body,
parameters_schema=data.parameters_schema,
default_values=data.default_values,
validation_rules=data.validation_rules,
tags=data.tags,
complexity=data.complexity,
estimated_runtime=data.estimated_runtime,
requires_elevation=data.requires_elevation,
requires_modules=data.requires_modules,
)
db.add(template)
await db.commit()
await db.refresh(template)
return ScriptTemplateDetail.model_validate(template)
@router.put("/templates/{template_id}", response_model=ScriptTemplateDetail)
async def update_template(
template_id: UUID,
data: ScriptTemplateUpdate,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
) -> ScriptTemplateDetail:
result = await db.execute(
select(ScriptTemplate).where(
ScriptTemplate.id == template_id,
ScriptTemplate.is_active == True, # noqa: E712
)
)
template = result.scalar_one_or_none()
if not template:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Template not found",
)
if not can_manage_script_template(current_user, template.created_by, template.team_id):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to edit this template",
)
update_data = data.model_dump(exclude_unset=True)
if "script_body" in update_data or "parameters_schema" in update_data:
template.version += 1
for field, value in update_data.items():
setattr(template, field, value)
await db.commit()
await db.refresh(template)
return ScriptTemplateDetail.model_validate(template)
@router.delete("/templates/{template_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_template(
template_id: UUID,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
) -> None:
result = await db.execute(
select(ScriptTemplate).where(
ScriptTemplate.id == template_id,
ScriptTemplate.is_active == True, # noqa: E712
)
)
template = result.scalar_one_or_none()
if not template:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Template not found",
)
if not can_manage_script_template(current_user, template.created_by, template.team_id):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to delete this template",
)
template.is_active = False
await db.commit()
@router.patch("/templates/{template_id}/share", response_model=ScriptTemplateDetail)
async def share_template(
template_id: UUID,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
shared: bool = Query(..., description="true to share with team, false to make personal"),
) -> ScriptTemplateDetail:
"""Toggle team sharing for a template. Owner/admin/super_admin only."""
if not (current_user.is_super_admin or current_user.account_role == "owner"):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only account owners and admins can share templates",
)
result = await db.execute(
select(ScriptTemplate).where(
ScriptTemplate.id == template_id,
ScriptTemplate.is_active == True, # noqa: E712
)
)
template = result.scalar_one_or_none()
if not template:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Template not found",
)
if shared:
template.team_id = current_user.team_id
else:
template.team_id = None
await db.commit()
await db.refresh(template)
return ScriptTemplateDetail.model_validate(template)
# ── Generate ──────────────────────────────────────────────────────────────
@router.post("/generate", response_model=ScriptGenerateResponse)
async def generate_script(
data: ScriptGenerateRequest,
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
) -> ScriptGenerateResponse:
result = await db.execute(
select(ScriptTemplate).where(
ScriptTemplate.id == data.template_id,
ScriptTemplate.is_active == True, # noqa: E712
or_(
ScriptTemplate.team_id == None, # noqa: E711
ScriptTemplate.team_id == current_user.team_id,
),
)
)
template = result.scalar_one_or_none()
if not template:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Template not found"
)
try:
rendered_script = _engine.render(template.script_body, data.parameters)
except ScriptRenderError as e:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(e)
)
params_schema = template.parameters_schema or {}
sensitive_keys = {
p["key"]
for p in params_schema.get("parameters", [])
if p.get("sensitive", False)
}
redacted_params = _engine.redact_sensitive(data.parameters, sensitive_keys)
generation = ScriptGeneration(
template_id=template.id,
user_id=current_user.id,
team_id=current_user.team_id,
session_id=data.session_id,
parameters_used=redacted_params,
generated_script=rendered_script,
)
db.add(generation)
template.usage_count += 1
await db.commit()
await db.refresh(generation)
warnings: list[str] = []
if template.requires_elevation:
warnings.append("This script requires 'Run as Administrator'")
return ScriptGenerateResponse(
id=generation.id,
script=rendered_script,
warnings=warnings,
metadata={
"template_name": template.name,
"template_version": template.version,
"requires_elevation": template.requires_elevation,
"requires_modules": template.requires_modules,
"generated_at": generation.created_at.isoformat(),
"estimated_runtime": template.estimated_runtime,
},
)
# ── Generations history ───────────────────────────────────────────────────
@router.get("/generations", response_model=list[ScriptGenerationRecord])
async def list_generations(
db: Annotated[AsyncSession, Depends(get_db)],
current_user: Annotated[User, Depends(get_current_active_user)],
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
) -> list[ScriptGenerationRecord]:
result = await db.execute(
select(ScriptGeneration, ScriptTemplate.name)
.join(ScriptTemplate, ScriptGeneration.template_id == ScriptTemplate.id)
.where(ScriptGeneration.user_id == current_user.id)
.order_by(ScriptGeneration.created_at.desc())
.limit(limit)
.offset(offset)
)
rows = result.all()
return [
ScriptGenerationRecord(
id=gen.id,
template_id=gen.template_id,
template_name=name,
parameters_used=gen.parameters_used,
created_at=gen.created_at,
)
for gen, name in rows
]

View File

@@ -16,6 +16,7 @@ from app.api.endpoints import tree_transfer
from app.api.endpoints import ai_suggestions
from app.api.endpoints import kb_accelerator
from app.api.endpoints import beta_signup
from app.api.endpoints import scripts
api_router = APIRouter()
@@ -56,3 +57,4 @@ api_router.include_router(tree_transfer.router)
api_router.include_router(ai_suggestions.router)
api_router.include_router(kb_accelerator.router)
api_router.include_router(beta_signup.router)
api_router.include_router(scripts.router)

View File

@@ -169,3 +169,19 @@ def can_create_step_category(user: User, account_id: Optional[UUID]) -> bool:
if user.account_role == "owner" and account_id == user.account_id and user.account_id is not None:
return True
return False
def can_manage_script_template(user: User, template_created_by: Optional[UUID], template_account_id: Optional[UUID] = None) -> bool:
"""Can the user edit/delete this script template?
- Super admins can manage any template
- Account owners can manage any template in their account
- Engineers can manage templates they created
"""
if user.is_super_admin:
return True
if user.account_role == "owner" and template_account_id == user.account_id and user.account_id is not None:
return True
if template_created_by == user.id:
return True
return False

View File

@@ -35,6 +35,7 @@ from .assistant_chat import AssistantChat
from .survey_response import SurveyResponse
from .survey_invite import SurveyInvite
from .kb_import import KBImport, KBImportNode
from .script_template import ScriptCategory, ScriptTemplate, ScriptGeneration
__all__ = [
"User",
@@ -82,4 +83,7 @@ __all__ = [
"SurveyInvite",
"KBImport",
"KBImportNode",
"ScriptCategory",
"ScriptTemplate",
"ScriptGeneration",
]

View File

@@ -0,0 +1,107 @@
import uuid
from datetime import datetime, timezone
from typing import Optional, TYPE_CHECKING
from sqlalchemy import String, Text, DateTime, ForeignKey, Boolean, Integer, Enum as SAEnum
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.core.database import Base
if TYPE_CHECKING:
from app.models.user import User
from app.models.team import Team
from app.models.session import Session
class ScriptCategory(Base):
__tablename__ = "script_categories"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name: Mapped[str] = mapped_column(String(100), nullable=False)
slug: Mapped[str] = mapped_column(String(100), nullable=False, unique=True, index=True)
description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
icon: Mapped[Optional[str]] = mapped_column(String(50), nullable=True)
sort_order: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc),
)
templates: Mapped[list["ScriptTemplate"]] = relationship("ScriptTemplate", back_populates="category")
class ScriptTemplate(Base):
__tablename__ = "script_templates"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
category_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), ForeignKey("script_categories.id", ondelete="RESTRICT"), nullable=False, index=True
)
team_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True), ForeignKey("teams.id", ondelete="CASCADE"), nullable=True, index=True
)
created_by: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True
)
name: Mapped[str] = mapped_column(String(200), nullable=False)
slug: Mapped[str] = mapped_column(String(200), nullable=False, index=True)
description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
use_case: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
script_body: Mapped[str] = mapped_column(Text, nullable=False)
parameters_schema: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict)
default_values: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict)
validation_rules: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict)
tags: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)
complexity: Mapped[str] = mapped_column(
SAEnum("beginner", "intermediate", "advanced", name="script_complexity"), nullable=False, default="beginner"
)
estimated_runtime: Mapped[Optional[str]] = mapped_column(String(50), nullable=True)
requires_elevation: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
requires_modules: Mapped[list] = mapped_column(JSONB, nullable=False, default=list)
version: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
is_verified: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
usage_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc),
)
category: Mapped["ScriptCategory"] = relationship("ScriptCategory", back_populates="templates")
team: Mapped[Optional["Team"]] = relationship("Team")
creator: Mapped[Optional["User"]] = relationship("User", foreign_keys=[created_by])
generations: Mapped[list["ScriptGeneration"]] = relationship("ScriptGeneration", back_populates="template")
class ScriptGeneration(Base):
__tablename__ = "script_generations"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
template_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), ForeignKey("script_templates.id", ondelete="RESTRICT"), nullable=False, index=True
)
user_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
)
team_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True), ForeignKey("teams.id", ondelete="SET NULL"), nullable=True, index=True
)
session_id: Mapped[Optional[uuid.UUID]] = mapped_column(
UUID(as_uuid=True), ForeignKey("sessions.id", ondelete="SET NULL"), nullable=True, index=True
)
parameters_used: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict)
generated_script: Mapped[str] = mapped_column(Text, nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
)
template: Mapped["ScriptTemplate"] = relationship("ScriptTemplate", back_populates="generations")
user: Mapped["User"] = relationship("User")

View File

@@ -10,6 +10,11 @@ from .ai_builder import (
AIStartResponse, AIScaffoldResponse, AIBranchDetailResponse, AIAssembleResponse,
AIQuotaStatusResponse,
)
from .script_template import (
ScriptCategoryResponse,
ScriptTemplateCreate, ScriptTemplateUpdate, ScriptTemplateListItem, ScriptTemplateDetail,
ScriptGenerateRequest, ScriptGenerateResponse, ScriptGenerationRecord,
)
__all__ = [
# User
@@ -30,4 +35,8 @@ __all__ = [
"AIStartRequest", "AIScaffoldRequest", "AIBranchDetailRequest", "AIAssembleRequest",
"AIStartResponse", "AIScaffoldResponse", "AIBranchDetailResponse", "AIAssembleResponse",
"AIQuotaStatusResponse",
# Script Generator
"ScriptCategoryResponse",
"ScriptTemplateCreate", "ScriptTemplateUpdate", "ScriptTemplateListItem", "ScriptTemplateDetail",
"ScriptGenerateRequest", "ScriptGenerateResponse", "ScriptGenerationRecord",
]

View File

@@ -0,0 +1,138 @@
from datetime import datetime
from typing import Optional, Any
from uuid import UUID
from pydantic import BaseModel, Field
# ── Parameter schema types ──────────────────────────────────────────────────
class ScriptParameterValidation(BaseModel):
pattern: Optional[str] = None
min_length: Optional[int] = None
max_length: Optional[int] = None
min_value: Optional[float] = None
max_value: Optional[float] = None
class ScriptParameter(BaseModel):
key: str
label: str
type: str # text | password | select | boolean | multi_text | number | textarea
required: bool = True
placeholder: Optional[str] = None
group: Optional[str] = None
order: int = 0
help_text: Optional[str] = None
options: Optional[list[dict]] = None # for select type: [{value, label}]
default: Optional[Any] = None
validation: Optional[ScriptParameterValidation] = None
sensitive: bool = False # password fields → redacted in generation record
class ScriptParametersSchema(BaseModel):
parameters: list[ScriptParameter]
# ── Category ────────────────────────────────────────────────────────────────
class ScriptCategoryResponse(BaseModel):
id: UUID
name: str
slug: str
description: Optional[str] = None
icon: Optional[str] = None
sort_order: int
template_count: int = 0
class Config:
from_attributes = True
# ── Template ────────────────────────────────────────────────────────────────
class ScriptTemplateCreate(BaseModel):
category_id: UUID
name: str = Field(..., min_length=1, max_length=200)
description: Optional[str] = None
use_case: Optional[str] = None
script_body: str = Field(..., min_length=1)
parameters_schema: dict = Field(default_factory=dict)
default_values: dict = Field(default_factory=dict)
validation_rules: dict = Field(default_factory=dict)
tags: list[str] = Field(default_factory=list)
complexity: str = Field(default="beginner", pattern="^(beginner|intermediate|advanced)$")
estimated_runtime: Optional[str] = None
requires_elevation: bool = False
requires_modules: list[str] = Field(default_factory=list)
class ScriptTemplateUpdate(BaseModel):
name: Optional[str] = Field(None, min_length=1, max_length=200)
description: Optional[str] = None
use_case: Optional[str] = None
script_body: Optional[str] = None
parameters_schema: Optional[dict] = None
default_values: Optional[dict] = None
validation_rules: Optional[dict] = None
tags: Optional[list[str]] = None
complexity: Optional[str] = Field(None, pattern="^(beginner|intermediate|advanced)$")
estimated_runtime: Optional[str] = None
requires_elevation: Optional[bool] = None
requires_modules: Optional[list[str]] = None
is_active: Optional[bool] = None
class ScriptTemplateListItem(BaseModel):
id: UUID
category_id: UUID
team_id: Optional[UUID] = None
created_by: Optional[UUID] = None
name: str
slug: str
description: Optional[str] = None
tags: list[str]
complexity: str
estimated_runtime: Optional[str] = None
requires_elevation: bool
requires_modules: list[str]
is_verified: bool
usage_count: int
class Config:
from_attributes = True
class ScriptTemplateDetail(ScriptTemplateListItem):
use_case: Optional[str] = None
script_body: str
parameters_schema: dict
default_values: dict
validation_rules: dict
version: int
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
# ── Generation ───────────────────────────────────────────────────────────────
class ScriptGenerateRequest(BaseModel):
template_id: UUID
parameters: dict[str, Any]
session_id: Optional[UUID] = None
class ScriptGenerateResponse(BaseModel):
id: UUID
script: str
warnings: list[str] = Field(default_factory=list)
metadata: dict
class Config:
from_attributes = True
class ScriptGenerationRecord(BaseModel):
id: UUID
template_id: UUID
template_name: str
parameters_used: dict # passwords already redacted
created_at: datetime
class Config:
from_attributes = True

View File

@@ -0,0 +1,139 @@
"""
ScriptTemplateEngine — renders parameterized PowerShell templates.
Template syntax:
{{ param_name }} — simple substitution (string-escaped)
{{ param | filter }} — substitution with filter applied
{% if param %} ... {% endif %} — conditional block
Security: all string values are PowerShell-escaped before insertion.
Sensitive parameter values are never stored in generation records — use redact_sensitive().
"""
import re
from typing import Any
class ScriptRenderError(Exception):
"""Raised when a required parameter is missing or rendering fails."""
class ScriptTemplateEngine:
# ── Public API ────────────────────────────────────────────────────────
def render(self, body: str, parameters: dict[str, Any]) -> str:
"""Render a template body with the given parameters. Raises ScriptRenderError on missing required params."""
result = self._process_conditionals(body, parameters)
result = self._substitute_params(result, parameters)
return result
def redact_sensitive(self, parameters: dict[str, Any], sensitive_keys: set[str]) -> dict[str, Any]:
"""Return a copy of parameters with sensitive values replaced by [REDACTED]."""
return {
k: "[REDACTED]" if k in sensitive_keys else v
for k, v in parameters.items()
}
# ── Conditional processing ────────────────────────────────────────────
def _process_conditionals(self, body: str, parameters: dict[str, Any]) -> str:
"""Process {% if param %} ... {% endif %} blocks."""
pattern = re.compile(
r'\{%\s*if\s+(\w+)\s*%\}(.*?)\{%\s*endif\s*%\}',
re.DOTALL
)
def replace_block(match: re.Match) -> str:
key = match.group(1)
content = match.group(2)
value = parameters.get(key)
# Truthy: non-empty string, non-empty list, True, non-zero number
if value is None or value == "" or value == [] or value is False:
return ""
return content
return pattern.sub(replace_block, body)
# ── Parameter substitution ────────────────────────────────────────────
# Matches {{ param }} or {{ param | filter }}
_PLACEHOLDER = re.compile(r'\{\{\s*(\w+)(?:\s*\|\s*(\w+))?\s*\}\}')
def _substitute_params(self, body: str, parameters: dict[str, Any]) -> str:
missing: list[str] = []
def replace(match: re.Match) -> str:
key = match.group(1)
filter_name = match.group(2)
if key not in parameters:
missing.append(key)
return match.group(0) # leave as-is, report at end
value = parameters[key]
if filter_name:
return self._apply_filter(filter_name, value)
return self._escape_string(value)
result = self._PLACEHOLDER.sub(replace, body)
if missing:
raise ScriptRenderError(
f"Missing required template parameter(s): {', '.join(missing)}"
)
return result
# ── Filters ──────────────────────────────────────────────────────────
def _apply_filter(self, filter_name: str, value: Any) -> str:
filters = {
"as_secure_string": self._filter_as_secure_string,
"as_array": self._filter_as_array,
"as_bool": self._filter_as_bool,
"escape_single": self._filter_escape_single,
}
if filter_name not in filters:
raise ScriptRenderError(f"Unknown template filter: {filter_name}")
return filters[filter_name](value)
def _filter_as_secure_string(self, value: Any) -> str:
escaped = str(value).replace("'", "''")
return f"(ConvertTo-SecureString '{escaped}' -AsPlainText -Force)"
def _filter_as_array(self, value: Any) -> str:
if not isinstance(value, list):
value = [value]
items = ",".join(f"'{self._escape_single_quote(str(v))}'" for v in value)
return items
def _filter_as_bool(self, value: Any) -> str:
if isinstance(value, bool):
return "$true" if value else "$false"
return "$true" if str(value).lower() in ("true", "1", "yes") else "$false"
def _filter_escape_single(self, value: Any) -> str:
return self._escape_single_quote(str(value))
# ── Escaping helpers ──────────────────────────────────────────────────
def _escape_string(self, value: Any) -> str:
"""Escape a value for safe insertion into a PowerShell single-quoted context."""
if isinstance(value, bool):
return "$true" if value else "$false"
if isinstance(value, (int, float)):
return str(value)
if isinstance(value, list):
# Lists rendered without filter — join with space (caller should use | as_array for proper PS arrays)
return " ".join(str(v) for v in value)
s = str(value)
# Escape backtick (PS escape char) first, then dollar (variable interpolation)
s = s.replace("`", "``")
s = s.replace("$", "`$")
# Escape single quotes (doubling for PS single-quoted strings)
s = self._escape_single_quote(s)
return s
def _escape_single_quote(self, s: str) -> str:
return s.replace("'", "''")