- FastAPI backend with JWT auth - PostgreSQL database schema - Trees and Sessions CRUD APIs - Export functionality (Markdown, Text, HTML) - Docker setup for local development - Alembic migrations
83 lines
2.3 KiB
Python
83 lines
2.3 KiB
Python
from typing import Annotated
|
|
from uuid import UUID
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import OAuth2PasswordBearer
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select
|
|
|
|
from app.core.database import get_db
|
|
from app.core.security import decode_token
|
|
from app.models.user import User
|
|
|
|
# Shared bearer-token dependency: get_current_user receives its token string
# through Depends(oauth2_scheme). tokenUrl names the login route.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
|
|
|
|
|
|
async def get_current_user(
    db: Annotated[AsyncSession, Depends(get_db)],
    token: Annotated[str, Depends(oauth2_scheme)]
) -> User:
    """Resolve the authenticated user from a bearer JWT.

    The token must decode successfully, carry ``type == "access"`` and hold
    a string ``sub`` claim that parses as a UUID equal to an existing
    ``User.id``.

    Args:
        db: Async database session used to load the user row.
        token: Token string supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The ``User`` whose id matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the token cannot be decoded, is not an access token, has a
            missing, non-string or malformed ``sub`` claim, or refers to no
            existing user.
    """
    # Every failure mode deliberately shares one response so the caller
    # cannot tell which validation step rejected the token.
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    payload = decode_token(token)
    if payload is None:
        raise credentials_exception

    # Any token type other than "access" is rejected.
    if payload.get("type") != "access":
        raise credentials_exception

    user_id = payload.get("sub")
    # A missing claim yields None, and a non-string claim (e.g. an integer)
    # would make UUID() raise AttributeError instead of ValueError and
    # surface as a 500. Reject both up front.
    if not isinstance(user_id, str):
        raise credentials_exception

    try:
        user_uuid = UUID(user_id)
    except ValueError:
        # Suppress chaining: the parse error is not useful context for a 401.
        raise credentials_exception from None

    result = await db.execute(select(User).where(User.id == user_uuid))
    user = result.scalar_one_or_none()
    if user is None:
        raise credentials_exception

    return user
|
|
|
|
|
|
async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)]
) -> User:
    """Return the authenticated user after the "active account" gate.

    No activity check is implemented yet, so every authenticated user is
    passed through unchanged.

    Args:
        current_user: User resolved by ``get_current_user``.

    Returns:
        The same ``User`` object that was passed in.
    """
    # Placeholder gate: if User gains an is_active field, reject disabled
    # accounts here before returning.
    active_user = current_user
    return active_user
|
|
|
|
|
|
async def require_admin(
    current_user: Annotated[User, Depends(get_current_active_user)]
) -> User:
    """Allow the request only when the user's role is ``"admin"``.

    Args:
        current_user: User resolved by ``get_current_active_user``.

    Returns:
        The same ``User`` object when its role is ``"admin"``.

    Raises:
        HTTPException: 403 when the user's role is anything else.
    """
    # Success path first; everything that falls through is forbidden.
    if current_user.role == "admin":
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin access required",
    )
|
|
|
|
|
|
async def require_engineer_or_admin(
    current_user: Annotated[User, Depends(get_current_active_user)]
) -> User:
    """Allow the request only for users whose role is ``"admin"`` or ``"engineer"``.

    Args:
        current_user: User resolved by ``get_current_active_user``.

    Returns:
        The same ``User`` object when its role is one of the permitted roles.

    Raises:
        HTTPException: 403 when the user's role is anything else.
    """
    permitted_roles = ("admin", "engineer")

    # Success path first; everything that falls through is forbidden.
    if current_user.role in permitted_roles:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Engineer or admin access required",
    )
|