453 lines
16 KiB
Python
453 lines
16 KiB
Python
"""Notification service — dispatches in-app + external notifications.
|
|
|
|
Entry point: `notify(event, account_id, payload, db)`.
|
|
Retry engine: `retry_failed_notifications(db)` called by APScheduler.
|
|
"""
|
|
import logging
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any, Optional
|
|
|
|
import httpx
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.config import settings
|
|
from app.core.email import EmailService
|
|
from app.models.notification import Notification
|
|
from app.models.notification_config import NotificationConfig
|
|
from app.models.notification_log import NotificationLog
|
|
from app.models.user import User
|
|
|
|
# Module-level logger shared by every function in this file.
logger = logging.getLogger(__name__)
|
|
|
|
# Exponential backoff schedule (seconds): 30s, 2m, 10m
# Entry 0 is used to schedule the first retry when the initial delivery
# fails (see _deliver_to_channel). After a failed retry the delay is looked
# up by the log's retry_count, clamped to the last entry (see
# retry_failed_notifications).
|
|
_RETRY_DELAYS = [30, 120, 600]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def notify(
    event: str,
    account_id: uuid.UUID,
    payload: dict[str, Any],
    db: AsyncSession,
    target_user_ids: Optional[list[uuid.UUID]] = None,
) -> None:
    """Record in-app notifications for ``event`` and fan out to external channels.

    IMPORTANT: this function never commits or rolls back; the caller owns the
    transaction. In-app ``Notification`` rows are added to the session and
    flushed only. External delivery is best-effort: a failure on one channel
    is logged and does not stop the remaining channels, and any other error
    raised while processing the event is logged rather than propagated.

    Args:
        event: Event key, e.g. ``"session.escalated"``.
        account_id: Account the notification belongs to.
        payload: Values used to render the title, body and link.
        db: Caller-owned async session.
        target_user_ids: Optional explicit recipients; when empty or omitted
            the default recipient set from ``_resolve_recipients`` is used.
    """
    try:
        users = await _resolve_recipients(account_id, target_user_ids, db)

        # Rendered once; every recipient gets the same content.
        rendered = {
            "title": _build_notification_title(event, payload),
            "body": _build_notification_body(event, payload),
            "link": _build_notification_link(event, payload),
        }

        # One in-app row per recipient.
        for recipient in users:
            db.add(
                Notification(
                    account_id=account_id,
                    user_id=recipient.id,
                    event=event,
                    **rendered,
                )
            )

        await db.flush()

        # External channels: each one is isolated so a single failure
        # cannot block the others.
        for channel_config in await _get_active_configs(account_id, event, db):
            try:
                await _deliver_to_channel(channel_config, event, payload, db)
            except Exception:
                logger.exception(
                    "External delivery failed for config=%s event=%s",
                    channel_config.id,
                    event,
                )
    except Exception:
        logger.exception(
            "Failed to process notification event=%s account=%s", event, account_id
        )
