Files
resolutionflow/backend/tests/test_session_policy.py
Michael Chihlas d6a02ee8da feat(auth): embed auth_time/idle_max/abs_max in refresh tokens at every login
Third commit in the session-expiration-policy series. Every refresh token
issued from now on carries the policy snapshot in its JWT (in seconds,
for direct Unix math), and every login/OAuth response surfaces both
expiry windows as ISO timestamps. /auth/refresh carries the claims
forward unchanged — including auth_time, which never resets on rotation.

Does NOT yet enforce the absolute cap — that's commit 4, sequenced so
the gate can be reverted independently if pilots hit an edge case.
But the wire is fully populated, and a grandfather path is already in
_refresh_session_tokens for tokens issued before this PR.

Key changes:
- core/security.py: create_refresh_token signature changes to
  (user_id, *, auth_time, idle_max_seconds, abs_max_seconds). Adds
  resolve_session_policy(account) -> (idle_minutes, absolute_minutes)
  applying defaults for NULL overrides.
- schemas/token.py + schemas/oauth.py: Token and OAuthCallbackResponse
  gain idle_expires_at + absolute_expires_at (Optional[datetime],
  Pydantic emits ISO 8601 UTC strings).
- endpoints/auth.py: new _mint_session_tokens(user, db) and
  _refresh_session_tokens(payload, user, db) helpers. /auth/login,
  /auth/login/json, and /auth/refresh now route through them. The
  refresh endpoint's pre-existing "Refresh token has been revoked"
  error normalized to the taxonomy detail "invalid_refresh_token".
- endpoints/oauth.py: both Google and Microsoft callbacks call
  _mint_session_tokens; OAuthCallbackResponse carries the expiry
  fields through.
- tests: two new cases in test_session_policy.py — login_json embeds
  the claims with strict defaults (3d/14d -> 259200/1209600 sec) and
  surfaces matching ISO expiry fields; refresh carries auth_time,
  idle_max, abs_max forward unchanged across rotation.

35/35 across test_session_policy + test_auth + test_oauth_callbacks +
test_account_invite_lookup + test_account_management.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-13 16:22:53 -04:00

182 lines
6.5 KiB
Python

