Eighth commit in the session-expiration-policy series. Surfaces all
the owner controls and user-facing expiry UX that the prior commits
plumbed through, designed end-to-end via /plan-design-review (initial
4/10 -> final 9/10; 7 decisions locked in the plan).
Backend additions:
- accounts/me/security GET response gains active_users: list of
{user_id, name, email, last_login_at} for users in this account
with at least one un-revoked refresh token. Joined query on
refresh_tokens + users, distinct, ordered by last_login desc.
Drives the Active Sessions section.
Frontend additions:
- api/accountSecurity.ts: typed client for GET/PATCH/revoke-sessions.
- hooks/useAuthSessionExpiry.ts: reads idle/absolute expiry from the
auth store, returns warning ('none'|'soon'|'now') + reason
('idle'|'absolute') so consumers can pick the right UX for the
closer window. Re-evaluates every 30s.
- components/common/SessionExpiryToast.tsx: top-of-app notice that
fires at T-5min. Idle case: warning-amber tone, [Stay signed in]
button hits authApi.refresh() and updates the store on success.
Absolute case: info-cyan tone, [Sign in now] link to /login (no
recoverable action). Dismissible; does not re-fire after dismissal.
- components/account/RevokeSessionsModal.tsx: confirmation modal for
the two bulk-revoke scopes. Title, body, and confirm-label vary by
scope; danger-styled confirm button.
- pages/account/AccountSecuritySettingsPage.tsx: the main page.
Header (Shield icon), intro, Policy card with Strict/Standard/Custom
radios + always-visible-disabled Custom inputs (idle/absolute
minutes) with inline validation, Save button + emerald success ping,
info note about 'applies at next login'. Active sessions card with
count-aware copy, list of {name, email, last-login-ago} rows
(caller tagged '(you)'), two buttons — 'except me' hidden when
count=1, 'sign me out and everyone else' uses danger-tinted styling.
- pages/AccountSettingsPage.tsx: 'Session security' row added to the
owner-only settings list.
- router.tsx: /account/security route, owner-gated via ProtectedRoute.
- pages/LoginPage.tsx: cyan info-tone banner above form when
?reason=session_expired is in the URL.
- components/layout/AppLayout.tsx: mounts <SessionExpiryToast />.
Scope=all bulk-revoke UX (the most jarring moment): on success,
toast.success(N sessions), 1.5s delay, then clear localStorage +
useAuthStore.logout() + window.location='/login' (no banner — the
owner just did this).
Backend tests: existing 22/22 still green plus the GET test now
asserts active_users is present + non-empty after login. Frontend:
tsc clean, authStore test 2/2.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
783 lines
29 KiB
Python
783 lines
29 KiB
Python
"""Tests for the session-expiration-policy series.

See docs/plans/2026-05-13-session-expiration-policy.md.
Test numbers below correspond to the cases listed in §6 of the plan.

This file grows across commits:
- Commit 2: error-detail taxonomy (#11 + wrong-type + bad-signature)
- Commit 3: claims embedded at login + response fields surfaced (#1, #14)
- Commit 4: absolute-cap enforcement + grandfather path (#8, #9, #12)
- Commit 5: GET/PATCH /accounts/me/security (#2, #3, #4, #5, #7, #16)
- Commit 6: POST /accounts/me/security/revoke-sessions (#17-#22)
"""
|
|
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
from jose import jwt
|
|
|
|
from app.core.config import settings
|
|
|
|
|
|
def _encode_refresh_token(
    *,
    sub: str,
    exp: datetime,
    token_type: str = "refresh",
    secret: str | None = None,
) -> str:
    """Hand-craft a signed refresh JWT for negative-path tests.

    Skips create_refresh_token so a test can pick the expiry, the ``type``
    claim, and the signing key directly -- e.g. to mint a token that is
    already expired, carries the wrong type, or is signed with the wrong
    key. A fresh random ``jti`` is generated on every call.

    Args:
        sub: Value stored in the ``sub`` claim.
        exp: Value stored in the ``exp`` claim.
        token_type: Value stored in the ``type`` claim.
        secret: Signing key; any falsy value falls back to
            ``settings.SECRET_KEY``.

    Returns:
        The encoded JWT string.
    """
    signing_key = secret if secret else settings.SECRET_KEY
    claims = {
        "sub": sub,
        "type": token_type,
        "jti": str(uuid.uuid4()),
        "exp": exp,
    }
    return jwt.encode(claims, signing_key, algorithm=settings.ALGORITHM)
