feat(notifications): add Phase 4 Slice 2 — multi-channel notification system
Full notification infrastructure with in-app, email, Slack, and Teams channels: Backend: - NotificationConfig, NotificationLog, Notification models + migration - Notification service with event routing, channel delivery, retry logic - 9 API endpoints (config CRUD + in-app notifications) - APScheduler retry job with exponential backoff (30s, 2m, 10m) - Wired into escalation, proposal approval, and knowledge flywheel - Pydantic event key validation, cross-tenant protection on recipients Frontend: - TypeScript types + API client for all notification endpoints - NotificationsPanel: bell icon with unread badge, dropdown, mark-read - NotificationSettings: channel config, event toggles, test, delete confirm - Notifications tab on IntegrationsPage - ARIA attributes, Escape handler, settings link on panel Review fixes (13 issues resolved): - notify() no longer commits/rolls back caller's transaction (critical) - retry_failed_notifications returns count instead of None (critical) - NotificationSettings moved inside dedicated tab (critical) - target_user_ids scoped by account_id (security) - Email loop collects all failures before raising - Slack webhook validates response body - events_enabled rejects unknown event keys - link column widened to String(500) - Dead code removed from _auto_reinforce - Delete confirmation, ARIA, Escape key support Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
420
backend/app/services/notification_service.py
Normal file
420
backend/app/services/notification_service.py
Normal file
@@ -0,0 +1,420 @@
|
||||
"""Notification service — dispatches in-app + external notifications.
|
||||
|
||||
Entry point: `notify(event, account_id, payload, db)`.
|
||||
Retry engine: `retry_failed_notifications(db)` called by APScheduler.
|
||||
"""
|
||||
import logging
|
||||
import uuid
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Any, Optional
|
||||
|
||||
import httpx
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.email import EmailService
|
||||
from app.models.notification import Notification
|
||||
from app.models.notification_config import NotificationConfig
|
||||
from app.models.notification_log import NotificationLog
|
||||
from app.models.user import User
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Exponential backoff schedule (seconds): 30s, 2m, 10m
|
||||
_RETRY_DELAYS = [30, 120, 600]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def notify(
|
||||
event: str,
|
||||
account_id: uuid.UUID,
|
||||
payload: dict[str, Any],
|
||||
db: AsyncSession,
|
||||
target_user_ids: Optional[list[uuid.UUID]] = None,
|
||||
) -> None:
|
||||
"""Main entry point — create in-app notifications + route to external channels.
|
||||
|
||||
IMPORTANT: This function does NOT commit or rollback. The caller owns the transaction.
|
||||
In-app notifications are added to the session (flushed, not committed).
|
||||
External channel delivery is fire-and-forget — failures are logged, not raised.
|
||||
"""
|
||||
try:
|
||||
recipients = await _resolve_recipients(account_id, target_user_ids, db)
|
||||
|
||||
title = _build_notification_title(event, payload)
|
||||
body = _build_notification_body(event, payload)
|
||||
link = _build_notification_link(event, payload)
|
||||
|
||||
# Create in-app notification for each recipient
|
||||
for user in recipients:
|
||||
notification = Notification(
|
||||
account_id=account_id,
|
||||
user_id=user.id,
|
||||
event=event,
|
||||
title=title,
|
||||
body=body,
|
||||
link=link,
|
||||
)
|
||||
db.add(notification)
|
||||
|
||||
await db.flush()
|
||||
|
||||
# Route to active external channels (fire-and-forget per channel)
|
||||
configs = await _get_active_configs(account_id, event, db)
|
||||
for config in configs:
|
||||
try:
|
||||
await _deliver_to_channel(config, event, payload, db)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"External delivery failed for config=%s event=%s", config.id, event
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to process notification event=%s account=%s", event, account_id)
|
||||
|
||||
|
||||
async def retry_failed_notifications(db: AsyncSession) -> int:
|
||||
"""Retry failed notification deliveries. Called by APScheduler."""
|
||||
now = datetime.now(timezone.utc)
|
||||
result = await db.execute(
|
||||
select(NotificationLog)
|
||||
.where(NotificationLog.status == "retrying")
|
||||
.where(NotificationLog.next_retry_at <= now)
|
||||
)
|
||||
logs = result.scalars().all()
|
||||
|
||||
if not logs:
|
||||
return 0
|
||||
|
||||
logger.info("Retrying %d failed notification deliveries", len(logs))
|
||||
|
||||
for log in logs:
|
||||
# Load the config for this log entry
|
||||
config_result = await db.execute(
|
||||
select(NotificationConfig).where(NotificationConfig.id == log.notification_config_id)
|
||||
)
|
||||
config = config_result.scalar_one_or_none()
|
||||
if not config or not config.is_active:
|
||||
log.status = "exhausted"
|
||||
log.last_error = "Config disabled or deleted"
|
||||
continue
|
||||
|
||||
try:
|
||||
await _attempt_delivery(config, log.event, log.payload)
|
||||
log.status = "sent"
|
||||
log.delivered_at = datetime.now(timezone.utc)
|
||||
log.last_error = None
|
||||
logger.info("Retry succeeded for log=%s event=%s", log.id, log.event)
|
||||
except Exception as exc:
|
||||
log.retry_count += 1
|
||||
log.last_error = str(exc)[:1000]
|
||||
|
||||
if log.retry_count >= log.max_retries:
|
||||
log.status = "exhausted"
|
||||
logger.warning(
|
||||
"Notification exhausted after %d retries: log=%s event=%s",
|
||||
log.retry_count, log.id, log.event,
|
||||
)
|
||||
else:
|
||||
delay = _RETRY_DELAYS[min(log.retry_count, len(_RETRY_DELAYS) - 1)]
|
||||
log.next_retry_at = datetime.now(timezone.utc) + timedelta(seconds=delay)
|
||||
logger.info(
|
||||
"Notification retry %d/%d scheduled in %ds: log=%s",
|
||||
log.retry_count, log.max_retries, delay, log.id,
|
||||
)
|
||||
|
||||
await db.commit()
|
||||
return len(logs)
|
||||
|
||||
|
||||
async def send_test_notification(
|
||||
config: NotificationConfig,
|
||||
) -> tuple[bool, str]:
|
||||
"""Send a test message through a channel config. Returns (success, message)."""
|
||||
event = "test"
|
||||
payload: dict[str, Any] = {}
|
||||
try:
|
||||
await _attempt_delivery(config, event, payload)
|
||||
return True, "Test notification sent successfully"
|
||||
except Exception as exc:
|
||||
return False, f"Delivery failed: {exc}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def _get_active_configs(
|
||||
account_id: uuid.UUID,
|
||||
event: str,
|
||||
db: AsyncSession,
|
||||
) -> list[NotificationConfig]:
|
||||
"""Get configs where channel is active and event is enabled."""
|
||||
result = await db.execute(
|
||||
select(NotificationConfig)
|
||||
.where(NotificationConfig.account_id == account_id)
|
||||
.where(NotificationConfig.is_active.is_(True))
|
||||
)
|
||||
configs = result.scalars().all()
|
||||
# Filter to configs where this event is enabled
|
||||
return [
|
||||
c for c in configs
|
||||
if c.events_enabled and c.events_enabled.get(event, False)
|
||||
]
|
||||
|
||||
|
||||
async def _resolve_recipients(
|
||||
account_id: uuid.UUID,
|
||||
target_user_ids: Optional[list[uuid.UUID]],
|
||||
db: AsyncSession,
|
||||
) -> list[User]:
|
||||
"""Resolve notification recipients. Defaults to team admins + account owners + admins."""
|
||||
if target_user_ids:
|
||||
result = await db.execute(
|
||||
select(User)
|
||||
.where(User.id.in_(target_user_ids))
|
||||
.where(User.account_id == account_id) # enforce tenant boundary
|
||||
.where(User.is_active.is_(True))
|
||||
)
|
||||
return list(result.scalars().all())
|
||||
|
||||
# Default: account owners, admins, and team admins
|
||||
result = await db.execute(
|
||||
select(User)
|
||||
.where(User.account_id == account_id)
|
||||
.where(User.is_active.is_(True))
|
||||
)
|
||||
users = result.scalars().all()
|
||||
return [
|
||||
u for u in users
|
||||
if u.account_role in ("owner", "admin") or u.is_team_admin
|
||||
]
|
||||
|
||||
|
||||
async def _deliver_to_channel(
|
||||
config: NotificationConfig,
|
||||
event: str,
|
||||
payload: dict[str, Any],
|
||||
db: AsyncSession,
|
||||
) -> None:
|
||||
"""Attempt delivery and create a NotificationLog entry."""
|
||||
log = NotificationLog(
|
||||
notification_config_id=config.id,
|
||||
event=event,
|
||||
payload=payload,
|
||||
)
|
||||
|
||||
try:
|
||||
await _attempt_delivery(config, event, payload)
|
||||
log.status = "sent"
|
||||
log.delivered_at = datetime.now(timezone.utc)
|
||||
except Exception as exc:
|
||||
log.status = "retrying"
|
||||
log.retry_count = 0
|
||||
log.last_error = str(exc)[:1000]
|
||||
log.next_retry_at = datetime.now(timezone.utc) + timedelta(seconds=_RETRY_DELAYS[0])
|
||||
logger.warning(
|
||||
"Notification delivery failed (will retry): config=%s event=%s error=%s",
|
||||
config.id, event, exc,
|
||||
)
|
||||
|
||||
db.add(log)
|
||||
|
||||
|
||||
async def _attempt_delivery(
|
||||
config: NotificationConfig,
|
||||
event: str,
|
||||
payload: dict[str, Any],
|
||||
) -> None:
|
||||
"""Dispatch to the appropriate channel. Raises on failure."""
|
||||
if config.channel == "email":
|
||||
await _send_email(config, event, payload)
|
||||
elif config.channel == "slack_webhook":
|
||||
if not config.webhook_url:
|
||||
raise ValueError("Slack webhook URL not configured")
|
||||
await _send_slack_message(config.webhook_url, event, payload)
|
||||
elif config.channel == "teams_webhook":
|
||||
if not config.webhook_url:
|
||||
raise ValueError("Teams webhook URL not configured")
|
||||
await _send_teams_message(config.webhook_url, event, payload)
|
||||
else:
|
||||
raise ValueError(f"Unknown channel: {config.channel}")
|
||||
|
||||
|
||||
async def _send_email(
|
||||
config: NotificationConfig,
|
||||
event: str,
|
||||
payload: dict[str, Any],
|
||||
) -> None:
|
||||
"""Send notification via email using EmailService."""
|
||||
title = _build_notification_title(event, payload)
|
||||
body = _build_notification_body(event, payload)
|
||||
link = _build_notification_link(event, payload)
|
||||
|
||||
full_link = None
|
||||
if link and settings.FRONTEND_URL:
|
||||
full_link = f"{settings.FRONTEND_URL.rstrip('/')}{link}"
|
||||
|
||||
recipients = config.email_addresses or []
|
||||
if not recipients:
|
||||
raise ValueError("No email addresses configured for email channel")
|
||||
|
||||
failures = []
|
||||
for email in recipients:
|
||||
success = await EmailService.send_notification_email(
|
||||
to_email=email,
|
||||
title=title,
|
||||
body=body,
|
||||
link_url=full_link,
|
||||
)
|
||||
if not success:
|
||||
failures.append(email)
|
||||
if failures:
|
||||
raise RuntimeError(f"Failed to send notification email to: {', '.join(failures)}")
|
||||
|
||||
|
||||
async def _send_slack_message(
|
||||
webhook_url: str,
|
||||
event: str,
|
||||
payload: dict[str, Any],
|
||||
) -> None:
|
||||
"""POST notification to Slack incoming webhook."""
|
||||
title = _build_notification_title(event, payload)
|
||||
body = _build_notification_body(event, payload)
|
||||
link = _build_notification_link(event, payload)
|
||||
|
||||
blocks: list[dict[str, Any]] = [
|
||||
{
|
||||
"type": "header",
|
||||
"text": {"type": "plain_text", "text": f"\U0001f514 {title}", "emoji": True},
|
||||
},
|
||||
{
|
||||
"type": "section",
|
||||
"text": {"type": "mrkdwn", "text": body},
|
||||
},
|
||||
]
|
||||
|
||||
if link and settings.FRONTEND_URL:
|
||||
full_url = f"{settings.FRONTEND_URL.rstrip('/')}{link}"
|
||||
blocks.append({
|
||||
"type": "actions",
|
||||
"elements": [{
|
||||
"type": "button",
|
||||
"text": {"type": "plain_text", "text": "Open in ResolutionFlow", "emoji": True},
|
||||
"url": full_url,
|
||||
}],
|
||||
})
|
||||
|
||||
slack_payload = {"blocks": blocks}
|
||||
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
resp = await client.post(webhook_url, json=slack_payload)
|
||||
if resp.status_code != 200 or resp.text.strip() != "ok":
|
||||
raise RuntimeError(
|
||||
f"Slack webhook failed (status={resp.status_code}): {resp.text[:200]}"
|
||||
)
|
||||
|
||||
|
||||
async def _send_teams_message(
|
||||
webhook_url: str,
|
||||
event: str,
|
||||
payload: dict[str, Any],
|
||||
) -> None:
|
||||
"""POST notification to Microsoft Teams incoming webhook (Adaptive Card)."""
|
||||
title = _build_notification_title(event, payload)
|
||||
body = _build_notification_body(event, payload)
|
||||
link = _build_notification_link(event, payload)
|
||||
|
||||
card_body: list[dict[str, Any]] = [
|
||||
{"type": "TextBlock", "text": title, "weight": "Bolder", "size": "Medium"},
|
||||
{"type": "TextBlock", "text": body, "wrap": True},
|
||||
]
|
||||
|
||||
actions: list[dict[str, Any]] = []
|
||||
if link and settings.FRONTEND_URL:
|
||||
full_url = f"{settings.FRONTEND_URL.rstrip('/')}{link}"
|
||||
actions.append({
|
||||
"type": "Action.OpenUrl",
|
||||
"title": "Open in ResolutionFlow",
|
||||
"url": full_url,
|
||||
})
|
||||
|
||||
teams_payload = {
|
||||
"type": "message",
|
||||
"attachments": [{
|
||||
"contentType": "application/vnd.microsoft.card.adaptive",
|
||||
"content": {
|
||||
"$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
|
||||
"type": "AdaptiveCard",
|
||||
"version": "1.4",
|
||||
"body": card_body,
|
||||
"actions": actions,
|
||||
},
|
||||
}],
|
||||
}
|
||||
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
resp = await client.post(webhook_url, json=teams_payload)
|
||||
if resp.status_code not in (200, 202):
|
||||
raise RuntimeError(
|
||||
f"Teams webhook returned {resp.status_code}: {resp.text[:200]}"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Content builders
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _build_notification_title(event: str, payload: dict[str, Any]) -> str:
|
||||
"""Human-readable title per event type."""
|
||||
titles = {
|
||||
"session.escalated": "Session escalated by {engineer_name}",
|
||||
"session.high_priority": "High-priority session started: {ticket_number}",
|
||||
"proposal.pending": "New flow proposal: {title}",
|
||||
"proposal.approved": "Flow proposal approved: {title}",
|
||||
"knowledge_gap.detected": "Knowledge gap detected: {gap_type}",
|
||||
"test": "Test Notification from ResolutionFlow",
|
||||
}
|
||||
template = titles.get(event, f"Notification: {event}")
|
||||
try:
|
||||
return template.format(**payload)
|
||||
except KeyError:
|
||||
return template
|
||||
|
||||
|
||||
def _build_notification_body(event: str, payload: dict[str, Any]) -> str:
|
||||
"""Body text per event type."""
|
||||
bodies = {
|
||||
"session.escalated": "Engineer {engineer_name} has escalated a FlowPilot session and needs assistance.",
|
||||
"session.high_priority": "A new high-priority troubleshooting session has been started for ticket {ticket_number}.",
|
||||
"proposal.pending": "A new flow proposal \"{title}\" is awaiting review in the review queue.",
|
||||
"proposal.approved": "The flow proposal \"{title}\" has been approved and is ready for use.",
|
||||
"knowledge_gap.detected": "A {gap_type} knowledge gap has been identified. Review recommended.",
|
||||
"test": "This is a test notification to verify your notification channel is working correctly.",
|
||||
}
|
||||
template = bodies.get(event, f"Event: {event}")
|
||||
try:
|
||||
return template.format(**payload)
|
||||
except KeyError:
|
||||
return template
|
||||
|
||||
|
||||
def _build_notification_link(event: str, payload: dict[str, Any]) -> Optional[str]:
|
||||
"""In-app link per event type. Returns path (no host)."""
|
||||
links: dict[str, str] = {
|
||||
"session.escalated": "/pilot/{session_id}",
|
||||
"session.high_priority": "/pilot/{session_id}",
|
||||
"proposal.pending": "/review-queue",
|
||||
"proposal.approved": "/review-queue",
|
||||
"knowledge_gap.detected": "/analytics/flowpilot",
|
||||
}
|
||||
template = links.get(event)
|
||||
if template is None:
|
||||
return None
|
||||
try:
|
||||
return template.format(**payload)
|
||||
except KeyError:
|
||||
return template
|
||||
Reference in New Issue
Block a user