"""Tests for the session-expiration-policy series. See docs/plans/2026-05-13-session-expiration-policy.md. Test numbers below correspond to the cases listed in §6 of the plan. This file grows across commits: - Commit 2: error-detail taxonomy (#11 + wrong-type + bad-signature) - Commit 3: claims embedded at login + response fields surfaced (#1, #14) - Commit 4: absolute-cap enforcement + grandfather path (#8, #9, #12) - Commit 5: GET/PATCH /accounts/me/security (#2, #3, #4, #5, #7, #16) - Commit 6: POST /accounts/me/security/revoke-sessions (#17-#22) """ import uuid from datetime import datetime, timedelta, timezone import pytest from httpx import AsyncClient from jose import jwt from app.core.config import settings def _encode_refresh_token( *, sub: str, exp: datetime, token_type: str = "refresh", secret: str | None = None, ) -> str: """Build a refresh JWT with arbitrary `exp` for testing. Bypasses create_refresh_token so tests can produce already-expired tokens, wrong-type tokens, or wrong-signature tokens. """ return jwt.encode( { "sub": sub, "type": token_type, "jti": str(uuid.uuid4()), "exp": exp, }, secret or settings.SECRET_KEY, algorithm=settings.ALGORITHM, ) class TestRefreshTokenErrorTaxonomy: """§6 test #11 — refresh-token error-detail taxonomy. `/auth/refresh` distinguishes idle expiry from generic invalid-token failures via `detail`, so the frontend can choose between the "session ended for security" banner and a plain logout redirect. """ @pytest.mark.asyncio async def test_idle_expired_refresh_returns_session_expired_idle( self, client: AsyncClient, test_user: dict ): token = _encode_refresh_token( sub=test_user["user_data"]["id"], exp=datetime.now(timezone.utc) - timedelta(seconds=1), ) response = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {token}"}, ) assert response.status_code == 401 assert response.json()["detail"] == "session_expired_idle" @pytest.mark.asyncio async def test_wrong_type_token_returns_invalid_refresh_token( self, client: AsyncClient, test_user: dict ): token = _encode_refresh_token( sub=test_user["user_data"]["id"], exp=datetime.now(timezone.utc) + timedelta(minutes=5), token_type="access", ) response = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {token}"}, ) assert response.status_code == 401 assert response.json()["detail"] == "invalid_refresh_token" @pytest.mark.asyncio async def test_bad_signature_returns_invalid_refresh_token( self, client: AsyncClient, test_user: dict ): token = _encode_refresh_token( sub=test_user["user_data"]["id"], exp=datetime.now(timezone.utc) + timedelta(minutes=5), secret="not-the-real-secret-key", ) response = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {token}"}, ) assert response.status_code == 401 assert response.json()["detail"] == "invalid_refresh_token" class TestSessionPolicyClaims: """§6 tests #1 and #14 — session-policy claims stamped at login. Every token-issuing endpoint embeds auth_time/idle_max/abs_max in the refresh JWT and surfaces idle_expires_at/absolute_expires_at on the response. """ @pytest.mark.asyncio async def test_login_json_embeds_session_claims_with_defaults( self, client: AsyncClient, test_user: dict ): before = datetime.now(timezone.utc) response = await client.post( "/api/v1/auth/login/json", json={ "email": test_user["email"], "password": test_user["password"], }, ) assert response.status_code == 200, response.json() body = response.json() after = datetime.now(timezone.utc) # Response surfaces both expiry windows as ISO strings. assert body["idle_expires_at"] is not None assert body["absolute_expires_at"] is not None idle_at = datetime.fromisoformat(body["idle_expires_at"]) abs_at = datetime.fromisoformat(body["absolute_expires_at"]) # Strict default: 3 days idle, 14 days absolute. assert timedelta(days=3) - timedelta(seconds=10) <= idle_at - before <= timedelta(days=3) + timedelta(seconds=10) assert timedelta(days=14) - timedelta(seconds=10) <= abs_at - before <= timedelta(days=14) + timedelta(seconds=10) # JWT carries the claims in seconds, plus auth_time as Unix seconds. decoded = jwt.decode( body["refresh_token"], settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) assert decoded["idle_max"] == 3 * 24 * 60 * 60 # 259200 assert decoded["abs_max"] == 14 * 24 * 60 * 60 # 1209600 assert int(before.timestamp()) <= decoded["auth_time"] <= int(after.timestamp()) @pytest.mark.asyncio async def test_refresh_carries_claims_forward_unchanged( self, client: AsyncClient, test_user: dict ): # Login produces the original session. login_resp = await client.post( "/api/v1/auth/login/json", json={"email": test_user["email"], "password": test_user["password"]}, ) original_refresh = login_resp.json()["refresh_token"] original_payload = jwt.decode( original_refresh, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) # Refresh rotates the token but must carry auth_time/idle_max/abs_max # forward unchanged so the absolute window doesn't slide. refresh_resp = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {original_refresh}"}, ) assert refresh_resp.status_code == 200, refresh_resp.json() new_refresh = refresh_resp.json()["refresh_token"] new_payload = jwt.decode( new_refresh, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) assert new_payload["auth_time"] == original_payload["auth_time"] assert new_payload["idle_max"] == original_payload["idle_max"] assert new_payload["abs_max"] == original_payload["abs_max"] # Idle deadline does slide because exp = now + idle_max. assert new_payload["exp"] >= original_payload["exp"] # JTI rotates. assert new_payload["jti"] != original_payload["jti"] def _backdate_auth_time(refresh_token: str, *, seconds_back: int) -> str: """Re-sign a refresh JWT with an earlier auth_time, preserving JTI. The DB row in refresh_tokens is keyed on hash(jti), so preserving jti lets the atomic revoke step still find the row. Used to simulate "this session is past its absolute cap" without waiting two weeks. """ payload = jwt.decode( refresh_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) payload["auth_time"] = payload["auth_time"] - seconds_back return jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM) class TestSessionPolicyEndpoint: """§6 tests #2, #3, #4, #5, #7, #16 — GET/PATCH /accounts/me/security.""" @pytest.mark.asyncio async def test_get_returns_defaults_and_bounds( self, client: AsyncClient, auth_headers: dict, test_user: dict ): response = await client.get( "/api/v1/accounts/me/security", headers=auth_headers ) assert response.status_code == 200, response.json() body = response.json() # No override yet -> effective values are the system defaults. assert body["idle_minutes"] is None assert body["absolute_minutes"] is None assert body["effective_idle_minutes"] == 4320 # 3d Strict default assert body["effective_absolute_minutes"] == 20160 # 14d assert body["idle_minutes_min"] == 15 assert body["idle_minutes_max"] == 43200 assert body["absolute_minutes_min"] == 60 assert body["absolute_minutes_max"] == 129600 # active_users reflects users with un-revoked refresh tokens. # auth_headers logged the owner in once, so they should appear. assert isinstance(body["active_users"], list) assert len(body["active_users"]) >= 1 emails = [u["email"] for u in body["active_users"]] assert test_user["email"] in emails # Schema check on one row. first = body["active_users"][0] assert "user_id" in first assert "name" in first assert "email" in first assert "last_login_at" in first @pytest.mark.asyncio async def test_patch_persists_override_and_returns_new_state( self, client: AsyncClient, auth_headers: dict ): response = await client.patch( "/api/v1/accounts/me/security", headers=auth_headers, json={"idle_minutes": 60, "absolute_minutes": 240}, ) assert response.status_code == 200, response.json() body = response.json() assert body["idle_minutes"] == 60 assert body["absolute_minutes"] == 240 assert body["effective_idle_minutes"] == 60 assert body["effective_absolute_minutes"] == 240 # Next login picks up the new policy. login_resp = await client.post( "/api/v1/auth/login/json", json={"email": "test@example.com", "password": "TestPassword123!"}, ) new_payload = jwt.decode( login_resp.json()["refresh_token"], settings.SECRET_KEY, algorithms=[settings.ALGORITHM], ) assert new_payload["idle_max"] == 60 * 60 # 3600 seconds assert new_payload["abs_max"] == 240 * 60 # 14400 seconds @pytest.mark.asyncio async def test_patch_rejects_idle_below_min( self, client: AsyncClient, auth_headers: dict ): response = await client.patch( "/api/v1/accounts/me/security", headers=auth_headers, json={"idle_minutes": 5, "absolute_minutes": 60}, ) assert response.status_code == 422 assert "idle_minutes" in response.json()["detail"] @pytest.mark.asyncio async def test_patch_rejects_absolute_above_max( self, client: AsyncClient, auth_headers: dict ): response = await client.patch( "/api/v1/accounts/me/security", headers=auth_headers, json={"absolute_minutes": 200000}, ) assert response.status_code == 422 @pytest.mark.asyncio async def test_patch_rejects_idle_greater_than_absolute_both_set( self, client: AsyncClient, auth_headers: dict ): response = await client.patch( "/api/v1/accounts/me/security", headers=auth_headers, json={"idle_minutes": 300, "absolute_minutes": 120}, ) assert response.status_code == 422 assert "exceed" in response.json()["detail"].lower() @pytest.mark.asyncio async def test_patch_rejects_partial_override_when_effective_invalid( self, client: AsyncClient, auth_headers: dict ): """§6 test #5 — partial override: idle=43200, absolute=NULL -> effective idle (43200) > effective absolute (20160 default) -> 422. """ response = await client.patch( "/api/v1/accounts/me/security", headers=auth_headers, json={"idle_minutes": 43200, "absolute_minutes": None}, ) assert response.status_code == 422 assert "exceed" in response.json()["detail"].lower() @pytest.mark.asyncio async def test_non_owner_cannot_patch( self, client: AsyncClient, test_user: dict, test_db ): """§6 test #7 — engineer role is forbidden.""" from app.models.user import User from sqlalchemy import select # Add a second user in the same account with account_role=engineer. result = await test_db.execute( select(User).where(User.email == test_user["email"]) ) owner = result.scalar_one() engineer = User( email="engineer-policy@example.com", password_hash=owner.password_hash, # reuse the bcrypt hash name="Engineer", role="engineer", is_super_admin=False, is_active=True, account_id=owner.account_id, account_role="engineer", email_verified_at=datetime.now(timezone.utc), ) test_db.add(engineer) await test_db.commit() login_resp = await client.post( "/api/v1/auth/login/json", json={ "email": "engineer-policy@example.com", "password": test_user["password"], }, ) assert login_resp.status_code == 200 engineer_headers = { "Authorization": f"Bearer {login_resp.json()['access_token']}" } response = await client.patch( "/api/v1/accounts/me/security", headers=engineer_headers, json={"idle_minutes": 60, "absolute_minutes": 240}, ) assert response.status_code == 403 @pytest.mark.asyncio async def test_patch_writes_audit_row( self, client: AsyncClient, auth_headers: dict, test_db ): """§6 test #16 — PATCH emits one account.session_policy_update audit event with old/new + effective_old/new payload. """ from app.models.audit_log import AuditLog from sqlalchemy import select response = await client.patch( "/api/v1/accounts/me/security", headers=auth_headers, json={"idle_minutes": 120, "absolute_minutes": 480}, ) assert response.status_code == 200 result = await test_db.execute( select(AuditLog).where(AuditLog.action == "account.session_policy_update") ) rows = result.scalars().all() assert len(rows) == 1 entry = rows[0] assert entry.resource_type == "account" assert entry.details["new"] == {"idle_minutes": 120, "absolute_minutes": 480} assert entry.details["effective_new"] == { "idle_minutes": 120, "absolute_minutes": 480, } assert entry.details["effective_old"]["idle_minutes"] == 4320 # default assert entry.details["effective_old"]["absolute_minutes"] == 20160 async def _seed_extra_account_user( test_db, *, email: str, account_id, password_hash: str, role: str = "engineer" ): """Add a second user under an existing account for revoke-scope tests.""" from app.models.user import User user = User( email=email, password_hash=password_hash, name=email, role="engineer", is_super_admin=False, is_active=True, account_id=account_id, account_role=role, email_verified_at=datetime.now(timezone.utc), ) test_db.add(user) await test_db.commit() return user class TestBulkRevoke: """§6 tests #17-#22 — POST /accounts/me/security/revoke-sessions.""" @pytest.mark.asyncio async def test_revoke_all_kills_callers_own_session( self, client: AsyncClient, test_user: dict, test_db ): """§6 test #17 — scope=all includes the caller's own token. After the response, the caller's refresh_token gets invalid_refresh_token on next /auth/refresh. """ from app.models.user import User from sqlalchemy import select owner = ( await test_db.execute( select(User).where(User.email == test_user["email"]) ) ).scalar_one() await _seed_extra_account_user( test_db, email="member-revoke-all@example.com", account_id=owner.account_id, password_hash=owner.password_hash, ) # Owner logs in (also seeds owner's refresh-token row). owner_login = await client.post( "/api/v1/auth/login/json", json={"email": test_user["email"], "password": test_user["password"]}, ) owner_refresh = owner_login.json()["refresh_token"] owner_access = owner_login.json()["access_token"] # Member also logs in so there's another active refresh-token row. member_login = await client.post( "/api/v1/auth/login/json", json={ "email": "member-revoke-all@example.com", "password": test_user["password"], }, ) assert member_login.status_code == 200 response = await client.post( "/api/v1/accounts/me/security/revoke-sessions", headers={"Authorization": f"Bearer {owner_access}"}, json={"scope": "all"}, ) assert response.status_code == 200, response.json() assert response.json()["revoked_count"] == 2 # Owner's own refresh now returns invalid_refresh_token. retry = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {owner_refresh}"}, ) assert retry.status_code == 401 assert retry.json()["detail"] == "invalid_refresh_token" @pytest.mark.asyncio async def test_revoke_others_preserves_callers_session( self, client: AsyncClient, test_user: dict, test_db ): """§6 test #18 — scope=others excludes the caller's user_id from the bulk update. Caller can still refresh; other users cannot. """ from app.models.user import User from sqlalchemy import select owner = ( await test_db.execute( select(User).where(User.email == test_user["email"]) ) ).scalar_one() await _seed_extra_account_user( test_db, email="member-revoke-others@example.com", account_id=owner.account_id, password_hash=owner.password_hash, ) owner_login = await client.post( "/api/v1/auth/login/json", json={"email": test_user["email"], "password": test_user["password"]}, ) owner_refresh = owner_login.json()["refresh_token"] owner_access = owner_login.json()["access_token"] member_login = await client.post( "/api/v1/auth/login/json", json={ "email": "member-revoke-others@example.com", "password": test_user["password"], }, ) member_refresh = member_login.json()["refresh_token"] response = await client.post( "/api/v1/accounts/me/security/revoke-sessions", headers={"Authorization": f"Bearer {owner_access}"}, json={"scope": "others"}, ) assert response.status_code == 200 assert response.json()["revoked_count"] == 1 # Owner's refresh still works. owner_retry = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {owner_refresh}"}, ) assert owner_retry.status_code == 200 # Member's refresh is dead. member_retry = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {member_refresh}"}, ) assert member_retry.status_code == 401 assert member_retry.json()["detail"] == "invalid_refresh_token" @pytest.mark.asyncio async def test_revoke_is_account_scoped( self, client: AsyncClient, test_user: dict, test_admin: dict ): """§6 test #19 — owner of account A cannot revoke tokens in account B. test_admin lives in its own account. After test_user's owner runs revoke-all, test_admin's session continues to work. """ owner_login = await client.post( "/api/v1/auth/login/json", json={"email": test_user["email"], "password": test_user["password"]}, ) owner_access = owner_login.json()["access_token"] admin_login = await client.post( "/api/v1/auth/login/json", json={"email": test_admin["email"], "password": test_admin["password"]}, ) admin_refresh = admin_login.json()["refresh_token"] response = await client.post( "/api/v1/accounts/me/security/revoke-sessions", headers={"Authorization": f"Bearer {owner_access}"}, json={"scope": "all"}, ) assert response.status_code == 200 # Only test_user's own session is revoked. assert response.json()["revoked_count"] == 1 admin_retry = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {admin_refresh}"}, ) assert admin_retry.status_code == 200 @pytest.mark.asyncio async def test_revoke_engineer_forbidden( self, client: AsyncClient, test_user: dict, test_db ): """§6 test #20 — engineer-role member gets 403.""" from app.models.user import User from sqlalchemy import select owner = ( await test_db.execute( select(User).where(User.email == test_user["email"]) ) ).scalar_one() await _seed_extra_account_user( test_db, email="engineer-revoke@example.com", account_id=owner.account_id, password_hash=owner.password_hash, ) engineer_login = await client.post( "/api/v1/auth/login/json", json={ "email": "engineer-revoke@example.com", "password": test_user["password"], }, ) engineer_access = engineer_login.json()["access_token"] response = await client.post( "/api/v1/accounts/me/security/revoke-sessions", headers={"Authorization": f"Bearer {engineer_access}"}, json={"scope": "all"}, ) assert response.status_code == 403 @pytest.mark.asyncio async def test_revoke_writes_audit_row( self, client: AsyncClient, test_user: dict, test_db ): """§6 test #21 — emits one account.sessions_revoked_bulk event.""" from app.models.audit_log import AuditLog from sqlalchemy import select owner_login = await client.post( "/api/v1/auth/login/json", json={"email": test_user["email"], "password": test_user["password"]}, ) owner_access = owner_login.json()["access_token"] response = await client.post( "/api/v1/accounts/me/security/revoke-sessions", headers={"Authorization": f"Bearer {owner_access}"}, json={"scope": "all"}, ) assert response.status_code == 200 result = await test_db.execute( select(AuditLog).where(AuditLog.action == "account.sessions_revoked_bulk") ) rows = result.scalars().all() assert len(rows) == 1 entry = rows[0] assert entry.details["scope"] == "all" assert entry.details["revoked_count"] == 1 @pytest.mark.asyncio async def test_revoke_is_idempotent( self, client: AsyncClient, test_user: dict ): """§6 test #22 — second immediate POST returns revoked_count=0 (no already-revoked rows get double-stamped or counted again). """ owner_login = await client.post( "/api/v1/auth/login/json", json={"email": test_user["email"], "password": test_user["password"]}, ) owner_access = owner_login.json()["access_token"] first = await client.post( "/api/v1/accounts/me/security/revoke-sessions", headers={"Authorization": f"Bearer {owner_access}"}, json={"scope": "others"}, # owner's own session preserved ) assert first.status_code == 200 second = await client.post( "/api/v1/accounts/me/security/revoke-sessions", headers={"Authorization": f"Bearer {owner_access}"}, json={"scope": "others"}, ) assert second.status_code == 200 assert second.json()["revoked_count"] == 0 class TestAbsoluteCap: """§6 tests #8, #9, #12 — absolute-cap enforcement and grandfather path.""" @pytest.mark.asyncio async def test_refresh_at_absolute_deadline_rejects( self, client: AsyncClient, test_user: dict ): """§6 test #8 — boundary check uses `>=`, not `>`. A token whose auth_time + abs_max equals now() is expired, not valid. Backdate the original token's auth_time by exactly abs_max seconds so now >= deadline. """ login_resp = await client.post( "/api/v1/auth/login/json", json={"email": test_user["email"], "password": test_user["password"]}, ) original = login_resp.json()["refresh_token"] abs_max = jwt.decode( original, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] )["abs_max"] expired = _backdate_auth_time(original, seconds_back=abs_max) response = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {expired}"}, ) assert response.status_code == 401 assert response.json()["detail"] == "session_expired_absolute" @pytest.mark.asyncio async def test_absolute_expired_token_is_consumed( self, client: AsyncClient, test_user: dict ): """§6 test #9 — first attempt returns session_expired_absolute and revokes the row; second attempt sees the revoked row and returns invalid_refresh_token. Prevents replay of an absolute-expired token. """ login_resp = await client.post( "/api/v1/auth/login/json", json={"email": test_user["email"], "password": test_user["password"]}, ) original = login_resp.json()["refresh_token"] abs_max = jwt.decode( original, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] )["abs_max"] expired = _backdate_auth_time(original, seconds_back=abs_max + 1) first = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {expired}"}, ) assert first.status_code == 401 assert first.json()["detail"] == "session_expired_absolute" second = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {expired}"}, ) assert second.status_code == 401 assert second.json()["detail"] == "invalid_refresh_token" @pytest.mark.asyncio async def test_grandfather_path_for_legacy_token( self, client: AsyncClient, test_user: dict, test_db ): """§6 test #12 — refresh token issued before this PR (no auth_time claim) gets one successful rotation; the new token has fresh auth_time/idle_max/abs_max claims snapshotted from current policy. """ from app.core.security import hash_token from app.models.refresh_token import RefreshToken login_resp = await client.post( "/api/v1/auth/login/json", json={"email": test_user["email"], "password": test_user["password"]}, ) original = login_resp.json()["refresh_token"] original_payload = jwt.decode( original, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) # Strip the new claims to simulate a token issued before this PR. # JTI preserved so the DB-side revoke still finds the row. legacy_payload = { "sub": original_payload["sub"], "type": "refresh", "jti": original_payload["jti"], "exp": original_payload["exp"], } legacy_token = jwt.encode( legacy_payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM ) response = await client.post( "/api/v1/auth/refresh", headers={"Authorization": f"Bearer {legacy_token}"}, ) assert response.status_code == 200, response.json() new_payload = jwt.decode( response.json()["refresh_token"], settings.SECRET_KEY, algorithms=[settings.ALGORITHM], ) assert new_payload.get("auth_time") is not None assert new_payload.get("idle_max") == 3 * 24 * 60 * 60 assert new_payload.get("abs_max") == 14 * 24 * 60 * 60 # auth_time was set to ~now during grandfather, not preserved from # the legacy token (since the legacy token didn't have one). now_unix = int(datetime.now(timezone.utc).timestamp()) assert abs(new_payload["auth_time"] - now_unix) < 10