Files
resolutionflow/backend/tests/test_session_policy.py
Michael Chihlas c7cd711859 feat: AccountSecuritySettingsPage + active-users list + toast + login banner
Eighth commit in the session-expiration-policy series. Surfaces all
the owner controls and user-facing expiry UX that the prior commits
plumbed through, designed end-to-end via /plan-design-review (initial
4/10 -> final 9/10; 7 decisions locked in the plan).

Backend additions:
- accounts/me/security GET response gains active_users: list of
  {user_id, name, email, last_login_at} for users in this account
  with at least one un-revoked refresh token. Joined query on
  refresh_tokens + users, distinct, ordered by last_login desc.
  Drives the Active Sessions section.

Frontend additions:
- api/accountSecurity.ts: typed client for GET/PATCH/revoke-sessions.
- hooks/useAuthSessionExpiry.ts: reads idle/absolute expiry from the
  auth store, returns warning ('none'|'soon'|'now') + reason
  ('idle'|'absolute') so consumers can pick the right UX for the
  closer window. Re-evaluates every 30s.
- components/common/SessionExpiryToast.tsx: top-of-app notice that
  fires at T-5min. Idle case: warning-amber tone, [Stay signed in]
  button hits authApi.refresh() and updates the store on success.
  Absolute case: info-cyan tone, [Sign in now] link to /login (no
  recoverable action). Dismissible; does not re-fire after dismissal.
- components/account/RevokeSessionsModal.tsx: confirmation modal for
  the two bulk-revoke scopes. Title, body, and confirm-label vary by
  scope; danger-styled confirm button.
- pages/account/AccountSecuritySettingsPage.tsx: the main page.
  Header (Shield icon), intro, Policy card with Strict/Standard/Custom
  radios + always-visible-disabled Custom inputs (idle/absolute
  minutes) with inline validation, Save button + emerald success ping,
  info note about 'applies at next login'. Active sessions card with
  count-aware copy, list of {name, email, last-login-ago} rows
  (caller tagged '(you)'), two buttons — 'except me' hidden when
  count=1, 'sign me out and everyone else' uses danger-tinted styling.
- pages/AccountSettingsPage.tsx: 'Session security' row added to the
  owner-only settings list.
- router.tsx: /account/security route, owner-gated via ProtectedRoute.
- pages/LoginPage.tsx: cyan info-tone banner above form when
  ?reason=session_expired is in the URL.
- components/layout/AppLayout.tsx: mounts <SessionExpiryToast />.

Scope=all bulk-revoke UX (the most jarring moment): on success,
toast.success(N sessions), 1.5s delay, then clear localStorage +
useAuthStore.logout() + window.location='/login' (no banner — the
owner just did this).

Backend tests: existing 22/22 still green plus the GET test now
asserts active_users is present + non-empty after login. Frontend:
tsc clean, authStore test 2/2.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-13 17:07:14 -04:00

783 lines
29 KiB
Python

"""Tests for the session-expiration-policy series.
See docs/plans/2026-05-13-session-expiration-policy.md.
Test numbers below correspond to the cases listed in §6 of the plan.
This file grows across commits:
- Commit 2: error-detail taxonomy (#11 + wrong-type + bad-signature)
- Commit 3: claims embedded at login + response fields surfaced (#1, #14)
- Commit 4: absolute-cap enforcement + grandfather path (#8, #9, #12)
- Commit 5: GET/PATCH /accounts/me/security (#2, #3, #4, #5, #7, #16)
- Commit 6: POST /accounts/me/security/revoke-sessions (#17-#22)
"""
import uuid
from datetime import datetime, timedelta, timezone
import pytest
from httpx import AsyncClient
from jose import jwt
from app.core.config import settings
def _encode_refresh_token(
    *,
    sub: str,
    exp: datetime,
    token_type: str = "refresh",
    secret: str | None = None,
) -> str:
    """Build a signed refresh JWT with an arbitrary ``exp`` for testing.

    Bypasses create_refresh_token so tests can produce already-expired
    tokens, wrong-type tokens, or wrong-signature tokens.

    Args:
        sub: Value stored in the ``sub`` claim.
        exp: Value stored in the ``exp`` claim; may lie in the past.
        token_type: Value stored in the ``type`` claim.
        secret: Signing key. ``None`` selects ``settings.SECRET_KEY``.

    Returns:
        The encoded token, signed with ``settings.ALGORITHM``.
    """
    # Compare against None rather than relying on truthiness: an explicit
    # (even empty) secret must never be silently swapped for the real key,
    # otherwise a "bad signature" test would end up with a valid signature.
    signing_key = settings.SECRET_KEY if secret is None else secret
    claims = {
        "sub": sub,
        "type": token_type,
        "jti": str(uuid.uuid4()),  # fresh token id on every call
        "exp": exp,
    }
    return jwt.encode(claims, signing_key, algorithm=settings.ALGORITHM)
