Files
resolutionflow/backend/app/api/endpoints/invite.py
chihlasm 71ba0b95a5 fix: high-severity security hardening (Phase B permissions audit)
Phase B addresses 7 high-severity gaps from the permissions audit:

- B1: Enforce tree access check on session start via can_access_tree
- B2: Replace all inline permission helpers with centralized permissions.py
- B3: Fix require_engineer_or_admin to check is_team_admin before role
- B4: Add is_active field on User with enforcement in get_current_active_user
- B5: Add admin user management endpoints (list, get, role, team-admin, deactivate, activate)
- B6: Add rate limiting on auth/invite endpoints via slowapi (disabled in DEBUG)
- B7: Implement refresh token rotation with JTI-based revocation and meaningful logout

Also reduces access token TTL from 15 to 5 minutes and updates CLAUDE.md
with SaaS/MSP context for future planning sessions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-05 22:44:05 -05:00

99 lines
3.1 KiB
Python

from datetime import datetime, timezone
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status, Request
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.core.database import get_db
from app.core.rate_limit import limiter
from app.models.user import User
from app.models.invite_code import InviteCode
from app.schemas.invite_code import InviteCodeCreate, InviteCodeResponse, InviteCodeValidation
from app.api.deps import require_admin
router = APIRouter(prefix="/invites", tags=["invites"])
@router.post("", response_model=InviteCodeResponse, status_code=status.HTTP_201_CREATED)
async def create_invite_code(
invite_data: InviteCodeCreate,
current_user: Annotated[User, Depends(require_admin)],
db: Annotated[AsyncSession, Depends(get_db)]
):
"""Create a new invite code. Admin only."""
invite_code = InviteCode(
created_by_id=current_user.id,
expires_at=invite_data.expires_at,
note=invite_data.note
)
db.add(invite_code)
await db.commit()
await db.refresh(invite_code)
return invite_code
@router.get("", response_model=list[InviteCodeResponse])
async def list_invite_codes(
current_user: Annotated[User, Depends(require_admin)],
db: Annotated[AsyncSession, Depends(get_db)]
):
"""List all invite codes. Admin only."""
result = await db.execute(
select(InviteCode).order_by(InviteCode.created_at.desc())
)
invite_codes = result.scalars().all()
return invite_codes
@router.delete("/{code}", status_code=status.HTTP_204_NO_CONTENT)
async def revoke_invite_code(
code: str,
current_user: Annotated[User, Depends(require_admin)],
db: Annotated[AsyncSession, Depends(get_db)]
):
"""Revoke (delete) an invite code. Admin only."""
result = await db.execute(
select(InviteCode).where(InviteCode.code == code)
)
invite_code = result.scalar_one_or_none()
if not invite_code:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Invite code not found"
)
if invite_code.is_used:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot revoke a used invite code"
)
await db.delete(invite_code)
await db.commit()
@router.get("/validate/{code}", response_model=InviteCodeValidation)
@limiter.limit("5/minute")
async def validate_invite_code(
request: Request,
code: str,
db: Annotated[AsyncSession, Depends(get_db)]
):
"""Check if an invite code is valid. Public endpoint for UX."""
result = await db.execute(
select(InviteCode).where(InviteCode.code == code.upper())
)
invite_code = result.scalar_one_or_none()
if not invite_code:
return InviteCodeValidation(valid=False, message="Invalid invite code")
if invite_code.is_used:
return InviteCodeValidation(valid=False, message="Invite code has already been used")
if invite_code.is_expired:
return InviteCodeValidation(valid=False, message="Invite code has expired")
return InviteCodeValidation(valid=True, message="Invite code is valid")