Files
resolutionflow/backend/app/services/notification_service.py
chihlasm 0f750e63e0 feat(notifications): add Phase 4 Slice 2 — multi-channel notification system
Full notification infrastructure with in-app, email, Slack, and Teams channels:

Backend:
- NotificationConfig, NotificationLog, Notification models + migration
- Notification service with event routing, channel delivery, retry logic
- 9 API endpoints (config CRUD + in-app notifications)
- APScheduler retry job with exponential backoff (30s, 2m, 10m)
- Wired into escalation, proposal approval, and knowledge flywheel
- Pydantic event key validation, cross-tenant protection on recipients

Frontend:
- TypeScript types + API client for all notification endpoints
- NotificationsPanel: bell icon with unread badge, dropdown, mark-read
- NotificationSettings: channel config, event toggles, test, delete confirm
- Notifications tab on IntegrationsPage
- ARIA attributes, Escape handler, settings link on panel

Review fixes (13 issues resolved):
- notify() no longer commits/rolls back caller's transaction (critical)
- retry_failed_notifications returns count instead of None (critical)
- NotificationSettings moved inside dedicated tab (critical)
- target_user_ids scoped by account_id (security)
- Email loop collects all failures before raising
- Slack webhook validates response body
- events_enabled rejects unknown event keys
- link column widened to String(500)
- Dead code removed from _auto_reinforce
- Delete confirmation, ARIA, Escape key support

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 12:37:54 +00:00

421 lines
14 KiB
Python

