Files
resolutionflow/backend/app/api/endpoints/notifications.py
chihlasm 0f750e63e0 feat(notifications): add Phase 4 Slice 2 — multi-channel notification system
Full notification infrastructure with in-app, email, Slack, and Teams channels:

Backend:
- NotificationConfig, NotificationLog, Notification models + migration
- Notification service with event routing, channel delivery, retry logic
- 9 API endpoints (config CRUD + in-app notifications)
- APScheduler retry job with exponential backoff (30s, 2m, 10m)
- Wired into escalation, proposal approval, and knowledge flywheel
- Pydantic event key validation, cross-tenant protection on recipients

Frontend:
- TypeScript types + API client for all notification endpoints
- NotificationsPanel: bell icon with unread badge, dropdown, mark-read
- NotificationSettings: channel config, event toggles, test, delete confirm
- Notifications tab on IntegrationsPage
- ARIA attributes, Escape handler, settings link on panel

Review fixes (13 issues resolved):
- notify() no longer commits/rolls back caller's transaction (critical)
- retry_failed_notifications returns count instead of None (critical)
- NotificationSettings moved inside dedicated tab (critical)
- target_user_ids scoped by account_id (security)
- Email loop collects all failures before raising
- Slack webhook validates response body
- events_enabled rejects unknown event keys
- link column widened to String(500)
- Dead code removed from _auto_reinforce
- Delete confirmation, ARIA, Escape key support

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 12:37:54 +00:00

256 lines
8.9 KiB
Python

