feat(auth): auto-send verification email on register; enforce invite email match

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-06 15:05:15 -04:00
parent b0708ed650
commit 86893562b9
2 changed files with 135 additions and 0 deletions

View File

@@ -1,3 +1,4 @@
import logging
import secrets
import string
from datetime import datetime, timezone, timedelta
@@ -41,6 +42,8 @@ from app.core.email import EmailService
from app.api.deps import get_current_active_user, get_refresh_token_payload
from app.core.audit import log_audit
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/auth", tags=["authentication"])
@@ -124,6 +127,12 @@ async def register(
detail="Account invite code has expired"
)
if account_invite_record.email.lower() != user_data.email.lower():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={"error": "invite_email_mismatch"},
)
# Validate platform invite code (skip if account invite was provided)
invite_code_record = None
if not account_invite_record:
@@ -244,6 +253,34 @@ async def register(
await db.commit()
await db.refresh(new_user)
# Auto-send verification email for newly-registered users.
# Skip silently if verification already done (shouldn't happen for fresh
# users, but defensive).
if new_user.email_verified_at is None:
verification_enabled = await SettingsManager.get(
"email_verification_enabled", db, default=True
)
if verification_enabled:
try:
raw_token = create_email_verification_token(str(new_user.id))
payload = decode_token(raw_token)
if payload and payload.get("jti"):
token_record = EmailVerificationToken(
token_hash=hash_token(payload["jti"]),
user_id=new_user.id,
expires_at=datetime.fromtimestamp(payload["exp"], tz=timezone.utc),
)
db.add(token_record)
await db.commit()
verification_url = f"{settings.FRONTEND_URL}/verify-email?token={raw_token}"
await EmailService.send_email_verification_email(
to_email=new_user.email,
verification_url=verification_url,
)
except Exception as e:
logger.warning("verification email send failed for %s: %s", new_user.email, e)
return new_user

View File

@@ -0,0 +1,98 @@
import pytest
from datetime import datetime, timezone, timedelta
from unittest.mock import AsyncMock, patch
from sqlalchemy import select
@pytest.mark.asyncio
async def test_register_auto_sends_verification_email(client, test_db):
"""Fresh registration triggers send_email_verification_email."""
with patch(
"app.core.email.EmailService.send_email_verification_email",
new_callable=AsyncMock,
) as mock_send:
response = await client.post("/api/v1/auth/register", json={
"email": "newshop@example.com",
"password": "Verystrong1Pwd",
"name": "New Shop",
})
assert response.status_code in (200, 201), response.json()
mock_send.assert_called_once()
kwargs = mock_send.call_args.kwargs
assert kwargs["to_email"] == "newshop@example.com"
assert "/verify-email?token=" in kwargs["verification_url"]
@pytest.mark.asyncio
async def test_register_with_account_invite_code_email_mismatch_rejected(
client, test_db, test_user
):
"""Invite code is for invited@example.com but user registers with a
different email -> 400 invite_email_mismatch."""
from app.models.account_invite import AccountInvite
import uuid
invited_by_id = uuid.UUID(test_user["user_data"]["id"])
account_id = uuid.UUID(test_user["user_data"]["account_id"])
invite = AccountInvite(
account_id=account_id,
invited_by_id=invited_by_id,
email="invited@example.com",
code="INVITECODE99",
role="engineer",
expires_at=datetime.now(timezone.utc) + timedelta(days=7),
)
test_db.add(invite)
await test_db.commit()
response = await client.post("/api/v1/auth/register", json={
"email": "wrong-email@example.com",
"password": "Verystrong1Pwd",
"name": "Wrong Email",
"account_invite_code": "INVITECODE99",
})
assert response.status_code == 400, response.json()
assert response.json()["detail"]["error"] == "invite_email_mismatch"
@pytest.mark.asyncio
async def test_register_with_account_invite_code_email_match_accepted(
client, test_db, test_user
):
"""Invite code is for invited@example.com - registering with that email
succeeds and joins the existing account."""
from app.models.account_invite import AccountInvite
from app.models.user import User
import uuid
invited_by_id = uuid.UUID(test_user["user_data"]["id"])
account_id = uuid.UUID(test_user["user_data"]["account_id"])
invite = AccountInvite(
account_id=account_id,
invited_by_id=invited_by_id,
email="invited@example.com",
code="INVITECODE100",
role="engineer",
expires_at=datetime.now(timezone.utc) + timedelta(days=7),
)
test_db.add(invite)
await test_db.commit()
with patch(
"app.core.email.EmailService.send_email_verification_email",
new_callable=AsyncMock,
):
response = await client.post("/api/v1/auth/register", json={
"email": "invited@example.com",
"password": "Verystrong1Pwd",
"name": "Invited",
"account_invite_code": "INVITECODE100",
})
assert response.status_code in (200, 201), response.json()
new_user = (await test_db.execute(
select(User).where(User.email == "invited@example.com")
)).scalar_one()
assert new_user.account_id == account_id # joined existing account