diff --git a/backend/scripts/create_site_admin.py b/backend/scripts/create_site_admin.py new file mode 100644 index 00000000..b7691043 --- /dev/null +++ b/backend/scripts/create_site_admin.py @@ -0,0 +1,274 @@ +#!/usr/bin/env python3 +""" +Create or promote a site-wide super-admin user on any environment. + +Designed for the prod bootstrap case where no admin exists yet and self-serve +signup is gated, so there is no way to obtain admin access through the UI. +Also safe to use as a recovery tool: if an admin email exists already, the +script just promotes them to `is_super_admin=True` instead of duplicating. + +Usage: + + # Bootstrap a fresh super-admin and email a password-reset link: + python -m scripts.create_site_admin --email michael@resolutionflow.com --send-reset + + # Same but emit the reset URL on stdout instead of sending email (useful + # if email infra is not configured yet or if you want to bypass the inbox): + python -m scripts.create_site_admin --email michael@resolutionflow.com --print-reset + + # Promote an existing user (no reset needed if they already have a password): + python -m scripts.create_site_admin --email michael@resolutionflow.com --promote-only + +The script is idempotent. Running it twice on the same email is safe. +""" + +from __future__ import annotations + +import argparse +import asyncio +import random +import string +import sys +import uuid +from datetime import datetime, timezone +from typing import Optional + +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncConnection, create_async_engine + +from app.core.config import settings +from app.core.email import EmailService +from app.core.security import ( + create_password_reset_token, + decode_token, + hash_token, +) + + +def _display_code() -> str: + return "".join(random.choices(string.ascii_uppercase + string.digits, k=8)) + + +async def _find_user(conn: AsyncConnection, email: str): + result = await conn.execute( + text( + "SELECT id, account_id, is_super_admin, password_hash " + "FROM users WHERE email = :email" + ), + {"email": email}, + ) + return result.first() + + +async def _create_user_and_account( + conn: AsyncConnection, + email: str, + name: str, + account_name: str, + now: datetime, +) -> uuid.UUID: + """Create a new Account and a new super-admin User as its owner. + + Mirrors the shape used by seed_test_users.py for the super-admin row, + minus the shared dev password — this bootstrap user gets no password + until the reset flow runs. + """ + account_id = uuid.uuid4() + user_id = uuid.uuid4() + + await conn.execute( + text( + """ + INSERT INTO accounts (id, name, display_code, created_at, updated_at) + VALUES (:id, :name, :code, :now, :now) + """ + ), + {"id": account_id, "name": account_name, "code": _display_code(), "now": now}, + ) + await conn.execute( + text( + """ + INSERT INTO users ( + id, email, password_hash, name, role, is_super_admin, + is_team_admin, is_active, account_id, account_role, + created_at, email_verified_at + ) + VALUES ( + :id, :email, NULL, :name, 'engineer', true, + false, true, :account_id, 'owner', + :now, :now + ) + """ + ), + { + "id": user_id, + "email": email, + "name": name, + "account_id": account_id, + "now": now, + }, + ) + await conn.execute( + text("UPDATE accounts SET owner_id = :uid WHERE id = :aid"), + {"uid": user_id, "aid": account_id}, + ) + return user_id + + +async def _promote_existing(conn: AsyncConnection, user_id: uuid.UUID, now: datetime) -> None: + """Promote an existing user to super-admin and backfill verification.""" + await conn.execute( + text( + """ + UPDATE users + SET is_super_admin = true, + email_verified_at = COALESCE(email_verified_at, :now), + is_active = true + WHERE id = :uid + """ + ), + {"uid": user_id, "now": now}, + ) + + +async def _issue_reset_link( + conn: AsyncConnection, user_id: uuid.UUID, send_email: bool, email: str +) -> Optional[str]: + """Generate a password-reset token, persist its hash, and return the URL. + + Mirrors /auth/password/forgot. We commit the token row directly because + the script owns its own transaction (not the API request lifecycle). + """ + raw_token = create_password_reset_token(str(user_id)) + payload = decode_token(raw_token) + if not payload or not payload.get("jti"): + return None + + await conn.execute( + text( + """ + INSERT INTO password_reset_tokens + (id, token_hash, user_id, expires_at, created_at) + VALUES (:id, :token_hash, :user_id, :expires_at, :created_at) + """ + ), + { + "id": uuid.uuid4(), + "token_hash": hash_token(payload["jti"]), + "user_id": user_id, + "expires_at": datetime.fromtimestamp(payload["exp"], tz=timezone.utc), + "created_at": datetime.now(timezone.utc), + }, + ) + + reset_url = f"{settings.FRONTEND_URL}/reset-password?token={raw_token}" + + if send_email: + # Best-effort. If email infra is misconfigured, the caller still + # has --print-reset as a fallback. + try: + await EmailService.send_password_reset_email( + to_email=email, reset_url=reset_url + ) + except Exception as exc: # noqa: BLE001 + print(f" [WARN] Email send failed: {exc}") + print(f" [WARN] Use the printed URL below as a fallback.") + + return reset_url + + +async def main(args: argparse.Namespace) -> int: + email = args.email.strip().lower() + name = args.name or email.split("@", 1)[0].title() + account_name = args.account_name or "ResolutionFlow Admin" + + admin_url = getattr(settings, "ADMIN_DATABASE_URL", None) or settings.DATABASE_URL + engine = create_async_engine(admin_url, echo=False) + now = datetime.now(timezone.utc) + + try: + async with engine.begin() as conn: + existing = await _find_user(conn, email) + + if existing is None: + if args.promote_only: + print(f"[ERROR] --promote-only set but no user with email {email!r} exists.") + return 1 + user_id = await _create_user_and_account( + conn, email, name, account_name, now + ) + print(f" [OK] Created super-admin user {email} (id={user_id})") + else: + user_id = existing.id + if existing.is_super_admin: + print(f" [SKIP] {email} already exists and is super-admin (id={user_id})") + else: + await _promote_existing(conn, user_id, now) + print(f" [OK] Promoted {email} to super-admin (id={user_id})") + + # Skip reset issuance for --promote-only when the user already + # has a password (they can just log in with their existing creds). + should_issue_reset = not args.promote_only or ( + existing is not None and existing.password_hash is None + ) + + if should_issue_reset: + reset_url = await _issue_reset_link( + conn, user_id, send_email=args.send_reset, email=email + ) + if reset_url is None: + print(" [ERROR] Failed to mint password-reset token.") + return 2 + + print() + if args.send_reset: + print(f" [OK] Password-reset email sent to {email}") + print(f" [INFO] Reset link (also emailed) — copy if email is delayed:") + if args.print_reset or not args.send_reset: + print(f" {reset_url}") + else: + print(f" {reset_url}") + print() + print(" This link expires per PASSWORD_RESET_TOKEN_EXPIRE in settings.") + + finally: + await engine.dispose() + + return 0 + + +def _build_parser() -> argparse.ArgumentParser: + p = argparse.ArgumentParser( + prog="create_site_admin", + description="Create or promote a site-wide super-admin user.", + ) + p.add_argument("--email", required=True, help="Email of the admin (will be normalized to lowercase).") + p.add_argument("--name", help="Display name. Defaults to the local part of the email, title-cased.") + p.add_argument( + "--account-name", + help="Account name to create alongside a brand-new user. Ignored if the user already exists. Defaults to 'ResolutionFlow Admin'.", + ) + mode = p.add_mutually_exclusive_group() + mode.add_argument( + "--send-reset", + action="store_true", + help="Send a password-reset email to the admin. The reset URL is also printed to stdout as a fallback.", + ) + mode.add_argument( + "--print-reset", + action="store_true", + help="Mint a reset token and print the URL to stdout WITHOUT sending email. Use when email infra is not configured.", + ) + mode.add_argument( + "--promote-only", + action="store_true", + help="Only promote an existing user to super-admin. Will NOT create a new user, and will NOT issue a reset link unless the existing user has no password.", + ) + return p + + +if __name__ == "__main__": + print("\n[*] ResolutionFlow — Site Admin Bootstrap") + print("=" * 60) + args = _build_parser().parse_args() + sys.exit(asyncio.run(main(args)))