Three improvements driven by live wedge testing.
1) Notification title now includes a problem snippet and PSA ticket
suffix when present:
"Escalation from Jane · #12345: Outlook is failing to sync email…"
Replaces the prior "Session escalated by Jane" copy that made every
escalation from the same junior look identical in the bell panel.
Snippet is capped at 70 chars, ellipsis included. handoff_manager now
passes psa_ticket_id through in the notify() payload so this works
for both /escalate and /handoff entry points.
2) AI enrichment (assessment + enhanced escalation_package) moved to
a FastAPI BackgroundTask. The escalating engineer no longer waits
on 15-25s of Sonnet latency — handoff creation returns as soon as
snapshot, status flip, dual-write, documentation, PSA push, and
notify() are committed. enrich_escalation_async opens its own DB
session, runs both AI calls, updates handoff.ai_assessment +
session.escalation_package, commits, and publishes a new
`handoff_assessment_ready` event on the escalation bus. Frontend
doesn't yet listen for that event — the magic-moment screen still
shows a placeholder ("AI assessment is still generating. Reopen
this view in a few seconds…") which is honest about the state.
Live polling / auto-refresh on the bus event is the natural next
step.
3) ChatSidebar entries now surface the problem summary as a secondary
line and tag PSA-linked sessions with a monospace #ticket badge plus
an "Escalated" pill on in-transit sessions. ChatListItem grew
problem_summary, psa_ticket_id, and status fields; loadChats
populates them from listSessions. The user couldn't tell their own
sessions apart in the sidebar because they all rendered as "New
Chat" with no distinguishing detail — this fixes that for any
session, escalated or not.
Test plan
- Backend full suite: 1103 passed in 255.85s with -n auto.
- Frontend tsc -b clean.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
448 lines
16 KiB
Python
448 lines
16 KiB
Python
"""Notification service — dispatches in-app + external notifications.
|
|
|
|
Entry point: `notify(event, account_id, payload, db)`.
|
|
Retry engine: `retry_failed_notifications(db)` called by APScheduler.
|
|
"""
|
|
import logging
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any, Optional
|
|
|
|
import httpx
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.config import settings
|
|
from app.core.email import EmailService
|
|
from app.models.notification import Notification
|
|
from app.models.notification_config import NotificationConfig
|
|
from app.models.notification_log import NotificationLog
|
|
from app.models.user import User
|
|
|
|
logger = logging.getLogger(__name__)

# Backoff schedule for failed external deliveries, in seconds: 30s, 2m, 10m.
# Entry i is the wait before retry attempt i + 1; once retry_count runs past
# the end, the last entry is reused (callers clamp the index).
_RETRY_DELAYS = [30, 2 * 60, 10 * 60]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def notify(
    event: str,
    account_id: uuid.UUID,
    payload: dict[str, Any],
    db: AsyncSession,
    target_user_ids: Optional[list[uuid.UUID]] = None,
) -> None:
    """Fan an event out as in-app notifications plus any enabled external channels.

    Transaction contract: the caller owns ``db``. This function adds one
    ``Notification`` per recipient and flushes, and each external delivery
    adds a ``NotificationLog`` row, but nothing is committed or rolled back
    here.

    External deliveries are awaited one channel config at a time, so the
    caller waits for them. A failure in one config is logged and the next
    config is still tried. Any other error is logged and swallowed, so this
    function never raises.

    Args:
        event: Event key such as ``"session.escalated"``; selects the
            title/body/link templates and which channel configs fire.
        account_id: Tenant whose users and channel configs are used.
        payload: Values substituted into the templates; also stored on each
            external delivery log.
        db: Caller-owned async session.
        target_user_ids: Explicit recipient IDs. When falsy, the account's
            default audience (owners, admins, team admins) is notified.
    """
    try:
        recipients = await _resolve_recipients(account_id, target_user_ids, db)

        # Rendered once and shared by every recipient's row.
        title = _build_notification_title(event, payload)
        body = _build_notification_body(event, payload)
        link = _build_notification_link(event, payload)

        in_app_rows = [
            Notification(
                account_id=account_id,
                user_id=recipient.id,
                event=event,
                title=title,
                body=body,
                link=link,
            )
            for recipient in recipients
        ]
        for row in in_app_rows:
            db.add(row)

        # Flush (not commit) so the rows are written inside the caller's
        # transaction.
        # NOTE(review): if this flush fails, the error is swallowed below but
        # the session is presumably left needing a rollback, so the caller's
        # own commit would then fail. Confirm that is acceptable.
        await db.flush()

        for channel_config in await _get_active_configs(account_id, event, db):
            try:
                await _deliver_to_channel(channel_config, event, payload, db)
            except Exception:
                # _deliver_to_channel already records ordinary delivery
                # failures on its log row; this only catches unexpected errors.
                logger.exception(
                    "External delivery failed for config=%s event=%s", channel_config.id, event
                )
    except Exception:
        logger.exception("Failed to process notification event=%s account=%s", event, account_id)
