Files
resolutionflow/backend/app/api/endpoints/auth.py
Michael Chihlas aa54b6c192 Complete integration test suite with role-based auth fixes
Test Suite Completion (29 tests, all passing):
- Fixed test_auth.py: expect 201 status for registration endpoint
- Fixed test_trees.py: version only increments on tree_structure updates
- Fixed test_trees.py: delete endpoint requires admin role, returns 204
- Added admin user fixtures (test_admin, admin_auth_headers) in conftest.py

Role-Based User Registration Fix:
- Added role field to UserCreate schema (default="engineer")
- Updated registration endpoint to use user_data.role instead of hardcoding
- Enables proper admin/engineer/viewer role assignment during registration
- Maintains secure defaults while allowing test flexibility

Documentation Updates:
- Updated PROGRESS.md: corrected test count (29), added role fix notes
- Updated CLAUDE-SETUP.md: corrected test count, updated last modified date
- Updated backend file structure to include new logging and test files

Test Configuration:
- pytest 7.4.3 + pytest-asyncio 0.23.0 (stable async support)
- Comprehensive coverage: 7 auth + 10 trees + 12 sessions tests
- All endpoints verified with proper status codes and authorization

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-27 20:38:43 -05:00

160 lines
4.9 KiB
Python

from datetime import datetime, timezone
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.core.database import get_db
from app.core.security import (
verify_password,
get_password_hash,
create_access_token,
create_refresh_token,
decode_token
)
from app.models.user import User
from app.schemas.user import UserCreate, UserResponse, UserLogin
from app.schemas.token import Token
from app.api.deps import get_current_user
# Router for the authentication endpoints; every route registered on it below
# is served under the "/auth" prefix and tagged "authentication".
router = APIRouter(prefix="/auth", tags=["authentication"])
@router.post(
    "/register",
    response_model=UserResponse,
    status_code=status.HTTP_201_CREATED,
)
async def register(
    user_data: UserCreate,
    db: Annotated[AsyncSession, Depends(get_db)]
):
    """Register a new user.

    Args:
        user_data: Registration payload; its ``email``, ``password``, ``name``
            and ``role`` fields are read.
        db: Async database session injected by FastAPI.

    Returns:
        The newly created ``User``, serialized through ``UserResponse``.

    Raises:
        HTTPException: 400 "Email already registered" when a user with the
            same email exists, whether found by the up-front lookup or
            detected after a concurrent insert won the race.
    """
    # Local import keeps this handler self-contained; sqlalchemy is already a
    # dependency of this module.
    from sqlalchemy.exc import IntegrityError

    async def _email_taken() -> bool:
        """Return True when a user row with the requested email exists."""
        result = await db.execute(select(User).where(User.email == user_data.email))
        return result.scalar_one_or_none() is not None

    def _duplicate_email() -> HTTPException:
        """Build the error returned for an already-registered email."""
        return HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        )

    # Fast path: reject obvious duplicates before hashing the password.
    if await _email_taken():
        raise _duplicate_email()

    # NOTE(review): the role is taken verbatim from the request body and this
    # handler declares no authentication dependency, so a caller can ask for
    # an elevated role. Confirm this is intended outside of tests.
    new_user = User(
        email=user_data.email,
        password_hash=get_password_hash(user_data.password),
        name=user_data.name,
        role=user_data.role  # Use role from request (defaults to "engineer")
    )
    db.add(new_user)

    try:
        await db.commit()
    except IntegrityError:
        # The lookup above and this insert are not atomic: another request may
        # have inserted the same email in between. Roll back so the session is
        # usable again, then report a duplicate only if that is really the
        # cause; any other constraint failure is re-raised untouched.
        await db.rollback()
        if await _email_taken():
            raise _duplicate_email() from None
        raise

    await db.refresh(new_user)
    return new_user
