Co-authored-by: Michael Chihlas <michael@resolutionflow.com> Co-committed-by: Michael Chihlas <michael@resolutionflow.com>
197 lines
7.3 KiB
Python
197 lines
7.3 KiB
Python
import uuid
|
|
import pytest
|
|
from unittest.mock import patch
|
|
from sqlalchemy import select
|
|
from app.core.security import decode_token, hash_token
|
|
from app.models.user import User
|
|
from app.models.oauth_identity import OAuthIdentity
|
|
from app.models.refresh_token import RefreshToken
|
|
from app.models.subscription import Subscription
|
|
from app.services.oauth_providers import OAuthProfile
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_google_callback_creates_user_account_subscription(
|
|
client, test_db, monkeypatch
|
|
):
|
|
"""Brand-new user via Google OAuth -> User + Account + Subscription + OAuthIdentity."""
|
|
from app.core.config import settings
|
|
monkeypatch.setattr(settings, "GOOGLE_CLIENT_ID", "client_dummy")
|
|
monkeypatch.setattr(settings, "GOOGLE_CLIENT_SECRET", "secret_dummy")
|
|
|
|
profile = OAuthProfile(
|
|
provider_subject="google_subject_123",
|
|
email="newuser@example.com",
|
|
name="New User",
|
|
)
|
|
with patch("app.api.endpoints.oauth.google_exchange_code", return_value=profile):
|
|
response = await client.post(
|
|
"/api/v1/auth/google/callback", json={"code": "auth_code_xyz"}
|
|
)
|
|
assert response.status_code == 200, response.json()
|
|
body = response.json()
|
|
assert body["is_new_user"] is True
|
|
assert body["access_token"]
|
|
|
|
user = (await test_db.execute(
|
|
select(User).where(User.email == "newuser@example.com")
|
|
)).scalar_one()
|
|
assert user.password_hash is None
|
|
assert user.email_verified_at is not None
|
|
|
|
identity = (await test_db.execute(
|
|
select(OAuthIdentity).where(OAuthIdentity.user_id == user.id)
|
|
)).scalar_one()
|
|
assert identity.provider == "google"
|
|
assert identity.provider_subject == "google_subject_123"
|
|
|
|
sub = (await test_db.execute(
|
|
select(Subscription).where(Subscription.account_id == user.account_id)
|
|
)).scalar_one()
|
|
assert sub.status == "trialing"
|
|
assert sub.plan == "pro"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_google_callback_existing_user_is_idempotent(
|
|
client, test_db, test_user, monkeypatch
|
|
):
|
|
"""When test_user's email is already registered, OAuth links + returns the
|
|
same user. Two calls with same provider_subject must not duplicate
|
|
OAuthIdentity rows."""
|
|
from app.core.config import settings
|
|
monkeypatch.setattr(settings, "GOOGLE_CLIENT_ID", "client_dummy")
|
|
monkeypatch.setattr(settings, "GOOGLE_CLIENT_SECRET", "secret_dummy")
|
|
|
|
user_id = uuid.UUID(test_user["user_data"]["id"])
|
|
email = test_user["email"]
|
|
name = test_user["user_data"]["name"]
|
|
|
|
profile = OAuthProfile(
|
|
provider_subject="google_subject_456",
|
|
email=email,
|
|
name=name,
|
|
)
|
|
with patch("app.api.endpoints.oauth.google_exchange_code", return_value=profile):
|
|
r1 = await client.post("/api/v1/auth/google/callback", json={"code": "x"})
|
|
r2 = await client.post("/api/v1/auth/google/callback", json={"code": "x"})
|
|
assert r1.status_code == 200
|
|
assert r2.status_code == 200
|
|
assert r1.json()["is_new_user"] is False
|
|
assert r2.json()["is_new_user"] is False
|
|
|
|
identities = (await test_db.execute(
|
|
select(OAuthIdentity).where(OAuthIdentity.user_id == user_id)
|
|
)).scalars().all()
|
|
assert len(identities) == 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_google_callback_503_when_unconfigured(client, monkeypatch):
|
|
from app.core.config import settings
|
|
monkeypatch.setattr(settings, "GOOGLE_CLIENT_ID", None)
|
|
|
|
response = await client.post(
|
|
"/api/v1/auth/google/callback", json={"code": "x"}
|
|
)
|
|
assert response.status_code == 503
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_microsoft_callback_creates_user(client, test_db, monkeypatch):
|
|
from app.core.config import settings
|
|
monkeypatch.setattr(settings, "MS_CLIENT_ID", "client_dummy")
|
|
monkeypatch.setattr(settings, "MS_CLIENT_SECRET", "secret_dummy")
|
|
|
|
profile = OAuthProfile(
|
|
provider_subject="ms_subject_789",
|
|
email="msuser@example.com",
|
|
name="MS User",
|
|
)
|
|
with patch("app.api.endpoints.oauth.microsoft_exchange_code", return_value=profile):
|
|
response = await client.post(
|
|
"/api/v1/auth/microsoft/callback", json={"code": "auth_code"}
|
|
)
|
|
assert response.status_code == 200, response.json()
|
|
user = (await test_db.execute(
|
|
select(User).where(User.email == "msuser@example.com")
|
|
)).scalar_one()
|
|
identity = (await test_db.execute(
|
|
select(OAuthIdentity).where(OAuthIdentity.user_id == user.id)
|
|
)).scalar_one()
|
|
assert identity.provider == "microsoft"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_oauth_google_callback_stores_refresh_token_jti(
|
|
client, test_db, monkeypatch
|
|
):
|
|
"""A successful Google OAuth callback must persist the refresh-token JTI
|
|
in the refresh_tokens table — otherwise /auth/refresh rejects it."""
|
|
from app.core.config import settings
|
|
monkeypatch.setattr(settings, "GOOGLE_CLIENT_ID", "client_dummy")
|
|
monkeypatch.setattr(settings, "GOOGLE_CLIENT_SECRET", "secret_dummy")
|
|
|
|
profile = OAuthProfile(
|
|
provider_subject="google_subject_jti_test",
|
|
email="jtitest@example.com",
|
|
name="JTI Test",
|
|
)
|
|
with patch("app.api.endpoints.oauth.google_exchange_code", return_value=profile):
|
|
response = await client.post(
|
|
"/api/v1/auth/google/callback", json={"code": "auth_code_xyz"}
|
|
)
|
|
assert response.status_code == 200, response.json()
|
|
body = response.json()
|
|
refresh_token_str = body["refresh_token"]
|
|
|
|
payload = decode_token(refresh_token_str)
|
|
assert payload is not None
|
|
jti = payload["jti"]
|
|
token_hash = hash_token(jti)
|
|
|
|
user = (await test_db.execute(
|
|
select(User).where(User.email == "jtitest@example.com")
|
|
)).scalar_one()
|
|
|
|
stored = (await test_db.execute(
|
|
select(RefreshToken).where(RefreshToken.token_hash == token_hash)
|
|
)).scalar_one_or_none()
|
|
assert stored is not None, "OAuth callback did not persist refresh-token JTI"
|
|
assert stored.user_id == user.id
|
|
assert stored.revoked_at is None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_oauth_refresh_works_after_oauth_signup(
|
|
client, test_db, monkeypatch
|
|
):
|
|
"""End-to-end: OAuth callback issues a refresh token; calling /auth/refresh
|
|
with that token must succeed (not be rejected as revoked)."""
|
|
from app.core.config import settings
|
|
monkeypatch.setattr(settings, "GOOGLE_CLIENT_ID", "client_dummy")
|
|
monkeypatch.setattr(settings, "GOOGLE_CLIENT_SECRET", "secret_dummy")
|
|
|
|
profile = OAuthProfile(
|
|
provider_subject="google_subject_refresh_test",
|
|
email="refresh-after-oauth@example.com",
|
|
name="Refresh After OAuth",
|
|
)
|
|
with patch("app.api.endpoints.oauth.google_exchange_code", return_value=profile):
|
|
callback_resp = await client.post(
|
|
"/api/v1/auth/google/callback", json={"code": "auth_code_xyz"}
|
|
)
|
|
assert callback_resp.status_code == 200, callback_resp.json()
|
|
refresh_token_str = callback_resp.json()["refresh_token"]
|
|
|
|
refresh_resp = await client.post(
|
|
"/api/v1/auth/refresh",
|
|
headers={"Authorization": f"Bearer {refresh_token_str}"},
|
|
)
|
|
assert refresh_resp.status_code == 200, refresh_resp.json()
|
|
refreshed = refresh_resp.json()
|
|
assert refreshed["access_token"]
|
|
assert refreshed["refresh_token"]
|
|
# Token rotation: new refresh token differs from the original.
|
|
assert refreshed["refresh_token"] != refresh_token_str
|