|
|
|
|
|
|
async def retry_failed_notifications(db: AsyncSession) -> int:
    """Re-attempt external deliveries whose retry time has arrived.

    Called by APScheduler. Selects every ``NotificationLog`` whose status is
    ``"retrying"`` and whose ``next_retry_at`` is at or before now, attempts
    delivery again and records the outcome on the log row:

    * success: status ``"sent"``, ``delivered_at`` set, ``last_error`` cleared;
    * failure: ``retry_count`` is incremented and ``last_error`` updated; the
      row becomes ``"exhausted"`` once ``retry_count`` reaches
      ``max_retries``, otherwise ``next_retry_at`` is pushed out using
      ``_RETRY_DELAYS``;
    * config missing or inactive: status ``"exhausted"`` with no delivery
      attempt.

    This function commits the session itself once all rows are processed.

    Args:
        db: Async session used to load logs and configs and to persist results.

    Returns:
        The number of due log rows that were processed (0 when none were due).
        Rows skipped because their config is gone or disabled are counted.
    """
    now = datetime.now(timezone.utc)
    due_result = await db.execute(
        select(NotificationLog)
        .where(NotificationLog.status == "retrying")
        .where(NotificationLog.next_retry_at <= now)
    )
    logs = due_result.scalars().all()

    if not logs:
        return 0

    logger.info("Retrying %d failed notification deliveries", len(logs))

    # Load every referenced config in ONE query instead of one SELECT per log
    # row. Many due logs typically point at the same handful of configs.
    config_ids = list(
        {
            log.notification_config_id
            for log in logs
            if log.notification_config_id is not None
        }
    )
    configs_by_id: dict[Any, NotificationConfig] = {}
    if config_ids:
        config_result = await db.execute(
            select(NotificationConfig).where(NotificationConfig.id.in_(config_ids))
        )
        configs_by_id = {cfg.id: cfg for cfg in config_result.scalars().all()}

    for log in logs:
        config = configs_by_id.get(log.notification_config_id)
        if not config or not config.is_active:
            # Nothing to deliver to any more; stop retrying this row.
            log.status = "exhausted"
            log.last_error = "Config disabled or deleted"
            continue

        try:
            await _attempt_delivery(config, log.event, log.payload)
            log.status = "sent"
            log.delivered_at = datetime.now(timezone.utc)
            log.last_error = None
            logger.info("Retry succeeded for log=%s event=%s", log.id, log.event)
        except Exception as exc:
            log.retry_count += 1
            # Truncated to 1000 characters before being stored on the row.
            log.last_error = str(exc)[:1000]

            if log.retry_count >= log.max_retries:
                log.status = "exhausted"
                logger.warning(
                    "Notification exhausted after %d retries: log=%s event=%s",
                    log.retry_count, log.id, log.event,
                )
            else:
                # Clamp so retry counts beyond the schedule reuse the last delay.
                delay = _RETRY_DELAYS[min(log.retry_count, len(_RETRY_DELAYS) - 1)]
                log.next_retry_at = datetime.now(timezone.utc) + timedelta(seconds=delay)
                logger.info(
                    "Notification retry %d/%d scheduled in %ds: log=%s",
                    log.retry_count, log.max_retries, delay, log.id,
                )

    await db.commit()
    return len(logs)
|
|
|
|
|
|
async def send_test_notification(
    config: NotificationConfig,
) -> tuple[bool, str]:
    """Push a ``"test"`` event with an empty payload through ``config``.

    Args:
        config: Channel configuration to exercise.

    Returns:
        ``(True, <success message>)`` when delivery succeeds, otherwise
        ``(False, "Delivery failed: <error>")``. Delivery errors are
        reported in the tuple and never raised.
    """
    empty_payload: dict[str, Any] = {}
    try:
        await _attempt_delivery(config, "test", empty_payload)
    except Exception as exc:
        return False, f"Delivery failed: {exc}"
    return True, "Test notification sent successfully"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _get_active_configs(
    account_id: uuid.UUID,
    event: str,
    db: AsyncSession,
) -> list[NotificationConfig]:
    """Return the account's active channel configs that have ``event`` enabled.

    Active configs are selected in SQL; the per-event switch is read from
    each config's ``events_enabled`` mapping in Python. A config with no
    mapping, or without a truthy entry for ``event``, is excluded.
    """
    stmt = select(NotificationConfig).where(
        NotificationConfig.account_id == account_id,
        NotificationConfig.is_active.is_(True),
    )
    active_configs = (await db.execute(stmt)).scalars().all()

    matching: list[NotificationConfig] = []
    for cfg in active_configs:
        # events_enabled may be empty/None; treat that as "nothing enabled".
        if cfg.events_enabled and cfg.events_enabled.get(event, False):
            matching.append(cfg)
    return matching
|
|
|
|
|
|
async def _resolve_recipients(
    account_id: uuid.UUID,
    target_user_ids: Optional[list[uuid.UUID]],
    db: AsyncSession,
) -> list[User]:
    """Work out which users should receive an in-app notification.

    With a non-empty ``target_user_ids`` the result is those users, limited
    to active members of ``account_id``. Otherwise (``None`` or an empty
    list) the result is every active user of the account whose
    ``account_role`` is ``"owner"`` or ``"admin"``, or who is a team admin.
    """
    if target_user_ids:
        targeted = select(User).where(
            User.id.in_(target_user_ids),
            User.account_id == account_id,  # enforce tenant boundary
            User.is_active.is_(True),
        )
        rows = await db.execute(targeted)
        return list(rows.scalars().all())

    # No explicit targets: fall back to owners, admins and team admins.
    everyone_active = select(User).where(
        User.account_id == account_id,
        User.is_active.is_(True),
    )
    rows = await db.execute(everyone_active)

    privileged: list[User] = []
    for member in rows.scalars().all():
        if member.account_role in ("owner", "admin") or member.is_team_admin:
            privileged.append(member)
    return privileged
