Idempotent CLI script that creates or promotes a site-wide super_admin
on any environment. Solves the prod bootstrap case where no admin
exists yet — dev's seed_test_users.py only runs in dev, self-serve
signup is still gated, and even when enabled, signup creates owner
roles, not super_admins.
The script:
- Reads --email (required), normalizes to lowercase.
- If user does not exist: creates an Account + super_admin User as
the account owner, with email_verified_at stamped at creation and
password_hash=NULL (forces the reset flow on first login).
- If user exists: promotes is_super_admin=true and backfills
email_verified_at if null. Idempotent — re-running is safe.
- Mints a password-reset JWT, stores the token hash in
password_reset_tokens, and either emails the link
(--send-reset) or prints it to stdout (--print-reset). Email
send is best-effort with a fallback URL on stdout so a
misconfigured EmailService never blocks login.
- --promote-only flag: skips creation, only promotes an existing
user. Useful for promoting an already-self-served user without
triggering an unnecessary reset.
Uses ADMIN_DATABASE_URL when set (BYPASSRLS — required because users
is RLS-enabled and the script has no tenant context at bootstrap).
Smoke-tested in dev against all three paths: fresh create, re-run
idempotency on the same email, --promote-only on an existing user
with no password.
Intended invocation on prod, once Stripe/EIN unblocks:
railway run python -m scripts.create_site_admin \
--email michael@resolutionflow.com \
--send-reset
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
275 lines
9.4 KiB
Python
275 lines
9.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Create or promote a site-wide super-admin user on any environment.
|
|
|
|
Designed for the prod bootstrap case where no admin exists yet and self-serve
|
|
signup is gated, so there is no way to obtain admin access through the UI.
|
|
Also safe to use as a recovery tool: if an admin email exists already, the
|
|
script just promotes them to `is_super_admin=True` instead of duplicating.
|
|
|
|
Usage:
|
|
|
|
# Bootstrap a fresh super-admin and email a password-reset link:
|
|
python -m scripts.create_site_admin --email michael@resolutionflow.com --send-reset
|
|
|
|
# Same but emit the reset URL on stdout instead of sending email (useful
|
|
# if email infra is not configured yet or if you want to bypass the inbox):
|
|
python -m scripts.create_site_admin --email michael@resolutionflow.com --print-reset
|
|
|
|
# Promote an existing user (no reset needed if they already have a password):
|
|
python -m scripts.create_site_admin --email michael@resolutionflow.com --promote-only
|
|
|
|
The script is idempotent. Running it twice on the same email is safe.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import random
|
|
import string
|
|
import sys
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
from sqlalchemy import text
|
|
from sqlalchemy.ext.asyncio import AsyncConnection, create_async_engine
|
|
|
|
from app.core.config import settings
|
|
from app.core.email import EmailService
|
|
from app.core.security import (
|
|
create_password_reset_token,
|
|
decode_token,
|
|
hash_token,
|
|
)
|
|
|
|
|
|
def _display_code() -> str:
|
|
return "".join(random.choices(string.ascii_uppercase + string.digits, k=8))
|
|
|
|
|
|
async def _find_user(conn: AsyncConnection, email: str):
|
|
result = await conn.execute(
|
|
text(
|
|
"SELECT id, account_id, is_super_admin, password_hash "
|
|
"FROM users WHERE email = :email"
|
|
),
|
|
{"email": email},
|
|
)
|
|
return result.first()
|
|
|
|
|
|
async def _create_user_and_account(
|
|
conn: AsyncConnection,
|
|
email: str,
|
|
name: str,
|
|
account_name: str,
|
|
now: datetime,
|
|
) -> uuid.UUID:
|
|
"""Create a new Account and a new super-admin User as its owner.
|
|
|
|
Mirrors the shape used by seed_test_users.py for the super-admin row,
|
|
minus the shared dev password — this bootstrap user gets no password
|
|
until the reset flow runs.
|
|
"""
|
|
account_id = uuid.uuid4()
|
|
user_id = uuid.uuid4()
|
|
|
|
await conn.execute(
|
|
text(
|
|
"""
|
|
INSERT INTO accounts (id, name, display_code, created_at, updated_at)
|
|
VALUES (:id, :name, :code, :now, :now)
|
|
"""
|
|
),
|
|
{"id": account_id, "name": account_name, "code": _display_code(), "now": now},
|
|
)
|
|
await conn.execute(
|
|
text(
|
|
"""
|
|
INSERT INTO users (
|
|
id, email, password_hash, name, role, is_super_admin,
|
|
is_team_admin, is_active, account_id, account_role,
|
|
created_at, email_verified_at
|
|
)
|
|
VALUES (
|
|
:id, :email, NULL, :name, 'engineer', true,
|
|
false, true, :account_id, 'owner',
|
|
:now, :now
|
|
)
|
|
"""
|
|
),
|
|
{
|
|
"id": user_id,
|
|
"email": email,
|
|
"name": name,
|
|
"account_id": account_id,
|
|
"now": now,
|
|
},
|
|
)
|
|
await conn.execute(
|
|
text("UPDATE accounts SET owner_id = :uid WHERE id = :aid"),
|
|
{"uid": user_id, "aid": account_id},
|
|
)
|
|
return user_id
|
|
|
|
|
|
async def _promote_existing(conn: AsyncConnection, user_id: uuid.UUID, now: datetime) -> None:
|
|
"""Promote an existing user to super-admin and backfill verification."""
|
|
await conn.execute(
|
|
text(
|
|
"""
|
|
UPDATE users
|
|
SET is_super_admin = true,
|
|
email_verified_at = COALESCE(email_verified_at, :now),
|
|
is_active = true
|
|
WHERE id = :uid
|
|
"""
|
|
),
|
|
{"uid": user_id, "now": now},
|
|
)
|
|
|
|
|
|
async def _issue_reset_link(
|
|
conn: AsyncConnection, user_id: uuid.UUID, send_email: bool, email: str
|
|
) -> Optional[str]:
|
|
"""Generate a password-reset token, persist its hash, and return the URL.
|
|
|
|
Mirrors /auth/password/forgot. We commit the token row directly because
|
|
the script owns its own transaction (not the API request lifecycle).
|
|
"""
|
|
raw_token = create_password_reset_token(str(user_id))
|
|
payload = decode_token(raw_token)
|
|
if not payload or not payload.get("jti"):
|
|
return None
|
|
|
|
await conn.execute(
|
|
text(
|
|
"""
|
|
INSERT INTO password_reset_tokens
|
|
(id, token_hash, user_id, expires_at, created_at)
|
|
VALUES (:id, :token_hash, :user_id, :expires_at, :created_at)
|
|
"""
|
|
),
|
|
{
|
|
"id": uuid.uuid4(),
|
|
"token_hash": hash_token(payload["jti"]),
|
|
"user_id": user_id,
|
|
"expires_at": datetime.fromtimestamp(payload["exp"], tz=timezone.utc),
|
|
"created_at": datetime.now(timezone.utc),
|
|
},
|
|
)
|
|
|
|
reset_url = f"{settings.FRONTEND_URL}/reset-password?token={raw_token}"
|
|
|
|
if send_email:
|
|
# Best-effort. If email infra is misconfigured, the caller still
|
|
# has --print-reset as a fallback.
|
|
try:
|
|
await EmailService.send_password_reset_email(
|
|
to_email=email, reset_url=reset_url
|
|
)
|
|
except Exception as exc: # noqa: BLE001
|
|
print(f" [WARN] Email send failed: {exc}")
|
|
print(f" [WARN] Use the printed URL below as a fallback.")
|
|
|
|
return reset_url
|
|
|
|
|
|
async def main(args: argparse.Namespace) -> int:
|
|
email = args.email.strip().lower()
|
|
name = args.name or email.split("@", 1)[0].title()
|
|
account_name = args.account_name or "ResolutionFlow Admin"
|
|
|
|
admin_url = getattr(settings, "ADMIN_DATABASE_URL", None) or settings.DATABASE_URL
|
|
engine = create_async_engine(admin_url, echo=False)
|
|
now = datetime.now(timezone.utc)
|
|
|
|
try:
|
|
async with engine.begin() as conn:
|
|
existing = await _find_user(conn, email)
|
|
|
|
if existing is None:
|
|
if args.promote_only:
|
|
print(f"[ERROR] --promote-only set but no user with email {email!r} exists.")
|
|
return 1
|
|
user_id = await _create_user_and_account(
|
|
conn, email, name, account_name, now
|
|
)
|
|
print(f" [OK] Created super-admin user {email} (id={user_id})")
|
|
else:
|
|
user_id = existing.id
|
|
if existing.is_super_admin:
|
|
print(f" [SKIP] {email} already exists and is super-admin (id={user_id})")
|
|
else:
|
|
await _promote_existing(conn, user_id, now)
|
|
print(f" [OK] Promoted {email} to super-admin (id={user_id})")
|
|
|
|
# Skip reset issuance for --promote-only when the user already
|
|
# has a password (they can just log in with their existing creds).
|
|
should_issue_reset = not args.promote_only or (
|
|
existing is not None and existing.password_hash is None
|
|
)
|
|
|
|
if should_issue_reset:
|
|
reset_url = await _issue_reset_link(
|
|
conn, user_id, send_email=args.send_reset, email=email
|
|
)
|
|
if reset_url is None:
|
|
print(" [ERROR] Failed to mint password-reset token.")
|
|
return 2
|
|
|
|
print()
|
|
if args.send_reset:
|
|
print(f" [OK] Password-reset email sent to {email}")
|
|
print(f" [INFO] Reset link (also emailed) — copy if email is delayed:")
|
|
if args.print_reset or not args.send_reset:
|
|
print(f" {reset_url}")
|
|
else:
|
|
print(f" {reset_url}")
|
|
print()
|
|
print(" This link expires per PASSWORD_RESET_TOKEN_EXPIRE in settings.")
|
|
|
|
finally:
|
|
await engine.dispose()
|
|
|
|
return 0
|
|
|
|
|
|
def _build_parser() -> argparse.ArgumentParser:
|
|
p = argparse.ArgumentParser(
|
|
prog="create_site_admin",
|
|
description="Create or promote a site-wide super-admin user.",
|
|
)
|
|
p.add_argument("--email", required=True, help="Email of the admin (will be normalized to lowercase).")
|
|
p.add_argument("--name", help="Display name. Defaults to the local part of the email, title-cased.")
|
|
p.add_argument(
|
|
"--account-name",
|
|
help="Account name to create alongside a brand-new user. Ignored if the user already exists. Defaults to 'ResolutionFlow Admin'.",
|
|
)
|
|
mode = p.add_mutually_exclusive_group()
|
|
mode.add_argument(
|
|
"--send-reset",
|
|
action="store_true",
|
|
help="Send a password-reset email to the admin. The reset URL is also printed to stdout as a fallback.",
|
|
)
|
|
mode.add_argument(
|
|
"--print-reset",
|
|
action="store_true",
|
|
help="Mint a reset token and print the URL to stdout WITHOUT sending email. Use when email infra is not configured.",
|
|
)
|
|
mode.add_argument(
|
|
"--promote-only",
|
|
action="store_true",
|
|
help="Only promote an existing user to super-admin. Will NOT create a new user, and will NOT issue a reset link unless the existing user has no password.",
|
|
)
|
|
return p
|
|
|
|
|
|
if __name__ == "__main__":
|
|
print("\n[*] ResolutionFlow — Site Admin Bootstrap")
|
|
print("=" * 60)
|
|
args = _build_parser().parse_args()
|
|
sys.exit(asyncio.run(main(args)))
|