Full notification infrastructure with in-app, email, Slack, and Teams channels: Backend: - NotificationConfig, NotificationLog, Notification models + migration - Notification service with event routing, channel delivery, retry logic - 9 API endpoints (config CRUD + in-app notifications) - APScheduler retry job with exponential backoff (30s, 2m, 10m) - Wired into escalation, proposal approval, and knowledge flywheel - Pydantic event key validation, cross-tenant protection on recipients Frontend: - TypeScript types + API client for all notification endpoints - NotificationsPanel: bell icon with unread badge, dropdown, mark-read - NotificationSettings: channel config, event toggles, test, delete confirm - Notifications tab on IntegrationsPage - ARIA attributes, Escape handler, settings link on panel Review fixes (13 issues resolved): - notify() no longer commits/rolls back caller's transaction (critical) - retry_failed_notifications returns count instead of None (critical) - NotificationSettings moved inside dedicated tab (critical) - target_user_ids scoped by account_id (security) - Email loop collects all failures before raising - Slack webhook validates response body - events_enabled rejects unknown event keys - link column widened to String(500) - Dead code removed from _auto_reinforce - Delete confirmation, ARIA, Escape key support Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
51 KiB
Phase 4 Slice 2: Notification Integrations — Implementation Plan
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Build an event-driven notification system that alerts engineers and team admins via email, Slack, Teams, and in-app notifications when key events occur (escalations, proposals, high-priority tickets).
Architecture: Lightweight event-driven service. Events are fired from existing lifecycle points (escalation, proposal creation, approval). The notification service routes them to configured channels. Retry via APScheduler for failed webhooks. In-app notifications extend the existing NotificationsPanel bell icon.
Tech Stack: FastAPI, SQLAlchemy 2.0 (async), APScheduler 3.x, Resend (email), Slack/Teams webhook POST, React, TypeScript, Tailwind CSS v4
Prerequisites:
- Phase 1-3 complete (AI sessions, escalations, Knowledge Flywheel)
- Existing:
EmailServiceincore/email.py, APScheduler incore/scheduler.py,NotificationsPanel.tsx - Existing models:
User,AISession,FlowProposal,Account
Parent plan: docs/2026-03-18-flowpilot-first-pivot-phase4.md (Tasks 4, 5, 6, 6.5)
Task 1: NotificationConfig and NotificationLog models + migration
Files:
- Create:
backend/app/models/notification_config.py - Create:
backend/app/models/notification_log.py - Create:
backend/app/models/notification.py - Edit:
backend/app/models/__init__.py - Create:
backend/alembic/versions/XXX_add_notification_tables.py(via autogenerate)
Step 1: Create NotificationConfig model
Create backend/app/models/notification_config.py:
"""Notification channel configuration per account.
Each account can have multiple notification configs (email, Slack webhook, Teams webhook).
Each config specifies which events it receives.
"""
import uuid
from datetime import datetime, timezone
from typing import Optional, Any, TYPE_CHECKING
from sqlalchemy import String, Boolean, DateTime, ForeignKey, CheckConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.core.database import Base
if TYPE_CHECKING:
from app.models.account import Account
class NotificationConfig(Base):
"""A notification channel configured by an account.
channel: "email" | "slack_webhook" | "teams_webhook"
events_enabled: {"session.escalated": true, "proposal.pending": true, ...}
"""
__tablename__ = "notification_configs"
__table_args__ = (
CheckConstraint(
"channel IN ('email', 'slack_webhook', 'teams_webhook')",
name="ck_notification_configs_channel",
),
)
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
account_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("accounts.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
channel: Mapped[str] = mapped_column(String(20), nullable=False)
webhook_url: Mapped[Optional[str]] = mapped_column(String(500), nullable=True)
email_addresses: Mapped[Optional[list]] = mapped_column(JSONB, nullable=True)
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
events_enabled: Mapped[dict[str, Any]] = mapped_column(
JSONB, default=lambda: {
"session.escalated": True,
"session.high_priority": True,
"proposal.pending": True,
"proposal.approved": True,
"knowledge_gap.detected": True,
}
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc),
)
account: Mapped[Optional["Account"]] = relationship("Account", foreign_keys=[account_id])
Step 2: Create NotificationLog model
Create backend/app/models/notification_log.py:
"""Notification delivery log with retry tracking.
Tracks every notification delivery attempt. Failed deliveries are retried
via APScheduler with exponential backoff (30s, 2m, 10m — max 3 retries).
"""
import uuid
from datetime import datetime, timezone
from typing import Optional, Any, TYPE_CHECKING
from sqlalchemy import String, Integer, DateTime, ForeignKey, CheckConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID, JSONB
from app.core.database import Base
if TYPE_CHECKING:
from app.models.notification_config import NotificationConfig
class NotificationLog(Base):
"""A single notification delivery attempt."""
__tablename__ = "notification_logs"
__table_args__ = (
CheckConstraint(
"status IN ('sent', 'failed', 'retrying', 'exhausted')",
name="ck_notification_logs_status",
),
)
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
notification_config_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("notification_configs.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
event: Mapped[str] = mapped_column(String(50), nullable=False)
payload: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False)
status: Mapped[str] = mapped_column(String(20), default="sent")
retry_count: Mapped[int] = mapped_column(Integer, default=0)
max_retries: Mapped[int] = mapped_column(Integer, default=3)
last_error: Mapped[Optional[str]] = mapped_column(String(1000), nullable=True)
next_retry_at: Mapped[Optional[datetime]] = mapped_column(
DateTime(timezone=True), nullable=True
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
)
delivered_at: Mapped[Optional[datetime]] = mapped_column(
DateTime(timezone=True), nullable=True
)
config: Mapped[Optional["NotificationConfig"]] = relationship(
"NotificationConfig", foreign_keys=[notification_config_id]
)
Step 3: Create in-app Notification model
Create backend/app/models/notification.py:
"""In-app notification model.
Created alongside external notifications (email/Slack/Teams).
Powers the NotificationsPanel bell icon dropdown in the frontend.
"""
import uuid
from datetime import datetime, timezone
from typing import Optional, TYPE_CHECKING
from sqlalchemy import String, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID
from app.core.database import Base
if TYPE_CHECKING:
from app.models.user import User
class Notification(Base):
"""An in-app notification for a specific user."""
__tablename__ = "notifications"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
account_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("accounts.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
user_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("users.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
event: Mapped[str] = mapped_column(String(50), nullable=False)
title: Mapped[str] = mapped_column(String(200), nullable=False)
body: Mapped[Optional[str]] = mapped_column(String(500), nullable=True)
link: Mapped[Optional[str]] = mapped_column(String(200), nullable=True)
is_read: Mapped[bool] = mapped_column(Boolean, default=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc),
index=True,
)
user: Mapped[Optional["User"]] = relationship("User", foreign_keys=[user_id])
Step 4: Register models in __init__.py
Edit backend/app/models/__init__.py — add:
from app.models.notification_config import NotificationConfig
from app.models.notification_log import NotificationLog
from app.models.notification import Notification
Step 5: Generate migration
cd /projects/patherly/backend
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
venv/bin/alembic revision --autogenerate -m "add notification tables"
Review the generated migration. Verify it creates notification_configs, notification_logs, and notifications tables with correct FKs and constraints.
Step 6: Run migration
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
venv/bin/alembic upgrade head
Step 7: Commit
git add backend/app/models/notification_config.py backend/app/models/notification_log.py \
backend/app/models/notification.py backend/app/models/__init__.py \
backend/alembic/versions/*notification*
git commit -m "feat(notifications): add NotificationConfig, NotificationLog, and Notification models
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>"
Task 2: Notification schemas
Files:
- Create:
backend/app/schemas/notification.py
Step 1: Create schemas
Create backend/app/schemas/notification.py:
"""Pydantic schemas for notification system."""
from datetime import datetime
from uuid import UUID
from typing import Optional, Any
from pydantic import BaseModel, Field
# --- NotificationConfig schemas ---
VALID_CHANNELS = {"email", "slack_webhook", "teams_webhook"}
VALID_EVENTS = {
"session.escalated",
"session.high_priority",
"proposal.pending",
"proposal.approved",
"knowledge_gap.detected",
}
class NotificationConfigCreate(BaseModel):
channel: str = Field(..., pattern="^(email|slack_webhook|teams_webhook)$")
webhook_url: str | None = None
email_addresses: list[str] | None = None
events_enabled: dict[str, bool] = Field(
default_factory=lambda: {e: True for e in VALID_EVENTS}
)
class NotificationConfigUpdate(BaseModel):
webhook_url: str | None = None
email_addresses: list[str] | None = None
is_active: bool | None = None
events_enabled: dict[str, bool] | None = None
class NotificationConfigResponse(BaseModel):
id: UUID
channel: str
webhook_url: str | None
email_addresses: list[str] | None
is_active: bool
events_enabled: dict[str, bool]
created_at: datetime
updated_at: datetime
model_config = {"from_attributes": True}
# --- In-app Notification schemas ---
class NotificationResponse(BaseModel):
id: UUID
event: str
title: str
body: str | None
link: str | None
is_read: bool
created_at: datetime
model_config = {"from_attributes": True}
class UnreadCountResponse(BaseModel):
count: int
# --- Notification test ---
class NotificationTestRequest(BaseModel):
config_id: UUID
class NotificationTestResponse(BaseModel):
success: bool
message: str
Step 2: Commit
git add backend/app/schemas/notification.py
git commit -m "feat(notifications): add notification Pydantic schemas
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>"
Task 3: Notification service (core event routing + channel delivery)
Files:
- Create:
backend/app/services/notification_service.py
Step 1: Create the notification service
Create backend/app/services/notification_service.py:
"""Event-driven notification service.
Fires notifications to configured channels (email, Slack, Teams) and
creates in-app notification records. Failed webhook deliveries are
logged for retry by the scheduler.
Usage:
await notify("session.escalated", account_id, payload, db)
"""
import logging
from datetime import datetime, timezone, timedelta
from typing import Optional, Any
from uuid import UUID
import httpx
from sqlalchemy import select, and_
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.email import EmailService
from app.models.notification_config import NotificationConfig
from app.models.notification_log import NotificationLog
from app.models.notification import Notification
from app.models.user import User
logger = logging.getLogger(__name__)
# Exponential backoff delays for retries (seconds)
RETRY_DELAYS = [30, 120, 600] # 30s, 2m, 10m
async def notify(
event: str,
account_id: UUID,
payload: dict[str, Any],
db: AsyncSession,
*,
target_user_ids: list[UUID] | None = None,
) -> None:
"""Fire a notification event. Routes to all active channels for this account.
Also creates in-app Notification records for target_user_ids.
If target_user_ids is None, notifies all team admins + account owner.
Args:
event: Event type (e.g., "session.escalated")
account_id: Account that owns the event
payload: Event data (session summary, proposal title, etc.)
db: Database session
target_user_ids: Specific users to notify. None = team admins + owner.
"""
# 1. Create in-app notifications
recipients = await _resolve_recipients(account_id, target_user_ids, db)
title = _build_notification_title(event, payload)
body = _build_notification_body(event, payload)
link = _build_notification_link(event, payload)
for user_id in recipients:
db.add(Notification(
account_id=account_id,
user_id=user_id,
event=event,
title=title,
body=body,
link=link,
))
# 2. Route to external channels
configs = await _get_active_configs(account_id, event, db)
for config in configs:
await _deliver_to_channel(config, event, payload, db)
await db.flush()
async def retry_failed_notifications(db: AsyncSession) -> int:
"""Retry failed notification deliveries. Called by APScheduler.
Returns the number of retries attempted.
"""
now = datetime.now(timezone.utc)
result = await db.execute(
select(NotificationLog)
.where(
NotificationLog.status == "retrying",
NotificationLog.next_retry_at <= now,
)
.limit(50)
)
logs = result.scalars().all()
retried = 0
for log in logs:
config = log.config
if not config:
log.status = "exhausted"
log.last_error = "Config deleted"
continue
success = await _attempt_delivery(config, log.event, log.payload)
log.retry_count += 1
if success:
log.status = "sent"
log.delivered_at = now
logger.info("Notification retry succeeded: %s (attempt %d)", log.id, log.retry_count)
elif log.retry_count >= log.max_retries:
log.status = "exhausted"
log.last_error = f"Exhausted after {log.retry_count} retries"
logger.warning("Notification exhausted after %d retries: %s", log.retry_count, log.id)
else:
delay = RETRY_DELAYS[min(log.retry_count, len(RETRY_DELAYS) - 1)]
log.next_retry_at = now + timedelta(seconds=delay)
log.status = "retrying"
retried += 1
if retried:
await db.commit()
return retried
async def send_test_notification(config: NotificationConfig) -> tuple[bool, str]:
"""Send a test notification to verify channel configuration.
Returns (success, message).
"""
test_payload = {
"title": "Test Notification",
"body": "This is a test notification from ResolutionFlow.",
"session_id": None,
"engineer_name": "System",
}
if config.channel == "email":
if not config.email_addresses:
return False, "No email addresses configured"
success = await EmailService.send_notification_email(
config.email_addresses[0], "Test Notification", test_payload
)
return success, "Test email sent" if success else "Email delivery failed"
elif config.channel == "slack_webhook":
if not config.webhook_url:
return False, "No webhook URL configured"
success = await _send_slack_message(config.webhook_url, "test", test_payload)
return success, "Test message sent to Slack" if success else "Slack webhook failed"
elif config.channel == "teams_webhook":
if not config.webhook_url:
return False, "No webhook URL configured"
success = await _send_teams_message(config.webhook_url, "test", test_payload)
return success, "Test message sent to Teams" if success else "Teams webhook failed"
return False, f"Unknown channel: {config.channel}"
# --- Internal helpers ---
async def _get_active_configs(
account_id: UUID, event: str, db: AsyncSession
) -> list[NotificationConfig]:
"""Get all active notification configs for this account + event."""
result = await db.execute(
select(NotificationConfig).where(
NotificationConfig.account_id == account_id,
NotificationConfig.is_active.is_(True),
)
)
configs = result.scalars().all()
# Filter by event enabled
return [c for c in configs if c.events_enabled.get(event, False)]
async def _resolve_recipients(
account_id: UUID,
target_user_ids: list[UUID] | None,
db: AsyncSession,
) -> list[UUID]:
"""Resolve notification recipients. Defaults to team admins + account owner."""
if target_user_ids:
return target_user_ids
result = await db.execute(
select(User.id).where(
User.account_id == account_id,
User.is_active.is_(True),
(User.is_team_admin.is_(True)) | (User.account_role == "owner"),
)
)
return list(result.scalars().all())
async def _deliver_to_channel(
config: NotificationConfig,
event: str,
payload: dict[str, Any],
db: AsyncSession,
) -> None:
"""Attempt delivery to a channel. Log result."""
success = await _attempt_delivery(config, event, payload)
log = NotificationLog(
notification_config_id=config.id,
event=event,
payload=payload,
)
if success:
log.status = "sent"
log.delivered_at = datetime.now(timezone.utc)
else:
log.status = "retrying"
log.last_error = f"Initial delivery failed for {config.channel}"
log.next_retry_at = datetime.now(timezone.utc) + timedelta(seconds=RETRY_DELAYS[0])
db.add(log)
async def _attempt_delivery(
config: NotificationConfig, event: str, payload: dict[str, Any]
) -> bool:
"""Attempt to deliver a notification via the configured channel."""
try:
if config.channel == "email":
return await _send_email(config, event, payload)
elif config.channel == "slack_webhook":
return await _send_slack_message(
config.webhook_url, event, payload
)
elif config.channel == "teams_webhook":
return await _send_teams_message(
config.webhook_url, event, payload
)
return False
except Exception:
logger.exception("Notification delivery failed for config %s", config.id)
return False
async def _send_email(
config: NotificationConfig, event: str, payload: dict[str, Any]
) -> bool:
"""Send notification email to all configured addresses."""
if not config.email_addresses:
return False
title = _build_notification_title(event, payload)
success = True
for addr in config.email_addresses:
result = await EmailService.send_notification_email(addr, title, payload)
if not result:
success = False
return success
async def _send_slack_message(
webhook_url: str, event: str, payload: dict[str, Any]
) -> bool:
"""Send a Slack notification via incoming webhook."""
title = _build_notification_title(event, payload)
body = _build_notification_body(event, payload)
link = payload.get("link", "")
slack_payload = {
"blocks": [
{
"type": "header",
"text": {"type": "plain_text", "text": f"🔔 {title}", "emoji": True},
},
{
"type": "section",
"text": {"type": "mrkdwn", "text": body},
},
]
}
if link:
slack_payload["blocks"].append({
"type": "actions",
"elements": [
{
"type": "button",
"text": {"type": "plain_text", "text": "Open in ResolutionFlow"},
"url": f"{settings.FRONTEND_URL}{link}",
}
],
})
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(webhook_url, json=slack_payload)
if resp.status_code == 200:
logger.info("Slack notification sent: %s", event)
return True
logger.warning("Slack webhook returned %d: %s", resp.status_code, resp.text)
return False
async def _send_teams_message(
webhook_url: str, event: str, payload: dict[str, Any]
) -> bool:
"""Send a Teams notification via incoming webhook (Adaptive Card)."""
title = _build_notification_title(event, payload)
body = _build_notification_body(event, payload)
link = payload.get("link", "")
card = {
"type": "message",
"attachments": [
{
"contentType": "application/vnd.microsoft.card.adaptive",
"content": {
"$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
"type": "AdaptiveCard",
"version": "1.4",
"body": [
{"type": "TextBlock", "text": title, "weight": "Bolder", "size": "Medium"},
{"type": "TextBlock", "text": body, "wrap": True},
],
},
}
],
}
if link:
card["attachments"][0]["content"]["actions"] = [
{
"type": "Action.OpenUrl",
"title": "Open in ResolutionFlow",
"url": f"{settings.FRONTEND_URL}{link}",
}
]
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(webhook_url, json=card)
if resp.status_code in (200, 202):
logger.info("Teams notification sent: %s", event)
return True
logger.warning("Teams webhook returned %d: %s", resp.status_code, resp.text)
return False
def _build_notification_title(event: str, payload: dict[str, Any]) -> str:
"""Build a human-readable notification title."""
titles = {
"session.escalated": f"Session escalated by {payload.get('engineer_name', 'an engineer')}",
"session.high_priority": f"High-priority session started: {payload.get('ticket_number', 'N/A')}",
"proposal.pending": f"New flow proposal: {payload.get('title', 'Untitled')}",
"proposal.approved": f"Flow proposal approved: {payload.get('title', 'Untitled')}",
"knowledge_gap.detected": f"Knowledge gap detected: {payload.get('gap_type', 'Unknown')}",
"test": "Test Notification from ResolutionFlow",
}
return titles.get(event, f"Notification: {event}")
def _build_notification_body(event: str, payload: dict[str, Any]) -> str:
"""Build notification body text."""
if event == "session.escalated":
reason = payload.get("escalation_reason", "No reason provided")
return f"Reason: {reason}\nProblem: {payload.get('problem_summary', 'N/A')}"
elif event == "session.high_priority":
return f"Ticket: {payload.get('ticket_number', 'N/A')}\nClient: {payload.get('client_name', 'N/A')}"
elif event == "proposal.pending":
return f"Type: {payload.get('proposal_type', 'N/A')}\nDomain: {payload.get('problem_domain', 'N/A')}"
elif event == "proposal.approved":
return f"Approved by {payload.get('reviewer_name', 'a reviewer')}"
elif event == "knowledge_gap.detected":
return f"Severity: {payload.get('severity', 'N/A')}\nArea: {payload.get('problem_domain', 'N/A')}"
elif event == "test":
return "This is a test notification from ResolutionFlow. Your integration is working!"
return ""
def _build_notification_link(event: str, payload: dict[str, Any]) -> str | None:
"""Build in-app link for the notification."""
links = {
"session.escalated": f"/pilot/{payload.get('session_id', '')}",
"session.high_priority": f"/pilot/{payload.get('session_id', '')}",
"proposal.pending": "/review-queue",
"proposal.approved": "/review-queue",
"knowledge_gap.detected": "/analytics/flowpilot",
}
return links.get(event)
Step 2: Add notification email method to EmailService
Edit backend/app/core/email.py — add this static method to the EmailService class:
@staticmethod
async def send_notification_email(to_email: str, subject: str, data: dict) -> bool:
"""Send a notification email."""
if not settings.email_enabled:
logger.warning("Notification email not sent — RESEND_API_KEY not configured")
return False
try:
import resend
resend.api_key = settings.RESEND_API_KEY
html = _render_notification_html(subject=subject, data=data)
resend.Emails.send({
"from": settings.FROM_EMAIL,
"to": [to_email],
"subject": f"[ResolutionFlow] {subject}",
"html": html,
})
logger.info("Notification email sent to %s: %s", to_email, subject)
return True
except Exception:
logger.exception("Failed to send notification email to %s", to_email)
return False
Add this HTML renderer at module level in email.py:
def _render_notification_html(subject: str, data: dict) -> str:
body_text = data.get("body", "")
link = data.get("link", "")
link_html = ""
if link:
full_url = f"{settings.FRONTEND_URL}{link}"
link_html = f'<p><a href="{full_url}" style="color: #06b6d4;">Open in ResolutionFlow →</a></p>'
return f"""
<div style="font-family: 'IBM Plex Sans', sans-serif; max-width: 600px; margin: 0 auto; padding: 24px;">
<h2 style="color: #f8fafc; margin-bottom: 16px;">{subject}</h2>
<div style="background: rgba(24, 26, 31, 0.8); border: 1px solid rgba(255,255,255,0.06); border-radius: 12px; padding: 20px;">
<p style="color: #8891a0; white-space: pre-line;">{body_text}</p>
{link_html}
</div>
<p style="color: #5a6170; font-size: 12px; margin-top: 24px;">— ResolutionFlow</p>
</div>
"""
Step 3: Verify imports
cd /projects/patherly/backend
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
venv/bin/python -c "from app.services.notification_service import notify, retry_failed_notifications, send_test_notification; print('Service OK')"
Step 4: Commit
git add backend/app/services/notification_service.py backend/app/core/email.py
git commit -m "feat(notifications): add notification service with email/Slack/Teams delivery + retry
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>"
Task 4: Notification API endpoints
Files:
- Create:
backend/app/api/endpoints/notifications.py - Edit:
backend/app/api/router.py
Step 1: Create notification endpoints
Create backend/app/api/endpoints/notifications.py:
"""Notification endpoints.
- NotificationConfig CRUD (account-level, requires team admin)
- In-app notification list/read/mark-all-read (per user)
- Test notification delivery
"""
import logging
from typing import Annotated, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import select, func, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.core.rate_limit import limiter
from app.api.deps import get_current_active_user, require_team_admin
from app.models.user import User
from app.models.notification_config import NotificationConfig
from app.models.notification import Notification as NotificationModel
from app.schemas.notification import (
NotificationConfigCreate,
NotificationConfigUpdate,
NotificationConfigResponse,
NotificationResponse,
UnreadCountResponse,
NotificationTestRequest,
NotificationTestResponse,
)
from app.services.notification_service import send_test_notification
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/notifications", tags=["notifications"])
# --- NotificationConfig CRUD (team admin) ---
@router.get("/configs", response_model=list[NotificationConfigResponse])
async def list_configs(
request,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_team_admin),
):
"""List all notification configs for the current account."""
result = await db.execute(
select(NotificationConfig)
.where(NotificationConfig.account_id == current_user.account_id)
.order_by(NotificationConfig.created_at)
)
return result.scalars().all()
@router.post("/configs", response_model=NotificationConfigResponse, status_code=201)
async def create_config(
data: NotificationConfigCreate,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_team_admin),
):
"""Create a new notification channel config."""
# Validate channel-specific requirements
if data.channel in ("slack_webhook", "teams_webhook") and not data.webhook_url:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"webhook_url is required for {data.channel}",
)
if data.channel == "email" and not data.email_addresses:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="email_addresses required for email channel",
)
config = NotificationConfig(
account_id=current_user.account_id,
channel=data.channel,
webhook_url=data.webhook_url,
email_addresses=data.email_addresses,
events_enabled=data.events_enabled,
)
db.add(config)
await db.commit()
await db.refresh(config)
return config
@router.patch("/configs/{config_id}", response_model=NotificationConfigResponse)
async def update_config(
config_id: UUID,
data: NotificationConfigUpdate,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_team_admin),
):
"""Update a notification channel config."""
config = await db.get(NotificationConfig, config_id)
if not config or config.account_id != current_user.account_id:
raise HTTPException(status_code=404, detail="Config not found")
for field, value in data.model_dump(exclude_unset=True).items():
setattr(config, field, value)
await db.commit()
await db.refresh(config)
return config
@router.delete("/configs/{config_id}", status_code=204)
async def delete_config(
config_id: UUID,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_team_admin),
):
"""Delete a notification channel config."""
config = await db.get(NotificationConfig, config_id)
if not config or config.account_id != current_user.account_id:
raise HTTPException(status_code=404, detail="Config not found")
await db.delete(config)
await db.commit()
@router.post("/configs/test", response_model=NotificationTestResponse)
async def test_config(
data: NotificationTestRequest,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_team_admin),
):
"""Send a test notification to verify channel configuration."""
config = await db.get(NotificationConfig, data.config_id)
if not config or config.account_id != current_user.account_id:
raise HTTPException(status_code=404, detail="Config not found")
success, message = await send_test_notification(config)
return NotificationTestResponse(success=success, message=message)
# --- In-app notifications (per user) ---
@router.get("", response_model=list[NotificationResponse])
async def list_notifications(
request,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
skip: int = Query(0, ge=0),
limit: int = Query(20, ge=1, le=100),
):
"""List in-app notifications for the current user (unread first)."""
result = await db.execute(
select(NotificationModel)
.where(NotificationModel.user_id == current_user.id)
.order_by(NotificationModel.is_read, NotificationModel.created_at.desc())
.offset(skip)
.limit(limit)
)
return result.scalars().all()
@router.get("/unread-count", response_model=UnreadCountResponse)
async def unread_count(
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
):
"""Get unread notification count for badge display."""
result = await db.execute(
select(func.count(NotificationModel.id)).where(
NotificationModel.user_id == current_user.id,
NotificationModel.is_read.is_(False),
)
)
return UnreadCountResponse(count=result.scalar_one())
@router.patch("/{notification_id}/read")
async def mark_read(
notification_id: UUID,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
):
"""Mark a single notification as read."""
notif = await db.get(NotificationModel, notification_id)
if not notif or notif.user_id != current_user.id:
raise HTTPException(status_code=404, detail="Notification not found")
notif.is_read = True
await db.commit()
return {"ok": True}
@router.post("/mark-all-read")
async def mark_all_read(
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
):
"""Mark all notifications as read for the current user."""
await db.execute(
update(NotificationModel)
.where(
NotificationModel.user_id == current_user.id,
NotificationModel.is_read.is_(False),
)
.values(is_read=True)
)
await db.commit()
return {"ok": True}
Step 2: Register router
Edit backend/app/api/router.py — add:
from app.api.endpoints import notifications
api_router.include_router(notifications.router)
Step 3: Verify
cd /projects/patherly/backend
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
venv/bin/python -c "from app.api.endpoints.notifications import router; print(f'Notifications: {len(router.routes)} routes')"
Step 4: Commit
git add backend/app/api/endpoints/notifications.py backend/app/api/router.py
git commit -m "feat(notifications): add notification config CRUD + in-app notification endpoints
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>"
Task 5: Notification retry scheduler
Files:
- Edit:
backend/app/main.py
Step 1: Add retry job to scheduler
Edit backend/app/main.py — in the lifespan function, after the existing scheduler jobs, add:
from app.services.notification_service import retry_failed_notifications
from app.core.database import AsyncSessionLocal
async def _process_notification_retries():
"""Retry failed notification deliveries."""
async with AsyncSessionLocal() as db:
retried = await retry_failed_notifications(db)
if retried:
logger.info("Retried %d failed notifications", retried)
scheduler.add_job(
_process_notification_retries,
trigger="interval",
minutes=1,
id="notification_retry",
replace_existing=True,
max_instances=1,
)
Step 2: Verify scheduler starts
cd /projects/patherly/backend
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
venv/bin/python -c "from app.main import app; print('App created OK')"
Step 3: Commit
git add backend/app/main.py
git commit -m "feat(notifications): add APScheduler job for notification retry (1m interval, max_instances=1)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>"
Task 6: Wire notifications into session lifecycle
Files:
- Edit:
backend/app/services/flowpilot_engine.py - Edit:
backend/app/services/knowledge_flywheel.py - Edit:
backend/app/api/endpoints/flow_proposals.py
Step 1: Wire into escalation
Edit backend/app/services/flowpilot_engine.py — in escalate_session(), after await db.flush() (line ~498), add:
import asyncio
from app.services.notification_service import notify
# Fire escalation notification (non-blocking)
escalation_payload = {
"session_id": str(session_id),
"engineer_name": session.user.display_name if session.user else "Unknown",
"escalation_reason": request.escalation_reason,
"problem_summary": session.problem_summary or "N/A",
"link": f"/pilot/{session_id}",
}
target_users = [request.escalated_to_id] if request.escalated_to_id else None
asyncio.create_task(
notify("session.escalated", session.account_id, escalation_payload, db, target_user_ids=target_users)
)
Note: asyncio.create_task won't work directly here because the db session may close before the task runs. Instead, use a synchronous call within the existing transaction:
await notify("session.escalated", session.account_id, escalation_payload, db, target_user_ids=target_users)
Step 2: Wire into proposal creation
Edit backend/app/services/knowledge_flywheel.py — after each db.add(proposal) call (lines ~297, ~357, ~433), add:
from app.services.notification_service import notify
# Only notify for proposals that need review (not auto_reinforced)
if proposal.status == "pending":
await notify("proposal.pending", proposal.account_id, {
"title": proposal.title,
"proposal_type": proposal.proposal_type,
"problem_domain": proposal.problem_domain or "General",
"link": "/review-queue",
}, db)
Step 3: Wire into proposal approval
Edit backend/app/api/endpoints/flow_proposals.py — in review_proposal(), after the status is set to "approved" (lines ~228, ~240), add:
from app.services.notification_service import notify
if data.action == "approve":
# ... existing approval logic ...
# Notify the engineer who created the source session
await notify("proposal.approved", proposal.account_id, {
"title": proposal.title,
"reviewer_name": current_user.display_name if hasattr(current_user, 'display_name') else current_user.email,
"link": "/review-queue",
}, db, target_user_ids=[proposal.created_by_id] if proposal.created_by_id else None)
Step 4: Verify imports work
cd /projects/patherly/backend
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
venv/bin/python -c "
from app.services.flowpilot_engine import escalate_session
from app.services.knowledge_flywheel import analyze_session
from app.api.endpoints.flow_proposals import router
print('All wiring OK')
"
Step 5: Commit
git add backend/app/services/flowpilot_engine.py backend/app/services/knowledge_flywheel.py \
backend/app/api/endpoints/flow_proposals.py
git commit -m "feat(notifications): wire notify() into escalation, proposal creation, and approval
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>"
Task 7: Frontend — notification types + API client
Files:
- Create:
frontend/src/types/notification.ts - Create:
frontend/src/api/notifications.ts - Edit:
frontend/src/types/index.ts - Edit:
frontend/src/api/index.ts
Step 1: Create types
Create frontend/src/types/notification.ts:
export interface NotificationConfig {
id: string
channel: 'email' | 'slack_webhook' | 'teams_webhook'
webhook_url: string | null
email_addresses: string[] | null
is_active: boolean
events_enabled: Record<string, boolean>
created_at: string
updated_at: string
}
export interface NotificationConfigCreate {
channel: 'email' | 'slack_webhook' | 'teams_webhook'
webhook_url?: string
email_addresses?: string[]
events_enabled?: Record<string, boolean>
}
export interface NotificationConfigUpdate {
webhook_url?: string
email_addresses?: string[]
is_active?: boolean
events_enabled?: Record<string, boolean>
}
export interface AppNotification {
id: string
event: string
title: string
body: string | null
link: string | null
is_read: boolean
created_at: string
}
export interface UnreadCount {
count: number
}
export const NOTIFICATION_EVENTS = {
'session.escalated': 'Session Escalated',
'session.high_priority': 'High Priority Session',
'proposal.pending': 'New Flow Proposal',
'proposal.approved': 'Proposal Approved',
'knowledge_gap.detected': 'Knowledge Gap Detected',
} as const
export const CHANNEL_LABELS = {
email: 'Email',
slack_webhook: 'Slack',
teams_webhook: 'Microsoft Teams',
} as const
Step 2: Create API client
Create frontend/src/api/notifications.ts:
import apiClient from './client'
import type {
NotificationConfig,
NotificationConfigCreate,
NotificationConfigUpdate,
AppNotification,
UnreadCount,
} from '@/types/notification'
export const notificationsApi = {
// --- Config CRUD ---
async listConfigs(): Promise<NotificationConfig[]> {
const response = await apiClient.get<NotificationConfig[]>('/notifications/configs')
return response.data
},
async createConfig(data: NotificationConfigCreate): Promise<NotificationConfig> {
const response = await apiClient.post<NotificationConfig>('/notifications/configs', data)
return response.data
},
async updateConfig(id: string, data: NotificationConfigUpdate): Promise<NotificationConfig> {
const response = await apiClient.patch<NotificationConfig>(`/notifications/configs/${id}`, data)
return response.data
},
async deleteConfig(id: string): Promise<void> {
await apiClient.delete(`/notifications/configs/${id}`)
},
async testConfig(configId: string): Promise<{ success: boolean; message: string }> {
const response = await apiClient.post<{ success: boolean; message: string }>(
'/notifications/configs/test',
{ config_id: configId }
)
return response.data
},
// --- In-app notifications ---
async list(params?: { skip?: number; limit?: number }): Promise<AppNotification[]> {
const response = await apiClient.get<AppNotification[]>('/notifications', { params })
return response.data
},
async unreadCount(): Promise<number> {
const response = await apiClient.get<UnreadCount>('/notifications/unread-count')
return response.data.count
},
async markRead(id: string): Promise<void> {
await apiClient.patch(`/notifications/${id}/read`)
},
async markAllRead(): Promise<void> {
await apiClient.post('/notifications/mark-all-read')
},
}
export default notificationsApi
Step 3: Export from index files
Edit frontend/src/types/index.ts — add:
export type * from './notification'
Edit frontend/src/api/index.ts — add:
export { notificationsApi } from './notifications'
Step 4: Verify build
cd /projects/patherly/frontend && npm run build
Step 5: Commit
git add frontend/src/types/notification.ts frontend/src/api/notifications.ts \
frontend/src/types/index.ts frontend/src/api/index.ts
git commit -m "feat(notifications): add notification types and API client
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>"
Task 8: Frontend — NotificationsPanel upgrade (in-app notification center)
Files:
- Edit:
frontend/src/components/layout/NotificationsPanel.tsx
Step 1: Rewrite NotificationsPanel
Replace the contents of frontend/src/components/layout/NotificationsPanel.tsx with a component that:
- Fetches from
notificationsApi.list()instead ofsessionsApi.list() - Shows unread count badge (number, not just a dot) via
notificationsApi.unreadCount() - Each item: event icon, title, body, time ago, click → mark read + navigate
- "Mark all as read" button in header
- Polls unread count every 30 seconds
- Keep existing glass-card dropdown styling
Event icons mapping:
session.escalated→AlertTriangle(amber)session.high_priority→AlertCircle(rose)proposal.pending→FileText(primary/cyan)proposal.approved→CheckCircle(emerald)knowledge_gap.detected→TrendingUp(amber)
Key implementation details:
- Use
useEffectwithsetIntervalfor polling unread count (30s) - On dropdown open, fetch full notification list
- On notification click:
notificationsApi.markRead(id)thennavigate(link)if link exists - Badge: show count number if > 0 (e.g., "3"), red background
bg-rose-500 - Keep the existing click-outside-to-close pattern
Step 2: Verify build
cd /projects/patherly/frontend && npm run build
Step 3: Commit
git add frontend/src/components/layout/NotificationsPanel.tsx
git commit -m "feat(notifications): upgrade NotificationsPanel to real notification center
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>"
Task 9: Frontend — Notification settings UI
Files:
- Create:
frontend/src/components/account/NotificationSettings.tsx - Edit:
frontend/src/pages/account/IntegrationsPage.tsx
Step 1: Create NotificationSettings component
Create frontend/src/components/account/NotificationSettings.tsx:
A section component that renders within the Integrations page. Shows:
- Section header: "Notifications" with Bell icon
- List of configured channels as glass-card items
- Each card: channel icon (Mail/Slack/Teams), name, status toggle, event checkboxes, test button, delete button
- "Add Channel" button → dropdown with Email, Slack, Teams options
- For Slack/Teams: webhook URL input field
- For Email: email addresses input (comma-separated)
- Event toggles: checkboxes for each event from
NOTIFICATION_EVENTS - Test button: calls
notificationsApi.testConfig(), shows toast result
Design rules:
- Cards use
.glass-card-static - Buttons follow CLAUDE.md patterns (primary:
bg-gradient-brand, secondary:bg-[rgba(255,255,255,0.04)]) - Status toggle: simple on/off with emerald/muted colors
- Section label:
font-label text-[0.625rem] uppercase tracking-[0.1em] text-muted-foreground
Step 2: Add NotificationSettings to IntegrationsPage
Edit frontend/src/pages/account/IntegrationsPage.tsx — import and render <NotificationSettings /> below the existing PSA connections section.
Step 3: Verify build
cd /projects/patherly/frontend && npm run build
Step 4: Commit
git add frontend/src/components/account/NotificationSettings.tsx \
frontend/src/pages/account/IntegrationsPage.tsx
git commit -m "feat(notifications): add notification settings UI in integrations page
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>"
Task 10: Final review + integration test
Step 1: Run backend import check
cd /projects/patherly/backend
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
venv/bin/python -c "
from app.models.notification_config import NotificationConfig
from app.models.notification_log import NotificationLog
from app.models.notification import Notification
from app.schemas.notification import NotificationConfigCreate, NotificationResponse
from app.services.notification_service import notify, retry_failed_notifications
from app.api.endpoints.notifications import router
print(f'Models: OK')
print(f'Schemas: OK')
print(f'Service: OK')
print(f'Endpoints: {len(router.routes)} routes')
print('All imports clean')
"
Step 2: Run frontend build
cd /projects/patherly/frontend && npm run build
Step 3: Verify migration
cd /projects/patherly/backend
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
venv/bin/alembic current
Step 4: Manual smoke test checklist
- Start backend + frontend dev servers
- Open Integrations page → see Notifications section
- Add an email notification config → verify it saves
- Toggle events on/off → verify PATCH works
- Click Test → verify toast shows result
- Bell icon in top bar → see notification dropdown
- Escalate a FlowPilot session → verify notification appears in bell dropdown
- Click notification → navigates to session
- Mark all as read → badge clears
Step 5: Final commit (if any cleanup needed)
git commit -m "fix(notifications): address review feedback
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>"
Summary of All Files
New Files
backend/app/models/notification_config.py # Channel config model
backend/app/models/notification_log.py # Delivery log + retry model
backend/app/models/notification.py # In-app notification model
backend/app/schemas/notification.py # All notification schemas
backend/app/services/notification_service.py # Core service (routing, delivery, retry)
backend/app/api/endpoints/notifications.py # Config CRUD + in-app notification endpoints
backend/alembic/versions/XXX_add_notification_tables.py
frontend/src/types/notification.ts # TypeScript interfaces
frontend/src/api/notifications.ts # API client
frontend/src/components/account/NotificationSettings.tsx # Settings UI
Modified Files
backend/app/models/__init__.py # Register 3 new models
backend/app/api/router.py # Register notifications router
backend/app/main.py # Add retry scheduler job
backend/app/core/email.py # Add send_notification_email method
backend/app/services/flowpilot_engine.py # Wire escalation notification
backend/app/services/knowledge_flywheel.py # Wire proposal creation notification
backend/app/api/endpoints/flow_proposals.py # Wire proposal approval notification
frontend/src/types/index.ts # Export notification types
frontend/src/api/index.ts # Export notifications API
frontend/src/components/layout/NotificationsPanel.tsx # Upgrade to real notification center
frontend/src/pages/account/IntegrationsPage.tsx # Add notification settings section