"""Tests for the session-expiration-policy series.
See docs/plans/2026-05-13-session-expiration-policy.md.
Test numbers below correspond to the cases listed in §6 of the plan.
This file grows across commits:
- Commit 2: error-detail taxonomy (#11 + wrong-type + bad-signature)
- Commit 3: claims embedded at login + response fields surfaced (#1, #14)
"""
import uuid
from datetime import datetime, timedelta, timezone
import pytest
from httpx import AsyncClient
from jose import jwt
from app.core.config import settings
def _encode_refresh_token(
*,
sub: str,
exp: datetime,
token_type: str = "refresh",
secret: str | None = None,
) -> str:
"""Build a refresh JWT with arbitrary `exp` for testing.
Bypasses create_refresh_token so tests can produce already-expired
tokens, wrong-type tokens, or wrong-signature tokens.
"""
return jwt.encode(
{
"sub": sub,
"type": token_type,
"jti": str(uuid.uuid4()),
"exp": exp,
},
secret or settings.SECRET_KEY,
algorithm=settings.ALGORITHM,
)
class TestRefreshTokenErrorTaxonomy:
"""§6 test #11 — refresh-token error-detail taxonomy.
`/auth/refresh` distinguishes idle expiry from generic invalid-token
failures via `detail`, so the frontend can choose between the "session
ended for security" banner and a plain logout redirect.
"""
@pytest.mark.asyncio
async def test_idle_expired_refresh_returns_session_expired_idle(
self, client: AsyncClient, test_user: dict
):
token = _encode_refresh_token(
sub=test_user["user_data"]["id"],
exp=datetime.now(timezone.utc) - timedelta(seconds=1),
)
response = await client.post(
"/api/v1/auth/refresh",
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code == 401
assert response.json()["detail"] == "session_expired_idle"
@pytest.mark.asyncio
async def test_wrong_type_token_returns_invalid_refresh_token(
self, client: AsyncClient, test_user: dict
):
token = _encode_refresh_token(
sub=test_user["user_data"]["id"],
exp=datetime.now(timezone.utc) + timedelta(minutes=5),
token_type="access",
)
response = await client.post(
"/api/v1/auth/refresh",
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code == 401
assert response.json()["detail"] == "invalid_refresh_token"
@pytest.mark.asyncio
async def test_bad_signature_returns_invalid_refresh_token(
self, client: AsyncClient, test_user: dict
):
token = _encode_refresh_token(
sub=test_user["user_data"]["id"],
exp=datetime.now(timezone.utc) + timedelta(minutes=5),
secret="not-the-real-secret-key",
)
response = await client.post(
"/api/v1/auth/refresh",
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code == 401
assert response.json()["detail"] == "invalid_refresh_token"
class TestSessionPolicyClaims:
"""§6 tests #1 and #14 — session-policy claims stamped at login.
Every token-issuing endpoint embeds auth_time/idle_max/abs_max in
the refresh JWT and surfaces idle_expires_at/absolute_expires_at on
the response.
"""
@pytest.mark.asyncio
async def test_login_json_embeds_session_claims_with_defaults(
self, client: AsyncClient, test_user: dict
):
before = datetime.now(timezone.utc)
response = await client.post(
"/api/v1/auth/login/json",
json={
"email": test_user["email"],
"password": test_user["password"],
},
)
assert response.status_code == 200, response.json()
body = response.json()
after = datetime.now(timezone.utc)
# Response surfaces both expiry windows as ISO strings.
assert body["idle_expires_at"] is not None
assert body["absolute_expires_at"] is not None
idle_at = datetime.fromisoformat(body["idle_expires_at"])
abs_at = datetime.fromisoformat(body["absolute_expires_at"])
# Strict default: 3 days idle, 14 days absolute.
assert timedelta(days=3) - timedelta(seconds=10) <= idle_at - before <= timedelta(days=3) + timedelta(seconds=10)
assert timedelta(days=14) - timedelta(seconds=10) <= abs_at - before <= timedelta(days=14) + timedelta(seconds=10)
# JWT carries the claims in seconds, plus auth_time as Unix seconds.
decoded = jwt.decode(
body["refresh_token"], settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
assert decoded["idle_max"] == 3 * 24 * 60 * 60 # 259200
assert decoded["abs_max"] == 14 * 24 * 60 * 60 # 1209600
assert int(before.timestamp()) <= decoded["auth_time"] <= int(after.timestamp())
@pytest.mark.asyncio
async def test_refresh_carries_claims_forward_unchanged(
self, client: AsyncClient, test_user: dict
):
# Login produces the original session.
login_resp = await client.post(
"/api/v1/auth/login/json",
json={"email": test_user["email"], "password": test_user["password"]},
)
original_refresh = login_resp.json()["refresh_token"]
original_payload = jwt.decode(
original_refresh, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
# Refresh rotates the token but must carry auth_time/idle_max/abs_max
# forward unchanged so the absolute window doesn't slide.
refresh_resp = await client.post(
"/api/v1/auth/refresh",
headers={"Authorization": f"Bearer {original_refresh}"},
)
assert refresh_resp.status_code == 200, refresh_resp.json()
new_refresh = refresh_resp.json()["refresh_token"]
new_payload = jwt.decode(
new_refresh, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
assert new_payload["auth_time"] == original_payload["auth_time"]
assert new_payload["idle_max"] == original_payload["idle_max"]
assert new_payload["abs_max"] == original_payload["abs_max"]
# Idle deadline does slide because exp = now + idle_max.
assert new_payload["exp"] >= original_payload["exp"]
# JTI rotates.
assert new_payload["jti"] != original_payload["jti"]