feat(scripts): add ScriptTemplateEngine with substitution, filters, and sanitization
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
139
backend/app/services/script_template_engine.py
Normal file
139
backend/app/services/script_template_engine.py
Normal file
@@ -0,0 +1,139 @@
|
|||||||
|
"""
|
||||||
|
ScriptTemplateEngine — renders parameterized PowerShell templates.
|
||||||
|
|
||||||
|
Template syntax:
|
||||||
|
{{ param_name }} — simple substitution (string-escaped)
|
||||||
|
{{ param | filter }} — substitution with filter applied
|
||||||
|
{% if param %} ... {% endif %} — conditional block
|
||||||
|
|
||||||
|
Security: all string values are PowerShell-escaped before insertion.
|
||||||
|
Sensitive parameter values are never stored in generation records — use redact_sensitive().
|
||||||
|
"""
|
||||||
|
import re
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
|
class ScriptRenderError(Exception):
|
||||||
|
"""Raised when a required parameter is missing or rendering fails."""
|
||||||
|
|
||||||
|
|
||||||
|
class ScriptTemplateEngine:
|
||||||
|
|
||||||
|
# ── Public API ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def render(self, body: str, parameters: dict[str, Any]) -> str:
|
||||||
|
"""Render a template body with the given parameters. Raises ScriptRenderError on missing required params."""
|
||||||
|
result = self._process_conditionals(body, parameters)
|
||||||
|
result = self._substitute_params(result, parameters)
|
||||||
|
return result
|
||||||
|
|
||||||
|
def redact_sensitive(self, parameters: dict[str, Any], sensitive_keys: set[str]) -> dict[str, Any]:
|
||||||
|
"""Return a copy of parameters with sensitive values replaced by [REDACTED]."""
|
||||||
|
return {
|
||||||
|
k: "[REDACTED]" if k in sensitive_keys else v
|
||||||
|
for k, v in parameters.items()
|
||||||
|
}
|
||||||
|
|
||||||
|
# ── Conditional processing ────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _process_conditionals(self, body: str, parameters: dict[str, Any]) -> str:
|
||||||
|
"""Process {% if param %} ... {% endif %} blocks."""
|
||||||
|
pattern = re.compile(
|
||||||
|
r'\{%\s*if\s+(\w+)\s*%\}(.*?)\{%\s*endif\s*%\}',
|
||||||
|
re.DOTALL
|
||||||
|
)
|
||||||
|
|
||||||
|
def replace_block(match: re.Match) -> str:
|
||||||
|
key = match.group(1)
|
||||||
|
content = match.group(2)
|
||||||
|
value = parameters.get(key)
|
||||||
|
# Truthy: non-empty string, non-empty list, True, non-zero number
|
||||||
|
if value is None or value == "" or value == [] or value is False:
|
||||||
|
return ""
|
||||||
|
return content
|
||||||
|
|
||||||
|
return pattern.sub(replace_block, body)
|
||||||
|
|
||||||
|
# ── Parameter substitution ────────────────────────────────────────────
|
||||||
|
|
||||||
|
# Matches {{ param }} or {{ param | filter }}
|
||||||
|
_PLACEHOLDER = re.compile(r'\{\{\s*(\w+)(?:\s*\|\s*(\w+))?\s*\}\}')
|
||||||
|
|
||||||
|
def _substitute_params(self, body: str, parameters: dict[str, Any]) -> str:
|
||||||
|
missing: list[str] = []
|
||||||
|
|
||||||
|
def replace(match: re.Match) -> str:
|
||||||
|
key = match.group(1)
|
||||||
|
filter_name = match.group(2)
|
||||||
|
|
||||||
|
if key not in parameters:
|
||||||
|
missing.append(key)
|
||||||
|
return match.group(0) # leave as-is, report at end
|
||||||
|
|
||||||
|
value = parameters[key]
|
||||||
|
|
||||||
|
if filter_name:
|
||||||
|
return self._apply_filter(filter_name, value)
|
||||||
|
return self._escape_string(value)
|
||||||
|
|
||||||
|
result = self._PLACEHOLDER.sub(replace, body)
|
||||||
|
|
||||||
|
if missing:
|
||||||
|
raise ScriptRenderError(
|
||||||
|
f"Missing required template parameter(s): {', '.join(missing)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
# ── Filters ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _apply_filter(self, filter_name: str, value: Any) -> str:
|
||||||
|
filters = {
|
||||||
|
"as_secure_string": self._filter_as_secure_string,
|
||||||
|
"as_array": self._filter_as_array,
|
||||||
|
"as_bool": self._filter_as_bool,
|
||||||
|
"escape_single": self._filter_escape_single,
|
||||||
|
}
|
||||||
|
if filter_name not in filters:
|
||||||
|
raise ScriptRenderError(f"Unknown template filter: {filter_name}")
|
||||||
|
return filters[filter_name](value)
|
||||||
|
|
||||||
|
def _filter_as_secure_string(self, value: Any) -> str:
|
||||||
|
escaped = str(value).replace("'", "''")
|
||||||
|
return f"(ConvertTo-SecureString '{escaped}' -AsPlainText -Force)"
|
||||||
|
|
||||||
|
def _filter_as_array(self, value: Any) -> str:
|
||||||
|
if not isinstance(value, list):
|
||||||
|
value = [value]
|
||||||
|
items = ",".join(f"'{self._escape_single_quote(str(v))}'" for v in value)
|
||||||
|
return items
|
||||||
|
|
||||||
|
def _filter_as_bool(self, value: Any) -> str:
|
||||||
|
if isinstance(value, bool):
|
||||||
|
return "$true" if value else "$false"
|
||||||
|
return "$true" if str(value).lower() in ("true", "1", "yes") else "$false"
|
||||||
|
|
||||||
|
def _filter_escape_single(self, value: Any) -> str:
|
||||||
|
return self._escape_single_quote(str(value))
|
||||||
|
|
||||||
|
# ── Escaping helpers ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _escape_string(self, value: Any) -> str:
|
||||||
|
"""Escape a value for safe insertion into a PowerShell single-quoted context."""
|
||||||
|
if isinstance(value, bool):
|
||||||
|
return "$true" if value else "$false"
|
||||||
|
if isinstance(value, (int, float)):
|
||||||
|
return str(value)
|
||||||
|
if isinstance(value, list):
|
||||||
|
# Lists rendered without filter — join with space (caller should use | as_array for proper PS arrays)
|
||||||
|
return " ".join(str(v) for v in value)
|
||||||
|
s = str(value)
|
||||||
|
# Escape backtick (PS escape char) first, then dollar (variable interpolation)
|
||||||
|
s = s.replace("`", "``")
|
||||||
|
s = s.replace("$", "`$")
|
||||||
|
# Escape single quotes (doubling for PS single-quoted strings)
|
||||||
|
s = self._escape_single_quote(s)
|
||||||
|
return s
|
||||||
|
|
||||||
|
def _escape_single_quote(self, s: str) -> str:
|
||||||
|
return s.replace("'", "''")
|
||||||
114
backend/tests/test_script_template_engine.py
Normal file
114
backend/tests/test_script_template_engine.py
Normal file
@@ -0,0 +1,114 @@
|
|||||||
|
"""Tests for ScriptTemplateEngine — parameter substitution, sanitization, and filters."""
|
||||||
|
import pytest
|
||||||
|
from app.services.script_template_engine import ScriptTemplateEngine, ScriptRenderError
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def engine():
|
||||||
|
return ScriptTemplateEngine()
|
||||||
|
|
||||||
|
|
||||||
|
# ── Basic substitution ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_simple_substitution(engine):
|
||||||
|
body = "New-ADUser -Name '{{ first_name }} {{ last_name }}'"
|
||||||
|
result = engine.render(body, {"first_name": "John", "last_name": "Smith"})
|
||||||
|
assert result == "New-ADUser -Name 'John Smith'"
|
||||||
|
|
||||||
|
|
||||||
|
def test_missing_required_param_raises(engine):
|
||||||
|
body = "New-ADUser -Name '{{ first_name }}'"
|
||||||
|
with pytest.raises(ScriptRenderError, match="first_name"):
|
||||||
|
engine.render(body, {})
|
||||||
|
|
||||||
|
|
||||||
|
def test_extra_params_ignored(engine):
|
||||||
|
body = "New-ADUser -Name '{{ first_name }}'"
|
||||||
|
result = engine.render(body, {"first_name": "John", "extra": "ignored"})
|
||||||
|
assert result == "New-ADUser -Name 'John'"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Security: single-quote injection ─────────────────────────────────────
|
||||||
|
|
||||||
|
def test_single_quote_in_value_is_escaped(engine):
|
||||||
|
body = "Set-ADUser -Name '{{ name }}'"
|
||||||
|
result = engine.render(body, {"name": "O'Brien"})
|
||||||
|
# Single quotes doubled for PowerShell safety
|
||||||
|
assert "O''Brien" in result
|
||||||
|
|
||||||
|
|
||||||
|
def test_backtick_in_value_is_escaped(engine):
|
||||||
|
body = "Write-Host '{{ msg }}'"
|
||||||
|
result = engine.render(body, {"msg": "hello`world"})
|
||||||
|
assert "`" not in result or "``" in result # backtick is escaped
|
||||||
|
|
||||||
|
|
||||||
|
def test_dollar_sign_in_value_is_escaped(engine):
|
||||||
|
body = "Write-Host '{{ msg }}'"
|
||||||
|
result = engine.render(body, {"msg": "price is $100"})
|
||||||
|
# Dollar sign escaped so it doesn't interpolate as a PowerShell variable
|
||||||
|
assert "`$100" in result or "'price is $100'" in result
|
||||||
|
|
||||||
|
|
||||||
|
# ── Filters ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_as_secure_string_filter(engine):
|
||||||
|
body = "$secPwd = {{ password | as_secure_string }}"
|
||||||
|
result = engine.render(body, {"password": "MyP@ss123"})
|
||||||
|
assert "ConvertTo-SecureString" in result
|
||||||
|
assert "MyP@ss123" in result
|
||||||
|
assert "-AsPlainText -Force" in result
|
||||||
|
|
||||||
|
|
||||||
|
def test_as_array_filter(engine):
|
||||||
|
body = "$groups = @({{ groups | as_array }})"
|
||||||
|
result = engine.render(body, {"groups": ["GroupA", "GroupB"]})
|
||||||
|
assert "'GroupA','GroupB'" in result
|
||||||
|
|
||||||
|
|
||||||
|
def test_as_array_filter_single_item(engine):
|
||||||
|
body = "$groups = @({{ groups | as_array }})"
|
||||||
|
result = engine.render(body, {"groups": ["OnlyGroup"]})
|
||||||
|
assert "'OnlyGroup'" in result
|
||||||
|
|
||||||
|
|
||||||
|
def test_as_bool_filter_true(engine):
|
||||||
|
body = "$force = {{ force_change | as_bool }}"
|
||||||
|
result = engine.render(body, {"force_change": True})
|
||||||
|
assert "$true" in result
|
||||||
|
|
||||||
|
|
||||||
|
def test_as_bool_filter_false(engine):
|
||||||
|
body = "$force = {{ force_change | as_bool }}"
|
||||||
|
result = engine.render(body, {"force_change": False})
|
||||||
|
assert "$false" in result
|
||||||
|
|
||||||
|
|
||||||
|
# ── Conditional blocks ───────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_if_block_included_when_truthy(engine):
|
||||||
|
body = "{% if groups %}\nAdd-Groups\n{% endif %}"
|
||||||
|
result = engine.render(body, {"groups": ["GroupA"]})
|
||||||
|
assert "Add-Groups" in result
|
||||||
|
|
||||||
|
|
||||||
|
def test_if_block_excluded_when_falsy(engine):
|
||||||
|
body = "{% if groups %}\nAdd-Groups\n{% endif %}"
|
||||||
|
result = engine.render(body, {"groups": []})
|
||||||
|
assert "Add-Groups" not in result
|
||||||
|
|
||||||
|
|
||||||
|
def test_if_block_excluded_when_missing(engine):
|
||||||
|
body = "{% if groups %}\nAdd-Groups\n{% endif %}"
|
||||||
|
result = engine.render(body, {})
|
||||||
|
assert "Add-Groups" not in result
|
||||||
|
|
||||||
|
|
||||||
|
# ── Parameter redaction ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_sensitive_params_redacted_in_record(engine):
|
||||||
|
params = {"first_name": "John", "password": "Secret123"}
|
||||||
|
sensitive_keys = {"password"}
|
||||||
|
redacted = engine.redact_sensitive(params, sensitive_keys)
|
||||||
|
assert redacted["first_name"] == "John"
|
||||||
|
assert redacted["password"] == "[REDACTED]"
|
||||||
Reference in New Issue
Block a user