|
|
|
|
|
|
class TestRefreshTokenErrorTaxonomy:
    """§6 test #11 — error-detail taxonomy of `/auth/refresh`.

    The endpoint reports idle expiry with a different `detail` than other
    invalid-token failures, which lets the frontend choose between the
    "session ended for security" banner and a plain logout redirect.
    """

    @staticmethod
    async def _refresh_with(client: AsyncClient, token: str):
        """POST `token` as a Bearer credential to the refresh endpoint."""
        return await client.post(
            "/api/v1/auth/refresh",
            headers={"Authorization": f"Bearer {token}"},
        )

    @pytest.mark.asyncio
    async def test_idle_expired_refresh_returns_session_expired_idle(
        self, client: AsyncClient, test_user: dict
    ):
        """A token whose `exp` lies one second in the past is an idle expiry."""
        already_expired = datetime.now(timezone.utc) - timedelta(seconds=1)
        token = _encode_refresh_token(
            sub=test_user["user_data"]["id"],
            exp=already_expired,
        )

        response = await self._refresh_with(client, token)

        assert response.status_code == 401
        assert response.json()["detail"] == "session_expired_idle"

    @pytest.mark.asyncio
    async def test_wrong_type_token_returns_invalid_refresh_token(
        self, client: AsyncClient, test_user: dict
    ):
        """An unexpired token typed "access" is rejected as invalid."""
        still_valid = datetime.now(timezone.utc) + timedelta(minutes=5)
        token = _encode_refresh_token(
            sub=test_user["user_data"]["id"],
            exp=still_valid,
            token_type="access",
        )

        response = await self._refresh_with(client, token)

        assert response.status_code == 401
        assert response.json()["detail"] == "invalid_refresh_token"

    @pytest.mark.asyncio
    async def test_bad_signature_returns_invalid_refresh_token(
        self, client: AsyncClient, test_user: dict
    ):
        """An unexpired token signed with a foreign secret is rejected as invalid."""
        still_valid = datetime.now(timezone.utc) + timedelta(minutes=5)
        token = _encode_refresh_token(
            sub=test_user["user_data"]["id"],
            exp=still_valid,
            secret="not-the-real-secret-key",
        )

        response = await self._refresh_with(client, token)

        assert response.status_code == 401
        assert response.json()["detail"] == "invalid_refresh_token"
