Files
resolutionflow/backend/app/core/security.py
Michael Chihlas 8d6accaf60 feat: add account management, email verification, AI fixes, and user guides
- Profile settings, account transfer, delete/leave account flows
- Email verification with JWT tokens and Resend integration
- AI assistant/copilot fixes: markdown rendering, shared RAG helpers,
  token tracking, input refocus, model_validate usage
- User guides hub + detail pages with 13 topic guides
- Sidebar and top bar navigation for guides

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-04 19:18:06 -05:00

117 lines
3.9 KiB
Python

import hashlib
import secrets
import string
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from .config import settings
# Shared passlib context used by the password helpers in this module; bcrypt
# is the only configured scheme.
# NOTE(review): deprecated="auto" presumably has no practical effect while a
# single scheme is configured -- confirm against the passlib documentation.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a previously computed hash.

    Args:
        plain_password: The candidate plaintext password.
        hashed_password: The hash to compare the candidate against.

    Returns:
        The result of ``pwd_context.verify`` for the given pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """Return a hash of ``password`` produced by ``pwd_context``.

    The ``rounds`` value is read from ``settings.BCRYPT_ROUNDS`` on every
    call and forwarded to ``pwd_context.hash``.
    """
    rounds = settings.BCRYPT_ROUNDS
    hashed = pwd_context.hash(password, rounds=rounds)
    return hashed
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The mapping is shallow-copied, so
            the caller's dict is not modified. Any existing ``"exp"`` or
            ``"type"`` entries are overwritten.
        expires_delta: Token lifetime, measured from the current UTC time.
            When ``None``, ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` is used.

    Returns:
        The token encoded with ``settings.SECRET_KEY`` and
        ``settings.ALGORITHM``.
    """
    # Compare against None instead of relying on truthiness: timedelta(0) is
    # falsy, so a truthiness test would silently replace an explicitly
    # requested zero lifetime with the default one.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode.update({"exp": expire, "type": "access"})
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def create_refresh_token(data: dict) -> str:
    """Create a signed JWT refresh token carrying a fresh ``jti``.

    The caller's ``data`` is shallow-copied and extended with an ``"exp"``
    claim (now in UTC plus ``settings.REFRESH_TOKEN_EXPIRE_DAYS`` days), a
    ``"type"`` of ``"refresh"`` and a random UUID4 ``"jti"`` that can be used
    for revocation tracking.
    """
    claims = data.copy()
    lifetime = timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
    claims["exp"] = datetime.now(timezone.utc) + lifetime
    claims["type"] = "refresh"
    claims["jti"] = str(uuid.uuid4())
    return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def hash_token(jti: str) -> str:
    """Return the hex-encoded SHA-256 digest of a token JTI.

    Intended for storing the digest in place of the raw JTI.
    """
    hasher = hashlib.sha256()
    hasher.update(jti.encode())
    return hasher.hexdigest()
def decode_token(token: str) -> Optional[dict]:
    """Decode a JWT and return its payload, or ``None`` on failure.

    The token is decoded with ``settings.SECRET_KEY`` and only
    ``settings.ALGORITHM`` is accepted. Any ``JWTError`` raised while decoding
    is swallowed and reported as ``None``.
    """
    try:
        claims = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
    except JWTError:
        return None
    else:
        return claims
def create_password_reset_token(user_id: str) -> str:
    """Create a signed JWT for resetting a user's password.

    The token carries ``user_id`` as ``"sub"``, a ``"type"`` of
    ``"password_reset"``, a random UUID4 ``"jti"`` and an ``"exp"`` set
    30 minutes after the current UTC time.
    """
    claims = {"sub": user_id, "type": "password_reset"}
    claims["jti"] = str(uuid.uuid4())
    claims["exp"] = datetime.now(timezone.utc) + timedelta(minutes=30)
    return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def create_email_verification_token(user_id: str) -> str:
    """Create a signed JWT for verifying a user's email address.

    The token carries ``user_id`` as ``"sub"``, a ``"type"`` of
    ``"email_verification"``, a random UUID4 ``"jti"`` and an ``"exp"`` set
    24 hours after the current UTC time.
    """
    token_id = str(uuid.uuid4())
    valid_for = timedelta(hours=24)
    expires_at = datetime.now(timezone.utc) + valid_for
    claims = {
        "sub": user_id,
        "type": "email_verification",
        "jti": token_id,
        "exp": expires_at,
    }
    signed = jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
    return signed
def generate_temp_password(length: int = 16) -> str:
    """Generate a random temporary password with guaranteed complexity.

    The result always contains at least one uppercase letter, one lowercase
    letter, one digit and one symbol. Visually ambiguous characters
    (0, O, I, i, l, 1, |) are never used.

    Args:
        length: Desired password length. Values below 4 yield a 4-character
            password, because one character from each of the four categories
            is always included.

    Returns:
        The generated password.
    """
    upper = "ABCDEFGHJKLMNPQRSTUVWXYZ"  # no O, I
    lower = "abcdefghjkmnopqrstuvwxyz"  # no i, l
    digits = "23456789"  # no 0, 1
    symbols = "!@#$%^&*-_+=?"
    categories = (upper, lower, digits, symbols)

    # Guarantee at least one character from every category.
    chars = [secrets.choice(alphabet) for alphabet in categories]

    # Fill the rest from the combined pool; never go below the required
    # characters even when the caller asks for a shorter password.
    pool = "".join(categories)
    extra = max(length - len(chars), 0)
    chars.extend(secrets.choice(pool) for _ in range(extra))

    # Shuffle with the OS-backed CSPRNG so the guaranteed characters do not
    # always occupy the first four positions.
    secrets.SystemRandom().shuffle(chars)
    return "".join(chars)