diff --git a/backend/app/api/endpoints/sales_leads.py b/backend/app/api/endpoints/sales_leads.py new file mode 100644 index 00000000..5f786319 --- /dev/null +++ b/backend/app/api/endpoints/sales_leads.py @@ -0,0 +1,114 @@ +"""Public Talk-to-Sales endpoint — no auth required. + +POST /api/v1/sales-leads + - Inserts a sales_leads row. + - Fires (best-effort) a notification email to settings.SALES_LEAD_RECIPIENT_EMAIL. + - Emits a server-side PostHog event (best-effort). + - Rate-limited per IP (5/hour). +""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Annotated + +from fastapi import APIRouter, Depends, Request +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.admin_database import get_admin_db +from app.core.config import settings +from app.core.email import EmailService +from app.core.rate_limit import limiter +from app.models.sales_lead import SalesLead +from app.schemas.sales_lead import SalesLeadCreate, SalesLeadCreateResponse + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/sales-leads", tags=["sales"]) + + +async def _send_notification_email(lead: SalesLead) -> None: + """Fire-and-forget wrapper. EmailService methods never raise, but we + still wrap in a try/except to defend against future regressions.""" + try: + await EmailService.send_sales_lead_notification( + to_email=settings.SALES_LEAD_RECIPIENT_EMAIL, + lead=lead, + ) + except Exception: + logger.warning( + "Sales lead notification email failed for lead %s", + lead.id, + exc_info=True, + ) + + +def _capture_posthog_event(lead: SalesLead) -> None: + """Emit `talk_to_sales_form_submitted` server-side. Best-effort. + + Backend PostHog SDK isn't initialized in the project today; this function + is the single instrumentation point so wiring it up later is a one-line + change. The call is wrapped so any future failure can never fail the + request. + """ + try: + # Lazy import — keeps the dependency optional. When the backend + # PostHog client is wired in (likely as `app.core.analytics.posthog`), + # swap the import path here and the event will fire automatically. + try: + from app.core.analytics import posthog # type: ignore[attr-defined] + except ImportError: + logger.debug( + "PostHog server-side capture skipped — client not configured" + ) + return + + distinct_id = lead.posthog_distinct_id or f"sales_lead:{lead.id}" + posthog.capture( + distinct_id=distinct_id, + event="talk_to_sales_form_submitted", + properties={ + "source": lead.source, + "company": lead.company, + "team_size": lead.team_size, + }, + ) + except Exception: + logger.warning( + "PostHog capture failed for sales lead %s", + lead.id, + exc_info=True, + ) + + +@router.post("", response_model=SalesLeadCreateResponse, status_code=201) +@limiter.limit("5/hour") +async def create_sales_lead( + request: Request, + data: SalesLeadCreate, + db: Annotated[AsyncSession, Depends(get_admin_db)], +) -> SalesLeadCreateResponse: + """Public Talk-to-Sales submission. + + Creates a sales_leads row, fires (best-effort) a notification email and a + server-side PostHog event. Rate-limited per IP at 5/hour. + """ + lead = SalesLead( + email=str(data.email).lower(), + name=data.name, + company=data.company, + team_size=data.team_size, + message=data.message, + source=data.source, + posthog_distinct_id=data.posthog_distinct_id, + ) + db.add(lead) + await db.commit() + await db.refresh(lead) + + # Fire-and-forget: email + analytics. Failures must not fail the request. + asyncio.create_task(_send_notification_email(lead)) + _capture_posthog_event(lead) + + return SalesLeadCreateResponse(id=lead.id, status="received") diff --git a/backend/app/api/router.py b/backend/app/api/router.py index 01ce9a8a..5ef8122c 100644 --- a/backend/app/api/router.py +++ b/backend/app/api/router.py @@ -26,6 +26,7 @@ from app.api.endpoints import ( billing, beta_feedback, beta_signup, + sales_leads, branding, categories, copilot, @@ -88,6 +89,7 @@ api_router.include_router(billing.router) # Reachable when subscription lock api_router.include_router(shared.router) # Public share links (no auth) api_router.include_router(shares.public_router) # Public session share links (optional auth) api_router.include_router(beta_signup.router) +api_router.include_router(sales_leads.router) # Talk-to-Sales (no auth, rate-limited) api_router.include_router(webhooks.router) # Stripe webhook receiver api_router.include_router(public_templates.router) # Public gallery (no auth, rate-limited) api_router.include_router(survey.router) # Public survey flow (no auth, rate-limited) diff --git a/backend/app/core/config.py b/backend/app/core/config.py index f2c28593..815c95db 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -84,6 +84,7 @@ class Settings(BaseSettings): RESEND_API_KEY: Optional[str] = None FROM_EMAIL: str = "ResolutionFlow " FEEDBACK_EMAIL: Optional[str] = None + SALES_LEAD_RECIPIENT_EMAIL: str = "sales@resolutionflow.com" @property def email_enabled(self) -> bool: diff --git a/backend/app/core/email.py b/backend/app/core/email.py index 313d5db0..0bb62b94 100644 --- a/backend/app/core/email.py +++ b/backend/app/core/email.py @@ -1,6 +1,11 @@ import logging +from typing import TYPE_CHECKING + from app.core.config import settings +if TYPE_CHECKING: + from app.models.sales_lead import SalesLead + logger = logging.getLogger(__name__) @@ -484,6 +489,99 @@ class EmailService: logger.exception("Failed to send beta signup notification for %s", signup_email) return False + @staticmethod + async def send_sales_lead_notification( + to_email: str, + lead: "SalesLead", + ) -> bool: + """Notify the sales recipient about a new Talk-to-Sales submission. + + Fire-and-forget. Returns False (and logs) on any failure; never raises. + """ + if not settings.email_enabled: + logger.warning( + "Sales lead email not sent — RESEND_API_KEY not configured (lead %s)", + lead.id, + ) + return False + + try: + import resend + import html as html_mod + from datetime import datetime, timezone + + resend.api_key = settings.RESEND_API_KEY + + date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + safe_email = html_mod.escape(lead.email) + safe_name = html_mod.escape(lead.name) + safe_company = html_mod.escape(lead.company) + safe_team_size = html_mod.escape(lead.team_size or "—") + safe_source = html_mod.escape(lead.source) + safe_message = html_mod.escape(lead.message or "(no message)") + subject = f"[ResolutionFlow Sales] New lead — {safe_company} ({safe_email})" + + email_html = f""" + + + + +
+ + + + + + +
+