"""Notification service — dispatches in-app + external notifications.
Entry point: `notify(event, account_id, payload, db)`.
Retry engine: `retry_failed_notifications(db)` called by APScheduler.
"""
import logging
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
import httpx
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.email import EmailService
from app.models.notification import Notification
from app.models.notification_config import NotificationConfig
from app.models.notification_log import NotificationLog
from app.models.user import User
logger = logging.getLogger(__name__)
# Exponential backoff schedule (seconds): 30s, 2m, 10m
_RETRY_DELAYS = [30, 120, 600]
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
async def notify(
event: str,
account_id: uuid.UUID,
payload: dict[str, Any],
db: AsyncSession,
target_user_ids: Optional[list[uuid.UUID]] = None,
) -> None:
"""Main entry point — create in-app notifications + route to external channels.
IMPORTANT: This function does NOT commit or rollback. The caller owns the transaction.
In-app notifications are added to the session (flushed, not committed).
External channel delivery is fire-and-forget — failures are logged, not raised.
"""
try:
recipients = await _resolve_recipients(account_id, target_user_ids, db)
title = _build_notification_title(event, payload)
body = _build_notification_body(event, payload)
link = _build_notification_link(event, payload)
# Create in-app notification for each recipient
for user in recipients:
notification = Notification(
account_id=account_id,
user_id=user.id,
event=event,
title=title,
body=body,
link=link,
)
db.add(notification)
await db.flush()
# Route to active external channels (fire-and-forget per channel)
configs = await _get_active_configs(account_id, event, db)
for config in configs:
try:
await _deliver_to_channel(config, event, payload, db)
except Exception:
logger.exception(
"External delivery failed for config=%s event=%s", config.id, event
)
except Exception:
logger.exception("Failed to process notification event=%s account=%s", event, account_id)
async def retry_failed_notifications(db: AsyncSession) -> int:
"""Retry failed notification deliveries. Called by APScheduler."""
now = datetime.now(timezone.utc)
result = await db.execute(
select(NotificationLog)
.where(NotificationLog.status == "retrying")
.where(NotificationLog.next_retry_at <= now)
)
logs = result.scalars().all()
if not logs:
return 0
logger.info("Retrying %d failed notification deliveries", len(logs))
for log in logs:
# Load the config for this log entry
config_result = await db.execute(
select(NotificationConfig).where(NotificationConfig.id == log.notification_config_id)
)
config = config_result.scalar_one_or_none()
if not config or not config.is_active:
log.status = "exhausted"
log.last_error = "Config disabled or deleted"
continue
try:
await _attempt_delivery(config, log.event, log.payload)
log.status = "sent"
log.delivered_at = datetime.now(timezone.utc)
log.last_error = None
logger.info("Retry succeeded for log=%s event=%s", log.id, log.event)
except Exception as exc:
log.retry_count += 1
log.last_error = str(exc)[:1000]
if log.retry_count >= log.max_retries:
log.status = "exhausted"
logger.warning(
"Notification exhausted after %d retries: log=%s event=%s",
log.retry_count, log.id, log.event,
)
else:
delay = _RETRY_DELAYS[min(log.retry_count, len(_RETRY_DELAYS) - 1)]
log.next_retry_at = datetime.now(timezone.utc) + timedelta(seconds=delay)
logger.info(
"Notification retry %d/%d scheduled in %ds: log=%s",
log.retry_count, log.max_retries, delay, log.id,
)
await db.commit()
return len(logs)
async def send_test_notification(
config: NotificationConfig,
) -> tuple[bool, str]:
"""Send a test message through a channel config. Returns (success, message)."""
event = "test"
payload: dict[str, Any] = {}
try:
await _attempt_delivery(config, event, payload)
return True, "Test notification sent successfully"
except Exception as exc:
return False, f"Delivery failed: {exc}"
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
async def _get_active_configs(
account_id: uuid.UUID,
event: str,
db: AsyncSession,
) -> list[NotificationConfig]:
"""Get configs where channel is active and event is enabled."""
result = await db.execute(
select(NotificationConfig)
.where(NotificationConfig.account_id == account_id)
.where(NotificationConfig.is_active.is_(True))
)
configs = result.scalars().all()
# Filter to configs where this event is enabled
return [
c for c in configs
if c.events_enabled and c.events_enabled.get(event, False)
]
async def _resolve_recipients(
account_id: uuid.UUID,
target_user_ids: Optional[list[uuid.UUID]],
db: AsyncSession,
) -> list[User]:
"""Resolve notification recipients. Defaults to team admins + account owners + admins."""
if target_user_ids:
result = await db.execute(
select(User)
.where(User.id.in_(target_user_ids))
.where(User.account_id == account_id) # enforce tenant boundary
.where(User.is_active.is_(True))
)
return list(result.scalars().all())
# Default: account owners, admins, and team admins
result = await db.execute(
select(User)
.where(User.account_id == account_id)
.where(User.is_active.is_(True))
)
users = result.scalars().all()
return [
u for u in users
if u.account_role in ("owner", "admin") or u.is_team_admin
]
async def _deliver_to_channel(
config: NotificationConfig,
event: str,
payload: dict[str, Any],
db: AsyncSession,
) -> None:
"""Attempt delivery and create a NotificationLog entry."""
log = NotificationLog(
notification_config_id=config.id,
event=event,
payload=payload,
)
try:
await _attempt_delivery(config, event, payload)
log.status = "sent"
log.delivered_at = datetime.now(timezone.utc)
except Exception as exc:
log.status = "retrying"
log.retry_count = 0
log.last_error = str(exc)[:1000]
log.next_retry_at = datetime.now(timezone.utc) + timedelta(seconds=_RETRY_DELAYS[0])
logger.warning(
"Notification delivery failed (will retry): config=%s event=%s error=%s",
config.id, event, exc,
)
db.add(log)
async def _attempt_delivery(
config: NotificationConfig,
event: str,
payload: dict[str, Any],
) -> None:
"""Dispatch to the appropriate channel. Raises on failure."""
if config.channel == "email":
await _send_email(config, event, payload)
elif config.channel == "slack_webhook":
if not config.webhook_url:
raise ValueError("Slack webhook URL not configured")
await _send_slack_message(config.webhook_url, event, payload)
elif config.channel == "teams_webhook":
if not config.webhook_url:
raise ValueError("Teams webhook URL not configured")
await _send_teams_message(config.webhook_url, event, payload)
else:
raise ValueError(f"Unknown channel: {config.channel}")
async def _send_email(
config: NotificationConfig,
event: str,
payload: dict[str, Any],
) -> None:
"""Send notification via email using EmailService."""
title = _build_notification_title(event, payload)
body = _build_notification_body(event, payload)
link = _build_notification_link(event, payload)
full_link = None
if link and settings.FRONTEND_URL:
full_link = f"{settings.FRONTEND_URL.rstrip('/')}{link}"
recipients = config.email_addresses or []
if not recipients:
raise ValueError("No email addresses configured for email channel")
failures = []
for email in recipients:
success = await EmailService.send_notification_email(
to_email=email,
title=title,
body=body,
link_url=full_link,
)
if not success:
failures.append(email)
if failures:
raise RuntimeError(f"Failed to send notification email to: {', '.join(failures)}")
async def _send_slack_message(
webhook_url: str,
event: str,
payload: dict[str, Any],
) -> None:
"""POST notification to Slack incoming webhook."""
title = _build_notification_title(event, payload)
body = _build_notification_body(event, payload)
link = _build_notification_link(event, payload)
blocks: list[dict[str, Any]] = [
{
"type": "header",
"text": {"type": "plain_text", "text": f"\U0001f514 {title}", "emoji": True},
},
{
"type": "section",
"text": {"type": "mrkdwn", "text": body},
},
]
if link and settings.FRONTEND_URL:
full_url = f"{settings.FRONTEND_URL.rstrip('/')}{link}"
blocks.append({
"type": "actions",
"elements": [{
"type": "button",
"text": {"type": "plain_text", "text": "Open in ResolutionFlow", "emoji": True},
"url": full_url,
}],
})
slack_payload = {"blocks": blocks}
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(webhook_url, json=slack_payload)
if resp.status_code != 200 or resp.text.strip() != "ok":
raise RuntimeError(
f"Slack webhook failed (status={resp.status_code}): {resp.text[:200]}"
)
async def _send_teams_message(
webhook_url: str,
event: str,
payload: dict[str, Any],
) -> None:
"""POST notification to Microsoft Teams incoming webhook (Adaptive Card)."""
title = _build_notification_title(event, payload)
body = _build_notification_body(event, payload)
link = _build_notification_link(event, payload)
card_body: list[dict[str, Any]] = [
{"type": "TextBlock", "text": title, "weight": "Bolder", "size": "Medium"},
{"type": "TextBlock", "text": body, "wrap": True},
]
actions: list[dict[str, Any]] = []
if link and settings.FRONTEND_URL:
full_url = f"{settings.FRONTEND_URL.rstrip('/')}{link}"
actions.append({
"type": "Action.OpenUrl",
"title": "Open in ResolutionFlow",
"url": full_url,
})
teams_payload = {
"type": "message",
"attachments": [{
"contentType": "application/vnd.microsoft.card.adaptive",
"content": {
"$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
"type": "AdaptiveCard",
"version": "1.4",
"body": card_body,
"actions": actions,
},
}],
}
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(webhook_url, json=teams_payload)
if resp.status_code not in (200, 202):
raise RuntimeError(
f"Teams webhook returned {resp.status_code}: {resp.text[:200]}"
)
# ---------------------------------------------------------------------------
# Content builders
# ---------------------------------------------------------------------------
def _build_notification_title(event: str, payload: dict[str, Any]) -> str:
"""Human-readable title per event type."""
titles = {
"session.escalated": "Session escalated by {engineer_name}",
"session.high_priority": "High-priority session started: {ticket_number}",
"proposal.pending": "New flow proposal: {title}",
"proposal.approved": "Flow proposal approved: {title}",
"knowledge_gap.detected": "Knowledge gap detected: {gap_type}",
"test": "Test Notification from ResolutionFlow",
}
template = titles.get(event, f"Notification: {event}")
try:
return template.format(**payload)
except KeyError:
return template
def _build_notification_body(event: str, payload: dict[str, Any]) -> str:
"""Body text per event type."""
bodies = {
"session.escalated": "Engineer {engineer_name} has escalated a FlowPilot session and needs assistance.",
"session.high_priority": "A new high-priority troubleshooting session has been started for ticket {ticket_number}.",
"proposal.pending": "A new flow proposal \"{title}\" is awaiting review in the review queue.",
"proposal.approved": "The flow proposal \"{title}\" has been approved and is ready for use.",
"knowledge_gap.detected": "A {gap_type} knowledge gap has been identified. Review recommended.",
"test": "This is a test notification to verify your notification channel is working correctly.",
}
template = bodies.get(event, f"Event: {event}")
try:
return template.format(**payload)
except KeyError:
return template
def _build_notification_link(event: str, payload: dict[str, Any]) -> Optional[str]:
"""In-app link per event type. Returns path (no host)."""
links: dict[str, str] = {
"session.escalated": "/pilot/{session_id}",
"session.high_priority": "/pilot/{session_id}",
"proposal.pending": "/review-queue",
"proposal.approved": "/review-queue",
"knowledge_gap.detected": "/analytics/flowpilot",
}
template = links.get(event)
if template is None:
return None
try:
return template.format(**payload)
except KeyError:
return template