Files
resolutionflow/backend/scripts/sync_stripe_plan_ids.py
Michael Chihlas 3f04911070
All checks were successful
CI / frontend (push) Successful in 6m40s
Mirror to GitHub / mirror (push) Successful in 7s
CI / e2e (push) Successful in 10m7s
CI / backend (push) Successful in 10m34s
feat(billing): plan taxonomy reconciliation + Stripe sync + internal-tester allowlist (#164)
Co-authored-by: Michael Chihlas <michael@resolutionflow.com>
Co-committed-by: Michael Chihlas <michael@resolutionflow.com>
2026-05-11 05:07:07 +00:00

200 lines
6.8 KiB
Python

#!/usr/bin/env python3
"""Sync plan_billing rows from Stripe products and prices.
Reads the active Stripe environment (test or live, determined by
STRIPE_SECRET_KEY in env), looks up the canonical ResolutionFlow products
by exact name match, picks the active monthly recurring price for tiers
that have one, and upserts plan_billing rows.
Idempotent. Safe to re-run after price changes, after live cutover, or
after rotating Stripe keys.
Tier mapping (name in Stripe -> plan slug in plan_limits):
ResolutionFlow Starter -> starter (monthly price required)
ResolutionFlow Pro -> pro (monthly price required)
ResolutionFlow Enterprise -> enterprise (no price, sales-led)
Annual prices are intentionally not supported in this iteration. The
plan_billing schema allows annual fields (stripe_annual_price_id,
annual_price_cents); this script leaves them NULL.
Usage:
docker exec -w /app resolutionflow_backend python -m scripts.sync_stripe_plan_ids
docker exec -w /app resolutionflow_backend python -m scripts.sync_stripe_plan_ids --dry-run
"""
import argparse
import asyncio
import logging
import sys
from typing import Optional
import stripe
from app.core.config import settings
from app.core.database import async_session_maker
from sqlalchemy import text
logger = logging.getLogger("sync_stripe_plan_ids")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
PLAN_NAME_TO_SLUG = {
"ResolutionFlow Starter": "starter",
"ResolutionFlow Pro": "pro",
"ResolutionFlow Enterprise": "enterprise",
}
PLANS_REQUIRING_PRICE = {"starter", "pro"}
PLAN_DEFAULTS = {
"starter": {"sort_order": 10, "is_public": True},
"pro": {"sort_order": 20, "is_public": True},
"enterprise": {"sort_order": 30, "is_public": True},
}
def find_product_by_name(target: str) -> Optional[stripe.Product]:
"""Page through active products and return the first exact name match."""
for product in stripe.Product.list(active=True, limit=100).auto_paging_iter():
if product.name == target:
return product
return None
def find_active_monthly_price(product_id: str) -> Optional[stripe.Price]:
"""Return the active recurring monthly price for a product, or None."""
candidates = [
p
for p in stripe.Price.list(product=product_id, active=True, limit=100).auto_paging_iter()
if p.type == "recurring"
and p.recurring is not None
and p.recurring.get("interval") == "month"
and p.recurring.get("interval_count", 1) == 1
]
if not candidates:
return None
if len(candidates) > 1:
logger.warning(
"Product %s has %d active monthly recurring prices; picking %s. "
"Archive the others to silence this warning.",
product_id, len(candidates), candidates[0].id,
)
return candidates[0]
async def upsert_plan_billing(
plan: str,
display_name: str,
description: Optional[str],
monthly_price_cents: Optional[int],
stripe_product_id: Optional[str],
stripe_monthly_price_id: Optional[str],
sort_order: int,
is_public: bool,
dry_run: bool,
) -> None:
"""Upsert one plan_billing row. Annual fields stay NULL."""
if dry_run:
logger.info(
"[dry-run] would upsert plan=%s display=%s monthly_cents=%s "
"product=%s monthly_price=%s",
plan, display_name, monthly_price_cents,
stripe_product_id, stripe_monthly_price_id,
)
return
sql = text("""
INSERT INTO plan_billing (
plan, display_name, description,
monthly_price_cents, annual_price_cents,
stripe_product_id, stripe_monthly_price_id, stripe_annual_price_id,
is_public, is_archived, sort_order
) VALUES (
:plan, :display_name, :description,
:monthly_price_cents, NULL,
:stripe_product_id, :stripe_monthly_price_id, NULL,
:is_public, FALSE, :sort_order
)
ON CONFLICT (plan) DO UPDATE SET
display_name = EXCLUDED.display_name,
description = EXCLUDED.description,
monthly_price_cents = EXCLUDED.monthly_price_cents,
stripe_product_id = EXCLUDED.stripe_product_id,
stripe_monthly_price_id = EXCLUDED.stripe_monthly_price_id,
is_public = EXCLUDED.is_public,
sort_order = EXCLUDED.sort_order,
updated_at = NOW()
""")
async with async_session_maker() as session:
await session.execute(sql, {
"plan": plan,
"display_name": display_name,
"description": description,
"monthly_price_cents": monthly_price_cents,
"stripe_product_id": stripe_product_id,
"stripe_monthly_price_id": stripe_monthly_price_id,
"is_public": is_public,
"sort_order": sort_order,
})
await session.commit()
logger.info("upserted plan_billing for plan=%s", plan)
async def main(dry_run: bool) -> int:
if not settings.STRIPE_SECRET_KEY:
logger.error("STRIPE_SECRET_KEY is not set. Refusing to run.")
return 2
stripe.api_key = settings.STRIPE_SECRET_KEY
mode = "live" if settings.STRIPE_SECRET_KEY.startswith("sk_live_") else "test"
logger.info("connected to Stripe in %s mode", mode)
errors: list[str] = []
for product_name, plan in PLAN_NAME_TO_SLUG.items():
defaults = PLAN_DEFAULTS[plan]
product = find_product_by_name(product_name)
if product is None:
errors.append(f"Stripe product not found: {product_name!r}")
continue
price = None
if plan in PLANS_REQUIRING_PRICE:
price = find_active_monthly_price(product.id)
if price is None:
errors.append(
f"No active monthly recurring price for {product_name!r} "
f"(product {product.id})"
)
continue
await upsert_plan_billing(
plan=plan,
display_name=product.name,
description=product.description,
monthly_price_cents=price.unit_amount if price else None,
stripe_product_id=product.id,
stripe_monthly_price_id=price.id if price else None,
sort_order=defaults["sort_order"],
is_public=defaults["is_public"],
dry_run=dry_run,
)
if errors:
for e in errors:
logger.error(e)
return 1
logger.info("done")
return 0
if __name__ == "__main__":
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--dry-run", action="store_true", help="Log actions without writing.")
args = parser.parse_args()
sys.exit(asyncio.run(main(dry_run=args.dry_run)))