"""Fernet-based credential encryption for PSA connections. Uses the application SECRET_KEY to derive a Fernet encryption key via HKDF. Credentials are stored as a single encrypted JSON blob. """ from __future__ import annotations import json import base64 from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.hkdf import HKDF from app.core.config import settings def _get_fernet() -> Fernet: """Derive a Fernet key from the application SECRET_KEY.""" hkdf = HKDF( algorithm=hashes.SHA256(), length=32, salt=b"resolutionflow-psa-credentials", info=b"psa-credential-encryption", ) key = hkdf.derive(settings.SECRET_KEY.encode()) fernet_key = base64.urlsafe_b64encode(key) return Fernet(fernet_key) def encrypt_credentials(credentials: dict) -> str: """Encrypt a credentials dict to a Fernet token string.""" f = _get_fernet() plaintext = json.dumps(credentials).encode() return f.encrypt(plaintext).decode() def decrypt_credentials(encrypted: str) -> dict: """Decrypt a Fernet token string back to a credentials dict.""" f = _get_fernet() plaintext = f.decrypt(encrypted.encode()) return json.loads(plaintext) def mask_credential(value: str | None, visible_suffix: int = 4) -> str: """Return a masked version of a credential for display. e.g., 'abcdefghij' -> '......ghij' """ if not value: return "\u2022\u2022\u2022\u2022\u2022\u2022" if len(value) <= visible_suffix: return "\u2022\u2022\u2022\u2022\u2022\u2022" + value return "\u2022\u2022\u2022\u2022\u2022\u2022" + value[-visible_suffix:]