|
|
|
|
|
|
async def retry_failed_notifications(db: AsyncSession) -> int:
    """Re-attempt external deliveries whose scheduled retry time has arrived.

    Called by APScheduler. Selects every ``NotificationLog`` whose status is
    ``"retrying"`` and whose ``next_retry_at`` is at or before now. Each log
    is re-sent through its channel config and updated as follows:

    * Config missing or inactive: marked ``"exhausted"``; no delivery is
      attempted.
    * Delivery succeeds: marked ``"sent"``, with ``delivered_at`` stamped and
      ``last_error`` cleared.
    * Delivery fails: ``retry_count`` is incremented. The log becomes
      ``"exhausted"`` once the count reaches ``max_retries``; otherwise it is
      rescheduled using ``_RETRY_DELAYS``.

    All changes are committed once, at the end.

    Args:
        db: Async session; this function commits it.

    Returns:
        Number of due log entries processed, including those exhausted
        because their config is gone or disabled.
    """
    now = datetime.now(timezone.utc)
    result = await db.execute(
        select(NotificationLog)
        .where(NotificationLog.status == "retrying")
        .where(NotificationLog.next_retry_at <= now)
    )
    logs = result.scalars().all()

    if not logs:
        return 0

    logger.info("Retrying %d failed notification deliveries", len(logs))

    # Fetch every referenced config in one query instead of one SELECT per log.
    # Logs without a config id simply miss the lookup below and are exhausted.
    config_ids = list(
        {log.notification_config_id for log in logs if log.notification_config_id is not None}
    )
    configs_by_id: dict[Any, NotificationConfig] = {}
    if config_ids:
        config_result = await db.execute(
            select(NotificationConfig).where(NotificationConfig.id.in_(config_ids))
        )
        configs_by_id = {cfg.id: cfg for cfg in config_result.scalars().all()}

    for log in logs:
        config = configs_by_id.get(log.notification_config_id)
        if config is None or not config.is_active:
            log.status = "exhausted"
            log.last_error = "Config disabled or deleted"
            continue

        try:
            await _attempt_delivery(config, log.event, log.payload)
        except Exception as exc:
            log.retry_count += 1
            log.last_error = str(exc)[:1000]

            if log.retry_count >= log.max_retries:
                log.status = "exhausted"
                logger.warning(
                    "Notification exhausted after %d retries: log=%s event=%s",
                    log.retry_count, log.id, log.event,
                )
            else:
                # retry_count now equals the number of failed retries, which
                # indexes the wait before the next attempt. Clamp to the
                # last delay if max_retries outlasts the schedule.
                delay = _RETRY_DELAYS[min(log.retry_count, len(_RETRY_DELAYS) - 1)]
                log.next_retry_at = datetime.now(timezone.utc) + timedelta(seconds=delay)
                logger.info(
                    "Notification retry %d/%d scheduled in %ds: log=%s",
                    log.retry_count, log.max_retries, delay, log.id,
                )
        else:
            log.status = "sent"
            log.delivered_at = datetime.now(timezone.utc)
            log.last_error = None
            logger.info("Retry succeeded for log=%s event=%s", log.id, log.event)

    await db.commit()
    return len(logs)
