Files
resolutionflow/backend/app/core/security.py
chihlasm ad59446332 feat: user management — admin create, password reset, archive/delete, quick invite
Phase 1: must_change_password enforcement + change password endpoint/page
Phase 2: Admin user creation (M365-style) with temp password
Phase 3: Password reset (self-service forgot + admin-triggered)
Phase 4: User archive (soft delete) + hard delete with precheck
Phase 5: Quick invite from admin Users page

Also fixes:
- Auto-create subscription for accounts missing one
- Hard delete precheck ignores sole-member personal accounts
- Seed script patches tree nodes for validation compliance

Migrations: 031 (must_change_password), 032 (password_reset_tokens), 033 (user soft delete)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 01:42:51 -05:00

104 lines
3.4 KiB
Python

import hashlib
import secrets
import string
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from .config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash."""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Hash a password."""
return pwd_context.hash(password, rounds=settings.BCRYPT_ROUNDS)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Create a JWT access token."""
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire, "type": "access"})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def create_refresh_token(data: dict) -> str:
"""Create a JWT refresh token with a unique jti for revocation tracking."""
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
jti = str(uuid.uuid4())
to_encode.update({"exp": expire, "type": "refresh", "jti": jti})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def hash_token(jti: str) -> str:
"""Hash a token JTI for secure storage."""
return hashlib.sha256(jti.encode()).hexdigest()
def decode_token(token: str) -> Optional[dict]:
"""Decode and validate a JWT token."""
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
return payload
except JWTError:
return None
def create_password_reset_token(user_id: str) -> str:
"""Create a JWT password reset token (30-minute expiry, unique JTI)."""
jti = str(uuid.uuid4())
expire = datetime.now(timezone.utc) + timedelta(minutes=30)
to_encode = {
"sub": user_id,
"type": "password_reset",
"jti": jti,
"exp": expire,
}
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def generate_temp_password(length: int = 16) -> str:
"""Generate a temporary password with guaranteed complexity.
Includes at least 1 uppercase, 1 lowercase, 1 digit, and 1 symbol.
Excludes ambiguous characters: 0, O, I, l, 1, |
"""
upper = "ABCDEFGHJKLMNPQRSTUVWXYZ" # no O, I
lower = "abcdefghjkmnopqrstuvwxyz" # no l
digits = "23456789" # no 0, 1
symbols = "!@#$%^&*-_+=?"
# Guarantee at least one of each category
required = [
secrets.choice(upper),
secrets.choice(lower),
secrets.choice(digits),
secrets.choice(symbols),
]
# Fill the rest from the combined pool
pool = upper + lower + digits + symbols
remaining = [secrets.choice(pool) for _ in range(length - len(required))]
# Combine and shuffle
all_chars = required + remaining
# Fisher-Yates shuffle using secrets for uniform randomness
for i in range(len(all_chars) - 1, 0, -1):
j = secrets.randbelow(i + 1)
all_chars[i], all_chars[j] = all_chars[j], all_chars[i]
return "".join(all_chars)