class TestRefreshTokenErrorTaxonomy:
    """§6 test #11 — error-detail taxonomy of `/auth/refresh`.

    The endpoint reports idle expiry and generic invalid-token failures
    with different `detail` values, so the frontend can choose between the
    "session ended for security" banner and a plain logout redirect.
    """

    @staticmethod
    async def _refresh_with(client: AsyncClient, token: str):
        """POST `token` as a bearer credential to the refresh endpoint."""
        bearer = {"Authorization": f"Bearer {token}"}
        return await client.post("/api/v1/auth/refresh", headers=bearer)

    @pytest.mark.asyncio
    async def test_idle_expired_refresh_returns_session_expired_idle(
        self, client: AsyncClient, test_user: dict
    ):
        """A correctly signed refresh token whose `exp` has passed."""
        one_second_ago = datetime.now(timezone.utc) - timedelta(seconds=1)
        stale_token = _encode_refresh_token(
            sub=test_user["user_data"]["id"],
            exp=one_second_ago,
        )

        resp = await self._refresh_with(client, stale_token)

        assert resp.status_code == 401
        assert resp.json()["detail"] == "session_expired_idle"

    @pytest.mark.asyncio
    async def test_wrong_type_token_returns_invalid_refresh_token(
        self, client: AsyncClient, test_user: dict
    ):
        """An unexpired token whose `type` claim is "access"."""
        in_five_minutes = datetime.now(timezone.utc) + timedelta(minutes=5)
        access_typed = _encode_refresh_token(
            sub=test_user["user_data"]["id"],
            exp=in_five_minutes,
            token_type="access",
        )

        resp = await self._refresh_with(client, access_typed)

        assert resp.status_code == 401
        assert resp.json()["detail"] == "invalid_refresh_token"

    @pytest.mark.asyncio
    async def test_bad_signature_returns_invalid_refresh_token(
        self, client: AsyncClient, test_user: dict
    ):
        """An unexpired token signed with a key other than SECRET_KEY."""
        in_five_minutes = datetime.now(timezone.utc) + timedelta(minutes=5)
        forged = _encode_refresh_token(
            sub=test_user["user_data"]["id"],
            exp=in_five_minutes,
            secret="not-the-real-secret-key",
        )

        resp = await self._refresh_with(client, forged)

        assert resp.status_code == 401
        assert resp.json()["detail"] == "invalid_refresh_token"
