Phase 1: must_change_password enforcement + change password endpoint/page Phase 2: Admin user creation (M365-style) with temp password Phase 3: Password reset (self-service forgot + admin-triggered) Phase 4: User archive (soft delete) + hard delete with precheck Phase 5: Quick invite from admin Users page Also fixes: - Auto-create subscription for accounts missing one - Hard delete precheck ignores sole-member personal accounts - Seed script patches tree nodes for validation compliance Migrations: 031 (must_change_password), 032 (password_reset_tokens), 033 (user soft delete) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
104 lines
3.4 KiB
Python
104 lines
3.4 KiB
Python
import hashlib
|
|
import secrets
|
|
import string
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Optional
|
|
from jose import JWTError, jwt
|
|
from passlib.context import CryptContext
|
|
from .config import settings
|
|
|
|
# Module-level passlib context shared by the password helpers below;
# bcrypt is the only scheme listed.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
|
|
|
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored hash.

    Delegates to the module-level ``pwd_context`` and returns its verdict.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
|
|
|
|
|
|
def get_password_hash(password: str) -> str:
    """Return the hash of ``password`` as produced by ``pwd_context``.

    The ``rounds`` value passed to the hasher comes from
    ``settings.BCRYPT_ROUNDS``.
    """
    hashed = pwd_context.hash(password, rounds=settings.BCRYPT_ROUNDS)
    return hashed
|
|
|
|
|
|
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed. The dict is shallow-copied, so the caller's
            object is not mutated.
        expires_delta: Lifetime of the token measured from the current UTC
            time. When ``None``, ``settings.ACCESS_TOKEN_EXPIRE_MINUTES``
            minutes is used instead.

    Returns:
        The encoded JWT string. In addition to ``data`` it carries an ``exp``
        claim and ``"type": "access"``.
    """
    to_encode = data.copy()
    # Compare against None rather than relying on truthiness: timedelta(0) is
    # falsy, and an explicitly requested zero lifetime must not silently turn
    # into the (much longer) default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode.update({"exp": expire, "type": "access"})
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
|
|
|
|
|
|
def create_refresh_token(data: dict) -> str:
    """Build a signed JWT refresh token.

    The caller's ``data`` is shallow-copied and extended with an ``exp`` claim
    (``settings.REFRESH_TOKEN_EXPIRE_DAYS`` days from the current UTC time),
    ``"type": "refresh"`` and a freshly generated UUID4 ``jti`` that can be
    used to track revocation.
    """
    claims = data.copy()
    lifetime = timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
    claims["exp"] = datetime.now(timezone.utc) + lifetime
    claims["type"] = "refresh"
    claims["jti"] = str(uuid.uuid4())
    return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
|
|
|
|
|
|
def hash_token(jti: str) -> str:
    """Return the hex-encoded SHA-256 digest of a token JTI.

    The JTI is encoded as UTF-8 before hashing, so only the digest (not the
    raw identifier) needs to be stored.
    """
    digest = hashlib.sha256()
    digest.update(jti.encode("utf-8"))
    return digest.hexdigest()
|
|
|
|
|
|
def decode_token(token: str) -> Optional[dict]:
    """Decode a JWT and return its claims.

    The token is decoded with ``settings.SECRET_KEY`` and restricted to
    ``settings.ALGORITHM``. If the library raises ``JWTError``, ``None`` is
    returned instead of propagating the exception.
    """
    try:
        claims = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
    except JWTError:
        return None
    else:
        return claims
|
|
|
|
|
|
def create_password_reset_token(user_id: str) -> str:
    """Build a signed JWT for resetting a user's password.

    The token carries the user id as ``sub``, ``"type": "password_reset"``,
    a fresh UUID4 ``jti`` and an ``exp`` claim 30 minutes after the current
    UTC time.
    """
    claims = {"sub": user_id, "type": "password_reset"}
    claims["jti"] = str(uuid.uuid4())
    claims["exp"] = datetime.now(timezone.utc) + timedelta(minutes=30)
    return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
|
|
|
|
|
|
def generate_temp_password(length: int = 16) -> str:
    """Generate a temporary password with guaranteed complexity.

    Includes at least 1 uppercase, 1 lowercase, 1 digit, and 1 symbol.
    Excludes ambiguous characters: 0, O, I, l, 1, |

    Args:
        length: Total number of characters in the password. Must be at least
            4 so that one character from every category fits.

    Returns:
        A password of exactly ``length`` characters drawn with the
        ``secrets`` module.

    Raises:
        ValueError: If ``length`` is smaller than the number of required
            character categories.
    """
    upper = "ABCDEFGHJKLMNPQRSTUVWXYZ"  # no O, I
    lower = "abcdefghjkmnopqrstuvwxyz"  # no i, l
    digits = "23456789"  # no 0, 1
    symbols = "!@#$%^&*-_+=?"
    categories = (upper, lower, digits, symbols)

    # Without this guard a too-small length would still yield one character
    # per category, i.e. a password longer than the caller asked for.
    if length < len(categories):
        raise ValueError(
            f"length must be at least {len(categories)} to include every character category"
        )

    # Guarantee at least one of each category
    chars = [secrets.choice(category) for category in categories]

    # Fill the rest from the combined pool
    pool = "".join(categories)
    extra = length - len(categories)
    chars.extend(secrets.choice(pool) for _ in range(extra))

    # Shuffle with the OS-backed generator so the guaranteed characters do not
    # always sit at the front of the password.
    secrets.SystemRandom().shuffle(chars)

    return "".join(chars)
|