Files
resolutionflow/backend/tests/test_scripts.py

348 lines
14 KiB
Python

"""Integration tests for Script Generator API endpoints."""
import json
import uuid
from datetime import datetime, timezone
import pytest
import sqlalchemy as sa
from app.models.script_template import ScriptGeneration
from app.models.user import User
# ── Fixtures ──────────────────────────────────────────────────────────────
@pytest.fixture
async def seed_script_data(test_db, test_user):
    """Seed one script category and six script templates for the test user.

    The ``User`` row whose email equals ``test_user["email"]`` is looked up so
    each template can be inserted with that user's ``account_id``. The
    "Active Directory" category and all templates are written with raw SQL,
    the session is committed, and the category's UUID is returned.
    """
    seeded_at = datetime.now(timezone.utc)
    category_id = uuid.UUID("00000000-0000-0000-0000-000000000001")

    owner_lookup = await test_db.execute(
        sa.select(User).where(User.email == test_user["email"])
    )
    owner = owner_lookup.scalar_one()

    # Category row that every seeded template hangs off.
    insert_category = sa.text("""
        INSERT INTO script_categories (id, name, slug, description, icon, sort_order, is_active, created_at, updated_at)
        VALUES (:id, :name, :slug, :description, :icon, :sort_order, true, :now, :now)
    """)
    category_row = {
        "id": category_id,
        "name": "Active Directory",
        "slug": "active-directory",
        "description": "User account and group management scripts",
        "icon": "shield-check",
        "sort_order": 1,
        "now": seeded_at,
    }
    await test_db.execute(insert_category, category_row)

    def param(key, label, kind, order, required=True, **extra):
        """Build one entry of a template's ``parameters`` list."""
        return {
            "key": key,
            "label": label,
            "type": kind,
            "required": required,
            "order": order,
            **extra,
        }

    def template(template_id, slug, name, description, script_body,
                 parameters, complexity, estimated_runtime, extra_tag):
        """Build the bind-parameter dict for one ``script_templates`` row.

        JSON columns are serialised here because the INSERT casts the bound
        strings to ``jsonb``.
        """
        return {
            "id": uuid.UUID(template_id),
            "slug": slug,
            "name": name,
            "description": description,
            "script_body": script_body,
            "parameters_schema": json.dumps({"parameters": parameters}),
            "complexity": complexity,
            "estimated_runtime": estimated_runtime,
            "requires_elevation": True,
            "tags": json.dumps(["active-directory", extra_tag]),
        }

    # Pieces shared by most of the templates below.
    sam_account = param("sam_account_name", "SAM Account Name", "text", 1)
    set_sam_line = "$SamAccountName = '{{ sam_account_name }}'"

    # Minimal template data for testing
    template_rows = [
        template(
            "00000000-0000-0000-0001-000000000001",
            "create-ad-user",
            "Create AD User Account",
            "Creates a new Active Directory user account.",
            set_sam_line,
            [sam_account],
            "intermediate",
            "< 5 seconds",
            "user-management",
        ),
        template(
            "00000000-0000-0000-0001-000000000002",
            "disable-ad-user",
            "Disable AD User Account",
            "Disables an Active Directory user account.",
            set_sam_line,
            [sam_account],
            "beginner",
            "< 5 seconds",
            "offboarding",
        ),
        template(
            "00000000-0000-0000-0001-000000000003",
            "reset-ad-password",
            "Reset AD Password",
            "Resets an Active Directory user password.",
            set_sam_line
            + "\n$NewPassword = {{ new_password | as_secure_string }}"
            + "\n$ForceChange = {{ force_change_at_logon | as_bool }}"
            + "\n$UnlockAccount = {{ unlock_account | as_bool }}",
            [
                sam_account,
                param("new_password", "New Password", "password", 2, sensitive=True),
                param("force_change_at_logon", "Force Change at Next Logon", "boolean", 3),
                param("unlock_account", "Unlock Account if Locked", "boolean", 4),
            ],
            "beginner",
            "< 5 seconds",
            "password",
        ),
        template(
            "00000000-0000-0000-0001-000000000004",
            "unlock-ad-account",
            "Unlock AD Account",
            "Unlocks a locked-out Active Directory user account.",
            set_sam_line + "\n$ShowLockoutInfo = {{ show_lockout_info | as_bool }}",
            [
                sam_account,
                param("show_lockout_info", "Show Lockout Source Info", "boolean", 2, required=False),
            ],
            "beginner",
            "< 5 seconds",
            "lockout",
        ),
        template(
            "00000000-0000-0000-0001-000000000005",
            "delete-ad-user",
            "Delete AD User Account",
            "Permanently deletes an Active Directory user account.",
            set_sam_line + "\n$ConfirmDeletion = {{ confirm_deletion | as_bool }}",
            [
                sam_account,
                param("confirm_deletion", "Confirm Deletion", "boolean", 2),
            ],
            "advanced",
            "< 10 seconds",
            "destructive",
        ),
        template(
            "00000000-0000-0000-0001-000000000006",
            "bulk-user-import",
            "Bulk User Import from CSV",
            "Imports multiple Active Directory user accounts from a CSV file.",
            "$CSVPath = '{{ csv_path }}'\n$OUPath = '{{ ou_path }}'",
            [
                param("csv_path", "CSV File Path", "text", 1),
                param("ou_path", "Target OU", "text", 2),
            ],
            "advanced",
            "1-2 minutes",
            "bulk",
        ),
    ]

    insert_template = sa.text("""
        INSERT INTO script_templates (
            id, category_id, account_id, name, slug, description,
            script_body, parameters_schema, default_values, validation_rules,
            tags, complexity, estimated_runtime, requires_elevation,
            requires_modules, version, is_verified, is_active, usage_count,
            created_at, updated_at
        ) VALUES (
            :id, :category_id, :account_id, :name, :slug, :description,
            :script_body, CAST(:parameters_schema AS jsonb), '{}'::jsonb, '{}'::jsonb,
            CAST(:tags AS jsonb), :complexity, :estimated_runtime, :requires_elevation,
            '[]'::jsonb, 1, true, true, 0,
            :now, :now
        )
    """)
    for row in template_rows:
        # Per-row values plus the columns shared by every seeded template.
        bind_params = dict(
            row,
            category_id=category_id,
            account_id=owner.account_id,
            now=seeded_at,
        )
        await test_db.execute(insert_template, bind_params)

    await test_db.commit()
    return category_id