class TestSessionPolicyClaims:
    """§6 tests #1 and #14 — session-policy claims stamped at login.

    Every token-issuing endpoint embeds auth_time/idle_max/abs_max in
    the refresh JWT and surfaces idle_expires_at/absolute_expires_at on
    the response.
    """

    @pytest.mark.asyncio
    async def test_login_json_embeds_session_claims_with_defaults(
        self, client: AsyncClient, test_user: dict
    ):
        """Login response and refresh JWT both reflect the default policy."""
        slack = timedelta(seconds=10)  # tolerance for request latency
        before = datetime.now(timezone.utc)
        response = await client.post(
            "/api/v1/auth/login/json",
            json={
                "email": test_user["email"],
                "password": test_user["password"],
            },
        )
        assert response.status_code == 200, response.json()
        body = response.json()
        after = datetime.now(timezone.utc)

        # Response surfaces both expiry windows as ISO strings.
        assert body["idle_expires_at"] is not None
        assert body["absolute_expires_at"] is not None
        idle_window = datetime.fromisoformat(body["idle_expires_at"]) - before
        abs_window = datetime.fromisoformat(body["absolute_expires_at"]) - before

        # Strict default: 3 days idle, 14 days absolute.
        assert timedelta(days=3) - slack <= idle_window <= timedelta(days=3) + slack
        assert timedelta(days=14) - slack <= abs_window <= timedelta(days=14) + slack

        # JWT carries the claims in seconds, plus auth_time as Unix seconds.
        decoded = jwt.decode(
            body["refresh_token"], settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
        assert decoded["idle_max"] == 3 * 24 * 60 * 60  # 259200
        assert decoded["abs_max"] == 14 * 24 * 60 * 60  # 1209600
        assert int(before.timestamp()) <= decoded["auth_time"] <= int(after.timestamp())

    @pytest.mark.asyncio
    async def test_refresh_carries_claims_forward_unchanged(
        self, client: AsyncClient, test_user: dict
    ):
        """Rotation keeps auth_time/idle_max/abs_max and issues a new jti."""
        # Login produces the original session.
        login_resp = await client.post(
            "/api/v1/auth/login/json",
            json={"email": test_user["email"], "password": test_user["password"]},
        )
        # Fail here with the response body rather than with a KeyError on
        # "refresh_token" below if the login itself did not succeed.
        assert login_resp.status_code == 200, login_resp.json()
        original_refresh = login_resp.json()["refresh_token"]
        original_payload = jwt.decode(
            original_refresh, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )

        # Refresh rotates the token but must carry auth_time/idle_max/abs_max
        # forward unchanged so the absolute window doesn't slide.
        refresh_resp = await client.post(
            "/api/v1/auth/refresh",
            headers={"Authorization": f"Bearer {original_refresh}"},
        )
        assert refresh_resp.status_code == 200, refresh_resp.json()
        new_payload = jwt.decode(
            refresh_resp.json()["refresh_token"],
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )

        for claim in ("auth_time", "idle_max", "abs_max"):
            assert new_payload[claim] == original_payload[claim], claim
        # Idle deadline does slide because exp = now + idle_max.
        assert new_payload["exp"] >= original_payload["exp"]
        # JTI rotates.
        assert new_payload["jti"] != original_payload["jti"]
def _backdate_auth_time(refresh_token: str, *, seconds_back: int) -> str:
    """Return `refresh_token` re-signed with `auth_time` moved into the past.

    All other claims, including the JTI, are copied through untouched. The
    DB row in refresh_tokens is keyed on hash(jti), so keeping the jti lets
    the atomic revoke step still find the row. This simulates "this session
    is past its absolute cap" without waiting two weeks.
    """
    key = settings.SECRET_KEY
    claims = jwt.decode(refresh_token, key, algorithms=[settings.ALGORITHM])
    claims["auth_time"] -= seconds_back
    return jwt.encode(claims, key, algorithm=settings.ALGORITHM)
class TestSessionPolicyEndpoint:
    """§6 tests #2, #3, #4, #5, #7, #16 — GET/PATCH /accounts/me/security."""

    @pytest.mark.asyncio
    async def test_get_returns_defaults_and_bounds(
        self, client: AsyncClient, auth_headers: dict, test_user: dict
    ):
        """GET reports null overrides, default effective values and bounds."""
        response = await client.get(
            "/api/v1/accounts/me/security", headers=auth_headers
        )
        assert response.status_code == 200, response.json()
        body = response.json()

        # No override yet -> effective values are the system defaults.
        assert body["idle_minutes"] is None
        assert body["absolute_minutes"] is None
        expected = {
            "effective_idle_minutes": 4320,  # 3d Strict default
            "effective_absolute_minutes": 20160,  # 14d
            "idle_minutes_min": 15,
            "idle_minutes_max": 43200,
            "absolute_minutes_min": 60,
            "absolute_minutes_max": 129600,
        }
        for field, value in expected.items():
            assert body[field] == value, field

        # active_users reflects users with un-revoked refresh tokens.
        # auth_headers logged the owner in once, so they should appear.
        assert isinstance(body["active_users"], list)
        assert len(body["active_users"]) >= 1
        emails = [u["email"] for u in body["active_users"]]
        assert test_user["email"] in emails

        # Schema check on one row.
        first = body["active_users"][0]
        for key in ("user_id", "name", "email", "last_login_at"):
            assert key in first, key

    @pytest.mark.asyncio
    async def test_patch_persists_override_and_returns_new_state(
        self, client: AsyncClient, auth_headers: dict, test_user: dict
    ):
        """PATCH stores the override and the next login snapshots it."""
        response = await client.patch(
            "/api/v1/accounts/me/security",
            headers=auth_headers,
            json={"idle_minutes": 60, "absolute_minutes": 240},
        )
        assert response.status_code == 200, response.json()
        body = response.json()
        assert body["idle_minutes"] == 60
        assert body["absolute_minutes"] == 240
        assert body["effective_idle_minutes"] == 60
        assert body["effective_absolute_minutes"] == 240

        # Next login picks up the new policy. Credentials come from the
        # test_user fixture (as everywhere else in this file) instead of
        # being duplicated here as string literals.
        login_resp = await client.post(
            "/api/v1/auth/login/json",
            json={"email": test_user["email"], "password": test_user["password"]},
        )
        assert login_resp.status_code == 200, login_resp.json()
        new_payload = jwt.decode(
            login_resp.json()["refresh_token"],
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
        assert new_payload["idle_max"] == 60 * 60  # 3600 seconds
        assert new_payload["abs_max"] == 240 * 60  # 14400 seconds

    @pytest.mark.asyncio
    async def test_patch_rejects_idle_below_min(
        self, client: AsyncClient, auth_headers: dict
    ):
        """idle_minutes=5 is below the advertised minimum of 15."""
        response = await client.patch(
            "/api/v1/accounts/me/security",
            headers=auth_headers,
            json={"idle_minutes": 5, "absolute_minutes": 60},
        )
        assert response.status_code == 422
        assert "idle_minutes" in response.json()["detail"]

    @pytest.mark.asyncio
    async def test_patch_rejects_absolute_above_max(
        self, client: AsyncClient, auth_headers: dict
    ):
        """absolute_minutes=200000 is above the advertised maximum of 129600."""
        response = await client.patch(
            "/api/v1/accounts/me/security",
            headers=auth_headers,
            json={"absolute_minutes": 200000},
        )
        assert response.status_code == 422

    @pytest.mark.asyncio
    async def test_patch_rejects_idle_greater_than_absolute_both_set(
        self, client: AsyncClient, auth_headers: dict
    ):
        """Both values in range individually, but idle > absolute."""
        response = await client.patch(
            "/api/v1/accounts/me/security",
            headers=auth_headers,
            json={"idle_minutes": 300, "absolute_minutes": 120},
        )
        assert response.status_code == 422
        assert "exceed" in response.json()["detail"].lower()

    @pytest.mark.asyncio
    async def test_patch_rejects_partial_override_when_effective_invalid(
        self, client: AsyncClient, auth_headers: dict
    ):
        """§6 test #5 — partial override: idle=43200, absolute=NULL ->
        effective idle (43200) > effective absolute (20160 default) -> 422.
        """
        response = await client.patch(
            "/api/v1/accounts/me/security",
            headers=auth_headers,
            json={"idle_minutes": 43200, "absolute_minutes": None},
        )
        assert response.status_code == 422
        assert "exceed" in response.json()["detail"].lower()

    @pytest.mark.asyncio
    async def test_non_owner_cannot_patch(
        self, client: AsyncClient, test_user: dict, test_db
    ):
        """§6 test #7 — engineer role is forbidden."""
        from app.models.user import User
        from sqlalchemy import select

        engineer_email = "engineer-policy@example.com"
        owner = (
            await test_db.execute(
                select(User).where(User.email == test_user["email"])
            )
        ).scalar_one()
        # Add a second user in the same account with account_role=engineer.
        # Reusing the owner's password hash lets the engineer log in with
        # the fixture password.
        await _seed_extra_account_user(
            test_db,
            email=engineer_email,
            account_id=owner.account_id,
            password_hash=owner.password_hash,
            role="engineer",
        )

        login_resp = await client.post(
            "/api/v1/auth/login/json",
            json={"email": engineer_email, "password": test_user["password"]},
        )
        assert login_resp.status_code == 200
        engineer_headers = {
            "Authorization": f"Bearer {login_resp.json()['access_token']}"
        }

        response = await client.patch(
            "/api/v1/accounts/me/security",
            headers=engineer_headers,
            json={"idle_minutes": 60, "absolute_minutes": 240},
        )
        assert response.status_code == 403

    @pytest.mark.asyncio
    async def test_patch_writes_audit_row(
        self, client: AsyncClient, auth_headers: dict, test_db
    ):
        """§6 test #16 — PATCH emits one account.session_policy_update
        audit event with old/new + effective_old/new payload.
        """
        from app.models.audit_log import AuditLog
        from sqlalchemy import select

        response = await client.patch(
            "/api/v1/accounts/me/security",
            headers=auth_headers,
            json={"idle_minutes": 120, "absolute_minutes": 480},
        )
        assert response.status_code == 200

        result = await test_db.execute(
            select(AuditLog).where(AuditLog.action == "account.session_policy_update")
        )
        rows = result.scalars().all()
        assert len(rows) == 1
        entry = rows[0]
        assert entry.resource_type == "account"
        assert entry.details["new"] == {"idle_minutes": 120, "absolute_minutes": 480}
        assert entry.details["effective_new"] == {
            "idle_minutes": 120,
            "absolute_minutes": 480,
        }
        assert entry.details["effective_old"]["idle_minutes"] == 4320  # default
        assert entry.details["effective_old"]["absolute_minutes"] == 20160
async def _seed_extra_account_user(
    test_db, *, email: str, account_id, password_hash: str, role: str = "engineer"
):
    """Insert and commit one more user under an existing account.

    Used by the revoke-scope tests. Note that `role` is stored as the
    user's `account_role`; the `role` column itself is always "engineer",
    and the display name is simply the email address.

    Returns the committed User instance.
    """
    from app.models.user import User

    verified_at = datetime.now(timezone.utc)
    member = User(
        email=email,
        name=email,
        password_hash=password_hash,
        account_id=account_id,
        account_role=role,
        role="engineer",
        is_active=True,
        is_super_admin=False,
        email_verified_at=verified_at,
    )
    test_db.add(member)
    await test_db.commit()
    return member
class TestBulkRevoke:
    """§6 tests #17-#22 — POST /accounts/me/security/revoke-sessions."""

    @staticmethod
    async def _login(client: AsyncClient, email: str, password: str) -> dict:
        """Log in via /auth/login/json and return the decoded response body.

        Asserts a 200 so a broken login surfaces here, with the response
        body, instead of as a KeyError on a token field further down.
        """
        resp = await client.post(
            "/api/v1/auth/login/json",
            json={"email": email, "password": password},
        )
        assert resp.status_code == 200, resp.json()
        return resp.json()

    @staticmethod
    async def _revoke(client: AsyncClient, access_token: str, scope: str):
        """POST a bulk revoke with the given scope as `access_token`'s user."""
        return await client.post(
            "/api/v1/accounts/me/security/revoke-sessions",
            headers={"Authorization": f"Bearer {access_token}"},
            json={"scope": scope},
        )

    @staticmethod
    async def _refresh(client: AsyncClient, refresh_token: str):
        """POST `refresh_token` to /auth/refresh and return the response."""
        return await client.post(
            "/api/v1/auth/refresh",
            headers={"Authorization": f"Bearer {refresh_token}"},
        )

    @staticmethod
    async def _add_member(test_db, *, owner_email: str, member_email: str):
        """Seed `member_email` into the owner's account.

        The member reuses the owner's password hash, so it can log in with
        the owner's password.
        """
        from app.models.user import User
        from sqlalchemy import select

        owner = (
            await test_db.execute(select(User).where(User.email == owner_email))
        ).scalar_one()
        return await _seed_extra_account_user(
            test_db,
            email=member_email,
            account_id=owner.account_id,
            password_hash=owner.password_hash,
        )

    @pytest.mark.asyncio
    async def test_revoke_all_kills_callers_own_session(
        self, client: AsyncClient, test_user: dict, test_db
    ):
        """§6 test #17 — scope=all includes the caller's own token. After
        the response, the caller's refresh_token gets invalid_refresh_token
        on next /auth/refresh.
        """
        member_email = "member-revoke-all@example.com"
        await self._add_member(
            test_db, owner_email=test_user["email"], member_email=member_email
        )
        # Owner logs in (also seeds owner's refresh-token row).
        owner = await self._login(client, test_user["email"], test_user["password"])
        # Member also logs in so there's another active refresh-token row.
        await self._login(client, member_email, test_user["password"])

        response = await self._revoke(client, owner["access_token"], "all")
        assert response.status_code == 200, response.json()
        assert response.json()["revoked_count"] == 2

        # Owner's own refresh now returns invalid_refresh_token.
        retry = await self._refresh(client, owner["refresh_token"])
        assert retry.status_code == 401
        assert retry.json()["detail"] == "invalid_refresh_token"

    @pytest.mark.asyncio
    async def test_revoke_others_preserves_callers_session(
        self, client: AsyncClient, test_user: dict, test_db
    ):
        """§6 test #18 — scope=others excludes the caller's user_id from
        the bulk update. Caller can still refresh; other users cannot.
        """
        member_email = "member-revoke-others@example.com"
        await self._add_member(
            test_db, owner_email=test_user["email"], member_email=member_email
        )
        owner = await self._login(client, test_user["email"], test_user["password"])
        member = await self._login(client, member_email, test_user["password"])

        response = await self._revoke(client, owner["access_token"], "others")
        assert response.status_code == 200
        assert response.json()["revoked_count"] == 1

        # Owner's refresh still works.
        owner_retry = await self._refresh(client, owner["refresh_token"])
        assert owner_retry.status_code == 200

        # Member's refresh is dead.
        member_retry = await self._refresh(client, member["refresh_token"])
        assert member_retry.status_code == 401
        assert member_retry.json()["detail"] == "invalid_refresh_token"

    @pytest.mark.asyncio
    async def test_revoke_is_account_scoped(
        self, client: AsyncClient, test_user: dict, test_admin: dict
    ):
        """§6 test #19 — owner of account A cannot revoke tokens in account B.
        test_admin lives in its own account. After test_user's owner runs
        revoke-all, test_admin's session continues to work.
        """
        owner = await self._login(client, test_user["email"], test_user["password"])
        admin = await self._login(client, test_admin["email"], test_admin["password"])

        response = await self._revoke(client, owner["access_token"], "all")
        assert response.status_code == 200
        # Only test_user's own session is revoked.
        assert response.json()["revoked_count"] == 1

        admin_retry = await self._refresh(client, admin["refresh_token"])
        assert admin_retry.status_code == 200

    @pytest.mark.asyncio
    async def test_revoke_engineer_forbidden(
        self, client: AsyncClient, test_user: dict, test_db
    ):
        """§6 test #20 — engineer-role member gets 403."""
        engineer_email = "engineer-revoke@example.com"
        await self._add_member(
            test_db, owner_email=test_user["email"], member_email=engineer_email
        )
        engineer = await self._login(client, engineer_email, test_user["password"])

        response = await self._revoke(client, engineer["access_token"], "all")
        assert response.status_code == 403

    @pytest.mark.asyncio
    async def test_revoke_writes_audit_row(
        self, client: AsyncClient, test_user: dict, test_db
    ):
        """§6 test #21 — emits one account.sessions_revoked_bulk event."""
        from app.models.audit_log import AuditLog
        from sqlalchemy import select

        owner = await self._login(client, test_user["email"], test_user["password"])

        response = await self._revoke(client, owner["access_token"], "all")
        assert response.status_code == 200

        result = await test_db.execute(
            select(AuditLog).where(AuditLog.action == "account.sessions_revoked_bulk")
        )
        rows = result.scalars().all()
        assert len(rows) == 1
        entry = rows[0]
        assert entry.details["scope"] == "all"
        assert entry.details["revoked_count"] == 1

    @pytest.mark.asyncio
    async def test_revoke_is_idempotent(
        self, client: AsyncClient, test_user: dict, test_db
    ):
        """§6 test #22 — second immediate POST returns revoked_count=0
        (no already-revoked rows get double-stamped or counted again).
        """
        # A second member with a live session gives the first call something
        # to revoke; without it both calls trivially report 0 and the test
        # could not detect double counting.
        member_email = "member-revoke-idempotent@example.com"
        await self._add_member(
            test_db, owner_email=test_user["email"], member_email=member_email
        )
        owner = await self._login(client, test_user["email"], test_user["password"])
        await self._login(client, member_email, test_user["password"])

        # scope=others keeps the owner's own session out of the revoke.
        first = await self._revoke(client, owner["access_token"], "others")
        assert first.status_code == 200
        assert first.json()["revoked_count"] == 1

        second = await self._revoke(client, owner["access_token"], "others")
        assert second.status_code == 200
        assert second.json()["revoked_count"] == 0
class TestAbsoluteCap:
    """§6 tests #8, #9, #12 — absolute-cap enforcement and grandfather path."""

    @staticmethod
    async def _login_refresh_token(client: AsyncClient, test_user: dict):
        """Log in as `test_user`; return (refresh_token, decoded_claims).

        Asserts a 200 so a failed login is reported with its response body
        rather than as a KeyError on "refresh_token".
        """
        login_resp = await client.post(
            "/api/v1/auth/login/json",
            json={"email": test_user["email"], "password": test_user["password"]},
        )
        assert login_resp.status_code == 200, login_resp.json()
        token = login_resp.json()["refresh_token"]
        claims = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
        return token, claims

    @staticmethod
    async def _refresh(client: AsyncClient, refresh_token: str):
        """POST `refresh_token` to /auth/refresh and return the response."""
        return await client.post(
            "/api/v1/auth/refresh",
            headers={"Authorization": f"Bearer {refresh_token}"},
        )

    @pytest.mark.asyncio
    async def test_refresh_at_absolute_deadline_rejects(
        self, client: AsyncClient, test_user: dict
    ):
        """§6 test #8 — boundary check uses `>=`, not `>`.
        A token whose auth_time + abs_max equals now() is expired, not
        valid. Backdate the original token's auth_time by exactly abs_max
        seconds so now >= deadline.
        """
        original, claims = await self._login_refresh_token(client, test_user)
        expired = _backdate_auth_time(original, seconds_back=claims["abs_max"])

        response = await self._refresh(client, expired)

        assert response.status_code == 401
        assert response.json()["detail"] == "session_expired_absolute"

    @pytest.mark.asyncio
    async def test_absolute_expired_token_is_consumed(
        self, client: AsyncClient, test_user: dict
    ):
        """§6 test #9 — first attempt returns session_expired_absolute and
        revokes the row; second attempt sees the revoked row and returns
        invalid_refresh_token. Prevents replay of an absolute-expired token.
        """
        original, claims = await self._login_refresh_token(client, test_user)
        expired = _backdate_auth_time(original, seconds_back=claims["abs_max"] + 1)

        first = await self._refresh(client, expired)
        assert first.status_code == 401
        assert first.json()["detail"] == "session_expired_absolute"

        second = await self._refresh(client, expired)
        assert second.status_code == 401
        assert second.json()["detail"] == "invalid_refresh_token"

    @pytest.mark.asyncio
    async def test_grandfather_path_for_legacy_token(
        self, client: AsyncClient, test_user: dict, test_db
    ):
        """§6 test #12 — refresh token issued before this PR (no auth_time
        claim) gets one successful rotation; the new token has fresh
        auth_time/idle_max/abs_max claims snapshotted from current policy.
        """
        # NOTE: test_db is not referenced in the body; it is kept in the
        # signature so the set of requested fixtures stays unchanged.
        _, original_payload = await self._login_refresh_token(client, test_user)

        # Strip the new claims to simulate a token issued before this PR.
        # JTI preserved so the DB-side revoke still finds the row.
        legacy_payload = {
            "sub": original_payload["sub"],
            "type": "refresh",
            "jti": original_payload["jti"],
            "exp": original_payload["exp"],
        }
        legacy_token = jwt.encode(
            legacy_payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM
        )

        response = await self._refresh(client, legacy_token)
        assert response.status_code == 200, response.json()

        new_payload = jwt.decode(
            response.json()["refresh_token"],
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
        assert new_payload.get("auth_time") is not None
        assert new_payload.get("idle_max") == 3 * 24 * 60 * 60
        assert new_payload.get("abs_max") == 14 * 24 * 60 * 60
        # auth_time was set to ~now during grandfather, not preserved from
        # the legacy token (since the legacy token didn't have one).
        now_unix = int(datetime.now(timezone.utc).timestamp())
        assert abs(new_payload["auth_time"] - now_unix) < 10