feat(billing): extend Stripe webhook stub with concrete event handlers

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-06 14:55:23 -04:00
parent 18180bc57f
commit 9b709488d9
2 changed files with 163 additions and 32 deletions

View File

@@ -1,10 +1,10 @@
import logging import logging
from fastapi import APIRouter, Request, HTTPException, status, Depends from fastapi import APIRouter, Request, HTTPException, Depends
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db from app.core.admin_database import get_admin_db
from app.core.config import settings from app.core.config import settings
from app.core.stripe_handlers import WEBHOOK_HANDLERS from app.services.billing import BillingService
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -14,49 +14,36 @@ router = APIRouter(prefix="/webhooks", tags=["webhooks"])
@router.post("/stripe") @router.post("/stripe")
async def stripe_webhook( async def stripe_webhook(
request: Request, request: Request,
db: AsyncSession = Depends(get_db), db: AsyncSession = Depends(get_admin_db),
): ):
"""Handle Stripe webhook events. """Stripe webhook handler. Public endpoint; signature verification is the
only gate. Idempotency via stripe_events table.
Returns 200 for all events to prevent Stripe retries. Returns 200 even when Stripe is not configured — keeps the receiver
Actual processing happens only when Stripe is configured. permissive for local dev.
""" """
if not settings.stripe_enabled: if not settings.stripe_enabled or not settings.STRIPE_WEBHOOK_SECRET:
return {"status": "ok", "message": "Stripe not configured, event ignored"} return {"status": "ok", "message": "Stripe not configured, event ignored"}
payload = await request.body() payload = await request.body()
sig_header = request.headers.get("stripe-signature") sig_header = request.headers.get("stripe-signature")
if not sig_header: if not sig_header:
raise HTTPException( raise HTTPException(status_code=400, detail="Missing stripe-signature header")
status_code=status.HTTP_400_BAD_REQUEST,
detail="Missing stripe-signature header"
)
# Verify webhook signature
try: try:
import stripe import stripe
stripe.api_key = settings.STRIPE_SECRET_KEY stripe.api_key = settings.STRIPE_SECRET_KEY
event = stripe.Webhook.construct_event( event = stripe.Webhook.construct_event(
payload, sig_header, settings.STRIPE_WEBHOOK_SECRET payload, sig_header, settings.STRIPE_WEBHOOK_SECRET
) )
except ImportError:
logger.warning("stripe package not installed, cannot verify webhook")
return {"status": "ok", "message": "stripe package not installed"}
except Exception as e: except Exception as e:
logger.error("Stripe webhook signature verification failed: %s", e) logger.warning("stripe webhook bad signature: %s", e)
raise HTTPException( raise HTTPException(status_code=400, detail="Invalid signature")
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid signature"
)
event_type = event.get("type", "") applied = await BillingService.apply_subscription_event(
handler = WEBHOOK_HANDLERS.get(event_type) db,
event_id=event["id"],
if handler: event_type=event["type"],
try: payload={"data": event["data"]},
await handler(event, db) )
except Exception: return {"status": "ok", "applied": applied}
logger.exception("Error handling Stripe event %s", event_type)
return {"status": "ok"}

View File