|
|
|
|
|
|
async def _deliver_to_channel(
    config: NotificationConfig,
    event: str,
    payload: dict[str, Any],
    db: AsyncSession,
) -> None:
    """Make the first delivery attempt on a channel and record it.

    A ``NotificationLog`` row is always added to the session (not flushed or
    committed here). On success it is marked ``"sent"``; on failure it is
    marked ``"retrying"`` with the first backoff delay, so the retry job can
    pick it up. Delivery errors are logged and not re-raised.
    """
    entry = NotificationLog(
        notification_config_id=config.id,
        event=event,
        payload=payload,
    )

    try:
        await _attempt_delivery(config, event, payload)
        entry.status = "sent"
        entry.delivered_at = datetime.now(timezone.utc)
    except Exception as exc:
        first_delay = timedelta(seconds=_RETRY_DELAYS[0])
        entry.status = "retrying"
        entry.retry_count = 0
        # Truncated to 1000 characters before being stored on the row.
        entry.last_error = str(exc)[:1000]
        entry.next_retry_at = datetime.now(timezone.utc) + first_delay
        logger.warning(
            "Notification delivery failed (will retry): config=%s event=%s error=%s",
            config.id, event, exc,
        )

    db.add(entry)
|
|
|
|
|
|
async def _attempt_delivery(
    config: NotificationConfig,
    event: str,
    payload: dict[str, Any],
) -> None:
    """Send one notification through the channel named by ``config.channel``.

    Supported channels are ``"email"``, ``"slack_webhook"`` and
    ``"teams_webhook"``.

    Raises:
        ValueError: The channel is unknown, or a webhook channel has no
            ``webhook_url``.
        Exception: Whatever the underlying sender raises on failure.
    """
    channel = config.channel

    if channel == "email":
        await _send_email(config, event, payload)
        return

    if channel not in ("slack_webhook", "teams_webhook"):
        raise ValueError(f"Unknown channel: {channel}")

    # Both remaining channels are webhook based and need a URL.
    url = config.webhook_url
    if channel == "slack_webhook":
        if not url:
            raise ValueError("Slack webhook URL not configured")
        await _send_slack_message(url, event, payload)
        return

    if not url:
        raise ValueError("Teams webhook URL not configured")
    await _send_teams_message(url, event, payload)
|
|
|
|
|
|
async def _send_email(
    config: NotificationConfig,
    event: str,
    payload: dict[str, Any],
) -> None:
    """Email the notification to every address configured on ``config``.

    Addresses are tried one after another; an address counts as failed when
    ``EmailService.send_notification_email`` returns a falsy value.

    Raises:
        ValueError: ``config.email_addresses`` is empty or unset.
        RuntimeError: At least one address failed; the message lists them.
    """
    subject = _build_notification_title(event, payload)
    text = _build_notification_body(event, payload)
    path = _build_notification_link(event, payload)

    # The link builder returns a path only; prefix the frontend host if known.
    absolute_link = (
        f"{settings.FRONTEND_URL.rstrip('/')}{path}"
        if path and settings.FRONTEND_URL
        else None
    )

    addresses = config.email_addresses or []
    if not addresses:
        raise ValueError("No email addresses configured for email channel")

    # Sequential on purpose: every address is attempted before reporting.
    undelivered = [
        address
        for address in addresses
        if not await EmailService.send_notification_email(
            to_email=address,
            title=subject,
            body=text,
            link_url=absolute_link,
        )
    ]
    if undelivered:
        raise RuntimeError(
            f"Failed to send notification email to: {', '.join(undelivered)}"
        )
