"""Team branding endpoints — logo upload and company display name.""" import base64 from typing import Annotated, Optional from uuid import UUID from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.api.deps import get_current_active_user, get_db from app.models.team import Team from app.models.user import User from app.schemas.branding import BrandingResponse router = APIRouter(prefix="/teams", tags=["branding"]) ALLOWED_CONTENT_TYPES = {"image/png", "image/jpeg", "image/svg+xml"} MAX_LOGO_SIZE = 2 * 1024 * 1024 # 2MB def _require_team_member(user: User, team_id: UUID) -> None: """Ensure the user belongs to the given team (or is super admin).""" if user.is_super_admin: return if user.team_id != team_id: raise HTTPException(status_code=403, detail="Not a member of this team") def _require_team_admin(user: User, team_id: UUID) -> None: """Ensure the user is a team admin for the given team (or is super admin).""" if user.is_super_admin: return if not user.is_team_admin or user.team_id != team_id: raise HTTPException(status_code=403, detail="Team admin required") @router.get("/{team_id}/branding", response_model=BrandingResponse) async def get_branding( team_id: UUID, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_user)], ) -> BrandingResponse: """Retrieve branding info for a team. Any team member can read.""" _require_team_member(current_user, team_id) result = await db.execute(select(Team).where(Team.id == team_id)) team = result.scalar_one_or_none() if not team: raise HTTPException(status_code=404, detail="Team not found") return BrandingResponse( company_display_name=team.company_display_name, logo_content_type=team.logo_content_type, has_logo=team.logo_data is not None, ) @router.patch("/{team_id}/branding", response_model=BrandingResponse) async def update_branding( team_id: UUID, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_user)], logo: Annotated[Optional[UploadFile], File()] = None, company_display_name: Annotated[Optional[str], Form()] = None, ) -> BrandingResponse: """Upload logo and/or update company display name. Team admin only.""" _require_team_admin(current_user, team_id) result = await db.execute(select(Team).where(Team.id == team_id)) team = result.scalar_one_or_none() if not team: raise HTTPException(status_code=404, detail="Team not found") # Handle logo upload if logo is not None: if logo.content_type not in ALLOWED_CONTENT_TYPES: raise HTTPException( status_code=400, detail=f"Invalid content type '{logo.content_type}'. Allowed: {', '.join(sorted(ALLOWED_CONTENT_TYPES))}", ) raw_bytes = await logo.read() if len(raw_bytes) > MAX_LOGO_SIZE: raise HTTPException( status_code=400, detail=f"Logo exceeds maximum size of {MAX_LOGO_SIZE // (1024 * 1024)}MB", ) team.logo_data = base64.b64encode(raw_bytes).decode("utf-8") team.logo_content_type = logo.content_type # Handle display name update if company_display_name is not None: team.company_display_name = company_display_name await db.commit() await db.refresh(team) return BrandingResponse( company_display_name=team.company_display_name, logo_content_type=team.logo_content_type, has_logo=team.logo_data is not None, ) @router.delete("/{team_id}/branding/logo", status_code=200, response_model=BrandingResponse) async def delete_logo( team_id: UUID, db: Annotated[AsyncSession, Depends(get_db)], current_user: Annotated[User, Depends(get_current_active_user)], ) -> BrandingResponse: """Remove the team logo. Team admin only.""" _require_team_admin(current_user, team_id) result = await db.execute(select(Team).where(Team.id == team_id)) team = result.scalar_one_or_none() if not team: raise HTTPException(status_code=404, detail="Team not found") team.logo_data = None team.logo_content_type = None await db.commit() await db.refresh(team) return BrandingResponse( company_display_name=team.company_display_name, logo_content_type=team.logo_content_type, has_logo=False, )