"""Notification endpoints — config CRUD + in-app notification management.
Config CRUD (team_admin):
GET /notifications/configs — List configs for account
POST /notifications/configs — Create config
PATCH /notifications/configs/{id} — Update config
DELETE /notifications/configs/{id} — Delete config
POST /notifications/configs/test — Test a config
In-app notifications (any authenticated user):
GET /notifications — List notifications (paginated)
GET /notifications/unread-count — Unread count
PATCH /notifications/{id}/read — Mark one as read
POST /notifications/mark-all-read — Mark all as read
"""
import logging
from typing import Annotated
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from sqlalchemy import select, func, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.rate_limit import limiter
from app.api.deps import get_current_active_user, require_team_admin
from app.core.database import get_db
from app.models.user import User
from app.models.notification_config import NotificationConfig
from app.models.notification import Notification
from app.schemas.notification import (
NotificationConfigCreate,
NotificationConfigUpdate,
NotificationConfigResponse,
NotificationResponse,
UnreadCountResponse,
NotificationTestRequest,
NotificationTestResponse,
)
from app.services.notification_service import send_test_notification
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Every route declared below is served under the /notifications prefix
# and tagged "notifications".
router = APIRouter(prefix="/notifications", tags=["notifications"])
# ---------------------------------------------------------------------------
# Config CRUD (team_admin required)
# ---------------------------------------------------------------------------
@router.get("/configs", response_model=list[NotificationConfigResponse])
@limiter.limit("30/minute")
async def list_configs(
    request: Request,
    current_user: Annotated[User, Depends(require_team_admin)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """List the notification configs that belong to the caller's account.

    Configs are returned newest first (descending ``created_at``).
    """
    # NOTE(review): ``request`` is not read in the body; presumably the
    # rate-limit decorator needs it in the signature -- confirm.
    owned_by_account = NotificationConfig.account_id == current_user.account_id
    stmt = (
        select(NotificationConfig)
        .where(owned_by_account)
        .order_by(NotificationConfig.created_at.desc())
    )
    rows = await db.execute(stmt)
    return rows.scalars().all()
@router.post("/configs", response_model=NotificationConfigResponse, status_code=status.HTTP_201_CREATED)
@limiter.limit("10/minute")
async def create_config(
    request: Request,
    body: NotificationConfigCreate,
    current_user: Annotated[User, Depends(require_team_admin)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Create a notification config for the caller's account.

    The ``slack_webhook`` and ``teams_webhook`` channels require a
    ``webhook_url``; the ``email`` channel requires ``email_addresses``.
    A request that misses its channel's delivery target is rejected with 422.
    """
    # Each channel needs its own delivery target before the row is stored.
    needs_webhook = body.channel in ("slack_webhook", "teams_webhook")
    if needs_webhook and not body.webhook_url:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"webhook_url is required for {body.channel} channel",
        )

    needs_recipients = body.channel == "email"
    if needs_recipients and not body.email_addresses:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="email_addresses is required for email channel",
        )

    # The account always comes from the authenticated user, never the body.
    fields = {
        "account_id": current_user.account_id,
        "channel": body.channel,
        "webhook_url": body.webhook_url,
        "email_addresses": body.email_addresses,
        "events_enabled": body.events_enabled,
    }
    new_config = NotificationConfig(**fields)
    db.add(new_config)
    await db.commit()
    # Reload so values generated at insert time are present in the response.
    await db.refresh(new_config)
    return new_config
@router.patch("/configs/{config_id}", response_model=NotificationConfigResponse)
@limiter.limit("20/minute")
async def update_config(
    request: Request,
    config_id: UUID,
    body: NotificationConfigUpdate,
    current_user: Annotated[User, Depends(require_team_admin)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Update an existing notification config.

    Only the fields explicitly present in the request body are changed.
    When the patch touches ``channel``, ``webhook_url`` or
    ``email_addresses``, the same channel requirements enforced at creation
    are checked against the merged result, so an update cannot leave a
    webhook channel without a ``webhook_url`` or an email channel without
    ``email_addresses``.

    Responds with 404 when the config does not exist in the caller's account
    and with 422 when the merged config would violate a channel requirement.
    """
    result = await db.execute(
        select(NotificationConfig)
        .where(NotificationConfig.id == config_id)
        .where(NotificationConfig.account_id == current_user.account_id)
    )
    config = result.scalar_one_or_none()
    if not config:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Config not found")

    update_data = body.model_dump(exclude_unset=True)

    # Validate the state the row would have *after* the patch, and do it
    # before mutating the ORM object so a rejected request changes nothing.
    # Patches that leave the delivery fields alone skip the check, so
    # unrelated edits keep working exactly as before.
    if update_data.keys() & {"channel", "webhook_url", "email_addresses"}:
        channel = update_data.get("channel", config.channel)
        webhook_url = update_data.get("webhook_url", config.webhook_url)
        email_addresses = update_data.get("email_addresses", config.email_addresses)
        if channel in ("slack_webhook", "teams_webhook") and not webhook_url:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"webhook_url is required for {channel} channel",
            )
        if channel == "email" and not email_addresses:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="email_addresses is required for email channel",
            )

    for field, value in update_data.items():
        setattr(config, field, value)
    await db.commit()
    # Reload so the response reflects the persisted row.
    await db.refresh(config)
    return config
@router.delete("/configs/{config_id}", status_code=status.HTTP_204_NO_CONTENT)
@limiter.limit("10/minute")
async def delete_config(
    request: Request,
    config_id: UUID,
    current_user: Annotated[User, Depends(require_team_admin)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Delete a notification config owned by the caller's account.

    Responds with 404 when no config with this id exists in the account.
    """
    # Filtering on account_id as well as id keeps other accounts' configs
    # indistinguishable from missing ones.
    lookup = select(NotificationConfig).where(
        NotificationConfig.id == config_id,
        NotificationConfig.account_id == current_user.account_id,
    )
    target = (await db.execute(lookup)).scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Config not found")

    await db.delete(target)
    await db.commit()
@router.post("/configs/test", response_model=NotificationTestResponse)
@limiter.limit("5/minute")
async def test_config(
    request: Request,
    body: NotificationTestRequest,
    current_user: Annotated[User, Depends(require_team_admin)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Send a test notification through one of the account's configs.

    Responds with 404 when ``config_id`` does not match a config in the
    caller's account; otherwise reports the outcome of the test delivery.
    """
    lookup = select(NotificationConfig).where(
        NotificationConfig.id == body.config_id,
        NotificationConfig.account_id == current_user.account_id,
    )
    target = (await db.execute(lookup)).scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Config not found")

    # The service hands back a (success, message) pair that maps directly
    # onto the response schema.
    outcome = await send_test_notification(target)
    delivered, detail = outcome
    return NotificationTestResponse(success=delivered, message=detail)
# ---------------------------------------------------------------------------
# In-app notifications (any authenticated user)
# ---------------------------------------------------------------------------
@router.get("", response_model=list[NotificationResponse])
@limiter.limit("60/minute")
async def list_notifications(
    request: Request,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=100),
):
    """List notifications for the current user, unread first.

    Within the unread and read groups, notifications are ordered newest
    first. ``skip`` and ``limit`` page through the result; ``limit`` is
    capped at 100.
    """
    result = await db.execute(
        select(Notification)
        .where(Notification.user_id == current_user.id)
        .order_by(
            Notification.is_read.asc(),
            Notification.created_at.desc(),
            # Tiebreaker: rows with the same read state and timestamp would
            # otherwise come back in an unspecified order, so consecutive
            # OFFSET/LIMIT pages could repeat or skip notifications.
            Notification.id.desc(),
        )
        .offset(skip)
        .limit(limit)
    )
    return result.scalars().all()
@router.get("/unread-count", response_model=UnreadCountResponse)
@limiter.limit("120/minute")
async def unread_count(
    request: Request,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Return how many of the current user's notifications are unread."""
    # Count in the database rather than loading the rows.
    counting_stmt = (
        select(func.count())
        .select_from(Notification)
        .where(
            Notification.user_id == current_user.id,
            Notification.is_read.is_(False),
        )
    )
    outcome = await db.execute(counting_stmt)
    return UnreadCountResponse(count=outcome.scalar_one())
@router.patch("/{notification_id}/read", response_model=NotificationResponse)
@limiter.limit("60/minute")
async def mark_read(
    request: Request,
    notification_id: UUID,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Mark one of the current user's notifications as read.

    Responds with 404 when the notification does not exist or belongs to a
    different user.
    """
    # Matching on user_id as well as id means a user can only touch their
    # own notifications.
    owned_notification = select(Notification).where(
        Notification.id == notification_id,
        Notification.user_id == current_user.id,
    )
    notification = (await db.execute(owned_notification)).scalar_one_or_none()
    if not notification:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Notification not found")

    notification.is_read = True
    await db.commit()
    # Reload so the response reflects the persisted row.
    await db.refresh(notification)
    return notification
@router.post("/mark-all-read", response_model=UnreadCountResponse)
@limiter.limit("10/minute")
async def mark_all_read(
    request: Request,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Mark every unread notification of the current user as read.

    Always reports an unread count of 0 once the update is committed.
    """
    # One bulk UPDATE, limited to rows that are still unread.
    flip_unread = (
        update(Notification)
        .where(
            Notification.user_id == current_user.id,
            Notification.is_read.is_(False),
        )
        .values(is_read=True)
    )
    await db.execute(flip_unread)
    await db.commit()
    return UnreadCountResponse(count=0)