348 lines
14 KiB
Python
348 lines
14 KiB
Python
"""Integration tests for Script Generator API endpoints."""
|
|
import json
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
|
|
import pytest
|
|
import sqlalchemy as sa
|
|
|
|
from app.models.script_template import ScriptGeneration
|
|
from app.models.user import User
|
|
|
|
# ── Fixtures ──────────────────────────────────────────────────────────────
|
|
|
|
@pytest.fixture
|
|
async def seed_script_data(test_db, test_user):
|
|
"""Seed script categories and templates into the test database."""
|
|
now = datetime.now(timezone.utc)
|
|
cat_id = uuid.UUID("00000000-0000-0000-0000-000000000001")
|
|
user_result = await test_db.execute(sa.select(User).where(User.email == test_user["email"]))
|
|
user = user_result.scalar_one()
|
|
|
|
# Insert category
|
|
await test_db.execute(
|
|
sa.text("""
|
|
INSERT INTO script_categories (id, name, slug, description, icon, sort_order, is_active, created_at, updated_at)
|
|
VALUES (:id, :name, :slug, :description, :icon, :sort_order, true, :now, :now)
|
|
"""),
|
|
{
|
|
"id": cat_id,
|
|
"name": "Active Directory",
|
|
"slug": "active-directory",
|
|
"description": "User account and group management scripts",
|
|
"icon": "shield-check",
|
|
"sort_order": 1,
|
|
"now": now,
|
|
},
|
|
)
|
|
|
|
# Minimal template data for testing
|
|
templates = [
|
|
{
|
|
"id": uuid.UUID("00000000-0000-0000-0001-000000000001"),
|
|
"slug": "create-ad-user",
|
|
"name": "Create AD User Account",
|
|
"description": "Creates a new Active Directory user account.",
|
|
"script_body": "$SamAccountName = '{{ sam_account_name }}'",
|
|
"parameters_schema": json.dumps({
|
|
"parameters": [
|
|
{"key": "sam_account_name", "label": "SAM Account Name", "type": "text", "required": True, "order": 1},
|
|
]
|
|
}),
|
|
"complexity": "intermediate",
|
|
"estimated_runtime": "< 5 seconds",
|
|
"requires_elevation": True,
|
|
"tags": json.dumps(["active-directory", "user-management"]),
|
|
},
|
|
{
|
|
"id": uuid.UUID("00000000-0000-0000-0001-000000000002"),
|
|
"slug": "disable-ad-user",
|
|
"name": "Disable AD User Account",
|
|
"description": "Disables an Active Directory user account.",
|
|
"script_body": "$SamAccountName = '{{ sam_account_name }}'",
|
|
"parameters_schema": json.dumps({
|
|
"parameters": [
|
|
{"key": "sam_account_name", "label": "SAM Account Name", "type": "text", "required": True, "order": 1},
|
|
]
|
|
}),
|
|
"complexity": "beginner",
|
|
"estimated_runtime": "< 5 seconds",
|
|
"requires_elevation": True,
|
|
"tags": json.dumps(["active-directory", "offboarding"]),
|
|
},
|
|
{
|
|
"id": uuid.UUID("00000000-0000-0000-0001-000000000003"),
|
|
"slug": "reset-ad-password",
|
|
"name": "Reset AD Password",
|
|
"description": "Resets an Active Directory user password.",
|
|
"script_body": "$SamAccountName = '{{ sam_account_name }}'\n$NewPassword = {{ new_password | as_secure_string }}\n$ForceChange = {{ force_change_at_logon | as_bool }}\n$UnlockAccount = {{ unlock_account | as_bool }}",
|
|
"parameters_schema": json.dumps({
|
|
"parameters": [
|
|
{"key": "sam_account_name", "label": "SAM Account Name", "type": "text", "required": True, "order": 1},
|
|
{"key": "new_password", "label": "New Password", "type": "password", "required": True, "order": 2, "sensitive": True},
|
|
{"key": "force_change_at_logon", "label": "Force Change at Next Logon", "type": "boolean", "required": True, "order": 3},
|
|
{"key": "unlock_account", "label": "Unlock Account if Locked", "type": "boolean", "required": True, "order": 4},
|
|
]
|
|
}),
|
|
"complexity": "beginner",
|
|
"estimated_runtime": "< 5 seconds",
|
|
"requires_elevation": True,
|
|
"tags": json.dumps(["active-directory", "password"]),
|
|
},
|
|
{
|
|
"id": uuid.UUID("00000000-0000-0000-0001-000000000004"),
|
|
"slug": "unlock-ad-account",
|
|
"name": "Unlock AD Account",
|
|
"description": "Unlocks a locked-out Active Directory user account.",
|
|
"script_body": "$SamAccountName = '{{ sam_account_name }}'\n$ShowLockoutInfo = {{ show_lockout_info | as_bool }}",
|
|
"parameters_schema": json.dumps({
|
|
"parameters": [
|
|
{"key": "sam_account_name", "label": "SAM Account Name", "type": "text", "required": True, "order": 1},
|
|
{"key": "show_lockout_info", "label": "Show Lockout Source Info", "type": "boolean", "required": False, "order": 2},
|
|
]
|
|
}),
|
|
"complexity": "beginner",
|
|
"estimated_runtime": "< 5 seconds",
|
|
"requires_elevation": True,
|
|
"tags": json.dumps(["active-directory", "lockout"]),
|
|
},
|
|
{
|
|
"id": uuid.UUID("00000000-0000-0000-0001-000000000005"),
|
|
"slug": "delete-ad-user",
|
|
"name": "Delete AD User Account",
|
|
"description": "Permanently deletes an Active Directory user account.",
|
|
"script_body": "$SamAccountName = '{{ sam_account_name }}'\n$ConfirmDeletion = {{ confirm_deletion | as_bool }}",
|
|
"parameters_schema": json.dumps({
|
|
"parameters": [
|
|
{"key": "sam_account_name", "label": "SAM Account Name", "type": "text", "required": True, "order": 1},
|
|
{"key": "confirm_deletion", "label": "Confirm Deletion", "type": "boolean", "required": True, "order": 2},
|
|
]
|
|
}),
|
|
"complexity": "advanced",
|
|
"estimated_runtime": "< 10 seconds",
|
|
"requires_elevation": True,
|
|
"tags": json.dumps(["active-directory", "destructive"]),
|
|
},
|
|
{
|
|
"id": uuid.UUID("00000000-0000-0000-0001-000000000006"),
|
|
"slug": "bulk-user-import",
|
|
"name": "Bulk User Import from CSV",
|
|
"description": "Imports multiple Active Directory user accounts from a CSV file.",
|
|
"script_body": "$CSVPath = '{{ csv_path }}'\n$OUPath = '{{ ou_path }}'",
|
|
"parameters_schema": json.dumps({
|
|
"parameters": [
|
|
{"key": "csv_path", "label": "CSV File Path", "type": "text", "required": True, "order": 1},
|
|
{"key": "ou_path", "label": "Target OU", "type": "text", "required": True, "order": 2},
|
|
]
|
|
}),
|
|
"complexity": "advanced",
|
|
"estimated_runtime": "1-2 minutes",
|
|
"requires_elevation": True,
|
|
"tags": json.dumps(["active-directory", "bulk"]),
|
|
},
|
|
]
|
|
|
|
for tmpl in templates:
|
|
await test_db.execute(
|
|
sa.text("""
|
|
INSERT INTO script_templates (
|
|
id, category_id, account_id, name, slug, description,
|
|
script_body, parameters_schema, default_values, validation_rules,
|
|
tags, complexity, estimated_runtime, requires_elevation,
|
|
requires_modules, version, is_verified, is_active, usage_count,
|
|
created_at, updated_at
|
|
) VALUES (
|
|
:id, :category_id, :account_id, :name, :slug, :description,
|
|
:script_body, CAST(:parameters_schema AS jsonb), '{}'::jsonb, '{}'::jsonb,
|
|
CAST(:tags AS jsonb), :complexity, :estimated_runtime, :requires_elevation,
|
|
'[]'::jsonb, 1, true, true, 0,
|
|
:now, :now
|
|
)
|
|
"""),
|
|
{**tmpl, "category_id": cat_id, "account_id": user.account_id, "now": now},
|
|
)
|
|
|
|
await test_db.commit()
|
|
return cat_id
|
|
|
|
|
|
# ── Categories ────────────────────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_list_categories_requires_auth(client):
|
|
response = await client.get("/api/v1/scripts/categories")
|
|
assert response.status_code == 401
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_list_categories_returns_seeded_data(client, auth_headers, seed_script_data):
|
|
response = await client.get("/api/v1/scripts/categories", headers=auth_headers)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert isinstance(data, list)
|
|
assert any(c["slug"] == "active-directory" for c in data)
|
|
|
|
|
|
# ── Templates ─────────────────────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_list_templates_requires_auth(client):
|
|
response = await client.get("/api/v1/scripts/templates")
|
|
assert response.status_code == 401
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_list_templates_returns_seeded_data(client, auth_headers, seed_script_data):
|
|
response = await client.get("/api/v1/scripts/templates", headers=auth_headers)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert len(data) == 6
|
|
slugs = [t["slug"] for t in data]
|
|
assert "create-ad-user" in slugs
|
|
assert "reset-ad-password" in slugs
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_list_templates_filter_by_category(client, auth_headers, seed_script_data):
|
|
response = await client.get(
|
|
"/api/v1/scripts/templates?category_slug=active-directory",
|
|
headers=auth_headers,
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert len(data) == 6
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_list_templates_search(client, auth_headers, seed_script_data):
|
|
response = await client.get(
|
|
"/api/v1/scripts/templates?search=password",
|
|
headers=auth_headers,
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert any("password" in t["name"].lower() or "password" in t["slug"] for t in data)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_template_detail(client, auth_headers, seed_script_data):
|
|
list_resp = await client.get("/api/v1/scripts/templates", headers=auth_headers)
|
|
templates = list_resp.json()
|
|
template_id = templates[0]["id"]
|
|
|
|
response = await client.get(f"/api/v1/scripts/templates/{template_id}", headers=auth_headers)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "script_body" in data
|
|
assert "parameters_schema" in data
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_template_detail_not_found(client, auth_headers):
|
|
response = await client.get(
|
|
"/api/v1/scripts/templates/00000000-0000-0000-0000-000000000099",
|
|
headers=auth_headers,
|
|
)
|
|
assert response.status_code == 404
|
|
|
|
|
|
# ── Generate ──────────────────────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_generate_script_success(client, auth_headers, seed_script_data, test_db, test_user):
|
|
list_resp = await client.get(
|
|
"/api/v1/scripts/templates?search=unlock",
|
|
headers=auth_headers,
|
|
)
|
|
unlock_template = list_resp.json()[0]
|
|
|
|
response = await client.post(
|
|
"/api/v1/scripts/generate",
|
|
json={
|
|
"template_id": unlock_template["id"],
|
|
"parameters": {"sam_account_name": "jsmith", "show_lockout_info": False},
|
|
},
|
|
headers=auth_headers,
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "script" in data
|
|
assert "jsmith" in data["script"]
|
|
assert "id" in data
|
|
generation_result = await test_db.execute(
|
|
sa.select(ScriptGeneration).where(ScriptGeneration.id == uuid.UUID(data["id"]))
|
|
)
|
|
generation = generation_result.scalar_one()
|
|
user_result = await test_db.execute(sa.select(User).where(User.email == test_user["email"]))
|
|
user = user_result.scalar_one()
|
|
assert generation.account_id == user.account_id
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_generate_script_missing_required_param(client, auth_headers, seed_script_data):
|
|
list_resp = await client.get(
|
|
"/api/v1/scripts/templates?search=unlock",
|
|
headers=auth_headers,
|
|
)
|
|
unlock_template = list_resp.json()[0]
|
|
|
|
response = await client.post(
|
|
"/api/v1/scripts/generate",
|
|
json={
|
|
"template_id": unlock_template["id"],
|
|
"parameters": {},
|
|
},
|
|
headers=auth_headers,
|
|
)
|
|
assert response.status_code == 422
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_generate_script_password_redacted_in_record(client, auth_headers, seed_script_data):
|
|
list_resp = await client.get(
|
|
"/api/v1/scripts/templates?search=reset-ad-password",
|
|
headers=auth_headers,
|
|
)
|
|
reset_template = list_resp.json()[0]
|
|
|
|
await client.post(
|
|
"/api/v1/scripts/generate",
|
|
json={
|
|
"template_id": reset_template["id"],
|
|
"parameters": {
|
|
"sam_account_name": "jsmith",
|
|
"new_password": "SuperSecret123!",
|
|
"force_change_at_logon": True,
|
|
"unlock_account": True,
|
|
},
|
|
},
|
|
headers=auth_headers,
|
|
)
|
|
|
|
history_resp = await client.get("/api/v1/scripts/generations", headers=auth_headers)
|
|
assert history_resp.status_code == 200
|
|
generations = history_resp.json()
|
|
assert len(generations) > 0
|
|
latest = generations[0]
|
|
assert latest["parameters_used"].get("new_password") == "[REDACTED]"
|
|
|
|
|
|
# ── Team template CRUD ────────────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_team_template_requires_team_admin(client, auth_headers, seed_script_data):
|
|
list_resp = await client.get("/api/v1/scripts/categories", headers=auth_headers)
|
|
cat_id = list_resp.json()[0]["id"]
|
|
|
|
response = await client.post(
|
|
"/api/v1/scripts/templates",
|
|
json={
|
|
"category_id": cat_id,
|
|
"name": "My Custom Script",
|
|
"script_body": "Write-Host 'hello'",
|
|
"parameters_schema": {},
|
|
},
|
|
headers=auth_headers, # engineers can create templates
|
|
)
|
|
assert response.status_code == 201
|