|
|
|
|
|
|
async def send_test_notification(
    config: NotificationConfig,
) -> tuple[bool, str]:
    """Push the canned ``"test"`` event through ``config``'s channel.

    Nothing is logged or persisted. The outcome is reported to the caller
    rather than raised.

    Returns:
        ``(True, "Test notification sent successfully")`` on success, or
        ``(False, "Delivery failed: <error>")`` when delivery raised.
    """
    try:
        await _attempt_delivery(config, "test", {})
    except Exception as exc:
        return False, f"Delivery failed: {exc}"
    return True, "Test notification sent successfully"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _get_active_configs(
    account_id: uuid.UUID,
    event: str,
    db: AsyncSession,
) -> list[NotificationConfig]:
    """Return the account's active channel configs that have ``event`` switched on.

    ``is_active`` is filtered in SQL. The per-event opt-in lives in the
    ``events_enabled`` mapping and is checked in Python. A config whose
    ``events_enabled`` is empty or None, or whose entry for the event is
    missing or falsy, is left out.
    """
    stmt = (
        select(NotificationConfig)
        .where(NotificationConfig.account_id == account_id)
        .where(NotificationConfig.is_active.is_(True))
    )
    active_configs = (await db.execute(stmt)).scalars().all()

    selected: list[NotificationConfig] = []
    for cfg in active_configs:
        toggles = cfg.events_enabled
        if toggles and toggles.get(event, False):
            selected.append(cfg)
    return selected
|
|
|
|
|
|
async def _resolve_recipients(
    account_id: uuid.UUID,
    target_user_ids: Optional[list[uuid.UUID]],
    db: AsyncSession,
) -> list[User]:
    """Resolve who receives in-app notifications.

    When ``target_user_ids`` is given (and non-empty), those users are
    returned. Otherwise the default audience is returned: account owners,
    account admins, and team admins. In both cases only active users of
    ``account_id`` qualify, so IDs belonging to another tenant are silently
    dropped.
    """
    tenant_active = (
        select(User)
        .where(User.account_id == account_id)  # enforce tenant boundary
        .where(User.is_active.is_(True))
    )

    if target_user_ids:
        explicit = await db.execute(tenant_active.where(User.id.in_(target_user_ids)))
        return list(explicit.scalars().all())

    # NOTE(review): an empty list is falsy, so target_user_ids=[] falls through
    # to the default admin audience rather than notifying nobody. Confirm
    # callers intend that.
    candidates = (await db.execute(tenant_active)).scalars().all()
    # Role check first: is_team_admin is only consulted for non-owner/admin users.
    return [
        user
        for user in candidates
        if user.account_role in ("owner", "admin") or user.is_team_admin
    ]
|
|
|
|
|
|
async def _deliver_to_channel(
    config: NotificationConfig,
    event: str,
    payload: dict[str, Any],
    db: AsyncSession,
) -> None:
    """Attempt one channel delivery and record the outcome as a ``NotificationLog``.

    The log row is added to ``db`` but not flushed or committed.

    * On success it is marked ``"sent"`` with ``delivered_at`` set.
    * On failure it is marked ``"retrying"`` with ``retry_count=0``, the
      error text (truncated to 1000 chars), and a first retry scheduled
      ``_RETRY_DELAYS[0]`` seconds out.

    Delivery errors are logged as warnings and never raised.
    """
    entry = NotificationLog(
        notification_config_id=config.id,
        event=event,
        payload=payload,
    )

    try:
        await _attempt_delivery(config, event, payload)
    except Exception as exc:
        failed_at = datetime.now(timezone.utc)
        entry.status = "retrying"
        entry.retry_count = 0
        entry.last_error = str(exc)[:1000]
        entry.next_retry_at = failed_at + timedelta(seconds=_RETRY_DELAYS[0])
        logger.warning(
            "Notification delivery failed (will retry): config=%s event=%s error=%s",
            config.id, event, exc,
        )
    else:
        entry.status = "sent"
        entry.delivered_at = datetime.now(timezone.utc)

    db.add(entry)