@router.post("/login", response_model=Token)
async def login(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: Annotated[AsyncSession, Depends(get_db)]
):
    """Authenticate with OAuth2 password-form credentials and issue tokens.

    The form's ``username`` field is matched against ``User.email``.

    Args:
        form_data: OAuth2 password form carrying ``username`` and ``password``.
        db: Async database session injected by FastAPI.

    Returns:
        A ``Token`` holding a fresh access token, a fresh refresh token and
        the token type ``"bearer"``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when no
            user matches the email or the password does not verify.
    """
    # Look the account up by email (sent in the form's username field).
    lookup = select(User).where(User.email == form_data.username)
    account = (await db.execute(lookup)).scalar_one_or_none()

    # The password is only verified when an account was actually found.
    credentials_ok = account and verify_password(
        form_data.password, account.password_hash
    )
    if not credentials_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Record the successful login before handing out tokens.
    account.last_login = datetime.now(timezone.utc)
    await db.commit()

    # Both tokens carry the user's id (as a string) in the "sub" claim.
    return Token(
        access_token=create_access_token(data={"sub": str(account.id)}),
        refresh_token=create_refresh_token(data={"sub": str(account.id)}),
        token_type="bearer",
    )
@router.post("/login/json", response_model=Token)
async def login_json(
    credentials: UserLogin,
    db: Annotated[AsyncSession, Depends(get_db)]
):
    """Login with JSON body (alternative to form data).

    Args:
        credentials: JSON payload; its ``email`` and ``password`` are read.
        db: Async database session injected by FastAPI.

    Returns:
        A ``Token`` holding a fresh access token, a fresh refresh token and
        the token type ``"bearer"``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when no
            user matches the email or the password does not verify.
    """
    # Find user by email
    result = await db.execute(select(User).where(User.email == credentials.email))
    user = result.scalar_one_or_none()

    if not user or not verify_password(credentials.password, user.password_hash):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            # Same challenge header as the form-based /login endpoint; a 401
            # response is expected to tell the client which scheme to use.
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Update last login
    user.last_login = datetime.now(timezone.utc)
    await db.commit()

    # Create tokens; both carry the user's id (as a string) in "sub".
    access_token = create_access_token(data={"sub": str(user.id)})
    refresh_token = create_refresh_token(data={"sub": str(user.id)})
    return Token(
        access_token=access_token,
        refresh_token=refresh_token,
        token_type="bearer"
    )
@router.post("/refresh", response_model=Token)
async def refresh_token(
    refresh_token: str,
    db: Annotated[AsyncSession, Depends(get_db)]
):
    """Refresh access token using refresh token.

    Args:
        refresh_token: The refresh token previously issued to the client.
        db: Async database session injected by FastAPI.

    Returns:
        A ``Token`` holding a new access token, a new refresh token and the
        token type ``"bearer"``.

    Raises:
        HTTPException: 401 "Invalid refresh token" when the token does not
            decode, is not of type ``"refresh"``, or has no ``sub`` claim;
            401 "User not found" when ``sub`` matches no user.
    """
    # NOTE(review): declared as a bare ``str``, this parameter is presumably
    # read from the query string, where tokens tend to end up in access logs.
    # Moving it to the body would change the public interface, so it is left
    # as-is here; confirm whether that is acceptable.
    payload = decode_token(refresh_token)
    if payload is None or payload.get("type") != "refresh":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token"
        )

    user_id = payload.get("sub")
    if not user_id:
        # A refresh token without a subject cannot identify anyone; reject it
        # as invalid instead of querying the database for a NULL id.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token"
        )

    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found"
        )

    # Issue a fresh pair; the presented refresh token is not revoked here.
    access_token = create_access_token(data={"sub": str(user.id)})
    new_refresh_token = create_refresh_token(data={"sub": str(user.id)})
    return Token(
        access_token=access_token,
        refresh_token=new_refresh_token,
        token_type="bearer"
    )
@router.get("/me", response_model=UserResponse)
async def get_me(current_user: Annotated[User, Depends(get_current_user)]):
    """Return the user resolved by the ``get_current_user`` dependency.

    Args:
        current_user: The ``User`` supplied by ``get_current_user``.

    Returns:
        That same user, serialized through ``UserResponse``.
    """
    return current_user
@router.post("/logout")
async def logout():
    """Acknowledge a logout request.

    Nothing is invalidated server-side: the handler only returns a
    confirmation message, and the client is expected to discard its tokens.
    A production deployment might additionally blacklist the token.

    Returns:
        A dict with a single ``"message"`` key confirming the logout.
    """
    return dict(message="Successfully logged out")