Files
resolutionflow/backend/app/api/endpoints/sales_leads.py
Michael Chihlas 694279f89e feat(sales): add POST /sales-leads public endpoint
Phase 2 Task 29 — public Talk-to-Sales submission endpoint.

- New POST /api/v1/sales-leads (public, no auth, rate-limited 5/hour per IP).
- Inserts a sales_leads row, fires best-effort notification email and
  PostHog server-side capture; failures are logged but never fail the
  request.
- New EmailService.send_sales_lead_notification static method.
- New SALES_LEAD_RECIPIENT_EMAIL setting (defaults to sales@resolutionflow.com).
- Schemas: SalesLeadCreate / SalesLeadCreateResponse with literal source enum.
- Tests: happy path (row + email), email-failure resilience, and rate-limit
  enforcement (re-enables the slowapi limiter for the rate-limit assertion
  since DEBUG=true disables it by default in tests).

PostHog server-side instrumentation point is wired in but no-ops gracefully
until app.core.analytics.posthog exists — turning it on is a one-line
change when the backend SDK is configured.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-06 20:12:03 -04:00

115 lines
3.7 KiB
Python

"""Public Talk-to-Sales endpoint — no auth required.
POST /api/v1/sales-leads
- Inserts a sales_leads row.
- Fires (best-effort) a notification email to settings.SALES_LEAD_RECIPIENT_EMAIL.
- Emits a server-side PostHog event (best-effort).
- Rate-limited per IP (5/hour).
"""
from __future__ import annotations
import asyncio
import logging
from typing import Annotated
from fastapi import APIRouter, Depends, Request
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.admin_database import get_admin_db
from app.core.config import settings
from app.core.email import EmailService
from app.core.rate_limit import limiter
from app.models.sales_lead import SalesLead
from app.schemas.sales_lead import SalesLeadCreate, SalesLeadCreateResponse
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router for the sales-lead routes; every route registered on it is served
# under the "/sales-leads" prefix and tagged "sales".
router = APIRouter(prefix="/sales-leads", tags=["sales"])
async def _send_notification_email(lead: SalesLead) -> None:
    """Notify the sales inbox about a new lead; never raises.

    Awaits ``EmailService.send_sales_lead_notification`` addressed to
    ``settings.SALES_LEAD_RECIPIENT_EMAIL``. The email service is expected
    not to raise, so the catch-all below is purely a defensive guard: any
    exception is logged at WARNING level with its traceback and swallowed.

    Args:
        lead: The sales lead the notification is about.
    """
    try:
        notify = EmailService.send_sales_lead_notification
        await notify(to_email=settings.SALES_LEAD_RECIPIENT_EMAIL, lead=lead)
    except Exception:
        logger.warning(
            "Sales lead notification email failed for lead %s", lead.id, exc_info=True
        )
def _capture_posthog_event(lead: SalesLead) -> None:
    """Send the ``talk_to_sales_form_submitted`` event to PostHog, best-effort.

    The PostHog client is imported lazily from ``app.core.analytics``. When
    that import fails the capture is skipped with a DEBUG log line. Any other
    error is logged at WARNING level (with traceback) and swallowed, so this
    function never raises into the request handler.

    The event's distinct id is the lead's ``posthog_distinct_id`` when it is
    truthy, otherwise ``sales_lead:<lead id>``.

    Args:
        lead: The sales lead whose submission is being recorded.
    """
    try:
        try:
            # Imported here rather than at module level so the analytics
            # client stays an optional dependency. Once the backend client
            # exists, only this import path needs to change.
            from app.core.analytics import posthog  # type: ignore[attr-defined]
        except ImportError:
            logger.debug(
                "PostHog server-side capture skipped — client not configured"
            )
        else:
            actor_id = lead.posthog_distinct_id
            if not actor_id:
                # No browser-side id was supplied; fall back to a synthetic one.
                actor_id = f"sales_lead:{lead.id}"
            event_properties = {
                "source": lead.source,
                "company": lead.company,
                "team_size": lead.team_size,
            }
            posthog.capture(
                distinct_id=actor_id,
                event="talk_to_sales_form_submitted",
                properties=event_properties,
            )
    except Exception:
        logger.warning(
            "PostHog capture failed for sales lead %s", lead.id, exc_info=True
        )
# Strong references to in-flight fire-and-forget tasks. The event loop only
# keeps weak references to tasks, so a task whose handle is dropped can be
# garbage-collected before it finishes. Each task removes itself on completion.
_background_tasks: set[asyncio.Task[None]] = set()


@router.post("", response_model=SalesLeadCreateResponse, status_code=201)
@limiter.limit("5/hour")
async def create_sales_lead(
    request: Request,
    data: SalesLeadCreate,
    db: Annotated[AsyncSession, Depends(get_admin_db)],
) -> SalesLeadCreateResponse:
    """Handle a public Talk-to-Sales submission.

    Persists a ``SalesLead`` row, then schedules a notification email in the
    background and emits a server-side PostHog event. Both follow-ups are
    best-effort: their failures are logged by the helpers and never fail the
    request. The route is rate-limited to 5 requests per hour.

    Args:
        request: The incoming request. Not read in the body; it is part of
            the signature for the rate-limit decorator.
        data: Validated submission payload.
        db: Async database session provided by the ``get_admin_db`` dependency.

    Returns:
        A ``SalesLeadCreateResponse`` carrying the new lead's id and the
        status ``"received"`` (served with HTTP 201).
    """
    lead = SalesLead(
        # Store the address lower-cased so lookups are case-insensitive.
        email=str(data.email).lower(),
        name=data.name,
        company=data.company,
        team_size=data.team_size,
        message=data.message,
        source=data.source,
        posthog_distinct_id=data.posthog_distinct_id,
    )
    db.add(lead)
    await db.commit()
    # Reload the row after the commit so the attributes the follow-up helpers
    # read (including the generated id) are populated on the instance.
    await db.refresh(lead)

    # Fire-and-forget: email + analytics. Failures must not fail the request.
    # Keep a strong reference until the task completes; otherwise the task may
    # be garbage-collected mid-flight and the email silently never sent.
    email_task = asyncio.create_task(_send_notification_email(lead))
    _background_tasks.add(email_task)
    email_task.add_done_callback(_background_tasks.discard)

    _capture_posthog_event(lead)
    return SalesLeadCreateResponse(id=lead.id, status="received")