|
|
|
|
|
|
async def _attempt_delivery(
    config: NotificationConfig,
    event: str,
    payload: dict[str, Any],
) -> None:
    """Route a single delivery to the sender for ``config.channel``.

    Raises:
        ValueError: A webhook channel has no ``webhook_url``, or the channel
            is not recognised.
        Exception: Anything the channel sender raises on failure.
    """
    channel = config.channel

    if channel == "email":
        await _send_email(config, event, payload)
        return

    # Webhook channels share the same shape: validate the URL, then POST.
    # The table is built per call so it always picks up the current senders.
    webhook_routes = {
        "slack_webhook": (_send_slack_message, "Slack webhook URL not configured"),
        "teams_webhook": (_send_teams_message, "Teams webhook URL not configured"),
    }
    route = webhook_routes.get(channel)
    if route is None:
        raise ValueError(f"Unknown channel: {config.channel}")

    sender, missing_url_message = route
    if not config.webhook_url:
        raise ValueError(missing_url_message)
    await sender(config.webhook_url, event, payload)
|
|
|
|
|
|
async def _send_email(
    config: NotificationConfig,
    event: str,
    payload: dict[str, Any],
) -> None:
    """Email the rendered notification to every address on ``config``.

    The link is made absolute with ``settings.FRONTEND_URL``. It is omitted
    when there is no link or no frontend URL is configured.

    NOTE(review): a failure for any one address raises. The whole delivery is
    then retried later and every address, including ones that already
    succeeded, is emailed again. If ``EmailService.send_notification_email``
    raises rather than returning False, the remaining addresses are skipped.

    Raises:
        ValueError: ``config.email_addresses`` is empty or None.
        RuntimeError: One or more sends reported failure; the failing
            addresses are listed in the message.
    """
    title = _build_notification_title(event, payload)
    body = _build_notification_body(event, payload)
    link = _build_notification_link(event, payload)

    link_url = (
        f"{settings.FRONTEND_URL.rstrip('/')}{link}"
        if link and settings.FRONTEND_URL
        else None
    )

    addresses = config.email_addresses or []
    if not addresses:
        raise ValueError("No email addresses configured for email channel")

    failed_addresses: list[str] = []
    for address in addresses:
        delivered = await EmailService.send_notification_email(
            to_email=address,
            title=title,
            body=body,
            link_url=link_url,
        )
        if not delivered:
            failed_addresses.append(address)

    if failed_addresses:
        raise RuntimeError(f"Failed to send notification email to: {', '.join(failed_addresses)}")
|
|
|
|
|
|
# Slack Block Kit caps a header block's plain_text at 150 characters. An
# oversized block makes Slack reject the whole webhook call, so every retry
# of that delivery would fail the same way.
_SLACK_HEADER_MAX_CHARS = 150


def _slack_escape(text: str) -> str:
    """Escape ``&``, ``<`` and ``>`` as Slack requires for mrkdwn text.

    Left unescaped, ``<...>`` is parsed as Slack control syntax (links and
    mentions such as ``<!channel>``). User-supplied values like an engineer
    name or proposal title could otherwise ping a whole channel.
    """
    # "&" must be replaced first so the entities added after it are not re-escaped.
    return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")


def _truncate_with_ellipsis(text: str, limit: int) -> str:
    """Return ``text`` cut to at most ``limit`` characters, ending in "…" when cut."""
    if len(text) <= limit:
        return text
    return text[: limit - 1].rstrip() + "…"


async def _send_slack_message(
    webhook_url: str,
    event: str,
    payload: dict[str, Any],
) -> None:
    """POST the rendered notification to a Slack incoming webhook.

    The Block Kit message contains:

    * a header with a bell emoji and the title, truncated to fit Slack's
      header limit;
    * a mrkdwn section with the escaped body;
    * an "Open in ResolutionFlow" button, when a link and
      ``settings.FRONTEND_URL`` are both available.

    A top-level ``text`` field carries the title. Slack uses it as the
    fallback shown in notifications when a message is built from blocks.

    Raises:
        RuntimeError: Slack did not answer ``200`` with body ``ok``.
        httpx.HTTPError: Transport failure or timeout (10 s).
    """
    title = _build_notification_title(event, payload)
    body = _build_notification_body(event, payload)
    link = _build_notification_link(event, payload)

    # Reserve 3 characters for the bell emoji + space.
    # NOTE(review): whether Slack counts the emoji as one or two characters is
    # unconfirmed, hence the margin.
    header_text = f"\U0001f514 {_truncate_with_ellipsis(title, _SLACK_HEADER_MAX_CHARS - 3)}"

    blocks: list[dict[str, Any]] = [
        {
            "type": "header",
            "text": {"type": "plain_text", "text": header_text, "emoji": True},
        },
        {
            "type": "section",
            # The body interpolates user-supplied values; escape before Slack
            # parses it as mrkdwn.
            "text": {"type": "mrkdwn", "text": _slack_escape(body)},
        },
    ]

    if link and settings.FRONTEND_URL:
        full_url = f"{settings.FRONTEND_URL.rstrip('/')}{link}"
        blocks.append({
            "type": "actions",
            "elements": [{
                "type": "button",
                "text": {"type": "plain_text", "text": "Open in ResolutionFlow", "emoji": True},
                "url": full_url,
            }],
        })

    # Top-level "text" is also parsed as mrkdwn, so it is escaped too.
    slack_payload = {"text": _slack_escape(title), "blocks": blocks}

    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.post(webhook_url, json=slack_payload)
        if resp.status_code != 200 or resp.text.strip() != "ok":
            raise RuntimeError(
                f"Slack webhook failed (status={resp.status_code}): {resp.text[:200]}"
            )