|
|
|
|
|
|
class TestSessionPolicyClaims:
    """§6 tests #1 and #14 — session-policy claims stamped at login.

    Covers two things: login embeds auth_time/idle_max/abs_max in the
    refresh JWT and surfaces idle_expires_at/absolute_expires_at on the
    response, and a refresh rotates the token while carrying those claims
    forward unchanged.
    """

    @pytest.mark.asyncio
    async def test_login_json_embeds_session_claims_with_defaults(
        self, client: AsyncClient, test_user: dict
    ):
        """Login without an account override yields the 3d/14d default windows."""
        before = datetime.now(timezone.utc)

        response = await client.post(
            "/api/v1/auth/login/json",
            json={
                "email": test_user["email"],
                "password": test_user["password"],
            },
        )
        assert response.status_code == 200, response.json()
        body = response.json()
        after = datetime.now(timezone.utc)

        # Response surfaces both expiry windows as ISO strings.
        assert body["idle_expires_at"] is not None
        assert body["absolute_expires_at"] is not None
        idle_at = datetime.fromisoformat(body["idle_expires_at"])
        abs_at = datetime.fromisoformat(body["absolute_expires_at"])

        # Strict default: 3 days idle, 14 days absolute. Allow 10s of slack
        # either side for request latency.
        slack = timedelta(seconds=10)
        idle_window = timedelta(days=3)
        absolute_window = timedelta(days=14)
        assert idle_window - slack <= idle_at - before <= idle_window + slack
        assert absolute_window - slack <= abs_at - before <= absolute_window + slack

        # JWT carries the claims in seconds, plus auth_time as Unix seconds.
        decoded = jwt.decode(
            body["refresh_token"], settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
        assert decoded["idle_max"] == 3 * 24 * 60 * 60  # 259200
        assert decoded["abs_max"] == 14 * 24 * 60 * 60  # 1209600
        assert int(before.timestamp()) <= decoded["auth_time"] <= int(after.timestamp())

    @pytest.mark.asyncio
    async def test_refresh_carries_claims_forward_unchanged(
        self, client: AsyncClient, test_user: dict
    ):
        """Rotation keeps auth_time/idle_max/abs_max and issues a new jti."""
        # Login produces the original session.
        login_resp = await client.post(
            "/api/v1/auth/login/json",
            json={"email": test_user["email"], "password": test_user["password"]},
        )
        # Fail with the response body rather than a KeyError below if the
        # login itself did not succeed.
        assert login_resp.status_code == 200, login_resp.json()
        original_refresh = login_resp.json()["refresh_token"]
        original_payload = jwt.decode(
            original_refresh, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )

        # Refresh rotates the token but must carry auth_time/idle_max/abs_max
        # forward unchanged so the absolute window doesn't slide.
        refresh_resp = await client.post(
            "/api/v1/auth/refresh",
            headers={"Authorization": f"Bearer {original_refresh}"},
        )
        assert refresh_resp.status_code == 200, refresh_resp.json()
        new_refresh = refresh_resp.json()["refresh_token"]
        new_payload = jwt.decode(
            new_refresh, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )

        for claim in ("auth_time", "idle_max", "abs_max"):
            assert new_payload[claim] == original_payload[claim], claim
        # Idle deadline does slide because exp = now + idle_max.
        assert new_payload["exp"] >= original_payload["exp"]
        # JTI rotates.
        assert new_payload["jti"] != original_payload["jti"]
|
|
|
|
|
|
def _backdate_auth_time(refresh_token: str, *, seconds_back: int) -> str:
    """Return `refresh_token` re-signed with its auth_time moved earlier.

    All other claims, including the jti, are copied through untouched.
    Per the original author's note, the refresh_tokens DB row is keyed on
    hash(jti), so keeping the jti lets the atomic revoke step still find
    the row. This simulates a session that is past its absolute cap
    without waiting two weeks.

    Args:
        refresh_token: A refresh JWT signed with settings.SECRET_KEY.
        seconds_back: Number of seconds to subtract from ``auth_time``.

    Returns:
        The newly encoded JWT string.
    """
    claims = jwt.decode(
        refresh_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
    )
    claims["auth_time"] -= seconds_back
    return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
|
|
|
|
|
|
class TestSessionPolicyEndpoint:
    """§6 tests #2, #3, #4, #5, #7, #16 — GET/PATCH /accounts/me/security."""

    SECURITY_URL = "/api/v1/accounts/me/security"
    LOGIN_URL = "/api/v1/auth/login/json"

    @pytest.mark.asyncio
    async def test_get_returns_defaults_and_bounds(
        self, client: AsyncClient, auth_headers: dict, test_user: dict
    ):
        """GET with no override reports defaults, bounds, and active users."""
        response = await client.get(self.SECURITY_URL, headers=auth_headers)
        assert response.status_code == 200, response.json()
        body = response.json()

        # No override yet -> effective values are the system defaults.
        assert body["idle_minutes"] is None
        assert body["absolute_minutes"] is None
        assert body["effective_idle_minutes"] == 4320  # 3d Strict default
        assert body["effective_absolute_minutes"] == 20160  # 14d
        assert body["idle_minutes_min"] == 15
        assert body["idle_minutes_max"] == 43200
        assert body["absolute_minutes_min"] == 60
        assert body["absolute_minutes_max"] == 129600

        # active_users reflects users with un-revoked refresh tokens.
        # auth_headers logged the owner in once, so they should appear.
        assert isinstance(body["active_users"], list)
        assert len(body["active_users"]) >= 1
        emails = [u["email"] for u in body["active_users"]]
        assert test_user["email"] in emails
        # Schema check on one row.
        first = body["active_users"][0]
        for field in ("user_id", "name", "email", "last_login_at"):
            assert field in first

    @pytest.mark.asyncio
    async def test_patch_persists_override_and_returns_new_state(
        self, client: AsyncClient, auth_headers: dict, test_user: dict
    ):
        """PATCH stores the override and the next login picks it up."""
        response = await client.patch(
            self.SECURITY_URL,
            headers=auth_headers,
            json={"idle_minutes": 60, "absolute_minutes": 240},
        )
        assert response.status_code == 200, response.json()
        body = response.json()
        assert body["idle_minutes"] == 60
        assert body["absolute_minutes"] == 240
        assert body["effective_idle_minutes"] == 60
        assert body["effective_absolute_minutes"] == 240

        # Next login picks up the new policy. Credentials come from the
        # test_user fixture (as in every other test here) rather than
        # literals that must be kept in sync with it by hand.
        login_resp = await client.post(
            self.LOGIN_URL,
            json={"email": test_user["email"], "password": test_user["password"]},
        )
        assert login_resp.status_code == 200, login_resp.json()
        new_payload = jwt.decode(
            login_resp.json()["refresh_token"],
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
        assert new_payload["idle_max"] == 60 * 60  # 3600 seconds
        assert new_payload["abs_max"] == 240 * 60  # 14400 seconds

    @pytest.mark.asyncio
    async def test_patch_rejects_idle_below_min(
        self, client: AsyncClient, auth_headers: dict
    ):
        """idle_minutes=5 is rejected with a 422 that names the field."""
        response = await client.patch(
            self.SECURITY_URL,
            headers=auth_headers,
            json={"idle_minutes": 5, "absolute_minutes": 60},
        )
        assert response.status_code == 422
        assert "idle_minutes" in response.json()["detail"]

    @pytest.mark.asyncio
    async def test_patch_rejects_absolute_above_max(
        self, client: AsyncClient, auth_headers: dict
    ):
        """absolute_minutes=200000 is rejected with a 422."""
        response = await client.patch(
            self.SECURITY_URL,
            headers=auth_headers,
            json={"absolute_minutes": 200000},
        )
        assert response.status_code == 422

    @pytest.mark.asyncio
    async def test_patch_rejects_idle_greater_than_absolute_both_set(
        self, client: AsyncClient, auth_headers: dict
    ):
        """idle=300 with absolute=120 is rejected with an "exceed" message."""
        response = await client.patch(
            self.SECURITY_URL,
            headers=auth_headers,
            json={"idle_minutes": 300, "absolute_minutes": 120},
        )
        assert response.status_code == 422
        assert "exceed" in response.json()["detail"].lower()

    @pytest.mark.asyncio
    async def test_patch_rejects_partial_override_when_effective_invalid(
        self, client: AsyncClient, auth_headers: dict
    ):
        """§6 test #5 — partial override: idle=43200, absolute=NULL ->
        effective idle (43200) > effective absolute (20160 default) -> 422.
        """
        response = await client.patch(
            self.SECURITY_URL,
            headers=auth_headers,
            json={"idle_minutes": 43200, "absolute_minutes": None},
        )
        assert response.status_code == 422
        assert "exceed" in response.json()["detail"].lower()

    @pytest.mark.asyncio
    async def test_non_owner_cannot_patch(
        self, client: AsyncClient, test_user: dict, test_db
    ):
        """§6 test #7 — engineer role is forbidden."""
        from app.models.user import User
        from sqlalchemy import select

        # Add a second user in the same account with account_role=engineer.
        # The owner's bcrypt hash is reused so the engineer can log in with
        # the owner's password.
        result = await test_db.execute(
            select(User).where(User.email == test_user["email"])
        )
        owner = result.scalar_one()
        await _seed_extra_account_user(
            test_db,
            email="engineer-policy@example.com",
            account_id=owner.account_id,
            password_hash=owner.password_hash,
            role="engineer",
        )

        login_resp = await client.post(
            self.LOGIN_URL,
            json={
                "email": "engineer-policy@example.com",
                "password": test_user["password"],
            },
        )
        assert login_resp.status_code == 200
        engineer_headers = {
            "Authorization": f"Bearer {login_resp.json()['access_token']}"
        }

        response = await client.patch(
            self.SECURITY_URL,
            headers=engineer_headers,
            json={"idle_minutes": 60, "absolute_minutes": 240},
        )
        assert response.status_code == 403

    @pytest.mark.asyncio
    async def test_patch_writes_audit_row(
        self, client: AsyncClient, auth_headers: dict, test_db
    ):
        """§6 test #16 — PATCH emits one account.session_policy_update
        audit event with old/new + effective_old/new payload.
        """
        from app.models.audit_log import AuditLog
        from sqlalchemy import select

        response = await client.patch(
            self.SECURITY_URL,
            headers=auth_headers,
            json={"idle_minutes": 120, "absolute_minutes": 480},
        )
        assert response.status_code == 200

        result = await test_db.execute(
            select(AuditLog).where(AuditLog.action == "account.session_policy_update")
        )
        rows = result.scalars().all()
        assert len(rows) == 1
        entry = rows[0]
        assert entry.resource_type == "account"
        assert entry.details["new"] == {"idle_minutes": 120, "absolute_minutes": 480}
        assert entry.details["effective_new"] == {
            "idle_minutes": 120,
            "absolute_minutes": 480,
        }
        assert entry.details["effective_old"]["idle_minutes"] == 4320  # default
        assert entry.details["effective_old"]["absolute_minutes"] == 20160
|
|
|
|
|
|
async def _seed_extra_account_user(
    test_db, *, email: str, account_id, password_hash: str, role: str = "engineer"
):
    """Insert and commit one more user under an existing account.

    Used by the revoke-scope tests to get a second member into an account.

    Args:
        test_db: Session the user is added to and committed on.
        email: Stored as both the user's email and display name.
        account_id: Account the new user belongs to.
        password_hash: Stored verbatim as the user's password hash.
        role: Stored as ``account_role``. The separate ``role`` column is
            always set to "engineer" regardless of this argument.

    Returns:
        The committed User instance.
    """
    from app.models.user import User

    attributes = {
        "email": email,
        "password_hash": password_hash,
        "name": email,
        "role": "engineer",
        "is_super_admin": False,
        "is_active": True,
        "account_id": account_id,
        "account_role": role,
        "email_verified_at": datetime.now(timezone.utc),
    }
    member = User(**attributes)
    test_db.add(member)
    await test_db.commit()
    return member
|
|
|
|
|
|
class TestBulkRevoke:
    """§6 tests #17-#22 — POST /accounts/me/security/revoke-sessions."""

    LOGIN_URL = "/api/v1/auth/login/json"
    REFRESH_URL = "/api/v1/auth/refresh"
    REVOKE_URL = "/api/v1/accounts/me/security/revoke-sessions"

    @classmethod
    async def _login(cls, client: AsyncClient, email: str, password: str) -> dict:
        """Log in and return the JSON body, failing loudly on a non-200."""
        response = await client.post(
            cls.LOGIN_URL, json={"email": email, "password": password}
        )
        assert response.status_code == 200, response.json()
        return response.json()

    @classmethod
    async def _revoke(cls, client: AsyncClient, access_token: str, scope: str):
        """POST a bulk revoke with the given scope as the token's owner."""
        return await client.post(
            cls.REVOKE_URL,
            headers={"Authorization": f"Bearer {access_token}"},
            json={"scope": scope},
        )

    @classmethod
    async def _refresh(cls, client: AsyncClient, refresh_token: str):
        """POST `refresh_token` as a Bearer credential to /auth/refresh."""
        return await client.post(
            cls.REFRESH_URL,
            headers={"Authorization": f"Bearer {refresh_token}"},
        )

    @staticmethod
    async def _add_member(test_db, *, owner_email: str, member_email: str):
        """Seed `member_email` into the owner's account, reusing the owner's
        password hash so the member can log in with the owner's password.
        """
        from app.models.user import User
        from sqlalchemy import select

        owner = (
            await test_db.execute(select(User).where(User.email == owner_email))
        ).scalar_one()
        return await _seed_extra_account_user(
            test_db,
            email=member_email,
            account_id=owner.account_id,
            password_hash=owner.password_hash,
        )

    @pytest.mark.asyncio
    async def test_revoke_all_kills_callers_own_session(
        self, client: AsyncClient, test_user: dict, test_db
    ):
        """§6 test #17 — scope=all includes the caller's own token. After
        the response, the caller's refresh_token gets invalid_refresh_token
        on next /auth/refresh.
        """
        await self._add_member(
            test_db,
            owner_email=test_user["email"],
            member_email="member-revoke-all@example.com",
        )

        # Owner logs in (also seeds owner's refresh-token row).
        owner_session = await self._login(
            client, test_user["email"], test_user["password"]
        )
        # Member also logs in so there's another active refresh-token row.
        await self._login(
            client, "member-revoke-all@example.com", test_user["password"]
        )

        response = await self._revoke(client, owner_session["access_token"], "all")
        assert response.status_code == 200, response.json()
        assert response.json()["revoked_count"] == 2

        # Owner's own refresh now returns invalid_refresh_token.
        retry = await self._refresh(client, owner_session["refresh_token"])
        assert retry.status_code == 401
        assert retry.json()["detail"] == "invalid_refresh_token"

    @pytest.mark.asyncio
    async def test_revoke_others_preserves_callers_session(
        self, client: AsyncClient, test_user: dict, test_db
    ):
        """§6 test #18 — scope=others excludes the caller's user_id from
        the bulk update. Caller can still refresh; other users cannot.
        """
        await self._add_member(
            test_db,
            owner_email=test_user["email"],
            member_email="member-revoke-others@example.com",
        )

        owner_session = await self._login(
            client, test_user["email"], test_user["password"]
        )
        member_session = await self._login(
            client, "member-revoke-others@example.com", test_user["password"]
        )

        response = await self._revoke(
            client, owner_session["access_token"], "others"
        )
        assert response.status_code == 200
        assert response.json()["revoked_count"] == 1

        # Owner's refresh still works.
        owner_retry = await self._refresh(client, owner_session["refresh_token"])
        assert owner_retry.status_code == 200

        # Member's refresh is dead.
        member_retry = await self._refresh(client, member_session["refresh_token"])
        assert member_retry.status_code == 401
        assert member_retry.json()["detail"] == "invalid_refresh_token"

    @pytest.mark.asyncio
    async def test_revoke_is_account_scoped(
        self, client: AsyncClient, test_user: dict, test_admin: dict
    ):
        """§6 test #19 — owner of account A cannot revoke tokens in account B.

        test_admin lives in its own account. After test_user's owner runs
        revoke-all, test_admin's session continues to work.
        """
        owner_session = await self._login(
            client, test_user["email"], test_user["password"]
        )
        admin_session = await self._login(
            client, test_admin["email"], test_admin["password"]
        )

        response = await self._revoke(client, owner_session["access_token"], "all")
        assert response.status_code == 200
        # Only test_user's own session is revoked.
        assert response.json()["revoked_count"] == 1

        admin_retry = await self._refresh(client, admin_session["refresh_token"])
        assert admin_retry.status_code == 200

    @pytest.mark.asyncio
    async def test_revoke_engineer_forbidden(
        self, client: AsyncClient, test_user: dict, test_db
    ):
        """§6 test #20 — engineer-role member gets 403."""
        await self._add_member(
            test_db,
            owner_email=test_user["email"],
            member_email="engineer-revoke@example.com",
        )

        engineer_session = await self._login(
            client, "engineer-revoke@example.com", test_user["password"]
        )

        response = await self._revoke(
            client, engineer_session["access_token"], "all"
        )
        assert response.status_code == 403

    @pytest.mark.asyncio
    async def test_revoke_writes_audit_row(
        self, client: AsyncClient, test_user: dict, test_db
    ):
        """§6 test #21 — emits one account.sessions_revoked_bulk event."""
        from app.models.audit_log import AuditLog
        from sqlalchemy import select

        owner_session = await self._login(
            client, test_user["email"], test_user["password"]
        )

        response = await self._revoke(client, owner_session["access_token"], "all")
        assert response.status_code == 200

        result = await test_db.execute(
            select(AuditLog).where(AuditLog.action == "account.sessions_revoked_bulk")
        )
        rows = result.scalars().all()
        assert len(rows) == 1
        entry = rows[0]
        assert entry.details["scope"] == "all"
        assert entry.details["revoked_count"] == 1

    @pytest.mark.asyncio
    async def test_revoke_is_idempotent(
        self, client: AsyncClient, test_user: dict, test_db
    ):
        """§6 test #22 — second immediate POST returns revoked_count=0
        (no already-revoked rows get double-stamped or counted again).

        A second member with a live session is seeded first so the initial
        call really revokes a row. With only the owner logged in,
        scope=others would return 0 both times and prove nothing.
        """
        await self._add_member(
            test_db,
            owner_email=test_user["email"],
            member_email="member-revoke-idempotent@example.com",
        )

        owner_session = await self._login(
            client, test_user["email"], test_user["password"]
        )
        await self._login(
            client, "member-revoke-idempotent@example.com", test_user["password"]
        )
        owner_access = owner_session["access_token"]

        # scope=others keeps the owner's own session alive for the repeat call.
        first = await self._revoke(client, owner_access, "others")
        assert first.status_code == 200
        assert first.json()["revoked_count"] == 1

        second = await self._revoke(client, owner_access, "others")
        assert second.status_code == 200
        assert second.json()["revoked_count"] == 0
|
|
|
|
|
|
class TestAbsoluteCap:
    """§6 tests #8, #9, #12 — absolute-cap enforcement and grandfather path."""

    @staticmethod
    async def _login_refresh_token(client: AsyncClient, test_user: dict) -> str:
        """Log `test_user` in and return the issued refresh token.

        Asserts the login succeeded so a failure shows the response body
        instead of a KeyError on the missing token.
        """
        login_resp = await client.post(
            "/api/v1/auth/login/json",
            json={"email": test_user["email"], "password": test_user["password"]},
        )
        assert login_resp.status_code == 200, login_resp.json()
        return login_resp.json()["refresh_token"]

    @staticmethod
    def _claims(token: str) -> dict:
        """Decode `token` with the application secret and algorithm."""
        return jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])

    @staticmethod
    async def _refresh(client: AsyncClient, token: str):
        """POST `token` as a Bearer credential to /auth/refresh."""
        return await client.post(
            "/api/v1/auth/refresh",
            headers={"Authorization": f"Bearer {token}"},
        )

    @pytest.mark.asyncio
    async def test_refresh_at_absolute_deadline_rejects(
        self, client: AsyncClient, test_user: dict
    ):
        """§6 test #8 — boundary check uses `>=`, not `>`.

        A token whose auth_time + abs_max equals now() is expired, not
        valid. Backdate the original token's auth_time by exactly abs_max
        seconds so now >= deadline.
        """
        original = await self._login_refresh_token(client, test_user)
        abs_max = self._claims(original)["abs_max"]

        expired = _backdate_auth_time(original, seconds_back=abs_max)

        response = await self._refresh(client, expired)

        assert response.status_code == 401
        assert response.json()["detail"] == "session_expired_absolute"

    @pytest.mark.asyncio
    async def test_absolute_expired_token_is_consumed(
        self, client: AsyncClient, test_user: dict
    ):
        """§6 test #9 — first attempt returns session_expired_absolute and
        revokes the row; second attempt sees the revoked row and returns
        invalid_refresh_token. Prevents replay of an absolute-expired token.
        """
        original = await self._login_refresh_token(client, test_user)
        abs_max = self._claims(original)["abs_max"]
        expired = _backdate_auth_time(original, seconds_back=abs_max + 1)

        first = await self._refresh(client, expired)
        assert first.status_code == 401
        assert first.json()["detail"] == "session_expired_absolute"

        second = await self._refresh(client, expired)
        assert second.status_code == 401
        assert second.json()["detail"] == "invalid_refresh_token"

    @pytest.mark.asyncio
    async def test_grandfather_path_for_legacy_token(
        self, client: AsyncClient, test_user: dict, test_db
    ):
        """§6 test #12 — refresh token issued before this PR (no auth_time
        claim) gets one successful rotation; the new token has fresh
        auth_time/idle_max/abs_max claims snapshotted from current policy.
        """
        # NOTE(review): test_db is requested but not used in the body; it is
        # kept in the signature in case the fixture's setup is relied on.
        original = await self._login_refresh_token(client, test_user)
        original_payload = self._claims(original)

        # Strip the new claims to simulate a token issued before this PR.
        # JTI preserved so the DB-side revoke still finds the row.
        legacy_payload = {
            "sub": original_payload["sub"],
            "type": "refresh",
            "jti": original_payload["jti"],
            "exp": original_payload["exp"],
        }
        legacy_token = jwt.encode(
            legacy_payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM
        )

        response = await self._refresh(client, legacy_token)

        assert response.status_code == 200, response.json()
        new_payload = self._claims(response.json()["refresh_token"])
        assert new_payload.get("auth_time") is not None
        assert new_payload.get("idle_max") == 3 * 24 * 60 * 60
        assert new_payload.get("abs_max") == 14 * 24 * 60 * 60
        # auth_time was set to ~now during grandfather, not preserved from
        # the legacy token (since the legacy token didn't have one).
        now_unix = int(datetime.now(timezone.utc).timestamp())
        assert abs(new_payload["auth_time"] - now_unix) < 10
|