"""Tests for the session-expiration-policy series. See docs/plans/2026-05-13-session-expiration-policy.md. Test numbers below correspond to the cases listed in §6 of the plan. This file grows across commits: - Commit 2: error-detail taxonomy (#11 + wrong-type + bad-signature) - Commit 3: claims embedded at login + response fields surfaced (#1, #14) - Commit 4: absolute-cap enforcement + grandfather path (#8, #9, #12) """ import uuid from datetime import datetime, timedelta, timezone import pytest from httpx import AsyncClient from jose import jwt from app.core.config import settings def _encode_refresh_token( *, sub: str, exp: datetime, token_type: str = "refresh", secret: str | None = None, ) -> str: """Build a refresh JWT with arbitrary `exp` for testing. Bypasses create_refresh_token so tests can produce already-expired tokens, wrong-type tokens, or wrong-signature tokens. """ return jwt.encode( { "sub": sub, "type": token_type, "jti": str(uuid.uuid4()), "exp": exp, }, secret or settings.SECRET_KEY, algorithm=settings.ALGORITHM, ) class TestRefreshTokenErrorTaxonomy: """§6 test #11 — refresh-token error-detail taxonomy. `/auth/refresh` distinguishes idle expiry from generic invalid-token failures via `detail`, so the frontend can choose between the "session ended for security" banner and a plain logout redirect. """ @pytest.mark.asyncio async def test_idle_expired_refresh_returns_session_expired_idle( self, client: AsyncClient, test_user: dict ): token = _encode_refresh_token( sub=test_user["user_data"]["id"], exp=datetime.now(timezone.utc) - timedelta(seconds=1), ) response = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {token}"}, ) assert response.status_code == 401 assert response.json()["detail"] == "session_expired_idle" @pytest.mark.asyncio async def test_wrong_type_token_returns_invalid_refresh_token( self, client: AsyncClient, test_user: dict ): token = _encode_refresh_token( sub=test_user["user_data"]["id"], exp=datetime.now(timezone.utc) + timedelta(minutes=5), token_type="access", ) response = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {token}"}, ) assert response.status_code == 401 assert response.json()["detail"] == "invalid_refresh_token" @pytest.mark.asyncio async def test_bad_signature_returns_invalid_refresh_token( self, client: AsyncClient, test_user: dict ): token = _encode_refresh_token( sub=test_user["user_data"]["id"], exp=datetime.now(timezone.utc) + timedelta(minutes=5), secret="not-the-real-secret-key", ) response = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {token}"}, ) assert response.status_code == 401 assert response.json()["detail"] == "invalid_refresh_token" class TestSessionPolicyClaims: """§6 tests #1 and #14 — session-policy claims stamped at login. Every token-issuing endpoint embeds auth_time/idle_max/abs_max in the refresh JWT and surfaces idle_expires_at/absolute_expires_at on the response. """ @pytest.mark.asyncio async def test_login_json_embeds_session_claims_with_defaults( self, client: AsyncClient, test_user: dict ): before = datetime.now(timezone.utc) response = await client.post( "/api/v1/auth/login/json", json={ "email": test_user["email"], "password": test_user["password"], }, ) assert response.status_code == 200, response.json() body = response.json() after = datetime.now(timezone.utc) # Response surfaces both expiry windows as ISO strings. assert body["idle_expires_at"] is not None assert body["absolute_expires_at"] is not None idle_at = datetime.fromisoformat(body["idle_expires_at"]) abs_at = datetime.fromisoformat(body["absolute_expires_at"]) # Strict default: 3 days idle, 14 days absolute. assert timedelta(days=3) - timedelta(seconds=10) <= idle_at - before <= timedelta(days=3) + timedelta(seconds=10) assert timedelta(days=14) - timedelta(seconds=10) <= abs_at - before <= timedelta(days=14) + timedelta(seconds=10) # JWT carries the claims in seconds, plus auth_time as Unix seconds. decoded = jwt.decode( body["refresh_token"], settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) assert decoded["idle_max"] == 3 * 24 * 60 * 60 # 259200 assert decoded["abs_max"] == 14 * 24 * 60 * 60 # 1209600 assert int(before.timestamp()) <= decoded["auth_time"] <= int(after.timestamp()) @pytest.mark.asyncio async def test_refresh_carries_claims_forward_unchanged( self, client: AsyncClient, test_user: dict ): # Login produces the original session. login_resp = await client.post( "/api/v1/auth/login/json", json={"email": test_user["email"], "password": test_user["password"]}, ) original_refresh = login_resp.json()["refresh_token"] original_payload = jwt.decode( original_refresh, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) # Refresh rotates the token but must carry auth_time/idle_max/abs_max # forward unchanged so the absolute window doesn't slide. refresh_resp = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {original_refresh}"}, ) assert refresh_resp.status_code == 200, refresh_resp.json() new_refresh = refresh_resp.json()["refresh_token"] new_payload = jwt.decode( new_refresh, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) assert new_payload["auth_time"] == original_payload["auth_time"] assert new_payload["idle_max"] == original_payload["idle_max"] assert new_payload["abs_max"] == original_payload["abs_max"] # Idle deadline does slide because exp = now + idle_max. assert new_payload["exp"] >= original_payload["exp"] # JTI rotates. assert new_payload["jti"] != original_payload["jti"] def _backdate_auth_time(refresh_token: str, *, seconds_back: int) -> str: """Re-sign a refresh JWT with an earlier auth_time, preserving JTI. The DB row in refresh_tokens is keyed on hash(jti), so preserving jti lets the atomic revoke step still find the row. Used to simulate "this session is past its absolute cap" without waiting two weeks. """ payload = jwt.decode( refresh_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) payload["auth_time"] = payload["auth_time"] - seconds_back return jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM) class TestAbsoluteCap: """§6 tests #8, #9, #12 — absolute-cap enforcement and grandfather path.""" @pytest.mark.asyncio async def test_refresh_at_absolute_deadline_rejects( self, client: AsyncClient, test_user: dict ): """§6 test #8 — boundary check uses `>=`, not `>`. A token whose auth_time + abs_max equals now() is expired, not valid. Backdate the original token's auth_time by exactly abs_max seconds so now >= deadline. """ login_resp = await client.post( "/api/v1/auth/login/json", json={"email": test_user["email"], "password": test_user["password"]}, ) original = login_resp.json()["refresh_token"] abs_max = jwt.decode( original, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] )["abs_max"] expired = _backdate_auth_time(original, seconds_back=abs_max) response = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {expired}"}, ) assert response.status_code == 401 assert response.json()["detail"] == "session_expired_absolute" @pytest.mark.asyncio async def test_absolute_expired_token_is_consumed( self, client: AsyncClient, test_user: dict ): """§6 test #9 — first attempt returns session_expired_absolute and revokes the row; second attempt sees the revoked row and returns invalid_refresh_token. Prevents replay of an absolute-expired token. """ login_resp = await client.post( "/api/v1/auth/login/json", json={"email": test_user["email"], "password": test_user["password"]}, ) original = login_resp.json()["refresh_token"] abs_max = jwt.decode( original, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] )["abs_max"] expired = _backdate_auth_time(original, seconds_back=abs_max + 1) first = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {expired}"}, ) assert first.status_code == 401 assert first.json()["detail"] == "session_expired_absolute" second = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {expired}"}, ) assert second.status_code == 401 assert second.json()["detail"] == "invalid_refresh_token" @pytest.mark.asyncio async def test_grandfather_path_for_legacy_token( self, client: AsyncClient, test_user: dict, test_db ): """§6 test #12 — refresh token issued before this PR (no auth_time claim) gets one successful rotation; the new token has fresh auth_time/idle_max/abs_max claims snapshotted from current policy. """ from app.core.security import hash_token from app.models.refresh_token import RefreshToken login_resp = await client.post( "/api/v1/auth/login/json", json={"email": test_user["email"], "password": test_user["password"]}, ) original = login_resp.json()["refresh_token"] original_payload = jwt.decode( original, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) # Strip the new claims to simulate a token issued before this PR. # JTI preserved so the DB-side revoke still finds the row. legacy_payload = { "sub": original_payload["sub"], "type": "refresh", "jti": original_payload["jti"], "exp": original_payload["exp"], } legacy_token = jwt.encode( legacy_payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM ) response = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {legacy_token}"}, ) assert response.status_code == 200, response.json() new_payload = jwt.decode( response.json()["refresh_token"], settings.SECRET_KEY, algorithms=[settings.ALGORITHM], ) assert new_payload.get("auth_time") is not None assert new_payload.get("idle_max") == 3 * 24 * 60 * 60 assert new_payload.get("abs_max") == 14 * 24 * 60 * 60 # auth_time was set to ~now during grandfather, not preserved from # the legacy token (since the legacy token didn't have one). now_unix = int(datetime.now(timezone.utc).timestamp()) assert abs(new_payload["auth_time"] - now_unix) < 10