resolutionflow/docs/plans/archive/2026-03-19-phase4-slice2-notifications.md

Phase 4 Slice 2: Notification Integrations — Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: Build an event-driven notification system that alerts engineers and team admins via email, Slack, Teams, and in-app notifications when key events occur (escalations, proposals, high-priority tickets).

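Architecture: Lightweight event-driven service. Events are fired from existing lifecycle points (escalation, proposal creation, approval). The notification service routes each event to the account's active channels that have that event enabled, and writes in-app notification records for the recipients. Failed deliveries are logged and retried by APScheduler with backoff (30s, 2m, 10m; at most 3 retries). In-app notifications extend the existing NotificationsPanel bell icon.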

Tech Stack: FastAPI, SQLAlchemy 2.0 (async), APScheduler 3.x, Resend (email), Slack/Teams webhook POST, React, TypeScript, Tailwind CSS v4

Prerequisites:

  • Phase 1-3 complete (AI sessions, escalations, Knowledge Flywheel)
  • Existing: EmailService in core/email.py, APScheduler in core/scheduler.py, NotificationsPanel.tsx
  • Existing models: User, AISession, FlowProposal, Account

Parent plan: docs/2026-03-18-flowpilot-first-pivot-phase4.md (Tasks 4, 5, 6, 6.5)


Task 1: NotificationConfig and NotificationLog models + migration

Files:

  • Create: backend/app/models/notification_config.py
  • Create: backend/app/models/notification_log.py
  • Create: backend/app/models/notification.py
  • Edit: backend/app/models/__init__.py
  • Create: backend/alembic/versions/XXX_add_notification_tables.py (via autogenerate)

Step 1: Create NotificationConfig model

Create backend/app/models/notification_config.py:

"""Notification channel configuration per account.

Each account can have multiple notification configs (email, Slack webhook, Teams webhook).
Each config specifies which events it receives.
"""
import uuid
from datetime import datetime, timezone
from typing import Optional, Any, TYPE_CHECKING

from sqlalchemy import String, Boolean, DateTime, ForeignKey, CheckConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID, JSONB

from app.core.database import Base

if TYPE_CHECKING:
    from app.models.account import Account


class NotificationConfig(Base):
    """A notification channel configured by an account.

    channel: "email" | "slack_webhook" | "teams_webhook"
    events_enabled: {"session.escalated": true, "proposal.pending": true, ...}
    """
    __tablename__ = "notification_configs"
    __table_args__ = (
        CheckConstraint(
            "channel IN ('email', 'slack_webhook', 'teams_webhook')",
            name="ck_notification_configs_channel",
        ),
    )

    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
    account_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("accounts.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    channel: Mapped[str] = mapped_column(String(20), nullable=False)
    webhook_url: Mapped[Optional[str]] = mapped_column(String(500), nullable=True)
    email_addresses: Mapped[Optional[list]] = mapped_column(JSONB, nullable=True)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    events_enabled: Mapped[dict[str, Any]] = mapped_column(
        JSONB, default=lambda: {
            "session.escalated": True,
            "session.high_priority": True,
            "proposal.pending": True,
            "proposal.approved": True,
            "knowledge_gap.detected": True,
        }
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    account: Mapped[Optional["Account"]] = relationship("Account", foreign_keys=[account_id])

Step 2: Create NotificationLog model

Create backend/app/models/notification_log.py:

"""Notification delivery log with retry tracking.

Tracks every notification delivery attempt. Failed deliveries are retried
via APScheduler with exponential backoff (30s, 2m, 10m — max 3 retries).
"""
import uuid
from datetime import datetime, timezone
from typing import Optional, Any, TYPE_CHECKING

from sqlalchemy import String, Integer, DateTime, ForeignKey, CheckConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID, JSONB

from app.core.database import Base

if TYPE_CHECKING:
    from app.models.notification_config import NotificationConfig


class NotificationLog(Base):
    """A single notification delivery attempt."""
    __tablename__ = "notification_logs"
    __table_args__ = (
        CheckConstraint(
            "status IN ('sent', 'failed', 'retrying', 'exhausted')",
            name="ck_notification_logs_status",
        ),
    )

    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
    notification_config_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("notification_configs.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    event: Mapped[str] = mapped_column(String(50), nullable=False)
    payload: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False)
    status: Mapped[str] = mapped_column(String(20), default="sent")
    retry_count: Mapped[int] = mapped_column(Integer, default=0)
    max_retries: Mapped[int] = mapped_column(Integer, default=3)
    last_error: Mapped[Optional[str]] = mapped_column(String(1000), nullable=True)
    next_retry_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True), nullable=True
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )
    delivered_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True), nullable=True
    )

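    # Eager-load the config: an implicit lazy load would fail under the async
    # session when retry_failed_notifications reads log.config.
    config: Mapped[Optional["NotificationConfig"]] = relationship(
        "NotificationConfig", foreign_keys=[notification_config_id], lazy="selectin"
    )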
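
The retry lifecycle described in the module docstring (30s, 2m, 10m, at most 3 retries) can be sketched as a small state transition. This is an illustration only: LogState and apply_retry_result are hypothetical names, and the sketch assumes the first 30-second delay is scheduled at the moment the initial delivery fails, so each later failure picks the next delay in the list.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

RETRY_DELAYS = [30, 120, 600]  # seconds: 30s, 2m, 10m


@dataclass
class LogState:
    """Hypothetical in-memory stand-in for a NotificationLog row."""
    status: str = "retrying"
    retry_count: int = 0
    max_retries: int = 3
    next_retry_at: Optional[datetime] = None


def apply_retry_result(log: LogState, success: bool, now: datetime) -> LogState:
    """Update the log after one retry attempt."""
    log.retry_count += 1
    if success:
        log.status = "sent"
    elif log.retry_count >= log.max_retries:
        # No retries left: stop scheduling this row.
        log.status = "exhausted"
    else:
        # retry_count indexes the next delay, clamped to the last entry.
        delay = RETRY_DELAYS[min(log.retry_count, len(RETRY_DELAYS) - 1)]
        log.status = "retrying"
        log.next_retry_at = now + timedelta(seconds=delay)
    return log


# Walk one log through three failed retries.
start = datetime(2026, 3, 19, tzinfo=timezone.utc)
demo_log = LogState(next_retry_at=start + timedelta(seconds=RETRY_DELAYS[0]))
history = []
for _ in range(3):
    apply_retry_result(demo_log, success=False, now=start)
    history.append((demo_log.retry_count, demo_log.status))
```

Under these assumptions a permanently failing delivery is attempted four times in total: the initial attempt plus three retries.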

Step 3: Create in-app Notification model

Create backend/app/models/notification.py:

"""In-app notification model.

Created alongside external notifications (email/Slack/Teams).
Powers the NotificationsPanel bell icon dropdown in the frontend.
"""
import uuid
from datetime import datetime, timezone
from typing import Optional, TYPE_CHECKING

from sqlalchemy import String, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID

from app.core.database import Base

if TYPE_CHECKING:
    from app.models.user import User


class Notification(Base):
    """An in-app notification for a specific user."""
    __tablename__ = "notifications"

    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
    account_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("accounts.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    user_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("users.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    event: Mapped[str] = mapped_column(String(50), nullable=False)
    title: Mapped[str] = mapped_column(String(200), nullable=False)
    body: Mapped[Optional[str]] = mapped_column(String(500), nullable=True)
    link: Mapped[Optional[str]] = mapped_column(String(200), nullable=True)
    is_read: Mapped[bool] = mapped_column(Boolean, default=False)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        index=True,
    )

    user: Mapped[Optional["User"]] = relationship("User", foreign_keys=[user_id])

Step 4: Register models in __init__.py

Edit backend/app/models/__init__.py — add:

from app.models.notification_config import NotificationConfig
from app.models.notification_log import NotificationLog
from app.models.notification import Notification

Step 5: Generate migration

cd /projects/patherly/backend
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
  venv/bin/alembic revision --autogenerate -m "add notification tables"

Review the generated migration. Verify it creates notification_configs, notification_logs, and notifications tables with correct FKs and constraints.

Step 6: Run migration

DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
  venv/bin/alembic upgrade head

Step 7: Commit

git add backend/app/models/notification_config.py backend/app/models/notification_log.py \
  backend/app/models/notification.py backend/app/models/__init__.py \
  backend/alembic/versions/*notification*
git commit -m "feat(notifications): add NotificationConfig, NotificationLog, and Notification models

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>"

Task 2: Notification schemas

Files:

  • Create: backend/app/schemas/notification.py

Step 1: Create schemas

Create backend/app/schemas/notification.py:

"""Pydantic schemas for notification system."""
from datetime import datetime
from uuid import UUID
from typing import Optional, Any

from pydantic import BaseModel, Field


# --- NotificationConfig schemas ---

VALID_CHANNELS = {"email", "slack_webhook", "teams_webhook"}

VALID_EVENTS = {
    "session.escalated",
    "session.high_priority",
    "proposal.pending",
    "proposal.approved",
    "knowledge_gap.detected",
}


class NotificationConfigCreate(BaseModel):
    channel: str = Field(..., pattern="^(email|slack_webhook|teams_webhook)$")
    webhook_url: str | None = None
    email_addresses: list[str] | None = None
    events_enabled: dict[str, bool] = Field(
        default_factory=lambda: {e: True for e in VALID_EVENTS}
    )


class NotificationConfigUpdate(BaseModel):
    webhook_url: str | None = None
    email_addresses: list[str] | None = None
    is_active: bool | None = None
    events_enabled: dict[str, bool] | None = None


class NotificationConfigResponse(BaseModel):
    id: UUID
    channel: str
    webhook_url: str | None
    email_addresses: list[str] | None
    is_active: bool
    events_enabled: dict[str, bool]
    created_at: datetime
    updated_at: datetime

    model_config = {"from_attributes": True}


# --- In-app Notification schemas ---

class NotificationResponse(BaseModel):
    id: UUID
    event: str
    title: str
    body: str | None
    link: str | None
    is_read: bool
    created_at: datetime

    model_config = {"from_attributes": True}


class UnreadCountResponse(BaseModel):
    count: int


# --- Notification test ---

class NotificationTestRequest(BaseModel):
    config_id: UUID


class NotificationTestResponse(BaseModel):
    success: bool
    message: str
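
The schemas above define VALID_EVENTS but accept any dict[str, bool] for events_enabled, so a mistyped event key would be stored and then never match. A minimal sketch of a key check follows. validate_events_enabled is a hypothetical helper, not part of the plan; it is written in plain Python so it runs on its own, and it could plausibly be called from a Pydantic validator on the create and update schemas.

```python
VALID_EVENTS = {
    "session.escalated",
    "session.high_priority",
    "proposal.pending",
    "proposal.approved",
    "knowledge_gap.detected",
}


def validate_events_enabled(events_enabled: dict[str, bool]) -> dict[str, bool]:
    """Reject event keys that the notification service does not know about."""
    unknown = sorted(set(events_enabled) - VALID_EVENTS)
    if unknown:
        raise ValueError(f"Unknown notification events: {', '.join(unknown)}")
    return events_enabled


# A partial map is fine; events that are absent are treated as disabled by
# the service's events_enabled.get(event, False) lookup.
checked = validate_events_enabled({"session.escalated": True, "proposal.pending": False})

try:
    validate_events_enabled({"session.escalted": True})  # typo in the key
    typo_error = None
except ValueError as exc:
    typo_error = str(exc)
```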

Step 2: Commit

git add backend/app/schemas/notification.py
git commit -m "feat(notifications): add notification Pydantic schemas

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>"

Task 3: Notification service (core event routing + channel delivery)

Files:

  • Create: backend/app/services/notification_service.py

Step 1: Create the notification service

Create backend/app/services/notification_service.py:

"""Event-driven notification service.

Fires notifications to configured channels (email, Slack, Teams) and
creates in-app notification records. Failed webhook deliveries are
logged for retry by the scheduler.

Usage:
    await notify("session.escalated", account_id, payload, db)
"""
import logging
from datetime import datetime, timezone, timedelta
from typing import Optional, Any
from uuid import UUID

import httpx
from sqlalchemy import select, and_
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.config import settings
from app.core.email import EmailService
from app.models.notification_config import NotificationConfig
from app.models.notification_log import NotificationLog
from app.models.notification import Notification
from app.models.user import User

logger = logging.getLogger(__name__)

# Exponential backoff delays for retries (seconds)
RETRY_DELAYS = [30, 120, 600]  # 30s, 2m, 10m


async def notify(
    event: str,
    account_id: UUID,
    payload: dict[str, Any],
    db: AsyncSession,
    *,
    target_user_ids: list[UUID] | None = None,
) -> None:
    """Fire a notification event. Routes to all active channels for this account.

    Also creates in-app Notification records for target_user_ids.
    If target_user_ids is None, notifies all team admins + account owner.

    Args:
        event: Event type (e.g., "session.escalated")
        account_id: Account that owns the event
        payload: Event data (session summary, proposal title, etc.)
        db: Database session
        target_user_ids: Specific users to notify. None = team admins + owner.
    """
    # 1. Create in-app notifications
    recipients = await _resolve_recipients(account_id, target_user_ids, db)
    title = _build_notification_title(event, payload)
    body = _build_notification_body(event, payload)
    link = _build_notification_link(event, payload)

    for user_id in recipients:
        db.add(Notification(
            account_id=account_id,
            user_id=user_id,
            event=event,
            title=title,
            body=body,
            link=link,
        ))

    # 2. Route to external channels
    configs = await _get_active_configs(account_id, event, db)
    for config in configs:
        await _deliver_to_channel(config, event, payload, db)

    await db.flush()


async def retry_failed_notifications(db: AsyncSession) -> int:
    """Retry failed notification deliveries. Called by APScheduler.

    Returns the number of retries attempted.
    """
    now = datetime.now(timezone.utc)
    result = await db.execute(
        select(NotificationLog)
        .where(
            NotificationLog.status == "retrying",
            NotificationLog.next_retry_at <= now,
        )
        .limit(50)
    )
    logs = result.scalars().all()

    retried = 0
    for log in logs:
        config = log.config
        if not config:
            log.status = "exhausted"
            log.last_error = "Config deleted"
            continue

        success = await _attempt_delivery(config, log.event, log.payload)
        log.retry_count += 1

        if success:
            log.status = "sent"
            log.delivered_at = now
            logger.info("Notification retry succeeded: %s (attempt %d)", log.id, log.retry_count)
        elif log.retry_count >= log.max_retries:
            log.status = "exhausted"
            log.last_error = f"Exhausted after {log.retry_count} retries"
            logger.warning("Notification exhausted after %d retries: %s", log.retry_count, log.id)
        else:
            delay = RETRY_DELAYS[min(log.retry_count, len(RETRY_DELAYS) - 1)]
            log.next_retry_at = now + timedelta(seconds=delay)
            log.status = "retrying"

        retried += 1

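    # Commit whenever rows were loaded, so "Config deleted" rows that are
    # skipped above (and not counted in `retried`) are persisted too.
    if logs:
        await db.commit()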

    return retried


async def send_test_notification(config: NotificationConfig) -> tuple[bool, str]:
    """Send a test notification to verify channel configuration.

    Returns (success, message).
    """
    test_payload = {
        "title": "Test Notification",
        "body": "This is a test notification from ResolutionFlow.",
        "session_id": None,
        "engineer_name": "System",
    }

    if config.channel == "email":
        if not config.email_addresses:
            return False, "No email addresses configured"
        success = await EmailService.send_notification_email(
            config.email_addresses[0], "Test Notification", test_payload
        )
        return success, "Test email sent" if success else "Email delivery failed"

    elif config.channel == "slack_webhook":
        if not config.webhook_url:
            return False, "No webhook URL configured"
        success = await _send_slack_message(config.webhook_url, "test", test_payload)
        return success, "Test message sent to Slack" if success else "Slack webhook failed"

    elif config.channel == "teams_webhook":
        if not config.webhook_url:
            return False, "No webhook URL configured"
        success = await _send_teams_message(config.webhook_url, "test", test_payload)
        return success, "Test message sent to Teams" if success else "Teams webhook failed"

    return False, f"Unknown channel: {config.channel}"


# --- Internal helpers ---

async def _get_active_configs(
    account_id: UUID, event: str, db: AsyncSession
) -> list[NotificationConfig]:
    """Get all active notification configs for this account + event."""
    result = await db.execute(
        select(NotificationConfig).where(
            NotificationConfig.account_id == account_id,
            NotificationConfig.is_active.is_(True),
        )
    )
    configs = result.scalars().all()
    # Filter by event enabled
    return [c for c in configs if c.events_enabled.get(event, False)]


async def _resolve_recipients(
    account_id: UUID,
    target_user_ids: list[UUID] | None,
    db: AsyncSession,
) -> list[UUID]:
    """Resolve notification recipients. Defaults to team admins + account owner."""
    if target_user_ids:
        return target_user_ids

    result = await db.execute(
        select(User.id).where(
            User.account_id == account_id,
            User.is_active.is_(True),
            (User.is_team_admin.is_(True)) | (User.account_role == "owner"),
        )
    )
    return list(result.scalars().all())


async def _deliver_to_channel(
    config: NotificationConfig,
    event: str,
    payload: dict[str, Any],
    db: AsyncSession,
) -> None:
    """Attempt delivery to a channel. Log result."""
    success = await _attempt_delivery(config, event, payload)

    log = NotificationLog(
        notification_config_id=config.id,
        event=event,
        payload=payload,
    )

    if success:
        log.status = "sent"
        log.delivered_at = datetime.now(timezone.utc)
    else:
        log.status = "retrying"
        log.last_error = f"Initial delivery failed for {config.channel}"
        log.next_retry_at = datetime.now(timezone.utc) + timedelta(seconds=RETRY_DELAYS[0])

    db.add(log)


async def _attempt_delivery(
    config: NotificationConfig, event: str, payload: dict[str, Any]
) -> bool:
    """Attempt to deliver a notification via the configured channel."""
    try:
        if config.channel == "email":
            return await _send_email(config, event, payload)
        elif config.channel == "slack_webhook":
            return await _send_slack_message(
                config.webhook_url, event, payload
            )
        elif config.channel == "teams_webhook":
            return await _send_teams_message(
                config.webhook_url, event, payload
            )
        return False
    except Exception:
        logger.exception("Notification delivery failed for config %s", config.id)
        return False


async def _send_email(
    config: NotificationConfig, event: str, payload: dict[str, Any]
) -> bool:
    """Send notification email to all configured addresses."""
    if not config.email_addresses:
        return False

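    title = _build_notification_title(event, payload)
    # The HTML renderer reads "body" and "link" from the data dict. Event
    # payloads do not carry those keys, so build them here.
    email_data = {
        **payload,
        "body": _build_notification_body(event, payload),
        "link": _build_notification_link(event, payload) or "",
    }
    success = True
    for addr in config.email_addresses:
        result = await EmailService.send_notification_email(addr, title, email_data)
        if not result:
            success = False
    return success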


async def _send_slack_message(
    webhook_url: str, event: str, payload: dict[str, Any]
) -> bool:
    """Send a Slack notification via incoming webhook."""
    title = _build_notification_title(event, payload)
    body = _build_notification_body(event, payload)
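    # Fall back to the event's in-app link when the payload has no "link" key.
    link = payload.get("link") or _build_notification_link(event, payload) or ""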

    slack_payload = {
        "blocks": [
            {
                "type": "header",
                "text": {"type": "plain_text", "text": f"🔔 {title}", "emoji": True},
            },
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": body},
            },
        ]
    }

    if link:
        slack_payload["blocks"].append({
            "type": "actions",
            "elements": [
                {
                    "type": "button",
                    "text": {"type": "plain_text", "text": "Open in ResolutionFlow"},
                    "url": f"{settings.FRONTEND_URL}{link}",
                }
            ],
        })

    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.post(webhook_url, json=slack_payload)
        if resp.status_code == 200:
            logger.info("Slack notification sent: %s", event)
            return True
        logger.warning("Slack webhook returned %d: %s", resp.status_code, resp.text)
        return False


async def _send_teams_message(
    webhook_url: str, event: str, payload: dict[str, Any]
) -> bool:
    """Send a Teams notification via incoming webhook (Adaptive Card)."""
    title = _build_notification_title(event, payload)
    body = _build_notification_body(event, payload)
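    # Fall back to the event's in-app link when the payload has no "link" key.
    link = payload.get("link") or _build_notification_link(event, payload) or ""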

    card = {
        "type": "message",
        "attachments": [
            {
                "contentType": "application/vnd.microsoft.card.adaptive",
                "content": {
                    "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
                    "type": "AdaptiveCard",
                    "version": "1.4",
                    "body": [
                        {"type": "TextBlock", "text": title, "weight": "Bolder", "size": "Medium"},
                        {"type": "TextBlock", "text": body, "wrap": True},
                    ],
                },
            }
        ],
    }

    if link:
        card["attachments"][0]["content"]["actions"] = [
            {
                "type": "Action.OpenUrl",
                "title": "Open in ResolutionFlow",
                "url": f"{settings.FRONTEND_URL}{link}",
            }
        ]

    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.post(webhook_url, json=card)
        if resp.status_code in (200, 202):
            logger.info("Teams notification sent: %s", event)
            return True
        logger.warning("Teams webhook returned %d: %s", resp.status_code, resp.text)
        return False


def _build_notification_title(event: str, payload: dict[str, Any]) -> str:
    """Build a human-readable notification title."""
    titles = {
        "session.escalated": f"Session escalated by {payload.get('engineer_name', 'an engineer')}",
        "session.high_priority": f"High-priority session started: {payload.get('ticket_number', 'N/A')}",
        "proposal.pending": f"New flow proposal: {payload.get('title', 'Untitled')}",
        "proposal.approved": f"Flow proposal approved: {payload.get('title', 'Untitled')}",
        "knowledge_gap.detected": f"Knowledge gap detected: {payload.get('gap_type', 'Unknown')}",
        "test": "Test Notification from ResolutionFlow",
    }
    return titles.get(event, f"Notification: {event}")


def _build_notification_body(event: str, payload: dict[str, Any]) -> str:
    """Build notification body text."""
    if event == "session.escalated":
        reason = payload.get("escalation_reason", "No reason provided")
        return f"Reason: {reason}\nProblem: {payload.get('problem_summary', 'N/A')}"
    elif event == "session.high_priority":
        return f"Ticket: {payload.get('ticket_number', 'N/A')}\nClient: {payload.get('client_name', 'N/A')}"
    elif event == "proposal.pending":
        return f"Type: {payload.get('proposal_type', 'N/A')}\nDomain: {payload.get('problem_domain', 'N/A')}"
    elif event == "proposal.approved":
        return f"Approved by {payload.get('reviewer_name', 'a reviewer')}"
    elif event == "knowledge_gap.detected":
        return f"Severity: {payload.get('severity', 'N/A')}\nArea: {payload.get('problem_domain', 'N/A')}"
    elif event == "test":
        return "This is a test notification from ResolutionFlow. Your integration is working!"
    return ""


def _build_notification_link(event: str, payload: dict[str, Any]) -> str | None:
    """Build in-app link for the notification."""
    links = {
        "session.escalated": f"/pilot/{payload.get('session_id', '')}",
        "session.high_priority": f"/pilot/{payload.get('session_id', '')}",
        "proposal.pending": "/review-queue",
        "proposal.approved": "/review-queue",
        "knowledge_gap.detected": "/analytics/flowpilot",
    }
    return links.get(event)
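
Callers of notify need to know which payload keys each event uses. The table below is read off the title, body, and link builders above; every key is optional there because the builders fall back to defaults. EVENT_PAYLOAD_KEYS and missing_payload_keys are hypothetical names, offered as a sketch of how a caller or test might flag a payload that would render with placeholder text.

```python
from typing import Any

# Keys read by the title, body, and link builders, per event.
EVENT_PAYLOAD_KEYS: dict[str, set[str]] = {
    "session.escalated": {"engineer_name", "escalation_reason", "problem_summary", "session_id"},
    "session.high_priority": {"ticket_number", "client_name", "session_id"},
    "proposal.pending": {"title", "proposal_type", "problem_domain"},
    "proposal.approved": {"title", "reviewer_name"},
    "knowledge_gap.detected": {"gap_type", "severity", "problem_domain"},
}


def missing_payload_keys(event: str, payload: dict[str, Any]) -> set[str]:
    """Return the builder keys absent from `payload` (empty set for unknown events)."""
    return EVENT_PAYLOAD_KEYS.get(event, set()) - set(payload)


escalation_payload = {
    "engineer_name": "Sam",
    "escalation_reason": "Needs vendor access",
    "session_id": "abc123",
}
missing = missing_payload_keys("session.escalated", escalation_payload)
```

Here the escalation would still be delivered, but its body would show "Problem: N/A" because problem_summary is missing.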

Step 2: Add notification email method to EmailService

Edit backend/app/core/email.py — add this static method to the EmailService class:

@staticmethod
async def send_notification_email(to_email: str, subject: str, data: dict) -> bool:
    """Send a notification email."""
    if not settings.email_enabled:
        logger.warning("Notification email not sent — RESEND_API_KEY not configured")
        return False
    try:
        import resend
        resend.api_key = settings.RESEND_API_KEY
        html = _render_notification_html(subject=subject, data=data)
        resend.Emails.send({
            "from": settings.FROM_EMAIL,
            "to": [to_email],
            "subject": f"[ResolutionFlow] {subject}",
            "html": html,
        })
        logger.info("Notification email sent to %s: %s", to_email, subject)
        return True
    except Exception:
        logger.exception("Failed to send notification email to %s", to_email)
        return False

Add this HTML renderer at module level in email.py:

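def _render_notification_html(subject: str, data: dict) -> str:
    from html import escape

    # Escape user-influenced text (titles, reasons) before embedding it in HTML.
    subject = escape(subject)
    body_text = escape(str(data.get("body", "")))
    link = data.get("link", "")
    link_html = ""
    if link:
        full_url = escape(f"{settings.FRONTEND_URL}{link}", quote=True)
        link_html = f'<p><a href="{full_url}" style="color: #06b6d4;">Open in ResolutionFlow →</a></p>'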

    return f"""
    <div style="font-family: 'IBM Plex Sans', sans-serif; max-width: 600px; margin: 0 auto; padding: 24px;">
        <h2 style="color: #f8fafc; margin-bottom: 16px;">{subject}</h2>
        <div style="background: rgba(24, 26, 31, 0.8); border: 1px solid rgba(255,255,255,0.06); border-radius: 12px; padding: 20px;">
            <p style="color: #8891a0; white-space: pre-line;">{body_text}</p>
            {link_html}
        </div>
        <p style="color: #5a6170; font-size: 12px; margin-top: 24px;">— ResolutionFlow</p>
    </div>
    """

Step 3: Verify imports

cd /projects/patherly/backend
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
  venv/bin/python -c "from app.services.notification_service import notify, retry_failed_notifications, send_test_notification; print('Service OK')"

Step 4: Commit

git add backend/app/services/notification_service.py backend/app/core/email.py
git commit -m "feat(notifications): add notification service with email/Slack/Teams delivery + retry

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>"

Task 4: Notification API endpoints

Files:

  • Create: backend/app/api/endpoints/notifications.py
  • Edit: backend/app/api/router.py

Step 1: Create notification endpoints

Create backend/app/api/endpoints/notifications.py:

"""Notification endpoints.

- NotificationConfig CRUD (account-level, requires team admin)
- In-app notification list/read/mark-all-read (per user)
- Test notification delivery
"""
import logging
from typing import Annotated, Optional
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import select, func, update
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_db
from app.core.rate_limit import limiter
from app.api.deps import get_current_active_user, require_team_admin
from app.models.user import User
from app.models.notification_config import NotificationConfig
from app.models.notification import Notification as NotificationModel
from app.schemas.notification import (
    NotificationConfigCreate,
    NotificationConfigUpdate,
    NotificationConfigResponse,
    NotificationResponse,
    UnreadCountResponse,
    NotificationTestRequest,
    NotificationTestResponse,
)
from app.services.notification_service import send_test_notification

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/notifications", tags=["notifications"])


# --- NotificationConfig CRUD (team admin) ---

@router.get("/configs", response_model=list[NotificationConfigResponse])
async def list_configs(
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: None = Depends(require_team_admin),
):
    """List all notification configs for the current account."""
    result = await db.execute(
        select(NotificationConfig)
        .where(NotificationConfig.account_id == current_user.account_id)
        .order_by(NotificationConfig.created_at)
    )
    return result.scalars().all()


@router.post("/configs", response_model=NotificationConfigResponse, status_code=201)
async def create_config(
    data: NotificationConfigCreate,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: None = Depends(require_team_admin),
):
    """Create a new notification channel config."""
    # Validate channel-specific requirements
    if data.channel in ("slack_webhook", "teams_webhook") and not data.webhook_url:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"webhook_url is required for {data.channel}",
        )
    if data.channel == "email" and not data.email_addresses:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="email_addresses required for email channel",
        )

    config = NotificationConfig(
        account_id=current_user.account_id,
        channel=data.channel,
        webhook_url=data.webhook_url,
        email_addresses=data.email_addresses,
        events_enabled=data.events_enabled,
    )
    db.add(config)
    await db.commit()
    await db.refresh(config)
    return config


@router.patch("/configs/{config_id}", response_model=NotificationConfigResponse)
async def update_config(
    config_id: UUID,
    data: NotificationConfigUpdate,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: None = Depends(require_team_admin),
):
    """Update a notification channel config."""
    config = await db.get(NotificationConfig, config_id)
    if not config or config.account_id != current_user.account_id:
        raise HTTPException(status_code=404, detail="Config not found")

    for field, value in data.model_dump(exclude_unset=True).items():
        setattr(config, field, value)

    await db.commit()
    await db.refresh(config)
    return config


@router.delete("/configs/{config_id}", status_code=204)
async def delete_config(
    config_id: UUID,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: None = Depends(require_team_admin),
):
    """Delete a notification channel config."""
    config = await db.get(NotificationConfig, config_id)
    if not config or config.account_id != current_user.account_id:
        raise HTTPException(status_code=404, detail="Config not found")

    await db.delete(config)
    await db.commit()


@router.post("/configs/test", response_model=NotificationTestResponse)
async def test_config(
    data: NotificationTestRequest,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: None = Depends(require_team_admin),
):
    """Send a test notification to verify channel configuration."""
    config = await db.get(NotificationConfig, data.config_id)
    if not config or config.account_id != current_user.account_id:
        raise HTTPException(status_code=404, detail="Config not found")

    success, message = await send_test_notification(config)
    return NotificationTestResponse(success=success, message=message)


# --- In-app notifications (per user) ---

@router.get("", response_model=list[NotificationResponse])
async def list_notifications(
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    skip: int = Query(0, ge=0),
    limit: int = Query(20, ge=1, le=100),
):
    """List in-app notifications for the current user (unread first)."""
    result = await db.execute(
        select(NotificationModel)
        .where(NotificationModel.user_id == current_user.id)
        .order_by(NotificationModel.is_read, NotificationModel.created_at.desc())
        .offset(skip)
        .limit(limit)
    )
    return result.scalars().all()


@router.get("/unread-count", response_model=UnreadCountResponse)
async def unread_count(
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Get unread notification count for badge display."""
    result = await db.execute(
        select(func.count(NotificationModel.id)).where(
            NotificationModel.user_id == current_user.id,
            NotificationModel.is_read.is_(False),
        )
    )
    return UnreadCountResponse(count=result.scalar_one())


@router.patch("/{notification_id}/read")
async def mark_read(
    notification_id: UUID,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Mark a single notification as read."""
    notif = await db.get(NotificationModel, notification_id)
    if not notif or notif.user_id != current_user.id:
        raise HTTPException(status_code=404, detail="Notification not found")

    notif.is_read = True
    await db.commit()
    return {"ok": True}


@router.post("/mark-all-read")
async def mark_all_read(
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Mark all notifications as read for the current user."""
    await db.execute(
        update(NotificationModel)
        .where(
            NotificationModel.user_id == current_user.id,
            NotificationModel.is_read.is_(False),
        )
        .values(is_read=True)
    )
    await db.commit()
    return {"ok": True}

Step 2: Register router

Edit backend/app/api/router.py — add:

from app.api.endpoints import notifications

api_router.include_router(notifications.router)

Step 3: Verify

cd /projects/patherly/backend
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
  venv/bin/python -c "from app.api.endpoints.notifications import router; print(f'Notifications: {len(router.routes)} routes')"

Step 4: Commit

git add backend/app/api/endpoints/notifications.py backend/app/api/router.py
git commit -m "feat(notifications): add notification config CRUD + in-app notification endpoints

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>"

Task 5: Notification retry scheduler

Files:

  • Edit: backend/app/main.py

Step 1: Add retry job to scheduler

Edit backend/app/main.py — in the lifespan function, after the existing scheduler jobs, add:

from app.services.notification_service import retry_failed_notifications
from app.core.database import AsyncSessionLocal

async def _process_notification_retries():
    """Retry failed notification deliveries."""
    async with AsyncSessionLocal() as db:
        retried = await retry_failed_notifications(db)
        if retried:
            logger.info("Retried %d failed notifications", retried)

scheduler.add_job(
    _process_notification_retries,
    trigger="interval",
    minutes=1,
    id="notification_retry",
    replace_existing=True,
    max_instances=1,
)

Step 2: Verify scheduler starts

cd /projects/patherly/backend
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
  venv/bin/python -c "from app.main import app; print('App created OK')"

Step 3: Commit

git add backend/app/main.py
git commit -m "feat(notifications): add APScheduler job for notification retry (1m interval, max_instances=1)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>"

Task 6: Wire notifications into session lifecycle

Files:

  • Edit: backend/app/services/flowpilot_engine.py
  • Edit: backend/app/services/knowledge_flywheel.py
  • Edit: backend/app/api/endpoints/flow_proposals.py

Step 1: Wire into escalation

Edit backend/app/services/flowpilot_engine.py — in escalate_session(), after await db.flush() (line ~498), add:

import asyncio
from app.services.notification_service import notify

# Fire escalation notification (non-blocking)
escalation_payload = {
    "session_id": str(session_id),
    "engineer_name": session.user.display_name if session.user else "Unknown",
    "escalation_reason": request.escalation_reason,
    "problem_summary": session.problem_summary or "N/A",
    "link": f"/pilot/{session_id}",
}
target_users = [request.escalated_to_id] if request.escalated_to_id else None
asyncio.create_task(
    notify("session.escalated", session.account_id, escalation_payload, db, target_user_ids=target_users)
)

Note: do not use asyncio.create_task here. The background task may run after the request-scoped db session has been closed, so the create_task version above is shown only for contrast. Instead, await notify() directly within the existing transaction:

await notify("session.escalated", session.account_id, escalation_payload, db, target_user_ids=target_users)

Step 2: Wire into proposal creation

Edit backend/app/services/knowledge_flywheel.py — after each db.add(proposal) call (lines ~297, ~357, ~433), add:

from app.services.notification_service import notify

# Only notify for proposals that need review (not auto_reinforced)
if proposal.status == "pending":
    await notify("proposal.pending", proposal.account_id, {
        "title": proposal.title,
        "proposal_type": proposal.proposal_type,
        "problem_domain": proposal.problem_domain or "General",
        "link": "/review-queue",
    }, db)

Step 3: Wire into proposal approval

Edit backend/app/api/endpoints/flow_proposals.py — in review_proposal(), after the status is set to "approved" (lines ~228, ~240), add:

from app.services.notification_service import notify

if data.action == "approve":
    # ... existing approval logic ...

    # Notify the engineer who created the source session
    await notify("proposal.approved", proposal.account_id, {
        "title": proposal.title,
        "reviewer_name": getattr(current_user, "display_name", None) or current_user.email,
        "link": "/review-queue",
    }, db, target_user_ids=[proposal.created_by_id] if proposal.created_by_id else None)

Step 4: Verify imports work

cd /projects/patherly/backend
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
  venv/bin/python -c "
from app.services.flowpilot_engine import escalate_session
from app.services.knowledge_flywheel import analyze_session
from app.api.endpoints.flow_proposals import router
print('All wiring OK')
"

Step 5: Commit

git add backend/app/services/flowpilot_engine.py backend/app/services/knowledge_flywheel.py \
  backend/app/api/endpoints/flow_proposals.py
git commit -m "feat(notifications): wire notify() into escalation, proposal creation, and approval

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>"

Task 7: Frontend — notification types + API client

Files:

  • Create: frontend/src/types/notification.ts
  • Create: frontend/src/api/notifications.ts
  • Edit: frontend/src/types/index.ts
  • Edit: frontend/src/api/index.ts

Step 1: Create types

Create frontend/src/types/notification.ts:

export interface NotificationConfig {
  id: string
  channel: 'email' | 'slack_webhook' | 'teams_webhook'
  webhook_url: string | null
  email_addresses: string[] | null
  is_active: boolean
  events_enabled: Record<string, boolean>
  created_at: string
  updated_at: string
}

export interface NotificationConfigCreate {
  channel: 'email' | 'slack_webhook' | 'teams_webhook'
  webhook_url?: string
  email_addresses?: string[]
  events_enabled?: Record<string, boolean>
}

export interface NotificationConfigUpdate {
  webhook_url?: string
  email_addresses?: string[]
  is_active?: boolean
  events_enabled?: Record<string, boolean>
}

export interface AppNotification {
  id: string
  event: string
  title: string
  body: string | null
  link: string | null
  is_read: boolean
  created_at: string
}

export interface UnreadCount {
  count: number
}

export const NOTIFICATION_EVENTS = {
  'session.escalated': 'Session Escalated',
  'session.high_priority': 'High Priority Session',
  'proposal.pending': 'New Flow Proposal',
  'proposal.approved': 'Proposal Approved',
  'knowledge_gap.detected': 'Knowledge Gap Detected',
} as const

export const CHANNEL_LABELS = {
  email: 'Email',
  slack_webhook: 'Slack',
  teams_webhook: 'Microsoft Teams',
} as const
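Since AppNotification.event is typed as a plain string, the UI may want a way to narrow it to the known event keys before looking up a label. The following is a minimal sketch of one way to do that. NotificationEvent, isNotificationEvent, and eventLabel are hypothetical names that are not part of the plan, and NOTIFICATION_EVENTS is repeated here only so the block is self-contained.

```typescript
// Copied from the plan's notification.ts so this sketch stands alone.
const NOTIFICATION_EVENTS = {
  'session.escalated': 'Session Escalated',
  'session.high_priority': 'High Priority Session',
  'proposal.pending': 'New Flow Proposal',
  'proposal.approved': 'Proposal Approved',
  'knowledge_gap.detected': 'Knowledge Gap Detected',
} as const

// Hypothetical: union of the known event keys, derived from the constant.
type NotificationEvent = keyof typeof NOTIFICATION_EVENTS

// Hypothetical type guard: true only for keys defined directly on the constant
// (inherited names such as "toString" are rejected).
function isNotificationEvent(event: string): event is NotificationEvent {
  return Object.prototype.hasOwnProperty.call(NOTIFICATION_EVENTS, event)
}

// Hypothetical helper: human-readable label, falling back to the raw event name
// when the backend sends an event the frontend does not know yet.
function eventLabel(event: string): string {
  return isNotificationEvent(event) ? NOTIFICATION_EVENTS[event] : event
}
```

Falling back to the raw event name is an assumption made for this sketch; it keeps the panel readable if the backend adds an event before the frontend constant is updated.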

Step 2: Create API client

Create frontend/src/api/notifications.ts:

import apiClient from './client'
import type {
  NotificationConfig,
  NotificationConfigCreate,
  NotificationConfigUpdate,
  AppNotification,
  UnreadCount,
} from '@/types/notification'

export const notificationsApi = {
  // --- Config CRUD ---
  async listConfigs(): Promise<NotificationConfig[]> {
    const response = await apiClient.get<NotificationConfig[]>('/notifications/configs')
    return response.data
  },

  async createConfig(data: NotificationConfigCreate): Promise<NotificationConfig> {
    const response = await apiClient.post<NotificationConfig>('/notifications/configs', data)
    return response.data
  },

  async updateConfig(id: string, data: NotificationConfigUpdate): Promise<NotificationConfig> {
    const response = await apiClient.patch<NotificationConfig>(`/notifications/configs/${id}`, data)
    return response.data
  },

  async deleteConfig(id: string): Promise<void> {
    await apiClient.delete(`/notifications/configs/${id}`)
  },

  async testConfig(configId: string): Promise<{ success: boolean; message: string }> {
    const response = await apiClient.post<{ success: boolean; message: string }>(
      '/notifications/configs/test',
      { config_id: configId }
    )
    return response.data
  },

  // --- In-app notifications ---
  async list(params?: { skip?: number; limit?: number }): Promise<AppNotification[]> {
    const response = await apiClient.get<AppNotification[]>('/notifications', { params })
    return response.data
  },

  async unreadCount(): Promise<number> {
    const response = await apiClient.get<UnreadCount>('/notifications/unread-count')
    return response.data.count
  },

  async markRead(id: string): Promise<void> {
    await apiClient.patch(`/notifications/${id}/read`)
  },

  async markAllRead(): Promise<void> {
    await apiClient.post('/notifications/mark-all-read')
  },
}

export default notificationsApi

Step 3: Export from index files

Edit frontend/src/types/index.ts — add:

export * from './notification'

Edit frontend/src/api/index.ts — add:

export { notificationsApi } from './notifications'

Step 4: Verify build

cd /projects/patherly/frontend && npm run build

Step 5: Commit

git add frontend/src/types/notification.ts frontend/src/api/notifications.ts \
  frontend/src/types/index.ts frontend/src/api/index.ts
git commit -m "feat(notifications): add notification types and API client

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>"

Task 8: Frontend — NotificationsPanel upgrade (in-app notification center)

Files:

  • Edit: frontend/src/components/layout/NotificationsPanel.tsx

Step 1: Rewrite NotificationsPanel

Replace the contents of frontend/src/components/layout/NotificationsPanel.tsx with a component that:

  • Fetches from notificationsApi.list() instead of sessionsApi.list()
  • Shows unread count badge (number, not just a dot) via notificationsApi.unreadCount()
  • Each item: event icon, title, body, time ago, click → mark read + navigate
  • "Mark all as read" button in header
  • Polls unread count every 30 seconds
  • Keep existing glass-card dropdown styling
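The "time ago" text on each item could be produced by a small pure helper. This is only a sketch: timeAgo is a hypothetical name, and the thresholds and output strings ("just now", "5m ago") are assumptions, since the plan does not specify the format. It assumes created_at is an ISO 8601 timestamp.

```typescript
// Hypothetical helper: format an ISO timestamp relative to `now`.
// `now` is a parameter so the function stays pure and easy to test.
function timeAgo(createdAt: string, now: Date = new Date()): string {
  const elapsedMs = now.getTime() - new Date(createdAt).getTime()
  // Clamp to zero so small clock skew never renders a negative age.
  const seconds = Math.max(0, Math.floor(elapsedMs / 1000))

  if (seconds < 60) return 'just now'

  const minutes = Math.floor(seconds / 60)
  if (minutes < 60) return `${minutes}m ago`

  const hours = Math.floor(minutes / 60)
  if (hours < 24) return `${hours}h ago`

  const days = Math.floor(hours / 24)
  return `${days}d ago`
}
```

If the existing panel already has a relative-time formatter, reusing it would be preferable to adding a second one.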

Event icons mapping:

  • session.escalated → AlertTriangle (amber)
  • session.high_priority → AlertCircle (rose)
  • proposal.pending → FileText (primary/cyan)
  • proposal.approved → CheckCircle (emerald)
  • knowledge_gap.detected → TrendingUp (amber)
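The mapping above can be expressed as a lookup table, sketched below. Icon names are kept as strings so the block is self-contained; in the component they would presumably be replaced by the actual icon components. EVENT_ICONS, iconForEvent, the exact Tailwind color classes, and the Bell fallback are all assumptions for illustration, since the plan only names the icons and the color families.

```typescript
// Icon names from the mapping list; "Bell" is an assumed fallback.
type IconName =
  | 'AlertTriangle'
  | 'AlertCircle'
  | 'FileText'
  | 'CheckCircle'
  | 'TrendingUp'
  | 'Bell'

interface EventIcon {
  icon: IconName
  colorClass: string // assumed Tailwind class; the plan only gives the color family
}

// Hypothetical lookup table keyed by backend event name.
const EVENT_ICONS: Record<string, EventIcon> = {
  'session.escalated': { icon: 'AlertTriangle', colorClass: 'text-amber-400' },
  'session.high_priority': { icon: 'AlertCircle', colorClass: 'text-rose-400' },
  'proposal.pending': { icon: 'FileText', colorClass: 'text-primary' },
  'proposal.approved': { icon: 'CheckCircle', colorClass: 'text-emerald-400' },
  'knowledge_gap.detected': { icon: 'TrendingUp', colorClass: 'text-amber-400' },
}

// Assumed fallback for events the frontend does not recognize.
const FALLBACK_ICON: EventIcon = { icon: 'Bell', colorClass: 'text-muted-foreground' }

// Hypothetical helper: never returns undefined, so the list item can always render.
function iconForEvent(event: string): EventIcon {
  return EVENT_ICONS[event] ?? FALLBACK_ICON
}
```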

Key implementation details:

  • Use useEffect with setInterval for polling unread count (30s)
  • On dropdown open, fetch full notification list
  • On notification click: notificationsApi.markRead(id) then navigate(link) if link exists
  • Badge: show count number if > 0 (e.g., "3"), red background bg-rose-500
  • Keep the existing click-outside-to-close pattern
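The polling and badge details above might be factored into two small helpers so the component's useEffect stays thin. This is a sketch under assumptions: startUnreadPolling and badgeLabel are hypothetical names, and fetching once immediately before the first interval tick is a design choice not stated in the plan.

```typescript
// Hypothetical helper: poll an unread-count source on a fixed cadence.
// Returns a stop function, which matches the shape of a useEffect cleanup.
function startUnreadPolling(
  fetchCount: () => Promise<number>,
  onCount: (count: number) => void,
  intervalMs: number = 30_000,
): () => void {
  let stopped = false

  const tick = (): void => {
    fetchCount()
      .then((count) => {
        // Ignore responses that arrive after the panel has unmounted.
        if (!stopped) onCount(count)
      })
      .catch(() => {
        // Keep the last known count on a failed request; the next tick retries.
      })
  }

  tick() // assumed: fetch once right away so the badge does not wait 30s
  const handle = setInterval(tick, intervalMs)

  return () => {
    stopped = true
    clearInterval(handle)
  }
}

// Hypothetical helper: badge text per the rule "show count number if > 0".
function badgeLabel(count: number): string | null {
  return count > 0 ? String(count) : null
}
```

Inside the component this could be wired as useEffect(() => startUnreadPolling(() => notificationsApi.unreadCount(), setUnreadCount), []), where setUnreadCount is an assumed state setter. Returning the stop function from the effect clears the interval on unmount.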

Step 2: Verify build

cd /projects/patherly/frontend && npm run build

Step 3: Commit

git add frontend/src/components/layout/NotificationsPanel.tsx
git commit -m "feat(notifications): upgrade NotificationsPanel to real notification center

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>"

Task 9: Frontend — Notification settings UI

Files:

  • Create: frontend/src/components/account/NotificationSettings.tsx
  • Edit: frontend/src/pages/account/IntegrationsPage.tsx

Step 1: Create NotificationSettings component

Create frontend/src/components/account/NotificationSettings.tsx:

A section component that renders within the Integrations page. Shows:

  • Section header: "Notifications" with Bell icon
  • List of configured channels as glass-card items
  • Each card: channel icon (Mail/Slack/Teams), name, status toggle, event checkboxes, test button, delete button
  • "Add Channel" button → dropdown with Email, Slack, Teams options
  • For Slack/Teams: webhook URL input field
  • For Email: email addresses input (comma-separated)
  • Event toggles: checkboxes for each event from NOTIFICATION_EVENTS
  • Test button: calls notificationsApi.testConfig(), shows toast result
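The form logic behind the list above can be kept in pure functions, separate from the JSX. The sketch below mirrors the server-side checks in create_config (webhook_url required for Slack/Teams, email_addresses required for email). parseEmailList, toggleEvent, and buildCreatePayload are hypothetical names, and the error messages and de-duplication of addresses are assumptions.

```typescript
type Channel = 'email' | 'slack_webhook' | 'teams_webhook'

// Same shape as NotificationConfigCreate in the plan, repeated to stay self-contained.
interface NotificationConfigCreate {
  channel: Channel
  webhook_url?: string
  email_addresses?: string[]
  events_enabled?: Record<string, boolean>
}

// Hypothetical: split a comma-separated input, trim entries, drop blanks and duplicates.
function parseEmailList(input: string): string[] {
  return input
    .split(',')
    .map((part) => part.trim())
    .filter((email, index, all) => email.length > 0 && all.indexOf(email) === index)
}

// Hypothetical: return a new events map with one checkbox changed (no mutation).
function toggleEvent(
  events: Record<string, boolean>,
  event: string,
  enabled: boolean,
): Record<string, boolean> {
  return { ...events, [event]: enabled }
}

// Hypothetical: build the POST body, failing early on the same conditions
// the backend rejects with a 422.
function buildCreatePayload(
  channel: Channel,
  rawInput: string,
  eventsEnabled: Record<string, boolean>,
): NotificationConfigCreate {
  if (channel === 'email') {
    const emails = parseEmailList(rawInput)
    if (emails.length === 0) throw new Error('email_addresses required for email channel')
    return { channel, email_addresses: emails, events_enabled: eventsEnabled }
  }
  const url = rawInput.trim()
  if (!url) throw new Error(`webhook_url is required for ${channel}`)
  return { channel, webhook_url: url, events_enabled: eventsEnabled }
}
```

Validating on the client only improves feedback in the form; the backend checks remain the source of truth.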

Design rules:

  • Cards use .glass-card-static
  • Buttons follow CLAUDE.md patterns (primary: bg-gradient-brand, secondary: bg-[rgba(255,255,255,0.04)])
  • Status toggle: simple on/off with emerald/muted colors
  • Section label: font-label text-[0.625rem] uppercase tracking-[0.1em] text-muted-foreground

Step 2: Add NotificationSettings to IntegrationsPage

Edit frontend/src/pages/account/IntegrationsPage.tsx — import and render <NotificationSettings /> below the existing PSA connections section.

Step 3: Verify build

cd /projects/patherly/frontend && npm run build

Step 4: Commit

git add frontend/src/components/account/NotificationSettings.tsx \
  frontend/src/pages/account/IntegrationsPage.tsx
git commit -m "feat(notifications): add notification settings UI in integrations page

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>"

Task 10: Final review + integration test

Step 1: Run backend import check

cd /projects/patherly/backend
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
  venv/bin/python -c "
from app.models.notification_config import NotificationConfig
from app.models.notification_log import NotificationLog
from app.models.notification import Notification
from app.schemas.notification import NotificationConfigCreate, NotificationResponse
from app.services.notification_service import notify, retry_failed_notifications
from app.api.endpoints.notifications import router
print('Models: OK')
print('Schemas: OK')
print('Service: OK')
print(f'Endpoints: {len(router.routes)} routes')
print('All imports clean')
"

Step 2: Run frontend build

cd /projects/patherly/frontend && npm run build

Step 3: Verify migration

cd /projects/patherly/backend
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
  venv/bin/alembic current

Step 4: Manual smoke test checklist

  • Start backend + frontend dev servers
  • Open Integrations page → see Notifications section
  • Add an email notification config → verify it saves
  • Toggle events on/off → verify PATCH works
  • Click Test → verify toast shows result
  • Bell icon in top bar → see notification dropdown
  • Escalate a FlowPilot session → verify notification appears in bell dropdown
  • Click notification → navigates to session
  • Mark all as read → badge clears

Step 5: Final commit (if any cleanup needed)

git commit -m "fix(notifications): address review feedback

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>"

Summary of All Files

New Files

backend/app/models/notification_config.py          # Channel config model
backend/app/models/notification_log.py             # Delivery log + retry model
backend/app/models/notification.py                 # In-app notification model
backend/app/schemas/notification.py                # All notification schemas
backend/app/services/notification_service.py       # Core service (routing, delivery, retry)
backend/app/api/endpoints/notifications.py         # Config CRUD + in-app notification endpoints
backend/alembic/versions/XXX_add_notification_tables.py
frontend/src/types/notification.ts                 # TypeScript interfaces
frontend/src/api/notifications.ts                  # API client
frontend/src/components/account/NotificationSettings.tsx  # Settings UI

Modified Files

backend/app/models/__init__.py                     # Register 3 new models
backend/app/api/router.py                          # Register notifications router
backend/app/main.py                                # Add retry scheduler job
backend/app/core/email.py                          # Add send_notification_email method
backend/app/services/flowpilot_engine.py           # Wire escalation notification
backend/app/services/knowledge_flywheel.py         # Wire proposal creation notification
backend/app/api/endpoints/flow_proposals.py        # Wire proposal approval notification
frontend/src/types/index.ts                        # Export notification types
frontend/src/api/index.ts                          # Export notifications API
frontend/src/components/layout/NotificationsPanel.tsx  # Upgrade to real notification center
frontend/src/pages/account/IntegrationsPage.tsx    # Add notification settings section