Files
resolutionflow/backend/app/api/endpoints/webhooks.py
chihlasm e0089a9c5a feat: update all endpoints and schemas for account-based model
Replace team_id with account_id across all API endpoints (trees,
categories, tags, steps, step_categories, admin, auth). Add new
accounts and webhooks endpoints. Registration now atomically creates
Account + Subscription, with account_invite_code bypassing the
platform invite gate.

Schemas updated for account_id/account_role. 82 tests passing
including 18 new tests for accounts, subscriptions, and permissions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-07 02:39:01 -05:00

63 lines
1.9 KiB
Python

import logging
from fastapi import APIRouter, Request, HTTPException, status, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.core.config import settings
from app.core.stripe_handlers import WEBHOOK_HANDLERS
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router for the routes declared in this file; they are mounted under the
# "/webhooks" prefix and carry the "webhooks" tag.
router = APIRouter(prefix="/webhooks", tags=["webhooks"])
@router.post("/stripe")
async def stripe_webhook(
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    """Receive a Stripe webhook event, verify its signature, and dispatch it.

    Once an event has passed signature verification the endpoint always
    answers ``{"status": "ok"}`` -- even when no handler is registered for the
    event type or the handler raises -- so that Stripe does not retry the
    delivery. Requests that cannot be verified are rejected with 400.

    Processing is skipped (but still acknowledged) when
    ``settings.stripe_enabled`` is falsy or when the ``stripe`` package cannot
    be imported.

    Args:
        request: Incoming request; its raw body and ``stripe-signature``
            header are passed to signature verification.
        db: Async database session handed through to the event handler.

    Returns:
        A dict with ``"status": "ok"``; on the two skip paths it also carries
        a ``"message"`` explaining why the event was not processed.

    Raises:
        HTTPException: 400 when the ``stripe-signature`` header is missing or
            when constructing/verifying the event fails for any reason.
    """
    if not settings.stripe_enabled:
        return {"status": "ok", "message": "Stripe not configured, event ignored"}

    # The raw request bytes (not a parsed body) are what gets verified below.
    payload = await request.body()
    sig_header = request.headers.get("stripe-signature")
    if not sig_header:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Missing stripe-signature header",
        )

    # Verify webhook signature
    try:
        # Imported lazily so this module still loads when the stripe package
        # is absent; that case is handled by the ImportError branch.
        import stripe

        stripe.api_key = settings.STRIPE_SECRET_KEY
        event = stripe.Webhook.construct_event(
            payload, sig_header, settings.STRIPE_WEBHOOK_SECRET
        )
    except ImportError:
        logger.warning("stripe package not installed, cannot verify webhook")
        return {"status": "ok", "message": "stripe package not installed"}
    except Exception as e:
        logger.error("Stripe webhook signature verification failed: %s", e)
        # Chain the cause so the original failure stays visible in tracebacks.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid signature",
        ) from e

    event_type = event.get("type", "")
    handler = WEBHOOK_HANDLERS.get(event_type)
    if handler:
        try:
            await handler(event, db)
        except Exception:
            # Deliberate best-effort: log and still acknowledge the event.
            logger.exception("Error handling Stripe event %s", event_type)
            # Discard whatever the failed handler left pending on the shared
            # session so half-applied work is not carried past this point.
            # NOTE(review): get_db's teardown is not visible here -- confirm
            # whether it commits after the endpoint returns.
            try:
                await db.rollback()
            except Exception:
                # A failing rollback must not turn the acknowledgement into
                # an error response.
                logger.exception(
                    "Rollback failed after Stripe event %s", event_type
                )
    return {"status": "ok"}