From fdeb4171313efb4706356c361bb2f3024f1cd87c Mon Sep 17 00:00:00 2001 From: chihlasm Date: Tue, 17 Mar 2026 00:12:00 -0400 Subject: [PATCH] feat: add team branding CRUD endpoints with tests Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/app/api/endpoints/branding.py | 130 ++++++++++++++ backend/app/api/router.py | 2 + backend/tests/test_branding.py | 249 ++++++++++++++++++++++++++ 3 files changed, 381 insertions(+) create mode 100644 backend/app/api/endpoints/branding.py create mode 100644 backend/tests/test_branding.py diff --git a/backend/app/api/endpoints/branding.py b/backend/app/api/endpoints/branding.py new file mode 100644 index 00000000..7e22b2f7 --- /dev/null +++ b/backend/app/api/endpoints/branding.py @@ -0,0 +1,130 @@ +"""Team branding endpoints — logo upload and company display name.""" + +import base64 +from typing import Annotated, Optional +from uuid import UUID + +from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.api.deps import get_current_active_user, get_db +from app.models.team import Team +from app.models.user import User +from app.schemas.branding import BrandingResponse + +router = APIRouter(prefix="/teams", tags=["branding"]) + +ALLOWED_CONTENT_TYPES = {"image/png", "image/jpeg", "image/svg+xml"} +MAX_LOGO_SIZE = 2 * 1024 * 1024 # 2MB + + +def _require_team_member(user: User, team_id: UUID) -> None: + """Ensure the user belongs to the given team (or is super admin).""" + if user.is_super_admin: + return + if user.team_id != team_id: + raise HTTPException(status_code=403, detail="Not a member of this team") + + +def _require_team_admin(user: User, team_id: UUID) -> None: + """Ensure the user is a team admin for the given team (or is super admin).""" + if user.is_super_admin: + return + if not user.is_team_admin or user.team_id != team_id: + raise HTTPException(status_code=403, detail="Team admin required") + + +@router.get("/{team_id}/branding", response_model=BrandingResponse) +async def get_branding( + team_id: UUID, + db: Annotated[AsyncSession, Depends(get_db)], + current_user: Annotated[User, Depends(get_current_active_user)], +) -> BrandingResponse: + """Retrieve branding info for a team. Any team member can read.""" + _require_team_member(current_user, team_id) + + result = await db.execute(select(Team).where(Team.id == team_id)) + team = result.scalar_one_or_none() + if not team: + raise HTTPException(status_code=404, detail="Team not found") + + return BrandingResponse( + company_display_name=team.company_display_name, + logo_content_type=team.logo_content_type, + has_logo=team.logo_data is not None, + ) + + +@router.patch("/{team_id}/branding", response_model=BrandingResponse) +async def update_branding( + team_id: UUID, + db: Annotated[AsyncSession, Depends(get_db)], + current_user: Annotated[User, Depends(get_current_active_user)], + logo: Annotated[Optional[UploadFile], File()] = None, + company_display_name: Annotated[Optional[str], Form()] = None, +) -> BrandingResponse: + """Upload logo and/or update company display name. Team admin only.""" + _require_team_admin(current_user, team_id) + + result = await db.execute(select(Team).where(Team.id == team_id)) + team = result.scalar_one_or_none() + if not team: + raise HTTPException(status_code=404, detail="Team not found") + + # Handle logo upload + if logo is not None: + if logo.content_type not in ALLOWED_CONTENT_TYPES: + raise HTTPException( + status_code=400, + detail=f"Invalid content type '{logo.content_type}'. Allowed: {', '.join(sorted(ALLOWED_CONTENT_TYPES))}", + ) + + raw_bytes = await logo.read() + if len(raw_bytes) > MAX_LOGO_SIZE: + raise HTTPException( + status_code=400, + detail=f"Logo exceeds maximum size of {MAX_LOGO_SIZE // (1024 * 1024)}MB", + ) + + team.logo_data = base64.b64encode(raw_bytes).decode("utf-8") + team.logo_content_type = logo.content_type + + # Handle display name update + if company_display_name is not None: + team.company_display_name = company_display_name + + await db.commit() + await db.refresh(team) + + return BrandingResponse( + company_display_name=team.company_display_name, + logo_content_type=team.logo_content_type, + has_logo=team.logo_data is not None, + ) + + +@router.delete("/{team_id}/branding/logo", status_code=200, response_model=BrandingResponse) +async def delete_logo( + team_id: UUID, + db: Annotated[AsyncSession, Depends(get_db)], + current_user: Annotated[User, Depends(get_current_active_user)], +) -> BrandingResponse: + """Remove the team logo. Team admin only.""" + _require_team_admin(current_user, team_id) + + result = await db.execute(select(Team).where(Team.id == team_id)) + team = result.scalar_one_or_none() + if not team: + raise HTTPException(status_code=404, detail="Team not found") + + team.logo_data = None + team.logo_content_type = None + await db.commit() + await db.refresh(team) + + return BrandingResponse( + company_display_name=team.company_display_name, + logo_content_type=team.logo_content_type, + has_logo=False, + ) diff --git a/backend/app/api/router.py b/backend/app/api/router.py index c8d96850..c5ee9ddd 100644 --- a/backend/app/api/router.py +++ b/backend/app/api/router.py @@ -19,6 +19,7 @@ from app.api.endpoints import beta_signup from app.api.endpoints import scripts from app.api.endpoints import integrations from app.api.endpoints import onboarding +from app.api.endpoints import branding api_router = APIRouter() @@ -63,3 +64,4 @@ api_router.include_router(beta_signup.router) api_router.include_router(scripts.router) api_router.include_router(integrations.router) api_router.include_router(onboarding.router) +api_router.include_router(branding.router) diff --git a/backend/tests/test_branding.py b/backend/tests/test_branding.py new file mode 100644 index 00000000..c94c696a --- /dev/null +++ b/backend/tests/test_branding.py @@ -0,0 +1,249 @@ +"""Tests for team branding endpoints (logo upload + company display name).""" + +import uuid + +import pytest +from httpx import AsyncClient +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select + +from app.core.security import get_password_hash +from app.models.team import Team +from app.models.user import User + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +async def _create_team_with_admin( + test_db: AsyncSession, + client: AsyncClient, + *, + team_name: str = "Branding Test Team", +) -> tuple[dict, str, Team]: + """Create a team + team admin user. Returns (auth_headers, team_id_str, team).""" + team = Team(name=team_name) + test_db.add(team) + await test_db.flush() + + email = f"admin_{uuid.uuid4().hex[:8]}@test.com" + user = User( + email=email, + password_hash=get_password_hash("Password123!"), + name="Team Admin", + is_active=True, + team_id=team.id, + is_team_admin=True, + role="engineer", + ) + test_db.add(user) + await test_db.commit() + + resp = await client.post( + "/api/v1/auth/login/json", + json={"email": email, "password": "Password123!"}, + ) + assert resp.status_code == 200 + token = resp.json()["access_token"] + headers = {"Authorization": f"Bearer {token}"} + return headers, str(team.id), team + + +async def _create_team_member( + test_db: AsyncSession, + client: AsyncClient, + team: Team, + *, + is_team_admin: bool = False, +) -> dict: + """Create a regular team member. Returns auth_headers.""" + email = f"member_{uuid.uuid4().hex[:8]}@test.com" + user = User( + email=email, + password_hash=get_password_hash("Password123!"), + name="Team Member", + is_active=True, + team_id=team.id, + is_team_admin=is_team_admin, + role="engineer", + ) + test_db.add(user) + await test_db.commit() + + resp = await client.post( + "/api/v1/auth/login/json", + json={"email": email, "password": "Password123!"}, + ) + assert resp.status_code == 200 + token = resp.json()["access_token"] + return {"Authorization": f"Bearer {token}"} + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_get_branding_defaults(client: AsyncClient, test_db: AsyncSession): + """GET branding with no logo returns defaults (has_logo=False).""" + headers, team_id, _ = await _create_team_with_admin(test_db, client) + + resp = await client.get(f"/api/v1/teams/{team_id}/branding", headers=headers) + assert resp.status_code == 200 + data = resp.json() + assert data["has_logo"] is False + assert data["company_display_name"] is None + assert data["logo_content_type"] is None + + +@pytest.mark.asyncio +async def test_upload_logo_with_company_name(client: AsyncClient, test_db: AsyncSession): + """PATCH with valid PNG logo + company name succeeds.""" + headers, team_id, _ = await _create_team_with_admin(test_db, client) + + # 1x1 transparent PNG (67 bytes) + png_bytes = ( + b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01" + b"\x00\x00\x00\x01\x08\x06\x00\x00\x00\x1f\x15\xc4\x89" + b"\x00\x00\x00\nIDATx\x9cc\x00\x01\x00\x00\x05\x00\x01" + b"\r\n\xb4\x00\x00\x00\x00IEND\xaeB`\x82" + ) + + resp = await client.patch( + f"/api/v1/teams/{team_id}/branding", + headers=headers, + files={"logo": ("logo.png", png_bytes, "image/png")}, + data={"company_display_name": "Acme MSP"}, + ) + assert resp.status_code == 200, resp.text + data = resp.json() + assert data["has_logo"] is True + assert data["logo_content_type"] == "image/png" + assert data["company_display_name"] == "Acme MSP" + + +@pytest.mark.asyncio +async def test_upload_oversized_logo(client: AsyncClient, test_db: AsyncSession): + """PATCH with >2MB file returns 400.""" + headers, team_id, _ = await _create_team_with_admin(test_db, client) + + big_bytes = b"\x00" * (2 * 1024 * 1024 + 1) # 2MB + 1 byte + + resp = await client.patch( + f"/api/v1/teams/{team_id}/branding", + headers=headers, + files={"logo": ("big.png", big_bytes, "image/png")}, + ) + assert resp.status_code == 400 + assert "maximum size" in resp.json()["detail"].lower() + + +@pytest.mark.asyncio +async def test_upload_invalid_content_type(client: AsyncClient, test_db: AsyncSession): + """PATCH with application/pdf content type returns 400.""" + headers, team_id, _ = await _create_team_with_admin(test_db, client) + + resp = await client.patch( + f"/api/v1/teams/{team_id}/branding", + headers=headers, + files={"logo": ("doc.pdf", b"%PDF-fake", "application/pdf")}, + ) + assert resp.status_code == 400 + assert "content type" in resp.json()["detail"].lower() + + +@pytest.mark.asyncio +async def test_delete_logo(client: AsyncClient, test_db: AsyncSession): + """DELETE logo clears logo_data while keeping company_display_name.""" + headers, team_id, _ = await _create_team_with_admin(test_db, client) + + # Upload a logo + name first + png_bytes = b"\x89PNG\r\n\x1a\n" + b"\x00" * 50 + await client.patch( + f"/api/v1/teams/{team_id}/branding", + headers=headers, + files={"logo": ("logo.png", png_bytes, "image/png")}, + data={"company_display_name": "Keep This Name"}, + ) + + # Delete logo + resp = await client.delete( + f"/api/v1/teams/{team_id}/branding/logo", + headers=headers, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["has_logo"] is False + assert data["logo_content_type"] is None + assert data["company_display_name"] == "Keep This Name" + + +@pytest.mark.asyncio +async def test_non_admin_cannot_update(client: AsyncClient, test_db: AsyncSession): + """Regular team member (non-admin) cannot PATCH branding — returns 403.""" + admin_headers, team_id, team = await _create_team_with_admin(test_db, client) + member_headers = await _create_team_member(test_db, client, team) + + resp = await client.patch( + f"/api/v1/teams/{team_id}/branding", + headers=member_headers, + data={"company_display_name": "Should Fail"}, + ) + assert resp.status_code == 403 + + +@pytest.mark.asyncio +async def test_non_admin_cannot_delete_logo(client: AsyncClient, test_db: AsyncSession): + """Regular team member cannot DELETE logo — returns 403.""" + admin_headers, team_id, team = await _create_team_with_admin(test_db, client) + member_headers = await _create_team_member(test_db, client, team) + + resp = await client.delete( + f"/api/v1/teams/{team_id}/branding/logo", + headers=member_headers, + ) + assert resp.status_code == 403 + + +@pytest.mark.asyncio +async def test_non_member_cannot_read(client: AsyncClient, test_db: AsyncSession): + """User from a different team cannot GET branding — returns 403.""" + _, team_id, _ = await _create_team_with_admin(test_db, client, team_name="Team A") + other_headers, _, _ = await _create_team_with_admin(test_db, client, team_name="Team B") + + resp = await client.get( + f"/api/v1/teams/{team_id}/branding", + headers=other_headers, + ) + assert resp.status_code == 403 + + +@pytest.mark.asyncio +async def test_member_can_read_branding(client: AsyncClient, test_db: AsyncSession): + """Regular team member CAN read branding.""" + admin_headers, team_id, team = await _create_team_with_admin(test_db, client) + member_headers = await _create_team_member(test_db, client, team) + + resp = await client.get( + f"/api/v1/teams/{team_id}/branding", + headers=member_headers, + ) + assert resp.status_code == 200 + assert resp.json()["has_logo"] is False + + +@pytest.mark.asyncio +async def test_update_display_name_only(client: AsyncClient, test_db: AsyncSession): + """PATCH with only company_display_name (no logo) succeeds.""" + headers, team_id, _ = await _create_team_with_admin(test_db, client) + + resp = await client.patch( + f"/api/v1/teams/{team_id}/branding", + headers=headers, + data={"company_display_name": "Just A Name"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["company_display_name"] == "Just A Name" + assert data["has_logo"] is False