"""Integration tests for Script Generator API endpoints.""" import json import uuid from datetime import datetime, timezone import pytest import sqlalchemy as sa from app.models.script_template import ScriptGeneration from app.models.user import User # ── Fixtures ────────────────────────────────────────────────────────────── @pytest.fixture async def seed_script_data(test_db, test_user): """Seed script categories and templates into the test database.""" now = datetime.now(timezone.utc) cat_id = uuid.UUID("00000000-0000-0000-0000-000000000001") user_result = await test_db.execute(sa.select(User).where(User.email == test_user["email"])) user = user_result.scalar_one() # Insert category await test_db.execute( sa.text(""" INSERT INTO script_categories (id, name, slug, description, icon, sort_order, is_active, created_at, updated_at) VALUES (:id, :name, :slug, :description, :icon, :sort_order, true, :now, :now) """), { "id": cat_id, "name": "Active Directory", "slug": "active-directory", "description": "User account and group management scripts", "icon": "shield-check", "sort_order": 1, "now": now, }, ) # Minimal template data for testing templates = [ { "id": uuid.UUID("00000000-0000-0000-0001-000000000001"), "slug": "create-ad-user", "name": "Create AD User Account", "description": "Creates a new Active Directory user account.", "script_body": "$SamAccountName = '{{ sam_account_name }}'", "parameters_schema": json.dumps({ "parameters": [ {"key": "sam_account_name", "label": "SAM Account Name", "type": "text", "required": True, "order": 1}, ] }), "complexity": "intermediate", "estimated_runtime": "< 5 seconds", "requires_elevation": True, "tags": json.dumps(["active-directory", "user-management"]), }, { "id": uuid.UUID("00000000-0000-0000-0001-000000000002"), "slug": "disable-ad-user", "name": "Disable AD User Account", "description": "Disables an Active Directory user account.", "script_body": "$SamAccountName = '{{ sam_account_name }}'", "parameters_schema": json.dumps({ "parameters": [ {"key": "sam_account_name", "label": "SAM Account Name", "type": "text", "required": True, "order": 1}, ] }), "complexity": "beginner", "estimated_runtime": "< 5 seconds", "requires_elevation": True, "tags": json.dumps(["active-directory", "offboarding"]), }, { "id": uuid.UUID("00000000-0000-0000-0001-000000000003"), "slug": "reset-ad-password", "name": "Reset AD Password", "description": "Resets an Active Directory user password.", "script_body": "$SamAccountName = '{{ sam_account_name }}'\n$NewPassword = {{ new_password | as_secure_string }}\n$ForceChange = {{ force_change_at_logon | as_bool }}\n$UnlockAccount = {{ unlock_account | as_bool }}", "parameters_schema": json.dumps({ "parameters": [ {"key": "sam_account_name", "label": "SAM Account Name", "type": "text", "required": True, "order": 1}, {"key": "new_password", "label": "New Password", "type": "password", "required": True, "order": 2, "sensitive": True}, {"key": "force_change_at_logon", "label": "Force Change at Next Logon", "type": "boolean", "required": True, "order": 3}, {"key": "unlock_account", "label": "Unlock Account if Locked", "type": "boolean", "required": True, "order": 4}, ] }), "complexity": "beginner", "estimated_runtime": "< 5 seconds", "requires_elevation": True, "tags": json.dumps(["active-directory", "password"]), }, { "id": uuid.UUID("00000000-0000-0000-0001-000000000004"), "slug": "unlock-ad-account", "name": "Unlock AD Account", "description": "Unlocks a locked-out Active Directory user account.", "script_body": "$SamAccountName = '{{ sam_account_name }}'\n$ShowLockoutInfo = {{ show_lockout_info | as_bool }}", "parameters_schema": json.dumps({ "parameters": [ {"key": "sam_account_name", "label": "SAM Account Name", "type": "text", "required": True, "order": 1}, {"key": "show_lockout_info", "label": "Show Lockout Source Info", "type": "boolean", "required": False, "order": 2}, ] }), "complexity": "beginner", "estimated_runtime": "< 5 seconds", "requires_elevation": True, "tags": json.dumps(["active-directory", "lockout"]), }, { "id": uuid.UUID("00000000-0000-0000-0001-000000000005"), "slug": "delete-ad-user", "name": "Delete AD User Account", "description": "Permanently deletes an Active Directory user account.", "script_body": "$SamAccountName = '{{ sam_account_name }}'\n$ConfirmDeletion = {{ confirm_deletion | as_bool }}", "parameters_schema": json.dumps({ "parameters": [ {"key": "sam_account_name", "label": "SAM Account Name", "type": "text", "required": True, "order": 1}, {"key": "confirm_deletion", "label": "Confirm Deletion", "type": "boolean", "required": True, "order": 2}, ] }), "complexity": "advanced", "estimated_runtime": "< 10 seconds", "requires_elevation": True, "tags": json.dumps(["active-directory", "destructive"]), }, { "id": uuid.UUID("00000000-0000-0000-0001-000000000006"), "slug": "bulk-user-import", "name": "Bulk User Import from CSV", "description": "Imports multiple Active Directory user accounts from a CSV file.", "script_body": "$CSVPath = '{{ csv_path }}'\n$OUPath = '{{ ou_path }}'", "parameters_schema": json.dumps({ "parameters": [ {"key": "csv_path", "label": "CSV File Path", "type": "text", "required": True, "order": 1}, {"key": "ou_path", "label": "Target OU", "type": "text", "required": True, "order": 2}, ] }), "complexity": "advanced", "estimated_runtime": "1-2 minutes", "requires_elevation": True, "tags": json.dumps(["active-directory", "bulk"]), }, ] for tmpl in templates: await test_db.execute( sa.text(""" INSERT INTO script_templates ( id, category_id, account_id, name, slug, description, script_body, parameters_schema, default_values, validation_rules, tags, complexity, estimated_runtime, requires_elevation, requires_modules, version, is_verified, is_active, usage_count, created_at, updated_at ) VALUES ( :id, :category_id, :account_id, :name, :slug, :description, :script_body, CAST(:parameters_schema AS jsonb), '{}'::jsonb, '{}'::jsonb, CAST(:tags AS jsonb), :complexity, :estimated_runtime, :requires_elevation, '[]'::jsonb, 1, true, true, 0, :now, :now ) """), {**tmpl, "category_id": cat_id, "account_id": user.account_id, "now": now}, ) await test_db.commit() return cat_id # ── Categories ──────────────────────────────────────────────────────────── @pytest.mark.asyncio async def test_list_categories_requires_auth(client): response = await client.get("/api/v1/scripts/categories") assert response.status_code == 401 @pytest.mark.asyncio async def test_list_categories_returns_seeded_data(client, auth_headers, seed_script_data): response = await client.get("/api/v1/scripts/categories", headers=auth_headers) assert response.status_code == 200 data = response.json() assert isinstance(data, list) assert any(c["slug"] == "active-directory" for c in data) # ── Templates ───────────────────────────────────────────────────────────── @pytest.mark.asyncio async def test_list_templates_requires_auth(client): response = await client.get("/api/v1/scripts/templates") assert response.status_code == 401 @pytest.mark.asyncio async def test_list_templates_returns_seeded_data(client, auth_headers, seed_script_data): response = await client.get("/api/v1/scripts/templates", headers=auth_headers) assert response.status_code == 200 data = response.json() assert len(data) == 6 slugs = [t["slug"] for t in data] assert "create-ad-user" in slugs assert "reset-ad-password" in slugs @pytest.mark.asyncio async def test_list_templates_filter_by_category(client, auth_headers, seed_script_data): response = await client.get( "/api/v1/scripts/templates?category_slug=active-directory", headers=auth_headers, ) assert response.status_code == 200 data = response.json() assert len(data) == 6 @pytest.mark.asyncio async def test_list_templates_search(client, auth_headers, seed_script_data): response = await client.get( "/api/v1/scripts/templates?search=password", headers=auth_headers, ) assert response.status_code == 200 data = response.json() assert any("password" in t["name"].lower() or "password" in t["slug"] for t in data) @pytest.mark.asyncio async def test_get_template_detail(client, auth_headers, seed_script_data): list_resp = await client.get("/api/v1/scripts/templates", headers=auth_headers) templates = list_resp.json() template_id = templates[0]["id"] response = await client.get(f"/api/v1/scripts/templates/{template_id}", headers=auth_headers) assert response.status_code == 200 data = response.json() assert "script_body" in data assert "parameters_schema" in data @pytest.mark.asyncio async def test_get_template_detail_not_found(client, auth_headers): response = await client.get( "/api/v1/scripts/templates/00000000-0000-0000-0000-000000000099", headers=auth_headers, ) assert response.status_code == 404 # ── Generate ────────────────────────────────────────────────────────────── @pytest.mark.asyncio async def test_generate_script_success(client, auth_headers, seed_script_data, test_db, test_user): list_resp = await client.get( "/api/v1/scripts/templates?search=unlock", headers=auth_headers, ) unlock_template = list_resp.json()[0] response = await client.post( "/api/v1/scripts/generate", json={ "template_id": unlock_template["id"], "parameters": {"sam_account_name": "jsmith", "show_lockout_info": False}, }, headers=auth_headers, ) assert response.status_code == 200 data = response.json() assert "script" in data assert "jsmith" in data["script"] assert "id" in data generation_result = await test_db.execute( sa.select(ScriptGeneration).where(ScriptGeneration.id == uuid.UUID(data["id"])) ) generation = generation_result.scalar_one() user_result = await test_db.execute(sa.select(User).where(User.email == test_user["email"])) user = user_result.scalar_one() assert generation.account_id == user.account_id @pytest.mark.asyncio async def test_generate_script_missing_required_param(client, auth_headers, seed_script_data): list_resp = await client.get( "/api/v1/scripts/templates?search=unlock", headers=auth_headers, ) unlock_template = list_resp.json()[0] response = await client.post( "/api/v1/scripts/generate", json={ "template_id": unlock_template["id"], "parameters": {}, }, headers=auth_headers, ) assert response.status_code == 422 @pytest.mark.asyncio async def test_generate_script_password_redacted_in_record(client, auth_headers, seed_script_data): list_resp = await client.get( "/api/v1/scripts/templates?search=reset-ad-password", headers=auth_headers, ) reset_template = list_resp.json()[0] await client.post( "/api/v1/scripts/generate", json={ "template_id": reset_template["id"], "parameters": { "sam_account_name": "jsmith", "new_password": "SuperSecret123!", "force_change_at_logon": True, "unlock_account": True, }, }, headers=auth_headers, ) history_resp = await client.get("/api/v1/scripts/generations", headers=auth_headers) assert history_resp.status_code == 200 generations = history_resp.json() assert len(generations) > 0 latest = generations[0] assert latest["parameters_used"].get("new_password") == "[REDACTED]" # ── Team template CRUD ──────────────────────────────────────────────────── @pytest.mark.asyncio async def test_create_team_template_requires_team_admin(client, auth_headers, seed_script_data): list_resp = await client.get("/api/v1/scripts/categories", headers=auth_headers) cat_id = list_resp.json()[0]["id"] response = await client.post( "/api/v1/scripts/templates", json={ "category_id": cat_id, "name": "My Custom Script", "script_body": "Write-Host 'hello'", "parameters_schema": {}, }, headers=auth_headers, # engineers can create templates ) assert response.status_code == 201