Files
resolutionflow/backend/app/api/endpoints/flow_proposals.py
chihlasm 0f750e63e0 feat(notifications): add Phase 4 Slice 2 — multi-channel notification system
Full notification infrastructure with in-app, email, Slack, and Teams channels:

Backend:
- NotificationConfig, NotificationLog, Notification models + migration
- Notification service with event routing, channel delivery, retry logic
- 9 API endpoints (config CRUD + in-app notifications)
- APScheduler retry job with exponential backoff (30s, 2m, 10m)
- Wired into escalation, proposal approval, and knowledge flywheel
- Pydantic event key validation, cross-tenant protection on recipients

Frontend:
- TypeScript types + API client for all notification endpoints
- NotificationsPanel: bell icon with unread badge, dropdown, mark-read
- NotificationSettings: channel config, event toggles, test, delete confirm
- Notifications tab on IntegrationsPage
- ARIA attributes, Escape handler, settings link on panel

Review fixes (13 issues resolved):
- notify() no longer commits/rolls back caller's transaction (critical)
- retry_failed_notifications returns count instead of None (critical)
- NotificationSettings moved inside dedicated tab (critical)
- target_user_ids scoped by account_id (security)
- Email loop collects all failures before raising
- Slack webhook validates response body
- events_enabled rejects unknown event keys
- link column widened to String(500)
- Dead code removed from _auto_reinforce
- Delete confirmation, ARIA, Escape key support

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-19 12:37:54 +00:00

315 lines
11 KiB
Python

