resolutionflow/docs/superpowers/plans/2026-05-06-self-serve-signup-phase-1-backend.md
Michael Chihlas ab0d40c1e2 docs(plan): self-serve signup & onboarding implementation plans
Adds two phase plans alongside the spec at
docs/superpowers/specs/2026-05-05-self-serve-signup-onboarding-design.md:

- Phase 1 (backend foundation, 26 tasks across 8 sub-phases A-H):
  schema migrations, subscription model + new guards, BillingService,
  Stripe webhook handler extension, OAuth callbacks, email verification
  auto-send + email-match enforcement, account-invite extensions,
  GET /billing/state, pilot user backfill. Step-by-step granularity
  with full code blocks per writing-plans skill.

- Phase 2 (frontend + cutover, 21 tasks across 7 sub-phases I-O):
  Phase-1-deferred endpoints, useBillingStore + hooks + gating
  components, register redesign + OAuth buttons + accept-invite,
  welcome wizard, dashboard redesign, pricing page + contact-sales,
  beta-signup deprecation, cutover. Higher-altitude — defines
  contracts, acceptance criteria, integration tests; leaves
  component-detail decisions to implementer.

Each phase ends in a mergeable PR. Cutover is gated behind
SELF_SERVE_ENABLED + VITE_SELF_SERVE_ENABLED. Execution deferred to
a future session.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-06 19:14:30 -04:00


Self-Serve Signup & Onboarding — Phase 1: Backend Foundation

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Build the backend foundation for public self-serve signup — schema additions, billing service, Stripe webhook event handling, OAuth provider callbacks, email-verification auto-send, account-invite extensions, /billing/state endpoint, and pilot user backfill — all dark behind SELF_SERVE_ENABLED until Phase 2 ships the frontend.

Architecture: Reuses existing subscriptions / plan_limits / feature_flags / account_invites / email_verification_tokens infrastructure. New schema is small: oauth_identities, plan_billing (sibling to plan_limits), sales_leads, stripe_events. Replaces deps.py:109 trial auto-downgrade with two new non-mutating dependencies (require_active_subscription, require_verified_email_after_grace). Adds BillingService module wrapping Stripe; webhook handler extends the existing stub at app/api/endpoints/webhooks.py.

Tech Stack: Python 3.11 + FastAPI, SQLAlchemy 2.0 async, Alembic, Pydantic v2, Stripe Python SDK, respx for HTTP mocking in tests, python-jose for JWT, bcrypt, existing EmailService.

Spec reference: docs/superpowers/specs/2026-05-05-self-serve-signup-onboarding-design.md (commit bbb01ef).


File Structure

New backend files

backend/app/models/oauth_identity.py
backend/app/models/plan_billing.py
backend/app/models/sales_lead.py
backend/app/models/stripe_event.py
backend/app/schemas/billing.py
backend/app/schemas/oauth.py
backend/app/services/billing.py
backend/app/services/oauth_providers.py
backend/app/api/endpoints/billing.py
backend/app/api/endpoints/oauth.py
backend/alembic/versions/<hash>_add_oauth_identities.py
backend/alembic/versions/<hash>_users_password_hash_nullable.py
backend/alembic/versions/<hash>_users_add_role_at_signup_and_onboarding_step.py
backend/alembic/versions/<hash>_accounts_add_wizard_columns.py
backend/alembic/versions/<hash>_account_invites_add_revoked_and_email_sent.py
backend/alembic/versions/<hash>_add_plan_billing.py
backend/alembic/versions/<hash>_add_sales_leads.py
backend/alembic/versions/<hash>_add_stripe_events.py
backend/alembic/versions/<hash>_subscriptions_pilot_complimentary_backfill.py
backend/tests/test_billing_service.py
backend/tests/test_subscription_guards.py
backend/tests/test_oauth_callbacks.py
backend/tests/test_account_invite_extensions.py
backend/tests/test_email_verification_autosend.py
backend/tests/test_stripe_webhook_handler.py
backend/tests/test_billing_state_endpoint.py

Modified backend files

backend/app/models/subscription.py      — add 'complimentary' status, fix is_paid, add has_pro_entitlement
backend/app/models/user.py              — password_hash nullable, role_at_signup, onboarding_step_completed
backend/app/models/account.py           — team_size_bucket, primary_psa
backend/app/models/account_invite.py    — revoked_at, email_sent_at, property updates
backend/app/api/deps.py                 — REMOVE auto-downgrade block, add new deps
backend/app/api/endpoints/auth.py       — auto-send verify email on register; email-match enforcement; null password_hash handling
backend/app/api/endpoints/accounts.py   — wire EmailService.send_account_invite_email; bulk endpoint; soft-revoke
backend/app/api/endpoints/webhooks.py   — extend stub with concrete event handlers
backend/app/api/router.py               — register new oauth + billing routers
backend/app/schemas/user.py             — preserve UserCreate.account_invite_code; no plan/billing additions
backend/app/core/config.py              — SELF_SERVE_ENABLED flag, OAUTH_REDIRECT_BASE, GOOGLE_CLIENT_ID/SECRET, MS_CLIENT_ID/SECRET

Phase boundaries

| Phase | Tasks | Outcome |
|-------|-------|---------|
| A | 1-8 | Schema is in place; tests pass |
| B | 9-12 | Subscription model + new guards; old auto-downgrade gone |
| C | 13-16 | BillingService + webhook handler complete |
| D | 17-19 | Google + Microsoft OAuth working end-to-end |
| E | 20 | Auto-send verification + email-match enforcement |
| F | 21-23 | Account-invite send + bulk + revoke routes |
| G | 24 | /billing/state endpoint live |
| H | 25 | Pilot user backfill applied |

Each phase ends with a clean test run + commit; no phase leaves the backend in a broken state.


Phase A — Schema migrations

Task 1: Add oauth_identities table

Files:

  • Create: backend/app/models/oauth_identity.py

  • Create: backend/alembic/versions/<hash>_add_oauth_identities.py

  • Modify: backend/app/models/__init__.py (register import)

  • Test: backend/tests/test_oauth_identity_model.py

  • Step 1: Create the model file

# backend/app/models/oauth_identity.py
import uuid
from datetime import datetime, timezone
from typing import TYPE_CHECKING
from sqlalchemy import String, DateTime, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID
from app.core.database import Base

if TYPE_CHECKING:
    from app.models.user import User


class OAuthIdentity(Base):
    __tablename__ = "oauth_identities"
    __table_args__ = (
        UniqueConstraint("provider", "provider_subject", name="uq_oauth_identities_provider_subject"),
        Index("ix_oauth_identities_user_id", "user_id"),
    )

    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False
    )
    provider: Mapped[str] = mapped_column(String(20), nullable=False)
    provider_subject: Mapped[str] = mapped_column(String(255), nullable=False)
    provider_email_at_link: Mapped[str] = mapped_column(String(255), nullable=False)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    user: Mapped["User"] = relationship("User", backref="oauth_identities")
  • Step 2: Register in models init
# backend/app/models/__init__.py — add line alongside other imports:
from app.models.oauth_identity import OAuthIdentity  # noqa: F401
  • Step 3: Create the migration
docker exec -w /app resolutionflow_backend alembic revision -m "add oauth_identities"

Edit the generated file:

# backend/alembic/versions/<hash>_add_oauth_identities.py
"""add oauth_identities

Revision ID: <hash>
Revises: <previous_head>
Create Date: 2026-05-06 ...

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID


revision = "<hash>"
down_revision = "<previous_head>"   # set to whichever existing head this branches from
branch_labels = None
depends_on = None


def upgrade() -> None:
    op.create_table(
        "oauth_identities",
        sa.Column("id", UUID(as_uuid=True), primary_key=True),
        sa.Column("user_id", UUID(as_uuid=True), sa.ForeignKey("users.id", ondelete="CASCADE"), nullable=False),
        sa.Column("provider", sa.String(20), nullable=False),
        sa.Column("provider_subject", sa.String(255), nullable=False),
        sa.Column("provider_email_at_link", sa.String(255), nullable=False),
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
        sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
        sa.UniqueConstraint("provider", "provider_subject", name="uq_oauth_identities_provider_subject"),
    )
    op.create_index("ix_oauth_identities_user_id", "oauth_identities", ["user_id"])


def downgrade() -> None:
    op.drop_index("ix_oauth_identities_user_id", table_name="oauth_identities")
    op.drop_table("oauth_identities")
  • Step 4: Apply migration

Run: docker exec resolutionflow_backend alembic upgrade heads
Expected: applies cleanly; output includes the new revision id.

  • Step 5: Write a model smoke test
# backend/tests/test_oauth_identity_model.py
import pytest
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from app.models.oauth_identity import OAuthIdentity


@pytest.mark.asyncio
async def test_oauth_identity_unique_provider_subject(db_session, test_user):
    """Two rows with same provider+subject should violate uniqueness."""
    row1 = OAuthIdentity(
        user_id=test_user.id,
        provider="google",
        provider_subject="abc-123",
        provider_email_at_link="alex@acmemsp.com",
    )
    db_session.add(row1)
    await db_session.commit()

    row2 = OAuthIdentity(
        user_id=test_user.id,
        provider="google",
        provider_subject="abc-123",
        provider_email_at_link="alex@acmemsp.com",
    )
    db_session.add(row2)
    # The unique constraint surfaces as SQLAlchemy's IntegrityError at commit time.
    with pytest.raises(IntegrityError):
        await db_session.commit()
    await db_session.rollback()

    rows = (await db_session.execute(select(OAuthIdentity).where(OAuthIdentity.user_id == test_user.id))).scalars().all()
    assert len(rows) == 1
  • Step 6: Run the test

Run: docker exec resolutionflow_backend pytest backend/tests/test_oauth_identity_model.py -v --override-ini="addopts="
Expected: PASS.

  • Step 7: Commit
git add backend/app/models/oauth_identity.py backend/app/models/__init__.py backend/alembic/versions/*_add_oauth_identities.py backend/tests/test_oauth_identity_model.py
git commit -m "feat(auth): add oauth_identities table for Google/Microsoft sign-in"
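
The composite unique constraint from Step 1 is what stops one provider identity from being linked twice. As a minimal sketch of that behaviour, the block below uses the standard-library sqlite3 module as a stand-in for the project's PostgreSQL + SQLAlchemy stack; the link_identity helper and the trimmed column set are hypothetical and exist only for illustration.

```python
import sqlite3

# In-memory stand-in for oauth_identities (assumption: only the columns
# relevant to the uniqueness rule are modelled here).
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE oauth_identities (
        id INTEGER PRIMARY KEY,
        user_id TEXT NOT NULL,
        provider TEXT NOT NULL,
        provider_subject TEXT NOT NULL,
        UNIQUE (provider, provider_subject)
    )
    """
)


def link_identity(user_id: str, provider: str, subject: str) -> bool:
    """Hypothetical helper: insert a link row, return False on a duplicate pair."""
    try:
        with conn:  # commits on success, rolls back if the insert raises
            conn.execute(
                "INSERT INTO oauth_identities (user_id, provider, provider_subject) "
                "VALUES (?, ?, ?)",
                (user_id, provider, subject),
            )
        return True
    except sqlite3.IntegrityError:
        return False


first = link_identity("user-1", "google", "abc-123")
# Same (provider, subject) pair is rejected even for a different user.
duplicate = link_identity("user-2", "google", "abc-123")
# The same subject under another provider is a distinct identity.
other_provider = link_identity("user-1", "microsoft", "abc-123")
row_count = conn.execute("SELECT COUNT(*) FROM oauth_identities").fetchone()[0]
```

Note that uniqueness is scoped to the (provider, provider_subject) pair, not to the user: one user may hold several identities, but a given provider identity can belong to only one row.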

Task 2: Make users.password_hash nullable

Files:

  • Modify: backend/app/models/user.py:36

  • Create: backend/alembic/versions/<hash>_users_password_hash_nullable.py

  • Step 1: Update the model

Change backend/app/models/user.py line 36 from:

password_hash: Mapped[str] = mapped_column(String(255), nullable=False)

to:

password_hash: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
  • Step 2: Create the migration
docker exec -w /app resolutionflow_backend alembic revision -m "users password_hash nullable"

Edit:

def upgrade() -> None:
    op.alter_column("users", "password_hash", nullable=True)


def downgrade() -> None:
    # NOTE: downgrade is non-trivial if any OAuth-only users exist.
    # This downgrade fails fast in that case rather than corrupting data.
    conn = op.get_bind()
    null_count = conn.execute(sa.text("SELECT COUNT(*) FROM users WHERE password_hash IS NULL")).scalar()
    if null_count and null_count > 0:
        raise RuntimeError(
            f"Cannot downgrade: {null_count} OAuth-only users have NULL password_hash. "
            "Set passwords or delete those rows before downgrading."
        )
    op.alter_column("users", "password_hash", nullable=False)
  • Step 3: Apply

Run: docker exec resolutionflow_backend alembic upgrade heads
Expected: applies cleanly.

  • Step 4: Verify with a smoke test
# backend/tests/test_user_password_nullable.py
import pytest
from app.models.user import User


@pytest.mark.asyncio
async def test_user_can_be_created_without_password_hash(db_session, test_account):
    user = User(
        email="oauth-only@example.com",
        name="OAuth Only",
        password_hash=None,
        account_id=test_account.id,
        account_role="engineer",
    )
    db_session.add(user)
    await db_session.commit()
    await db_session.refresh(user)
    assert user.password_hash is None
  • Step 5: Run the test

Run: docker exec resolutionflow_backend pytest backend/tests/test_user_password_nullable.py -v --override-ini="addopts="
Expected: PASS.

  • Step 6: Commit
git add backend/app/models/user.py backend/alembic/versions/*_users_password_hash_nullable.py backend/tests/test_user_password_nullable.py
git commit -m "feat(auth): make users.password_hash nullable for OAuth-only accounts"
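
Once password_hash can be NULL, any password login path has to treat a missing hash as a failed login rather than passing None to the hash verifier (the file list above marks this as "null password_hash handling" in auth.py). The sketch below shows the shape of that guard. It is an assumption-laden illustration: verify_password_login and hash_password are hypothetical names, and PBKDF2 from the standard library stands in for the project's bcrypt so the example runs without extra packages.

```python
import hashlib
import hmac
from typing import Optional


def hash_password(password: str, salt: bytes) -> str:
    """Illustrative hash only (PBKDF2); the real project uses bcrypt."""
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 100_000)
    return salt.hex() + "$" + digest.hex()


def verify_password_login(stored_hash: Optional[str], candidate: str) -> bool:
    """Return True only when a stored hash exists and matches the candidate."""
    if stored_hash is None:
        # OAuth-only user: there is no password to check, so password
        # login must fail instead of raising inside the hash verifier.
        return False
    salt_hex, _, _ = stored_hash.partition("$")
    expected = hash_password(candidate, bytes.fromhex(salt_hex))
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(expected, stored_hash)


salt = b"0123456789abcdef"
stored = hash_password("correct horse", salt)
```

The None check comes first on purpose: it keeps the OAuth-only case out of the hashing code entirely, so the endpoint can return the same generic "invalid credentials" response it uses for a wrong password.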

Task 3: Add users.role_at_signup + users.onboarding_step_completed

Files:

  • Modify: backend/app/models/user.py

  • Create: backend/alembic/versions/<hash>_users_add_role_at_signup_and_onboarding_step.py

  • Step 1: Add columns to model

After the onboarding_dismissed column in backend/app/models/user.py (around line 78), add:

    role_at_signup: Mapped[Optional[str]] = mapped_column(String(50), nullable=True)
    onboarding_step_completed: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)

Make sure Integer is imported at top:

from sqlalchemy import String, DateTime, ForeignKey, Boolean, CheckConstraint, Text, Integer
  • Step 2: Create migration
docker exec -w /app resolutionflow_backend alembic revision -m "users add role_at_signup and onboarding_step_completed"
def upgrade() -> None:
    op.add_column("users", sa.Column("role_at_signup", sa.String(50), nullable=True))
    op.add_column("users", sa.Column("onboarding_step_completed", sa.Integer(), nullable=True))


def downgrade() -> None:
    op.drop_column("users", "onboarding_step_completed")
    op.drop_column("users", "role_at_signup")
  • Step 3: Apply

Run: docker exec resolutionflow_backend alembic upgrade heads
Expected: applies cleanly.

  • Step 4: Commit
git add backend/app/models/user.py backend/alembic/versions/*_users_add_role_at_signup*.py
git commit -m "feat(onboarding): add users.role_at_signup and onboarding_step_completed"

Task 4: Add accounts.team_size_bucket + accounts.primary_psa

Files:

  • Modify: backend/app/models/account.py

  • Create: backend/alembic/versions/<hash>_accounts_add_wizard_columns.py

  • Step 1: Add columns to model

In backend/app/models/account.py, after branding_company_name (around line 50), add:

    team_size_bucket: Mapped[Optional[str]] = mapped_column(String(20), nullable=True)
    primary_psa: Mapped[Optional[str]] = mapped_column(String(20), nullable=True)
  • Step 2: Create migration
docker exec -w /app resolutionflow_backend alembic revision -m "accounts add wizard columns"
def upgrade() -> None:
    op.add_column("accounts", sa.Column("team_size_bucket", sa.String(20), nullable=True))
    op.add_column("accounts", sa.Column("primary_psa", sa.String(20), nullable=True))


def downgrade() -> None:
    op.drop_column("accounts", "primary_psa")
    op.drop_column("accounts", "team_size_bucket")
  • Step 3: Apply + commit
docker exec resolutionflow_backend alembic upgrade heads
git add backend/app/models/account.py backend/alembic/versions/*_accounts_add_wizard_columns.py
git commit -m "feat(onboarding): add accounts.team_size_bucket and primary_psa for wizard"

Task 5: Add account_invites.revoked_at + email_sent_at

Files:

  • Modify: backend/app/models/account_invite.py

  • Create: backend/alembic/versions/<hash>_account_invites_add_revoked_and_email_sent.py

  • Test: backend/tests/test_account_invite_model.py

  • Step 1: Add columns and update properties

In backend/app/models/account_invite.py:

    revoked_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True)
    email_sent_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), nullable=True)

Update properties:

    @property
    def is_used(self) -> bool:
        return self.accepted_by_id is not None

    @property
    def is_revoked(self) -> bool:
        return self.revoked_at is not None

    @property
    def is_expired(self) -> bool:
        if self.expires_at is None:
            return False
        return datetime.now(timezone.utc) > self.expires_at

    @property
    def is_valid(self) -> bool:
        return not self.is_used and not self.is_expired and not self.is_revoked
  • Step 2: Migration
docker exec -w /app resolutionflow_backend alembic revision -m "account_invites add revoked_at and email_sent_at"
def upgrade() -> None:
    op.add_column("account_invites", sa.Column("revoked_at", sa.DateTime(timezone=True), nullable=True))
    op.add_column("account_invites", sa.Column("email_sent_at", sa.DateTime(timezone=True), nullable=True))
    op.create_index("ix_account_invites_revoked_at", "account_invites", ["revoked_at"])


def downgrade() -> None:
    op.drop_index("ix_account_invites_revoked_at", table_name="account_invites")
    op.drop_column("account_invites", "email_sent_at")
    op.drop_column("account_invites", "revoked_at")
  • Step 3: Property test
# backend/tests/test_account_invite_model.py
import pytest
from datetime import datetime, timezone, timedelta
from app.models.account_invite import AccountInvite


def make_invite(**kwargs):
    return AccountInvite(
        account_id=kwargs.get("account_id", "00000000-0000-0000-0000-000000000001"),
        invited_by_id=kwargs.get("invited_by_id", "00000000-0000-0000-0000-000000000002"),
        email=kwargs.get("email", "x@y.com"),
        code=kwargs.get("code", "ABCD1234"),
        role=kwargs.get("role", "engineer"),
        accepted_by_id=kwargs.get("accepted_by_id"),
        expires_at=kwargs.get("expires_at"),
        revoked_at=kwargs.get("revoked_at"),
    )


def test_invite_revoked_is_invalid():
    invite = make_invite(revoked_at=datetime.now(timezone.utc))
    assert invite.is_revoked is True
    assert invite.is_valid is False


def test_invite_unrevoked_unexpired_unused_is_valid():
    invite = make_invite(expires_at=datetime.now(timezone.utc) + timedelta(days=7))
    assert invite.is_valid is True
  • Step 4: Apply migration + run test
docker exec resolutionflow_backend alembic upgrade heads
docker exec resolutionflow_backend pytest backend/tests/test_account_invite_model.py -v --override-ini="addopts="

Expected: tests PASS.

  • Step 5: Commit
git add backend/app/models/account_invite.py backend/alembic/versions/*_account_invites_add_*.py backend/tests/test_account_invite_model.py
git commit -m "feat(invites): add revoked_at + email_sent_at to account_invites"
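
The two tests in Step 3 cover the revoked and the valid case. As a sketch of the remaining combinations, the block below mirrors the four properties on a plain dataclass so it runs without the ORM. InviteState is a hypothetical stand-in carrying only the fields the properties read, not the real AccountInvite model.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class InviteState:
    """Hypothetical stand-in for AccountInvite (illustration only)."""

    accepted_by_id: Optional[str] = None
    expires_at: Optional[datetime] = None
    revoked_at: Optional[datetime] = None

    @property
    def is_used(self) -> bool:
        return self.accepted_by_id is not None

    @property
    def is_revoked(self) -> bool:
        return self.revoked_at is not None

    @property
    def is_expired(self) -> bool:
        # An invite without an expiry never expires.
        if self.expires_at is None:
            return False
        return datetime.now(timezone.utc) > self.expires_at

    @property
    def is_valid(self) -> bool:
        return not self.is_used and not self.is_expired and not self.is_revoked


now = datetime.now(timezone.utc)
in_a_week = now + timedelta(days=7)
cases = {
    "fresh": InviteState(expires_at=in_a_week),
    "no_expiry": InviteState(),
    "expired": InviteState(expires_at=now - timedelta(minutes=1)),
    "used": InviteState(accepted_by_id="user-1", expires_at=in_a_week),
    "revoked": InviteState(revoked_at=now, expires_at=in_a_week),
}
validity = {name: invite.is_valid for name, invite in cases.items()}
```

Any single disqualifier (used, expired, or revoked) is enough to make the invite invalid, which is why the soft-revoke route only needs to set revoked_at.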

Task 6: Add plan_billing table

Files:

  • Create: backend/app/models/plan_billing.py

  • Create: backend/alembic/versions/<hash>_add_plan_billing.py

  • Modify: backend/app/models/__init__.py

  • Step 1: Model

# backend/app/models/plan_billing.py
from datetime import datetime, timezone
from typing import Optional
from sqlalchemy import String, Integer, Boolean, DateTime, ForeignKey, Text
from sqlalchemy.orm import Mapped, mapped_column
from app.core.database import Base


class PlanBilling(Base):
    __tablename__ = "plan_billing"

    plan: Mapped[str] = mapped_column(
        String(50), ForeignKey("plan_limits.plan"), primary_key=True
    )
    display_name: Mapped[str] = mapped_column(String(255), nullable=False)
    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    monthly_price_cents: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
    annual_price_cents: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
    stripe_product_id: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
    stripe_monthly_price_id: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
    stripe_annual_price_id: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
    is_public: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
    is_archived: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
    sort_order: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

Register in __init__.py:

from app.models.plan_billing import PlanBilling  # noqa: F401
  • Step 2: Migration
docker exec -w /app resolutionflow_backend alembic revision -m "add plan_billing"
def upgrade() -> None:
    op.create_table(
        "plan_billing",
        sa.Column("plan", sa.String(50), sa.ForeignKey("plan_limits.plan"), primary_key=True),
        sa.Column("display_name", sa.String(255), nullable=False),
        sa.Column("description", sa.Text(), nullable=True),
        sa.Column("monthly_price_cents", sa.Integer(), nullable=True),
        sa.Column("annual_price_cents", sa.Integer(), nullable=True),
        sa.Column("stripe_product_id", sa.String(255), nullable=True),
        sa.Column("stripe_monthly_price_id", sa.String(255), nullable=True),
        sa.Column("stripe_annual_price_id", sa.String(255), nullable=True),
        sa.Column("is_public", sa.Boolean(), nullable=False, server_default=sa.text("true")),
        sa.Column("is_archived", sa.Boolean(), nullable=False, server_default=sa.text("false")),
        sa.Column("sort_order", sa.Integer(), nullable=False, server_default=sa.text("0")),
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
        sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
    )


def downgrade() -> None:
    op.drop_table("plan_billing")

Note: this migration creates the table empty. Stripe IDs and per-environment seeding happen later, out-of-band: either via /admin/plan-limits PUT once that endpoint accepts plan_billing fields, or via a one-off scripts.sync_stripe_plan_ids admin command. The migration stays environment-agnostic.

  • Step 3: Apply + commit
docker exec resolutionflow_backend alembic upgrade heads
git add backend/app/models/plan_billing.py backend/app/models/__init__.py backend/alembic/versions/*_add_plan_billing.py
git commit -m "feat(billing): add plan_billing sibling table for Stripe + catalog metadata"

Task 7: Add sales_leads + stripe_events tables

Files:

  • Create: backend/app/models/sales_lead.py

  • Create: backend/app/models/stripe_event.py

  • Create: backend/alembic/versions/<hash>_add_sales_leads.py

  • Create: backend/alembic/versions/<hash>_add_stripe_events.py

  • Modify: backend/app/models/__init__.py

  • Step 1: SalesLead model

# backend/app/models/sales_lead.py
import uuid
from datetime import datetime, timezone
from typing import Optional
from sqlalchemy import String, DateTime, Text, Index
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.dialects.postgresql import UUID
from app.core.database import Base


class SalesLead(Base):
    __tablename__ = "sales_leads"
    __table_args__ = (Index("ix_sales_leads_email", "email"),)

    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    email: Mapped[str] = mapped_column(String(255), nullable=False)
    name: Mapped[str] = mapped_column(String(255), nullable=False)
    company: Mapped[str] = mapped_column(String(255), nullable=False)
    team_size: Mapped[Optional[str]] = mapped_column(String(20), nullable=True)
    message: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    source: Mapped[str] = mapped_column(String(50), nullable=False)
    posthog_distinct_id: Mapped[Optional[str]] = mapped_column(String(255), nullable=True)
    status: Mapped[str] = mapped_column(String(20), nullable=False, default="new")
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )
  • Step 2: StripeEvent model
# backend/app/models/stripe_event.py
from datetime import datetime, timezone
from sqlalchemy import String, DateTime, Index
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.dialects.postgresql import JSONB
from app.core.database import Base


class StripeEvent(Base):
    __tablename__ = "stripe_events"
    __table_args__ = (Index("ix_stripe_events_event_type", "event_type"),)

    id: Mapped[str] = mapped_column(String(255), primary_key=True)  # Stripe event id
    event_type: Mapped[str] = mapped_column(String(100), nullable=False)
    processed_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )
    payload_excerpt: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict)

Register both in __init__.py.

  • Step 3: Two migrations
docker exec -w /app resolutionflow_backend alembic revision -m "add sales_leads"
docker exec -w /app resolutionflow_backend alembic revision -m "add stripe_events"

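Each migration creates its respective table with columns matching the models above, following the create_table / drop_table pattern from Task 6. Include the indexes declared in __table_args__ (ix_sales_leads_email and ix_stripe_events_event_type) and drop them in downgrade before dropping the table.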

  • Step 4: Apply + commit
docker exec resolutionflow_backend alembic upgrade heads
git add backend/app/models/sales_lead.py backend/app/models/stripe_event.py backend/app/models/__init__.py backend/alembic/versions/*_add_sales_leads.py backend/alembic/versions/*_add_stripe_events.py
git commit -m "feat(billing): add sales_leads and stripe_events tables"
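
The stripe_events table keys each row on the Stripe event id. Assuming its purpose is to make webhook handling idempotent (the handler itself arrives in Phase C), the sketch below shows one way that could work: record the id first, and run the handler only if the insert actually created a row. It uses the standard-library sqlite3 module instead of PostgreSQL, and process_event_once plus the handled list are hypothetical names for illustration.

```python
import json
import sqlite3
from datetime import datetime, timezone

# In-memory stand-in for stripe_events (assumption: JSONB becomes TEXT here).
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE stripe_events (
        id TEXT PRIMARY KEY,
        event_type TEXT NOT NULL,
        processed_at TEXT NOT NULL,
        payload_excerpt TEXT NOT NULL
    )
    """
)

handled: list[str] = []  # stands in for the real side effects of a handler


def process_event_once(event: dict) -> bool:
    """Hypothetical helper: return True if the event was new and got handled."""
    with conn:  # one transaction: the id record and the handling commit together
        cursor = conn.execute(
            "INSERT OR IGNORE INTO stripe_events "
            "(id, event_type, processed_at, payload_excerpt) VALUES (?, ?, ?, ?)",
            (
                event["id"],
                event["type"],
                datetime.now(timezone.utc).isoformat(),
                json.dumps({"object": event.get("object")}),
            ),
        )
        if cursor.rowcount == 0:
            return False  # primary key already present: a redelivery, skip it
        handled.append(event["id"])
        return True


event = {"id": "evt_1", "type": "invoice.paid", "object": "event"}
first_delivery = process_event_once(event)
redelivery = process_event_once(event)
```

On PostgreSQL the equivalent statement would be INSERT ... ON CONFLICT (id) DO NOTHING. Doing the insert and the handling in one transaction means a handler failure also rolls back the id record, so Stripe's retry is processed rather than skipped.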

Task 8: Verify migrations apply on a fresh DB

Files: None (verification step).

  • Step 1: Drop test DB and recreate
docker exec resolutionflow_postgres psql -U postgres -c "DROP DATABASE IF EXISTS resolutionflow_test;"
docker exec resolutionflow_postgres psql -U postgres -c "CREATE DATABASE resolutionflow_test;"
  • Step 2: Apply all migrations to fresh DB
docker exec -e DATABASE_URL=postgresql+asyncpg://postgres:postgres@postgres:5432/resolutionflow_test resolutionflow_backend alembic upgrade heads

Expected: all migrations apply cleanly through to the new heads. No errors.

  • Step 3: Run full backend test suite
docker exec resolutionflow_backend pytest --override-ini="addopts="

Expected: all existing tests still pass. New model tests added in Tasks 1-7 also pass.

  • Step 4: No commit (verification only)

If the suite is green, Phase A is complete.


Phase B — Subscription model + new dependencies

Task 9: Add 'complimentary' status; fix is_paid; add has_pro_entitlement

Files:

  • Modify: backend/app/models/subscription.py

  • Test: backend/tests/test_subscription_properties.py

  • Step 1: Update model properties

Replace the property block at the bottom of backend/app/models/subscription.py:

    @property
    def is_active(self) -> bool:
        return self.status in ("active", "trialing", "complimentary")

    @property
    def is_paid(self) -> bool:
        # Excludes complimentary and trialing so MRR/paid-customer metrics aren't
        # inflated (test_trial_unexpired_has_entitlement asserts is_paid is False).
        return self.plan in ("pro", "team") and self.status not in ("complimentary", "trialing")

    @property
    def has_pro_entitlement(self) -> bool:
        """True if the account can access Pro features right now."""
        if self.plan in ("pro", "team"):
            if self.status in ("active", "complimentary"):
                return True
        if self.status == "trialing" and self.current_period_end is not None:
            from datetime import datetime, timezone
            return self.current_period_end > datetime.now(timezone.utc)
        return False
  • Step 2: Property tests
# backend/tests/test_subscription_properties.py
from datetime import datetime, timezone, timedelta
from app.models.subscription import Subscription


def make_sub(**kwargs):
    sub = Subscription()
    sub.plan = kwargs.get("plan", "free")
    sub.status = kwargs.get("status", "active")
    sub.current_period_end = kwargs.get("current_period_end")
    return sub


def test_complimentary_is_active_but_not_paid():
    sub = make_sub(plan="pro", status="complimentary")
    assert sub.is_active is True
    assert sub.is_paid is False
    assert sub.has_pro_entitlement is True


def test_paid_pro_active():
    sub = make_sub(plan="pro", status="active")
    assert sub.is_paid is True
    assert sub.has_pro_entitlement is True


def test_trial_unexpired_has_entitlement():
    sub = make_sub(plan="pro", status="trialing", current_period_end=datetime.now(timezone.utc) + timedelta(days=5))
    assert sub.is_active is True
    assert sub.is_paid is False
    assert sub.has_pro_entitlement is True


def test_trial_expired_no_entitlement():
    sub = make_sub(plan="pro", status="trialing", current_period_end=datetime.now(timezone.utc) - timedelta(hours=1))
    assert sub.has_pro_entitlement is False


def test_canceled_no_entitlement():
    sub = make_sub(plan="pro", status="canceled")
    assert sub.is_active is False
    assert sub.has_pro_entitlement is False
  • Step 3: Run tests
docker exec resolutionflow_backend pytest backend/tests/test_subscription_properties.py -v --override-ini="addopts="

Expected: all pass.

  • Step 4: Commit
git add backend/app/models/subscription.py backend/tests/test_subscription_properties.py
git commit -m "feat(billing): add complimentary status, fix is_paid, add has_pro_entitlement"
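
The trial branch of has_pro_entitlement depends on the wall clock, which makes the boundary case (a trial ending exactly now) awkward to test on the model. As a sketch of the same decision logic with the clock passed in, the function below is a hypothetical pure-function variant, not part of the planned model; PRO_PLANS and NOW are illustrative names.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

PRO_PLANS = ("pro", "team")


def has_pro_entitlement(
    plan: str,
    status: str,
    current_period_end: Optional[datetime],
    now: datetime,
) -> bool:
    """Same rules as the model property, with `now` injected for testing."""
    # Paid or complimentary Pro/Team accounts are entitled outright.
    if plan in PRO_PLANS and status in ("active", "complimentary"):
        return True
    # Trials are entitled only while the period end is strictly in the future.
    if status == "trialing" and current_period_end is not None:
        return current_period_end > now
    return False


NOW = datetime(2026, 5, 6, 12, 0, tzinfo=timezone.utc)
```

With a fixed NOW, the expiry boundary becomes a plain equality check: a trial whose current_period_end equals NOW is already expired, because the comparison is strict.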

Task 10: Remove the trial auto-downgrade in deps.py

Files:

  • Modify: backend/app/api/deps.py:81-129

  • Test: backend/tests/test_get_current_active_user_no_mutation.py

  • Step 1: Remove the auto-downgrade block

In backend/app/api/deps.py, find lines 109-127 (the Lightweight trial expiry check block) and delete the entire if-block. Only the is_active check, the must_change_password check, and the Sentry user context remain:

async def get_current_active_user(
    request: Request,
    current_user: Annotated[User, Depends(get_current_user)],
    db: Annotated[AsyncSession, Depends(get_admin_db)],
) -> User:
    """Ensure user is active (not disabled). Enforces must_change_password —
    blocks all routes except allowlist.

    Trial expiry enforcement now happens via require_active_subscription in
    individual routers, NOT here. This dep no longer mutates Subscription
    state.
    """
    if not current_user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Account has been deactivated"
        )

    if current_user.must_change_password:
        if request.url.path not in _PASSWORD_CHANGE_ALLOWLIST:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="password_change_required"
            )

    sentry_sdk.set_user({"id": str(current_user.id), "email": current_user.email})

    return current_user

Remove the now-unused imports (from app.models.subscription import Subscription and from datetime import datetime, timezone) if they were only imported inside the deleted block. Task 11 adds the datetime import back when it appends the new dependency.

  • Step 2: Test that trialing+expired is NOT mutated
# backend/tests/test_get_current_active_user_no_mutation.py
import pytest
from datetime import datetime, timezone, timedelta
from sqlalchemy import select
from app.models.subscription import Subscription


@pytest.mark.asyncio
async def test_expired_trial_is_not_mutated_by_get_current_active_user(
    db_session, authed_client, test_account, test_user
):
    """The previous deps.py:109 logic mutated trialing→active+free on expiry.
    That's gone. An expired-trial Subscription should retain status='trialing'
    and current_period_end after any authenticated request."""
    sub = Subscription(
        account_id=test_account.id,
        plan="pro",
        status="trialing",
        current_period_end=datetime.now(timezone.utc) - timedelta(hours=1),
    )
    db_session.add(sub)
    await db_session.commit()

    # Call any authenticated endpoint that goes through get_current_active_user.
    response = await authed_client.get("/api/v1/auth/me")
    assert response.status_code == 200

    await db_session.refresh(sub)
    assert sub.status == "trialing"
    assert sub.plan == "pro"
    assert sub.current_period_end is not None
  • Step 3: Run test
docker exec resolutionflow_backend pytest backend/tests/test_get_current_active_user_no_mutation.py -v --override-ini="addopts="

Expected: PASS.

  • Step 4: Commit
git add backend/app/api/deps.py backend/tests/test_get_current_active_user_no_mutation.py
git commit -m "refactor(deps): remove trial auto-downgrade; expiry now non-mutating per spec"

Task 11: Add require_active_subscription dep

Files:

  • Modify: backend/app/api/deps.py

  • Test: backend/tests/test_subscription_guards.py

  • Step 1: Add the dep

Append to backend/app/api/deps.py:

from datetime import datetime, timezone

from sqlalchemy import select  # skip if deps.py already imports select

_SUBSCRIPTION_GUARD_ALLOWLIST = {
    "/api/v1/auth/me",
    "/api/v1/auth/logout",
    "/api/v1/auth/password/change",
    "/api/v1/auth/email/send-verification",
    "/api/v1/auth/email/verify",
    "/api/v1/billing/state",
    "/api/v1/billing/checkout-session",
    "/api/v1/billing/portal-session",
    "/api/v1/users/me",
    "/api/v1/users/me/onboarding-step",
}


async def require_active_subscription(
    request: Request,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_admin_db)],
):
    """Returns the Subscription row when the account has access; raises 402
    when locked. Mounted on routers requiring Pro entitlement.

    'Locked' = (trialing AND current_period_end < now()) OR
              (canceled OR incomplete OR no subscription).
    Active states: active, complimentary, trialing-with-time-remaining, past_due.
    """
    if request.url.path in _SUBSCRIPTION_GUARD_ALLOWLIST:
        return None

    from app.models.subscription import Subscription
    result = await db.execute(
        select(Subscription).where(Subscription.account_id == current_user.account_id)
    )
    sub = result.scalar_one_or_none()

    if sub is None:
        raise HTTPException(
            status_code=402,
            detail={"error": "no_subscription", "upgrade_url": "/account/billing/select-plan"},
        )

    now = datetime.now(timezone.utc)
    is_live = (
        sub.status in ("active", "complimentary", "past_due")
        or (
            sub.status == "trialing"
            and sub.current_period_end is not None
            and sub.current_period_end > now
        )
    )
    if not is_live:
        raise HTTPException(
            status_code=402,
            detail={
                "error": "subscription_inactive",
                "status": sub.status,
                "plan": sub.plan,
                "current_period_end": sub.current_period_end.isoformat() if sub.current_period_end else None,
                "upgrade_url": "/account/billing/select-plan",
            },
        )

    return sub
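
The access rule inside the guard can also be read as a pure predicate, which makes the state table easy to check without a database. The following is a minimal sketch of that rule only; is_subscription_live and ALWAYS_LIVE are hypothetical names that do not exist in the codebase.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Statuses the guard treats as live regardless of period end (mirrors the dep above).
ALWAYS_LIVE = frozenset({"active", "complimentary", "past_due"})


def is_subscription_live(
    status: Optional[str],
    current_period_end: Optional[datetime],
    now: datetime,
) -> bool:
    """Hypothetical helper: True when the guard would let the request through."""
    if status is None:  # no Subscription row at all
        return False
    if status in ALWAYS_LIVE:
        return True
    # A trial only counts while it still has time remaining.
    return (
        status == "trialing"
        and current_period_end is not None
        and current_period_end > now
    )


now = datetime(2026, 5, 6, tzinfo=timezone.utc)
print(is_subscription_live("trialing", now + timedelta(days=5), now))   # True
print(is_subscription_live("trialing", now - timedelta(hours=1), now))  # False
print(is_subscription_live("canceled", None, now))                      # False
```

Any status outside the live set (canceled, incomplete, or anything else Stripe may report later) falls through to locked, which matches the guard's deny-by-default behavior.
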
  • Step 2: Tests
# backend/tests/test_subscription_guards.py
import pytest
from datetime import datetime, timezone, timedelta
from sqlalchemy import select
from app.models.subscription import Subscription


@pytest.mark.asyncio
async def test_active_subscription_passes(authed_client, db_session, test_account):
    db_session.add(Subscription(account_id=test_account.id, plan="pro", status="active"))
    await db_session.commit()
    response = await authed_client.get("/api/v1/trees/")  # any protected route
    assert response.status_code != 402


@pytest.mark.asyncio
async def test_complimentary_subscription_passes(authed_client, db_session, test_account):
    db_session.add(Subscription(account_id=test_account.id, plan="pro", status="complimentary"))
    await db_session.commit()
    response = await authed_client.get("/api/v1/trees/")
    assert response.status_code != 402


@pytest.mark.asyncio
async def test_trialing_unexpired_passes(authed_client, db_session, test_account):
    db_session.add(Subscription(
        account_id=test_account.id,
        plan="pro",
        status="trialing",
        current_period_end=datetime.now(timezone.utc) + timedelta(days=5),
    ))
    await db_session.commit()
    response = await authed_client.get("/api/v1/trees/")
    assert response.status_code != 402


@pytest.mark.asyncio
async def test_trialing_expired_returns_402(authed_client, db_session, test_account):
    db_session.add(Subscription(
        account_id=test_account.id,
        plan="pro",
        status="trialing",
        current_period_end=datetime.now(timezone.utc) - timedelta(hours=1),
    ))
    await db_session.commit()
    response = await authed_client.get("/api/v1/trees/")
    assert response.status_code == 402
    body = response.json()
    assert body["detail"]["error"] == "subscription_inactive"


@pytest.mark.asyncio
async def test_canceled_returns_402(authed_client, db_session, test_account):
    db_session.add(Subscription(account_id=test_account.id, plan="pro", status="canceled"))
    await db_session.commit()
    response = await authed_client.get("/api/v1/trees/")
    assert response.status_code == 402


@pytest.mark.asyncio
async def test_billing_state_endpoint_bypasses_guard(authed_client, db_session, test_account):
    """Allowlisted route works even when subscription is canceled."""
    db_session.add(Subscription(account_id=test_account.id, plan="pro", status="canceled"))
    await db_session.commit()
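    # /billing/state is added in Task 24. Until then this hits /auth/me, which is
    # also allowlisted. auth_router is not mounted with the guard, so this only
    # confirms unguarded routes stay reachable; point it at /billing/state later.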
    response = await authed_client.get("/api/v1/auth/me")
    assert response.status_code == 200
  • Step 3: Mount the guard on existing protected routers

This requires editing backend/app/api/router.py to add the dep to the routers that gate Pro functionality. Apply it selectively: add dependencies=[Depends(require_active_subscription)] only to:

  • trees_router (in router.py)
  • sessions_router
  • flowpilot_router (FlowPilot endpoints)
  • scripts_router
  • psa_integrations_router
  • analytics_router
  • assistant_chat_router
  • step_library_router
  • template_trees_router

Do NOT add to: auth_router, users_router, accounts_router (selectively; some endpoints may need it), admin_router family, webhooks_router, public_router. The allowlist in the guard handles per-path bypass for routes within mounted routers.

# backend/app/api/router.py — example pattern
from app.api.deps import require_active_subscription

api_router.include_router(
    trees.router,
    dependencies=[Depends(require_active_subscription)],
    tags=["trees"],
)
  • Step 4: Run tests
docker exec resolutionflow_backend pytest backend/tests/test_subscription_guards.py -v --override-ini="addopts="

Expected: all pass. Existing test suite should also still pass — verify with:

docker exec resolutionflow_backend pytest --override-ini="addopts="

If existing tests break because they don't seed a Subscription, update fixtures to provide an active Subscription by default for test_account.

  • Step 5: Commit
git add backend/app/api/deps.py backend/app/api/router.py backend/tests/test_subscription_guards.py backend/tests/conftest.py
git commit -m "feat(deps): add require_active_subscription guard with allowlist"

Task 12: Add require_verified_email_after_grace dep

Files:

  • Modify: backend/app/api/deps.py

  • Modify: backend/app/api/router.py

  • Test: backend/tests/test_email_verification_guard.py

  • Step 1: Add the dep

Append to backend/app/api/deps.py:

from datetime import timedelta

_EMAIL_VERIFICATION_ALLOWLIST = {
    "/api/v1/auth/me",
    "/api/v1/auth/logout",
    "/api/v1/auth/email/send-verification",
    "/api/v1/auth/email/verify",
    "/api/v1/auth/password/change",
    "/api/v1/users/me",
    "/api/v1/billing/state",
    "/api/v1/billing/checkout-session",
    "/api/v1/billing/portal-session",
}

VERIFICATION_GRACE_DAYS = 7


async def require_verified_email_after_grace(
    request: Request,
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """Enforces 'this user has verified email OR is still in 7-day grace.'
    OAuth signups bypass cleanly because /auth/{google,microsoft}/callback
    sets users.email_verified_at = now() (provider-attested)."""
    if request.url.path in _EMAIL_VERIFICATION_ALLOWLIST:
        return

    if current_user.email_verified_at is not None:
        return

    grace_ends = current_user.created_at + timedelta(days=VERIFICATION_GRACE_DAYS)
    if datetime.now(timezone.utc) < grace_ends:
        return

    raise HTTPException(
        status_code=403,
        detail={
            "error": "email_not_verified",
            "grace_ended_at": grace_ends.isoformat(),
            "resend_url": "/api/v1/auth/email/send-verification",
        },
    )
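
The grace-period rule can be sketched as a pure function. This is an illustration under stated assumptions, not the dep itself: email_gate_open and _as_utc are hypothetical names, and the naive-timestamp normalization is an added safeguard because the plan does not say whether users.created_at is stored timezone-aware (comparing a naive and an aware datetime raises TypeError).

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

VERIFICATION_GRACE_DAYS = 7


def _as_utc(dt: datetime) -> datetime:
    """Treat naive timestamps as UTC so comparisons with aware datetimes never raise."""
    return dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)


def email_gate_open(
    email_verified_at: Optional[datetime],
    created_at: datetime,
    now: datetime,
    grace_days: int = VERIFICATION_GRACE_DAYS,
) -> bool:
    """Hypothetical helper: True when the email guard would allow the request."""
    if email_verified_at is not None:
        return True  # verified users (including OAuth signups) always pass
    grace_ends = _as_utc(created_at) + timedelta(days=grace_days)
    return now < grace_ends


now = datetime(2026, 5, 6, tzinfo=timezone.utc)
print(email_gate_open(None, now - timedelta(days=2), now))   # True
print(email_gate_open(None, now - timedelta(days=10), now))  # False
print(email_gate_open(now, now - timedelta(days=10), now))   # True
```

If created_at turns out to be naive in the real model, the same normalization would be needed in the dep before the comparison.
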
  • Step 2: Tests
# backend/tests/test_email_verification_guard.py
import pytest
from datetime import datetime, timezone, timedelta


@pytest.mark.asyncio
async def test_verified_user_passes(authed_client, db_session, test_user):
    test_user.email_verified_at = datetime.now(timezone.utc)
    await db_session.commit()
    response = await authed_client.get("/api/v1/trees/")
    assert response.status_code != 403


@pytest.mark.asyncio
async def test_unverified_in_grace_passes(authed_client, db_session, test_user):
    test_user.email_verified_at = None
    test_user.created_at = datetime.now(timezone.utc) - timedelta(days=2)
    await db_session.commit()
    response = await authed_client.get("/api/v1/trees/")
    assert response.status_code != 403


@pytest.mark.asyncio
async def test_unverified_past_grace_blocks(authed_client, db_session, test_user):
    test_user.email_verified_at = None
    test_user.created_at = datetime.now(timezone.utc) - timedelta(days=10)
    await db_session.commit()
    response = await authed_client.get("/api/v1/trees/")
    assert response.status_code == 403
    body = response.json()
    assert body["detail"]["error"] == "email_not_verified"


@pytest.mark.asyncio
async def test_unverified_past_grace_allowlisted_still_passes(authed_client, db_session, test_user):
    test_user.email_verified_at = None
    test_user.created_at = datetime.now(timezone.utc) - timedelta(days=10)
    await db_session.commit()
    response = await authed_client.get("/api/v1/auth/me")
    assert response.status_code == 200


@pytest.mark.asyncio
async def test_combined_guards_unverified_expired_trial(authed_client, db_session, test_user, test_account):
    """A user who is BOTH past grace AND on an expired trial should get blocked
    by one of the two guards. Either error is acceptable; we just verify a
    refusal."""
    from app.models.subscription import Subscription
    test_user.email_verified_at = None
    test_user.created_at = datetime.now(timezone.utc) - timedelta(days=10)
    db_session.add(Subscription(
        account_id=test_account.id, plan="pro", status="trialing",
        current_period_end=datetime.now(timezone.utc) - timedelta(hours=1),
    ))
    await db_session.commit()
    response = await authed_client.get("/api/v1/trees/")
    assert response.status_code in (402, 403)
  • Step 3: Mount on the same routers

In backend/app/api/router.py, add Depends(require_verified_email_after_grace) alongside Depends(require_active_subscription) on the routers listed in Task 11 Step 3.

  • Step 4: Run tests
docker exec resolutionflow_backend pytest backend/tests/test_email_verification_guard.py -v --override-ini="addopts="

Expected: all pass.

  • Step 5: Commit
git add backend/app/api/deps.py backend/app/api/router.py backend/tests/test_email_verification_guard.py
git commit -m "feat(deps): add require_verified_email_after_grace guard"

Phase C — BillingService + Stripe webhook handler

Task 13: BillingService skeleton + start_trial; integrate into /auth/register

Files:

  • Create: backend/app/services/billing.py

  • Modify: backend/app/api/endpoints/auth.py (register handler)

  • Test: backend/tests/test_billing_service.py

  • Step 1: BillingService skeleton

# backend/app/services/billing.py
"""Single billing service module. Stripe is the only impl — no provider
abstraction. Account row is canonical local state; Stripe is canonical
remote state; the webhook handler bridges the two."""
from datetime import datetime, timezone, timedelta
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.account import Account
from app.models.subscription import Subscription


TRIAL_DAYS = 14


class BillingService:
    @staticmethod
    async def start_trial(db: AsyncSession, account_id) -> Subscription:
        """Idempotent. Creates a trialing Subscription on Pro for the account if
        one doesn't exist; otherwise returns the existing row."""
        result = await db.execute(
            select(Subscription).where(Subscription.account_id == account_id)
        )
        existing = result.scalar_one_or_none()
        if existing is not None:
            return existing

        # One timestamp for both bounds, so the window is exactly TRIAL_DAYS long.
        now = datetime.now(timezone.utc)
        sub = Subscription(
            account_id=account_id,
            plan="pro",
            status="trialing",
            current_period_start=now,
            current_period_end=now + timedelta(days=TRIAL_DAYS),
        )
        db.add(sub)
        await db.commit()
        await db.refresh(sub)
        return sub
  • Step 2: Wire into /auth/register

In backend/app/api/endpoints/auth.py (the registration handler — search for the existing @router.post("/register") block), call BillingService.start_trial(db, account.id) after the Account is created. Do NOT call it for invitee signups (when account_invite_code was provided and matched an existing account) — those users join an existing account that already has a subscription.

# inside the register handler, after Account creation:
from app.services.billing import BillingService

# ... existing user/account creation ...

if not joining_existing_account:
    # New shop — start a Pro trial
    await BillingService.start_trial(db, account.id)
  • Step 3: Test
# backend/tests/test_billing_service.py
import pytest
from datetime import datetime, timezone
from sqlalchemy import select
from app.models.subscription import Subscription
from app.services.billing import BillingService


@pytest.mark.asyncio
async def test_start_trial_creates_trialing_pro_subscription(db_session, test_account):
    sub = await BillingService.start_trial(db_session, test_account.id)
    assert sub.plan == "pro"
    assert sub.status == "trialing"
    assert sub.current_period_end is not None
    assert sub.current_period_end > datetime.now(timezone.utc)


@pytest.mark.asyncio
async def test_start_trial_is_idempotent(db_session, test_account):
    sub1 = await BillingService.start_trial(db_session, test_account.id)
    sub2 = await BillingService.start_trial(db_session, test_account.id)
    assert sub1.id == sub2.id

    rows = (await db_session.execute(
        select(Subscription).where(Subscription.account_id == test_account.id)
    )).scalars().all()
    assert len(rows) == 1


@pytest.mark.asyncio
async def test_register_creates_trial_subscription(client, db_session):
    response = await client.post("/api/v1/auth/register", json={
        "email": "newshop@example.com",
        "password": "Verystrong1Pwd",
        "name": "New Shop",
    })
    assert response.status_code in (200, 201)

    from app.models.user import User
    user = (await db_session.execute(select(User).where(User.email == "newshop@example.com"))).scalar_one()
    sub = (await db_session.execute(select(Subscription).where(Subscription.account_id == user.account_id))).scalar_one()
    assert sub.plan == "pro"
    assert sub.status == "trialing"
  • Step 4: Run tests
docker exec resolutionflow_backend pytest backend/tests/test_billing_service.py -v --override-ini="addopts="

Expected: PASS.

  • Step 5: Commit
git add backend/app/services/billing.py backend/app/api/endpoints/auth.py backend/tests/test_billing_service.py
git commit -m "feat(billing): add BillingService.start_trial; wire into /auth/register"

Task 14: BillingService.create_checkout_session + endpoint

Files:

  • Modify: backend/app/services/billing.py

  • Create: backend/app/api/endpoints/billing.py

  • Create: backend/app/schemas/billing.py

  • Modify: backend/app/api/router.py

  • Modify: backend/app/core/config.py (Stripe settings)

  • Test: backend/tests/test_billing_checkout.py

  • Step 1: Schemas

# backend/app/schemas/billing.py
from typing import Optional, Literal
from datetime import datetime
from pydantic import BaseModel, Field


class CheckoutSessionCreate(BaseModel):
    plan: Literal["pro", "starter", "team", "enterprise"]
    seats: int = Field(ge=1)  # reject zero or negative seat counts before calling Stripe
    billing_interval: Literal["monthly", "annual"] = "monthly"


class CheckoutSessionResponse(BaseModel):
    url: str
  • Step 2: Add config keys

In backend/app/core/config.py, add to Settings:

    STRIPE_SECRET_KEY: Optional[str] = None
    STRIPE_PUBLISHABLE_KEY: Optional[str] = None
    STRIPE_WEBHOOK_SECRET: Optional[str] = None
    SELF_SERVE_ENABLED: bool = False

    @property
    def stripe_enabled(self) -> bool:
        return bool(self.STRIPE_SECRET_KEY)
  • Step 3: Service method

Add to backend/app/services/billing.py:

import stripe
from app.core.config import settings
from app.models.plan_billing import PlanBilling


class BillingService:
    # ... existing start_trial ...

    @staticmethod
    async def create_checkout_session(
        db: AsyncSession,
        account: Account,
        plan: str,
        seats: int,
        billing_interval: str,
        success_url: str,
        cancel_url: str,
    ) -> str:
        if not settings.stripe_enabled:
            raise RuntimeError("Stripe not configured")
        stripe.api_key = settings.STRIPE_SECRET_KEY

        # Look up Stripe price id for the requested plan + interval
        plan_billing = (await db.execute(
            select(PlanBilling).where(PlanBilling.plan == plan)
        )).scalar_one_or_none()
        if plan_billing is None:
            raise ValueError(f"Unknown plan: {plan}")
        price_id = (
            plan_billing.stripe_monthly_price_id if billing_interval == "monthly"
            else plan_billing.stripe_annual_price_id
        )
        if price_id is None:
            raise RuntimeError(f"Plan '{plan}' has no Stripe price for {billing_interval}")

        # Ensure Stripe Customer exists on the Account row
        if account.stripe_customer_id is None:
            # No email is set here. The Checkout Session below passes customer=,
            # which cannot be combined with customer_email.
            customer = stripe.Customer.create(
                metadata={"account_id": str(account.id)},
            )
            account.stripe_customer_id = customer.id
            await db.commit()

        # Carry the remaining local trial into Stripe via trial_end. Stripe Checkout
        # requires trial_end to be at least 48 hours in the future, so omit it when
        # the trial is nearly over (billing then starts at checkout).
        sub = (await db.execute(
            select(Subscription).where(Subscription.account_id == account.id)
        )).scalar_one_or_none()
        subscription_data = {}
        min_trial_end = datetime.now(timezone.utc) + timedelta(hours=48)
        if sub and sub.status == "trialing" and sub.current_period_end and sub.current_period_end > min_trial_end:
            subscription_data["trial_end"] = int(sub.current_period_end.timestamp())

        session = stripe.checkout.Session.create(
            customer=account.stripe_customer_id,
            line_items=[{"price": price_id, "quantity": seats}],
            mode="subscription",
            subscription_data=subscription_data or None,
            success_url=success_url,
            cancel_url=cancel_url,
            allow_promotion_codes=False,  # promo codes deferred to v2 per spec
        )
        return session.url
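
One way to keep the Stripe call testable is to build its keyword arguments in a pure function and pass them to stripe.checkout.Session.create(**params). The sketch below is illustrative only: build_checkout_params and MIN_TRIAL_LEAD are hypothetical names, and the 48-hour floor reflects Stripe's documented minimum for subscription_data.trial_end on Checkout Sessions as an assumption to confirm against the API version in use.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

# Assumption: Checkout rejects a trial_end less than 48 hours in the future.
MIN_TRIAL_LEAD = timedelta(hours=48)


def build_checkout_params(
    customer_id: str,
    price_id: str,
    seats: int,
    success_url: str,
    cancel_url: str,
    trial_ends_at: Optional[datetime],
    now: datetime,
) -> Dict[str, Any]:
    """Hypothetical helper: assemble kwargs for a subscription-mode Checkout Session."""
    params: Dict[str, Any] = {
        "customer": customer_id,
        "mode": "subscription",
        "line_items": [{"price": price_id, "quantity": seats}],
        "success_url": success_url,
        "cancel_url": cancel_url,
        "allow_promotion_codes": False,
    }
    # Only carry the local trial over when enough of it remains.
    if trial_ends_at is not None and trial_ends_at - now > MIN_TRIAL_LEAD:
        params["subscription_data"] = {"trial_end": int(trial_ends_at.timestamp())}
    return params


now = datetime(2026, 5, 6, tzinfo=timezone.utc)
long_trial = build_checkout_params(
    "cus_test", "price_test_monthly", 3,
    "https://app.example.com/ok", "https://app.example.com/cancel",
    trial_ends_at=now + timedelta(days=5), now=now,
)
short_trial = build_checkout_params(
    "cus_test", "price_test_monthly", 3,
    "https://app.example.com/ok", "https://app.example.com/cancel",
    trial_ends_at=now + timedelta(hours=1), now=now,
)
print("subscription_data" in long_trial)   # True
print("subscription_data" in short_trial)  # False
```

Leaving the key out entirely, instead of sending an empty dict, mirrors the subscription_data or None expression in the service method.
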
  • Step 4: Endpoint
# backend/app/api/endpoints/billing.py
from typing import Annotated
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.deps import get_current_active_user
from app.core.database import get_admin_db  # Task 16 imports this from app.core.admin_database; use the module deps.py uses
from app.core.config import settings
from app.models.user import User
from app.schemas.billing import CheckoutSessionCreate, CheckoutSessionResponse
from app.services.billing import BillingService

router = APIRouter(prefix="/billing", tags=["billing"])


@router.post("/checkout-session", response_model=CheckoutSessionResponse)
async def create_checkout_session(
    payload: CheckoutSessionCreate,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_admin_db)],
) -> CheckoutSessionResponse:
    url = await BillingService.create_checkout_session(
        db=db,
        account=current_user.account,
        plan=payload.plan,
        seats=payload.seats,
        billing_interval=payload.billing_interval,
        success_url=f"{settings.FRONTEND_URL}/account/billing?success=1",
        cancel_url=f"{settings.FRONTEND_URL}/account/billing/select-plan",
    )
    return CheckoutSessionResponse(url=url)

Register in backend/app/api/router.py:

from app.api.endpoints import billing
api_router.include_router(billing.router)
  • Step 5: Test with patched Stripe calls
# backend/tests/test_billing_checkout.py
import pytest
from types import SimpleNamespace
from unittest.mock import patch
from app.models.plan_billing import PlanBilling


@pytest.mark.asyncio
async def test_checkout_session_creates_stripe_session(authed_client, db_session, test_account, monkeypatch):
    # stripe-python's synchronous calls do not go through httpx by default, so
    # respx cannot intercept them. Patch the SDK methods directly (as in Task 16).
    # The service raises unless a secret key is configured, so set a dummy one.
    monkeypatch.setattr("app.core.config.settings.STRIPE_SECRET_KEY", "sk_test_dummy")

    db_session.add(PlanBilling(
        plan="pro",
        display_name="Pro",
        stripe_product_id="prod_test",
        stripe_monthly_price_id="price_test_monthly",
    ))
    await db_session.commit()

    with patch(
        "stripe.Customer.create",
        return_value=SimpleNamespace(id="cus_test_123"),
    ), patch(
        "stripe.checkout.Session.create",
        return_value=SimpleNamespace(id="cs_test", url="https://checkout.stripe.com/test"),
    ) as create_session:
        response = await authed_client.post("/api/v1/billing/checkout-session", json={
            "plan": "pro",
            "seats": 3,
            "billing_interval": "monthly",
        })
    assert response.status_code == 200
    body = response.json()
    assert body["url"] == "https://checkout.stripe.com/test"
    assert create_session.call_args.kwargs["line_items"] == [
        {"price": "price_test_monthly", "quantity": 3}
    ]

    await db_session.refresh(test_account)
    assert test_account.stripe_customer_id == "cus_test_123"
  • Step 6: Run + commit
docker exec resolutionflow_backend pytest backend/tests/test_billing_checkout.py -v --override-ini="addopts="
git add backend/app/services/billing.py backend/app/api/endpoints/billing.py backend/app/schemas/billing.py backend/app/api/router.py backend/app/core/config.py backend/tests/test_billing_checkout.py
git commit -m "feat(billing): add /billing/checkout-session via BillingService"

Task 15: BillingService.apply_subscription_event + idempotency

Files:

  • Modify: backend/app/services/billing.py

  • Test: backend/tests/test_billing_service.py

  • Step 1: Add the event-application method

Append to backend/app/services/billing.py:

from datetime import datetime, timezone
from app.models.stripe_event import StripeEvent
from sqlalchemy.exc import IntegrityError


class BillingService:
    # ... existing methods ...

    @staticmethod
    async def apply_subscription_event(
        db: AsyncSession, event_id: str, event_type: str, payload: dict
    ) -> bool:
        """Idempotent. Returns True if the event was applied; False if it had
        already been processed (idempotent ack). The webhook handler returns 200
        either way."""
        # Idempotency check: insert the event row but only flush it. The row then
        # commits together with the handler's changes, so a handler failure rolls
        # it back and Stripe's retry is processed instead of being skipped.
        try:
            db.add(StripeEvent(
                id=event_id,
                event_type=event_type,
                payload_excerpt=_excerpt(payload),
            ))
            await db.flush()
        except IntegrityError:
            await db.rollback()
            return False

        try:
            # Dispatch
            if event_type == "checkout.session.completed":
                await _handle_checkout_completed(db, payload)
            elif event_type == "customer.subscription.updated":
                await _handle_subscription_updated(db, payload)
            elif event_type == "customer.subscription.deleted":
                await _handle_subscription_deleted(db, payload)
            elif event_type == "invoice.payment_failed":
                await _handle_payment_failed(db, payload)
            elif event_type == "invoice.payment_succeeded":
                await _handle_payment_succeeded(db, payload)
            # other events (and handlers that returned early): just record + ack
            await db.commit()
        except Exception:
            await db.rollback()
            raise
        return True


def _excerpt(payload: dict) -> dict:
    obj = payload.get("data", {}).get("object", {})
    return {
        "object_id": obj.get("id"),
        "customer": obj.get("customer"),
        "subscription": obj.get("subscription"),
        "status": obj.get("status"),
    }


async def _handle_checkout_completed(db: AsyncSession, payload: dict):
    obj = payload["data"]["object"]
    customer_id = obj["customer"]
    subscription_id = obj["subscription"]

    account = (await db.execute(
        select(Account).where(Account.stripe_customer_id == customer_id)
    )).scalar_one_or_none()
    if account is None:
        return  # webhook for unknown customer — log and ack

    sub = (await db.execute(
        select(Subscription).where(Subscription.account_id == account.id)
    )).scalar_one_or_none()
    if sub is None:
        return

    # Fetch the live Stripe subscription for canonical period + price details
    stripe.api_key = settings.STRIPE_SECRET_KEY
    stripe_sub = stripe.Subscription.retrieve(subscription_id)
    sub.stripe_subscription_id = subscription_id
    sub.stripe_price_id = stripe_sub["items"]["data"][0]["price"]["id"]
    sub.status = "active"
    sub.current_period_start = datetime.fromtimestamp(stripe_sub["current_period_start"], tz=timezone.utc)
    sub.current_period_end = datetime.fromtimestamp(stripe_sub["current_period_end"], tz=timezone.utc)
    sub.seat_limit = stripe_sub["items"]["data"][0]["quantity"]
    # Map price_id → plan via plan_billing
    pb = (await db.execute(
        select(PlanBilling).where(
            (PlanBilling.stripe_monthly_price_id == sub.stripe_price_id) |
            (PlanBilling.stripe_annual_price_id == sub.stripe_price_id)
        )
    )).scalar_one_or_none()
    if pb is not None:
        sub.plan = pb.plan
    await db.commit()


async def _handle_subscription_updated(db: AsyncSession, payload: dict):
    obj = payload["data"]["object"]
    sub = (await db.execute(
        select(Subscription).where(Subscription.stripe_subscription_id == obj["id"])
    )).scalar_one_or_none()
    if sub is None:
        return
    sub.status = obj["status"]
    sub.current_period_start = datetime.fromtimestamp(obj["current_period_start"], tz=timezone.utc)
    sub.current_period_end = datetime.fromtimestamp(obj["current_period_end"], tz=timezone.utc)
    sub.cancel_at_period_end = obj.get("cancel_at_period_end", False)
    sub.seat_limit = obj["items"]["data"][0]["quantity"]
    await db.commit()


async def _handle_subscription_deleted(db: AsyncSession, payload: dict):
    obj = payload["data"]["object"]
    sub = (await db.execute(
        select(Subscription).where(Subscription.stripe_subscription_id == obj["id"])
    )).scalar_one_or_none()
    if sub is None:
        return
    sub.status = "canceled"
    await db.commit()


async def _handle_payment_failed(db: AsyncSession, payload: dict):
    obj = payload["data"]["object"]
    subscription_id = obj.get("subscription")
    if not subscription_id:
        return
    sub = (await db.execute(
        select(Subscription).where(Subscription.stripe_subscription_id == subscription_id)
    )).scalar_one_or_none()
    if sub is None:
        return
    sub.status = "past_due"
    await db.commit()


async def _handle_payment_succeeded(db: AsyncSession, payload: dict):
    obj = payload["data"]["object"]
    subscription_id = obj.get("subscription")
    if not subscription_id:
        return
    sub = (await db.execute(
        select(Subscription).where(Subscription.stripe_subscription_id == subscription_id)
    )).scalar_one_or_none()
    if sub is None:
        return
    if sub.status == "past_due":
        sub.status = "active"
        await db.commit()
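
Taken together, the five handlers amount to a small status transition table. The sketch below restates it as a pure function for quick reasoning; next_status is a hypothetical name, and the fallback to the current status when Stripe's status is missing is a simplification (the handler above reads obj["status"] directly and would raise KeyError instead).

```python
from typing import Optional


def next_status(event_type: str, current: str, stripe_status: Optional[str] = None) -> str:
    """Hypothetical helper: local Subscription.status after a Stripe event is applied."""
    if event_type == "checkout.session.completed":
        return "active"
    if event_type == "customer.subscription.updated":
        # Mirror whatever Stripe reports; keep the current value if absent (sketch only).
        return stripe_status or current
    if event_type == "customer.subscription.deleted":
        return "canceled"
    if event_type == "invoice.payment_failed":
        return "past_due"
    if event_type == "invoice.payment_succeeded":
        # Only recover from past_due; other states are left untouched.
        return "active" if current == "past_due" else current
    return current  # unhandled events are recorded and acknowledged without changes


print(next_status("invoice.payment_failed", "active"))       # past_due
print(next_status("invoice.payment_succeeded", "past_due"))  # active
print(next_status("invoice.payment_succeeded", "trialing"))  # trialing
```

Because past_due is in the guard's live set, a failed payment does not lock the account by itself; only a later canceled (or other non-live) status does.
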
  • Step 2: Idempotency test
# Add to backend/tests/test_billing_service.py

@pytest.mark.asyncio
async def test_apply_subscription_event_is_idempotent(db_session, test_account):
    payload = {
        "data": {"object": {
            "id": "sub_xxx",  # subscription id; no local row matches, so the handler no-ops
            "customer": "cus_xxx",
            "status": "active",
        }}
    }

    applied_first = await BillingService.apply_subscription_event(
        db_session, "evt_test_1", "customer.subscription.updated", payload
    )
    applied_second = await BillingService.apply_subscription_event(
        db_session, "evt_test_1", "customer.subscription.updated", payload
    )
    assert applied_first is True
    assert applied_second is False  # already-processed → ack without re-applying
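
The idempotency mechanism relies on the database rejecting a duplicate primary key, not on a read-then-write check. A minimal standalone sketch of that pattern, using the standard library's sqlite3 in place of the project's Postgres session (record_event_once and the two-column table are hypothetical simplifications of stripe_events):

```python
import sqlite3


def record_event_once(conn: sqlite3.Connection, event_id: str, event_type: str) -> bool:
    """Return True the first time an event id is seen, False on redelivery."""
    try:
        with conn:  # commits on success, rolls back if the insert raises
            conn.execute(
                "INSERT INTO stripe_events (id, event_type) VALUES (?, ?)",
                (event_id, event_type),
            )
        return True
    except sqlite3.IntegrityError:
        # Primary-key collision: this event id was already recorded.
        return False


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stripe_events (id TEXT PRIMARY KEY, event_type TEXT NOT NULL)")
print(record_event_once(conn, "evt_test_1", "customer.subscription.updated"))  # True
print(record_event_once(conn, "evt_test_1", "customer.subscription.updated"))  # False
```

Letting the unique constraint decide means two concurrent deliveries of the same event cannot both win, which a SELECT followed by an INSERT would not guarantee.
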
  • Step 3: Run + commit
docker exec resolutionflow_backend pytest backend/tests/test_billing_service.py -v --override-ini="addopts="
git add backend/app/services/billing.py backend/tests/test_billing_service.py
git commit -m "feat(billing): apply_subscription_event with stripe_events idempotency"

Task 16: Extend the Stripe webhook handler stub

Files:

  • Modify: backend/app/api/endpoints/webhooks.py

  • Test: backend/tests/test_stripe_webhook_handler.py

  • Step 1: Wire BillingService into webhook handler

Replace the stub body in backend/app/api/endpoints/webhooks.py:

import logging
from fastapi import APIRouter, Request, HTTPException, status, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.admin_database import get_admin_db
from app.core.config import settings
from app.services.billing import BillingService

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/webhooks", tags=["webhooks"])


@router.post("/stripe")
async def stripe_webhook(
    request: Request,
    db: AsyncSession = Depends(get_admin_db),
):
    """Stripe webhook handler. Public endpoint; signature verification is the
    only gate. Idempotency via stripe_events table."""
    if not settings.stripe_enabled:
        return {"status": "ok", "message": "Stripe not configured, event ignored"}

    payload = await request.body()
    sig_header = request.headers.get("stripe-signature")
    if not sig_header:
        raise HTTPException(status_code=400, detail="Missing stripe-signature header")

    # Import outside the try so the except clause can always resolve stripe.error.
    import stripe

    try:
        stripe.api_key = settings.STRIPE_SECRET_KEY
        event = stripe.Webhook.construct_event(
            payload, sig_header, settings.STRIPE_WEBHOOK_SECRET
        )
    except (ValueError, stripe.error.SignatureVerificationError) as e:
        logger.warning("stripe webhook bad signature: %s", e)
        raise HTTPException(status_code=400, detail="Invalid signature") from e

    applied = await BillingService.apply_subscription_event(
        db,
        event_id=event["id"],
        event_type=event["type"],
        payload={"data": event["data"]},
    )
    return {"status": "ok", "applied": applied}
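
For context on what the signature gate checks: Stripe signs the string "{timestamp}.{raw body}" with HMAC-SHA256 using the endpoint secret and sends the result in the Stripe-Signature header as t=...,v1=.... The sketch below reproduces that scheme with the standard library purely for illustration; sign_payload and verify_signature are hypothetical helpers, the 300-second tolerance is an assumed default, and the handler should keep using stripe.Webhook.construct_event.

```python
import hashlib
import hmac
import time
from typing import Optional


def sign_payload(payload: bytes, secret: str, timestamp: int) -> str:
    """Build a Stripe-style signature header for a raw request body."""
    signed = f"{timestamp}.".encode() + payload
    digest = hmac.new(secret.encode(), signed, hashlib.sha256).hexdigest()
    return f"t={timestamp},v1={digest}"


def verify_signature(
    payload: bytes,
    header: str,
    secret: str,
    tolerance: int = 300,
    now: Optional[int] = None,
) -> bool:
    """Check the header against the raw body; reject stale timestamps."""
    parts = [item.split("=", 1) for item in header.split(",") if "=" in item]
    timestamps = [value for key, value in parts if key == "t"]
    candidates = [value for key, value in parts if key == "v1"]
    if not timestamps or not candidates:
        return False
    try:
        ts = int(timestamps[0])
    except ValueError:
        return False
    current = int(time.time()) if now is None else now
    if current - ts > tolerance:
        return False  # too old: possible replay
    signed = f"{ts}.".encode() + payload
    expected = hmac.new(secret.encode(), signed, hashlib.sha256).hexdigest()
    # Constant-time comparison against every v1 signature in the header.
    return any(hmac.compare_digest(expected.encode(), c.encode()) for c in candidates)


body = b'{"id": "evt_test_1"}'
header = sign_payload(body, "whsec_test", timestamp=1_000)
print(verify_signature(body, header, "whsec_test", now=1_000))         # True
print(verify_signature(body + b" ", header, "whsec_test", now=1_000))  # False
```

This is why the handler reads request.body() before any JSON parsing: the signature covers the exact bytes received, so re-serialized JSON would not verify.
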
  • Step 2: Webhook integration tests
# backend/tests/test_stripe_webhook_handler.py
import pytest
import json
from sqlalchemy import select
from unittest.mock import patch
from app.models.subscription import Subscription
from app.models.account import Account


def _make_event(event_id, event_type, obj):
    return {
        "id": event_id,
        "type": event_type,
        "data": {"object": obj},
    }


@pytest.mark.asyncio
async def test_checkout_completed_activates_subscription(client, db_session, test_account):
    test_account.stripe_customer_id = "cus_xxx"
    sub = Subscription(account_id=test_account.id, plan="pro", status="trialing")
    db_session.add(sub)
    await db_session.commit()

    event = _make_event("evt_co_1", "checkout.session.completed", {
        "id": "cs_xxx",
        "customer": "cus_xxx",
        "subscription": "sub_xxx",
    })

    with patch("stripe.Subscription.retrieve", return_value={
        "id": "sub_xxx",
        "status": "active",
        "current_period_start": 1714521600,
        "current_period_end": 1717113600,
        "items": {"data": [{
            "price": {"id": "price_test_monthly"},
            "quantity": 5,
        }]},
        "cancel_at_period_end": False,
    }), patch("stripe.Webhook.construct_event", return_value=event):
        response = await client.post(
            "/api/v1/webhooks/stripe",
            content=json.dumps(event),
            headers={"stripe-signature": "fake-sig"},
        )
    assert response.status_code == 200

    await db_session.refresh(sub)
    assert sub.status == "active"
    assert sub.stripe_subscription_id == "sub_xxx"


@pytest.mark.asyncio
async def test_subscription_deleted_cancels_account(client, db_session, test_account):
    sub = Subscription(
        account_id=test_account.id, plan="pro", status="active",
        stripe_subscription_id="sub_xxx",
    )
    db_session.add(sub)
    await db_session.commit()

    event = _make_event("evt_del_1", "customer.subscription.deleted", {
        "id": "sub_xxx",
        "current_period_start": 1714521600,
        "current_period_end": 1717113600,
        "items": {"data": [{"quantity": 1}]},
    })

    with patch("stripe.Webhook.construct_event", return_value=event):
        response = await client.post(
            "/api/v1/webhooks/stripe",
            content=json.dumps(event),
            headers={"stripe-signature": "fake-sig"},
        )
    assert response.status_code == 200

    await db_session.refresh(sub)
    assert sub.status == "canceled"


@pytest.mark.asyncio
async def test_webhook_signature_failure_returns_400(client):
    with patch("stripe.Webhook.construct_event", side_effect=ValueError("bad sig")):
        response = await client.post(
            "/api/v1/webhooks/stripe",
            content=b"{}",
            headers={"stripe-signature": "fake-sig"},
        )
    assert response.status_code == 400


@pytest.mark.asyncio
async def test_webhook_idempotency(client, db_session, test_account):
    test_account.stripe_customer_id = "cus_xxx"
    db_session.add(Subscription(account_id=test_account.id, plan="pro", status="trialing"))
    await db_session.commit()

    event = _make_event("evt_dup_1", "customer.subscription.updated", {
        "id": "sub_yyy",
        "status": "active",
        "current_period_start": 1714521600,
        "current_period_end": 1717113600,
        "items": {"data": [{"quantity": 1}]},
        "cancel_at_period_end": False,
    })
    with patch("stripe.Webhook.construct_event", return_value=event):
        r1 = await client.post("/api/v1/webhooks/stripe", content=json.dumps(event), headers={"stripe-signature": "x"})
        r2 = await client.post("/api/v1/webhooks/stripe", content=json.dumps(event), headers={"stripe-signature": "x"})
    assert r1.status_code == 200
    assert r2.status_code == 200
    assert r1.json()["applied"] is True
    assert r2.json()["applied"] is False
  • Step 3: Run + commit
docker exec resolutionflow_backend pytest backend/tests/test_stripe_webhook_handler.py -v --override-ini="addopts="
git add backend/app/api/endpoints/webhooks.py backend/tests/test_stripe_webhook_handler.py
git commit -m "feat(billing): extend Stripe webhook stub with concrete event handlers"

Phase D — OAuth additions

Task 17: OAuth provider helpers + Google callback endpoint

Files:

  • Create: backend/app/services/oauth_providers.py

  • Create: backend/app/api/endpoints/oauth.py

  • Create: backend/app/schemas/oauth.py

  • Modify: backend/app/core/config.py (Google client id + secret)

  • Modify: backend/app/api/router.py

  • Test: backend/tests/test_oauth_callbacks.py

  • Step 1: Provider helpers

# backend/app/services/oauth_providers.py
"""OAuth provider helpers. Each provider exposes:
- exchange_code(code) -> {provider_subject, email, name}
"""
from dataclasses import dataclass
import httpx
from app.core.config import settings


@dataclass
class OAuthProfile:
    provider_subject: str
    email: str
    name: str


async def google_exchange_code(code: str, redirect_uri: str) -> OAuthProfile:
    async with httpx.AsyncClient(timeout=10) as cli:
        token_response = await cli.post(
            "https://oauth2.googleapis.com/token",
            data={
                "code": code,
                "client_id": settings.GOOGLE_CLIENT_ID,
                "client_secret": settings.GOOGLE_CLIENT_SECRET,
                "redirect_uri": redirect_uri,
                "grant_type": "authorization_code",
            },
        )
        token_response.raise_for_status()
        access_token = token_response.json()["access_token"]

        userinfo = await cli.get(
            "https://openidconnect.googleapis.com/v1/userinfo",
            headers={"Authorization": f"Bearer {access_token}"},
        )
        userinfo.raise_for_status()
        data = userinfo.json()
        return OAuthProfile(
            provider_subject=data["sub"],
            email=data["email"],
            name=data.get("name") or data["email"].split("@")[0],
        )


async def microsoft_exchange_code(code: str, redirect_uri: str) -> OAuthProfile:
    async with httpx.AsyncClient(timeout=10) as cli:
        token_response = await cli.post(
            "https://login.microsoftonline.com/common/oauth2/v2.0/token",
            data={
                "code": code,
                "client_id": settings.MS_CLIENT_ID,
                "client_secret": settings.MS_CLIENT_SECRET,
                "redirect_uri": redirect_uri,
                "grant_type": "authorization_code",
                "scope": "openid email profile",
            },
        )
        token_response.raise_for_status()
        access_token = token_response.json()["access_token"]

        userinfo = await cli.get(
            "https://graph.microsoft.com/v1.0/me",
            headers={"Authorization": f"Bearer {access_token}"},
        )
        userinfo.raise_for_status()
        data = userinfo.json()
        return OAuthProfile(
            provider_subject=data["id"],
            email=data.get("mail") or data["userPrincipalName"],
            name=data.get("displayName") or data["userPrincipalName"].split("@")[0],
        )
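
Both helpers return the provider's email as-is, and the callback later links accounts by that address. One possible guard is sketched below, assuming the Google userinfo response carries the standard OpenID Connect email_verified claim. require_verified_email is a hypothetical helper, not part of the plan.

```python
def require_verified_email(userinfo: dict) -> str:
    """Return the email only when the provider attests that it is verified."""
    email = userinfo.get("email")
    verified = userinfo.get("email_verified")
    # Some endpoints serialize the claim as the string "true" rather than a boolean.
    if isinstance(verified, str):
        verified = verified.lower() == "true"
    if not email or verified is not True:
        raise ValueError("provider did not attest a verified email")
    return email
```

The Microsoft Graph fields read above (mail, userPrincipalName) carry no equivalent flag, so that path would likely need its own policy before linking to an existing user by email.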
  • Step 2: Config keys

In backend/app/core/config.py:

    GOOGLE_CLIENT_ID: Optional[str] = None
    GOOGLE_CLIENT_SECRET: Optional[str] = None
    MS_CLIENT_ID: Optional[str] = None
    MS_CLIENT_SECRET: Optional[str] = None
    OAUTH_REDIRECT_BASE: str = "http://localhost:5173"
  • Step 3: Google callback endpoint + shared sign-in helper
# backend/app/schemas/oauth.py
from pydantic import BaseModel


class OAuthCallbackPayload(BaseModel):
    code: str
    state: str | None = None


class OAuthCallbackResponse(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"
    is_new_user: bool
# backend/app/api/endpoints/oauth.py
from datetime import datetime, timezone
from typing import Annotated
from uuid import uuid4

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.admin_database import get_admin_db
from app.core.config import settings
from app.core.security import create_access_token, create_refresh_token
from app.models.account import Account
from app.models.oauth_identity import OAuthIdentity
from app.models.user import User
from app.schemas.oauth import OAuthCallbackPayload, OAuthCallbackResponse
from app.services.billing import BillingService
from app.services.oauth_providers import (
    google_exchange_code, microsoft_exchange_code, OAuthProfile,
)

router = APIRouter(prefix="/auth", tags=["auth-oauth"])


async def _sign_in_or_register(
    db: AsyncSession, provider: str, profile: OAuthProfile
) -> tuple[User, bool]:
    """Returns (user, is_new_user). Idempotent."""
    # Existing identity?
    identity = (await db.execute(
        select(OAuthIdentity).where(
            OAuthIdentity.provider == provider,
            OAuthIdentity.provider_subject == profile.provider_subject,
        )
    )).scalar_one_or_none()

    if identity:
        user = (await db.execute(select(User).where(User.id == identity.user_id))).scalar_one()
        return user, False

    # No identity yet — link to existing email if present, else create new account
    user = (await db.execute(select(User).where(User.email == profile.email))).scalar_one_or_none()
    is_new_user = user is None
    if is_new_user:
        account = Account(name=profile.name)
        db.add(account)
        await db.flush()
        user = User(
            email=profile.email,
            name=profile.name,
            password_hash=None,  # OAuth-only
            account_id=account.id,
            account_role="owner",
            email_verified_at=datetime.now(timezone.utc),
        )
        db.add(user)
        await db.flush()
        await BillingService.start_trial(db, account.id)

    db.add(OAuthIdentity(
        user_id=user.id,
        provider=provider,
        provider_subject=profile.provider_subject,
        provider_email_at_link=profile.email,
    ))
    await db.commit()
    await db.refresh(user)
    return user, is_new_user


@router.post("/google/callback", response_model=OAuthCallbackResponse)
async def google_callback(
    payload: OAuthCallbackPayload,
    db: Annotated[AsyncSession, Depends(get_admin_db)],
) -> OAuthCallbackResponse:
    if not settings.GOOGLE_CLIENT_ID:
        raise HTTPException(status_code=503, detail="Google sign-in not configured")
    redirect_uri = f"{settings.OAUTH_REDIRECT_BASE}/auth/google/callback"
    profile = await google_exchange_code(payload.code, redirect_uri)
    user, is_new = await _sign_in_or_register(db, "google", profile)
    return OAuthCallbackResponse(
        access_token=create_access_token({"sub": str(user.id)}),
        refresh_token=create_refresh_token({"sub": str(user.id), "jti": str(uuid4())}),
        is_new_user=is_new,
    )
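
The three branches of _sign_in_or_register (known identity, link to an existing email, create a new user) can be sketched without a database. The Store class and integer user ids below are stand-ins invented for illustration; account creation and the trial start are left out.

```python
from dataclasses import dataclass, field


@dataclass
class Store:
    """In-memory stand-in for the users and oauth_identities tables."""
    users_by_email: dict[str, int] = field(default_factory=dict)
    identities: dict[tuple[str, str], int] = field(default_factory=dict)
    next_id: int = 1


def sign_in_or_register(store: Store, provider: str, subject: str, email: str) -> tuple[int, bool]:
    """Return (user_id, is_new_user), mirroring the branches of the helper above."""
    key = (provider, subject)
    # 1. Identity already linked: plain sign-in.
    if key in store.identities:
        return store.identities[key], False
    # 2. No identity yet: reuse a user with the same email, if one exists.
    user_id = store.users_by_email.get(email)
    is_new = user_id is None
    if is_new:
        # 3. Otherwise create the user.
        user_id = store.next_id
        store.next_id += 1
        store.users_by_email[email] = user_id
    store.identities[key] = user_id
    return user_id, is_new
```

A second provider with the same email lands in branch 2, so one user ends up with two identity rows and is_new_user stays False.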
  • Step 4: Register router
# backend/app/api/router.py
from app.api.endpoints import oauth as oauth_endpoints
api_router.include_router(oauth_endpoints.router)
  • Step 5: Tests
# backend/tests/test_oauth_callbacks.py
import pytest
from unittest.mock import patch
from sqlalchemy import select
from app.models.user import User
from app.models.oauth_identity import OAuthIdentity
from app.models.subscription import Subscription
from app.services.oauth_providers import OAuthProfile


@pytest.mark.asyncio
async def test_google_callback_creates_user_account_subscription(client, db_session):
    profile = OAuthProfile(
        provider_subject="google_subject_123",
        email="newuser@example.com",
        name="New User",
    )
    with patch("app.api.endpoints.oauth.google_exchange_code", return_value=profile):
        response = await client.post("/api/v1/auth/google/callback", json={"code": "auth_code_xyz"})
    assert response.status_code == 200
    body = response.json()
    assert body["is_new_user"] is True
    assert body["access_token"]

    user = (await db_session.execute(select(User).where(User.email == "newuser@example.com"))).scalar_one()
    assert user.password_hash is None
    assert user.email_verified_at is not None  # provider-attested

    identity = (await db_session.execute(
        select(OAuthIdentity).where(OAuthIdentity.user_id == user.id)
    )).scalar_one()
    assert identity.provider == "google"
    assert identity.provider_subject == "google_subject_123"

    sub = (await db_session.execute(
        select(Subscription).where(Subscription.account_id == user.account_id)
    )).scalar_one()
    assert sub.status == "trialing"
    assert sub.plan == "pro"


@pytest.mark.asyncio
async def test_google_callback_existing_user_is_idempotent(client, db_session, test_user):
    profile = OAuthProfile(
        provider_subject="google_subject_456",
        email=test_user.email,
        name=test_user.name,
    )
    with patch("app.api.endpoints.oauth.google_exchange_code", return_value=profile):
        r1 = await client.post("/api/v1/auth/google/callback", json={"code": "x"})
        r2 = await client.post("/api/v1/auth/google/callback", json={"code": "x"})
    assert r1.status_code == 200
    assert r2.status_code == 200
    assert r1.json()["is_new_user"] is False  # linked to existing user
    assert r2.json()["is_new_user"] is False

    identities = (await db_session.execute(
        select(OAuthIdentity).where(OAuthIdentity.user_id == test_user.id)
    )).scalars().all()
    assert len(identities) == 1  # idempotent — only one row
  • Step 6: Run + commit
docker exec resolutionflow_backend pytest backend/tests/test_oauth_callbacks.py -v --override-ini="addopts="
git add backend/app/services/oauth_providers.py backend/app/api/endpoints/oauth.py backend/app/schemas/oauth.py backend/app/api/router.py backend/app/core/config.py backend/tests/test_oauth_callbacks.py
git commit -m "feat(auth): add Google OAuth callback with oauth_identities linking"

Task 18: Microsoft OAuth callback

Files:

  • Modify: backend/app/api/endpoints/oauth.py

  • Test: backend/tests/test_oauth_callbacks.py

  • Step 1: Add Microsoft endpoint (mirrors Google)

# backend/app/api/endpoints/oauth.py — append:
@router.post("/microsoft/callback", response_model=OAuthCallbackResponse)
async def microsoft_callback(
    payload: OAuthCallbackPayload,
    db: Annotated[AsyncSession, Depends(get_admin_db)],
) -> OAuthCallbackResponse:
    if not settings.MS_CLIENT_ID:
        raise HTTPException(status_code=503, detail="Microsoft sign-in not configured")
    redirect_uri = f"{settings.OAUTH_REDIRECT_BASE}/auth/microsoft/callback"
    profile = await microsoft_exchange_code(payload.code, redirect_uri)
    user, is_new = await _sign_in_or_register(db, "microsoft", profile)
    return OAuthCallbackResponse(
        access_token=create_access_token({"sub": str(user.id)}),
        refresh_token=create_refresh_token({"sub": str(user.id), "jti": str(uuid4())}),
        is_new_user=is_new,
    )
  • Step 2: Test
# backend/tests/test_oauth_callbacks.py — append:
@pytest.mark.asyncio
async def test_microsoft_callback_creates_user(client, db_session):
    profile = OAuthProfile(
        provider_subject="ms_subject_789",
        email="msuser@example.com",
        name="MS User",
    )
    with patch("app.api.endpoints.oauth.microsoft_exchange_code", return_value=profile):
        response = await client.post("/api/v1/auth/microsoft/callback", json={"code": "auth_code"})
    assert response.status_code == 200
    user = (await db_session.execute(select(User).where(User.email == "msuser@example.com"))).scalar_one()
    identity = (await db_session.execute(
        select(OAuthIdentity).where(OAuthIdentity.user_id == user.id)
    )).scalar_one()
    assert identity.provider == "microsoft"
  • Step 3: Run + commit
docker exec resolutionflow_backend pytest backend/tests/test_oauth_callbacks.py -v --override-ini="addopts="
git add backend/app/api/endpoints/oauth.py backend/tests/test_oauth_callbacks.py
git commit -m "feat(auth): add Microsoft OAuth callback"

Task 19: Handle null password_hash in login + password change/reset

Files:

  • Modify: backend/app/api/endpoints/auth.py

  • Test: backend/tests/test_oauth_only_user_paths.py

  • Step 1: Guard password endpoints

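In each of the handlers listed below in backend/app/api/endpoints/auth.py, add a check before the existing verify_password(...) call. The handlers run on an AsyncSession, so user.oauth_identities must already be loaded when the check runs (for example via selectinload, or a relationship configured with lazy="selectin"); an implicit lazy load on attribute access would fail under asyncio. The check: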

if user.password_hash is None:
    raise HTTPException(
        status_code=400,
        detail={
            "error": "use_oauth_provider",
            "providers": [i.provider for i in user.oauth_identities],
        },
    )

Apply to:

  • POST /auth/login (form-data flow + JSON flow)

  • POST /auth/password/change

  • POST /auth/password/reset initiation: when the user looked up by email has password_hash set to None, return the same generic 200 response used for any other email and do NOT send a reset email, so the response does not reveal whether an account exists.
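
The reset-initiation rule can be sketched as a pure function: every input gets the same response, and only users with a local password trigger an email. The function name, the users mapping, and the response text are hypothetical; the real handler's lookup and email call may differ.

```python
from typing import Callable, Optional

GENERIC_RESPONSE = {"message": "If that email is registered, a reset link has been sent."}


def initiate_password_reset(
    users: dict[str, Optional[str]],
    email: str,
    send_reset_email: Callable[[str], None],
) -> dict:
    """users maps a lowercased email to its password_hash (None for OAuth-only users)."""
    key = email.lower()
    # Only a known user with a local password gets a reset email.
    if key in users and users[key] is not None:
        send_reset_email(email)
    # Unknown, OAuth-only, and password users all see the same body.
    return GENERIC_RESPONSE
```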

  • Step 2: Test

# backend/tests/test_oauth_only_user_paths.py
import pytest
from sqlalchemy import select
from app.models.user import User
from app.models.oauth_identity import OAuthIdentity


@pytest.mark.asyncio
async def test_login_rejects_oauth_only_user_with_helpful_error(client, db_session, test_account):
    user = User(
        email="oauth-only@example.com",
        name="OAuth Only",
        password_hash=None,
        account_id=test_account.id,
        account_role="owner",
    )
    db_session.add(user)
    await db_session.flush()
    db_session.add(OAuthIdentity(
        user_id=user.id, provider="google",
        provider_subject="g_xyz", provider_email_at_link=user.email,
    ))
    await db_session.commit()

    response = await client.post("/api/v1/auth/login", data={
        "username": user.email, "password": "wontwork",
    })
    assert response.status_code == 400
    body = response.json()
    assert body["detail"]["error"] == "use_oauth_provider"
    assert "google" in body["detail"]["providers"]


@pytest.mark.asyncio
async def test_password_reset_silent_for_oauth_only_user(client, db_session, test_account):
    user = User(
        email="oauth-only2@example.com",
        name="OAuth Only 2",
        password_hash=None,
        account_id=test_account.id,
        account_role="owner",
    )
    db_session.add(user)
    await db_session.commit()

    response = await client.post("/api/v1/auth/password/reset/initiate", json={
        "email": user.email,
    })
    # Generic 200 — does not leak account state
    assert response.status_code == 200
  • Step 3: Run + commit
docker exec resolutionflow_backend pytest backend/tests/test_oauth_only_user_paths.py -v --override-ini="addopts="
git add backend/app/api/endpoints/auth.py backend/tests/test_oauth_only_user_paths.py
git commit -m "feat(auth): guard login/password paths against OAuth-only users"

Phase E — Email verification auto-send + email-match enforcement

Task 20: Auto-send verification on register + enforce account_invite_code email match

Files:

  • Modify: backend/app/api/endpoints/auth.py (register handler)

  • Test: backend/tests/test_email_verification_autosend.py

  • Step 1: Modify the register handler

In the existing @router.post("/register") handler in backend/app/api/endpoints/auth.py:

  1. After the User is created, immediately call EmailService.send_email_verification_email with a fresh token from create_email_verification_token. The send fires automatically, with no user action required, and stays behind the existing email_verification_enabled settings flag.

  2. Email-match enforcement: if user_data.account_invite_code is provided, look up the matching account_invites row by code. If found, require user_data.email == invite.email (case-insensitive). Mismatch → 400 with {"error": "invite_email_mismatch"}. Apply this check BEFORE any User is created.

# backend/app/api/endpoints/auth.py — within register handler

if user_data.account_invite_code:
    invite = (await db.execute(
        select(AccountInvite).where(AccountInvite.code == user_data.account_invite_code)
    )).scalar_one_or_none()
    if not invite or not invite.is_valid:
        raise HTTPException(status_code=400, detail={"error": "invite_invalid_or_expired"})
    if invite.email.lower() != user_data.email.lower():
        raise HTTPException(status_code=400, detail={"error": "invite_email_mismatch"})

# ... existing user/account creation ...

# After commit, fire verification email (skip if already verified — e.g., invitee accepting flow)
if user.email_verified_at is None:
    raw_token = create_email_verification_token(str(user.id))
    if settings.email_verification_enabled:
        try:
            verification_url = f"{settings.FRONTEND_URL}/verify-email?token={raw_token}"
            await EmailService.send_email_verification_email(
                to_email=user.email, verification_url=verification_url, user_name=user.name,
            )
        except Exception as e:
            logger.warning("verification email send failed for %s: %s", user.email, e)
  • Step 2: Test
# backend/tests/test_email_verification_autosend.py
import pytest
from unittest.mock import AsyncMock, patch


@pytest.mark.asyncio
async def test_register_auto_sends_verification_email(client):
    with patch("app.core.email.EmailService.send_email_verification_email", new_callable=AsyncMock) as mock_send:
        response = await client.post("/api/v1/auth/register", json={
            "email": "newshop@example.com",
            "password": "Verystrong1Pwd",
            "name": "New Shop",
        })
        assert response.status_code in (200, 201)
        mock_send.assert_called_once()
        kwargs = mock_send.call_args.kwargs
        assert kwargs["to_email"] == "newshop@example.com"


@pytest.mark.asyncio
async def test_register_with_account_invite_code_email_mismatch_rejected(client, db_session, test_account, test_user):
    from app.models.account_invite import AccountInvite
    from datetime import datetime, timezone, timedelta
    invite = AccountInvite(
        account_id=test_account.id,
        invited_by_id=test_user.id,
        email="invited@example.com",
        code="INVITECODE99",
        role="engineer",
        expires_at=datetime.now(timezone.utc) + timedelta(days=7),
    )
    db_session.add(invite)
    await db_session.commit()

    response = await client.post("/api/v1/auth/register", json={
        "email": "wrong-email@example.com",
        "password": "Verystrong1Pwd",
        "name": "Wrong Email",
        "account_invite_code": "INVITECODE99",
    })
    assert response.status_code == 400
    assert response.json()["detail"]["error"] == "invite_email_mismatch"
  • Step 3: Run + commit
docker exec resolutionflow_backend pytest backend/tests/test_email_verification_autosend.py -v --override-ini="addopts="
git add backend/app/api/endpoints/auth.py backend/tests/test_email_verification_autosend.py
git commit -m "feat(auth): auto-send verification email on register; enforce invite email match"

Phase F — Account invite extensions

Task 21: Wire EmailService into POST /accounts/me/invites

Files:

  • Modify: backend/app/api/endpoints/accounts.py:257

  • Test: backend/tests/test_account_invite_extensions.py

  • Step 1: Update create handler to send email + stamp email_sent_at

In backend/app/api/endpoints/accounts.py, find the existing create_invite handler and modify it as follows:

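# Imports the handler below relies on; skip any the module already has.
import logging
import secrets
from datetime import datetime, timedelta, timezone

from app.core.config import settings
from app.core.email import EmailService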

logger = logging.getLogger(__name__)

@router.post("/me/invites", response_model=AccountInviteResponse, status_code=status.HTTP_201_CREATED)
async def create_invite(
    invite_data: AccountInviteCreate,
    current_user: Annotated[User, Depends(require_account_owner)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Create an invite to join this account (owner only). Sends email."""
    invite = AccountInvite(
        account_id=current_user.account_id,
        invited_by_id=current_user.id,
        email=invite_data.email,
        code=secrets.token_urlsafe(24)[:32],
        role=invite_data.role,
        expires_at=datetime.now(timezone.utc) + timedelta(days=7),
    )
    db.add(invite)
    await db.flush()

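    # Send invitation email; a send failure is logged and does not fail the request.
    # current_user.account must already be loaded here, since an AsyncSession
    # cannot lazy-load a relationship on attribute access.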
    try:
        accept_url = f"{settings.FRONTEND_URL}/accept-invite?code={invite.code}"
        await EmailService.send_account_invite_email(
            to_email=invite.email,
            inviter_name=current_user.name,
            account_name=current_user.account.name,
            accept_url=accept_url,
        )
        invite.email_sent_at = datetime.now(timezone.utc)
    except Exception as e:
        logger.warning("invite email send failed for %s: %s", invite.email, e)

    await db.commit()
    await db.refresh(invite)
    return invite
  • Step 2: Test (regression catch)
# backend/tests/test_account_invite_extensions.py
import pytest
from unittest.mock import AsyncMock, patch
from sqlalchemy import select
from app.models.account_invite import AccountInvite


@pytest.mark.asyncio
async def test_create_invite_sends_email_and_stamps_email_sent_at(authed_owner_client, db_session):
    """Regression: today's create_invite does NOT send email. After this task, it MUST."""
    with patch("app.core.email.EmailService.send_account_invite_email", new_callable=AsyncMock) as mock_send:
        response = await authed_owner_client.post("/api/v1/accounts/me/invites", json={
            "email": "teammate@example.com",
            "role": "engineer",
        })
    assert response.status_code == 201
    mock_send.assert_called_once()

    invite = (await db_session.execute(
        select(AccountInvite).where(AccountInvite.email == "teammate@example.com")
    )).scalar_one()
    assert invite.email_sent_at is not None
  • Step 3: Run + commit
docker exec resolutionflow_backend pytest backend/tests/test_account_invite_extensions.py::test_create_invite_sends_email_and_stamps_email_sent_at -v --override-ini="addopts="
git add backend/app/api/endpoints/accounts.py backend/tests/test_account_invite_extensions.py
git commit -m "feat(invites): wire EmailService.send_account_invite_email into create handler"

Task 22: Add POST /accounts/me/invites/bulk endpoint

Files:

  • Modify: backend/app/api/endpoints/accounts.py

  • Modify: backend/app/schemas/account.py (or wherever invite schemas live)

  • Test: backend/tests/test_account_invite_extensions.py

  • Step 1: Schema

In the schema file that holds the invite schemas (likely backend/app/schemas/account.py):

class AccountInviteBulkCreate(BaseModel):
    invites: list[AccountInviteCreate]


class AccountInviteBulkResponse(BaseModel):
    created: list[AccountInviteResponse]
    failed: list[dict]  # {email, error}
  • Step 2: Endpoint
# backend/app/api/endpoints/accounts.py — append:
@router.post("/me/invites/bulk", response_model=AccountInviteBulkResponse, status_code=status.HTTP_201_CREATED)
async def create_invites_bulk(
    payload: AccountInviteBulkCreate,
    current_user: Annotated[User, Depends(require_account_owner)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    created, failed = [], []
    for invite_data in payload.invites:
        try:
            # One SAVEPOINT per invite: a failure rolls back only this row,
            # not the invites already flushed earlier in the loop.
            async with db.begin_nested():
                invite = AccountInvite(
                    account_id=current_user.account_id,
                    invited_by_id=current_user.id,
                    email=invite_data.email,
                    code=secrets.token_urlsafe(24)[:32],
                    role=invite_data.role,
                    expires_at=datetime.now(timezone.utc) + timedelta(days=7),
                )
                db.add(invite)
                await db.flush()
        except Exception as e:
            failed.append({"email": invite_data.email, "error": str(e)})
            continue
        # Email failures are logged only; the invite row is still created.
        try:
            accept_url = f"{settings.FRONTEND_URL}/accept-invite?code={invite.code}"
            await EmailService.send_account_invite_email(
                to_email=invite.email,
                inviter_name=current_user.name,
                account_name=current_user.account.name,
                accept_url=accept_url,
            )
            invite.email_sent_at = datetime.now(timezone.utc)
        except Exception as e:
            logger.warning("bulk invite email send failed for %s: %s", invite.email, e)
        created.append(invite)
    await db.commit()
    # Reload after commit so the rows can be serialized without a lazy load.
    for invite in created:
        await db.refresh(invite)
    return AccountInviteBulkResponse(created=created, failed=failed)
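
A bulk endpoint that reports per-item failures needs one bad invite to leave its siblings intact. A savepoint per item is one way to get that. The sketch below shows the pattern with stdlib sqlite3; the invites table and its unique email column are assumptions made only so that a failure can be triggered.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    # isolation_level=None disables implicit transactions, so BEGIN, SAVEPOINT,
    # and COMMIT below are issued explicitly.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute("CREATE TABLE invites (email TEXT PRIMARY KEY)")
    return conn


def bulk_insert(conn: sqlite3.Connection, emails: list[str]) -> tuple[list[str], list[dict]]:
    created, failed = [], []
    conn.execute("BEGIN")
    for email in emails:
        conn.execute("SAVEPOINT invite_item")
        try:
            conn.execute("INSERT INTO invites (email) VALUES (?)", (email,))
        except sqlite3.IntegrityError as exc:
            # Undo only this item; earlier inserts in the transaction survive.
            conn.execute("ROLLBACK TO SAVEPOINT invite_item")
            failed.append({"email": email, "error": str(exc)})
        else:
            created.append(email)
        finally:
            conn.execute("RELEASE SAVEPOINT invite_item")
    conn.execute("COMMIT")
    return created, failed
```

With SQLAlchemy's AsyncSession the same role is played by `async with db.begin_nested():` around each item, whereas a plain `db.rollback()` inside the loop discards every row flushed so far.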
  • Step 3: Test
# backend/tests/test_account_invite_extensions.py — append:
@pytest.mark.asyncio
async def test_bulk_invite_creates_n_rows_and_sends_n_emails(authed_owner_client, db_session):
    with patch("app.core.email.EmailService.send_account_invite_email", new_callable=AsyncMock) as mock_send:
        response = await authed_owner_client.post("/api/v1/accounts/me/invites/bulk", json={
            "invites": [
                {"email": "a@example.com", "role": "engineer"},
                {"email": "b@example.com", "role": "engineer"},
                {"email": "c@example.com", "role": "viewer"},
            ],
        })
    assert response.status_code == 201
    body = response.json()
    assert len(body["created"]) == 3
    assert len(body["failed"]) == 0
    assert mock_send.call_count == 3
  • Step 4: Run + commit
docker exec resolutionflow_backend pytest backend/tests/test_account_invite_extensions.py::test_bulk_invite_creates_n_rows_and_sends_n_emails -v --override-ini="addopts="
git add backend/app/api/endpoints/accounts.py backend/app/schemas/account.py backend/tests/test_account_invite_extensions.py
git commit -m "feat(invites): add POST /accounts/me/invites/bulk endpoint for wizard step 3"

Task 23: Add DELETE /accounts/me/invites/{id} (soft-revoke)

Files:

  • Modify: backend/app/api/endpoints/accounts.py

  • Test: backend/tests/test_account_invite_extensions.py

  • Step 1: Endpoint

# backend/app/api/endpoints/accounts.py — append:
@router.delete("/me/invites/{invite_id}", status_code=status.HTTP_204_NO_CONTENT)
async def revoke_invite(
    invite_id: UUID,
    current_user: Annotated[User, Depends(require_account_owner)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Soft-revoke an invitation (sets revoked_at)."""
    invite = (await db.execute(
        select(AccountInvite).where(
            AccountInvite.id == invite_id,
            AccountInvite.account_id == current_user.account_id,
        )
    )).scalar_one_or_none()
    if not invite:
        raise HTTPException(status_code=404, detail="Invite not found")
    if invite.is_revoked:
        return None  # idempotent
    if invite.is_used:
        raise HTTPException(status_code=400, detail="Cannot revoke an accepted invite")
    invite.revoked_at = datetime.now(timezone.utc)
    await db.commit()
    return None
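
The endpoint and its tests lean on three invite properties: is_revoked, is_used, and is_valid. A plausible shape for them is sketched below as a plain dataclass. The used_at field name and the exact validity rule are assumptions; the real AccountInvite model may define them differently.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class InviteState:
    """Hypothetical stand-in for the invite columns the properties read."""
    expires_at: datetime
    used_at: Optional[datetime] = None
    revoked_at: Optional[datetime] = None

    @property
    def is_used(self) -> bool:
        return self.used_at is not None

    @property
    def is_revoked(self) -> bool:
        return self.revoked_at is not None

    @property
    def is_valid(self) -> bool:
        # Valid only while unused, unrevoked, and not yet expired.
        now = datetime.now(timezone.utc)
        return not self.is_used and not self.is_revoked and now < self.expires_at
```

Under this rule, setting revoked_at is enough to make is_valid return False, which is what the revoke test asserts.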
  • Step 2: Test
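# backend/tests/test_account_invite_extensions.py (append; the datetime import
# belongs with the file's other imports and is needed by the tests below)
from datetime import datetime, timedelta, timezone


@pytest.mark.asyncio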
async def test_revoke_invite_sets_revoked_at(authed_owner_client, db_session, test_account, test_user):
    invite = AccountInvite(
        account_id=test_account.id,
        invited_by_id=test_user.id,
        email="revoked@example.com",
        code="REVOKEME01",
        role="engineer",
        expires_at=datetime.now(timezone.utc) + timedelta(days=7),
    )
    db_session.add(invite)
    await db_session.commit()

    response = await authed_owner_client.delete(f"/api/v1/accounts/me/invites/{invite.id}")
    assert response.status_code == 204

    await db_session.refresh(invite)
    assert invite.revoked_at is not None
    assert invite.is_valid is False


@pytest.mark.asyncio
async def test_revoke_invite_idempotent(authed_owner_client, db_session, test_account, test_user):
    invite = AccountInvite(
        account_id=test_account.id,
        invited_by_id=test_user.id,
        email="revoked2@example.com",
        code="REVOKEME02",
        role="engineer",
        revoked_at=datetime.now(timezone.utc),
        expires_at=datetime.now(timezone.utc) + timedelta(days=7),
    )
    db_session.add(invite)
    await db_session.commit()

    response = await authed_owner_client.delete(f"/api/v1/accounts/me/invites/{invite.id}")
    assert response.status_code == 204
  • Step 3: Run + commit
docker exec resolutionflow_backend pytest backend/tests/test_account_invite_extensions.py -v --override-ini="addopts="
git add backend/app/api/endpoints/accounts.py backend/tests/test_account_invite_extensions.py
git commit -m "feat(invites): add DELETE /accounts/me/invites/{id} soft-revoke route"
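
The revoke route and its tests rely on the invite's is_revoked, is_used, and is_valid properties, which are defined outside this task. As a rough illustration of how those checks might compose, here is a minimal sketch using a plain dataclass. InviteState, used_at, is_expired, and revoke are hypothetical stand-ins, not the real AccountInvite model or route.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class InviteState:
    """Hypothetical stand-in for AccountInvite; only the fields the checks need."""
    expires_at: datetime
    used_at: Optional[datetime] = None     # assumed name for the "accepted" marker
    revoked_at: Optional[datetime] = None

    @property
    def is_revoked(self) -> bool:
        return self.revoked_at is not None

    @property
    def is_used(self) -> bool:
        return self.used_at is not None

    @property
    def is_expired(self) -> bool:
        return datetime.now(timezone.utc) >= self.expires_at

    @property
    def is_valid(self) -> bool:
        # Valid only while not revoked, not accepted, and not expired.
        return not (self.is_revoked or self.is_used or self.is_expired)


def revoke(invite: InviteState) -> int:
    """Mirror the route's branching; returns the HTTP status it would produce."""
    if invite.is_revoked:
        return 204  # idempotent: already revoked, revoked_at left untouched
    if invite.is_used:
        return 400  # accepted invites cannot be revoked
    invite.revoked_at = datetime.now(timezone.utc)
    return 204
```

Checking is_revoked before is_used keeps a repeated DELETE a no-op that leaves the original revoked_at timestamp in place, which is the behaviour test_revoke_invite_idempotent expects.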

Phase G — Billing surface

Task 24: GET /billing/state endpoint

Files:

  • Modify: backend/app/services/billing.py (get_billing_state method)

  • Modify: backend/app/api/endpoints/billing.py (new GET endpoint)

  • Modify: backend/app/schemas/billing.py

  • Test: backend/tests/test_billing_state_endpoint.py

  • Step 1: Schemas

# backend/app/schemas/billing.py — append:
from typing import Optional, Dict
from datetime import datetime
from pydantic import BaseModel


class SubscriptionState(BaseModel):
    status: str
    plan: str
    current_period_start: Optional[datetime]
    current_period_end: Optional[datetime]
    cancel_at_period_end: bool
    seat_limit: Optional[int]
    has_pro_entitlement: bool
    is_paid: bool


class PlanBillingState(BaseModel):
    display_name: str
    description: Optional[str]
    monthly_price_cents: Optional[int]
    annual_price_cents: Optional[int]


class BillingStateResponse(BaseModel):
    subscription: SubscriptionState
    plan_billing: Optional[PlanBillingState]
    plan_limits: Dict[str, object]  # full PlanLimits row as dict
    enabled_features: Dict[str, bool]  # resolved flag map
  • Step 2: Service method
# backend/app/services/billing.py — append to BillingService:
from fastapi import HTTPException  # raised below; harmless if billing.py already imports it
from app.models.plan_limits import PlanLimits
from app.models.feature_flag import FeatureFlag, PlanFeatureDefault, AccountFeatureOverride
from app.models.plan_billing import PlanBilling


class BillingService:
    # ... existing methods ...

    @staticmethod
    async def get_billing_state(db: AsyncSession, account: Account):
        sub = (await db.execute(
            select(Subscription).where(Subscription.account_id == account.id)
        )).scalar_one_or_none()
        if sub is None:
            raise HTTPException(status_code=404, detail="No subscription for account")

        pl = (await db.execute(
            select(PlanLimits).where(PlanLimits.plan == sub.plan)
        )).scalar_one_or_none()
        pb = (await db.execute(
            select(PlanBilling).where(PlanBilling.plan == sub.plan)
        )).scalar_one_or_none()

        # Resolved feature flags: plan_feature_defaults overridden by account_feature_overrides
        defaults = (await db.execute(
            select(PlanFeatureDefault, FeatureFlag)
            .join(FeatureFlag, PlanFeatureDefault.flag_id == FeatureFlag.id)
            .where(PlanFeatureDefault.plan == sub.plan)
        )).all()
        resolved = {flag.flag_key: pfd.enabled for pfd, flag in defaults}
        overrides = (await db.execute(
            select(AccountFeatureOverride, FeatureFlag)
            .join(FeatureFlag, AccountFeatureOverride.flag_id == FeatureFlag.id)
            .where(AccountFeatureOverride.account_id == account.id)
        )).all()
        for ovr, flag in overrides:
            resolved[flag.flag_key] = ovr.enabled

        return {
            "subscription": {
                "status": sub.status,
                "plan": sub.plan,
                "current_period_start": sub.current_period_start,
                "current_period_end": sub.current_period_end,
                "cancel_at_period_end": sub.cancel_at_period_end,
                "seat_limit": sub.seat_limit,
                "has_pro_entitlement": sub.has_pro_entitlement,
                "is_paid": sub.is_paid,
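            },
            # Convert the ORM row to a plain dict so BillingStateResponse(**state)
            # validates without relying on ORM-attribute support in the schema.
            # Field names mirror PlanBillingState.
            "plan_billing": {
                "display_name": pb.display_name,
                "description": pb.description,
                "monthly_price_cents": pb.monthly_price_cents,
                "annual_price_cents": pb.annual_price_cents,
            } if pb else None,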
            "plan_limits": _plan_limits_to_dict(pl) if pl else {},
            "enabled_features": resolved,
        }


def _plan_limits_to_dict(pl: PlanLimits) -> dict:
    return {c.name: getattr(pl, c.name) for c in pl.__table__.columns}
  • Step 3: Endpoint
# backend/app/api/endpoints/billing.py — append:
@router.get("/state", response_model=BillingStateResponse)
async def get_billing_state(
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_admin_db)],
) -> BillingStateResponse:
    state = await BillingService.get_billing_state(db, current_user.account)
    return BillingStateResponse(**state)
  • Step 4: Tests
# backend/tests/test_billing_state_endpoint.py
import pytest
from sqlalchemy import select
from app.models.subscription import Subscription
from app.models.feature_flag import FeatureFlag, PlanFeatureDefault


@pytest.mark.asyncio
async def test_billing_state_returns_subscription_plan_features(authed_client, db_session, test_account):
    db_session.add(Subscription(account_id=test_account.id, plan="pro", status="active"))
    flag = FeatureFlag(flag_key="psa_integration", display_name="PSA Integration")
    db_session.add(flag)
    await db_session.flush()
    db_session.add(PlanFeatureDefault(plan="pro", flag_id=flag.id, enabled=True))
    await db_session.commit()

    response = await authed_client.get("/api/v1/billing/state")
    assert response.status_code == 200
    body = response.json()
    assert body["subscription"]["status"] == "active"
    assert body["subscription"]["plan"] == "pro"
    assert body["subscription"]["has_pro_entitlement"] is True
    assert body["enabled_features"]["psa_integration"] is True


@pytest.mark.asyncio
async def test_billing_state_account_override_beats_plan_default(authed_client, db_session, test_account):
    from app.models.feature_flag import AccountFeatureOverride
    db_session.add(Subscription(account_id=test_account.id, plan="pro", status="active"))
    flag = FeatureFlag(flag_key="escalation_mode", display_name="Escalation Mode")
    db_session.add(flag)
    await db_session.flush()
    db_session.add(PlanFeatureDefault(plan="pro", flag_id=flag.id, enabled=False))
    db_session.add(AccountFeatureOverride(account_id=test_account.id, flag_id=flag.id, enabled=True))
    await db_session.commit()

    response = await authed_client.get("/api/v1/billing/state")
    assert response.status_code == 200
    assert response.json()["enabled_features"]["escalation_mode"] is True
  • Step 5: Run + commit
docker exec resolutionflow_backend pytest backend/tests/test_billing_state_endpoint.py -v --override-ini="addopts="
git add backend/app/services/billing.py backend/app/api/endpoints/billing.py backend/app/schemas/billing.py backend/tests/test_billing_state_endpoint.py
git commit -m "feat(billing): add GET /billing/state aggregating subscription + plan + features"
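
The flag resolution inside get_billing_state reduces to a two-layer merge: plan defaults first, then account overrides on top. A minimal sketch of that precedence rule, detached from SQLAlchemy, might look like this. resolve_features and the (flag_key, enabled) tuple shape are illustrative assumptions, not part of the plan's code.

```python
from typing import Dict, Iterable, Tuple

FlagRow = Tuple[str, bool]  # (flag_key, enabled); hypothetical row shape


def resolve_features(
    plan_defaults: Iterable[FlagRow],
    account_overrides: Iterable[FlagRow],
) -> Dict[str, bool]:
    """Merge plan defaults with account overrides; overrides win on conflict."""
    resolved = dict(plan_defaults)
    # An override replaces the plan default for the same key, and also adds
    # keys that have no plan default at all, matching the service loop.
    resolved.update(account_overrides)
    return resolved
```

For example, resolve_features([("escalation_mode", False)], [("escalation_mode", True)]) yields {"escalation_mode": True}, which is the case test_billing_state_account_override_beats_plan_default exercises end to end.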

Phase H — Pilot user backfill

Task 25: Pilot complimentary backfill migration

Files:

  • Create: backend/alembic/versions/<hash>_subscriptions_pilot_complimentary_backfill.py

  • Test: backend/tests/test_pilot_complimentary_backfill.py

  • Step 1: Migration

docker exec -w /app resolutionflow_backend alembic revision -m "subscriptions pilot complimentary backfill"
def upgrade() -> None:
    """Set status='complimentary' and plan='pro' on every existing subscription
    whose status is not 'canceled' or 'past_due', and create a complimentary Pro
    subscription for any account that has none. Pilot users transition to
    permanent complimentary Pro per spec section 5.

    Forward-only: original status values are not preserved."""
    conn = op.get_bind()
    # Update existing rows:
    conn.execute(sa.text("""
        UPDATE subscriptions
        SET status = 'complimentary', plan = 'pro',
            current_period_end = NULL, current_period_start = NULL,
            updated_at = now()
        WHERE status NOT IN ('canceled', 'past_due')
    """))
    # Backfill: any account without a Subscription row gets one
    conn.execute(sa.text("""
        INSERT INTO subscriptions (id, account_id, plan, status, created_at, updated_at)
        SELECT gen_random_uuid(), a.id, 'pro', 'complimentary', now(), now()
        FROM accounts a
        WHERE NOT EXISTS (SELECT 1 FROM subscriptions s WHERE s.account_id = a.id)
    """))


def downgrade() -> None:
    raise RuntimeError(
        "Cannot downgrade: original subscription state is not preserved. "
        "Restore from backup if needed."
    )
  • Step 2: Test
# backend/tests/test_pilot_complimentary_backfill.py
import pytest
from sqlalchemy import select
from app.models.account import Account
from app.models.subscription import Subscription


@pytest.mark.asyncio
async def test_after_backfill_all_existing_accounts_are_complimentary(db_session):
    """Verify that every account has a Subscription row after the backfill and
    that each one is either complimentary Pro or a preserved canceled/past_due
    row. Assumes the migration has already been applied to the database under
    test; skips when no accounts are seeded. Intended for cutover validation."""
    accounts = (await db_session.execute(select(Account))).scalars().all()
    if not accounts:
        pytest.skip("no accounts seeded")
    for account in accounts:
        sub = (await db_session.execute(
            select(Subscription).where(Subscription.account_id == account.id)
        )).scalar_one()
        assert sub.status in ("complimentary", "canceled", "past_due")  # canceled/past_due preserved
        if sub.status == "complimentary":
            assert sub.plan == "pro"
            assert sub.is_paid is False
            assert sub.has_pro_entitlement is True


@pytest.mark.asyncio
async def test_complimentary_subscription_passes_active_subscription_guard(authed_client, db_session, test_account):
    db_session.add(Subscription(account_id=test_account.id, plan="pro", status="complimentary"))
    await db_session.commit()
    response = await authed_client.get("/api/v1/trees/")
    assert response.status_code != 402
  • Step 3: Apply + run
docker exec resolutionflow_backend alembic upgrade heads
docker exec resolutionflow_backend pytest backend/tests/test_pilot_complimentary_backfill.py -v --override-ini="addopts="
  • Step 4: Commit
git add backend/alembic/versions/*_subscriptions_pilot_complimentary_backfill.py backend/tests/test_pilot_complimentary_backfill.py
git commit -m "feat(billing): pilot user backfill — set existing accounts to complimentary"
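
To see which rows the backfill touches before running it against real data, the two statements can be replayed against an in-memory SQLite database. This is only a sketch: the SQL is adapted for SQLite (UUIDs are generated in Python instead of gen_random_uuid(), and the timestamp columns are omitted), the table definitions are reduced to the columns involved, and the 'trialing' status and 'free' plan are assumed sample values.

```python
import sqlite3
import uuid
from typing import Dict


def run_backfill(conn: sqlite3.Connection) -> None:
    """SQLite adaptation of the migration's two statements."""
    # 1. Convert every existing row except canceled / past_due ones.
    conn.execute("""
        UPDATE subscriptions
        SET status = 'complimentary', plan = 'pro',
            current_period_start = NULL, current_period_end = NULL
        WHERE status NOT IN ('canceled', 'past_due')
    """)
    # 2. Create a complimentary Pro row for accounts that have none.
    missing = conn.execute("""
        SELECT a.id FROM accounts a
        WHERE NOT EXISTS (SELECT 1 FROM subscriptions s WHERE s.account_id = a.id)
    """).fetchall()
    conn.executemany(
        "INSERT INTO subscriptions (id, account_id, plan, status) "
        "VALUES (?, ?, 'pro', 'complimentary')",
        [(str(uuid.uuid4()), account_id) for (account_id,) in missing],
    )


def demo() -> Dict[str, str]:
    """Seed one account per starting state and return account_id -> final status."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE accounts (id TEXT PRIMARY KEY);
        CREATE TABLE subscriptions (
            id TEXT PRIMARY KEY, account_id TEXT NOT NULL, plan TEXT, status TEXT,
            current_period_start TEXT, current_period_end TEXT
        );
        INSERT INTO accounts VALUES ('trial'), ('active'), ('canceled'), ('past_due'), ('none');
        INSERT INTO subscriptions (id, account_id, plan, status) VALUES
            ('s1', 'trial', 'free', 'trialing'),
            ('s2', 'active', 'pro', 'active'),
            ('s3', 'canceled', 'pro', 'canceled'),
            ('s4', 'past_due', 'pro', 'past_due');
    """)
    run_backfill(conn)
    return dict(conn.execute("SELECT account_id, status FROM subscriptions").fetchall())
```

In this sketch the 'active' row is converted along with the trial row, while 'canceled' and 'past_due' are left alone and the account with no subscription gains one. That matches the WHERE clause in the migration and the status set the backfill test accepts.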

Final Phase 1 Validation

Task 26: Full test sweep + verify dark-launch posture

Files: None (verification only).

  • Step 1: Full test suite
docker exec resolutionflow_backend pytest --override-ini="addopts="

Expected: all tests pass. The run should include:

  • All existing pre-Phase-1 tests (regression)

  • ~25 new test files added across Phase 1

  • Step 2: Migration heads

docker exec resolutionflow_backend alembic heads

Expected: single head. If multiple, inspect — Phase 1 added 9 migrations linearly, so the chain should converge.

  • Step 3: Verify new endpoints are inert without configuration

SELF_SERVE_ENABLED is a backend config flag, but Phase 1 does not wire it (see the deferred list under Self-Review). Check instead that:

  • /auth/register: still accepts the existing invite_code-based flow; the new flows (OAuth, email-match) coexist without breaking it.
  • /auth/google/callback, /auth/microsoft/callback: return 503 if GOOGLE_CLIENT_ID / MS_CLIENT_ID is unset (as both will be in dev until cutover).
  • /billing/checkout-session: BillingService raises if Stripe is not configured.
  • /billing/state: requires auth; works for any logged-in user.

This phase is dark by default because the new endpoints are gated by environment configuration, not by SELF_SERVE_ENABLED. Phase 2 wires SELF_SERVE_ENABLED on the backend and VITE_SELF_SERVE_ENABLED on the frontend as the cutover gate.

  • Step 4: No commit (validation only)

If everything passes, Phase 1 is complete. Phase 2 (frontend + cutover) follows in a separate plan document.
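
The single-head expectation in Step 2 can be checked mechanically rather than by eye. The sketch below parses the text printed by alembic heads, assuming each head appears on its own line with the revision id first and a "(head)" marker somewhere after it. parse_heads and assert_single_head are hypothetical helper names, not part of the repository.

```python
from typing import List


def parse_heads(alembic_heads_output: str) -> List[str]:
    """Return the revision ids of lines that carry a '(head)' marker."""
    heads = []
    for line in alembic_heads_output.splitlines():
        line = line.strip()
        # Branch labels, when present, follow the marker, so test for
        # containment rather than a suffix match.
        if "(head)" in line:
            heads.append(line.split()[0])
    return heads


def assert_single_head(alembic_heads_output: str) -> str:
    """Raise if the migration graph has zero or several heads; else return the head id."""
    heads = parse_heads(alembic_heads_output)
    if len(heads) != 1:
        raise RuntimeError(
            f"expected exactly one Alembic head, found {len(heads)}: {heads}"
        )
    return heads[0]
```

One way to use it is to capture the output of docker exec resolutionflow_backend alembic heads and pass it to assert_single_head; a RuntimeError then signals that a merge revision is needed before Phase 1 can be considered complete.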


Self-Review

(Author's note — fix issues inline, don't re-review.)

Spec coverage: Walk each spec section:

  • §2 Schema additions: oauth_identities ✓ (Task 1), users.password_hash nullable ✓ (Task 2), role_at_signup + onboarding_step_completed ✓ (Task 3), accounts.team_size_bucket + primary_psa ✓ (Task 4), account_invites.revoked_at + email_sent_at ✓ (Task 5), plan_billing ✓ (Task 6), sales_leads + stripe_events ✓ (Task 7).
  • §2 Subscription status enum extension to 'complimentary' ✓ (Task 9 — model property changes; underlying column is String(50) so no migration value-level change required).
  • §4 BillingService methods: start_trial ✓ (Task 13), create_checkout_session ✓ (Task 14), apply_subscription_event ✓ (Task 15), get_billing_state ✓ (Task 24). open_customer_portal — GAP: not in Phase 1; deferred to Phase 2 since it's only consumed by the frontend /account/billing page. Document this in Phase 2 plan.
  • §4 Replace deps.py:109 ✓ (Task 10).
  • §4 require_active_subscription ✓ (Task 11).
  • §4 require_verified_email_after_grace ✓ (Task 12).
  • §4 Stripe webhook handler extension ✓ (Task 16).
  • §3.2 OAuth callbacks: Google ✓ (Task 17), Microsoft ✓ (Task 18). Null password_hash handling ✓ (Task 19).
  • §3.2 Email-match enforcement at register ✓ (Task 20). Auto-send verification ✓ (Task 20).
  • §3.3 Account invite extensions: email send on create ✓ (Task 21), bulk endpoint ✓ (Task 22), soft-revoke ✓ (Task 23).
  • §3.4 GET /billing/state ✓ (Task 24).
  • §5 Pilot user backfill ✓ (Task 25).

Out of Phase 1 scope (correctly deferred to Phase 2):

  • BillingService.open_customer_portal and /billing/portal-session endpoint — frontend-driven.
  • PATCH /users/me/onboarding-step endpoint (welcome wizard persistence) — frontend-driven.
  • /sales-leads endpoint and SalesLead form — frontend-driven (table is in Phase 1 schema).
  • /admin/plan-limits extension to surface plan_billing — frontend-driven.
  • Beta-signup deprecation (307 redirect) — pure routing change, fits Phase 2.
  • Frontend pricing page, welcome wizard, dashboard redesign, useBillingStore.
  • SELF_SERVE_ENABLED flag wiring.
  • Stripe live-mode setup + cutover validation.

Placeholder scan: none in tasks. The migration down_revision = "<previous_head>" is genuine — that value is filled in at alembic revision time by the tool, not by a human guess.

Type consistency: verified — OAuthProfile.provider_subject, OAuthIdentity.provider_subject, etc. align across files. BillingService.start_trial returns Subscription consistently. apply_subscription_event signature matches webhook handler call site.


Execution Handoff

Plan complete and saved to docs/superpowers/plans/2026-05-06-self-serve-signup-phase-1-backend.md. Two execution options:

1. Subagent-Driven (recommended) — I dispatch a fresh subagent per task, review between tasks, fast iteration.

2. Inline Execution — Execute tasks in this session using executing-plans, batch execution with checkpoints.

Which approach?