"""Tests for the session-expiration-policy series. See docs/plans/2026-05-13-session-expiration-policy.md. Test numbers below correspond to the cases listed in §6 of the plan. This file grows across commits: - Commit 2: error-detail taxonomy (#11 + wrong-type + bad-signature) - Commit 3: claims embedded at login + response fields surfaced (#1, #14) - Commit 4: absolute-cap enforcement + grandfather path (#8, #9, #12) - Commit 5: GET/PATCH /accounts/me/security (#2, #3, #4, #5, #7, #16) """ import uuid from datetime import datetime, timedelta, timezone import pytest from httpx import AsyncClient from jose import jwt from app.core.config import settings def _encode_refresh_token( *, sub: str, exp: datetime, token_type: str = "refresh", secret: str | None = None, ) -> str: """Build a refresh JWT with arbitrary `exp` for testing. Bypasses create_refresh_token so tests can produce already-expired tokens, wrong-type tokens, or wrong-signature tokens. """ return jwt.encode( { "sub": sub, "type": token_type, "jti": str(uuid.uuid4()), "exp": exp, }, secret or settings.SECRET_KEY, algorithm=settings.ALGORITHM, ) class TestRefreshTokenErrorTaxonomy: """§6 test #11 — refresh-token error-detail taxonomy. `/auth/refresh` distinguishes idle expiry from generic invalid-token failures via `detail`, so the frontend can choose between the "session ended for security" banner and a plain logout redirect. """ @pytest.mark.asyncio async def test_idle_expired_refresh_returns_session_expired_idle( self, client: AsyncClient, test_user: dict ): token = _encode_refresh_token( sub=test_user["user_data"]["id"], exp=datetime.now(timezone.utc) - timedelta(seconds=1), ) response = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {token}"}, ) assert response.status_code == 401 assert response.json()["detail"] == "session_expired_idle" @pytest.mark.asyncio async def test_wrong_type_token_returns_invalid_refresh_token( self, client: AsyncClient, test_user: dict ): token = _encode_refresh_token( sub=test_user["user_data"]["id"], exp=datetime.now(timezone.utc) + timedelta(minutes=5), token_type="access", ) response = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {token}"}, ) assert response.status_code == 401 assert response.json()["detail"] == "invalid_refresh_token" @pytest.mark.asyncio async def test_bad_signature_returns_invalid_refresh_token( self, client: AsyncClient, test_user: dict ): token = _encode_refresh_token( sub=test_user["user_data"]["id"], exp=datetime.now(timezone.utc) + timedelta(minutes=5), secret="not-the-real-secret-key", ) response = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {token}"}, ) assert response.status_code == 401 assert response.json()["detail"] == "invalid_refresh_token" class TestSessionPolicyClaims: """§6 tests #1 and #14 — session-policy claims stamped at login. Every token-issuing endpoint embeds auth_time/idle_max/abs_max in the refresh JWT and surfaces idle_expires_at/absolute_expires_at on the response. """ @pytest.mark.asyncio async def test_login_json_embeds_session_claims_with_defaults( self, client: AsyncClient, test_user: dict ): before = datetime.now(timezone.utc) response = await client.post( "/api/v1/auth/login/json", json={ "email": test_user["email"], "password": test_user["password"], }, ) assert response.status_code == 200, response.json() body = response.json() after = datetime.now(timezone.utc) # Response surfaces both expiry windows as ISO strings. assert body["idle_expires_at"] is not None assert body["absolute_expires_at"] is not None idle_at = datetime.fromisoformat(body["idle_expires_at"]) abs_at = datetime.fromisoformat(body["absolute_expires_at"]) # Strict default: 3 days idle, 14 days absolute. assert timedelta(days=3) - timedelta(seconds=10) <= idle_at - before <= timedelta(days=3) + timedelta(seconds=10) assert timedelta(days=14) - timedelta(seconds=10) <= abs_at - before <= timedelta(days=14) + timedelta(seconds=10) # JWT carries the claims in seconds, plus auth_time as Unix seconds. decoded = jwt.decode( body["refresh_token"], settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) assert decoded["idle_max"] == 3 * 24 * 60 * 60 # 259200 assert decoded["abs_max"] == 14 * 24 * 60 * 60 # 1209600 assert int(before.timestamp()) <= decoded["auth_time"] <= int(after.timestamp()) @pytest.mark.asyncio async def test_refresh_carries_claims_forward_unchanged( self, client: AsyncClient, test_user: dict ): # Login produces the original session. login_resp = await client.post( "/api/v1/auth/login/json", json={"email": test_user["email"], "password": test_user["password"]}, ) original_refresh = login_resp.json()["refresh_token"] original_payload = jwt.decode( original_refresh, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) # Refresh rotates the token but must carry auth_time/idle_max/abs_max # forward unchanged so the absolute window doesn't slide. refresh_resp = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {original_refresh}"}, ) assert refresh_resp.status_code == 200, refresh_resp.json() new_refresh = refresh_resp.json()["refresh_token"] new_payload = jwt.decode( new_refresh, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) assert new_payload["auth_time"] == original_payload["auth_time"] assert new_payload["idle_max"] == original_payload["idle_max"] assert new_payload["abs_max"] == original_payload["abs_max"] # Idle deadline does slide because exp = now + idle_max. assert new_payload["exp"] >= original_payload["exp"] # JTI rotates. assert new_payload["jti"] != original_payload["jti"] def _backdate_auth_time(refresh_token: str, *, seconds_back: int) -> str: """Re-sign a refresh JWT with an earlier auth_time, preserving JTI. The DB row in refresh_tokens is keyed on hash(jti), so preserving jti lets the atomic revoke step still find the row. Used to simulate "this session is past its absolute cap" without waiting two weeks. """ payload = jwt.decode( refresh_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) payload["auth_time"] = payload["auth_time"] - seconds_back return jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM) class TestSessionPolicyEndpoint: """§6 tests #2, #3, #4, #5, #7, #16 — GET/PATCH /accounts/me/security.""" @pytest.mark.asyncio async def test_get_returns_defaults_and_bounds( self, client: AsyncClient, auth_headers: dict ): response = await client.get( "/api/v1/accounts/me/security", headers=auth_headers ) assert response.status_code == 200, response.json() body = response.json() # No override yet -> effective values are the system defaults. assert body["idle_minutes"] is None assert body["absolute_minutes"] is None assert body["effective_idle_minutes"] == 4320 # 3d Strict default assert body["effective_absolute_minutes"] == 20160 # 14d assert body["idle_minutes_min"] == 15 assert body["idle_minutes_max"] == 43200 assert body["absolute_minutes_min"] == 60 assert body["absolute_minutes_max"] == 129600 @pytest.mark.asyncio async def test_patch_persists_override_and_returns_new_state( self, client: AsyncClient, auth_headers: dict ): response = await client.patch( "/api/v1/accounts/me/security", headers=auth_headers, json={"idle_minutes": 60, "absolute_minutes": 240}, ) assert response.status_code == 200, response.json() body = response.json() assert body["idle_minutes"] == 60 assert body["absolute_minutes"] == 240 assert body["effective_idle_minutes"] == 60 assert body["effective_absolute_minutes"] == 240 # Next login picks up the new policy. login_resp = await client.post( "/api/v1/auth/login/json", json={"email": "test@example.com", "password": "TestPassword123!"}, ) new_payload = jwt.decode( login_resp.json()["refresh_token"], settings.SECRET_KEY, algorithms=[settings.ALGORITHM], ) assert new_payload["idle_max"] == 60 * 60 # 3600 seconds assert new_payload["abs_max"] == 240 * 60 # 14400 seconds @pytest.mark.asyncio async def test_patch_rejects_idle_below_min( self, client: AsyncClient, auth_headers: dict ): response = await client.patch( "/api/v1/accounts/me/security", headers=auth_headers, json={"idle_minutes": 5, "absolute_minutes": 60}, ) assert response.status_code == 422 assert "idle_minutes" in response.json()["detail"] @pytest.mark.asyncio async def test_patch_rejects_absolute_above_max( self, client: AsyncClient, auth_headers: dict ): response = await client.patch( "/api/v1/accounts/me/security", headers=auth_headers, json={"absolute_minutes": 200000}, ) assert response.status_code == 422 @pytest.mark.asyncio async def test_patch_rejects_idle_greater_than_absolute_both_set( self, client: AsyncClient, auth_headers: dict ): response = await client.patch( "/api/v1/accounts/me/security", headers=auth_headers, json={"idle_minutes": 300, "absolute_minutes": 120}, ) assert response.status_code == 422 assert "exceed" in response.json()["detail"].lower() @pytest.mark.asyncio async def test_patch_rejects_partial_override_when_effective_invalid( self, client: AsyncClient, auth_headers: dict ): """§6 test #5 — partial override: idle=43200, absolute=NULL -> effective idle (43200) > effective absolute (20160 default) -> 422. """ response = await client.patch( "/api/v1/accounts/me/security", headers=auth_headers, json={"idle_minutes": 43200, "absolute_minutes": None}, ) assert response.status_code == 422 assert "exceed" in response.json()["detail"].lower() @pytest.mark.asyncio async def test_non_owner_cannot_patch( self, client: AsyncClient, test_user: dict, test_db ): """§6 test #7 — engineer role is forbidden.""" from app.models.user import User from sqlalchemy import select # Add a second user in the same account with account_role=engineer. result = await test_db.execute( select(User).where(User.email == test_user["email"]) ) owner = result.scalar_one() engineer = User( email="engineer-policy@example.com", password_hash=owner.password_hash, # reuse the bcrypt hash name="Engineer", role="engineer", is_super_admin=False, is_active=True, account_id=owner.account_id, account_role="engineer", email_verified_at=datetime.now(timezone.utc), ) test_db.add(engineer) await test_db.commit() login_resp = await client.post( "/api/v1/auth/login/json", json={ "email": "engineer-policy@example.com", "password": test_user["password"], }, ) assert login_resp.status_code == 200 engineer_headers = { "Authorization": f"Bearer {login_resp.json()['access_token']}" } response = await client.patch( "/api/v1/accounts/me/security", headers=engineer_headers, json={"idle_minutes": 60, "absolute_minutes": 240}, ) assert response.status_code == 403 @pytest.mark.asyncio async def test_patch_writes_audit_row( self, client: AsyncClient, auth_headers: dict, test_db ): """§6 test #16 — PATCH emits one account.session_policy_update audit event with old/new + effective_old/new payload. """ from app.models.audit_log import AuditLog from sqlalchemy import select response = await client.patch( "/api/v1/accounts/me/security", headers=auth_headers, json={"idle_minutes": 120, "absolute_minutes": 480}, ) assert response.status_code == 200 result = await test_db.execute( select(AuditLog).where(AuditLog.action == "account.session_policy_update") ) rows = result.scalars().all() assert len(rows) == 1 entry = rows[0] assert entry.resource_type == "account" assert entry.details["new"] == {"idle_minutes": 120, "absolute_minutes": 480} assert entry.details["effective_new"] == { "idle_minutes": 120, "absolute_minutes": 480, } assert entry.details["effective_old"]["idle_minutes"] == 4320 # default assert entry.details["effective_old"]["absolute_minutes"] == 20160 class TestAbsoluteCap: """§6 tests #8, #9, #12 — absolute-cap enforcement and grandfather path.""" @pytest.mark.asyncio async def test_refresh_at_absolute_deadline_rejects( self, client: AsyncClient, test_user: dict ): """§6 test #8 — boundary check uses `>=`, not `>`. A token whose auth_time + abs_max equals now() is expired, not valid. Backdate the original token's auth_time by exactly abs_max seconds so now >= deadline. """ login_resp = await client.post( "/api/v1/auth/login/json", json={"email": test_user["email"], "password": test_user["password"]}, ) original = login_resp.json()["refresh_token"] abs_max = jwt.decode( original, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] )["abs_max"] expired = _backdate_auth_time(original, seconds_back=abs_max) response = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {expired}"}, ) assert response.status_code == 401 assert response.json()["detail"] == "session_expired_absolute" @pytest.mark.asyncio async def test_absolute_expired_token_is_consumed( self, client: AsyncClient, test_user: dict ): """§6 test #9 — first attempt returns session_expired_absolute and revokes the row; second attempt sees the revoked row and returns invalid_refresh_token. Prevents replay of an absolute-expired token. """ login_resp = await client.post( "/api/v1/auth/login/json", json={"email": test_user["email"], "password": test_user["password"]}, ) original = login_resp.json()["refresh_token"] abs_max = jwt.decode( original, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] )["abs_max"] expired = _backdate_auth_time(original, seconds_back=abs_max + 1) first = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {expired}"}, ) assert first.status_code == 401 assert first.json()["detail"] == "session_expired_absolute" second = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {expired}"}, ) assert second.status_code == 401 assert second.json()["detail"] == "invalid_refresh_token" @pytest.mark.asyncio async def test_grandfather_path_for_legacy_token( self, client: AsyncClient, test_user: dict, test_db ): """§6 test #12 — refresh token issued before this PR (no auth_time claim) gets one successful rotation; the new token has fresh auth_time/idle_max/abs_max claims snapshotted from current policy. """ from app.core.security import hash_token from app.models.refresh_token import RefreshToken login_resp = await client.post( "/api/v1/auth/login/json", json={"email": test_user["email"], "password": test_user["password"]}, ) original = login_resp.json()["refresh_token"] original_payload = jwt.decode( original, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) # Strip the new claims to simulate a token issued before this PR. # JTI preserved so the DB-side revoke still finds the row. legacy_payload = { "sub": original_payload["sub"], "type": "refresh", "jti": original_payload["jti"], "exp": original_payload["exp"], } legacy_token = jwt.encode( legacy_payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM ) response = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {legacy_token}"}, ) assert response.status_code == 200, response.json() new_payload = jwt.decode( response.json()["refresh_token"], settings.SECRET_KEY, algorithms=[settings.ALGORITHM], ) assert new_payload.get("auth_time") is not None assert new_payload.get("idle_max") == 3 * 24 * 60 * 60 assert new_payload.get("abs_max") == 14 * 24 * 60 * 60 # auth_time was set to ~now during grandfather, not preserved from # the legacy token (since the legacy token didn't have one). now_unix = int(datetime.now(timezone.utc).timestamp()) assert abs(new_payload["auth_time"] - now_unix) < 10