"""Review Queue API — CRUD for flow proposals.
Endpoints for listing, reviewing, and managing Knowledge Flywheel proposals:
GET /flow-proposals — List proposals (filterable)
GET /flow-proposals/stats — Dashboard stats
GET /flow-proposals/{id} — Get proposal detail
POST /flow-proposals/{id}/review — Approve, reject, modify, or dismiss
"""
import logging
import uuid
from datetime import datetime, timezone, timedelta
from typing import Annotated, Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from sqlalchemy import select, func, case
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.core.rate_limit import limiter
from app.services.notification_service import notify
from app.api.deps import get_current_active_user, get_db, require_engineer_or_admin, require_team_admin
from app.models.user import User
from app.models.tree import Tree
from app.models.flow_proposal import FlowProposal
from app.schemas.flow_proposal import (
FlowProposalSummary,
FlowProposalDetail,
FlowProposalStats,
ReviewProposalRequest,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/flow-proposals", tags=["flow-proposals"])
# ── List proposals ──
@router.get("", response_model=list[FlowProposalSummary])
@limiter.limit("30/minute")
async def list_proposals(
request: Request,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_engineer_or_admin),
proposal_status: Optional[str] = Query(None, alias="status"),
proposal_type: Optional[str] = Query(None, alias="type"),
domain: Optional[str] = Query(None),
sort_by: str = Query("newest", pattern="^(newest|confidence|sessions)$"),
skip: int = Query(0, ge=0),
limit: int = Query(20, ge=1, le=100),
):
"""List flow proposals for the current user's account."""
if not current_user.account_id:
return []
query = (
select(FlowProposal)
.where(FlowProposal.account_id == current_user.account_id)
)
if proposal_status:
query = query.where(FlowProposal.status == proposal_status)
if proposal_type:
query = query.where(FlowProposal.proposal_type == proposal_type)
if domain:
query = query.where(FlowProposal.problem_domain == domain)
# Sorting
if sort_by == "confidence":
query = query.order_by(FlowProposal.confidence_score.desc())
elif sort_by == "sessions":
query = query.order_by(FlowProposal.supporting_session_count.desc())
else: # newest
query = query.order_by(FlowProposal.created_at.desc())
query = query.offset(skip).limit(limit)
result = await db.execute(query)
proposals = result.scalars().all()
return [FlowProposalSummary.model_validate(p) for p in proposals]
# ── Stats ──
@router.get("/stats", response_model=FlowProposalStats)
@limiter.limit("30/minute")
async def get_proposal_stats(
request: Request,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_engineer_or_admin),
):
"""Get review queue dashboard stats."""
if not current_user.account_id:
return FlowProposalStats(
pending_count=0, approved_this_week=0, rejected_this_week=0,
auto_reinforced_this_week=0, top_domains=[],
)
week_ago = datetime.now(timezone.utc) - timedelta(days=7)
# Count pending
pending_result = await db.execute(
select(func.count(FlowProposal.id))
.where(
FlowProposal.account_id == current_user.account_id,
FlowProposal.status == "pending",
)
)
pending_count = pending_result.scalar() or 0
# Reviewed this week (approved/rejected/modified use reviewed_at)
reviewed_result = await db.execute(
select(
FlowProposal.status,
func.count(FlowProposal.id),
)
.where(
FlowProposal.account_id == current_user.account_id,
FlowProposal.reviewed_at >= week_ago,
FlowProposal.status.in_(["approved", "modified", "rejected", "dismissed"]),
)
.group_by(FlowProposal.status)
)
reviewed_counts = {row[0]: row[1] for row in reviewed_result.all()}
# Auto-reinforced this week (use created_at since they have no review)
reinforced_result = await db.execute(
select(func.count(FlowProposal.id))
.where(
FlowProposal.account_id == current_user.account_id,
FlowProposal.created_at >= week_ago,
FlowProposal.status == "auto_reinforced",
)
)
auto_reinforced_count = reinforced_result.scalar() or 0
# Top domains
domain_result = await db.execute(
select(
FlowProposal.problem_domain,
func.count(FlowProposal.id).label("count"),
)
.where(
FlowProposal.account_id == current_user.account_id,
FlowProposal.status == "pending",
FlowProposal.problem_domain.isnot(None),
)
.group_by(FlowProposal.problem_domain)
.order_by(func.count(FlowProposal.id).desc())
.limit(5)
)
top_domains = [{"domain": row[0], "count": row[1]} for row in domain_result.all()]
return FlowProposalStats(
pending_count=pending_count,
approved_this_week=reviewed_counts.get("approved", 0) + reviewed_counts.get("modified", 0),
rejected_this_week=reviewed_counts.get("rejected", 0),
auto_reinforced_this_week=auto_reinforced_count,
top_domains=top_domains,
)
# ── Detail ──
@router.get("/{proposal_id}", response_model=FlowProposalDetail)
@limiter.limit("30/minute")
async def get_proposal(
request: Request,
proposal_id: UUID,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_engineer_or_admin),
):
"""Get full proposal detail."""
result = await db.execute(
select(FlowProposal).where(
FlowProposal.id == proposal_id,
FlowProposal.account_id == current_user.account_id,
)
)
proposal = result.scalar_one_or_none()
if not proposal:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Proposal not found")
return FlowProposalDetail.model_validate(proposal)
# ── Review ──
@router.post("/{proposal_id}/review", response_model=FlowProposalDetail)
@limiter.limit("10/minute")
async def review_proposal(
request: Request,
proposal_id: UUID,
data: ReviewProposalRequest,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_team_admin),
):
"""Review a proposal: approve, reject, modify, or dismiss."""
result = await db.execute(
select(FlowProposal).where(
FlowProposal.id == proposal_id,
FlowProposal.account_id == current_user.account_id,
)
)
proposal = result.scalar_one_or_none()
if not proposal:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Proposal not found")
if proposal.status not in ("pending", "dismissed"):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Cannot review proposal in status: {proposal.status}",
)
proposal.reviewed_by = current_user.id
proposal.reviewed_at = datetime.now(timezone.utc)
proposal.reviewer_notes = data.reviewer_notes
if data.action == "approve":
if proposal.proposal_type == "new_flow":
flow_data = proposal.proposed_flow_data
new_tree = await _create_tree_from_proposal(proposal, flow_data, current_user, db)
proposal.status = "approved"
proposal.published_flow_id = new_tree.id
elif proposal.proposal_type in ("enhancement", "branch_addition"):
# Enhancement proposals contain diffs, not complete tree structures.
# Direct approval requires modified_flow_data with the complete merged structure.
# Redirect reviewers to use "Edit & Publish" for enhancements.
if not data.modified_flow_data:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Enhancement proposals require 'Edit & Publish' to merge changes into the existing flow. Use the modify action with modified_flow_data.",
)
new_tree = await _create_tree_from_proposal(proposal, data.modified_flow_data, current_user, db)
proposal.status = "approved"
proposal.published_flow_id = new_tree.id
else:
# auto_reinforced shouldn't reach here, but handle gracefully
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Cannot approve proposal of type: {proposal.proposal_type}",
)
elif data.action == "modify":
if not data.modified_flow_data:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="modified_flow_data is required for modify action",
)
new_tree = await _create_tree_from_proposal(proposal, data.modified_flow_data, current_user, db)
proposal.status = "modified"
proposal.published_flow_id = new_tree.id
elif data.action == "reject":
proposal.status = "rejected"
elif data.action == "dismiss":
proposal.status = "dismissed"
if data.action == "approve":
await notify("proposal.approved", proposal.account_id, {
"title": proposal.title,
"reviewer_name": current_user.display_name if hasattr(current_user, 'display_name') else current_user.email,
"link": "/review-queue",
}, db, target_user_ids=[proposal.created_by_id] if proposal.created_by_id else None)
await db.commit()
return FlowProposalDetail.model_validate(proposal)
async def _create_tree_from_proposal(
proposal: FlowProposal,
flow_data: dict,
user: User,
db: AsyncSession,
) -> Tree:
"""Create a new Tree from proposal flow data."""
tree_structure = flow_data.get("tree_structure", flow_data)
match_keywords = flow_data.get("match_keywords", [])
if not tree_structure or not isinstance(tree_structure, dict) or not tree_structure.get("id"):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Proposal has no valid tree structure. Use 'Edit & Publish' to build the flow manually.",
)
new_tree = Tree(
id=uuid.uuid4(),
name=proposal.title,
description=proposal.description,
tree_type="troubleshooting",
tree_structure=tree_structure,
author_id=user.id,
account_id=proposal.account_id,
team_id=proposal.team_id,
origin="ai_generated" if proposal.proposal_type == "new_flow" else "ai_enhanced",
source_session_id=proposal.source_session_id,
match_keywords=match_keywords,
)
db.add(new_tree)
await db.flush()
logger.info(
"Created tree %s from proposal %s (%s)",
new_tree.id, proposal.id, proposal.proposal_type,
)
return new_tree