Files
resolutionflow/backend/app/services/psa/encryption.py
chihlasm 46865882c6 feat: ConnectWise PSA integration (#106)
PSA abstraction layer with provider pattern, ConnectWise integration (connection management, ticket linking, note posting, status updates, member mapping), Integrations page UI, Fernet credential encryption, in-memory TTL cache, 6 DB migrations, ConnectWise API reference docs.
2026-03-15 01:45:35 -04:00

54 lines
1.7 KiB
Python

"""Fernet-based credential encryption for PSA connections.
Uses the application SECRET_KEY to derive a Fernet encryption key via HKDF.
Credentials are stored as a single encrypted JSON blob.
"""
from __future__ import annotations
import json
import base64
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from app.core.config import settings
def _get_fernet() -> Fernet:
"""Derive a Fernet key from the application SECRET_KEY."""
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=b"resolutionflow-psa-credentials",
info=b"psa-credential-encryption",
)
key = hkdf.derive(settings.SECRET_KEY.encode())
fernet_key = base64.urlsafe_b64encode(key)
return Fernet(fernet_key)
def encrypt_credentials(credentials: dict) -> str:
"""Encrypt a credentials dict to a Fernet token string."""
f = _get_fernet()
plaintext = json.dumps(credentials).encode()
return f.encrypt(plaintext).decode()
def decrypt_credentials(encrypted: str) -> dict:
"""Decrypt a Fernet token string back to a credentials dict."""
f = _get_fernet()
plaintext = f.decrypt(encrypted.encode())
return json.loads(plaintext)
def mask_credential(value: str | None, visible_suffix: int = 4) -> str:
"""Return a masked version of a credential for display.
e.g., 'abcdefghij' -> '......ghij'
"""
if not value:
return "\u2022\u2022\u2022\u2022\u2022\u2022"
if len(value) <= visible_suffix:
return "\u2022\u2022\u2022\u2022\u2022\u2022" + value
return "\u2022\u2022\u2022\u2022\u2022\u2022" + value[-visible_suffix:]