# Phase 4 Slice 2: Notification Integrations - Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Build an event-driven notification system that alerts engineers and team admins via email, Slack, Teams, and in-app notifications when key events occur (escalations, proposals, high-priority tickets).

**Architecture:** Lightweight event-driven service. Events are fired from existing lifecycle points (escalation, proposal creation, approval). The notification service routes them to configured channels. Retry via APScheduler for failed webhooks. In-app notifications extend the existing `NotificationsPanel` bell icon.

**Tech Stack:** FastAPI, SQLAlchemy 2.0 (async), APScheduler 3.x, Resend (email), Slack/Teams webhook POST, React, TypeScript, Tailwind CSS v4

**Prerequisites:**
- Phase 1-3 complete (AI sessions, escalations, Knowledge Flywheel)
- Existing: `EmailService` in `core/email.py`, APScheduler in `core/scheduler.py`, `NotificationsPanel.tsx`
- Existing models: `User`, `AISession`, `FlowProposal`, `Account`

**Parent plan:** `docs/2026-03-18-flowpilot-first-pivot-phase4.md` (Tasks 4, 5, 6, 6.5)

---

## Task 1: NotificationConfig and NotificationLog models + migration

**Files:**
- Create: `backend/app/models/notification_config.py`
- Create: `backend/app/models/notification_log.py`
- Create: `backend/app/models/notification.py`
- Edit: `backend/app/models/__init__.py`
- Create: `backend/alembic/versions/XXX_add_notification_tables.py` (via autogenerate)

### Step 1: Create NotificationConfig model

Create `backend/app/models/notification_config.py`:

```python
"""Notification channel configuration per account.

Each account can have multiple notification configs (email, Slack webhook,
Teams webhook). Each config specifies which events it receives.
"""
import uuid
from datetime import datetime, timezone
from typing import Optional, Any, TYPE_CHECKING

from sqlalchemy import String, Boolean, DateTime, ForeignKey, CheckConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID, JSONB

from app.core.database import Base

if TYPE_CHECKING:
    from app.models.account import Account


class NotificationConfig(Base):
    """A notification channel configured by an account.

    channel: "email" | "slack_webhook" | "teams_webhook"
    events_enabled: {"session.escalated": true, "proposal.pending": true, ...}
    """
    __tablename__ = "notification_configs"
    __table_args__ = (
        CheckConstraint(
            "channel IN ('email', 'slack_webhook', 'teams_webhook')",
            name="ck_notification_configs_channel",
        ),
    )

    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
    account_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("accounts.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    channel: Mapped[str] = mapped_column(String(20), nullable=False)
    webhook_url: Mapped[Optional[str]] = mapped_column(String(500), nullable=True)
    email_addresses: Mapped[Optional[list]] = mapped_column(JSONB, nullable=True)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    events_enabled: Mapped[dict[str, Any]] = mapped_column(
        JSONB,
        default=lambda: {
            "session.escalated": True,
            "session.high_priority": True,
            "proposal.pending": True,
            "proposal.approved": True,
            "knowledge_gap.detected": True,
        }
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    account: Mapped[Optional["Account"]] = relationship("Account", foreign_keys=[account_id])
```

### Step 2: Create NotificationLog model

Create
`backend/app/models/notification_log.py`:

```python
"""Notification delivery log with retry tracking.

Tracks every notification delivery attempt. Failed deliveries are retried
via APScheduler with exponential backoff (30s, 2m, 10m; max 3 retries).
"""
import uuid
from datetime import datetime, timezone
from typing import Optional, Any, TYPE_CHECKING

from sqlalchemy import String, Integer, DateTime, ForeignKey, CheckConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID, JSONB

from app.core.database import Base

if TYPE_CHECKING:
    from app.models.notification_config import NotificationConfig


class NotificationLog(Base):
    """A single notification delivery attempt."""
    __tablename__ = "notification_logs"
    __table_args__ = (
        CheckConstraint(
            "status IN ('sent', 'failed', 'retrying', 'exhausted')",
            name="ck_notification_logs_status",
        ),
    )

    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
    notification_config_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("notification_configs.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    event: Mapped[str] = mapped_column(String(50), nullable=False)
    payload: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False)
    status: Mapped[str] = mapped_column(String(20), default="sent")
    retry_count: Mapped[int] = mapped_column(Integer, default=0)
    max_retries: Mapped[int] = mapped_column(Integer, default=3)
    last_error: Mapped[Optional[str]] = mapped_column(String(1000), nullable=True)
    next_retry_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True), nullable=True
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )
    delivered_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True), nullable=True
    )

    config: Mapped[Optional["NotificationConfig"]] = relationship(
        "NotificationConfig",
        foreign_keys=[notification_config_id]
    )
```

### Step 3: Create in-app Notification model

Create `backend/app/models/notification.py`:

```python
"""In-app notification model.

Created alongside external notifications (email/Slack/Teams).
Powers the NotificationsPanel bell icon dropdown in the frontend.
"""
import uuid
from datetime import datetime, timezone
from typing import Optional, TYPE_CHECKING

from sqlalchemy import String, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID

from app.core.database import Base

if TYPE_CHECKING:
    from app.models.user import User


class Notification(Base):
    """An in-app notification for a specific user."""
    __tablename__ = "notifications"

    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
    account_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("accounts.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    user_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("users.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    event: Mapped[str] = mapped_column(String(50), nullable=False)
    title: Mapped[str] = mapped_column(String(200), nullable=False)
    body: Mapped[Optional[str]] = mapped_column(String(500), nullable=True)
    link: Mapped[Optional[str]] = mapped_column(String(200), nullable=True)
    is_read: Mapped[bool] = mapped_column(Boolean, default=False)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        index=True,
    )

    user: Mapped[Optional["User"]] = relationship("User", foreign_keys=[user_id])
```

### Step 4: Register models in `__init__.py`

Edit `backend/app/models/__init__.py` and add:

```python
from app.models.notification_config import NotificationConfig
from app.models.notification_log import NotificationLog
from app.models.notification import Notification
```

### Step 5: Generate migration

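
Before generating the migration, it can be worth confirming that the retry columns on `NotificationLog` (`status`, `retry_count`, `max_retries`, `next_retry_at`) can express the backoff its docstring describes (30s, 2m, 10m, at most 3 retries), since changing them later means another migration. The sketch below is a standard-library-only illustration, not code from the plan: `RetryState`, `schedule_first_retry`, `record_failed_retry`, and `simulate_all_failures` are hypothetical names, and the fixed start time is arbitrary.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

# Backoff delays in seconds, taken from the NotificationLog docstring: 30s, 2m, 10m.
RETRY_DELAYS = [30, 120, 600]


@dataclass
class RetryState:
    """Hypothetical stand-in for the retry columns on NotificationLog."""
    status: str = "retrying"
    retry_count: int = 0
    max_retries: int = 3
    next_retry_at: Optional[datetime] = None


def schedule_first_retry(state: RetryState, now: datetime) -> None:
    """The initial delivery failed: queue the first retry after the first delay."""
    state.status = "retrying"
    state.next_retry_at = now + timedelta(seconds=RETRY_DELAYS[0])


def record_failed_retry(state: RetryState, now: datetime) -> None:
    """A retry attempt failed: back off further, or give up at max_retries."""
    state.retry_count += 1
    if state.retry_count >= state.max_retries:
        state.status = "exhausted"
        state.next_retry_at = None  # assumption: nothing left to schedule
        return
    delay = RETRY_DELAYS[min(state.retry_count, len(RETRY_DELAYS) - 1)]
    state.next_retry_at = now + timedelta(seconds=delay)


def simulate_all_failures() -> tuple[list[int], str]:
    """Return the wait (seconds) before each retry and the final status."""
    now = datetime(2026, 1, 1, tzinfo=timezone.utc)  # arbitrary fixed clock
    state = RetryState()
    schedule_first_retry(state, now)
    waits: list[int] = []
    while state.status == "retrying":
        waits.append(int((state.next_retry_at - now).total_seconds()))
        now = state.next_retry_at  # pretend the scheduler fires exactly on time
        record_failed_retry(state, now)
    return waits, state.status
```

Under these assumptions, `simulate_all_failures()` yields waits of 30, 120, and 600 seconds and ends in `exhausted`, one of the four `status` values allowed by the CHECK constraint.
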
```bash
cd /projects/patherly/backend
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
  venv/bin/alembic revision --autogenerate -m "add notification tables"
```

Review the generated migration. Verify it creates `notification_configs`, `notification_logs`, and `notifications` tables with correct FKs and constraints.

### Step 6: Run migration

```bash
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
  venv/bin/alembic upgrade head
```

### Step 7: Commit

```bash
git add backend/app/models/notification_config.py backend/app/models/notification_log.py \
  backend/app/models/notification.py backend/app/models/__init__.py \
  backend/alembic/versions/*notification*
git commit -m "feat(notifications): add NotificationConfig, NotificationLog, and Notification models

Co-Authored-By: Claude Opus 4.6 (1M context)"
```

---

## Task 2: Notification schemas

**Files:**
- Create: `backend/app/schemas/notification.py`

### Step 1: Create schemas

Create `backend/app/schemas/notification.py`:

```python
"""Pydantic schemas for notification system."""
from datetime import datetime
from uuid import UUID
from typing import Optional, Any

from pydantic import BaseModel, Field


# --- NotificationConfig schemas ---

VALID_CHANNELS = {"email", "slack_webhook", "teams_webhook"}
VALID_EVENTS = {
    "session.escalated",
    "session.high_priority",
    "proposal.pending",
    "proposal.approved",
    "knowledge_gap.detected",
}


class NotificationConfigCreate(BaseModel):
    channel: str = Field(..., pattern="^(email|slack_webhook|teams_webhook)$")
    webhook_url: str | None = None
    email_addresses: list[str] | None = None
    events_enabled: dict[str, bool] = Field(
        default_factory=lambda: {e: True for e in VALID_EVENTS}
    )


class NotificationConfigUpdate(BaseModel):
    webhook_url: str | None = None
    email_addresses: list[str] | None = None
    is_active: bool | None = None
    events_enabled: dict[str, bool] | None = None


class NotificationConfigResponse(BaseModel):
    id: UUID
    channel: str
    webhook_url: str | None
    email_addresses: list[str] | None
    is_active: bool
    events_enabled: dict[str, bool]
    created_at: datetime
    updated_at: datetime

    model_config = {"from_attributes": True}


# --- In-app Notification schemas ---

class NotificationResponse(BaseModel):
    id: UUID
    event: str
    title: str
    body: str | None
    link: str | None
    is_read: bool
    created_at: datetime

    model_config = {"from_attributes": True}


class UnreadCountResponse(BaseModel):
    count: int


# --- Notification test ---

class NotificationTestRequest(BaseModel):
    config_id: UUID


class NotificationTestResponse(BaseModel):
    success: bool
    message: str
```

### Step 2: Commit

```bash
git add backend/app/schemas/notification.py
git commit -m "feat(notifications): add notification Pydantic schemas

Co-Authored-By: Claude Opus 4.6 (1M context)"
```

---

## Task 3: Notification service (core event routing + channel delivery)

**Files:**
- Create: `backend/app/services/notification_service.py`

### Step 1: Create the notification service

Create `backend/app/services/notification_service.py`:

```python
"""Event-driven notification service.

Fires notifications to configured channels (email, Slack, Teams) and creates
in-app notification records. Failed webhook deliveries are logged for retry
by the scheduler.

Usage:
    await notify("session.escalated", account_id, payload, db)
"""
import logging
from datetime import datetime, timezone, timedelta
from typing import Optional, Any
from uuid import UUID

import httpx
from sqlalchemy import select, and_
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.config import settings
from app.core.email import EmailService
from app.models.notification_config import NotificationConfig
from app.models.notification_log import NotificationLog
from app.models.notification import Notification
from app.models.user import User

logger = logging.getLogger(__name__)

# Exponential backoff delays for retries (seconds)
RETRY_DELAYS = [30, 120, 600]  # 30s, 2m, 10m


async def notify(
    event: str,
    account_id: UUID,
    payload: dict[str, Any],
    db: AsyncSession,
    *,
    target_user_ids: list[UUID] | None = None,
) -> None:
    """Fire a notification event.

    Routes to all active channels for this account. Also creates in-app
    Notification records for target_user_ids. If target_user_ids is None,
    notifies all team admins + account owner.

    Args:
        event: Event type (e.g., "session.escalated")
        account_id: Account that owns the event
        payload: Event data (session summary, proposal title, etc.)
        db: Database session
        target_user_ids: Specific users to notify. None = team admins + owner.
    """
    # 1. Create in-app notifications
    recipients = await _resolve_recipients(account_id, target_user_ids, db)
    title = _build_notification_title(event, payload)
    body = _build_notification_body(event, payload)
    link = _build_notification_link(event, payload)

    for user_id in recipients:
        db.add(Notification(
            account_id=account_id,
            user_id=user_id,
            event=event,
            title=title,
            body=body,
            link=link,
        ))

    # 2. Route to external channels
    configs = await _get_active_configs(account_id, event, db)
    for config in configs:
        await _deliver_to_channel(config, event, payload, db)

    await db.flush()


async def retry_failed_notifications(db: AsyncSession) -> int:
    """Retry failed notification deliveries.

    Called by APScheduler. Returns the number of retries attempted.
    """
    # Local import keeps this fix self-contained; it can move to the module imports.
    from sqlalchemy.orm import selectinload

    now = datetime.now(timezone.utc)
    result = await db.execute(
        select(NotificationLog)
        # Eager-load the config: lazy-loading a relationship under AsyncSession
        # raises MissingGreenlet.
        .options(selectinload(NotificationLog.config))
        .where(
            NotificationLog.status == "retrying",
            NotificationLog.next_retry_at <= now,
        )
        .limit(50)
    )
    logs = result.scalars().all()

    retried = 0
    for log in logs:
        config = log.config
        if not config:
            log.status = "exhausted"
            log.last_error = "Config deleted"
            continue

        success = await _attempt_delivery(config, log.event, log.payload)
        log.retry_count += 1

        if success:
            log.status = "sent"
            log.delivered_at = now
            logger.info("Notification retry succeeded: %s (attempt %d)", log.id, log.retry_count)
        elif log.retry_count >= log.max_retries:
            log.status = "exhausted"
            log.last_error = f"Exhausted after {log.retry_count} retries"
            logger.warning("Notification exhausted after %d retries: %s", log.retry_count, log.id)
        else:
            delay = RETRY_DELAYS[min(log.retry_count, len(RETRY_DELAYS) - 1)]
            log.next_retry_at = now + timedelta(seconds=delay)
            log.status = "retrying"

        retried += 1

    # Commit whenever rows were touched, so "exhausted" updates on orphaned logs persist too.
    if logs:
        await db.commit()

    return retried


async def send_test_notification(config: NotificationConfig) -> tuple[bool, str]:
    """Send a test notification to verify channel configuration.

    Returns (success, message).
    """
    test_payload = {
        "title": "Test Notification",
        "body": "This is a test notification from ResolutionFlow.",
        "session_id": None,
        "engineer_name": "System",
    }

    if config.channel == "email":
        if not config.email_addresses:
            return False, "No email addresses configured"
        success = await EmailService.send_notification_email(
            config.email_addresses[0], "Test Notification", test_payload
        )
        return success, "Test email sent" if success else "Email delivery failed"

    elif config.channel == "slack_webhook":
        if not config.webhook_url:
            return False, "No webhook URL configured"
        success = await _send_slack_message(config.webhook_url, "test", test_payload)
        return success, "Test message sent to Slack" if success else "Slack webhook failed"

    elif config.channel == "teams_webhook":
        if not config.webhook_url:
            return False, "No webhook URL configured"
        success = await _send_teams_message(config.webhook_url, "test", test_payload)
        return success, "Test message sent to Teams" if success else "Teams webhook failed"

    return False, f"Unknown channel: {config.channel}"


# --- Internal helpers ---

async def _get_active_configs(
    account_id: UUID, event: str, db: AsyncSession
) -> list[NotificationConfig]:
    """Get all active notification configs for this account + event."""
    result = await db.execute(
        select(NotificationConfig).where(
            NotificationConfig.account_id == account_id,
            NotificationConfig.is_active.is_(True),
        )
    )
    configs = result.scalars().all()
    # Filter by event enabled
    return [c for c in configs if c.events_enabled.get(event, False)]


async def _resolve_recipients(
    account_id: UUID,
    target_user_ids: list[UUID] | None,
    db: AsyncSession,
) -> list[UUID]:
    """Resolve notification recipients.
    Defaults to team admins + account owner."""
    if target_user_ids:
        return target_user_ids

    result = await db.execute(
        select(User.id).where(
            User.account_id == account_id,
            User.is_active.is_(True),
            (User.is_team_admin.is_(True)) | (User.account_role == "owner"),
        )
    )
    return list(result.scalars().all())


async def _deliver_to_channel(
    config: NotificationConfig,
    event: str,
    payload: dict[str, Any],
    db: AsyncSession,
) -> None:
    """Attempt delivery to a channel. Log result."""
    success = await _attempt_delivery(config, event, payload)

    log = NotificationLog(
        notification_config_id=config.id,
        event=event,
        payload=payload,
    )

    if success:
        log.status = "sent"
        log.delivered_at = datetime.now(timezone.utc)
    else:
        log.status = "retrying"
        log.last_error = f"Initial delivery failed for {config.channel}"
        log.next_retry_at = datetime.now(timezone.utc) + timedelta(seconds=RETRY_DELAYS[0])

    db.add(log)


async def _attempt_delivery(
    config: NotificationConfig, event: str, payload: dict[str, Any]
) -> bool:
    """Attempt to deliver a notification via the configured channel."""
    try:
        if config.channel == "email":
            return await _send_email(config, event, payload)
        elif config.channel == "slack_webhook":
            return await _send_slack_message(
                config.webhook_url, event, payload
            )
        elif config.channel == "teams_webhook":
            return await _send_teams_message(
                config.webhook_url, event, payload
            )
        return False
    except Exception:
        logger.exception("Notification delivery failed for config %s", config.id)
        return False


async def _send_email(
    config: NotificationConfig, event: str, payload: dict[str, Any]
) -> bool:
    """Send notification email to all configured addresses."""
    if not config.email_addresses:
        return False

    title = _build_notification_title(event, payload)
    success = True
    for addr in config.email_addresses:
        result = await EmailService.send_notification_email(addr, title, payload)
        if not result:
            success = False
    return success


async def _send_slack_message(
    webhook_url: str, event: str, payload: dict[str, Any]
) -> bool:
    """Send a Slack notification via incoming webhook."""
    title = _build_notification_title(event, payload)
    body = _build_notification_body(event, payload)
    link = payload.get("link", "")

    slack_payload = {
        "blocks": [
            {
                "type": "header",
                "text": {"type": "plain_text", "text": f"🔔 {title}", "emoji": True},
            },
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": body},
            },
        ]
    }

    if link:
        slack_payload["blocks"].append({
            "type": "actions",
            "elements": [
                {
                    "type": "button",
                    "text": {"type": "plain_text", "text": "Open in ResolutionFlow"},
                    "url": f"{settings.FRONTEND_URL}{link}",
                }
            ],
        })

    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.post(webhook_url, json=slack_payload)
        if resp.status_code == 200:
            logger.info("Slack notification sent: %s", event)
            return True
        logger.warning("Slack webhook returned %d: %s", resp.status_code, resp.text)
        return False


async def _send_teams_message(
    webhook_url: str, event: str, payload: dict[str, Any]
) -> bool:
    """Send a Teams notification via incoming webhook (Adaptive Card)."""
    title = _build_notification_title(event, payload)
    body = _build_notification_body(event, payload)
    link = payload.get("link", "")

    card = {
        "type": "message",
        "attachments": [
            {
                "contentType": "application/vnd.microsoft.card.adaptive",
                "content": {
                    "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
                    "type": "AdaptiveCard",
                    "version": "1.4",
                    "body": [
                        {"type": "TextBlock", "text": title, "weight": "Bolder", "size": "Medium"},
                        {"type": "TextBlock", "text": body, "wrap": True},
                    ],
                },
            }
        ],
    }

    if link:
        card["attachments"][0]["content"]["actions"] = [
            {
                "type": "Action.OpenUrl",
                "title": "Open in ResolutionFlow",
                "url": f"{settings.FRONTEND_URL}{link}",
            }
        ]

    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.post(webhook_url, json=card)
        if resp.status_code in (200, 202):
            logger.info("Teams notification sent: %s", event)
            return True
        logger.warning("Teams webhook returned %d: %s", resp.status_code, resp.text)
        return False


def _build_notification_title(event: str, payload: dict[str, Any]) -> str:
    """Build a human-readable notification title."""
    titles = {
        "session.escalated": f"Session escalated by {payload.get('engineer_name', 'an engineer')}",
        "session.high_priority": f"High-priority session started: {payload.get('ticket_number', 'N/A')}",
        "proposal.pending": f"New flow proposal: {payload.get('title', 'Untitled')}",
        "proposal.approved": f"Flow proposal approved: {payload.get('title', 'Untitled')}",
        "knowledge_gap.detected": f"Knowledge gap detected: {payload.get('gap_type', 'Unknown')}",
        "test": "Test Notification from ResolutionFlow",
    }
    return titles.get(event, f"Notification: {event}")


def _build_notification_body(event: str, payload: dict[str, Any]) -> str:
    """Build notification body text."""
    if event == "session.escalated":
        reason = payload.get("escalation_reason", "No reason provided")
        return f"Reason: {reason}\nProblem: {payload.get('problem_summary', 'N/A')}"
    elif event == "session.high_priority":
        return f"Ticket: {payload.get('ticket_number', 'N/A')}\nClient: {payload.get('client_name', 'N/A')}"
    elif event == "proposal.pending":
        return f"Type: {payload.get('proposal_type', 'N/A')}\nDomain: {payload.get('problem_domain', 'N/A')}"
    elif event == "proposal.approved":
        return f"Approved by {payload.get('reviewer_name', 'a reviewer')}"
    elif event == "knowledge_gap.detected":
        return f"Severity: {payload.get('severity', 'N/A')}\nArea: {payload.get('problem_domain', 'N/A')}"
    elif event == "test":
        return "This is a test notification from ResolutionFlow. Your integration is working!"
    return ""


def _build_notification_link(event: str, payload: dict[str, Any]) -> str | None:
    """Build in-app link for the notification."""
    links = {
        "session.escalated": f"/pilot/{payload.get('session_id', '')}",
        "session.high_priority": f"/pilot/{payload.get('session_id', '')}",
        "proposal.pending": "/review-queue",
        "proposal.approved": "/review-queue",
        "knowledge_gap.detected": "/analytics/flowpilot",
    }
    return links.get(event)
```

### Step 2: Add notification email method to EmailService

Edit `backend/app/core/email.py` and add this static method to the `EmailService` class:

```python
    @staticmethod
    async def send_notification_email(to_email: str, subject: str, data: dict) -> bool:
        """Send a notification email."""
        if not settings.email_enabled:
            logger.warning("Notification email not sent — RESEND_API_KEY not configured")
            return False

        try:
            import resend
            resend.api_key = settings.RESEND_API_KEY

            html = _render_notification_html(subject=subject, data=data)

            resend.Emails.send({
                "from": settings.FROM_EMAIL,
                "to": [to_email],
                "subject": f"[ResolutionFlow] {subject}",
                "html": html,
            })
            logger.info("Notification email sent to %s: %s", to_email, subject)
            return True
        except Exception:
            logger.exception("Failed to send notification email to %s", to_email)
            return False
```

Add this HTML renderer at module level in `email.py`:

```python
def _render_notification_html(subject: str, data: dict) -> str:
    # Minimal markup: the tags here are assumed, so match the existing
    # templates in email.py when implementing.
    body_text = data.get("body", "")
    link = data.get("link", "")
    link_html = ""
    if link:
        full_url = f"{settings.FRONTEND_URL}{link}"
        link_html = f'<p><a href="{full_url}">Open in ResolutionFlow →</a></p>'

    return f"""
    <div>
      <h2>{subject}</h2>
      <p>{body_text}</p>
      {link_html}
      <p>— ResolutionFlow</p>
    </div>
    """
```

### Step 3: Verify imports

```bash
cd /projects/patherly/backend
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
  venv/bin/python -c "from app.services.notification_service import notify, retry_failed_notifications, send_test_notification; print('Service OK')"
```

### Step 4: Commit

```bash
git add backend/app/services/notification_service.py backend/app/core/email.py
git commit -m "feat(notifications): add notification service with email/Slack/Teams delivery + retry

Co-Authored-By: Claude Opus 4.6 (1M context)"
```

---

## Task 4: Notification API endpoints

**Files:**
- Create: `backend/app/api/endpoints/notifications.py`
- Edit: `backend/app/api/router.py`

### Step 1: Create notification endpoints

Create `backend/app/api/endpoints/notifications.py`:

```python
"""Notification endpoints.

- NotificationConfig CRUD (account-level, requires team admin)
- In-app notification list/read/mark-all-read (per user)
- Test notification delivery
"""
import logging
from typing import Annotated, Optional
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import select, func, update
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_db
from app.core.rate_limit import limiter
from app.api.deps import get_current_active_user, require_team_admin
from app.models.user import User
from app.models.notification_config import NotificationConfig
from app.models.notification import Notification as NotificationModel
from app.schemas.notification import (
    NotificationConfigCreate,
    NotificationConfigUpdate,
    NotificationConfigResponse,
    NotificationResponse,
    UnreadCountResponse,
    NotificationTestRequest,
    NotificationTestResponse,
)
from app.services.notification_service import send_test_notification

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/notifications", tags=["notifications"])


# --- NotificationConfig CRUD (team admin) ---

@router.get("/configs", response_model=list[NotificationConfigResponse])
async def list_configs(
    # No bare `request` parameter: FastAPI would treat an unannotated one as a
    # required query parameter and reject calls that omit it.
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: None = Depends(require_team_admin),
):
    """List all notification configs for the current account."""
    result = await db.execute(
        select(NotificationConfig)
        .where(NotificationConfig.account_id == current_user.account_id)
        .order_by(NotificationConfig.created_at)
    )
    return result.scalars().all()


@router.post("/configs", response_model=NotificationConfigResponse, status_code=201)
async def create_config(
    data: NotificationConfigCreate,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: None = Depends(require_team_admin),
):
    """Create a new notification channel config."""
    # Validate channel-specific requirements
    if data.channel in ("slack_webhook", "teams_webhook") and not data.webhook_url:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"webhook_url is required for {data.channel}",
        )
    if data.channel == "email" and not data.email_addresses:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="email_addresses required for email channel",
        )

    config = NotificationConfig(
        account_id=current_user.account_id,
        channel=data.channel,
        webhook_url=data.webhook_url,
        email_addresses=data.email_addresses,
        events_enabled=data.events_enabled,
    )
    db.add(config)
    await db.commit()
    await db.refresh(config)
    return config


@router.patch("/configs/{config_id}", response_model=NotificationConfigResponse)
async def update_config(
    config_id: UUID,
    data: NotificationConfigUpdate,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: None = Depends(require_team_admin),
):
    """Update a notification channel config."""
    config = await db.get(NotificationConfig, config_id)
    if not config or config.account_id != current_user.account_id:
        raise HTTPException(status_code=404, detail="Config not found")

    for field, value in data.model_dump(exclude_unset=True).items():
        setattr(config, field, value)

    await db.commit()
    await db.refresh(config)
    return config


@router.delete("/configs/{config_id}", status_code=204)
async def delete_config(
    config_id: UUID,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: None = Depends(require_team_admin),
):
    """Delete a notification channel config."""
    config = await db.get(NotificationConfig, config_id)
    if not config or config.account_id != current_user.account_id:
        raise HTTPException(status_code=404, detail="Config not found")

    await db.delete(config)
    await db.commit()


@router.post("/configs/test", response_model=NotificationTestResponse)
async def test_config(
    data: NotificationTestRequest,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    _: None = Depends(require_team_admin),
):
    """Send a test notification to verify channel configuration."""
    config = await db.get(NotificationConfig, data.config_id)
    if not config or config.account_id != current_user.account_id:
        raise HTTPException(status_code=404, detail="Config not found")

    success, message = await send_test_notification(config)
    return NotificationTestResponse(success=success, message=message)


# --- In-app notifications (per user) ---

@router.get("", response_model=list[NotificationResponse])
async def list_notifications(
    # As in list_configs, the unannotated `request` parameter is dropped.
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
    skip: int = Query(0, ge=0),
    limit: int = Query(20, ge=1, le=100),
):
    """List in-app notifications for the current user (unread first)."""
    result = await db.execute(
        select(NotificationModel)
        .where(NotificationModel.user_id == current_user.id)
        .order_by(NotificationModel.is_read, NotificationModel.created_at.desc())
        .offset(skip)
        .limit(limit)
    )
    return result.scalars().all()


@router.get("/unread-count", response_model=UnreadCountResponse)
async def unread_count(
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Get unread notification count for badge display."""
    result = await db.execute(
        select(func.count(NotificationModel.id)).where(
            NotificationModel.user_id == current_user.id,
            NotificationModel.is_read.is_(False),
        )
    )
    return UnreadCountResponse(count=result.scalar_one())


@router.patch("/{notification_id}/read")
async def mark_read(
    notification_id: UUID,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Mark a single notification as read."""
    notif = await db.get(NotificationModel, notification_id)
    if not notif or notif.user_id != current_user.id:
        raise HTTPException(status_code=404, detail="Notification not found")

    notif.is_read = True
    await db.commit()
    return {"ok": True}


@router.post("/mark-all-read")
async def mark_all_read(
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    """Mark all notifications as read for the current user."""
    await db.execute(
        update(NotificationModel)
        .where(
            NotificationModel.user_id == current_user.id,
            NotificationModel.is_read.is_(False),
        )
        .values(is_read=True)
    )
    await db.commit()
    return {"ok": True}
```

### Step 2: Register router

Edit `backend/app/api/router.py` and add:

```python
from app.api.endpoints import notifications

api_router.include_router(notifications.router)
```

### Step 3: Verify

```bash
cd /projects/patherly/backend
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
  venv/bin/python -c "from app.api.endpoints.notifications import router; print(f'Notifications: {len(router.routes)} routes')"
```

### Step 4: Commit

```bash
git add backend/app/api/endpoints/notifications.py backend/app/api/router.py
git commit -m "feat(notifications): add notification config CRUD + in-app notification endpoints

Co-Authored-By: Claude Opus 4.6 (1M context)"
```

---

## Task 5: Notification retry scheduler

**Files:**
- Edit: `backend/app/main.py`

### Step 1: Add retry job to scheduler

Edit `backend/app/main.py`: in the lifespan function, after the existing scheduler jobs, add:

```python
from app.services.notification_service import retry_failed_notifications
from app.core.database import AsyncSessionLocal

async def _process_notification_retries():
    """Retry failed notification deliveries."""
    async with AsyncSessionLocal() as db:
        retried = await retry_failed_notifications(db)
        if retried:
            logger.info("Retried %d failed notifications", retried)

scheduler.add_job(
    _process_notification_retries,
    trigger="interval",
    minutes=1,
    id="notification_retry",
    replace_existing=True,
    max_instances=1,
)
```

### Step 2: Verify scheduler starts

```bash
cd /projects/patherly/backend
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
  venv/bin/python -c "from app.main import app; print('App created OK')"
```

### Step 3: Commit

```bash
git add backend/app/main.py
git commit -m "feat(notifications): add APScheduler job for notification retry (1m interval, max_instances=1)

Co-Authored-By: Claude Opus 4.6 (1M context)"
```

---

## Task 6: Wire notifications into session lifecycle

**Files:**
- Edit: `backend/app/services/flowpilot_engine.py`
- Edit: `backend/app/services/knowledge_flywheel.py`
- Edit: `backend/app/api/endpoints/flow_proposals.py`

### Step 1: Wire into escalation

Edit `backend/app/services/flowpilot_engine.py`: in `escalate_session()`, after `await db.flush()` (line ~498), add:

```python
import asyncio
from app.services.notification_service import notify

# Fire escalation notification (non-blocking)
escalation_payload = {
    "session_id": str(session_id),
    "engineer_name": session.user.display_name if session.user else "Unknown",
    "escalation_reason":
request.escalation_reason, "problem_summary": session.problem_summary or "N/A", "link": f"/pilot/{session_id}", } target_users = [request.escalated_to_id] if request.escalated_to_id else None asyncio.create_task( notify("session.escalated", session.account_id, escalation_payload, db, target_user_ids=target_users) ) ``` **Note:** `asyncio.create_task` won't work directly here because the db session may close before the task runs. Instead, use a synchronous call within the existing transaction: ```python await notify("session.escalated", session.account_id, escalation_payload, db, target_user_ids=target_users) ``` ### Step 2: Wire into proposal creation Edit `backend/app/services/knowledge_flywheel.py` — after each `db.add(proposal)` call (lines ~297, ~357, ~433), add: ```python from app.services.notification_service import notify # Only notify for proposals that need review (not auto_reinforced) if proposal.status == "pending": await notify("proposal.pending", proposal.account_id, { "title": proposal.title, "proposal_type": proposal.proposal_type, "problem_domain": proposal.problem_domain or "General", "link": "/review-queue", }, db) ``` ### Step 3: Wire into proposal approval Edit `backend/app/api/endpoints/flow_proposals.py` — in `review_proposal()`, after the status is set to "approved" (lines ~228, ~240), add: ```python from app.services.notification_service import notify if data.action == "approve": # ... existing approval logic ... 
# Notify the engineer who created the source session await notify("proposal.approved", proposal.account_id, { "title": proposal.title, "reviewer_name": current_user.display_name if hasattr(current_user, 'display_name') else current_user.email, "link": "/review-queue", }, db, target_user_ids=[proposal.created_by_id] if proposal.created_by_id else None) ``` ### Step 4: Verify imports work ```bash cd /projects/patherly/backend DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \ venv/bin/python -c " from app.services.flowpilot_engine import escalate_session from app.services.knowledge_flywheel import analyze_session from app.api.endpoints.flow_proposals import router print('All wiring OK') " ``` ### Step 5: Commit ```bash git add backend/app/services/flowpilot_engine.py backend/app/services/knowledge_flywheel.py \ backend/app/api/endpoints/flow_proposals.py git commit -m "feat(notifications): wire notify() into escalation, proposal creation, and approval Co-Authored-By: Claude Opus 4.6 (1M context) " ``` --- ## Task 7: Frontend — notification types + API client **Files:** - Create: `frontend/src/types/notification.ts` - Create: `frontend/src/api/notifications.ts` - Edit: `frontend/src/types/index.ts` - Edit: `frontend/src/api/index.ts` ### Step 1: Create types Create `frontend/src/types/notification.ts`: ```typescript export interface NotificationConfig { id: string channel: 'email' | 'slack_webhook' | 'teams_webhook' webhook_url: string | null email_addresses: string[] | null is_active: boolean events_enabled: Record created_at: string updated_at: string } export interface NotificationConfigCreate { channel: 'email' | 'slack_webhook' | 'teams_webhook' webhook_url?: string email_addresses?: string[] events_enabled?: Record } export interface NotificationConfigUpdate { webhook_url?: string email_addresses?: string[] is_active?: boolean events_enabled?: Record } export interface AppNotification { id: string event: string title: 
string body: string | null link: string | null is_read: boolean created_at: string } export interface UnreadCount { count: number } export const NOTIFICATION_EVENTS = { 'session.escalated': 'Session Escalated', 'session.high_priority': 'High Priority Session', 'proposal.pending': 'New Flow Proposal', 'proposal.approved': 'Proposal Approved', 'knowledge_gap.detected': 'Knowledge Gap Detected', } as const export const CHANNEL_LABELS = { email: 'Email', slack_webhook: 'Slack', teams_webhook: 'Microsoft Teams', } as const ``` ### Step 2: Create API client Create `frontend/src/api/notifications.ts`: ```typescript import apiClient from './client' import type { NotificationConfig, NotificationConfigCreate, NotificationConfigUpdate, AppNotification, UnreadCount, } from '@/types/notification' export const notificationsApi = { // --- Config CRUD --- async listConfigs(): Promise { const response = await apiClient.get('/notifications/configs') return response.data }, async createConfig(data: NotificationConfigCreate): Promise { const response = await apiClient.post('/notifications/configs', data) return response.data }, async updateConfig(id: string, data: NotificationConfigUpdate): Promise { const response = await apiClient.patch(`/notifications/configs/${id}`, data) return response.data }, async deleteConfig(id: string): Promise { await apiClient.delete(`/notifications/configs/${id}`) }, async testConfig(configId: string): Promise<{ success: boolean; message: string }> { const response = await apiClient.post<{ success: boolean; message: string }>( '/notifications/configs/test', { config_id: configId } ) return response.data }, // --- In-app notifications --- async list(params?: { skip?: number; limit?: number }): Promise { const response = await apiClient.get('/notifications', { params }) return response.data }, async unreadCount(): Promise { const response = await apiClient.get('/notifications/unread-count') return response.data.count }, async markRead(id: string): Promise 
{ await apiClient.patch(`/notifications/${id}/read`) }, async markAllRead(): Promise { await apiClient.post('/notifications/mark-all-read') }, } export default notificationsApi ``` ### Step 3: Export from index files Edit `frontend/src/types/index.ts` — add: ```typescript export type * from './notification' ``` Edit `frontend/src/api/index.ts` — add: ```typescript export { notificationsApi } from './notifications' ``` ### Step 4: Verify build ```bash cd /projects/patherly/frontend && npm run build ``` ### Step 5: Commit ```bash git add frontend/src/types/notification.ts frontend/src/api/notifications.ts \ frontend/src/types/index.ts frontend/src/api/index.ts git commit -m "feat(notifications): add notification types and API client Co-Authored-By: Claude Opus 4.6 (1M context) " ``` --- ## Task 8: Frontend — NotificationsPanel upgrade (in-app notification center) **Files:** - Edit: `frontend/src/components/layout/NotificationsPanel.tsx` ### Step 1: Rewrite NotificationsPanel Replace the contents of `frontend/src/components/layout/NotificationsPanel.tsx` with a component that: - Fetches from `notificationsApi.list()` instead of `sessionsApi.list()` - Shows unread count badge (number, not just a dot) via `notificationsApi.unreadCount()` - Each item: event icon, title, body, time ago, click → mark read + navigate - "Mark all as read" button in header - Polls unread count every 30 seconds - Keep existing glass-card dropdown styling **Event icons mapping:** - `session.escalated` → `AlertTriangle` (amber) - `session.high_priority` → `AlertCircle` (rose) - `proposal.pending` → `FileText` (primary/cyan) - `proposal.approved` → `CheckCircle` (emerald) - `knowledge_gap.detected` → `TrendingUp` (amber) **Key implementation details:** - Use `useEffect` with `setInterval` for polling unread count (30s) - On dropdown open, fetch full notification list - On notification click: `notificationsApi.markRead(id)` then `navigate(link)` if link exists - Badge: show count number if > 0 
(e.g., "3"), red background `bg-rose-500` - Keep the existing click-outside-to-close pattern ### Step 2: Verify build ```bash cd /projects/patherly/frontend && npm run build ``` ### Step 3: Commit ```bash git add frontend/src/components/layout/NotificationsPanel.tsx git commit -m "feat(notifications): upgrade NotificationsPanel to real notification center Co-Authored-By: Claude Opus 4.6 (1M context) " ``` --- ## Task 9: Frontend — Notification settings UI **Files:** - Create: `frontend/src/components/account/NotificationSettings.tsx` - Edit: `frontend/src/pages/account/IntegrationsPage.tsx` ### Step 1: Create NotificationSettings component Create `frontend/src/components/account/NotificationSettings.tsx`: A section component that renders within the Integrations page. Shows: - Section header: "Notifications" with Bell icon - List of configured channels as glass-card items - Each card: channel icon (Mail/Slack/Teams), name, status toggle, event checkboxes, test button, delete button - "Add Channel" button → dropdown with Email, Slack, Teams options - For Slack/Teams: webhook URL input field - For Email: email addresses input (comma-separated) - Event toggles: checkboxes for each event from `NOTIFICATION_EVENTS` - Test button: calls `notificationsApi.testConfig()`, shows toast result **Design rules:** - Cards use `.glass-card-static` - Buttons follow CLAUDE.md patterns (primary: `bg-gradient-brand`, secondary: `bg-[rgba(255,255,255,0.04)]`) - Status toggle: simple on/off with emerald/muted colors - Section label: `font-label text-[0.625rem] uppercase tracking-[0.1em] text-muted-foreground` ### Step 2: Add NotificationSettings to IntegrationsPage Edit `frontend/src/pages/account/IntegrationsPage.tsx` — import and render `` below the existing PSA connections section. 
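
The email and event inputs described in Step 1 have to be turned into the request body that `notificationsApi.createConfig()` sends. The block below is a minimal sketch of that conversion, not the component itself. The helper names (`parseEmailList`, `toggleEvent`, `buildConfigPayload`) and the local `ConfigPayload` type are hypothetical; `ConfigPayload` simply mirrors `NotificationConfigCreate` from Task 7 so the block stays self-contained. Trimming and de-duplicating addresses is an assumption, since the plan only says the input is comma-separated.

```typescript
// Hypothetical helpers for the settings form; none of these names exist in the plan.
type Channel = 'email' | 'slack_webhook' | 'teams_webhook'

// Local mirror of NotificationConfigCreate (Task 7) to keep the sketch self-contained.
interface ConfigPayload {
  channel: Channel
  webhook_url?: string
  email_addresses?: string[]
  events_enabled?: Record<string, boolean>
}

// Split a comma-separated input into trimmed, non-empty, de-duplicated addresses.
// (Assumption: the plan does not say how whitespace or duplicates are handled.)
function parseEmailList(input: string): string[] {
  const seen = new Set<string>()
  for (const part of input.split(',')) {
    const address = part.trim()
    if (address) seen.add(address)
  }
  return Array.from(seen)
}

// Return a copy of the event map with one event flipped; the input is not mutated.
function toggleEvent(
  events: Record<string, boolean>,
  key: string,
): Record<string, boolean> {
  return { ...events, [key]: !events[key] }
}

// Build the create payload: email channels carry addresses, webhook channels carry a URL.
function buildConfigPayload(
  channel: Channel,
  target: string,
  events: Record<string, boolean>,
): ConfigPayload {
  if (channel === 'email') {
    return { channel, email_addresses: parseEmailList(target), events_enabled: events }
  }
  return { channel, webhook_url: target.trim(), events_enabled: events }
}
```

In the component, the result of `buildConfigPayload` would be passed to `notificationsApi.createConfig()`, and `toggleEvent` could back each checkbox's change handler before calling `notificationsApi.updateConfig()` with the new `events_enabled` map.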
### Step 3: Verify build

```bash
cd /projects/patherly/frontend && npm run build
```

### Step 4: Commit

```bash
git add frontend/src/components/account/NotificationSettings.tsx \
  frontend/src/pages/account/IntegrationsPage.tsx
git commit -m "feat(notifications): add notification settings UI in integrations page

Co-Authored-By: Claude Opus 4.6 (1M context)"
```

---

## Task 10: Final review + integration test

### Step 1: Run backend import check

```bash
cd /projects/patherly/backend
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
  venv/bin/python -c "
from app.models.notification_config import NotificationConfig
from app.models.notification_log import NotificationLog
from app.models.notification import Notification
from app.schemas.notification import NotificationConfigCreate, NotificationResponse
from app.services.notification_service import notify, retry_failed_notifications
from app.api.endpoints.notifications import router
print(f'Models: OK')
print(f'Schemas: OK')
print(f'Service: OK')
print(f'Endpoints: {len(router.routes)} routes')
print('All imports clean')
"
```

### Step 2: Run frontend build

```bash
cd /projects/patherly/frontend && npm run build
```

### Step 3: Verify migration

```bash
cd /projects/patherly/backend
DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \
  venv/bin/alembic current
```

### Step 4: Manual smoke test checklist

- [ ] Start backend + frontend dev servers
- [ ] Open Integrations page → see Notifications section
- [ ] Add an email notification config → verify it saves
- [ ] Toggle events on/off → verify PATCH works
- [ ] Click Test → verify toast shows result
- [ ] Bell icon in top bar → see notification dropdown
- [ ] Escalate a FlowPilot session → verify notification appears in bell dropdown
- [ ] Click notification → navigates to session
- [ ] Mark all as read → badge clears

### Step 5: Final commit (if any cleanup needed)

```bash
git commit -m "fix(notifications): address review feedback

Co-Authored-By: Claude Opus 4.6 (1M context)"
```

---

## Summary of All Files

### New Files

```
backend/app/models/notification_config.py                  # Channel config model
backend/app/models/notification_log.py                     # Delivery log + retry model
backend/app/models/notification.py                         # In-app notification model
backend/app/schemas/notification.py                        # All notification schemas
backend/app/services/notification_service.py               # Core service (routing, delivery, retry)
backend/app/api/endpoints/notifications.py                 # Config CRUD + in-app notification endpoints
backend/alembic/versions/XXX_add_notification_tables.py
frontend/src/types/notification.ts                         # TypeScript interfaces
frontend/src/api/notifications.ts                          # API client
frontend/src/components/account/NotificationSettings.tsx   # Settings UI
```

### Modified Files

```
backend/app/models/__init__.py                             # Register 3 new models
backend/app/api/router.py                                  # Register notifications router
backend/app/main.py                                        # Add retry scheduler job
backend/app/core/email.py                                  # Add send_notification_email method
backend/app/services/flowpilot_engine.py                   # Wire escalation notification
backend/app/services/knowledge_flywheel.py                 # Wire proposal creation notification
backend/app/api/endpoints/flow_proposals.py                # Wire proposal approval notification
frontend/src/types/index.ts                                # Export notification types
frontend/src/api/index.ts                                  # Export notifications API
frontend/src/components/layout/NotificationsPanel.tsx      # Upgrade to real notification center
frontend/src/pages/account/IntegrationsPage.tsx            # Add notification settings section
```