|
|
|
|
|
|
async def _send_teams_message(
    webhook_url: str,
    event: str,
    payload: dict[str, Any],
) -> None:
    """POST the rendered notification to a Microsoft Teams incoming webhook.

    The message is a single Adaptive Card (schema version 1.4) containing:

    * a bold title TextBlock;
    * a wrapping body TextBlock;
    * an "Open in ResolutionFlow" action, when a link and
      ``settings.FRONTEND_URL`` are both available.

    Raises:
        RuntimeError: Teams answered with a status other than 200 or 202.
        httpx.HTTPError: Transport failure or timeout (10 s).
    """
    title = _build_notification_title(event, payload)
    body = _build_notification_body(event, payload)
    link = _build_notification_link(event, payload)

    card_body: list[dict[str, Any]] = [
        # wrap=True: TextBlock clips to one line by default. Escalation
        # titles now carry a problem snippet and ticket suffix and routinely
        # overflow a single line.
        {"type": "TextBlock", "text": title, "weight": "Bolder", "size": "Medium", "wrap": True},
        {"type": "TextBlock", "text": body, "wrap": True},
    ]

    actions: list[dict[str, Any]] = []
    if link and settings.FRONTEND_URL:
        full_url = f"{settings.FRONTEND_URL.rstrip('/')}{link}"
        actions.append({
            "type": "Action.OpenUrl",
            "title": "Open in ResolutionFlow",
            "url": full_url,
        })

    teams_payload = {
        "type": "message",
        "attachments": [{
            "contentType": "application/vnd.microsoft.card.adaptive",
            "content": {
                "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
                "type": "AdaptiveCard",
                "version": "1.4",
                "body": card_body,
                "actions": actions,
            },
        }],
    }

    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.post(webhook_url, json=teams_payload)
        # 200 and 202 are both treated as success; other webhook flavours may
        # answer either.
        if resp.status_code not in (200, 202):
            raise RuntimeError(
                f"Teams webhook returned {resp.status_code}: {resp.text[:200]}"
            )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Content builders
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _build_notification_title(event: str, payload: dict[str, Any]) -> str:
    """Render the single-line, human-readable title for ``event``.

    Placeholders in the per-event template are filled from ``payload``. For
    ``session.escalated``, two derived fields are added first:

    * ``problem_snippet``: ``payload["problem_summary"]`` with whitespace
      collapsed. It is cut to 67 chars plus "…" when longer than 70, or set
      to "(no summary provided)" when empty or "N/A".
    * ``ticket_suffix``: `` · #<ticket>``, taken from ``psa_ticket_id`` with
      ``ticket_number`` as the fallback, or "" when neither is set.

    Unknown events get ``"Notification: <event>"``. If a placeholder is
    missing from ``payload``, the unformatted template is returned.

    The result is always collapsed to one line. The same title is used for
    the in-app row, the Slack header, the Teams card heading, and the
    ``title`` argument to the email sender (presumably the subject line;
    CR/LF is not allowed in email headers). A stray newline from user text
    would break all of these.
    """
    titles = {
        # Distinguishability matters in the bell panel. With a generic title
        # ("Session escalated by Jane"), two different escalations from the
        # same junior look like a duplicate notification. Including a short
        # problem snippet (and ticket number if present) lets the senior
        # tell them apart at a glance.
        "session.escalated": "Escalation from {engineer_name}{ticket_suffix}: {problem_snippet}",
        "session.high_priority": "High-priority session started: {ticket_number}",
        "proposal.pending": "New flow proposal: {title}",
        "proposal.approved": "Flow proposal approved: {title}",
        "knowledge_gap.detected": "Knowledge gap detected: {gap_type}",
        "test": "Test Notification from ResolutionFlow",
    }

    # Build the escalation-specific derived fields here rather than at the
    # call site, so every dispatch path (legacy /escalate shim, /handoff, any
    # future entry point) gets consistent formatting without repeating the
    # snippet logic.
    if event == "session.escalated":
        raw_problem = payload.get("problem_summary") or ""
        # str() guards against non-string summaries. split/join collapses
        # newlines and runs of spaces, so the 70-char budget is spent on
        # visible text and the snippet stays on one line.
        problem = " ".join(str(raw_problem).split())
        if not problem or problem.upper() == "N/A":
            problem_snippet = "(no summary provided)"
        elif len(problem) > 70:
            problem_snippet = problem[:67].rstrip() + "…"
        else:
            problem_snippet = problem
        ticket = payload.get("psa_ticket_id") or payload.get("ticket_number")
        ticket_suffix = f" · #{ticket}" if ticket else ""
        # Copy rather than mutate. The caller's payload is also handed to the
        # body/link builders and stored on delivery logs.
        payload = {**payload, "problem_snippet": problem_snippet, "ticket_suffix": ticket_suffix}

    template = titles.get(event, f"Notification: {event}")
    try:
        title = template.format(**payload)
    except KeyError:
        title = template
    # Final guard: payload values (proposal titles, names) may contain newlines too.
    return " ".join(title.split())
