feat(psa): add in-memory TTL cache for board statuses
Add PSACache class with get/set/invalidate/clear operations and TTL expiry. Board statuses are cached for 1 hour in the provider. Cache is cleared when a PSA connection is re-tested. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
38
backend/app/services/psa/cache.py
Normal file
38
backend/app/services/psa/cache.py
Normal file
@@ -0,0 +1,38 @@
|
||||
"""Simple in-memory TTL cache for PSA API responses."""
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
|
||||
class PSACache:
|
||||
"""Account-scoped in-memory cache with TTL expiry."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._store: dict[str, tuple[Any, float]] = {}
|
||||
|
||||
def get(self, key: str) -> Any | None:
|
||||
entry = self._store.get(key)
|
||||
if entry is None:
|
||||
return None
|
||||
value, expires_at = entry
|
||||
if time.time() > expires_at:
|
||||
del self._store[key]
|
||||
return None
|
||||
return value
|
||||
|
||||
def set(self, key: str, value: Any, ttl_seconds: int) -> None:
|
||||
self._store[key] = (value, time.time() + ttl_seconds)
|
||||
|
||||
def invalidate(self, prefix: str) -> None:
|
||||
"""Remove all entries matching a key prefix."""
|
||||
keys_to_remove = [k for k in self._store if k.startswith(prefix)]
|
||||
for k in keys_to_remove:
|
||||
del self._store[k]
|
||||
|
||||
def clear(self) -> None:
|
||||
self._store.clear()
|
||||
|
||||
|
||||
# Global singleton — acceptable at current scale (see design doc section 6)
|
||||
psa_cache = PSACache()
|
||||
Reference in New Issue
Block a user