ResolutionFlow

+

New Sales Lead

+
+

+ Source: {safe_source} +

+
+ + +
+

Name

+

{safe_name}

+

Email

+

{safe_email}

+

Company

+

{safe_company}

+

Team Size

+

{safe_team_size}

+
+
+

Message

+

{safe_message}

+
+

+ Submitted at {date_str} · Lead ID: {lead.id} +

+
+
+""" + + resend.Emails.send({ + "from": settings.FROM_EMAIL, + "to": [to_email], + "reply_to": lead.email, + "subject": subject, + "html": email_html, + }) + logger.info("Sales lead notification sent for %s (lead %s)", lead.email, lead.id) + return True + + except Exception: + logger.exception( + "Failed to send sales lead notification for %s (lead %s)", + lead.email, + lead.id, + ) + return False + @staticmethod async def send_notification_email( to_email: str, diff --git a/backend/app/schemas/sales_lead.py b/backend/app/schemas/sales_lead.py new file mode 100644 index 00000000..9247e91e --- /dev/null +++ b/backend/app/schemas/sales_lead.py @@ -0,0 +1,27 @@ +"""Pydantic schemas for Talk-to-Sales submissions.""" + +from typing import Literal, Optional +from uuid import UUID + +from pydantic import BaseModel, ConfigDict, EmailStr, Field + +SalesLeadSource = Literal["pricing_page", "register_footer", "landing_page"] + + +class SalesLeadCreate(BaseModel): + """Public Talk-to-Sales form submission.""" + + model_config = ConfigDict(str_strip_whitespace=True) + + email: EmailStr + name: str = Field(..., min_length=1, max_length=255) + company: str = Field(..., min_length=1, max_length=255) + team_size: Optional[str] = Field(default=None, max_length=20) + message: Optional[str] = Field(default=None, max_length=5000) + source: SalesLeadSource + posthog_distinct_id: Optional[str] = Field(default=None, max_length=255) + + +class SalesLeadCreateResponse(BaseModel): + id: UUID + status: Literal["received"] = "received" diff --git a/backend/tests/test_sales_leads.py b/backend/tests/test_sales_leads.py new file mode 100644 index 00000000..c3620ab8 --- /dev/null +++ b/backend/tests/test_sales_leads.py @@ -0,0 +1,134 @@ +"""Integration tests for the public Talk-to-Sales endpoint. + +POST /api/v1/sales-leads — no auth, rate-limited 5/hour per IP. +""" + +from unittest.mock import AsyncMock, patch + +import pytest +import sqlalchemy as sa + + +@pytest.mark.asyncio +async def test_sales_lead_creates_row_and_sends_notification_email(client, test_db): + """Happy path: row inserted, notification email fired, 201 returned.""" + + payload = { + "email": "buyer@acme.example", + "name": "Pat Buyer", + "company": "Acme MSP", + "team_size": "11-50", + "message": "We're evaluating ResolutionFlow for our NOC team.", + "source": "pricing_page", + "posthog_distinct_id": "ph_distinct_123", + } + + with patch( + "app.api.endpoints.sales_leads.EmailService.send_sales_lead_notification", + new=AsyncMock(return_value=True), + ) as mock_email: + response = await client.post("/api/v1/sales-leads", json=payload) + + assert response.status_code == 201, response.text + body = response.json() + assert body["status"] == "received" + assert "id" in body + + # Notification email was attempted (asyncio.create_task — give it a tick). + import asyncio + await asyncio.sleep(0) + await asyncio.sleep(0) + assert mock_email.await_count == 1 + kwargs = mock_email.await_args.kwargs + assert kwargs["to_email"] # default placeholder until cutover + assert kwargs["lead"].email == "buyer@acme.example" + assert kwargs["lead"].source == "pricing_page" + + # Row was inserted with normalized email + all fields preserved. + result = await test_db.execute( + sa.text("SELECT email, name, company, team_size, message, source, posthog_distinct_id, status FROM sales_leads") + ) + rows = result.all() + assert len(rows) == 1 + row = rows[0] + assert row.email == "buyer@acme.example" + assert row.name == "Pat Buyer" + assert row.company == "Acme MSP" + assert row.team_size == "11-50" + assert row.message == "We're evaluating ResolutionFlow for our NOC team." + assert row.source == "pricing_page" + assert row.posthog_distinct_id == "ph_distinct_123" + assert row.status == "new" + + +@pytest.mark.asyncio +async def test_sales_lead_email_failure_does_not_fail_request(client, test_db): + """If the email send raises, the API still returns 201 and the row persists.""" + + payload = { + "email": "buyer2@acme.example", + "name": "Sam Lead", + "company": "Acme MSP", + "source": "register_footer", + } + + with patch( + "app.api.endpoints.sales_leads.EmailService.send_sales_lead_notification", + new=AsyncMock(side_effect=RuntimeError("resend exploded")), + ): + response = await client.post("/api/v1/sales-leads", json=payload) + + assert response.status_code == 201, response.text + + # Row must still be persisted even though email failed. + import asyncio + await asyncio.sleep(0) + result = await test_db.execute( + sa.text("SELECT count(*) FROM sales_leads WHERE email = 'buyer2@acme.example'") + ) + assert result.scalar() == 1 + + +@pytest.mark.asyncio +async def test_sales_lead_rate_limited_after_5_per_hour(client): + """The 6th submission within an hour from the same IP returns 429. + + The default `limiter` is disabled in tests (DEBUG=true). We re-enable it + for this test, then reset its state on teardown so other tests aren't + affected. + """ + from app.core.rate_limit import limiter + + was_enabled = limiter.enabled + limiter.enabled = True + try: + limiter.reset() + + with patch( + "app.api.endpoints.sales_leads.EmailService.send_sales_lead_notification", + new=AsyncMock(return_value=True), + ): + for i in range(5): + payload = { + "email": f"lead{i}@acme.example", + "name": f"Lead {i}", + "company": "Acme MSP", + "source": "landing_page", + } + resp = await client.post("/api/v1/sales-leads", json=payload) + assert resp.status_code == 201, f"submission {i}: {resp.text}" + + # 6th should be rate-limited. + resp = await client.post( + "/api/v1/sales-leads", + json={ + "email": "lead6@acme.example", + "name": "Lead 6", + "company": "Acme MSP", + "source": "landing_page", + }, + ) + assert resp.status_code == 429, resp.text + finally: + limiter.reset() + limiter.enabled = was_enabled