@@ -0,0 +1,144 @@
import json
import uuid
import pytest
from sqlalchemy import delete, select
from unittest.mock import patch
from app.models.subscription import Subscription
def _make_event(event_id, event_type, obj):
return {
"id": event_id,
"type": event_type,
"data": {"object": obj},
}
@pytest.mark.asyncio
async def test_checkout_completed_activates_subscription(
client, test_db, test_user, auth_headers, monkeypatch
):
from app.core.config import settings
monkeypatch.setattr(settings, "STRIPE_SECRET_KEY", "sk_test_dummy")
monkeypatch.setattr(settings, "STRIPE_WEBHOOK_SECRET", "whsec_dummy")
account_id = uuid.UUID(test_user["user_data"]["account_id"])
# Replace seeded sub with trialing + stripe_customer_id linkage
from app.models.account import Account
account = (await test_db.execute(select(Account).where(Account.id == account_id))).scalar_one()
account.stripe_customer_id = "cus_xxx"
await test_db.execute(delete(Subscription).where(Subscription.account_id == account_id))
test_db.add(Subscription(account_id=account_id, plan="pro", status="trialing"))
await test_db.commit()
event = _make_event("evt_co_1", "checkout.session.completed", {
"id": "cs_xxx",
"customer": "cus_xxx",
"subscription": "sub_xxx",
})
with patch("stripe.Subscription.retrieve", return_value={
"id": "sub_xxx",
"status": "active",
"current_period_start": 1714521600,
"current_period_end": 1717113600,
"items": {"data": [{
"price": {"id": "price_test_monthly"},
"quantity": 5,
}]},
"cancel_at_period_end": False,
}), patch("stripe.Webhook.construct_event", return_value=event):
response = await client.post(
"/api/v1/webhooks/stripe",
content=json.dumps(event),
headers={"stripe-signature": "fake-sig"},
)
assert response.status_code == 200, response.json()
sub = (await test_db.execute(
select(Subscription).where(Subscription.account_id == account_id)
)).scalar_one()
assert sub.status == "active"
assert sub.stripe_subscription_id == "sub_xxx"
@pytest.mark.asyncio
async def test_subscription_deleted_cancels_account(
client, test_db, test_user, auth_headers, monkeypatch
):
from app.core.config import settings
monkeypatch.setattr(settings, "STRIPE_SECRET_KEY", "sk_test_dummy")
monkeypatch.setattr(settings, "STRIPE_WEBHOOK_SECRET", "whsec_dummy")
account_id = uuid.UUID(test_user["user_data"]["account_id"])
await test_db.execute(delete(Subscription).where(Subscription.account_id == account_id))
test_db.add(Subscription(
account_id=account_id, plan="pro", status="active",
stripe_subscription_id="sub_xxx",
))
await test_db.commit()
event = _make_event("evt_del_1", "customer.subscription.deleted", {
"id": "sub_xxx",
"current_period_start": 1714521600,
"current_period_end": 1717113600,
"items": {"data": [{"quantity": 1}]},
})
with patch("stripe.Webhook.construct_event", return_value=event):
response = await client.post(
"/api/v1/webhooks/stripe",
content=json.dumps(event),
headers={"stripe-signature": "fake-sig"},
)
assert response.status_code == 200
sub = (await test_db.execute(
select(Subscription).where(Subscription.account_id == account_id)
)).scalar_one()
assert sub.status == "canceled"
@pytest.mark.asyncio
async def test_webhook_signature_failure_returns_400(client, monkeypatch):
from app.core.config import settings
monkeypatch.setattr(settings, "STRIPE_SECRET_KEY", "sk_test_dummy")
monkeypatch.setattr(settings, "STRIPE_WEBHOOK_SECRET", "whsec_dummy")
with patch("stripe.Webhook.construct_event", side_effect=ValueError("bad sig")):
response = await client.post(
"/api/v1/webhooks/stripe",
content=b"{}",
headers={"stripe-signature": "fake-sig"},
)
assert response.status_code == 400
@pytest.mark.asyncio
async def test_webhook_idempotency(
client, test_db, test_user, auth_headers, monkeypatch
):
from app.core.config import settings
monkeypatch.setattr(settings, "STRIPE_SECRET_KEY", "sk_test_dummy")
monkeypatch.setattr(settings, "STRIPE_WEBHOOK_SECRET", "whsec_dummy")
account_id = uuid.UUID(test_user["user_data"]["account_id"])
await test_db.execute(delete(Subscription).where(Subscription.account_id == account_id))
test_db.add(Subscription(account_id=account_id, plan="pro", status="trialing"))
await test_db.commit()
event = _make_event("evt_dup_1", "customer.subscription.updated", {
"id": "sub_yyy",
"status": "active",
"current_period_start": 1714521600,
"current_period_end": 1717113600,
"items": {"data": [{"quantity": 1}]},
"cancel_at_period_end": False,
})
with patch("stripe.Webhook.construct_event", return_value=event):
r1 = await client.post("/api/v1/webhooks/stripe", content=json.dumps(event), headers={"stripe-signature": "x"})
r2 = await client.post("/api/v1/webhooks/stripe", content=json.dumps(event), headers={"stripe-signature": "x"})
assert r1.status_code == 200
assert r2.status_code == 200
assert r1.json()["applied"] is True
assert r2.json()["applied"] is False