# ── Categories ────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_list_categories_requires_auth(client):
    """Listing categories without auth headers is rejected with 401."""
    unauthenticated = await client.get("/api/v1/scripts/categories")
    assert unauthenticated.status_code == 401
@pytest.mark.asyncio
async def test_list_categories_returns_seeded_data(client, auth_headers, seed_script_data):
    """The authenticated category listing is a list containing the seeded slug."""
    resp = await client.get("/api/v1/scripts/categories", headers=auth_headers)
    assert resp.status_code == 200

    categories = resp.json()
    assert isinstance(categories, list)
    # Generator membership stops at the first match, like the any() it replaces.
    assert "active-directory" in (category["slug"] for category in categories)
# ── Templates ─────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_list_templates_requires_auth(client):
    """Listing templates without auth headers is rejected with 401."""
    unauthenticated = await client.get("/api/v1/scripts/templates")
    assert unauthenticated.status_code == 401
@pytest.mark.asyncio
async def test_list_templates_returns_seeded_data(client, auth_headers, seed_script_data):
    """The authenticated template listing returns exactly the six seeded rows."""
    resp = await client.get("/api/v1/scripts/templates", headers=auth_headers)
    assert resp.status_code == 200

    listed = resp.json()
    assert len(listed) == 6

    # Spot-check two of the seeded slugs rather than all six.
    listed_slugs = {entry["slug"] for entry in listed}
    assert {"create-ad-user", "reset-ad-password"} <= listed_slugs
@pytest.mark.asyncio
async def test_list_templates_filter_by_category(client, auth_headers, seed_script_data):
    """Filtering by the seeded category slug returns all six seeded templates."""
    filtered_url = "/api/v1/scripts/templates?category_slug=active-directory"
    resp = await client.get(filtered_url, headers=auth_headers)

    assert resp.status_code == 200
    matching = resp.json()
    assert len(matching) == 6
@pytest.mark.asyncio
async def test_list_templates_search(client, auth_headers, seed_script_data):
    """Searching for "password" yields at least one template naming it."""
    resp = await client.get("/api/v1/scripts/templates?search=password", headers=auth_headers)
    assert resp.status_code == 200

    def mentions_password(entry):
        # Name is compared case-insensitively; slug is compared as returned.
        return "password" in entry["name"].lower() or "password" in entry["slug"]

    hits = resp.json()
    assert any(mentions_password(entry) for entry in hits)
@pytest.mark.asyncio
async def test_get_template_detail(client, auth_headers, seed_script_data):
    """Fetching a listed template by id returns its script body and schema.

    The template id comes from the first entry of the authenticated listing.
    The listing is validated first so that a failing or empty listing is
    reported as such, not as an ``IndexError``/``KeyError`` while picking the id.
    """
    list_resp = await client.get("/api/v1/scripts/templates", headers=auth_headers)
    assert list_resp.status_code == 200
    templates = list_resp.json()
    assert templates, "template listing is empty; nothing to fetch detail for"
    template_id = templates[0]["id"]

    response = await client.get(f"/api/v1/scripts/templates/{template_id}", headers=auth_headers)
    assert response.status_code == 200
    data = response.json()
    assert "script_body" in data
    assert "parameters_schema" in data
@pytest.mark.asyncio
async def test_get_template_detail_not_found(client, auth_headers):
    """Requesting a template id that was never seeded returns 404."""
    missing_url = "/api/v1/scripts/templates/00000000-0000-0000-0000-000000000099"
    resp = await client.get(missing_url, headers=auth_headers)
    assert resp.status_code == 404