|
|
|
|
|
|
def _build_notification_body(event: str, payload: dict[str, Any]) -> str:
    """Render the body text for ``event`` from ``payload``.

    Unknown events yield ``"Event: <event>"``. When a placeholder the
    template needs is absent from ``payload``, the template is returned
    unformatted.
    """
    body_templates: dict[str, str] = {
        "session.escalated": "Engineer {engineer_name} has escalated a FlowPilot session and needs assistance.",
        "session.high_priority": "A new high-priority troubleshooting session has been started for ticket {ticket_number}.",
        "proposal.pending": 'A new flow proposal "{title}" is awaiting review in the review queue.',
        "proposal.approved": 'The flow proposal "{title}" has been approved and is ready for use.',
        "knowledge_gap.detected": "A {gap_type} knowledge gap has been identified. Review recommended.",
        "test": "This is a test notification to verify your notification channel is working correctly.",
    }

    fmt = body_templates.get(event)
    if fmt is None:
        fmt = f"Event: {event}"

    try:
        rendered = fmt.format(**payload)
    except KeyError:
        rendered = fmt
    return rendered
|
|
|
|
|
|
def _build_notification_link(event: str, payload: dict[str, Any]) -> Optional[str]:
    """Return the in-app path (no host) a notification should open, or None.

    Returns None for events with no destination. Unlike the title/body
    builders, it also returns None when ``payload`` lacks a placeholder the
    path needs. An unfilled template such as ``/pilot/{session_id}`` is not a
    usable URL. The email, Slack, and Teams senders in this module skip their
    link/button when this returns None, so omitting the link is better than
    sending a dead one.
    """
    links: dict[str, str] = {
        # ?pickup=true triggers the senior-tech handoff/pickup flow on the
        # session page (magic-moment screen for handoff-based escalations,
        # legacy SessionBriefing for `requesting_escalation` sessions).
        # Without it the senior lands on a session-detail GET they can't
        # access pre-pickup, which the user perceives as a dead notification.
        "session.escalated": "/pilot/{session_id}?pickup=true",
        "session.high_priority": "/pilot/{session_id}",
        "proposal.pending": "/review-queue",
        "proposal.approved": "/review-queue",
        "knowledge_gap.detected": "/analytics/flowpilot",
    }
    template = links.get(event)
    if template is None:
        return None
    try:
        return template.format(**payload)
    except KeyError:
        # A required path parameter is missing; no link beats a broken one.
        return None
|