|
|
|
|
|
|
async def _send_slack_message(
    webhook_url: str,
    event: str,
    payload: dict[str, Any],
) -> None:
    """POST the notification to a Slack incoming webhook as Block Kit blocks.

    The message has a header block (title), a section block (body) and, when
    both a link and ``settings.FRONTEND_URL`` are available, an actions block
    with a button to open the app.

    Raises:
        RuntimeError: The response is not HTTP 200 with a body of ``ok``.
    """
    heading = _build_notification_title(event, payload)
    text = _build_notification_body(event, payload)
    path = _build_notification_link(event, payload)

    header_block: dict[str, Any] = {
        "type": "header",
        "text": {"type": "plain_text", "text": f"\U0001f514 {heading}", "emoji": True},
    }
    section_block: dict[str, Any] = {
        "type": "section",
        "text": {"type": "mrkdwn", "text": text},
    }
    blocks: list[dict[str, Any]] = [header_block, section_block]

    if path and settings.FRONTEND_URL:
        open_button = {
            "type": "button",
            "text": {"type": "plain_text", "text": "Open in ResolutionFlow", "emoji": True},
            "url": f"{settings.FRONTEND_URL.rstrip('/')}{path}",
        }
        blocks.append({"type": "actions", "elements": [open_button]})

    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.post(webhook_url, json={"blocks": blocks})
        # Success requires BOTH a 200 status and the literal body "ok".
        delivered = resp.status_code == 200 and resp.text.strip() == "ok"
        if not delivered:
            raise RuntimeError(
                f"Slack webhook failed (status={resp.status_code}): {resp.text[:200]}"
            )
|
|
|
|
|
|
async def _send_teams_message(
    webhook_url: str,
    event: str,
    payload: dict[str, Any],
) -> None:
    """POST the notification to a Microsoft Teams incoming webhook.

    The message is a single Adaptive Card (version 1.4) with the title and
    body as text blocks and, when both a link and ``settings.FRONTEND_URL``
    are available, an ``Action.OpenUrl`` action.

    Raises:
        RuntimeError: The response status is neither 200 nor 202.
    """
    heading = _build_notification_title(event, payload)
    text = _build_notification_body(event, payload)
    path = _build_notification_link(event, payload)

    open_actions: list[dict[str, Any]] = []
    if path and settings.FRONTEND_URL:
        open_actions.append(
            {
                "type": "Action.OpenUrl",
                "title": "Open in ResolutionFlow",
                "url": f"{settings.FRONTEND_URL.rstrip('/')}{path}",
            }
        )

    adaptive_card: dict[str, Any] = {
        "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
        "type": "AdaptiveCard",
        "version": "1.4",
        "body": [
            {"type": "TextBlock", "text": heading, "weight": "Bolder", "size": "Medium"},
            {"type": "TextBlock", "text": text, "wrap": True},
        ],
        "actions": open_actions,
    }
    teams_payload = {
        "type": "message",
        "attachments": [
            {
                "contentType": "application/vnd.microsoft.card.adaptive",
                "content": adaptive_card,
            }
        ],
    }

    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.post(webhook_url, json=teams_payload)
        accepted = resp.status_code in (200, 202)
        if not accepted:
            raise RuntimeError(
                f"Teams webhook returned {resp.status_code}: {resp.text[:200]}"
            )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Content builders
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _build_notification_title(event: str, payload: dict[str, Any]) -> str:
    """Return the human-readable title for ``event``.

    Known events use a template filled in from ``payload``. If the payload
    lacks a field the template needs, the unformatted template is returned
    (placeholders included) instead of raising. Unknown events get the
    generic title ``"Notification: <event>"``.

    Args:
        event: Event key, e.g. ``"session.escalated"``.
        payload: Values available to the template. For
            ``"session.escalated"`` the keys ``problem_summary``,
            ``psa_ticket_id`` and ``ticket_number`` are also read to derive
            ``problem_snippet`` and ``ticket_suffix``.

    Returns:
        The rendered title.
    """
    titles = {
        # Distinguishability matters in the bell panel: with a generic title
        # ("Session escalated by Jane") two different escalations from the
        # same junior look like a duplicate notification. Including a short
        # problem snippet (and ticket number if present) lets the senior
        # tell them apart at a glance.
        "session.escalated": "Escalation from {engineer_name}{ticket_suffix}: {problem_snippet}",
        "session.high_priority": "High-priority session started: {ticket_number}",
        "proposal.pending": "New flow proposal: {title}",
        "proposal.approved": "Flow proposal approved: {title}",
        "knowledge_gap.detected": "Knowledge gap detected: {gap_type}",
        "l1.session.escalated": "L1 session escalated: {problem_summary}",
        "test": "Test Notification from ResolutionFlow",
    }

    template = titles.get(event)
    if template is None:
        # The event name is inserted verbatim and must NOT go through
        # str.format: braces in an unexpected event name would otherwise be
        # read as replacement fields (substituting payload values) or raise
        # ValueError/IndexError on malformed fields.
        return f"Notification: {event}"

    # Build the escalation-specific derived fields. Done here rather than at
    # the call site so every dispatch path (legacy /escalate shim, /handoff,
    # any future entry point) gets consistent formatting without each one
    # having to repeat the snippet logic.
    if event == "session.escalated":
        # str() keeps a non-string summary (e.g. a number) from crashing on
        # .strip(); for strings it is a no-op.
        problem = str(payload.get("problem_summary") or "").strip()
        if not problem or problem.upper() == "N/A":
            problem_snippet = "(no summary provided)"
        elif len(problem) > 70:
            # Keep 67 characters and add an ellipsis to mark the cut.
            problem_snippet = problem[:67].rstrip() + "…"
        else:
            problem_snippet = problem
        ticket = payload.get("psa_ticket_id") or payload.get("ticket_number")
        ticket_suffix = f" · #{ticket}" if ticket else ""
        # Copy so the caller's payload dict is never mutated.
        payload = {**payload, "problem_snippet": problem_snippet, "ticket_suffix": ticket_suffix}

    try:
        return template.format(**payload)
    except KeyError:
        # Payload is missing a field the template needs: degrade to the raw
        # template rather than failing the whole notification.
        return template