# ── Generate ──────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_generate_script_success(client, auth_headers, seed_script_data, test_db, test_user):
    """Generating from the unlock template renders the script and records it.

    Checks that the response carries the rendered script (containing the
    supplied account name) and an ``id``, and that the ``ScriptGeneration``
    row with that id has the same ``account_id`` as the test user.
    """
    list_resp = await client.get(
        "/api/v1/scripts/templates?search=unlock",
        headers=auth_headers,
    )
    assert list_resp.status_code == 200
    # Select by slug, not by position: the first search hit depends on result
    # ordering and on which fields the search happens to match.
    unlock_template = next(
        (t for t in list_resp.json() if t["slug"] == "unlock-ad-account"),
        None,
    )
    assert unlock_template is not None, "search=unlock did not return unlock-ad-account"

    response = await client.post(
        "/api/v1/scripts/generate",
        json={
            "template_id": unlock_template["id"],
            "parameters": {"sam_account_name": "jsmith", "show_lockout_info": False},
        },
        headers=auth_headers,
    )
    assert response.status_code == 200
    data = response.json()
    assert "script" in data
    assert "jsmith" in data["script"]
    assert "id" in data

    # The persisted generation must belong to the requesting user's account.
    generation_result = await test_db.execute(
        sa.select(ScriptGeneration).where(ScriptGeneration.id == uuid.UUID(data["id"]))
    )
    generation = generation_result.scalar_one()
    user_result = await test_db.execute(sa.select(User).where(User.email == test_user["email"]))
    user = user_result.scalar_one()
    assert generation.account_id == user.account_id
@pytest.mark.asyncio
async def test_generate_script_missing_required_param(client, auth_headers, seed_script_data):
    """Generating with no parameters for the unlock template returns 422.

    The seeded unlock template marks ``sam_account_name`` as required, so an
    empty ``parameters`` object is expected to be rejected.
    """
    list_resp = await client.get(
        "/api/v1/scripts/templates?search=unlock",
        headers=auth_headers,
    )
    assert list_resp.status_code == 200
    # Select by slug so the 422 is attributable to this template's required
    # parameter and not to whichever template the search happens to list first.
    unlock_template = next(
        (t for t in list_resp.json() if t["slug"] == "unlock-ad-account"),
        None,
    )
    assert unlock_template is not None, "search=unlock did not return unlock-ad-account"

    response = await client.post(
        "/api/v1/scripts/generate",
        json={
            "template_id": unlock_template["id"],
            "parameters": {},
        },
        headers=auth_headers,
    )
    assert response.status_code == 422
@pytest.mark.asyncio
async def test_generate_script_password_redacted_in_record(client, auth_headers, seed_script_data):
    """The generation history stores ``[REDACTED]`` in place of the password.

    Generates a script from the reset-password template (whose
    ``new_password`` parameter is seeded as sensitive), then reads the
    generation history and checks the first entry's ``parameters_used``.
    """
    list_resp = await client.get(
        "/api/v1/scripts/templates?search=reset-ad-password",
        headers=auth_headers,
    )
    assert list_resp.status_code == 200
    # Select by slug rather than trusting the first search hit.
    reset_template = next(
        (t for t in list_resp.json() if t["slug"] == "reset-ad-password"),
        None,
    )
    assert reset_template is not None, "search did not return reset-ad-password"

    generate_resp = await client.post(
        "/api/v1/scripts/generate",
        json={
            "template_id": reset_template["id"],
            "parameters": {
                "sam_account_name": "jsmith",
                "new_password": "SuperSecret123!",
                "force_change_at_logon": True,
                "unlock_account": True,
            },
        },
        headers=auth_headers,
    )
    # Fail here if generation itself broke, instead of later on an empty or
    # unrelated history entry.
    assert generate_resp.status_code == 200

    history_resp = await client.get("/api/v1/scripts/generations", headers=auth_headers)
    assert history_resp.status_code == 200
    generations = history_resp.json()
    assert len(generations) > 0
    # NOTE(review): assumes the history lists the newest generation first — confirm.
    latest = generations[0]
    assert latest["parameters_used"].get("new_password") == "[REDACTED]"
# ── Team template CRUD ────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_create_team_template_requires_team_admin(client, auth_headers, seed_script_data):
    """Creating a template with the standard auth headers succeeds with 201.

    NOTE(review): the test name says "requires team admin", yet the assertion
    and the inline remark below expect creation to succeed for
    ``auth_headers`` — confirm which behaviour is intended.
    """
    categories_resp = await client.get("/api/v1/scripts/categories", headers=auth_headers)
    first_category_id = categories_resp.json()[0]["id"]

    new_template = {
        "category_id": first_category_id,
        "name": "My Custom Script",
        "script_body": "Write-Host 'hello'",
        "parameters_schema": {},
    }
    created = await client.post(
        "/api/v1/scripts/templates",
        json=new_template,
        headers=auth_headers,  # engineers can create templates
    )
    assert created.status_code == 201