feat(deps): add require_verified_email_after_grace guard

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-06 14:44:50 -04:00
parent 9ec208f6e7
commit 519c7eb5ce
3 changed files with 141 additions and 2 deletions

View File

@@ -289,3 +289,47 @@ async def require_active_subscription(
)
return sub
_EMAIL_VERIFICATION_ALLOWLIST = {
"/api/v1/auth/me",
"/api/v1/auth/logout",
"/api/v1/auth/email/send-verification",
"/api/v1/auth/email/verify",
"/api/v1/auth/password/change",
"/api/v1/users/me",
"/api/v1/billing/state",
"/api/v1/billing/checkout-session",
"/api/v1/billing/portal-session",
}
VERIFICATION_GRACE_DAYS = 7
async def require_verified_email_after_grace(
request: Request,
current_user: Annotated[User, Depends(get_current_active_user)],
):
"""Enforces 'this user has verified email OR is still in 7-day grace.'
OAuth signups bypass cleanly because /auth/{google,microsoft}/callback
sets users.email_verified_at = now() (provider-attested)."""
from datetime import datetime, timezone, timedelta
if request.url.path in _EMAIL_VERIFICATION_ALLOWLIST:
return
if current_user.email_verified_at is not None:
return
grace_ends = current_user.created_at + timedelta(days=VERIFICATION_GRACE_DAYS)
if datetime.now(timezone.utc) < grace_ends:
return
raise HTTPException(
status_code=403,
detail={
"error": "email_not_verified",
"grace_ended_at": grace_ends.isoformat(),
"resend_url": "/api/v1/auth/email/send-verification",
},
)

View File

@@ -1,6 +1,10 @@
from fastapi import APIRouter, Depends
from app.api.deps import require_tenant_context, require_active_subscription
from app.api.deps import (
require_tenant_context,
require_active_subscription,
require_verified_email_after_grace,
)
from app.api.endpoints import (
admin,
admin_audit,
@@ -112,7 +116,11 @@ api_router.include_router(admin_gallery.router)
# bypass the gate for billing/account admin/auth flows.
# ---------------------------------------------------------------------------
_tenant_deps = [Depends(require_tenant_context)]
_pro_deps = [Depends(require_tenant_context), Depends(require_active_subscription)]
_pro_deps = [
Depends(require_tenant_context),
Depends(require_active_subscription),
Depends(require_verified_email_after_grace),
]
api_router.include_router(trees.router, dependencies=_pro_deps)
api_router.include_router(sidebar.router, dependencies=_tenant_deps)

View File

@@ -0,0 +1,87 @@
import uuid
import pytest
from datetime import datetime, timezone, timedelta
from sqlalchemy import select
from app.models.user import User
async def _set_user_email_state(test_db, user_id, *, verified_at=None, created_at=None):
user = (await test_db.execute(select(User).where(User.id == user_id))).scalar_one()
user.email_verified_at = verified_at
if created_at is not None:
user.created_at = created_at
await test_db.commit()
@pytest.mark.asyncio
async def test_verified_user_passes(client, test_db, test_user, auth_headers):
user_id = uuid.UUID(test_user["user_data"]["id"])
await _set_user_email_state(test_db, user_id, verified_at=datetime.now(timezone.utc))
response = await client.get("/api/v1/trees", headers=auth_headers)
assert response.status_code != 403
@pytest.mark.asyncio
async def test_unverified_in_grace_passes(client, test_db, test_user, auth_headers):
user_id = uuid.UUID(test_user["user_data"]["id"])
await _set_user_email_state(
test_db, user_id,
verified_at=None,
created_at=datetime.now(timezone.utc) - timedelta(days=2),
)
response = await client.get("/api/v1/trees", headers=auth_headers)
assert response.status_code != 403
@pytest.mark.asyncio
async def test_unverified_past_grace_blocks(client, test_db, test_user, auth_headers):
user_id = uuid.UUID(test_user["user_data"]["id"])
await _set_user_email_state(
test_db, user_id,
verified_at=None,
created_at=datetime.now(timezone.utc) - timedelta(days=10),
)
response = await client.get("/api/v1/trees", headers=auth_headers)
assert response.status_code == 403
body = response.json()
assert body["detail"]["error"] == "email_not_verified"
@pytest.mark.asyncio
async def test_unverified_past_grace_allowlisted_still_passes(client, test_db, test_user, auth_headers):
user_id = uuid.UUID(test_user["user_data"]["id"])
await _set_user_email_state(
test_db, user_id,
verified_at=None,
created_at=datetime.now(timezone.utc) - timedelta(days=10),
)
response = await client.get("/api/v1/auth/me", headers=auth_headers)
assert response.status_code == 200
@pytest.mark.asyncio
async def test_combined_guards_unverified_expired_trial(client, test_db, test_user, auth_headers):
"""A user who is BOTH past grace AND on an expired trial should get blocked
by one of the two guards. Either error is acceptable; we just verify a
refusal."""
from app.models.subscription import Subscription
from sqlalchemy import delete
user_id = uuid.UUID(test_user["user_data"]["id"])
account_id = uuid.UUID(test_user["user_data"]["account_id"])
await _set_user_email_state(
test_db, user_id,
verified_at=None,
created_at=datetime.now(timezone.utc) - timedelta(days=10),
)
# Replace the seeded active sub with an expired trial
await test_db.execute(delete(Subscription).where(Subscription.account_id == account_id))
test_db.add(Subscription(
account_id=account_id, plan="pro", status="trialing",
current_period_end=datetime.now(timezone.utc) - timedelta(hours=1),
))
await test_db.commit()
response = await client.get("/api/v1/trees", headers=auth_headers)
assert response.status_code in (402, 403)