|
|
|
|
|
|
def _build_notification_body(event: str, payload: dict[str, Any]) -> str:
    """Return the body text for ``event``.

    Known events use a template filled in from ``payload``. If the payload
    lacks a field the template needs, the unformatted template is returned
    (placeholders included) instead of raising. Unknown events get the
    generic body ``"Event: <event>"``.

    Args:
        event: Event key, e.g. ``"proposal.pending"``.
        payload: Values available to the template.

    Returns:
        The rendered body text.
    """
    bodies = {
        "session.escalated": "Engineer {engineer_name} has escalated a FlowPilot session and needs assistance.",
        "session.high_priority": "A new high-priority troubleshooting session has been started for ticket {ticket_number}.",
        "proposal.pending": "A new flow proposal \"{title}\" is awaiting review in the review queue.",
        "proposal.approved": "The flow proposal \"{title}\" has been approved and is ready for use.",
        "knowledge_gap.detected": "A {gap_type} knowledge gap has been identified. Review recommended.",
        "l1.session.escalated": "L1 escalated a ticket: {problem_summary}",
        "test": "This is a test notification to verify your notification channel is working correctly.",
    }

    template = bodies.get(event)
    if template is None:
        # The event name is inserted verbatim and must NOT go through
        # str.format: braces in an unexpected event name would otherwise be
        # read as replacement fields (substituting payload values) or raise
        # ValueError/IndexError on malformed fields.
        return f"Event: {event}"

    try:
        return template.format(**payload)
    except KeyError:
        # Payload is missing a field the template needs: degrade to the raw
        # template rather than failing the whole notification.
        return template
|
|
|
|
|
|
def _build_notification_link(event: str, payload: dict[str, Any]) -> Optional[str]:
    """Return the in-app path (no host) to open for ``event``.

    Returns ``None`` for events that have no link. When the payload lacks a
    field the path template needs, the unformatted template is returned.
    """
    links: dict[str, str] = {
        # The ?pickup=true flag sends the senior tech into the
        # handoff/pickup flow on the session page (magic-moment screen for
        # handoff-based escalations, legacy SessionBriefing for
        # `requesting_escalation` sessions). Without the flag they hit a
        # session-detail GET they cannot access before pickup, which looks
        # like a dead notification.
        "session.escalated": "/pilot/{session_id}?pickup=true",
        "session.high_priority": "/pilot/{session_id}",
        "proposal.pending": "/review-queue",
        "proposal.approved": "/review-queue",
        "knowledge_gap.detected": "/analytics/flowpilot",
        # L1 AI-build escalations point at the escalations dashboard, not a
        # specific pilot session, which may not have a pickup flow.
        "l1.session.escalated": "/escalations",
    }

    if event not in links:
        return None

    path_template = links[event]
    try:
        resolved = path_template.format(**payload)
    except KeyError:
        # Missing payload field: hand back the template unformatted.
        resolved = path_template
    return resolved
|