resolutionflow/docs/superpowers/plans/2026-03-24-conversational-branching.md
chihlasm 58c2a80d8a docs: add Conversational Branching implementation plan
27-task plan covering 6 phases: data foundation (4 models, migration),
branch engine (BranchManager, prompt builder, API), handoff system
(park/escalate with dual-write), resolution outputs (3-output generator),
AI description pipeline, and frontend (BranchMap, ForkCard, HandoffModal,
ResolutionOutputPanel, SessionQueuePage).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 08:15:19 +00:00


Conversational Branching Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Add branching troubleshooting to FlowPilot — engineers explore multiple diagnostic hypotheses as first-class branches with cross-branch AI context, unified park/escalate handoff, and three-output resolution packages.

Architecture: Additive layer on top of existing FlowPilot. Four new DB tables + nullable columns on three existing tables. Four new backend services that call _call_ai from assistant_chat_service.py for all LLM interactions. Frontend adds Branch Map sidebar, Fork Cards, Handoff modal, and Resolution panel. All branching logic gated behind session.is_branching — removing the feature = drop tables + remove guards.

Tech Stack: Python FastAPI, SQLAlchemy 2.0 (async), PostgreSQL, Alembic migrations, React 19 + TypeScript, Tailwind CSS v4, Zustand, Axios, Lucide React icons.

Spec: docs/superpowers/specs/2026-03-24-conversational-branching-design.md
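The is_branching gating described in the Architecture note can be pictured as a single guard at the message-routing entry point. What follows is a minimal sketch under stated assumptions, not the actual unified_chat_service.py code: Session, handle_linear, handle_branch, and route_message are hypothetical names, and falling back to the linear path when no branch is active is an assumed policy.

```python
from dataclasses import dataclass
from typing import Optional
import uuid


@dataclass
class Session:
    """Hypothetical stand-in for AISession with only the fields the guard needs."""
    is_branching: bool = False
    active_branch_id: Optional[uuid.UUID] = None


def handle_linear(session: Session, text: str) -> str:
    # Existing, non-branching FlowPilot path (placeholder).
    return f"linear:{text}"


def handle_branch(session: Session, text: str) -> str:
    # New branch-aware path; only reachable when the flag is set.
    return f"branch[{session.active_branch_id}]:{text}"


def route_message(session: Session, text: str) -> str:
    """Route to the branch path only when branching is on and a branch is active."""
    if session.is_branching and session.active_branch_id is not None:
        return handle_branch(session, text)
    return handle_linear(session, text)
```

Because every new code path sits behind one condition like this, removing the feature reduces to deleting the guard and the branch handler, which is the property the architecture relies on.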


File Structure

New Backend Files

| File | Responsibility |
| --- | --- |
| backend/app/models/session_branch.py | SessionBranch SQLAlchemy model |
| backend/app/models/fork_point.py | ForkPoint SQLAlchemy model |
| backend/app/models/session_handoff.py | SessionHandoff SQLAlchemy model |
| backend/app/models/session_resolution_output.py | SessionResolutionOutput SQLAlchemy model |
| backend/app/schemas/session_branch.py | Pydantic schemas for branches + fork points |
| backend/app/schemas/session_handoff.py | Pydantic schemas for handoffs |
| backend/app/schemas/session_resolution.py | Pydantic schemas for resolution outputs |
| backend/app/services/branch_manager.py | Branch lifecycle: create root, fork, switch, mark status, revive, tree query, context summary |
| backend/app/services/branch_aware_prompt_builder.py | Pure function: assembles system prompt + messages + images with cross-branch context for _call_ai |
| backend/app/services/handoff_manager.py | Park/escalate with dual-write backward compat, snapshot, AI assessment, claim, PSA push |
| backend/app/services/resolution_output_generator.py | PSA notes, KB article, client summary: three LLM calls on resolve |
| backend/app/api/endpoints/session_branches.py | Branch CRUD + fork + switch + revive + branch message endpoints |
| backend/app/api/endpoints/session_handoffs.py | Handoff create + history + claim + queue endpoints |
| backend/app/api/endpoints/session_resolutions.py | Resolution output CRUD + push endpoints |
| backend/alembic/versions/030_add_conversational_branching.py | Single migration: 4 new tables + columns on 3 existing tables |
| backend/tests/test_branch_manager.py | Integration tests for BranchManager |
| backend/tests/test_branch_aware_prompt_builder.py | Unit tests for prompt builder (pure function, no DB) |
| backend/tests/test_handoff_manager.py | Integration tests for HandoffManager |
| backend/tests/test_resolution_outputs.py | Integration tests for ResolutionOutputGenerator |
| backend/tests/test_session_branches_api.py | API endpoint tests for branches |
| backend/tests/test_session_handoffs_api.py | API endpoint tests for handoffs |
| backend/tests/test_session_resolutions_api.py | API endpoint tests for resolutions |

New Frontend Files

| File | Responsibility |
| --- | --- |
| frontend/src/types/branching.ts | TypeScript interfaces for branches, forks, handoffs, resolution outputs |
| frontend/src/api/branches.ts | Axios client for branch endpoints |
| frontend/src/api/handoffs.ts | Axios client for handoff endpoints |
| frontend/src/api/resolutions.ts | Axios client for resolution output endpoints |
| frontend/src/hooks/useBranching.ts | Branch state management hook |
| frontend/src/hooks/useHandoff.ts | Handoff flow state hook |
| frontend/src/hooks/useResolutionOutputs.ts | Resolution output state hook |
| frontend/src/components/session/BranchMap.tsx | Sidebar tree visualization with status badges |
| frontend/src/components/session/BranchNode.tsx | Individual node in branch map |
| frontend/src/components/session/ForkCard.tsx | In-chat fork decision point card |
| frontend/src/components/session/BranchTransitionBar.tsx | Context bar shown on branch switch |
| frontend/src/components/session/BranchRevivalCard.tsx | Evidence card for revival |
| frontend/src/components/session/HandoffModal.tsx | Unified park/escalate modal |
| frontend/src/components/session/ResolutionOutputPanel.tsx | Three-tab resolution view + edit + push |
| frontend/src/pages/SessionQueuePage.tsx | Team queue for parked/escalated sessions |

Modified Existing Files

| File | Change |
| --- | --- |
| backend/app/models/ai_session.py | Add 5 columns: is_branching, active_branch_id, handoff_count, total_active_seconds, total_parked_seconds |
| backend/app/models/ai_session_step.py | Add 3 columns: branch_id, is_fork_point, fork_point_id. Add 'fork' to step_type CHECK constraint |
| backend/app/models/file_upload.py | Add 5 columns: ai_description, extracted_content, content_summary, uploaded_on_branch_id, uploaded_at_step_id |
| backend/app/models/__init__.py | Import 4 new models, add to __all__ |
| backend/alembic/env.py | Import 4 new models for metadata tracking |
| backend/app/api/router.py | Register 3 new endpoint routers |
| backend/app/schemas/ai_session.py | Add is_branching, active_branch_id to AISessionDetail |
| backend/app/services/unified_chat_service.py | Add if session.is_branching: guard to route messages to branch |
| backend/app/services/flowpilot_engine.py | Set branch_id on AISessionStep creation when is_branching=True |
| backend/app/api/endpoints/uploads.py | Add async AI description generation background task on upload |
| frontend/src/types/ai-session.ts | Add is_branching, active_branch_id to AISessionDetail |
| frontend/src/api/index.ts | Export new API clients |
| frontend/src/types/index.ts | Export new types |
| frontend/src/pages/FlowPilotSessionPage.tsx | Integrate BranchMap sidebar, ForkCard rendering, branch-aware message routing |
| frontend/src/router.tsx | Add SessionQueuePage route |

Phase 1: Data Foundation

Task 1: Create SessionBranch Model

Files:

  • Create: backend/app/models/session_branch.py

  • Test: backend/tests/test_branch_manager.py (started here, completed in Phase 2)

  • Step 1: Write the SessionBranch model

# backend/app/models/session_branch.py
"""Session branch model — represents a diagnostic hypothesis path within a FlowPilot session."""
import uuid
from datetime import datetime, timezone
from typing import Optional, Any, TYPE_CHECKING

from sqlalchemy import String, Text, DateTime, ForeignKey, Integer, CheckConstraint, Index
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID, JSONB

from app.core.database import Base

if TYPE_CHECKING:
    from app.models.ai_session import AISession
    from app.models.ai_session_step import AISessionStep
    from app.models.user import User


class SessionBranch(Base):
    """A diagnostic branch within a FlowPilot session.

    Each branch represents one hypothesis being explored. Branches form a tree
    via parent_branch_id (NULL = root branch). Status tracks the outcome:
    active, dead_end, solved, untried, revived.
    """
    __tablename__ = "session_branches"
    __table_args__ = (
        CheckConstraint(
            "status IN ('active', 'dead_end', 'solved', 'untried', 'revived')",
            name="ck_session_branches_status",
        ),
        CheckConstraint(
            "branch_order > 0",
            name="ck_session_branches_branch_order_positive",
        ),
        Index("ix_session_branches_session_id", "session_id"),
        Index("ix_session_branches_parent_branch_id", "parent_branch_id"),
        Index("ix_session_branches_session_status", "session_id", "status"),
        Index("ix_session_branches_session_order", "session_id", "branch_order"),
    )

    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
    session_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("ai_sessions.id", ondelete="CASCADE"),
        nullable=False,
    )
    parent_branch_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("session_branches.id", ondelete="CASCADE"),
        nullable=True,
    )
    fork_point_step_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("ai_session_steps.id", ondelete="SET NULL"),
        nullable=True,
    )
    branch_order: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
    label: Mapped[str] = mapped_column(String(200), nullable=False)
    status: Mapped[str] = mapped_column(String(20), nullable=False, default="active")
    status_reason: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    status_changed_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True), nullable=True
    )
    status_changed_by: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("users.id", ondelete="SET NULL"),
        nullable=True,
    )
    conversation_messages: Mapped[list[dict[str, Any]]] = mapped_column(
        JSONB, nullable=False, default=list,
        comment="LLM message history scoped to this branch",
    )
    context_summary: Mapped[Optional[dict[str, Any]]] = mapped_column(
        JSONB, nullable=True,
        comment="{tried: [], concluded: str, artifacts: []}",
    )
    evidence_from_branch_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("session_branches.id", ondelete="SET NULL"),
        nullable=True,
    )
    evidence_description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    # Relationships
    session: Mapped["AISession"] = relationship("AISession", foreign_keys=[session_id])
    parent_branch: Mapped[Optional["SessionBranch"]] = relationship(
        "SessionBranch", remote_side="SessionBranch.id",
        foreign_keys=[parent_branch_id],
    )
    fork_point_step: Mapped[Optional["AISessionStep"]] = relationship(
        "AISessionStep", foreign_keys=[fork_point_step_id]
    )
    status_changed_by_user: Mapped[Optional["User"]] = relationship(
        "User", foreign_keys=[status_changed_by]
    )
    evidence_source: Mapped[Optional["SessionBranch"]] = relationship(
        "SessionBranch", remote_side="SessionBranch.id",
        foreign_keys=[evidence_from_branch_id],
    )
  • Step 2: Verify file was created correctly

Run: cd /home/coder/resolutionflow/backend && python -c "from app.models.session_branch import SessionBranch; print('OK:', SessionBranch.__tablename__)"
Expected: OK: session_branches

  • Step 3: Commit
git add backend/app/models/session_branch.py
git commit -m "feat: add SessionBranch model for conversational branching"
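
Since branches form a tree through parent_branch_id (NULL = root), a tree query usually comes down to nesting flat rows. The sketch below illustrates that idea with plain dictionaries; it is not the BranchManager implementation. build_branch_tree is a hypothetical helper, and string ids stand in for UUIDs purely for readability.

```python
from collections import defaultdict
from typing import Any, Optional


def build_branch_tree(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Nest flat branch rows into a tree using parent_branch_id (None = root)."""
    children: dict[Optional[str], list[dict[str, Any]]] = defaultdict(list)
    for row in rows:
        children[row["parent_branch_id"]].append(row)

    def attach(parent_id: Optional[str]) -> list[dict[str, Any]]:
        # Siblings are ordered by branch_order, the same key the model indexes on.
        ordered = sorted(children[parent_id], key=lambda r: r["branch_order"])
        return [
            {"id": r["id"], "label": r["label"], "status": r["status"],
             "children": attach(r["id"])}
            for r in ordered
        ]

    return attach(None)


rows = [
    {"id": "b2", "parent_branch_id": "root", "branch_order": 2, "label": "DNS", "status": "untried"},
    {"id": "root", "parent_branch_id": None, "branch_order": 1, "label": "Root", "status": "active"},
    {"id": "b1", "parent_branch_id": "root", "branch_order": 1, "label": "Firewall", "status": "dead_end"},
]
tree = build_branch_tree(rows)
```

Here tree holds a single root node whose children are the Firewall and DNS branches, in branch_order.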

Task 2: Create ForkPoint Model

Files:

  • Create: backend/app/models/fork_point.py

  • Step 1: Write the ForkPoint model

# backend/app/models/fork_point.py
"""Fork point model — captures decision points where a session branches."""
import uuid
from datetime import datetime, timezone
from typing import Optional, Any, TYPE_CHECKING

from sqlalchemy import Text, DateTime, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID, JSONB

from app.core.database import Base

if TYPE_CHECKING:
    from app.models.ai_session import AISession
    from app.models.session_branch import SessionBranch
    from app.models.ai_session_step import AISessionStep


class ForkPoint(Base):
    """A decision point where a session forks into multiple branches.

    Stores the fork reason and the options presented (each becoming a branch).
    options JSONB: [{label, description, branch_id, status}]
    """
    __tablename__ = "fork_points"

    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
    session_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("ai_sessions.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    parent_branch_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("session_branches.id", ondelete="CASCADE"),
        nullable=False,
    )
    trigger_step_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("ai_session_steps.id", ondelete="SET NULL"),
        nullable=True,
    )
    fork_reason: Mapped[str] = mapped_column(Text, nullable=False)
    options: Mapped[list[dict[str, Any]]] = mapped_column(
        JSONB, nullable=False, default=list,
        comment="[{label, description, branch_id, status}]",
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )

    # Relationships
    session: Mapped["AISession"] = relationship("AISession")
    parent_branch: Mapped["SessionBranch"] = relationship("SessionBranch")
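    # foreign_keys is required: ai_session_steps.fork_point_id (Task 5) adds a second
    # FK path between the two tables, which would otherwise make this join ambiguous.
    trigger_step: Mapped[Optional["AISessionStep"]] = relationship(
        "AISessionStep", foreign_keys=[trigger_step_id]
    )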
  • Step 2: Verify file was created correctly

Run: cd /home/coder/resolutionflow/backend && python -c "from app.models.fork_point import ForkPoint; print('OK:', ForkPoint.__tablename__)"
Expected: OK: fork_points

  • Step 3: Commit
git add backend/app/models/fork_point.py
git commit -m "feat: add ForkPoint model for branch decision points"
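
The options column is free-form JSONB, so the database does not enforce the [{label, description, branch_id, status}] shape. One way to guard it at the application layer is sketched below. validate_fork_options is a hypothetical helper, and two rules in it are assumptions rather than requirements from the spec: that a fork needs at least two options, and that an option's status reuses the branch status vocabulary.

```python
from typing import Any

OPTION_KEYS = {"label", "description", "branch_id", "status"}
# Assumption: option status mirrors the SessionBranch status values.
BRANCH_STATUSES = {"active", "dead_end", "solved", "untried", "revived"}


def validate_fork_options(options: list[dict[str, Any]]) -> list[str]:
    """Return a list of human-readable problems; an empty list means the payload is usable."""
    problems: list[str] = []
    if len(options) < 2:
        # Assumption: a single-option "fork" is not a real decision point.
        problems.append("a fork needs at least two options")
    for i, opt in enumerate(options):
        missing = OPTION_KEYS - opt.keys()
        if missing:
            problems.append(f"option {i} missing keys: {sorted(missing)}")
        if opt.get("status") not in BRANCH_STATUSES:
            problems.append(f"option {i} has unknown status: {opt.get('status')!r}")
    return problems
```

The same checks could equally live in the Pydantic schema for fork points; the point is only that something outside the column definition has to enforce the shape.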

Task 3: Create SessionHandoff Model

Files:

  • Create: backend/app/models/session_handoff.py

  • Step 1: Write the SessionHandoff model

# backend/app/models/session_handoff.py
"""Session handoff model — unified park/escalate with history."""
import uuid
from datetime import datetime, timezone
from typing import Optional, Any, TYPE_CHECKING

from sqlalchemy import String, Text, Boolean, DateTime, ForeignKey, CheckConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID, JSONB

from app.core.database import Base

if TYPE_CHECKING:
    from app.models.ai_session import AISession
    from app.models.session_branch import SessionBranch
    from app.models.user import User


class SessionHandoff(Base):
    """A handoff event — either parking or escalating a session.

    Captures a snapshot of the session state at handoff time, including
    branch map, AI assessment (for escalations), and artifacts.
    Dual-writes to ai_sessions.escalation_package for backward compat.
    """
    __tablename__ = "session_handoffs"
    __table_args__ = (
        CheckConstraint(
            "intent IN ('park', 'escalate')",
            name="ck_session_handoffs_intent",
        ),
        CheckConstraint(
            "priority IN ('normal', 'elevated')",
            name="ck_session_handoffs_priority",
        ),
    )

    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
    session_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("ai_sessions.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    handed_off_by: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("users.id", ondelete="CASCADE"),
        nullable=False,
    )
    intent: Mapped[str] = mapped_column(String(20), nullable=False)
    source_branch_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("session_branches.id", ondelete="SET NULL"),
        nullable=True,
    )
    snapshot: Mapped[dict[str, Any]] = mapped_column(
        JSONB, nullable=False, default=dict,
        comment="Branch map, status, next step, waiting on, watch out",
    )
    ai_assessment: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    ai_assessment_data: Mapped[Optional[dict[str, Any]]] = mapped_column(
        JSONB, nullable=True,
        comment="{likely_cause, suggested_steps, confidence}",
    )
    artifacts: Mapped[Optional[list[dict[str, Any]]]] = mapped_column(
        JSONB, nullable=True,
        comment="[{name, type, reference}]",
    )
    engineer_notes: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    priority: Mapped[str] = mapped_column(
        String(20), nullable=False, default="normal"
    )
    claimed_by: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("users.id", ondelete="SET NULL"),
        nullable=True,
    )
    claimed_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True), nullable=True
    )
    psa_note_pushed: Mapped[bool] = mapped_column(Boolean, default=False)
    psa_note_id: Mapped[Optional[str]] = mapped_column(
        String(100), nullable=True
    )
    notification_sent: Mapped[bool] = mapped_column(Boolean, default=False)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )

    # Relationships
    session: Mapped["AISession"] = relationship("AISession")
    handed_off_by_user: Mapped["User"] = relationship("User", foreign_keys=[handed_off_by])
    source_branch: Mapped[Optional["SessionBranch"]] = relationship("SessionBranch")
    claimed_by_user: Mapped[Optional["User"]] = relationship("User", foreign_keys=[claimed_by])
  • Step 2: Verify file was created correctly

Run: cd /home/coder/resolutionflow/backend && python -c "from app.models.session_handoff import SessionHandoff; print('OK:', SessionHandoff.__tablename__)"
Expected: OK: session_handoffs

  • Step 3: Commit
git add backend/app/models/session_handoff.py
git commit -m "feat: add SessionHandoff model for unified park/escalate"
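
The claimed_by / claimed_at pair and the two CHECK constraints imply a small amount of behavior that can be sketched without a database. The example below is illustrative only: Handoff and claim are hypothetical stand-ins, and treating a repeat claim by the same user as a success is an assumed policy, not something the plan specifies.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional
import uuid

VALID_INTENTS = {"park", "escalate"}
VALID_PRIORITIES = {"normal", "elevated"}


@dataclass
class Handoff:
    intent: str
    priority: str = "normal"
    claimed_by: Optional[uuid.UUID] = None
    claimed_at: Optional[datetime] = None

    def __post_init__(self) -> None:
        # Mirror the two CHECK constraints so bad values fail before reaching the DB.
        if self.intent not in VALID_INTENTS:
            raise ValueError(f"invalid intent: {self.intent!r}")
        if self.priority not in VALID_PRIORITIES:
            raise ValueError(f"invalid priority: {self.priority!r}")


def claim(handoff: Handoff, user_id: uuid.UUID) -> bool:
    """Claim an unclaimed handoff; return False if someone else already holds it."""
    if handoff.claimed_by is not None:
        # Assumption: re-claiming your own handoff is an idempotent success.
        return handoff.claimed_by == user_id
    handoff.claimed_by = user_id
    handoff.claimed_at = datetime.now(timezone.utc)
    return True
```

In the real service the check and the write would need to happen atomically (for example, a single UPDATE that only matches rows where claimed_by is still NULL), since two engineers can hit the queue at the same moment.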

Task 4: Create SessionResolutionOutput Model

Files:

  • Create: backend/app/models/session_resolution_output.py

  • Step 1: Write the SessionResolutionOutput model

# backend/app/models/session_resolution_output.py
"""Session resolution output model — three deliverables generated on resolve."""
import uuid
from datetime import datetime, timezone
from typing import Optional, Any

from sqlalchemy import String, Text, DateTime, ForeignKey, CheckConstraint, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID, JSONB

from app.core.database import Base


class SessionResolutionOutput(Base):
    """One of three resolution deliverables: PSA ticket notes, KB article, client summary.

    Uses UNIQUE(session_id, output_type) + upsert pattern so outputs can be
    regenerated if a session is re-opened after resolution.
    """
    __tablename__ = "session_resolution_outputs"
    __table_args__ = (
        CheckConstraint(
            "output_type IN ('psa_ticket_notes', 'knowledge_base', 'client_summary')",
            name="ck_session_resolution_outputs_output_type",
        ),
        CheckConstraint(
            "status IN ('draft', 'approved', 'pushed', 'rejected')",
            name="ck_session_resolution_outputs_status",
        ),
        UniqueConstraint("session_id", "output_type", name="uq_session_resolution_session_type"),
    )

    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
    session_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("ai_sessions.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    output_type: Mapped[str] = mapped_column(String(30), nullable=False)
    generated_content: Mapped[str] = mapped_column(Text, nullable=False)
    structured_data: Mapped[Optional[dict[str, Any]]] = mapped_column(
        JSONB, nullable=True,
        comment="For KB: {symptoms, root_cause, steps, tags}",
    )
    edited_content: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    status: Mapped[str] = mapped_column(String(20), nullable=False, default="draft")
    pushed_to: Mapped[Optional[str]] = mapped_column(String(50), nullable=True)
    pushed_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True), nullable=True
    )
    pushed_reference: Mapped[Optional[str]] = mapped_column(String(200), nullable=True)
    generated_by_model: Mapped[str] = mapped_column(String(50), nullable=False)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    # Relationships
    session = relationship("AISession")
  • Step 2: Verify file was created correctly

Run: cd /home/coder/resolutionflow/backend && python -c "from app.models.session_resolution_output import SessionResolutionOutput; print('OK:', SessionResolutionOutput.__tablename__)"
Expected: OK: session_resolution_outputs

  • Step 3: Commit
git add backend/app/models/session_resolution_output.py
git commit -m "feat: add SessionResolutionOutput model for three-output resolution"
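
The UNIQUE(session_id, output_type) + upsert pattern mentioned in the model docstring can be demonstrated end to end with the standard library. This sketch uses an in-memory SQLite database as a stand-in for PostgreSQL (both accept this ON CONFLICT ... DO UPDATE form) and a trimmed-down table. upsert_output is a hypothetical helper, and resetting status to 'draft' on regeneration is an assumption.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE session_resolution_outputs (
        session_id TEXT NOT NULL,
        output_type TEXT NOT NULL,
        generated_content TEXT NOT NULL,
        status TEXT NOT NULL DEFAULT 'draft',
        UNIQUE (session_id, output_type)
    )
    """
)


def upsert_output(session_id: str, output_type: str, content: str) -> None:
    """Insert a new output, or overwrite the existing one and reset it to draft."""
    conn.execute(
        """
        INSERT INTO session_resolution_outputs (session_id, output_type, generated_content)
        VALUES (?, ?, ?)
        ON CONFLICT (session_id, output_type)
        DO UPDATE SET generated_content = excluded.generated_content, status = 'draft'
        """,
        (session_id, output_type, content),
    )


upsert_output("s1", "client_summary", "first draft")
conn.execute("UPDATE session_resolution_outputs SET status = 'approved'")
upsert_output("s1", "client_summary", "regenerated after re-open")
rows = conn.execute(
    "SELECT generated_content, status FROM session_resolution_outputs"
).fetchall()
```

After the second call there is still exactly one row for the session and output type, holding the regenerated content. Without the unique constraint the second insert would have created a duplicate instead.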

Task 5: Add Columns to Existing Models

Files:

  • Modify: backend/app/models/ai_session.py:25-221 — add 5 columns + relationships

  • Modify: backend/app/models/ai_session_step.py:21-134 — add 3 columns + update CHECK, add relationships

  • Modify: backend/app/models/file_upload.py:13-33 — add 5 columns

  • Step 1: Add branching columns to AISession model

In backend/app/models/ai_session.py, add these columns after conversation_messages (line ~207) and before the Relationships section:

    # ── Branching ──
    is_branching: Mapped[bool] = mapped_column(
        default=False,
        comment="Whether conversational branching is active for this session",
    )
    active_branch_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True), nullable=True,
        comment="Currently viewed branch. No FK — soft pointer to avoid circular FK with session_branches",
    )
    handoff_count: Mapped[int] = mapped_column(
        Integer, nullable=False, default=0,
        comment="Number of times this session has been handed off",
    )
    total_active_seconds: Mapped[int] = mapped_column(
        Integer, nullable=False, default=0,
        comment="Cumulative active time in seconds",
    )
    total_parked_seconds: Mapped[int] = mapped_column(
        Integer, nullable=False, default=0,
        comment="Cumulative parked time in seconds",
    )

Also add these TYPE_CHECKING imports at the top:

    from app.models.session_branch import SessionBranch
    from app.models.session_handoff import SessionHandoff
    from app.models.session_resolution_output import SessionResolutionOutput

And add these relationships at the bottom of the class:

    branches: Mapped[list["SessionBranch"]] = relationship(
        "SessionBranch",
        foreign_keys="SessionBranch.session_id",
        cascade="all, delete-orphan",
        order_by="SessionBranch.branch_order",
    )
    handoffs: Mapped[list["SessionHandoff"]] = relationship(
        "SessionHandoff",
        cascade="all, delete-orphan",
        order_by="SessionHandoff.created_at",
    )
    resolution_outputs: Mapped[list["SessionResolutionOutput"]] = relationship(
        "SessionResolutionOutput",
        cascade="all, delete-orphan",
    )
  • Step 2: Add branching columns to AISessionStep model

In backend/app/models/ai_session_step.py:

  1. Update the CHECK constraint (line ~36) to include 'fork':
    __table_args__ = (
        CheckConstraint(
            "step_type IN ('question', 'action', 'script_generation', 'verification', "
            "'info_request', 'note', 'intake_analysis', 'fork')",
            name="ck_ai_session_steps_step_type",
        ),
    )
  2. Add these columns after output_tokens (line ~120):
    # ── Branching ──
    branch_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("session_branches.id", ondelete="SET NULL"),
        nullable=True,
        index=True,
        comment="NULL = pre-branching/root messages",
    )
    is_fork_point: Mapped[bool] = mapped_column(
        default=False,
        comment="Whether this step triggered a fork",
    )
    fork_point_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("fork_points.id", ondelete="SET NULL"),
        nullable=True,
    )
  3. Add TYPE_CHECKING imports for SessionBranch and ForkPoint.
  • Step 3: Add columns to FileUpload model

In backend/app/models/file_upload.py, add after created_at (line ~32):

    # ── AI description + branching context ──
    ai_description: Mapped[Optional[str]] = mapped_column(
        Text, nullable=True,
        comment="AI-generated one-sentence description of the file",
    )
    extracted_content: Mapped[Optional[str]] = mapped_column(
        Text, nullable=True,
        comment="Extracted text from logs/configs",
    )
    content_summary: Mapped[Optional[str]] = mapped_column(
        Text, nullable=True,
        comment="AI summary for long text files (>2000 tokens)",
    )
    uploaded_on_branch_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("session_branches.id", ondelete="SET NULL"),
        nullable=True,
    )
    uploaded_at_step_id: Mapped[Optional[uuid.UUID]] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("ai_session_steps.id", ondelete="SET NULL"),
        nullable=True,
    )

Add any imports the new columns need that are not already present in the file: uuid, Optional from typing, Text and ForeignKey from sqlalchemy, and UUID from sqlalchemy.dialects.postgresql.

  • Step 4: Verify all model changes compile

Run: cd /home/coder/resolutionflow/backend && python -c "from app.models.ai_session import AISession; from app.models.ai_session_step import AISessionStep; from app.models.file_upload import FileUpload; print('All models OK')"
Expected: All models OK

  • Step 5: Commit
git add backend/app/models/ai_session.py backend/app/models/ai_session_step.py backend/app/models/file_upload.py
git commit -m "feat: add branching columns to ai_sessions, ai_session_steps, file_uploads"
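
Because active_branch_id is a soft pointer with no foreign key, nothing in the database stops it from referencing a deleted branch or a branch from another session. Code that reads it therefore has to check. The sketch below shows one defensive lookup; resolve_active_branch and the dictionary-based inputs are hypothetical and only illustrate the check, not the actual service code.

```python
from typing import Optional


def resolve_active_branch(
    active_branch_id: Optional[str],
    branches_by_id: dict[str, dict],
    session_id: str,
) -> Optional[dict]:
    """Return the active branch, or None if the soft pointer is unset, dangling, or foreign."""
    if active_branch_id is None:
        return None
    branch = branches_by_id.get(active_branch_id)
    if branch is None or branch["session_id"] != session_id:
        # No FK backs this pointer, so a deleted or mismatched branch must be tolerated.
        return None
    return branch
```

A caller that gets None back could fall back to the root branch or to the non-branching path, depending on what the spec prefers.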

Task 6: Register Models in __init__.py and alembic env.py

Files:

  • Modify: backend/app/models/__init__.py:1-117

  • Modify: backend/alembic/env.py:10-28

  • Step 1: Add imports to models/__init__.py

Add after the FileUpload import (line 50):

from .session_branch import SessionBranch
from .fork_point import ForkPoint
from .session_handoff import SessionHandoff
from .session_resolution_output import SessionResolutionOutput

Add to __all__ list:

    "SessionBranch",
    "ForkPoint",
    "SessionHandoff",
    "SessionResolutionOutput",
  • Step 2: Add imports to alembic/env.py

Add after the existing model imports (around line 27):

from app.models.session_branch import SessionBranch  # noqa: F401
from app.models.fork_point import ForkPoint  # noqa: F401
from app.models.session_handoff import SessionHandoff  # noqa: F401
from app.models.session_resolution_output import SessionResolutionOutput  # noqa: F401
  • Step 3: Verify imports

Run: cd /home/coder/resolutionflow/backend && python -c "from app.models import SessionBranch, ForkPoint, SessionHandoff, SessionResolutionOutput; print('All imports OK')"
Expected: All imports OK

  • Step 4: Commit
git add backend/app/models/__init__.py backend/alembic/env.py
git commit -m "feat: register branching models in __init__ and alembic env"

Task 7: Write Manual Alembic Migration

Files:

  • Create: backend/alembic/versions/030_add_conversational_branching.py

IMPORTANT: This migration must be written manually per CLAUDE.md lesson 77. The CHECK constraint on ai_session_steps.step_type requires DROP CONSTRAINT then ADD CONSTRAINT — not additive. The active_branch_id on ai_sessions has NO FK constraint (to avoid circular FK with session_branches).
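
The drop-then-add requirement for the step_type CHECK constraint can be made concrete by generating the two statements involved. This is a sketch for illustration: replace_check_sql is a hypothetical helper. The constraint name and value list are taken from the model change in Task 5, and whether the live constraint has exactly that name should be confirmed against the database.

```python
OLD_STEP_TYPES = ["question", "action", "script_generation", "verification",
                  "info_request", "note", "intake_analysis"]
CONSTRAINT = "ck_ai_session_steps_step_type"


def replace_check_sql(table: str, name: str, column: str, values: list[str]) -> list[str]:
    """Build the DROP + ADD statement pair needed to change a CHECK constraint's value list."""
    quoted = ", ".join(f"'{v}'" for v in values)
    return [
        f"ALTER TABLE {table} DROP CONSTRAINT {name}",
        f"ALTER TABLE {table} ADD CONSTRAINT {name} CHECK ({column} IN ({quoted}))",
    ]


# upgrade(): widen the allowed values to include 'fork'.
upgrade_sql = replace_check_sql("ai_session_steps", CONSTRAINT, "step_type", OLD_STEP_TYPES + ["fork"])
# downgrade(): restore the original list.
downgrade_sql = replace_check_sql("ai_session_steps", CONSTRAINT, "step_type", OLD_STEP_TYPES)
```

In the migration itself, each statement could be passed to op.execute(...), or the same pair expressed with op.drop_constraint and op.create_check_constraint. Note that the downgrade pair will fail if any rows with step_type = 'fork' still exist, so those rows need handling first.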

  • Step 1: Write the migration file
"""Add conversational branching tables and columns.

New tables: session_branches, fork_points, session_handoffs, session_resolution_outputs
Modified: ai_sessions (5 cols), ai_session_steps (3 cols + CHECK), file_uploads (5 cols)

Revision ID: 030
Revises: (set down_revision to the current head; see `alembic heads`)
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID, JSONB


# revision identifiers
revision = "030"
down_revision = None  # TODO: hand-written migration, so set this manually to the current head (`alembic heads`) before running
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ── New table: session_branches ──
    op.create_table(
        "session_branches",
        sa.Column("id", UUID(as_uuid=True), primary_key=True),
        sa.Column("session_id", UUID(as_uuid=True), sa.ForeignKey("ai_sessions.id", ondelete="CASCADE"), nullable=False),
        sa.Column("parent_branch_id", UUID(as_uuid=True), sa.ForeignKey("session_branches.id", ondelete="CASCADE"), nullable=True),
        sa.Column("fork_point_step_id", UUID(as_uuid=True), sa.ForeignKey("ai_session_steps.id", ondelete="SET NULL"), nullable=True),
        sa.Column("branch_order", sa.Integer, nullable=False, server_default="1"),
        sa.Column("label", sa.String(200), nullable=False),
        sa.Column("status", sa.String(20), nullable=False, server_default="active"),
        sa.Column("status_reason", sa.Text, nullable=True),
        sa.Column("status_changed_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column("status_changed_by", UUID(as_uuid=True), sa.ForeignKey("users.id", ondelete="SET NULL"), nullable=True),
        sa.Column("conversation_messages", JSONB, nullable=False, server_default="[]"),
        sa.Column("context_summary", JSONB, nullable=True),
        sa.Column("evidence_from_branch_id", UUID(as_uuid=True), sa.ForeignKey("session_branches.id", ondelete="SET NULL"), nullable=True),
        sa.Column("evidence_description", sa.Text, nullable=True),
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
        sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
        sa.CheckConstraint("status IN ('active', 'dead_end', 'solved', 'untried', 'revived')", name="ck_session_branches_status"),
        sa.CheckConstraint("branch_order > 0", name="ck_session_branches_branch_order_positive"),
    )
    op.create_index("ix_session_branches_session_id", "session_branches", ["session_id"])
    op.create_index("ix_session_branches_parent_branch_id", "session_branches", ["parent_branch_id"])
    op.create_index("ix_session_branches_session_status", "session_branches", ["session_id", "status"])
    op.create_index("ix_session_branches_session_order", "session_branches", ["session_id", "branch_order"])

    # ── New table: fork_points ──
    op.create_table(
        "fork_points",
        sa.Column("id", UUID(as_uuid=True), primary_key=True),
        sa.Column("session_id", UUID(as_uuid=True), sa.ForeignKey("ai_sessions.id", ondelete="CASCADE"), nullable=False),
        sa.Column("parent_branch_id", UUID(as_uuid=True), sa.ForeignKey("session_branches.id", ondelete="CASCADE"), nullable=False),
        sa.Column("trigger_step_id", UUID(as_uuid=True), sa.ForeignKey("ai_session_steps.id", ondelete="SET NULL"), nullable=True),
        sa.Column("fork_reason", sa.Text, nullable=False),
        sa.Column("options", JSONB, nullable=False, server_default="[]"),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    op.create_index("ix_fork_points_session_id", "fork_points", ["session_id"])

    # ── New table: session_handoffs ──
    op.create_table(
        "session_handoffs",
        sa.Column("id", UUID(as_uuid=True), primary_key=True),
        sa.Column("session_id", UUID(as_uuid=True), sa.ForeignKey("ai_sessions.id", ondelete="CASCADE"), nullable=False),
        sa.Column("handed_off_by", UUID(as_uuid=True), sa.ForeignKey("users.id", ondelete="CASCADE"), nullable=False),
        sa.Column("intent", sa.String(20), nullable=False),
        sa.Column("source_branch_id", UUID(as_uuid=True), sa.ForeignKey("session_branches.id", ondelete="SET NULL"), nullable=True),
        sa.Column("snapshot", JSONB, nullable=False, server_default="{}"),
        sa.Column("ai_assessment", sa.Text, nullable=True),
        sa.Column("ai_assessment_data", JSONB, nullable=True),
        sa.Column("artifacts", JSONB, nullable=True),
        sa.Column("engineer_notes", sa.Text, nullable=True),
        sa.Column("priority", sa.String(20), nullable=False, server_default="normal"),
        sa.Column("claimed_by", UUID(as_uuid=True), sa.ForeignKey("users.id", ondelete="SET NULL"), nullable=True),
        sa.Column("claimed_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column("psa_note_pushed", sa.Boolean, server_default="false"),
        sa.Column("psa_note_id", sa.String(100), nullable=True),
        sa.Column("notification_sent", sa.Boolean, server_default="false"),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.CheckConstraint("intent IN ('park', 'escalate')", name="ck_session_handoffs_intent"),
        sa.CheckConstraint("priority IN ('normal', 'elevated')", name="ck_session_handoffs_priority"),
    )
    op.create_index("ix_session_handoffs_session_id", "session_handoffs", ["session_id"])

    # ── New table: session_resolution_outputs ──
    op.create_table(
        "session_resolution_outputs",
        sa.Column("id", UUID(as_uuid=True), primary_key=True),
        sa.Column("session_id", UUID(as_uuid=True), sa.ForeignKey("ai_sessions.id", ondelete="CASCADE"), nullable=False),
        sa.Column("output_type", sa.String(30), nullable=False),
        sa.Column("generated_content", sa.Text, nullable=False),
        sa.Column("structured_data", JSONB, nullable=True),
        sa.Column("edited_content", sa.Text, nullable=True),
        sa.Column("status", sa.String(20), nullable=False, server_default="draft"),
        sa.Column("pushed_to", sa.String(50), nullable=True),
        sa.Column("pushed_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column("pushed_reference", sa.String(200), nullable=True),
        sa.Column("generated_by_model", sa.String(50), nullable=False),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.CheckConstraint("output_type IN ('psa_ticket_notes', 'knowledge_base', 'client_summary')", name="ck_session_resolution_outputs_output_type"),
        sa.CheckConstraint("status IN ('draft', 'approved', 'pushed', 'rejected')", name="ck_session_resolution_outputs_status"),
        sa.UniqueConstraint("session_id", "output_type", name="uq_session_resolution_session_type"),
    )
    op.create_index("ix_session_resolution_outputs_session_id", "session_resolution_outputs", ["session_id"])

    # ── Modify ai_sessions: add 5 columns ──
    op.add_column("ai_sessions", sa.Column("is_branching", sa.Boolean, server_default="false", nullable=False))
    op.add_column("ai_sessions", sa.Column("active_branch_id", UUID(as_uuid=True), nullable=True))
    op.add_column("ai_sessions", sa.Column("handoff_count", sa.Integer, server_default="0", nullable=False))
    op.add_column("ai_sessions", sa.Column("total_active_seconds", sa.Integer, server_default="0", nullable=False))
    op.add_column("ai_sessions", sa.Column("total_parked_seconds", sa.Integer, server_default="0", nullable=False))

    # ── Modify ai_session_steps: add 3 columns + update CHECK constraint ──
    op.add_column("ai_session_steps", sa.Column("branch_id", UUID(as_uuid=True), sa.ForeignKey("session_branches.id", ondelete="SET NULL"), nullable=True))
    op.add_column("ai_session_steps", sa.Column("is_fork_point", sa.Boolean, server_default="false", nullable=False))
    op.add_column("ai_session_steps", sa.Column("fork_point_id", UUID(as_uuid=True), sa.ForeignKey("fork_points.id", ondelete="SET NULL"), nullable=True))
    op.create_index("ix_ai_session_steps_branch_id", "ai_session_steps", ["branch_id"])

    # Drop and recreate step_type CHECK to add 'fork'
    op.drop_constraint("ck_ai_session_steps_step_type", "ai_session_steps", type_="check")
    op.create_check_constraint(
        "ck_ai_session_steps_step_type",
        "ai_session_steps",
        "step_type IN ('question', 'action', 'script_generation', 'verification', "
        "'info_request', 'note', 'intake_analysis', 'fork')",
    )

    # ── Modify file_uploads: add 5 columns ──
    op.add_column("file_uploads", sa.Column("ai_description", sa.Text, nullable=True))
    op.add_column("file_uploads", sa.Column("extracted_content", sa.Text, nullable=True))
    op.add_column("file_uploads", sa.Column("content_summary", sa.Text, nullable=True))
    op.add_column("file_uploads", sa.Column("uploaded_on_branch_id", UUID(as_uuid=True), sa.ForeignKey("session_branches.id", ondelete="SET NULL"), nullable=True))
    op.add_column("file_uploads", sa.Column("uploaded_at_step_id", UUID(as_uuid=True), sa.ForeignKey("ai_session_steps.id", ondelete="SET NULL"), nullable=True))


def downgrade() -> None:
    # ── file_uploads columns ──
    op.drop_column("file_uploads", "uploaded_at_step_id")
    op.drop_column("file_uploads", "uploaded_on_branch_id")
    op.drop_column("file_uploads", "content_summary")
    op.drop_column("file_uploads", "extracted_content")
    op.drop_column("file_uploads", "ai_description")

    # ── ai_session_steps: restore CHECK, drop columns ──
    op.drop_constraint("ck_ai_session_steps_step_type", "ai_session_steps", type_="check")
    op.create_check_constraint(
        "ck_ai_session_steps_step_type",
        "ai_session_steps",
        "step_type IN ('question', 'action', 'script_generation', 'verification', "
        "'info_request', 'note', 'intake_analysis')",
    )
    op.drop_index("ix_ai_session_steps_branch_id", "ai_session_steps")
    op.drop_column("ai_session_steps", "fork_point_id")
    op.drop_column("ai_session_steps", "is_fork_point")
    op.drop_column("ai_session_steps", "branch_id")

    # ── ai_sessions columns ──
    op.drop_column("ai_sessions", "total_parked_seconds")
    op.drop_column("ai_sessions", "total_active_seconds")
    op.drop_column("ai_sessions", "handoff_count")
    op.drop_column("ai_sessions", "active_branch_id")
    op.drop_column("ai_sessions", "is_branching")

    # ── Drop new tables (reverse order of creation for FK deps) ──
    op.drop_table("session_resolution_outputs")
    op.drop_table("session_handoffs")
    op.drop_table("fork_points")
    op.drop_table("session_branches")
  • Step 2: Check current alembic head and set down_revision

Run: cd /home/coder/resolutionflow/backend && alembic heads

Set down_revision in the migration to the returned revision ID.

  • Step 3: Run the migration

Run: cd /home/coder/resolutionflow/backend && alembic upgrade head

Expected: Migration applies without errors.

  • Step 4: Verify tables exist

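Run: docker exec -it patherly_postgres psql -U postgres -d patherly -c "\dt session_*"

Expected: Shows session_branches, session_handoffs, session_resolution_outputs. The fourth new table, fork_points, does not match this pattern; check it separately with -c "\dt fork_points".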

Run: docker exec -it patherly_postgres psql -U postgres -d patherly -c "\d ai_sessions" | grep -E "is_branching|active_branch_id|handoff_count"

Expected: Shows the 3 new columns.
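
The table check in Step 4 relies on a name pattern, so it helps to see which of the four new tables that pattern actually covers. The sketch below approximates psql's pattern matching with Python's fnmatch; that approximation and the helper name tables_matching are assumptions made for illustration, not part of the plan.

```python
from fnmatch import fnmatchcase

# Tables created by the migration's upgrade() function.
NEW_TABLES = [
    "session_branches",
    "fork_points",
    "session_handoffs",
    "session_resolution_outputs",
]


def tables_matching(pattern: str, tables: list[str]) -> list[str]:
    """Return the tables a shell-style pattern would list (hypothetical helper)."""
    return [t for t in tables if fnmatchcase(t, pattern)]


matched = tables_matching("session_*", NEW_TABLES)
missed = [t for t in NEW_TABLES if t not in matched]
print(matched)  # the three session_* tables
print(missed)   # ['fork_points']
```

A pattern-based check therefore confirms three of the four tables; fork_points needs its own lookup.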

  • Step 5: Commit
git add backend/alembic/versions/030_add_conversational_branching.py
git commit -m "feat: add conversational branching migration — 4 tables, 13 columns"
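
The downgrade drops the new tables in reverse creation order so that no table is removed while another still references it. As a rough illustration, the ordering can be derived from the foreign keys among the new tables with the standard-library graphlib module. The dependency map below is an assumption based only on the columns visible in this migration excerpt; self-references on session_branches and references to pre-existing tables are left out.

```python
from graphlib import TopologicalSorter

# Each new table mapped to the other NEW tables it references via foreign keys.
# Assumption: taken from the DDL shown in this section only. Self-references
# (session_branches.parent_branch_id) are omitted because they do not affect
# the order in which whole tables are created or dropped.
FK_DEPS = {
    "session_branches": set(),
    "fork_points": {"session_branches"},
    "session_handoffs": {"session_branches"},
    "session_resolution_outputs": set(),
}

# static_order() yields referenced tables before the tables that reference them.
create_order = list(TopologicalSorter(FK_DEPS).static_order())

# Dropping in the reverse order removes referencing tables first.
drop_order = list(reversed(create_order))
print(drop_order)
```

Any order in which fork_points and session_handoffs come before session_branches is valid; the order used in downgrade() is one such order.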

Task 8: Create Pydantic Schemas

Files:

  • Create: backend/app/schemas/session_branch.py

  • Create: backend/app/schemas/session_handoff.py

  • Create: backend/app/schemas/session_resolution.py

  • Modify: backend/app/schemas/ai_session.py:197-265

  • Step 1: Write branch schemas

# backend/app/schemas/session_branch.py
"""Pydantic schemas for session branches and fork points."""
from __future__ import annotations

from typing import Any
from uuid import UUID
from datetime import datetime

from pydantic import BaseModel, Field


# ── Branch ──

class BranchCreate(BaseModel):
    """Used internally — branches are created via fork, not direct API."""
    label: str = Field(..., max_length=200)
    status: str = "untried"


class BranchUpdate(BaseModel):
    """Update branch status."""
    status: str = Field(..., pattern="^(active|dead_end|solved|untried|revived)$")
    status_reason: str | None = None


class BranchResponse(BaseModel):
    """Branch detail for API responses."""
    id: UUID
    session_id: UUID
    parent_branch_id: UUID | None
    fork_point_step_id: UUID | None
    branch_order: int
    label: str
    status: str
    status_reason: str | None
    status_changed_at: datetime | None
    context_summary: dict[str, Any] | None
    evidence_from_branch_id: UUID | None
    evidence_description: str | None
    step_count: int = 0
    created_at: datetime
    updated_at: datetime

    model_config = {"from_attributes": True}


class BranchTreeResponse(BaseModel):
    """Full branch tree for the BranchMap sidebar."""
    branches: list[BranchResponse]
    active_branch_id: UUID | None


# ── Fork ──

class ForkOption(BaseModel):
    """One hypothesis option when creating a fork."""
    label: str = Field(..., max_length=200)
    description: str = Field(..., max_length=500)


class ForkCreateRequest(BaseModel):
    """Create a fork point with N branches."""
    fork_reason: str = Field(..., min_length=5, max_length=2000)
    options: list[ForkOption] = Field(..., min_length=2, max_length=10)


class ForkPointResponse(BaseModel):
    """Fork point detail."""
    id: UUID
    session_id: UUID
    parent_branch_id: UUID
    trigger_step_id: UUID | None
    fork_reason: str
    options: list[dict[str, Any]]
    created_at: datetime

    model_config = {"from_attributes": True}


# ── Switch ──

class BranchSwitchResponse(BaseModel):
    """Response after switching branches."""
    active_branch_id: UUID
    branch: BranchResponse
    conversation_messages: list[dict[str, Any]]


# ── Revival ──

class ReviveRequest(BaseModel):
    """Revive a dead-end branch with new evidence."""
    evidence_from_branch_id: UUID
    evidence_description: str = Field(..., min_length=5, max_length=2000)


# ── Branch message ──

class BranchMessageRequest(BaseModel):
    """Send a message on a specific branch."""
    message: str = Field(..., min_length=1, max_length=8000)
    upload_ids: list[UUID] = Field(default_factory=list, max_length=10)


class BranchMessageResponse(BaseModel):
    """AI response on a branch."""
    content: str
    branch_id: UUID
    step_id: UUID | None = None
  • Step 2: Write handoff schemas
# backend/app/schemas/session_handoff.py
"""Pydantic schemas for session handoffs."""
from __future__ import annotations

from typing import Any
from uuid import UUID
from datetime import datetime

from pydantic import BaseModel, Field


class HandoffCreateRequest(BaseModel):
    """Create a handoff (park or escalate)."""
    intent: str = Field(..., pattern="^(park|escalate)$")
    engineer_notes: str | None = None
    priority: str = Field("normal", pattern="^(normal|elevated)$")


class HandoffResponse(BaseModel):
    """Handoff detail."""
    id: UUID
    session_id: UUID
    handed_off_by: UUID
    intent: str
    source_branch_id: UUID | None
    snapshot: dict[str, Any]
    ai_assessment: str | None
    ai_assessment_data: dict[str, Any] | None
    artifacts: list[dict[str, Any]] | None
    engineer_notes: str | None
    priority: str
    claimed_by: UUID | None
    claimed_at: datetime | None
    psa_note_pushed: bool
    notification_sent: bool
    created_at: datetime

    model_config = {"from_attributes": True}


class HandoffClaimRequest(BaseModel):
    """Claim a handed-off session."""
    pass  # Just needs auth — claiming user comes from JWT


class HandoffBriefingResponse(BaseModel):
    """Natural-language briefing for the claiming engineer."""
    briefing: str
    handoff: HandoffResponse


class QueueItemResponse(BaseModel):
    """Item in the team queue."""
    handoff_id: UUID
    session_id: UUID
    intent: str
    problem_summary: str | None
    problem_domain: str | None
    priority: str
    handed_off_by_name: str | None
    engineer_notes: str | None
    branch_count: int = 0
    created_at: datetime
    claimed_by: UUID | None
    claimed_at: datetime | None

    model_config = {"from_attributes": True}
  • Step 3: Write resolution output schemas
# backend/app/schemas/session_resolution.py
"""Pydantic schemas for session resolution outputs."""
from __future__ import annotations

from typing import Any
from uuid import UUID
from datetime import datetime

from pydantic import BaseModel, Field


class ResolutionOutputResponse(BaseModel):
    """Single resolution output."""
    id: UUID
    session_id: UUID
    output_type: str
    generated_content: str
    structured_data: dict[str, Any] | None
    edited_content: str | None
    status: str
    pushed_to: str | None
    pushed_at: datetime | None
    pushed_reference: str | None
    generated_by_model: str
    created_at: datetime
    updated_at: datetime

    model_config = {"from_attributes": True}


class ResolutionOutputEditRequest(BaseModel):
    """Edit output before pushing."""
    edited_content: str = Field(..., min_length=1)


class ResolutionOutputPushRequest(BaseModel):
    """Push output to a destination."""
    destination: str = Field(..., pattern="^(psa|kb_library|clipboard|email)$")


class ResolutionOutputPushResponse(BaseModel):
    """Result of pushing an output."""
    output_id: UUID
    status: str
    pushed_to: str
    pushed_reference: str | None = None


class AllResolutionOutputsResponse(BaseModel):
    """All three resolution outputs for a session."""
    outputs: list[ResolutionOutputResponse]
  • Step 4: Add branching fields to AISessionDetail

In backend/app/schemas/ai_session.py, add these fields to the AISessionDetail class (after the conversation_messages field, around line 230):

    is_branching: bool = False
    active_branch_id: str | None = None
  • Step 5: Verify schemas compile

Run: cd /home/coder/resolutionflow/backend && python -c "from app.schemas.session_branch import *; from app.schemas.session_handoff import *; from app.schemas.session_resolution import *; print('Schemas OK')"

Expected: Schemas OK
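
Several request schemas restrict string fields with anchored patterns. Pydantic performs that validation itself; the sketch below only uses the standard-library re module to show which values those patterns accept. The PATTERNS keys and the accepts helper are hypothetical names introduced for illustration.

```python
import re

# Patterns copied from the Field(..., pattern=...) declarations in the schemas.
PATTERNS = {
    "branch_status": r"^(active|dead_end|solved|untried|revived)$",
    "handoff_intent": r"^(park|escalate)$",
    "handoff_priority": r"^(normal|elevated)$",
    "push_destination": r"^(psa|kb_library|clipboard|email)$",
}


def accepts(field: str, value: str) -> bool:
    """True if value satisfies the anchored pattern for the named field."""
    return re.search(PATTERNS[field], value) is not None


print(accepts("handoff_intent", "park"))     # True
print(accepts("handoff_intent", "parked"))   # False: anchors reject extra characters
print(accepts("branch_status", "dead-end"))  # False: hyphen instead of underscore
```

BranchCreate.status carries no pattern, so for that field only the database CHECK constraint guards the allowed values.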

  • Step 6: Commit
git add backend/app/schemas/session_branch.py backend/app/schemas/session_handoff.py backend/app/schemas/session_resolution.py backend/app/schemas/ai_session.py
git commit -m "feat: add Pydantic schemas for branching, handoffs, and resolution outputs"

Phase 2: Branch Engine

Task 9: Write BranchManager Service — Tests First

Files:

  • Create: backend/tests/test_branch_manager.py

  • Create: backend/app/services/branch_manager.py

  • Step 1: Write failing tests for BranchManager

# backend/tests/test_branch_manager.py
"""Integration tests for BranchManager service."""
import uuid
import pytest
from httpx import AsyncClient

from app.models.ai_session import AISession
from app.models.session_branch import SessionBranch
from app.models.fork_point import ForkPoint
from app.models.ai_session_step import AISessionStep


@pytest.mark.asyncio
async def test_create_root_branch(client: AsyncClient, test_user, auth_headers, test_db):
    """Creating a root branch sets is_branching=True and copies conversation_messages."""
    # Create a session first
    session = AISession(
        user_id=test_user["user_data"]["id"],
        account_id=test_user["user_data"]["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[
            {"role": "user", "content": "test message"},
            {"role": "assistant", "content": "test response"},
        ],
    )
    test_db.add(session)
    await test_db.flush()

    from app.services.branch_manager import BranchManager
    manager = BranchManager(test_db)
    root = await manager.create_root_branch(session.id)

    assert root is not None
    assert root.parent_branch_id is None
    assert root.label == "Root"
    assert root.status == "active"
    assert root.branch_order == 1
    assert len(root.conversation_messages) == 2

    # Session should now have is_branching=True
    await test_db.refresh(session)
    assert session.is_branching is True
    assert session.active_branch_id == root.id


@pytest.mark.asyncio
async def test_create_fork(client: AsyncClient, test_user, auth_headers, test_db):
    """Creating a fork produces a ForkPoint + N branches."""
    session = AISession(
        user_id=test_user["user_data"]["id"],
        account_id=test_user["user_data"]["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(session)
    await test_db.flush()

    from app.services.branch_manager import BranchManager
    manager = BranchManager(test_db)
    root = await manager.create_root_branch(session.id)

    # Create a trigger step
    step = AISessionStep(
        session_id=session.id,
        step_order=0,
        step_type="question",
        content={"text": "What's the issue?"},
        confidence_at_step=0.5,
    )
    test_db.add(step)
    await test_db.flush()

    fork_point, branches = await manager.create_fork(
        session_id=session.id,
        parent_branch_id=root.id,
        trigger_step_id=step.id,
        fork_reason="Two possible causes identified",
        options=[
            {"label": "Network connectivity", "description": "Check network stack"},
            {"label": "DNS resolution", "description": "Check DNS config"},
        ],
    )

    assert fork_point is not None
    assert len(branches) == 2
    assert branches[0].label == "Network connectivity"
    assert branches[0].status == "untried"
    assert branches[0].parent_branch_id == root.id
    assert branches[1].label == "DNS resolution"
    assert branches[1].branch_order == 2

    # Trigger step should be marked as fork point
    await test_db.refresh(step)
    assert step.is_fork_point is True
    assert step.fork_point_id == fork_point.id


@pytest.mark.asyncio
async def test_switch_branch(client: AsyncClient, test_user, auth_headers, test_db):
    """Switching branches updates active_branch_id."""
    session = AISession(
        user_id=test_user["user_data"]["id"],
        account_id=test_user["user_data"]["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(session)
    await test_db.flush()

    from app.services.branch_manager import BranchManager
    manager = BranchManager(test_db)
    root = await manager.create_root_branch(session.id)

    step = AISessionStep(
        session_id=session.id, step_order=0, step_type="question",
        content={"text": "test"}, confidence_at_step=0.5,
    )
    test_db.add(step)
    await test_db.flush()

    _, branches = await manager.create_fork(
        session_id=session.id,
        parent_branch_id=root.id,
        trigger_step_id=step.id,
        fork_reason="test fork",
        options=[
            {"label": "Option A", "description": "desc A"},
            {"label": "Option B", "description": "desc B"},
        ],
    )

    branch_b = branches[1]
    result = await manager.switch_branch(session.id, branch_b.id)

    assert result.id == branch_b.id
    await test_db.refresh(session)
    assert session.active_branch_id == branch_b.id


@pytest.mark.asyncio
async def test_mark_branch_dead_end(client: AsyncClient, test_user, auth_headers, test_db):
    """Marking a branch as dead_end updates status."""
    session = AISession(
        user_id=test_user["user_data"]["id"],
        account_id=test_user["user_data"]["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(session)
    await test_db.flush()

    from app.services.branch_manager import BranchManager
    manager = BranchManager(test_db)
    root = await manager.create_root_branch(session.id)

    updated = await manager.mark_branch_status(
        branch_id=root.id,
        status="dead_end",
        reason="Network was fine, not the cause",
        user_id=test_user["user_data"]["id"],
    )

    assert updated.status == "dead_end"
    assert updated.status_reason == "Network was fine, not the cause"
    assert updated.status_changed_at is not None


@pytest.mark.asyncio
async def test_get_branch_tree(client: AsyncClient, test_user, auth_headers, test_db):
    """get_branch_tree returns the full tree structure."""
    session = AISession(
        user_id=test_user["user_data"]["id"],
        account_id=test_user["user_data"]["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[{"role": "user", "content": "help"}],
    )
    test_db.add(session)
    await test_db.flush()

    from app.services.branch_manager import BranchManager
    manager = BranchManager(test_db)
    root = await manager.create_root_branch(session.id)

    step = AISessionStep(
        session_id=session.id, step_order=0, step_type="question",
        content={"text": "test"}, confidence_at_step=0.5,
    )
    test_db.add(step)
    await test_db.flush()

    await manager.create_fork(
        session_id=session.id,
        parent_branch_id=root.id,
        trigger_step_id=step.id,
        fork_reason="test",
        options=[
            {"label": "A", "description": "a"},
            {"label": "B", "description": "b"},
        ],
    )

    tree = await manager.get_branch_tree(session.id)
    # Root + 2 fork branches = 3 total
    assert len(tree) == 3
  • Step 2: Run tests to verify they fail

Run: cd /home/coder/resolutionflow/backend && pytest tests/test_branch_manager.py -v --no-header 2>&1 | head -30

Expected: FAIL with ModuleNotFoundError: No module named 'app.services.branch_manager'

  • Step 3: Implement BranchManager service
# backend/app/services/branch_manager.py
"""Branch lifecycle management for conversational branching.

Handles creating root branches, forking, switching, marking status,
reviving dead-end branches, and querying the branch tree.
"""
import uuid
import logging
from datetime import datetime, timezone
from typing import Any
from uuid import UUID

from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.ai_session import AISession
from app.models.ai_session_step import AISessionStep
from app.models.session_branch import SessionBranch
from app.models.fork_point import ForkPoint

logger = logging.getLogger(__name__)


class BranchManager:
    """Branch lifecycle management."""

    def __init__(self, db: AsyncSession):
        self.db = db

    async def create_root_branch(self, session_id: UUID) -> SessionBranch:
        """Create the root branch, copy conversation_messages, set is_branching=True."""
        result = await self.db.execute(
            select(AISession).where(AISession.id == session_id)
        )
        session = result.scalar_one_or_none()
        if not session:
            raise ValueError(f"Session {session_id} not found")

        root = SessionBranch(
            id=uuid.uuid4(),
            session_id=session_id,
            parent_branch_id=None,
            branch_order=1,
            label="Root",
            status="active",
            conversation_messages=list(session.conversation_messages or []),
        )
        self.db.add(root)

        session.is_branching = True
        session.active_branch_id = root.id

        await self.db.flush()
        return root

    async def create_fork(
        self,
        session_id: UUID,
        parent_branch_id: UUID,
        trigger_step_id: UUID | None,
        fork_reason: str,
        options: list[dict[str, str]],
    ) -> tuple[ForkPoint, list[SessionBranch]]:
        """Create a fork point with N branches.

        Pre-generates all branch UUIDs, then inserts ForkPoint + N SessionBranch
        rows in a single transaction. Enforces plan-tier branch limits.
        """
        # Enforce branch limits by plan tier
        from app.core.ai_quota_service import get_user_plan
        result = await self.db.execute(
            select(AISession).where(AISession.id == session_id)
        )
        session = result.scalar_one()
        existing_count_result = await self.db.execute(
            select(func.count()).select_from(SessionBranch).where(
                SessionBranch.session_id == session_id
            )
        )
        existing_count = existing_count_result.scalar() or 0

        # Plan limits: free=2, pro=5, team=10; any other plan (e.g. enterprise) falls back to 999.
        # existing_count includes the root branch.
        BRANCH_LIMITS = {"free": 2, "pro": 5, "team": 10}
        plan = await get_user_plan(session.user_id, self.db)
        max_branches = BRANCH_LIMITS.get(plan, 999)
        if existing_count + len(options) > max_branches:
            raise ValueError(
                f"Branch limit reached ({max_branches} for {plan} plan). "
                f"Current: {existing_count}, requested: {len(options)}"
            )

        # Pre-generate branch IDs
        branch_ids = [uuid.uuid4() for _ in options]

        # Build ForkPoint options JSONB with branch_ids
        fork_options = []
        for i, opt in enumerate(options):
            fork_options.append({
                "label": opt["label"],
                "description": opt["description"],
                "branch_id": str(branch_ids[i]),
                "status": "untried",
            })

        fork_point = ForkPoint(
            id=uuid.uuid4(),
            session_id=session_id,
            parent_branch_id=parent_branch_id,
            trigger_step_id=trigger_step_id,
            fork_reason=fork_reason,
            options=fork_options,
        )
        self.db.add(fork_point)

        # Get parent branch's conversation_messages for context
        result = await self.db.execute(
            select(SessionBranch).where(SessionBranch.id == parent_branch_id)
        )
        parent = result.scalar_one_or_none()
        parent_messages = list(parent.conversation_messages or []) if parent else []

        # Create branches
        branches = []
        for i, opt in enumerate(options):
            branch = SessionBranch(
                id=branch_ids[i],
                session_id=session_id,
                parent_branch_id=parent_branch_id,
                fork_point_step_id=trigger_step_id,
                branch_order=i + 1,
                label=opt["label"],
                status="untried",
                conversation_messages=list(parent_messages),  # per-branch copy; never share one list object
            )
            self.db.add(branch)
            branches.append(branch)

        # Mark trigger step as fork point
        if trigger_step_id:
            step_result = await self.db.execute(
                select(AISessionStep).where(AISessionStep.id == trigger_step_id)
            )
            step = step_result.scalar_one_or_none()
            if step:
                step.is_fork_point = True
                step.fork_point_id = fork_point.id

        await self.db.flush()
        return fork_point, branches

    async def switch_branch(self, session_id: UUID, target_branch_id: UUID) -> SessionBranch:
        """Switch the active branch for a session."""
        result = await self.db.execute(
            select(SessionBranch).where(
                SessionBranch.id == target_branch_id,
                SessionBranch.session_id == session_id,
            )
        )
        branch = result.scalar_one_or_none()
        if not branch:
            raise ValueError(f"Branch {target_branch_id} not found in session {session_id}")

        session_result = await self.db.execute(
            select(AISession).where(AISession.id == session_id)
        )
        session = session_result.scalar_one()
        session.active_branch_id = target_branch_id

        # Mark branch as active if it was untried
        if branch.status == "untried":
            branch.status = "active"
            branch.status_changed_at = datetime.now(timezone.utc)

        await self.db.flush()
        return branch

    async def mark_branch_status(
        self,
        branch_id: UUID,
        status: str,
        reason: str | None = None,
        user_id: UUID | None = None,
    ) -> SessionBranch:
        """Update a branch's status with reason and timestamp."""
        result = await self.db.execute(
            select(SessionBranch).where(SessionBranch.id == branch_id)
        )
        branch = result.scalar_one_or_none()
        if not branch:
            raise ValueError(f"Branch {branch_id} not found")

        branch.status = status
        branch.status_reason = reason
        branch.status_changed_at = datetime.now(timezone.utc)
        branch.status_changed_by = user_id

        await self.db.flush()
        return branch

    async def revive_branch(
        self,
        branch_id: UUID,
        evidence_from_branch_id: UUID,
        evidence_description: str,
    ) -> SessionBranch:
        """Revive a dead-end branch with evidence from another branch."""
        result = await self.db.execute(
            select(SessionBranch).where(SessionBranch.id == branch_id)
        )
        branch = result.scalar_one_or_none()
        if not branch:
            raise ValueError(f"Branch {branch_id} not found")

        branch.status = "revived"
        branch.status_changed_at = datetime.now(timezone.utc)
        branch.evidence_from_branch_id = evidence_from_branch_id
        branch.evidence_description = evidence_description

        # Append revival context to the conversation
        revival_msg = {
            "role": "system",
            "content": f"[Branch Revived] New evidence from another branch: {evidence_description}",
        }
        msgs = list(branch.conversation_messages or [])
        msgs.append(revival_msg)
        branch.conversation_messages = msgs

        await self.db.flush()
        return branch

    async def get_branch_tree(self, session_id: UUID) -> list[SessionBranch]:
        """Get all branches for a session, ordered by branch_order."""
        result = await self.db.execute(
            select(SessionBranch)
            .where(SessionBranch.session_id == session_id)
            .order_by(SessionBranch.branch_order)
        )
        return list(result.scalars().all())

    async def build_cross_branch_context(self, branch_id: UUID) -> str:
        """Build cross-branch context from sibling summaries.

        Reads context_summary from every other branch in the same session
        (not only direct siblings), ordered by status priority:
        active > untried > revived > dead_end > solved.
        """
        result = await self.db.execute(
            select(SessionBranch).where(SessionBranch.id == branch_id)
        )
        branch = result.scalar_one_or_none()
        if not branch:
            return ""

        siblings_result = await self.db.execute(
            select(SessionBranch)
            .where(
                SessionBranch.session_id == branch.session_id,
                SessionBranch.id != branch_id,
            )
            .order_by(SessionBranch.branch_order)
        )
        siblings = list(siblings_result.scalars().all())

        if not siblings:
            return ""

        # Priority order
        priority = {"active": 0, "untried": 1, "revived": 2, "dead_end": 3, "solved": 4}
        siblings.sort(key=lambda b: priority.get(b.status, 5))

        parts = ["\n## Cross-Branch Context"]
        for sib in siblings:
            summary = sib.context_summary
            if summary:
                tried = ", ".join(summary.get("tried", []))
                concluded = summary.get("concluded", "No conclusion yet")
                parts.append(f"- **{sib.label}** [{sib.status}]: Tried: {tried}. {concluded}")
            else:
                parts.append(f"- **{sib.label}** [{sib.status}]: No summary yet.")

        return "\n".join(parts)
  • Step 4: Run tests to verify they pass

Run: cd /home/coder/resolutionflow/backend && pytest tests/test_branch_manager.py -v --no-header

Expected: All 5 tests PASS.

  • Step 5: Commit
git add backend/tests/test_branch_manager.py backend/app/services/branch_manager.py
git commit -m "feat: add BranchManager service with integration tests"
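
To make the output of build_cross_branch_context concrete, here is a minimal standalone sketch of its sorting and formatting step. It uses plain dicts in place of SessionBranch rows, and the helper name format_cross_branch_context is hypothetical; neither is part of the plan's code.

```python
from typing import Any

# Same ordering as the priority dict in build_cross_branch_context.
PRIORITY = {"active": 0, "untried": 1, "revived": 2, "dead_end": 3, "solved": 4}


def format_cross_branch_context(siblings: list[dict[str, Any]]) -> str:
    """Render sibling summaries the way build_cross_branch_context does.

    Each sibling is a plain dict with "label", "status", and an optional
    "context_summary" (a hypothetical stand-in for a SessionBranch row).
    """
    if not siblings:
        return ""
    # sorted() is stable, so siblings with equal priority keep their input order.
    ordered = sorted(siblings, key=lambda b: PRIORITY.get(b["status"], 5))
    parts = ["\n## Cross-Branch Context"]
    for sib in ordered:
        summary = sib.get("context_summary")
        if summary:
            tried = ", ".join(summary.get("tried", []))
            concluded = summary.get("concluded", "No conclusion yet")
            parts.append(f"- **{sib['label']}** [{sib['status']}]: Tried: {tried}. {concluded}")
        else:
            parts.append(f"- **{sib['label']}** [{sib['status']}]: No summary yet.")
    return "\n".join(parts)


example = format_cross_branch_context([
    {"label": "Network connectivity", "status": "dead_end",
     "context_summary": {"tried": ["ping", "traceroute"], "concluded": "Network was fine."}},
    {"label": "DNS config", "status": "active", "context_summary": None},
])
print(example)
```

The active branch is listed before the dead end even though it was passed second, which is the ordering the AI sees in rag_context.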

Task 10: Write BranchAwarePromptBuilder — Tests First

Files:

  • Create: backend/tests/test_branch_aware_prompt_builder.py

  • Create: backend/app/services/branch_aware_prompt_builder.py

  • Step 1: Write failing tests for BranchAwarePromptBuilder

# backend/tests/test_branch_aware_prompt_builder.py
"""Unit tests for BranchAwarePromptBuilder — pure function, no DB needed."""
import pytest

from app.services.branch_aware_prompt_builder import BranchAwarePromptBuilder


def test_build_basic():
    """Basic build with no cross-branch context."""
    builder = BranchAwarePromptBuilder()
    result = builder.build(
        branch_messages=[
            {"role": "user", "content": "DNS not resolving"},
            {"role": "assistant", "content": "Let's check DNS config"},
        ],
        sibling_summaries="",
        session_context="Problem: DNS resolution failure. Domain: networking.",
        attachments=[],
        new_message="I ran nslookup and got timeout",
    )

    assert "system_base" in result
    assert "rag_context" in result
    assert "history" in result
    assert "new_message" in result
    assert "images" in result
    assert result["new_message"] == "I ran nslookup and got timeout"
    # History should NOT include the new_message
    assert len(result["history"]) == 2


def test_build_with_cross_branch_context():
    """Cross-branch summaries go into rag_context, not system_base."""
    builder = BranchAwarePromptBuilder()
    sibling_ctx = (
        "\n## Cross-Branch Context\n"
        "- **Network connectivity** [dead_end]: Tried: ping, traceroute. Network was fine."
    )
    result = builder.build(
        branch_messages=[],
        sibling_summaries=sibling_ctx,
        session_context="Problem: test",
        attachments=[],
        new_message="test message",
    )

    assert "Cross-Branch Context" in result["rag_context"]
    assert "Cross-Branch Context" not in result["system_base"]


def test_build_with_images():
    """Image attachments are passed through."""
    builder = BranchAwarePromptBuilder()
    result = builder.build(
        branch_messages=[],
        sibling_summaries="",
        session_context="Problem: test",
        attachments=[{"media_type": "image/png", "data": "base64data"}],
        new_message="check this screenshot",
    )

    assert len(result["images"]) == 1
    assert result["images"][0]["media_type"] == "image/png"


def test_build_with_revival_context():
    """Revival context is prepended to rag_context."""
    builder = BranchAwarePromptBuilder()
    result = builder.build(
        branch_messages=[],
        sibling_summaries="",
        session_context="Problem: test",
        attachments=[],
        new_message="test",
        revival_context="New evidence: the error appears when VPN is active",
    )

    assert "New evidence" in result["rag_context"]


def test_history_keeps_all_messages_when_none_match():
    """History keeps every branch message when none of them matches new_message."""
    builder = BranchAwarePromptBuilder()
    result = builder.build(
        branch_messages=[
            {"role": "user", "content": "first message"},
            {"role": "assistant", "content": "first response"},
        ],
        sibling_summaries="",
        session_context="Problem: test",
        attachments=[],
        new_message="second message",
    )

    # All branch messages should be in history (none match new_message)
    assert len(result["history"]) == 2
  • Step 2: Run tests to verify they fail

Run: cd /home/coder/resolutionflow/backend && pytest tests/test_branch_aware_prompt_builder.py -v --no-header 2>&1 | head -20

Expected: FAIL — ModuleNotFoundError

  • Step 3: Implement BranchAwarePromptBuilder
# backend/app/services/branch_aware_prompt_builder.py
"""Branch-aware prompt builder — assembles AI context with cross-branch awareness.

Pure function: takes data, returns dict matching _call_ai parameter names.
No DB access, no LLM calls. The caller pre-fetches all data.

Return keys: system_base, rag_context, history, new_message, images
- system_base: stable system prompt (cached by Anthropic)
- rag_context: cross-branch summaries + attachment descriptions (NOT cached)
"""
from typing import Any

from app.services.assistant_chat_service import ASSISTANT_SYSTEM_PROMPT


class BranchAwarePromptBuilder:
    """Assembles prompt components for branch-aware AI calls."""

    def build(
        self,
        branch_messages: list[dict[str, Any]],
        sibling_summaries: str,
        session_context: str,
        attachments: list[dict[str, Any]],
        new_message: str,
        revival_context: str | None = None,
        token_budget: int = 100_000,
    ) -> dict[str, Any]:
        """Build prompt components for _call_ai.

        Args:
            branch_messages: Conversation history for the current branch.
            sibling_summaries: Cross-branch context from BranchManager.build_cross_branch_context.
            session_context: Problem summary, domain, client info, PSA data.
            attachments: Image dicts [{media_type, data}] from current branch uploads.
            new_message: The user's current message.
            revival_context: If branch was revived, the evidence description.
            token_budget: Max tokens for the entire prompt (default 100k).
                Accepted for forward compatibility; not enforced by this builder yet.

        Returns:
            Dict with keys matching _call_ai params: system_base, rag_context,
            history, new_message, images.
        """
        # 1. system_base — stable, cached across turns
        system_base = ASSISTANT_SYSTEM_PROMPT + "\n\n## Session Context\n" + session_context

        # 2. rag_context — changes per query, NOT cached
        rag_parts = []
        if revival_context:
            rag_parts.append(f"\n## Branch Revival\n{revival_context}")
        if sibling_summaries:
            rag_parts.append(sibling_summaries)
        rag_context = "\n".join(rag_parts)

        # 3. history — branch messages as role/content dicts
        history = []
        for msg in branch_messages:
            if msg.get("role") in ("user", "assistant"):
                history.append({"role": msg["role"], "content": msg["content"]})

        # 4. images
        images = attachments if attachments else None

        return {
            "system_base": system_base,
            "rag_context": rag_context,
            "history": history,
            "new_message": new_message,
            "images": images,
        }
  • Step 4: Run tests to verify they pass

Run: cd /home/coder/resolutionflow/backend && pytest tests/test_branch_aware_prompt_builder.py -v --no-header

Expected: All 5 tests PASS.

  • Step 5: Commit
git add backend/tests/test_branch_aware_prompt_builder.py backend/app/services/branch_aware_prompt_builder.py
git commit -m "feat: add BranchAwarePromptBuilder with unit tests"

Task 11: Write Branch API Endpoints

Files:

  • Create: backend/app/api/endpoints/session_branches.py

  • Modify: backend/app/api/router.py:1-88

  • Create: backend/tests/test_session_branches_api.py

  • Step 1: Write failing API tests

# backend/tests/test_session_branches_api.py
"""API endpoint tests for session branches."""
import pytest
from httpx import AsyncClient

from app.models.ai_session import AISession
from app.models.ai_session_step import AISessionStep


@pytest.mark.asyncio
async def test_list_branches_empty(client: AsyncClient, test_user, auth_headers, test_db):
    """GET /ai-sessions/{id}/branches returns empty for non-branching session."""
    session = AISession(
        user_id=test_user["user_data"]["id"],
        account_id=test_user["user_data"]["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(session)
    await test_db.commit()

    resp = await client.get(
        f"/api/v1/ai-sessions/{session.id}/branches",
        headers=auth_headers,
    )
    assert resp.status_code == 200
    data = resp.json()
    assert data["branches"] == []
    assert data["active_branch_id"] is None


@pytest.mark.asyncio
async def test_create_fork(client: AsyncClient, test_user, auth_headers, test_db):
    """POST /ai-sessions/{id}/branches/fork creates branches."""
    session = AISession(
        user_id=test_user["user_data"]["id"],
        account_id=test_user["user_data"]["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[{"role": "user", "content": "help"}],
    )
    test_db.add(session)
    await test_db.flush()

    step = AISessionStep(
        session_id=session.id, step_order=0, step_type="question",
        content={"text": "test"}, confidence_at_step=0.5,
    )
    test_db.add(step)
    await test_db.commit()

    resp = await client.post(
        f"/api/v1/ai-sessions/{session.id}/branches/fork",
        headers=auth_headers,
        json={
            "fork_reason": "Two possible causes",
            "options": [
                {"label": "Network issue", "description": "Check connectivity"},
                {"label": "DNS problem", "description": "Check DNS"},
            ],
        },
    )
    assert resp.status_code == 201
    data = resp.json()
    assert len(data["options"]) == 2


@pytest.mark.asyncio
async def test_switch_branch(client: AsyncClient, test_user, auth_headers, test_db):
    """POST /ai-sessions/{id}/branches/{bid}/switch changes active branch."""
    session = AISession(
        user_id=test_user["user_data"]["id"],
        account_id=test_user["user_data"]["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[{"role": "user", "content": "help"}],
    )
    test_db.add(session)
    await test_db.flush()

    step = AISessionStep(
        session_id=session.id, step_order=0, step_type="question",
        content={"text": "test"}, confidence_at_step=0.5,
    )
    test_db.add(step)
    await test_db.commit()

    # Create fork first
    fork_resp = await client.post(
        f"/api/v1/ai-sessions/{session.id}/branches/fork",
        headers=auth_headers,
        json={
            "fork_reason": "test",
            "options": [
                {"label": "A", "description": "a"},
                {"label": "B", "description": "b"},
            ],
        },
    )
    fork_data = fork_resp.json()
    branch_b_id = fork_data["options"][1]["branch_id"]

    # Switch to branch B
    resp = await client.post(
        f"/api/v1/ai-sessions/{session.id}/branches/{branch_b_id}/switch",
        headers=auth_headers,
    )
    assert resp.status_code == 200
    assert resp.json()["active_branch_id"] == branch_b_id
  • Step 2: Run tests to verify they fail

Run: cd /home/coder/resolutionflow/backend && pytest tests/test_session_branches_api.py -v --no-header 2>&1 | head -20

Expected: FAIL — route not found (404).

  • Step 3: Implement session_branches endpoint
# backend/app/api/endpoints/session_branches.py
"""Branch management endpoints for conversational branching.

  GET    /ai-sessions/{id}/branches              — List all branches (tree)
  POST   /ai-sessions/{id}/branches/fork         — Create fork with N branches
  PATCH  /ai-sessions/{id}/branches/{bid}        — Update branch status
  POST   /ai-sessions/{id}/branches/{bid}/switch — Switch active branch
  POST   /ai-sessions/{id}/branches/{bid}/revive — Revive dead-end branch
  POST   /ai-sessions/{id}/branches/{bid}/message — Send message on branch
"""
import logging
from typing import Annotated
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.deps import get_current_active_user, get_db
from app.models.user import User
from app.models.ai_session import AISession
from app.services.branch_manager import BranchManager
from app.services.branch_aware_prompt_builder import BranchAwarePromptBuilder
from app.services.assistant_chat_service import _call_ai
from app.schemas.session_branch import (
    BranchTreeResponse,
    BranchResponse,
    BranchUpdate,
    ForkCreateRequest,
    ForkPointResponse,
    BranchSwitchResponse,
    ReviveRequest,
    BranchMessageRequest,
    BranchMessageResponse,
)

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/ai-sessions/{session_id}/branches", tags=["session-branches"])


async def _get_user_session(
    session_id: UUID, user: User, db: AsyncSession
) -> AISession:
    """Fetch session owned by user, or raise 404."""
    result = await db.execute(
        select(AISession).where(
            AISession.id == session_id,
            AISession.user_id == user.id,
        )
    )
    session = result.scalar_one_or_none()
    if not session:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")
    return session


@router.get("", response_model=BranchTreeResponse)
async def list_branches(
    session_id: UUID,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> BranchTreeResponse:
    """Get branch tree for a session."""
    session = await _get_user_session(session_id, current_user, db)
    manager = BranchManager(db)
    branches = await manager.get_branch_tree(session_id)

    branch_responses = []
    for b in branches:
        branch_responses.append(BranchResponse(
            id=b.id,
            session_id=b.session_id,
            parent_branch_id=b.parent_branch_id,
            fork_point_step_id=b.fork_point_step_id,
            branch_order=b.branch_order,
            label=b.label,
            status=b.status,
            status_reason=b.status_reason,
            status_changed_at=b.status_changed_at,
            context_summary=b.context_summary,
            evidence_from_branch_id=b.evidence_from_branch_id,
            evidence_description=b.evidence_description,
            created_at=b.created_at,
            updated_at=b.updated_at,
        ))

    return BranchTreeResponse(
        branches=branch_responses,
        active_branch_id=session.active_branch_id,
    )


@router.post("/fork", response_model=ForkPointResponse, status_code=status.HTTP_201_CREATED)
async def create_fork(
    session_id: UUID,
    body: ForkCreateRequest,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> ForkPointResponse:
    """Create a fork point with N branches."""
    session = await _get_user_session(session_id, current_user, db)

    if session.status not in ("active", "paused"):
        raise HTTPException(status_code=400, detail=f"Cannot fork a {session.status} session")

    manager = BranchManager(db)

    # Ensure branching is initialized
    if not session.is_branching:
        await manager.create_root_branch(session_id)
        await db.refresh(session)

    # Use the active branch as parent
    parent_branch_id = session.active_branch_id
    if not parent_branch_id:
        raise HTTPException(status_code=400, detail="No active branch to fork from")

    options = [{"label": o.label, "description": o.description} for o in body.options]

    fork_point, branches = await manager.create_fork(
        session_id=session_id,
        parent_branch_id=parent_branch_id,
        trigger_step_id=None,  # Could be set from last step
        fork_reason=body.fork_reason,
        options=options,
    )

    await db.commit()
    return ForkPointResponse.model_validate(fork_point)


@router.patch("/{branch_id}", response_model=BranchResponse)
async def update_branch_status(
    session_id: UUID,
    branch_id: UUID,
    body: BranchUpdate,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> BranchResponse:
    """Update a branch's status."""
    await _get_user_session(session_id, current_user, db)
    manager = BranchManager(db)

    try:
        branch = await manager.mark_branch_status(
            branch_id=branch_id,
            status=body.status,
            reason=body.status_reason,
            user_id=current_user.id,
        )
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))

    await db.commit()
    return BranchResponse.model_validate(branch)


@router.post("/{branch_id}/switch", response_model=BranchSwitchResponse)
async def switch_branch(
    session_id: UUID,
    branch_id: UUID,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> BranchSwitchResponse:
    """Switch the active branch."""
    await _get_user_session(session_id, current_user, db)
    manager = BranchManager(db)

    try:
        branch = await manager.switch_branch(session_id, branch_id)
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))

    await db.commit()
    return BranchSwitchResponse(
        active_branch_id=branch.id,
        branch=BranchResponse.model_validate(branch),
        conversation_messages=branch.conversation_messages or [],
    )


@router.post("/{branch_id}/revive", response_model=BranchResponse)
async def revive_branch(
    session_id: UUID,
    branch_id: UUID,
    body: ReviveRequest,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> BranchResponse:
    """Revive a dead-end branch with new evidence."""
    await _get_user_session(session_id, current_user, db)
    manager = BranchManager(db)

    try:
        branch = await manager.revive_branch(
            branch_id=branch_id,
            evidence_from_branch_id=body.evidence_from_branch_id,
            evidence_description=body.evidence_description,
        )
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))

    await db.commit()
    return BranchResponse.model_validate(branch)


@router.post("/{branch_id}/message", response_model=BranchMessageResponse)
async def send_branch_message(
    session_id: UUID,
    branch_id: UUID,
    body: BranchMessageRequest,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> BranchMessageResponse:
    """Send a message on a specific branch."""
    session = await _get_user_session(session_id, current_user, db)

    if session.status not in ("active", "paused"):
        raise HTTPException(status_code=400, detail=f"Cannot message a {session.status} session")

    manager = BranchManager(db)

    # Switch to branch if not already active
    if session.active_branch_id != branch_id:
        try:
            await manager.switch_branch(session_id, branch_id)
        except ValueError as e:
            raise HTTPException(status_code=404, detail=str(e))
        await db.refresh(session)

    # Load the branch, scoped to this session so a branch ID that belongs
    # to another session cannot be messaged through this route
    from app.models.session_branch import SessionBranch
    result = await db.execute(
        select(SessionBranch).where(
            SessionBranch.id == branch_id,
            SessionBranch.session_id == session_id,
        )
    )
    branch = result.scalar_one_or_none()
    if not branch:
        raise HTTPException(status_code=404, detail="Branch not found")

    # Build cross-branch context
    sibling_ctx = await manager.build_cross_branch_context(branch_id)

    # Build prompt
    builder = BranchAwarePromptBuilder()
    session_context = f"Problem: {session.problem_summary or 'Unknown'}. Domain: {session.problem_domain or 'Unknown'}."
    prompt_args = builder.build(
        branch_messages=branch.conversation_messages or [],
        sibling_summaries=sibling_ctx,
        session_context=session_context,
        attachments=[],  # TODO: fetch image attachments for this branch
        new_message=body.message,
        revival_context=branch.evidence_description if branch.status == "revived" else None,
    )

    # Call AI
    ai_content, input_tokens, output_tokens = await _call_ai(**prompt_args)

    # Update branch conversation
    msgs = list(branch.conversation_messages or [])
    msgs.append({"role": "user", "content": body.message})
    msgs.append({"role": "assistant", "content": ai_content})
    branch.conversation_messages = msgs

    # Update session token counts
    session.total_input_tokens += input_tokens
    session.total_output_tokens += output_tokens

    # Resume if paused
    if session.status == "paused":
        session.status = "active"

    await db.commit()

    return BranchMessageResponse(
        content=ai_content,
        branch_id=branch_id,
    )
  • Step 4: Register router in router.py

In backend/app/api/router.py, add:

from app.api.endpoints import session_branches

And:

api_router.include_router(session_branches.router)
  • Step 5: Run tests to verify they pass

Run: cd /home/coder/resolutionflow/backend && pytest tests/test_session_branches_api.py -v --no-header

Expected: All 3 tests PASS.

  • Step 6: Commit
git add backend/app/api/endpoints/session_branches.py backend/tests/test_session_branches_api.py backend/app/api/router.py
git commit -m "feat: add branch API endpoints with integration tests"
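
The message endpoint copies conversation_messages into a new list, appends to the copy, and assigns it back rather than appending in place. Assuming the column is a plain JSON/JSONB type without SQLAlchemy's mutable extension, in-place mutations are not detected by the ORM, so reassignment is what marks the row as changed. The toy class below only mimics that attribute-set tracking; TrackedRow and append_turn are hypothetical names, not SQLAlchemy or plan code.

```python
class TrackedRow:
    """Toy model that marks an attribute dirty only when it is reassigned.

    Hypothetical stand-in for an ORM row; it mimics attribute-set change
    tracking and is not SQLAlchemy itself.
    """

    def __init__(self, conversation_messages):
        # Bypass __setattr__ so construction does not count as a change.
        object.__setattr__(self, "dirty", set())
        object.__setattr__(self, "conversation_messages", conversation_messages)

    def __setattr__(self, name, value):
        self.dirty.add(name)
        object.__setattr__(self, name, value)


def append_turn(row, user_text, ai_text):
    """Copy, append, reassign: the pattern used in send_branch_message."""
    msgs = list(row.conversation_messages or [])
    msgs.append({"role": "user", "content": user_text})
    msgs.append({"role": "assistant", "content": ai_text})
    row.conversation_messages = msgs


in_place = TrackedRow([])
in_place.conversation_messages.append({"role": "user", "content": "hi"})
print(in_place.dirty)  # set(): the in-place append went unnoticed

reassigned = TrackedRow(None)
append_turn(reassigned, "hi", "hello")
print(reassigned.dirty)  # {'conversation_messages'}
```

The `or []` in append_turn also covers a branch whose conversation_messages is still None.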

Task 12: Integrate Branching Into Existing Chat Service

Files:

  • Modify: backend/app/services/unified_chat_service.py:54-132

  • Modify: backend/app/services/flowpilot_engine.py (step creation)

  • Step 1: Add is_branching guard to unified_chat_service.send_chat_message

In backend/app/services/unified_chat_service.py, after the session status check (line ~82), add:

    # If branching is active, route to branch message handler
    if session.is_branching and session.active_branch_id:
        from app.services.branch_manager import BranchManager
        from app.services.branch_aware_prompt_builder import BranchAwarePromptBuilder
        from app.models.session_branch import SessionBranch

        branch_result = await db.execute(
            select(SessionBranch).where(SessionBranch.id == session.active_branch_id)
        )
        branch = branch_result.scalar_one_or_none()
        if branch:
            manager = BranchManager(db)
            sibling_ctx = await manager.build_cross_branch_context(branch.id)

            builder = BranchAwarePromptBuilder()
            session_context = f"Problem: {session.problem_summary or 'Unknown'}. Domain: {session.problem_domain or 'Unknown'}."
            prompt_args = builder.build(
                branch_messages=branch.conversation_messages or [],
                sibling_summaries=sibling_ctx,
                session_context=session_context,
                attachments=[],
                new_message=message,
                revival_context=branch.evidence_description if branch.status == "revived" else None,
            )

            # Override images from prompt_args with actual images if provided
            if images:
                prompt_args["images"] = images
            ai_content, input_tokens, output_tokens = await _call_ai(**prompt_args)

            # Update branch conversation
            msgs = list(branch.conversation_messages or [])
            msgs.append({"role": "user", "content": message})
            msgs.append({"role": "assistant", "content": ai_content})
            branch.conversation_messages = msgs

            session.total_input_tokens += input_tokens
            session.total_output_tokens += output_tokens
            session.step_count += 2

            if session.status == "paused":
                session.status = "active"

            suggested_flows = extract_suggested_flows(
                await rag_search(query=message, account_id=account_id, db=db, limit=8)
            )
            return ai_content, suggested_flows, session
  • Step 2: Add branch_id to step creation in flowpilot_engine

In backend/app/services/flowpilot_engine.py, find the AISessionStep( constructor calls (around lines 640, 926, 1244). In each, add after session_id=session.id:

        branch_id=session.active_branch_id if session.is_branching else None,

This is a one-line addition in each of three places.

  • Step 3: Verify existing tests still pass

Run: cd /home/coder/resolutionflow/backend && pytest tests/test_ai_sessions.py tests/test_ai_chat.py -v --no-header 2>&1 | tail -20

Expected: All existing tests still pass (branching path not triggered).

  • Step 4: Commit
git add backend/app/services/unified_chat_service.py backend/app/services/flowpilot_engine.py
git commit -m "feat: integrate branching into chat service and step creation"

Phase 3: Handoff System

Task 13: Write HandoffManager Service — Tests First

Files:

  • Create: backend/tests/test_handoff_manager.py

  • Create: backend/app/services/handoff_manager.py

  • Step 1: Write failing tests

# backend/tests/test_handoff_manager.py
"""Integration tests for HandoffManager service."""
import pytest
from httpx import AsyncClient

from app.models.ai_session import AISession


@pytest.mark.asyncio
async def test_create_park_handoff(client: AsyncClient, test_user, auth_headers, test_db):
    """Parking a session creates a handoff with snapshot."""
    session = AISession(
        user_id=test_user["user_data"]["id"],
        account_id=test_user["user_data"]["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[{"role": "user", "content": "help me"}],
    )
    test_db.add(session)
    await test_db.flush()

    from app.services.handoff_manager import HandoffManager
    manager = HandoffManager(test_db)

    handoff = await manager.create_handoff(
        session_id=session.id,
        intent="park",
        engineer_notes="Waiting for client to provide logs",
        user_id=test_user["user_data"]["id"],
    )

    assert handoff is not None
    assert handoff.intent == "park"
    assert handoff.engineer_notes == "Waiting for client to provide logs"
    assert handoff.snapshot is not None

    # Session should be paused
    await test_db.refresh(session)
    assert session.status == "paused"
    assert session.handoff_count == 1


@pytest.mark.asyncio
async def test_create_escalate_handoff(client: AsyncClient, test_user, auth_headers, test_db):
    """Escalating creates handoff + dual-writes to escalation_package."""
    session = AISession(
        user_id=test_user["user_data"]["id"],
        account_id=test_user["user_data"]["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(session)
    await test_db.flush()

    from app.services.handoff_manager import HandoffManager
    manager = HandoffManager(test_db)

    handoff = await manager.create_handoff(
        session_id=session.id,
        intent="escalate",
        engineer_notes="Need senior help",
        user_id=test_user["user_data"]["id"],
    )

    assert handoff.intent == "escalate"

    # Dual-write check
    await test_db.refresh(session)
    assert session.status == "escalated"
    assert session.escalation_package is not None
    assert "branch_map" in session.escalation_package or "snapshot" in session.escalation_package


@pytest.mark.asyncio
async def test_claim_session(client: AsyncClient, test_user, test_admin, auth_headers, test_db):
    """Claiming a handoff sets claimed_by and reactivates session."""
    session = AISession(
        user_id=test_user["user_data"]["id"],
        account_id=test_user["user_data"]["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(session)
    await test_db.flush()

    from app.services.handoff_manager import HandoffManager
    manager = HandoffManager(test_db)

    handoff = await manager.create_handoff(
        session_id=session.id,
        intent="escalate",
        engineer_notes="Need help",
        user_id=test_user["user_data"]["id"],
    )

    claimed = await manager.claim_session(
        handoff_id=handoff.id,
        claiming_user_id=test_admin["user_data"]["id"],
    )

    assert claimed.claimed_by == test_admin["user_data"]["id"]
    assert claimed.claimed_at is not None

    await test_db.refresh(session)
    assert session.status == "active"
  • Step 2: Run tests to verify they fail

Run: cd /home/coder/resolutionflow/backend && pytest tests/test_handoff_manager.py -v --no-header 2>&1 | head -15

Expected: FAIL — ModuleNotFoundError

  • Step 3: Implement HandoffManager
# backend/app/services/handoff_manager.py
"""Handoff management — unified park/escalate with dual-write backward compat.

Creates handoff snapshots, AI assessments (for escalations), claim workflow,
and queue queries. Dual-writes to ai_sessions.escalation_package for
backward compatibility with the existing escalation queue.
"""
import logging
from datetime import datetime, timezone
from typing import Any
from uuid import UUID

from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.ai_session import AISession
from app.models.session_branch import SessionBranch
from app.models.session_handoff import SessionHandoff

logger = logging.getLogger(__name__)


class HandoffManager:
    """Unified park/escalate handoff management."""

    def __init__(self, db: AsyncSession):
        self.db = db

    async def create_handoff(
        self,
        session_id: UUID,
        intent: str,
        engineer_notes: str | None,
        user_id: UUID,
        priority: str = "normal",
    ) -> SessionHandoff:
        """Create a handoff (park or escalate).

        Generates snapshot, updates session status, dual-writes to
        escalation_package for backward compat.
        """
        result = await self.db.execute(
            select(AISession).where(AISession.id == session_id)
        )
        session = result.scalar_one_or_none()
        if not session:
            raise ValueError(f"Session {session_id} not found")

        # Generate snapshot
        snapshot = await self._generate_snapshot(session)

        # Generate AI assessment for escalations
        ai_assessment = None
        ai_assessment_data = None
        if intent == "escalate":
            ai_assessment, ai_assessment_data = await self._generate_ai_assessment(session)

        handoff = SessionHandoff(
            session_id=session_id,
            handed_off_by=user_id,
            intent=intent,
            source_branch_id=session.active_branch_id,
            snapshot=snapshot,
            ai_assessment=ai_assessment,
            ai_assessment_data=ai_assessment_data,
            engineer_notes=engineer_notes,
            priority=priority,
        )
        self.db.add(handoff)

        # Update session status
        if intent == "park":
            session.status = "paused"
        elif intent == "escalate":
            session.status = "escalated"

        session.handoff_count = (session.handoff_count or 0) + 1

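        # Flush first so handoff.id is populated before it is referenced below;
        # a column default is only applied when the row is flushed.
        await self.db.flush()

        # Dual-write for backward compat
        session.escalation_package = {
            "snapshot": snapshot,
            "intent": intent,
            "engineer_notes": engineer_notes,
            "handoff_id": str(handoff.id),
        }

        await self.db.flush()
        return handoff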

    async def _generate_snapshot(self, session: AISession) -> dict[str, Any]:
        """Generate a snapshot of the session state at handoff time."""
        snapshot: dict[str, Any] = {
            "problem_summary": session.problem_summary,
            "problem_domain": session.problem_domain,
            "status": session.status,
            "step_count": session.step_count,
            "confidence_tier": session.confidence_tier,
        }

        # Add branch map if branching is active
        if session.is_branching:
            branches_result = await self.db.execute(
                select(SessionBranch)
                .where(SessionBranch.session_id == session.id)
                .order_by(SessionBranch.branch_order)
            )
            branches = list(branches_result.scalars().all())

            branch_map = []
            for b in branches:
                branch_map.append({
                    "id": str(b.id),
                    "label": b.label,
                    "status": b.status,
                    "status_reason": b.status_reason,
                    "parent_branch_id": str(b.parent_branch_id) if b.parent_branch_id else None,
                })
            snapshot["branch_map"] = branch_map
            snapshot["active_branch_id"] = str(session.active_branch_id) if session.active_branch_id else None

        return snapshot

    async def claim_session(
        self,
        handoff_id: UUID,
        claiming_user_id: UUID,
    ) -> SessionHandoff:
        """Claim a handed-off session."""
        result = await self.db.execute(
            select(SessionHandoff).where(SessionHandoff.id == handoff_id)
        )
        handoff = result.scalar_one_or_none()
        if not handoff:
            raise ValueError(f"Handoff {handoff_id} not found")

        handoff.claimed_by = claiming_user_id
        handoff.claimed_at = datetime.now(timezone.utc)

        # Reactivate session
        session_result = await self.db.execute(
            select(AISession).where(AISession.id == handoff.session_id)
        )
        session = session_result.scalar_one()
        session.status = "active"

        # Dual-write
        session.escalated_to_id = claiming_user_id

        await self.db.flush()
        return handoff

    async def _generate_ai_assessment(
        self, session: AISession
    ) -> tuple[str | None, dict[str, Any] | None]:
        """Generate AI diagnostic assessment for escalation handoffs."""
        try:
            from app.services.assistant_chat_service import _call_ai

            context = f"Problem: {session.problem_summary or 'Unknown'}\nDomain: {session.problem_domain or 'Unknown'}"
            msgs = session.conversation_messages or []
            # Include last 10 messages for context
            recent = "\n".join(
                f"[{m.get('role', '?')}]: {m.get('content', '')[:200]}"
                for m in msgs[-10:]
            )

            assessment_text, _, _ = await _call_ai(
                system_base="You are a diagnostic assessment generator for MSP escalations.",
                rag_context="",
                history=[],
                new_message=(
                    f"Generate a brief diagnostic assessment for this escalation.\n"
                    f"{context}\n\nRecent conversation:\n{recent}\n\n"
                    f"Return: 1) Most likely cause, 2) Suggested next steps, 3) Confidence (low/medium/high)"
                ),
                max_tokens=500,
            )

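            # TODO: parse assessment_text into structured fields. Until then
            # these values are placeholders, not output from the model.
            assessment_data = {
                "likely_cause": "See assessment text",
                "suggested_steps": [],
                "confidence": "medium",
            }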

            return assessment_text, assessment_data
        except Exception:
            logger.exception("Failed to generate AI assessment")
            return None, None

    async def generate_briefing(
        self, handoff_id: UUID, claiming_user_id: UUID
    ) -> str:
        """Generate a natural-language briefing for the engineer claiming the session."""
        result = await self.db.execute(
            select(SessionHandoff).where(SessionHandoff.id == handoff_id)
        )
        handoff = result.scalar_one_or_none()
        if not handoff:
            raise ValueError(f"Handoff {handoff_id} not found")

        session_result = await self.db.execute(
            select(AISession).where(AISession.id == handoff.session_id)
        )
        session = session_result.scalar_one()

        from app.services.assistant_chat_service import _call_ai

        snapshot_text = str(handoff.snapshot)[:2000]
        briefing, _, _ = await _call_ai(
            system_base="You are a handoff briefing generator for MSP teams.",
            rag_context="",
            history=[],
            new_message=(
                f"Generate a concise briefing for an engineer picking up this session.\n"
                f"Problem: {session.problem_summary}\n"
                f"Intent: {handoff.intent}\n"
                f"Engineer notes: {handoff.engineer_notes or 'None'}\n"
                f"Snapshot: {snapshot_text}\n"
                f"AI Assessment: {handoff.ai_assessment or 'None'}"
            ),
            max_tokens=500,
        )
        return briefing

    async def push_to_psa(self, handoff_id: UUID) -> SessionHandoff:
        """Push handoff notes to PSA via existing psa_documentation_service."""
        result = await self.db.execute(
            select(SessionHandoff).where(SessionHandoff.id == handoff_id)
        )
        handoff = result.scalar_one_or_none()
        if not handoff:
            raise ValueError(f"Handoff {handoff_id} not found")

        # Route to existing PSA documentation service
        try:
            from app.services.psa_documentation_service import push_session_notes
            session_result = await self.db.execute(
                select(AISession).where(AISession.id == handoff.session_id)
            )
            session = session_result.scalar_one()
            if session.psa_ticket_id and session.psa_connection_id:
                note_id = await push_session_notes(
                    session=session,
                    notes_content=handoff.ai_assessment or str(handoff.snapshot),
                    db=self.db,
                )
                handoff.psa_note_pushed = True
                handoff.psa_note_id = note_id
        except Exception:
            logger.exception("Failed to push handoff %s to PSA", handoff_id)

        await self.db.flush()
        return handoff

    async def get_queue(
        self,
        team_id: UUID | None = None,
        account_id: UUID | None = None,
    ) -> list[dict[str, Any]]:
        """Get team queue of parked + escalated sessions."""
        query = (
            select(SessionHandoff, AISession)
            .join(AISession, SessionHandoff.session_id == AISession.id)
            .where(SessionHandoff.claimed_by.is_(None))
            .order_by(SessionHandoff.created_at.desc())
        )

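        # Always scope to a tenant: an unfiltered query would return
        # handoffs belonging to every account.
        if team_id:
            query = query.where(AISession.team_id == team_id)
        elif account_id:
            query = query.where(AISession.account_id == account_id)
        else:
            return []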

        result = await self.db.execute(query)
        rows = result.all()

        queue_items = []
        for handoff, session in rows:
            queue_items.append({
                "handoff_id": handoff.id,
                "session_id": session.id,
                "intent": handoff.intent,
                "problem_summary": session.problem_summary,
                "problem_domain": session.problem_domain,
                "priority": handoff.priority,
                "engineer_notes": handoff.engineer_notes,
                "created_at": handoff.created_at,
                "claimed_by": handoff.claimed_by,
                "claimed_at": handoff.claimed_at,
            })

        return queue_items
  • Step 4: Run tests to verify they pass

Run: cd /home/coder/resolutionflow/backend && pytest tests/test_handoff_manager.py -v --no-header

Expected: All 3 tests PASS.
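Note that create_handoff only changes session.status for the park and escalate intents; any other intent string would still create a handoff row and leave the status untouched. The following is a minimal sketch of one way to reject unknown intents up front, assuming the two intents and statuses shown above are the only valid ones. INTENT_TO_STATUS and status_for_intent are hypothetical names, not part of the plan.

```python
# Hypothetical helper: map a handoff intent to the session status it implies.
# The two entries mirror the if/elif in create_handoff above.
INTENT_TO_STATUS: dict[str, str] = {
    "park": "paused",
    "escalate": "escalated",
}


def status_for_intent(intent: str) -> str:
    """Return the session status for a handoff intent, or raise ValueError."""
    try:
        return INTENT_TO_STATUS[intent]
    except KeyError:
        valid = ", ".join(sorted(INTENT_TO_STATUS))
        raise ValueError(
            f"Unknown handoff intent {intent!r}; expected one of: {valid}"
        ) from None
```

If something like this were called at the top of create_handoff, the ValueError would surface through the same error path that already handles a missing session.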

  • Step 5: Commit
git add backend/tests/test_handoff_manager.py backend/app/services/handoff_manager.py
git commit -m "feat: add HandoffManager service with dual-write and integration tests"
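One gap worth flagging in claim_session: it reads the handoff and then writes claimed_by, so two engineers claiming at the same moment could both succeed. A common remedy is a conditional UPDATE that only matches unclaimed rows. The sketch below illustrates the idea with the standard-library sqlite3 module so that it runs on its own; the table name session_handoffs and the function claim_handoff are assumptions for illustration. In the plan's stack the equivalent would presumably be a SQLAlchemy update() with a claimed_by IS NULL filter against PostgreSQL.

```python
import sqlite3
from datetime import datetime, timezone


def claim_handoff(conn: sqlite3.Connection, handoff_id: str, user_id: str) -> bool:
    """Claim a handoff only if nobody has claimed it yet.

    Returns True when this call won the claim, False otherwise.
    """
    now = datetime.now(timezone.utc).isoformat()
    cur = conn.execute(
        "UPDATE session_handoffs "
        "SET claimed_by = ?, claimed_at = ? "
        "WHERE id = ? AND claimed_by IS NULL",
        (user_id, now, handoff_id),
    )
    conn.commit()
    # rowcount is 1 if the row was unclaimed, 0 if missing or already claimed.
    return cur.rowcount == 1


def demo() -> tuple[bool, bool, str]:
    """Two users claim the same handoff; only the first should win."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE session_handoffs ("
        "id TEXT PRIMARY KEY, claimed_by TEXT, claimed_at TEXT)"
    )
    conn.execute("INSERT INTO session_handoffs (id) VALUES ('h1')")
    first = claim_handoff(conn, "h1", "alice")
    second = claim_handoff(conn, "h1", "bob")
    owner = conn.execute(
        "SELECT claimed_by FROM session_handoffs WHERE id = 'h1'"
    ).fetchone()[0]
    return first, second, owner
```

Because the check and the write happen in a single statement, the database decides the winner and no application-level lock is needed.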

Task 14: Write Handoff API Endpoints

Files:

  • Create: backend/app/api/endpoints/session_handoffs.py

  • Modify: backend/app/api/router.py

  • Create: backend/tests/test_session_handoffs_api.py

  • Step 1: Write failing API tests

# backend/tests/test_session_handoffs_api.py
"""API endpoint tests for session handoffs."""
import pytest
from httpx import AsyncClient

from app.models.ai_session import AISession


@pytest.mark.asyncio
async def test_create_park_handoff_api(client: AsyncClient, test_user, auth_headers, test_db):
    """POST /ai-sessions/{id}/handoff with intent=park."""
    session = AISession(
        user_id=test_user["user_data"]["id"],
        account_id=test_user["user_data"]["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(session)
    await test_db.commit()

    resp = await client.post(
        f"/api/v1/ai-sessions/{session.id}/handoff",
        headers=auth_headers,
        json={"intent": "park", "engineer_notes": "Waiting for logs"},
    )
    assert resp.status_code == 201
    data = resp.json()
    assert data["intent"] == "park"


@pytest.mark.asyncio
async def test_get_queue(client: AsyncClient, test_user, auth_headers, test_db):
    """GET /ai-sessions/queue returns unclaimed handoffs."""
    session = AISession(
        user_id=test_user["user_data"]["id"],
        account_id=test_user["user_data"]["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="discovery",
        conversation_messages=[],
    )
    test_db.add(session)
    await test_db.commit()

    # Create a handoff
    await client.post(
        f"/api/v1/ai-sessions/{session.id}/handoff",
        headers=auth_headers,
        json={"intent": "escalate", "engineer_notes": "Need help"},
    )

    resp = await client.get("/api/v1/ai-sessions/queue", headers=auth_headers)
    assert resp.status_code == 200
    data = resp.json()
    assert len(data) >= 1
  • Step 2: Implement session_handoffs endpoint
# backend/app/api/endpoints/session_handoffs.py
"""Handoff endpoints — unified park/escalate.

  POST   /ai-sessions/{id}/handoff              — Create handoff
  GET    /ai-sessions/{id}/handoffs              — Handoff history
  POST   /ai-sessions/{id}/handoffs/{hid}/claim  — Claim session
  GET    /ai-sessions/queue                       — Team queue
"""
import logging
from typing import Annotated
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.deps import get_current_active_user, get_db
from app.models.user import User
from app.models.ai_session import AISession
from app.models.session_handoff import SessionHandoff
from app.services.handoff_manager import HandoffManager
from app.schemas.session_handoff import (
    HandoffCreateRequest,
    HandoffResponse,
)

logger = logging.getLogger(__name__)

# Queue endpoint needs its own router (no session_id prefix)
queue_router = APIRouter(prefix="/ai-sessions", tags=["session-handoffs"])

# Session-scoped endpoints
router = APIRouter(prefix="/ai-sessions/{session_id}", tags=["session-handoffs"])


@router.post("/handoff", response_model=HandoffResponse, status_code=status.HTTP_201_CREATED)
async def create_handoff(
    session_id: UUID,
    body: HandoffCreateRequest,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> HandoffResponse:
    """Create a handoff (park or escalate)."""
    result = await db.execute(
        select(AISession).where(
            AISession.id == session_id,
            AISession.user_id == current_user.id,
        )
    )
    session = result.scalar_one_or_none()
    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    manager = HandoffManager(db)
    try:
        handoff = await manager.create_handoff(
            session_id=session_id,
            intent=body.intent,
            engineer_notes=body.engineer_notes,
            user_id=current_user.id,
            priority=body.priority,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

    await db.commit()
    return HandoffResponse.model_validate(handoff)


@router.get("/handoffs", response_model=list[HandoffResponse])
async def list_handoffs(
    session_id: UUID,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> list[HandoffResponse]:
    """Get handoff history for a session."""
    result = await db.execute(
        select(SessionHandoff)
        .where(SessionHandoff.session_id == session_id)
        .order_by(SessionHandoff.created_at.desc())
    )
    handoffs = result.scalars().all()
    return [HandoffResponse.model_validate(h) for h in handoffs]


@router.post("/handoffs/{handoff_id}/claim", response_model=HandoffResponse)
async def claim_handoff(
    session_id: UUID,
    handoff_id: UUID,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> HandoffResponse:
    """Claim a handed-off session."""
    manager = HandoffManager(db)
    try:
        handoff = await manager.claim_session(
            handoff_id=handoff_id,
            claiming_user_id=current_user.id,
        )
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))

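    # Reject claims made through a URL for a different session before committing
    if handoff.session_id != session_id:
        await db.rollback()
        raise HTTPException(status_code=404, detail="Handoff not found for this session")

    await db.commit()
    return HandoffResponse.model_validate(handoff)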


@queue_router.get("/queue")
async def get_queue(
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> list[dict]:
    """Get team queue of parked + escalated sessions."""
    manager = HandoffManager(db)
    return await manager.get_queue(
        team_id=current_user.team_id,
        account_id=current_user.account_id,
    )
  • Step 3: Register both routers in router.py

Add to backend/app/api/router.py:

from app.api.endpoints import session_handoffs
api_router.include_router(session_handoffs.router)
api_router.include_router(session_handoffs.queue_router)
  • Step 4: Run tests

Run: cd /home/coder/resolutionflow/backend && pytest tests/test_session_handoffs_api.py -v --no-header

Expected: Both tests PASS.
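The queue returned by get_queue is ordered by created_at alone, so the priority field on each item does not affect its position. If priority should influence ordering, one option is to sort the returned items before rendering. This is only a sketch: the plan shows just the "normal" priority, so the other values in PRIORITY_RANK and the sort_queue function are assumptions.

```python
from datetime import datetime, timezone
from typing import Any

# Hypothetical ranking; only "normal" appears in the plan.
PRIORITY_RANK: dict[str, int] = {"urgent": 0, "high": 1, "normal": 2, "low": 3}


def sort_queue(items: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Order queue items by priority rank, then oldest handoff first."""
    return sorted(
        items,
        key=lambda item: (
            # Unknown priorities are treated like "normal".
            PRIORITY_RANK.get(item["priority"], PRIORITY_RANK["normal"]),
            item["created_at"],
        ),
    )


def demo() -> list[str]:
    """Return handoff ids in the order the sketch would display them."""
    early = datetime(2026, 3, 24, 8, 0, tzinfo=timezone.utc)
    late = datetime(2026, 3, 24, 9, 0, tzinfo=timezone.utc)
    items = [
        {"handoff_id": "a", "priority": "normal", "created_at": late},
        {"handoff_id": "b", "priority": "urgent", "created_at": late},
        {"handoff_id": "c", "priority": "normal", "created_at": early},
    ]
    return [item["handoff_id"] for item in sort_queue(items)]
```

Sorting oldest-first within a priority keeps long-waiting sessions from being buried, which differs from the newest-first order used in the query above.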

  • Step 5: Commit
git add backend/app/api/endpoints/session_handoffs.py backend/tests/test_session_handoffs_api.py backend/app/api/router.py
git commit -m "feat: add handoff API endpoints with queue and integration tests"

Phase 4: Resolution Outputs

Task 15: Write ResolutionOutputGenerator Service — Tests First

Files:

  • Create: backend/tests/test_resolution_outputs.py

  • Create: backend/app/services/resolution_output_generator.py

  • Step 1: Write failing tests

# backend/tests/test_resolution_outputs.py
"""Integration tests for ResolutionOutputGenerator."""
import pytest
from unittest.mock import AsyncMock, patch
from httpx import AsyncClient

from app.models.ai_session import AISession
from app.models.session_resolution_output import SessionResolutionOutput


@pytest.mark.asyncio
@patch("app.services.resolution_output_generator._call_ai")
async def test_generate_all_creates_three_outputs(
    mock_call_ai, client: AsyncClient, test_user, auth_headers, test_db
):
    """generate_all creates PSA notes, KB article, and client summary."""
    mock_call_ai.return_value = ("Generated content here", 100, 50)

    session = AISession(
        user_id=test_user["user_data"]["id"],
        account_id=test_user["user_data"]["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="resolved",
        confidence_tier="guided",
        conversation_messages=[
            {"role": "user", "content": "DNS not working"},
            {"role": "assistant", "content": "Fixed by flushing DNS cache"},
        ],
        resolution_summary="Flushed DNS cache",
    )
    test_db.add(session)
    await test_db.flush()

    from app.services.resolution_output_generator import ResolutionOutputGenerator
    gen = ResolutionOutputGenerator(test_db)
    outputs = await gen.generate_all(session.id)

    assert len(outputs) == 3
    types = {o.output_type for o in outputs}
    assert types == {"psa_ticket_notes", "knowledge_base", "client_summary"}
    assert all(o.status == "draft" for o in outputs)
    assert mock_call_ai.call_count == 3


@pytest.mark.asyncio
async def test_edit_output(client: AsyncClient, test_user, auth_headers, test_db):
    """Editing an output stores edited_content."""
    session = AISession(
        user_id=test_user["user_data"]["id"],
        account_id=test_user["user_data"]["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="resolved",
        confidence_tier="guided",
        conversation_messages=[],
        resolution_summary="Fixed it",
    )
    test_db.add(session)
    await test_db.flush()

    output = SessionResolutionOutput(
        session_id=session.id,
        output_type="psa_ticket_notes",
        generated_content="Original notes",
        status="draft",
        generated_by_model="claude-sonnet-4-6",
    )
    test_db.add(output)
    await test_db.flush()

    from app.services.resolution_output_generator import ResolutionOutputGenerator
    gen = ResolutionOutputGenerator(test_db)
    edited = await gen.edit_output(output.id, "My edited notes")

    assert edited.edited_content == "My edited notes"
  • Step 2: Run tests to verify they fail

Run: cd /home/coder/resolutionflow/backend && pytest tests/test_resolution_outputs.py -v --no-header 2>&1 | head -15

Expected: FAIL with ModuleNotFoundError

  • Step 3: Implement ResolutionOutputGenerator
# backend/app/services/resolution_output_generator.py
"""Resolution output generator — three deliverables on session resolve.

Generates PSA ticket notes, KB article draft, and client-facing summary.
Each is a separate LLM call through _call_ai.
"""
import logging
from typing import Any
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.dialects.postgresql import insert as pg_insert

from app.core.config import settings
from app.models.ai_session import AISession
from app.models.session_branch import SessionBranch
from app.models.session_resolution_output import SessionResolutionOutput
from app.services.assistant_chat_service import _call_ai

logger = logging.getLogger(__name__)

# Model used for resolution output generation
RESOLUTION_MODEL = "claude-sonnet-4-6"


class ResolutionOutputGenerator:
    """Generates three resolution outputs on session resolve."""

    def __init__(self, db: AsyncSession):
        self.db = db

    async def generate_all(self, session_id: UUID) -> list[SessionResolutionOutput]:
        """Generate all three outputs for a resolved session."""
        result = await self.db.execute(
            select(AISession).where(AISession.id == session_id)
        )
        session = result.scalar_one_or_none()
        if not session:
            raise ValueError(f"Session {session_id} not found")

        # Build session context for prompts
        context = self._build_session_context(session)

        # Generate all three
        outputs = []
        for output_type, prompt in [
            ("psa_ticket_notes", self._psa_notes_prompt(context)),
            ("knowledge_base", self._kb_article_prompt(context)),
            ("client_summary", self._client_summary_prompt(context)),
        ]:
            content, _, _ = await _call_ai(
                system_base="You are a technical documentation assistant for MSP teams.",
                rag_context="",
                history=[],
                new_message=prompt,
                max_tokens=2048,
            )

            output = SessionResolutionOutput(
                session_id=session_id,
                output_type=output_type,
                generated_content=content,
                status="draft",
                generated_by_model=RESOLUTION_MODEL,
            )
            self.db.add(output)
            outputs.append(output)

        await self.db.flush()
        return outputs

    async def edit_output(self, output_id: UUID, edited_content: str) -> SessionResolutionOutput:
        """Save engineer's edited version of an output."""
        result = await self.db.execute(
            select(SessionResolutionOutput).where(SessionResolutionOutput.id == output_id)
        )
        output = result.scalar_one_or_none()
        if not output:
            raise ValueError(f"Output {output_id} not found")

        output.edited_content = edited_content
        await self.db.flush()
        return output

    async def push_output(
        self, output_id: UUID, destination: str
    ) -> SessionResolutionOutput:
        """Push output to a destination (psa, kb_library, clipboard, email)."""
        result = await self.db.execute(
            select(SessionResolutionOutput).where(SessionResolutionOutput.id == output_id)
        )
        output = result.scalar_one_or_none()
        if not output:
            raise ValueError(f"Output {output_id} not found")

        # Use edited content if available, otherwise generated
        content = output.edited_content or output.generated_content

        if destination == "psa":
            # Route to existing PSA documentation service
            # TODO: Wire up psa_documentation_service.push_note()
            pass
        elif destination == "kb_library":
            # TODO: Wire up flow/step library creation
            pass

        from datetime import datetime, timezone
        output.status = "pushed"
        output.pushed_to = destination
        output.pushed_at = datetime.now(timezone.utc)

        await self.db.flush()
        return output

    def _build_session_context(self, session: AISession) -> str:
        """Build a text summary of the session for prompts."""
        parts = [
            f"Problem: {session.problem_summary or 'Unknown'}",
            f"Domain: {session.problem_domain or 'Unknown'}",
            f"Resolution: {session.resolution_summary or 'Not specified'}",
            f"Steps taken: {session.step_count}",
        ]
        # Include conversation highlights
        msgs = session.conversation_messages or []
        if msgs:
            parts.append("\nConversation highlights:")
            for msg in msgs[-10:]:  # Last 10 messages
                role = msg.get("role", "unknown")
                content = msg.get("content", "")[:200]
                parts.append(f"  [{role}]: {content}")

        return "\n".join(parts)

    def _psa_notes_prompt(self, context: str) -> str:
        return (
            f"Generate professional PSA ticket notes for this resolved troubleshooting session.\n"
            f"Format as structured markdown with: Problem, Diagnostic Steps, Resolution, Recommendations.\n\n"
            f"{context}"
        )

    def _kb_article_prompt(self, context: str) -> str:
        return (
            f"Generate a knowledge base article draft from this resolved session.\n"
            f"Include: Symptoms, Root Cause, Resolution Steps, Things to Rule Out First.\n"
            f"Dead-end branches (if any) should become 'Rule Out First' guidance.\n\n"
            f"{context}"
        )

    def _client_summary_prompt(self, context: str) -> str:
        return (
            f"Generate a non-technical summary for the end user/client.\n"
            f"Explain what was wrong and what was done to fix it in plain language.\n"
            f"No jargon. 2-3 paragraphs max.\n\n"
            f"{context}"
        )
  • Step 4: Run tests

Run: cd /home/coder/resolutionflow/backend && pytest tests/test_resolution_outputs.py -v --no-header

Expected: Both tests PASS.
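generate_all awaits the three LLM calls one after another. Since the prompts are independent, they could be issued concurrently with asyncio.gather. The sketch below shows the pattern with a stand-in for _call_ai; fake_call_ai and generate_contents are hypothetical, and the 3-tuple return shape is assumed from the mock used in the tests above.

```python
import asyncio


async def fake_call_ai(new_message: str) -> tuple[str, int, int]:
    """Stand-in for _call_ai: returns a 3-tuple whose first element is the text."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    return f"generated for: {new_message}", 100, 50


async def generate_contents(prompts: dict[str, str]) -> dict[str, str]:
    """Run one call per output type concurrently and map type -> content."""
    output_types = list(prompts)
    results = await asyncio.gather(
        *(fake_call_ai(prompts[t]) for t in output_types)
    )
    # gather preserves argument order, so zip lines results up with types.
    return {t: text for t, (text, _, _) in zip(output_types, results)}


def demo() -> dict[str, str]:
    """Generate contents for the three output types used in the plan."""
    prompts = {
        "psa_ticket_notes": "psa prompt",
        "knowledge_base": "kb prompt",
        "client_summary": "client prompt",
    }
    return asyncio.run(generate_contents(prompts))
```

Only the LLM calls should be gathered this way. An SQLAlchemy AsyncSession is not safe for concurrent use across tasks, so the db.add calls and the flush would stay sequential after the gather completes. The call_count == 3 assertion in the test would still hold.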

  • Step 5: Commit
git add backend/tests/test_resolution_outputs.py backend/app/services/resolution_output_generator.py
git commit -m "feat: add ResolutionOutputGenerator with three-output generation"

Task 16: Write Resolution API Endpoints

Files:

  • Create: backend/app/api/endpoints/session_resolutions.py

  • Create: backend/tests/test_session_resolutions_api.py

  • Modify: backend/app/api/router.py

  • Step 1: Write failing API tests

# backend/tests/test_session_resolutions_api.py
"""API tests for resolution output endpoints."""
import pytest
from unittest.mock import patch
from httpx import AsyncClient

from app.models.ai_session import AISession
from app.models.session_resolution_output import SessionResolutionOutput


@pytest.mark.asyncio
async def test_get_outputs_empty_before_resolve(client: AsyncClient, test_user, auth_headers, test_db):
    """GET /ai-sessions/{id}/outputs returns an empty list before any outputs are generated."""

    session = AISession(
        user_id=test_user["user_data"]["id"],
        account_id=test_user["user_data"]["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="active",
        confidence_tier="guided",
        conversation_messages=[{"role": "user", "content": "help"}],
    )
    test_db.add(session)
    await test_db.commit()

    resp = await client.get(
        f"/api/v1/ai-sessions/{session.id}/outputs",
        headers=auth_headers,
    )
    assert resp.status_code == 200
    data = resp.json()
    assert data["outputs"] == []


@pytest.mark.asyncio
async def test_edit_output_api(client: AsyncClient, test_user, auth_headers, test_db):
    """PATCH /ai-sessions/{id}/outputs/{oid} edits content."""
    session = AISession(
        user_id=test_user["user_data"]["id"],
        account_id=test_user["user_data"]["account_id"],
        session_type="guided",
        intake_type="free_text",
        intake_content={"text": "test"},
        status="resolved",
        confidence_tier="guided",
        conversation_messages=[],
        resolution_summary="Fixed",
    )
    test_db.add(session)
    await test_db.flush()

    output = SessionResolutionOutput(
        session_id=session.id,
        output_type="psa_ticket_notes",
        generated_content="Original",
        status="draft",
        generated_by_model="claude-sonnet-4-6",
    )
    test_db.add(output)
    await test_db.commit()

    resp = await client.patch(
        f"/api/v1/ai-sessions/{session.id}/outputs/{output.id}",
        headers=auth_headers,
        json={"edited_content": "My edited version"},
    )
    assert resp.status_code == 200
    assert resp.json()["edited_content"] == "My edited version"
  • Step 2: Implement session_resolutions endpoint
# backend/app/api/endpoints/session_resolutions.py
"""Resolution output endpoints.

  GET    /ai-sessions/{id}/outputs              — Get all resolution outputs
  PATCH  /ai-sessions/{id}/outputs/{oid}        — Edit output
  POST   /ai-sessions/{id}/outputs/{oid}/push   — Push to destination
"""
import logging
from typing import Annotated
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.deps import get_current_active_user, get_db
from app.models.user import User
from app.models.ai_session import AISession
from app.models.session_resolution_output import SessionResolutionOutput
from app.services.resolution_output_generator import ResolutionOutputGenerator
from app.schemas.session_resolution import (
    ResolutionOutputResponse,
    ResolutionOutputEditRequest,
    ResolutionOutputPushRequest,
    ResolutionOutputPushResponse,
    AllResolutionOutputsResponse,
)

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/ai-sessions/{session_id}", tags=["session-resolutions"])


@router.get("/outputs", response_model=AllResolutionOutputsResponse)
async def get_outputs(
    session_id: UUID,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> AllResolutionOutputsResponse:
    """Get all resolution outputs for a session."""
    result = await db.execute(
        select(SessionResolutionOutput)
        .where(SessionResolutionOutput.session_id == session_id)
        .order_by(SessionResolutionOutput.output_type)
    )
    outputs = result.scalars().all()
    return AllResolutionOutputsResponse(
        outputs=[ResolutionOutputResponse.model_validate(o) for o in outputs]
    )


@router.patch("/outputs/{output_id}", response_model=ResolutionOutputResponse)
async def edit_output(
    session_id: UUID,
    output_id: UUID,
    body: ResolutionOutputEditRequest,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> ResolutionOutputResponse:
    """Edit an output before pushing."""
    gen = ResolutionOutputGenerator(db)
    try:
        output = await gen.edit_output(output_id, body.edited_content)
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))

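    # Reject edits made through a URL for a different session before committing
    if output.session_id != session_id:
        await db.rollback()
        raise HTTPException(status_code=404, detail="Output not found for this session")

    await db.commit()
    return ResolutionOutputResponse.model_validate(output)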


@router.post("/outputs/{output_id}/push", response_model=ResolutionOutputPushResponse)
async def push_output(
    session_id: UUID,
    output_id: UUID,
    body: ResolutionOutputPushRequest,
    current_user: Annotated[User, Depends(get_current_active_user)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> ResolutionOutputPushResponse:
    """Push output to destination."""
    gen = ResolutionOutputGenerator(db)
    try:
        output = await gen.push_output(output_id, body.destination)
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))

    await db.commit()
    return ResolutionOutputPushResponse(
        output_id=output.id,
        status=output.status,
        pushed_to=output.pushed_to or body.destination,
        pushed_reference=output.pushed_reference,
    )
  • Step 3: Register router

Add to backend/app/api/router.py:

from app.api.endpoints import session_resolutions
api_router.include_router(session_resolutions.router)
  • Step 4: Run tests

Run: cd /home/coder/resolutionflow/backend && pytest tests/test_session_resolutions_api.py -v --no-header Expected: All 2 tests PASS.

  • Step 5: Commit
git add backend/app/api/endpoints/session_resolutions.py backend/tests/test_session_resolutions_api.py backend/app/api/router.py
git commit -m "feat: add resolution output API endpoints"

Phase 5: AI Description Pipeline

Task 17: Extend Upload Endpoint with AI Description

Files:

  • Modify: backend/app/api/endpoints/uploads.py:38-125

  • Step 1: Add background AI description task to upload endpoint

In backend/app/api/endpoints/uploads.py, after the await db.commit() (line ~113), add:

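    # Kick off async AI description generation (non-blocking).
    # Hold a reference until the task completes: the event loop keeps only
    # weak references to tasks, so an unreferenced task can be garbage-collected mid-run.
    task = asyncio.create_task(
        _generate_ai_description(upload.id, file_data, content_type)
    )
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

Add import asyncio to the module imports, then add the task registry and this function at module level (after the _check_storage_configured function):

# Strong references to in-flight background tasks (see the comment at the call site).
_background_tasks: set[asyncio.Task[None]] = set()


async def _generate_ai_description(upload_id: UUID, file_data: bytes, content_type: str) -> None: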
    """Background task: generate AI description for uploaded file.

    Catches all exceptions — upload is still usable if this fails.
    """
    try:
        from app.core.database import async_session_maker
        from app.services.assistant_chat_service import _call_ai
        import base64

        async with async_session_maker() as db:
            result = await db.execute(
                select(FileUpload).where(FileUpload.id == upload_id)
            )
            upload = result.scalar_one_or_none()
            if not upload:
                return

            if content_type.startswith("image/"):
                # Image: send to AI with vision
                b64_data = base64.b64encode(file_data).decode("utf-8")
                description, _, _ = await _call_ai(
                    system_base="You are a technical image analyst for IT troubleshooting.",
                    rag_context="",
                    history=[],
                    new_message="Describe this image in one sentence for a troubleshooting context log.",
                    images=[{"media_type": content_type, "data": b64_data}],
                    max_tokens=100,
                )
                upload.ai_description = description
            elif content_type.startswith("text/") or content_type in (
                "application/json", "application/xml", "application/yaml",
            ):
                # Text file: extract content directly
                try:
                    text_content = file_data.decode("utf-8")
                except UnicodeDecodeError:
                    text_content = file_data.decode("latin-1")

                upload.extracted_content = text_content[:10000]  # Cap at 10k chars

                # Summarize if long
                if len(text_content) > 2000:
                    summary, _, _ = await _call_ai(
                        system_base="You are a technical log/config analyst.",
                        rag_context="",
                        history=[],
                        new_message=f"Summarize this file content in 2-3 sentences:\n\n{text_content[:5000]}",
                        max_tokens=200,
                    )
                    upload.content_summary = summary
                    upload.ai_description = summary
                else:
                    upload.ai_description = f"Text file: {upload.filename}"

            await db.commit()
    except Exception:
        logger.exception("Failed to generate AI description for upload %s", upload_id)

The function imports async_session_maker from app.core.database lazily, so no module-level import is required. Check that async_session_maker exists in core/database.py; if it does not, add it there.

  • Step 2: Verify existing upload tests still pass

Run: cd /home/coder/resolutionflow/backend && pytest tests/ -k "upload" -v --no-header 2>&1 | tail -10 Expected: Existing tests pass (background task is fire-and-forget).

  • Step 3: Commit
git add backend/app/api/endpoints/uploads.py
git commit -m "feat: add async AI description generation on file upload"

Phase 6: Frontend

Task 18: Add TypeScript Types for Branching

Files:

  • Create: frontend/src/types/branching.ts

  • Modify: frontend/src/types/ai-session.ts:185-198

  • Modify: frontend/src/types/index.ts

  • Step 1: Create branching types

// frontend/src/types/branching.ts

// ── Branch ──

export interface BranchResponse {
  id: string
  session_id: string
  parent_branch_id: string | null
  fork_point_step_id: string | null
  branch_order: number
  label: string
  status: 'active' | 'dead_end' | 'solved' | 'untried' | 'revived'
  status_reason: string | null
  status_changed_at: string | null
  context_summary: { tried: string[]; concluded: string; artifacts: string[] } | null
  evidence_from_branch_id: string | null
  evidence_description: string | null
  step_count: number
  created_at: string
  updated_at: string
}

export interface BranchTreeResponse {
  branches: BranchResponse[]
  active_branch_id: string | null
}

// ── Fork ──

export interface ForkOption {
  label: string
  description: string
}

export interface ForkCreateRequest {
  fork_reason: string
  options: ForkOption[]
}

export interface ForkPointResponse {
  id: string
  session_id: string
  parent_branch_id: string
  trigger_step_id: string | null
  fork_reason: string
  options: Array<{ label: string; description: string; branch_id: string; status: string }>
  created_at: string
}

// ── Switch ──

export interface BranchSwitchResponse {
  active_branch_id: string
  branch: BranchResponse
  conversation_messages: Array<{ role: string; content: string }>
}

// ── Revival ──

export interface ReviveRequest {
  evidence_from_branch_id: string
  evidence_description: string
}

// ── Branch message ──

export interface BranchMessageRequest {
  message: string
  upload_ids?: string[]
}

export interface BranchMessageResponse {
  content: string
  branch_id: string
  step_id: string | null
}

// ── Handoff ──

export interface HandoffCreateRequest {
  intent: 'park' | 'escalate'
  engineer_notes?: string
  priority?: 'normal' | 'elevated'
}

export interface HandoffResponse {
  id: string
  session_id: string
  handed_off_by: string
  intent: 'park' | 'escalate'
  source_branch_id: string | null
  snapshot: Record<string, unknown>
  ai_assessment: string | null
  ai_assessment_data: { likely_cause: string; suggested_steps: string[]; confidence: number } | null
  artifacts: Array<{ name: string; type: string; reference: string }> | null
  engineer_notes: string | null
  priority: 'normal' | 'elevated'
  claimed_by: string | null
  claimed_at: string | null
  psa_note_pushed: boolean
  notification_sent: boolean
  created_at: string
}

export interface QueueItemResponse {
  handoff_id: string
  session_id: string
  intent: 'park' | 'escalate'
  problem_summary: string | null
  problem_domain: string | null
  priority: 'normal' | 'elevated'
  handed_off_by_name: string | null
  engineer_notes: string | null
  branch_count: number
  created_at: string
  claimed_by: string | null
  claimed_at: string | null
}

// ── Resolution Outputs ──

export type ResolutionOutputType = 'psa_ticket_notes' | 'knowledge_base' | 'client_summary'
export type ResolutionOutputStatus = 'draft' | 'approved' | 'pushed' | 'rejected'

export interface ResolutionOutputResponse {
  id: string
  session_id: string
  output_type: ResolutionOutputType
  generated_content: string
  structured_data: Record<string, unknown> | null
  edited_content: string | null
  status: ResolutionOutputStatus
  pushed_to: string | null
  pushed_at: string | null
  pushed_reference: string | null
  generated_by_model: string
  created_at: string
  updated_at: string
}

export interface AllResolutionOutputsResponse {
  outputs: ResolutionOutputResponse[]
}

export interface ResolutionOutputEditRequest {
  edited_content: string
}

export type PushDestination = 'psa' | 'kb_library' | 'clipboard' | 'email'

export interface ResolutionOutputPushRequest {
  destination: PushDestination
}
  • Step 2: Add branching fields to AISessionDetail

In frontend/src/types/ai-session.ts, add to AISessionDetail interface (after conversation_messages):

  is_branching: boolean
  active_branch_id: string | null
  • Step 3: Export from types/index.ts

Add export * from './branching' to frontend/src/types/index.ts (or equivalent).

  • Step 4: Verify build

Run: cd /home/coder/resolutionflow/frontend && npx tsc --noEmit 2>&1 | head -20 Expected: No type errors.

  • Step 5: Commit
git add frontend/src/types/branching.ts frontend/src/types/ai-session.ts frontend/src/types/index.ts
git commit -m "feat: add TypeScript types for branching, handoffs, and resolution outputs"
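
The `status` union on `BranchResponse` exists only at compile time, so a value that arrives from the API or from user input is not checked at runtime. The following is a minimal sketch of one way to validate such values. `BRANCH_STATUSES`, `BranchStatus`, and `isBranchStatus` are hypothetical names introduced here for illustration; they are not part of the plan.

```typescript
// Hypothetical helper, not part of the plan: one source of truth for the
// status values listed on BranchResponse.status.
const BRANCH_STATUSES = ['active', 'dead_end', 'solved', 'untried', 'revived'] as const

// Derive the union type from the array so the two cannot drift apart.
type BranchStatus = (typeof BRANCH_STATUSES)[number]

// Runtime type guard: narrows an arbitrary value to BranchStatus.
function isBranchStatus(value: unknown): value is BranchStatus {
  return typeof value === 'string' && (BRANCH_STATUSES as readonly string[]).includes(value)
}

// Example: a value of unknown shape is narrowed before use.
const fromApi: unknown = 'dead_end'
const narrowed: BranchStatus | null = isBranchStatus(fromApi) ? fromApi : null
```

If a helper like this were adopted, the `status` field in `BranchResponse` could reference the derived type instead of repeating the literal union.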

Task 19: Add Frontend API Clients

Files:

  • Create: frontend/src/api/branches.ts

  • Create: frontend/src/api/handoffs.ts

  • Create: frontend/src/api/resolutions.ts

  • Modify: frontend/src/api/index.ts

  • Step 1: Create branches API client

// frontend/src/api/branches.ts
import apiClient from './client'
import type {
  BranchTreeResponse,
  BranchResponse,
  ForkCreateRequest,
  ForkPointResponse,
  BranchSwitchResponse,
  ReviveRequest,
  BranchMessageRequest,
  BranchMessageResponse,
} from '@/types/branching'

export const branchesApi = {
  async getBranches(sessionId: string): Promise<BranchTreeResponse> {
    const resp = await apiClient.get<BranchTreeResponse>(`/ai-sessions/${sessionId}/branches`)
    return resp.data
  },

  async createFork(sessionId: string, data: ForkCreateRequest): Promise<ForkPointResponse> {
    const resp = await apiClient.post<ForkPointResponse>(`/ai-sessions/${sessionId}/branches/fork`, data)
    return resp.data
  },

  async updateBranchStatus(sessionId: string, branchId: string, data: { status: string; status_reason?: string }): Promise<BranchResponse> {
    const resp = await apiClient.patch<BranchResponse>(`/ai-sessions/${sessionId}/branches/${branchId}`, data)
    return resp.data
  },

  async switchBranch(sessionId: string, branchId: string): Promise<BranchSwitchResponse> {
    const resp = await apiClient.post<BranchSwitchResponse>(`/ai-sessions/${sessionId}/branches/${branchId}/switch`)
    return resp.data
  },

  async reviveBranch(sessionId: string, branchId: string, data: ReviveRequest): Promise<BranchResponse> {
    const resp = await apiClient.post<BranchResponse>(`/ai-sessions/${sessionId}/branches/${branchId}/revive`, data)
    return resp.data
  },

  async sendBranchMessage(sessionId: string, branchId: string, data: BranchMessageRequest): Promise<BranchMessageResponse> {
    const resp = await apiClient.post<BranchMessageResponse>(`/ai-sessions/${sessionId}/branches/${branchId}/message`, data)
    return resp.data
  },
}
  • Step 2: Create handoffs API client
// frontend/src/api/handoffs.ts
import apiClient from './client'
import type {
  HandoffCreateRequest,
  HandoffResponse,
  QueueItemResponse,
} from '@/types/branching'

export const handoffsApi = {
  async createHandoff(sessionId: string, data: HandoffCreateRequest): Promise<HandoffResponse> {
    const resp = await apiClient.post<HandoffResponse>(`/ai-sessions/${sessionId}/handoff`, data)
    return resp.data
  },

  async listHandoffs(sessionId: string): Promise<HandoffResponse[]> {
    const resp = await apiClient.get<HandoffResponse[]>(`/ai-sessions/${sessionId}/handoffs`)
    return resp.data
  },

  async claimHandoff(sessionId: string, handoffId: string): Promise<HandoffResponse> {
    const resp = await apiClient.post<HandoffResponse>(`/ai-sessions/${sessionId}/handoffs/${handoffId}/claim`)
    return resp.data
  },

  async getQueue(): Promise<QueueItemResponse[]> {
    const resp = await apiClient.get<QueueItemResponse[]>('/ai-sessions/queue')
    return resp.data
  },
}
  • Step 3: Create resolutions API client
// frontend/src/api/resolutions.ts
import apiClient from './client'
import type {
  AllResolutionOutputsResponse,
  ResolutionOutputResponse,
  ResolutionOutputEditRequest,
  ResolutionOutputPushRequest,
} from '@/types/branching'

export const resolutionsApi = {
  async getOutputs(sessionId: string): Promise<AllResolutionOutputsResponse> {
    const resp = await apiClient.get<AllResolutionOutputsResponse>(`/ai-sessions/${sessionId}/outputs`)
    return resp.data
  },

  async editOutput(sessionId: string, outputId: string, data: ResolutionOutputEditRequest): Promise<ResolutionOutputResponse> {
    const resp = await apiClient.patch<ResolutionOutputResponse>(`/ai-sessions/${sessionId}/outputs/${outputId}`, data)
    return resp.data
  },

  async pushOutput(sessionId: string, outputId: string, data: ResolutionOutputPushRequest): Promise<{ output_id: string; status: string; pushed_to: string; pushed_reference: string | null }> {
    const resp = await apiClient.post(`/ai-sessions/${sessionId}/outputs/${outputId}/push`, data)
    return resp.data
  },
}
  • Step 4: Export from api/index.ts

Add to frontend/src/api/index.ts:

export { branchesApi } from './branches'
export { handoffsApi } from './handoffs'
export { resolutionsApi } from './resolutions'
  • Step 5: Verify build

Run: cd /home/coder/resolutionflow/frontend && npx tsc --noEmit 2>&1 | head -10 Expected: No errors.

  • Step 6: Commit
git add frontend/src/api/branches.ts frontend/src/api/handoffs.ts frontend/src/api/resolutions.ts frontend/src/api/index.ts
git commit -m "feat: add frontend API clients for branches, handoffs, and resolutions"
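
The branch client builds each URL with an inline template string. As a sketch only, the paths could be centralized in one function that also escapes the ID segments. `branchPath` and `BranchAction` are hypothetical names, not part of the plan, and the `/branches/fork` route is deliberately left out because it takes no branch ID.

```typescript
// Hypothetical helper, not part of the plan: builds the per-branch endpoint
// paths used by branchesApi in one place and escapes each ID segment.
type BranchAction = 'switch' | 'revive' | 'message'

function branchPath(sessionId: string, branchId?: string, action?: BranchAction): string {
  const base = `/ai-sessions/${encodeURIComponent(sessionId)}/branches`
  if (branchId === undefined) return base
  const branch = `${base}/${encodeURIComponent(branchId)}`
  return action === undefined ? branch : `${branch}/${action}`
}

const listPath = branchPath('s-1')                      // list / tree endpoint
const switchPath = branchPath('s-1', 'b-2', 'switch')   // switch endpoint
```

Since the IDs are UUIDs, the escaping is defensive rather than required; the main benefit would be keeping the route shapes in one spot.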

Task 20: Create useBranching Hook

Files:

  • Create: frontend/src/hooks/useBranching.ts

  • Step 1: Implement the useBranching hook

// frontend/src/hooks/useBranching.ts
import { useState, useCallback } from 'react'
import { branchesApi } from '@/api'
import type {
  BranchResponse,
  ForkCreateRequest,
  ForkPointResponse,
  BranchSwitchResponse,
  BranchMessageResponse,
} from '@/types/branching'
import { toast } from '@/lib/toast'

export interface UseBranching {
  branches: BranchResponse[]
  activeBranchId: string | null
  isLoading: boolean

  loadBranches: (sessionId: string) => Promise<void>
  createFork: (sessionId: string, data: ForkCreateRequest) => Promise<ForkPointResponse | null>
  switchBranch: (sessionId: string, branchId: string) => Promise<BranchSwitchResponse | null>
  updateStatus: (sessionId: string, branchId: string, status: string, reason?: string) => Promise<void>
  reviveBranch: (sessionId: string, branchId: string, evidenceFromId: string, description: string) => Promise<void>
  sendMessage: (sessionId: string, branchId: string, message: string) => Promise<BranchMessageResponse | null>
}

export function useBranching(): UseBranching {
  const [branches, setBranches] = useState<BranchResponse[]>([])
  const [activeBranchId, setActiveBranchId] = useState<string | null>(null)
  const [isLoading, setIsLoading] = useState(false)

  const loadBranches = useCallback(async (sessionId: string) => {
    setIsLoading(true)
    try {
      const data = await branchesApi.getBranches(sessionId)
      setBranches(data.branches)
      setActiveBranchId(data.active_branch_id)
    } catch {
      toast.error('Failed to load branches')
    } finally {
      setIsLoading(false)
    }
  }, [])

  const createFork = useCallback(async (sessionId: string, data: ForkCreateRequest) => {
    try {
      const result = await branchesApi.createFork(sessionId, data)
      await loadBranches(sessionId)
      return result
    } catch {
      toast.error('Failed to create fork')
      return null
    }
  }, [loadBranches])

  const switchBranch = useCallback(async (sessionId: string, branchId: string) => {
    try {
      const result = await branchesApi.switchBranch(sessionId, branchId)
      setActiveBranchId(result.active_branch_id)
      // Update branches list
      setBranches(prev => prev.map(b =>
        b.id === branchId ? result.branch : b
      ))
      return result
    } catch {
      toast.error('Failed to switch branch')
      return null
    }
  }, [])

  const updateStatus = useCallback(async (sessionId: string, branchId: string, status: string, reason?: string) => {
    try {
      const updated = await branchesApi.updateBranchStatus(sessionId, branchId, { status, status_reason: reason })
      setBranches(prev => prev.map(b => b.id === branchId ? updated : b))
    } catch {
      toast.error('Failed to update branch status')
    }
  }, [])

  const reviveBranch = useCallback(async (sessionId: string, branchId: string, evidenceFromId: string, description: string) => {
    try {
      await branchesApi.reviveBranch(sessionId, branchId, {
        evidence_from_branch_id: evidenceFromId,
        evidence_description: description,
      })
      await loadBranches(sessionId)
      toast.success('Branch revived')
    } catch {
      toast.error('Failed to revive branch')
    }
  }, [loadBranches])

  const sendMessage = useCallback(async (sessionId: string, branchId: string, message: string) => {
    try {
      return await branchesApi.sendBranchMessage(sessionId, branchId, { message })
    } catch {
      toast.error('Failed to send message')
      return null
    }
  }, [])

  return {
    branches,
    activeBranchId,
    isLoading,
    loadBranches,
    createFork,
    switchBranch,
    updateStatus,
    reviveBranch,
    sendMessage,
  }
}
  • Step 2: Verify build

Run: cd /home/coder/resolutionflow/frontend && npx tsc --noEmit 2>&1 | head -10 Expected: No errors.

  • Step 3: Commit
git add frontend/src/hooks/useBranching.ts
git commit -m "feat: add useBranching hook for branch state management"
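
Both `switchBranch` and `updateStatus` update local state by mapping over the branch list and swapping in the server's copy of one branch. A minimal sketch of that update as a pure function, which can be unit-tested without React, is shown here. `replaceBranch` and `BranchLike` are hypothetical names used only for illustration.

```typescript
// Minimal stand-in for BranchResponse; only the fields this sketch touches.
interface BranchLike {
  id: string
  label: string
}

// Hypothetical pure helper mirroring the setBranches(prev => prev.map(...)) calls
// in the hook: returns a new array with the matching branch swapped out.
function replaceBranch<T extends BranchLike>(branches: readonly T[], updated: T): T[] {
  return branches.map(b => (b.id === updated.id ? updated : b))
}

const before: BranchLike[] = [
  { id: 'b1', label: 'DNS' },
  { id: 'b2', label: 'Firewall' },
]
const after = replaceBranch(before, { id: 'b2', label: 'Firewall (solved)' })
```

A map-based replace never adds an entry, so an unknown ID leaves the list unchanged. That is consistent with `createFork` reloading the whole tree rather than patching local state, because a fork introduces branches the client has not seen yet.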

Task 21: Create BranchMap Sidebar Component

Files:

  • Create: frontend/src/components/session/BranchMap.tsx

  • Create: frontend/src/components/session/BranchNode.tsx

  • Step 1: Create BranchNode component

// frontend/src/components/session/BranchNode.tsx
import { cn } from '@/lib/utils'
import { CheckCircle2, XCircle, CircleDot, RotateCcw, Circle } from 'lucide-react'
import type { BranchResponse } from '@/types/branching'

const STATUS_CONFIG = {
  active: { icon: CircleDot, color: 'text-accent', bg: 'bg-accent-dim', label: 'Active' },
  solved: { icon: CheckCircle2, color: 'text-green-400', bg: 'bg-green-400/10', label: 'Solved' },
  dead_end: { icon: XCircle, color: 'text-red-400', bg: 'bg-red-400/10', label: 'Dead End' },
  untried: { icon: Circle, color: 'text-muted', bg: 'bg-elevated', label: 'Untried' },
  revived: { icon: RotateCcw, color: 'text-yellow-400', bg: 'bg-yellow-400/10', label: 'Revived' },
} as const

interface BranchNodeProps {
  branch: BranchResponse
  isActive: boolean
  depth: number
  onClick: (branchId: string) => void
}

export function BranchNode({ branch, isActive, depth, onClick }: BranchNodeProps) {
  const config = STATUS_CONFIG[branch.status] || STATUS_CONFIG.untried
  const Icon = config.icon

  return (
    <button
      onClick={() => onClick(branch.id)}
      className={cn(
        'w-full text-left px-3 py-2 rounded-md flex items-center gap-2 transition-colors',
        'hover:bg-elevated',
        isActive && 'bg-accent-dim border-l-3 border-accent',
      )}
      style={{ paddingLeft: `${12 + depth * 16}px` }}
    >
      <span className={cn('flex-shrink-0', config.color)}>
        <Icon size={14} />
      </span>
      <span className={cn(
        'text-sm truncate flex-1',
        isActive ? 'text-accent-text font-medium' : 'text-secondary',
      )}>
        {branch.label}
      </span>
      <span className={cn(
        'text-[10px] px-1.5 py-0.5 rounded-full',
        config.bg, config.color,
      )}>
        {config.label}
      </span>
    </button>
  )
}
  • Step 2: Create BranchMap component
// frontend/src/components/session/BranchMap.tsx
import { GitBranch } from 'lucide-react'
import type { BranchResponse } from '@/types/branching'
import { BranchNode } from './BranchNode'

interface BranchMapProps {
  branches: BranchResponse[]
  activeBranchId: string | null
  onSwitchBranch: (branchId: string) => void
}

interface TreeNode {
  branch: BranchResponse
  children: TreeNode[]
  depth: number
}

function buildTree(branches: BranchResponse[]): TreeNode[] {
  const childrenMap = new Map<string | null, BranchResponse[]>()

  for (const b of branches) {
    const parentId = b.parent_branch_id
    if (!childrenMap.has(parentId)) childrenMap.set(parentId, [])
    childrenMap.get(parentId)!.push(b)
  }

  function buildNode(branch: BranchResponse, depth: number): TreeNode {
    const children = (childrenMap.get(branch.id) || [])
      .sort((a, b) => a.branch_order - b.branch_order)
      .map(child => buildNode(child, depth + 1))
    return { branch, children, depth }
  }

  const roots = (childrenMap.get(null) || [])
    .sort((a, b) => a.branch_order - b.branch_order)

  return roots.map(r => buildNode(r, 0))
}

function flattenTree(nodes: TreeNode[]): TreeNode[] {
  const result: TreeNode[] = []
  for (const node of nodes) {
    result.push(node)
    result.push(...flattenTree(node.children))
  }
  return result
}

export function BranchMap({ branches, activeBranchId, onSwitchBranch }: BranchMapProps) {
  if (branches.length === 0) return null

  const tree = buildTree(branches)
  const flat = flattenTree(tree)

  return (
    <div className="flex flex-col gap-0.5 py-2">
      <div className="px-3 pb-2 flex items-center gap-2">
        <GitBranch size={14} className="text-muted" />
        <span className="text-[10px] font-semibold uppercase tracking-wider text-muted">
          Branch Map
        </span>
      </div>
      {flat.map(node => (
        <BranchNode
          key={node.branch.id}
          branch={node.branch}
          isActive={node.branch.id === activeBranchId}
          depth={node.depth}
          onClick={onSwitchBranch}
        />
      ))}
    </div>
  )
}
  • Step 3: Verify build

Run: cd /home/coder/resolutionflow/frontend && npx tsc --noEmit 2>&1 | head -10 Expected: No errors.

  • Step 4: Commit
git add frontend/src/components/session/BranchMap.tsx frontend/src/components/session/BranchNode.tsx
git commit -m "feat: add BranchMap sidebar with BranchNode tree visualization"
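
The `buildTree` and `flattenTree` pair turns the flat branch list into a depth-first, `branch_order`-sorted sequence that `BranchMap` renders with indentation. The worked example below sketches the same idea in a single function over a reduced branch shape. `MiniBranch`, `FlatNode`, and `flattenBranches` are illustrative names, and the sample data is invented.

```typescript
// Reduced stand-in for BranchResponse: only the fields the ordering depends on.
interface MiniBranch {
  id: string
  parent_branch_id: string | null
  branch_order: number
  label: string
}

interface FlatNode {
  branch: MiniBranch
  depth: number
}

// Group branches by parent, then walk depth-first from the roots (parent === null),
// visiting siblings in ascending branch_order.
function flattenBranches(branches: MiniBranch[]): FlatNode[] {
  const childrenOf = new Map<string | null, MiniBranch[]>()
  for (const b of branches) {
    const siblings = childrenOf.get(b.parent_branch_id) ?? []
    siblings.push(b)
    childrenOf.set(b.parent_branch_id, siblings)
  }

  const out: FlatNode[] = []
  const visit = (parentId: string | null, depth: number): void => {
    const kids = [...(childrenOf.get(parentId) ?? [])].sort((a, b) => a.branch_order - b.branch_order)
    for (const kid of kids) {
      out.push({ branch: kid, depth })
      visit(kid.id, depth + 1)
    }
  }
  visit(null, 0)
  return out
}

// Input order is deliberately shuffled; output order depends only on the tree.
const sample: MiniBranch[] = [
  { id: 'dns', parent_branch_id: 'root', branch_order: 1, label: 'DNS' },
  { id: 'root', parent_branch_id: null, branch_order: 0, label: 'Main' },
  { id: 'fw', parent_branch_id: 'root', branch_order: 0, label: 'Firewall' },
  { id: 'fw-rule', parent_branch_id: 'fw', branch_order: 0, label: 'Rule 42' },
]
const rows = flattenBranches(sample).map(n => `${'  '.repeat(n.depth)}${n.branch.label}`)
// rows: ['Main', '  Firewall', '    Rule 42', '  DNS']
```

One consequence of walking from the roots is that a branch whose parent is missing from the list is never reached and therefore not rendered. The same holds for the `buildTree` implementation above.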

Task 22: Create ForkCard Component

Files:

  • Create: frontend/src/components/session/ForkCard.tsx

  • Step 1: Implement ForkCard

// frontend/src/components/session/ForkCard.tsx
import { GitFork } from 'lucide-react'
import { cn } from '@/lib/utils'

interface ForkOption {
  label: string
  description: string
  branch_id: string
  status: string
}

interface ForkCardProps {
  forkReason: string
  options: ForkOption[]
  activeBranchId: string | null
  onSelectBranch: (branchId: string) => void
}

export function ForkCard({ forkReason, options, activeBranchId, onSelectBranch }: ForkCardProps) {
  return (
    <div className="bg-card border border-default rounded-lg p-4 my-3">
      <div className="flex items-center gap-2 mb-3">
        <GitFork size={16} className="text-accent" />
        <span className="text-sm font-medium text-heading">Fork Point</span>
      </div>
      <p className="text-sm text-secondary mb-3">{forkReason}</p>
      <div className="flex flex-col gap-2">
        {options.map((opt) => (
          <button
            key={opt.branch_id}
            onClick={() => onSelectBranch(opt.branch_id)}
            className={cn(
              'text-left px-3 py-2 rounded-md border transition-colors',
              opt.branch_id === activeBranchId
                ? 'border-accent bg-accent-dim'
                : 'border-default hover:border-hover hover:bg-elevated',
            )}
          >
            <span className={cn(
              'text-sm font-medium block',
              opt.branch_id === activeBranchId ? 'text-accent-text' : 'text-primary',
            )}>
              {opt.label}
            </span>
            <span className="text-xs text-muted">{opt.description}</span>
          </button>
        ))}
      </div>
    </div>
  )
}
  • Step 2: Verify build

Run: cd /home/coder/resolutionflow/frontend && npx tsc --noEmit 2>&1 | head -5 Expected: No errors.

  • Step 3: Commit
git add frontend/src/components/session/ForkCard.tsx
git commit -m "feat: add ForkCard component for in-chat fork decision points"

Task 23: Create HandoffModal Component

Files:

  • Create: frontend/src/components/session/HandoffModal.tsx

  • Step 1: Implement HandoffModal

// frontend/src/components/session/HandoffModal.tsx
import { useState } from 'react'
import { Pause, ArrowUpRight, X } from 'lucide-react'
import { cn } from '@/lib/utils'
import type { HandoffCreateRequest } from '@/types/branching'

interface HandoffModalProps {
  isOpen: boolean
  onClose: () => void
  onSubmit: (data: HandoffCreateRequest) => Promise<void>
}

export function HandoffModal({ isOpen, onClose, onSubmit }: HandoffModalProps) {
  const [intent, setIntent] = useState<'park' | 'escalate'>('park')
  const [notes, setNotes] = useState('')
  const [priority, setPriority] = useState<'normal' | 'elevated'>('normal')
  const [isSubmitting, setIsSubmitting] = useState(false)

  if (!isOpen) return null

  const handleSubmit = async () => {
    setIsSubmitting(true)
    try {
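      await onSubmit({
        intent,
        engineer_notes: notes || undefined,
        // The priority checkbox is only shown for escalations; reset stale state for park
        priority: intent === 'escalate' ? priority : 'normal',
      })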
      onClose()
    } finally {
      setIsSubmitting(false)
    }
  }

  return (
    <div className="fixed inset-0 z-50 flex items-end sm:items-center justify-center bg-black/50">
      <div className="bg-card border border-default rounded-lg w-full max-w-full sm:max-w-lg mx-0 sm:mx-4">
        {/* Header */}
        <div className="flex items-center justify-between px-4 py-3 border-b border-default">
          <span className="text-heading font-medium">Hand Off Session</span>
          <button onClick={onClose} className="text-muted hover:text-primary">
            <X size={18} />
          </button>
        </div>

        {/* Intent toggle */}
        <div className="p-4 space-y-4">
          <div className="flex gap-2">
            <button
              onClick={() => setIntent('park')}
              className={cn(
                'flex-1 flex items-center justify-center gap-2 px-3 py-2 rounded-md border text-sm transition-colors',
                intent === 'park'
                  ? 'border-accent bg-accent-dim text-accent-text'
                  : 'border-default text-secondary hover:bg-elevated',
              )}
            >
              <Pause size={14} /> Park
            </button>
            <button
              onClick={() => setIntent('escalate')}
              className={cn(
                'flex-1 flex items-center justify-center gap-2 px-3 py-2 rounded-md border text-sm transition-colors',
                intent === 'escalate'
                  ? 'border-red-400 bg-red-400/10 text-red-400'
                  : 'border-default text-secondary hover:bg-elevated',
              )}
            >
              <ArrowUpRight size={14} /> Escalate
            </button>
          </div>

          {/* Notes */}
          <div>
            <label className="text-[10px] font-semibold uppercase tracking-wider text-muted block mb-1">
              Notes for receiving engineer
            </label>
            <textarea
              value={notes}
              onChange={e => setNotes(e.target.value)}
              placeholder={intent === 'park' ? 'What are you waiting on?' : 'Why does this need escalation?'}
              className="w-full bg-input border border-default rounded-md px-3 py-2 text-sm text-primary placeholder:text-muted focus:border-accent focus:ring-1 focus:ring-accent-dim resize-none"
              rows={3}
            />
          </div>

          {/* Priority (escalate only) */}
          {intent === 'escalate' && (
            <div className="flex items-center gap-2">
              <input
                type="checkbox"
                id="elevated"
                checked={priority === 'elevated'}
                onChange={e => setPriority(e.target.checked ? 'elevated' : 'normal')}
                className="accent-accent"
              />
              <label htmlFor="elevated" className="text-sm text-secondary">
                Elevated priority
              </label>
            </div>
          )}
        </div>

        {/* Footer */}
        <div className="flex justify-end gap-2 px-4 py-3 border-t border-default">
          <button
            onClick={onClose}
            className="px-4 py-2 text-sm text-secondary border border-default rounded-md hover:bg-elevated"
          >
            Cancel
          </button>
          <button
            onClick={handleSubmit}
            disabled={isSubmitting}
            className={cn(
              'px-4 py-2 text-sm font-medium rounded-md text-white',
              intent === 'park' ? 'bg-accent hover:bg-accent/90' : 'bg-red-500 hover:bg-red-600',
              isSubmitting && 'opacity-50 cursor-not-allowed',
            )}
          >
            {isSubmitting ? 'Submitting...' : intent === 'park' ? 'Park Session' : 'Escalate'}
          </button>
        </div>
      </div>
    </div>
  )
}
  • Step 2: Verify build

Run: cd /home/coder/resolutionflow/frontend && npx tsc --noEmit 2>&1 | head -5 Expected: No errors.

  • Step 3: Commit
git add frontend/src/components/session/HandoffModal.tsx
git commit -m "feat: add HandoffModal for unified park/escalate"
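
The modal assembles its request body from three pieces of form state: intent, notes, and priority. One way to keep those rules testable outside the component is to move them into a pure function, sketched here. `buildHandoffPayload` and the local type names are hypothetical, and the rule that only escalations may be elevated is an assumption inferred from the checkbox being hidden for park.

```typescript
type HandoffIntent = 'park' | 'escalate'
type HandoffPriority = 'normal' | 'elevated'

// Mirrors the HandoffCreateRequest shape from types/branching.ts.
interface HandoffPayload {
  intent: HandoffIntent
  engineer_notes?: string
  priority?: HandoffPriority
}

// Hypothetical helper: turns the modal's form state into a request body.
function buildHandoffPayload(
  intent: HandoffIntent,
  notes: string,
  priority: HandoffPriority,
): HandoffPayload {
  const trimmed = notes.trim()
  return {
    intent,
    // Omit empty or whitespace-only notes instead of sending an empty string.
    ...(trimmed ? { engineer_notes: trimmed } : {}),
    // Assumption: only escalations can be elevated, since the checkbox is hidden for park.
    priority: intent === 'escalate' ? priority : 'normal',
  }
}

const parked = buildHandoffPayload('park', '   ', 'elevated')
const escalated = buildHandoffPayload('escalate', 'Needs vendor access', 'elevated')
```

With a helper of this kind, `handleSubmit` would reduce to calling `onSubmit(buildHandoffPayload(intent, notes, priority))`.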

Task 24: Create ResolutionOutputPanel Component

Files:

  • Create: frontend/src/components/session/ResolutionOutputPanel.tsx

  • Step 1: Implement ResolutionOutputPanel

// frontend/src/components/session/ResolutionOutputPanel.tsx
import { useState, useEffect } from 'react'
import { FileText, BookOpen, MessageSquare, Copy, Send, Check } from 'lucide-react'
import { cn } from '@/lib/utils'
import { resolutionsApi } from '@/api'
import type { ResolutionOutputResponse, ResolutionOutputType, PushDestination } from '@/types/branching'
import { toast } from '@/lib/toast'

const TABS: { type: ResolutionOutputType; label: string; icon: typeof FileText }[] = [
  { type: 'psa_ticket_notes', label: 'PSA Notes', icon: FileText },
  { type: 'knowledge_base', label: 'KB Article', icon: BookOpen },
  { type: 'client_summary', label: 'Client Summary', icon: MessageSquare },
]

interface ResolutionOutputPanelProps {
  sessionId: string
}

export function ResolutionOutputPanel({ sessionId }: ResolutionOutputPanelProps) {
  const [outputs, setOutputs] = useState<ResolutionOutputResponse[]>([])
  const [activeTab, setActiveTab] = useState<ResolutionOutputType>('psa_ticket_notes')
  const [isEditing, setIsEditing] = useState(false)
  const [editContent, setEditContent] = useState('')
  const [isLoading, setIsLoading] = useState(true)

  useEffect(() => {
    const load = async () => {
      try {
        const data = await resolutionsApi.getOutputs(sessionId)
        setOutputs(data.outputs)
      } catch {
        toast.error('Failed to load resolution outputs')
      } finally {
        setIsLoading(false)
      }
    }
    load()
  }, [sessionId])

  const activeOutput = outputs.find(o => o.output_type === activeTab)
  const displayContent = activeOutput?.edited_content || activeOutput?.generated_content || ''

  const handleSaveEdit = async () => {
    if (!activeOutput) return
    try {
      const updated = await resolutionsApi.editOutput(sessionId, activeOutput.id, { edited_content: editContent })
      setOutputs(prev => prev.map(o => o.id === updated.id ? updated : o))
      setIsEditing(false)
      toast.success('Changes saved')
    } catch {
      toast.error('Failed to save changes')
    }
  }

  const handleCopy = async () => {
    try {
      await navigator.clipboard.writeText(displayContent)
      toast.success('Copied to clipboard')
    } catch {
      toast.error('Failed to copy to clipboard')
    }
  }

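  const handlePush = async (destination: PushDestination) => {
    if (!activeOutput) return
    try {
      await resolutionsApi.pushOutput(sessionId, activeOutput.id, { destination })
      toast.success(`Pushed to ${destination}`)
    } catch {
      toast.error('Failed to push output')
      return
    }
    // Refresh so the tab's check mark reflects the new "pushed" status
    try {
      const data = await resolutionsApi.getOutputs(sessionId)
      setOutputs(data.outputs)
    } catch {
      // Keep the current list; the next load will pick up the new status
    }
  }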

  if (isLoading) {
    return <div className="p-4 text-sm text-muted">Loading outputs...</div>
  }

  if (outputs.length === 0) {
    return <div className="p-4 text-sm text-muted">No resolution outputs generated yet.</div>
  }

  return (
    <div className="bg-card border border-default rounded-lg overflow-hidden">
      {/* Tabs */}
      <div className="flex border-b border-default">
        {TABS.map(tab => {
          const output = outputs.find(o => o.output_type === tab.type)
          const Icon = tab.icon
          return (
            <button
              key={tab.type}
              onClick={() => { setActiveTab(tab.type); setIsEditing(false) }}
              className={cn(
                'flex-1 flex items-center justify-center gap-1.5 px-3 py-2 text-sm transition-colors',
                activeTab === tab.type
                  ? 'text-accent border-b-2 border-accent'
                  : 'text-secondary hover:text-primary',
              )}
            >
              <Icon size={14} />
              {tab.label}
              {output?.status === 'pushed' && <Check size={12} className="text-green-400" />}
            </button>
          )
        })}
      </div>

      {/* Content */}
      <div className="p-4">
        {isEditing ? (
          <textarea
            value={editContent}
            onChange={e => setEditContent(e.target.value)}
            className="w-full bg-input border border-default rounded-md px-3 py-2 text-sm text-primary font-mono resize-y min-h-[200px]"
          />
        ) : (
          <div className="prose prose-invert prose-sm max-w-none text-secondary whitespace-pre-wrap">
            {displayContent}
          </div>
        )}
      </div>

      {/* Actions */}
      <div className="flex justify-between items-center px-4 py-3 border-t border-default">
        <div className="flex gap-2">
          {isEditing ? (
            <>
              <button onClick={handleSaveEdit} className="px-3 py-1.5 text-xs bg-accent text-white rounded-md hover:bg-accent/90">Save</button>
              <button onClick={() => setIsEditing(false)} className="px-3 py-1.5 text-xs text-secondary border border-default rounded-md hover:bg-elevated">Cancel</button>
            </>
          ) : (
            <button
              onClick={() => { setEditContent(displayContent); setIsEditing(true) }}
              className="px-3 py-1.5 text-xs text-secondary border border-default rounded-md hover:bg-elevated"
            >
              Edit
            </button>
          )}
        </div>
        <div className="flex gap-2">
          <button onClick={handleCopy} className="p-1.5 text-muted hover:text-primary" title="Copy">
            <Copy size={14} />
          </button>
          {activeTab === 'psa_ticket_notes' && (
            <button onClick={() => handlePush('psa')} className="px-3 py-1.5 text-xs bg-accent text-white rounded-md hover:bg-accent/90 flex items-center gap-1">
              <Send size={12} /> Push to PSA
            </button>
          )}
        </div>
      </div>
    </div>
  )
}
  • Step 2: Verify build

Run: cd /home/coder/resolutionflow/frontend && npx tsc --noEmit 2>&1 | head -5

  • Step 3: Commit
git add frontend/src/components/session/ResolutionOutputPanel.tsx
git commit -m "feat: add ResolutionOutputPanel with three-tab view, edit, and push"

Task 25: Integrate Resolution Outputs Into Existing Resolve Endpoint

Files:

  • Modify: backend/app/api/endpoints/ai_sessions.py (the existing resolve handler)

  • Step 1: Find the existing resolve endpoint

Run: cd /home/coder/resolutionflow/backend && grep -n "def resolve" app/api/endpoints/ai_sessions.py
Note the function name and line number.

  • Step 2: Add resolution output generation after resolve

After the existing resolve logic completes (session status set to "resolved", documentation generated), add:

    # Generate resolution outputs (branching feature)
    try:
        from app.services.resolution_output_generator import ResolutionOutputGenerator
        gen = ResolutionOutputGenerator(db)
        await gen.generate_all(session.id)
    except Exception:
        logger.exception("Failed to generate resolution outputs for session %s", session.id)
        # Best-effort: resolve still succeeds even if output generation fails

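Output generation is best-effort rather than truly fire-and-forget: the handler still awaits generate_all, so the resolve response waits for generation to finish, but any failure is logged and swallowed and the resolve still completes.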
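The failure handling described above can be sketched in isolation. This is a minimal illustration, not the project's handler: FakeOutputGenerator and resolve_session are hypothetical stand-ins for ResolutionOutputGenerator and the existing resolve endpoint, and the returned dict is an assumed shape used only for the example.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class FakeOutputGenerator:
    """Hypothetical stand-in for ResolutionOutputGenerator (not the real service)."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.generated: list[str] = []

    async def generate_all(self, session_id: str) -> None:
        await asyncio.sleep(0)  # placeholder for I/O such as an LLM call
        if self.fail:
            raise RuntimeError("LLM unavailable")
        self.generated.append(session_id)


async def resolve_session(session_id: str, gen: FakeOutputGenerator) -> dict[str, str]:
    """Simplified resolve flow: generation is awaited, but its failure is swallowed."""
    result = {"id": session_id, "status": "resolved"}  # stands in for existing resolve logic
    try:
        await gen.generate_all(session_id)
    except Exception:
        # Log with traceback and continue; the resolve result is returned regardless.
        logger.exception("Failed to generate resolution outputs for session %s", session_id)
    return result
```

Calling asyncio.run(resolve_session("s1", FakeOutputGenerator(fail=True))) still returns the resolved result. Because generate_all is awaited, the caller waits for generation either way; moving it off the request path would require scheduling it separately, which this plan does not do.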

  • Step 3: Verify existing resolve tests still pass

Run: cd /home/coder/resolutionflow/backend && pytest tests/test_ai_sessions.py -k "resolve" -v --no-header
Expected: Existing tests pass. Output generation is mocked/skipped in tests without Anthropic keys.
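If a resolve test needs generation stubbed explicitly, one option is to patch generate_all with unittest.mock.AsyncMock. The sketch below is an assumption about how that could look, not the project's existing test setup: the class and the resolve function are hypothetical stand-ins defined inline so the example runs on its own. A real test would patch the class imported from app.services.resolution_output_generator instead.

```python
import asyncio
from unittest.mock import AsyncMock, patch


class ResolutionOutputGenerator:
    """Hypothetical stand-in; the real class lives in app.services."""

    async def generate_all(self, session_id: str) -> None:
        raise RuntimeError("would call the LLM; no API key available in tests")


async def resolve(session_id: str) -> str:
    """Simplified resolve flow that triggers output generation."""
    await ResolutionOutputGenerator().generate_all(session_id)
    return "resolved"


def test_resolve_with_generation_stubbed() -> None:
    # Swap generate_all for an AsyncMock so no LLM call is attempted.
    with patch.object(
        ResolutionOutputGenerator, "generate_all", new_callable=AsyncMock
    ) as fake_generate:
        status = asyncio.run(resolve("session-1"))

    assert status == "resolved"
    fake_generate.assert_awaited_once_with("session-1")
```

Patching at the class level keeps the handler code unchanged; the mock records the awaited call so the test can still confirm that generation was triggered for the right session.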

  • Step 4: Commit
git add backend/app/api/endpoints/ai_sessions.py
git commit -m "feat: generate resolution outputs on session resolve"

Task 26: Create Remaining Frontend Components

Files:

  • Create: frontend/src/components/session/BranchTransitionBar.tsx

  • Create: frontend/src/components/session/BranchRevivalCard.tsx

  • Create: frontend/src/hooks/useHandoff.ts

  • Create: frontend/src/hooks/useResolutionOutputs.ts

  • Create: frontend/src/pages/SessionQueuePage.tsx

  • Modify: frontend/src/router.tsx

  • Step 1: Create BranchTransitionBar

// frontend/src/components/session/BranchTransitionBar.tsx
import { ArrowRight } from 'lucide-react'
import type { BranchResponse } from '@/types/branching'

interface BranchTransitionBarProps {
  fromBranch: BranchResponse | null
  toBranch: BranchResponse
}

export function BranchTransitionBar({ fromBranch, toBranch }: BranchTransitionBarProps) {
  return (
    <div className="bg-accent-dim border border-accent/20 rounded-md px-3 py-2 my-2 flex items-center gap-2 text-sm">
      <span className="text-muted">Switched to</span>
      <span className="text-accent-text font-medium">{toBranch.label}</span>
      {fromBranch && (
        <>
          <ArrowRight size={12} className="text-muted" />
          <span className="text-muted">from {fromBranch.label}</span>
        </>
      )}
    </div>
  )
}
  • Step 2: Create BranchRevivalCard
// frontend/src/components/session/BranchRevivalCard.tsx
import { RotateCcw } from 'lucide-react'
import type { BranchResponse } from '@/types/branching'

interface BranchRevivalCardProps {
  branch: BranchResponse
  evidenceSource: BranchResponse | null
}

export function BranchRevivalCard({ branch, evidenceSource }: BranchRevivalCardProps) {
  if (branch.status !== 'revived') return null

  return (
    <div className="bg-yellow-400/5 border border-yellow-400/20 rounded-md px-3 py-2 my-2">
      <div className="flex items-center gap-2 text-sm">
        <RotateCcw size={14} className="text-yellow-400" />
        <span className="text-yellow-400 font-medium">Branch Revived</span>
      </div>
      {branch.evidence_description && (
        <p className="text-xs text-secondary mt-1">{branch.evidence_description}</p>
      )}
      {evidenceSource && (
        <p className="text-xs text-muted mt-0.5">Evidence from: {evidenceSource.label}</p>
      )}
    </div>
  )
}
  • Step 3: Create useHandoff hook
// frontend/src/hooks/useHandoff.ts
import { useState, useCallback } from 'react'
import { handoffsApi } from '@/api'
import type { HandoffCreateRequest, HandoffResponse, QueueItemResponse } from '@/types/branching'
import { toast } from '@/lib/toast'

export function useHandoff() {
  const [isSubmitting, setIsSubmitting] = useState(false)
  const [queue, setQueue] = useState<QueueItemResponse[]>([])
  const [isLoadingQueue, setIsLoadingQueue] = useState(false)

  const createHandoff = useCallback(async (sessionId: string, data: HandoffCreateRequest): Promise<HandoffResponse | null> => {
    setIsSubmitting(true)
    try {
      const result = await handoffsApi.createHandoff(sessionId, data)
      toast.success(data.intent === 'park' ? 'Session parked' : 'Session escalated')
      return result
    } catch {
      toast.error('Failed to hand off session')
      return null
    } finally {
      setIsSubmitting(false)
    }
  }, [])

  const claimHandoff = useCallback(async (sessionId: string, handoffId: string): Promise<HandoffResponse | null> => {
    try {
      const result = await handoffsApi.claimHandoff(sessionId, handoffId)
      toast.success('Session claimed')
      return result
    } catch {
      toast.error('Failed to claim session')
      return null
    }
  }, [])

  const loadQueue = useCallback(async () => {
    setIsLoadingQueue(true)
    try {
      const items = await handoffsApi.getQueue()
      setQueue(items)
    } catch {
      toast.error('Failed to load queue')
    } finally {
      setIsLoadingQueue(false)
    }
  }, [])

  return { isSubmitting, queue, isLoadingQueue, createHandoff, claimHandoff, loadQueue }
}
  • Step 4: Create useResolutionOutputs hook
// frontend/src/hooks/useResolutionOutputs.ts
import { useState, useCallback } from 'react'
import { resolutionsApi } from '@/api'
import type { ResolutionOutputResponse, PushDestination } from '@/types/branching'
import { toast } from '@/lib/toast'

export function useResolutionOutputs(sessionId: string) {
  const [outputs, setOutputs] = useState<ResolutionOutputResponse[]>([])
  const [isLoading, setIsLoading] = useState(false)

  const loadOutputs = useCallback(async () => {
    setIsLoading(true)
    try {
      const data = await resolutionsApi.getOutputs(sessionId)
      setOutputs(data.outputs)
    } catch {
      toast.error('Failed to load resolution outputs')
    } finally {
      setIsLoading(false)
    }
  }, [sessionId])

  const editOutput = useCallback(async (outputId: string, content: string) => {
    try {
      const updated = await resolutionsApi.editOutput(sessionId, outputId, { edited_content: content })
      setOutputs(prev => prev.map(o => o.id === updated.id ? updated : o))
      toast.success('Changes saved')
    } catch {
      toast.error('Failed to save changes')
    }
  }, [sessionId])

  const pushOutput = useCallback(async (outputId: string, destination: PushDestination) => {
    try {
      await resolutionsApi.pushOutput(sessionId, outputId, { destination })
      toast.success(`Pushed to ${destination}`)
      await loadOutputs()
    } catch {
      toast.error('Failed to push output')
    }
  }, [sessionId, loadOutputs])

  return { outputs, isLoading, loadOutputs, editOutput, pushOutput }
}
  • Step 5: Create SessionQueuePage
// frontend/src/pages/SessionQueuePage.tsx
import { useEffect } from 'react'
import { useNavigate } from 'react-router-dom'
import { Inbox, ArrowUpRight, Pause, Clock } from 'lucide-react'
import { useHandoff } from '@/hooks/useHandoff'

export default function SessionQueuePage() {
  const navigate = useNavigate()
  const { queue, isLoadingQueue, loadQueue, claimHandoff } = useHandoff()

  useEffect(() => {
    loadQueue()
  }, [loadQueue])

  const handleClaim = async (item: typeof queue[0]) => {
    const result = await claimHandoff(item.session_id, item.handoff_id)
    if (result) {
      navigate(`/pilot?sessionId=${item.session_id}`)
    }
  }

  return (
    <div className="flex-1 p-6">
      <div className="flex items-center gap-2 mb-6">
        <Inbox size={20} className="text-accent" />
        <h1 className="text-xl font-heading font-bold text-heading">Session Queue</h1>
      </div>

      {isLoadingQueue ? (
        <p className="text-sm text-muted">Loading queue...</p>
      ) : queue.length === 0 ? (
        <div className="text-center py-12">
          <Inbox size={40} className="text-muted mx-auto mb-3" />
          <p className="text-sm text-muted">No sessions waiting</p>
        </div>
      ) : (
        <div className="space-y-2">
          {queue.map(item => (
            <div
              key={item.handoff_id}
              className="bg-card border border-default rounded-lg p-4 flex items-center justify-between hover:border-hover transition-colors"
            >
              <div className="flex-1">
                <div className="flex items-center gap-2 mb-1">
                  {item.intent === 'escalate' ? (
                    <ArrowUpRight size={14} className="text-red-400" />
                  ) : (
                    <Pause size={14} className="text-yellow-400" />
                  )}
                  <span className="text-sm font-medium text-heading">
                    {item.problem_summary || 'Untitled session'}
                  </span>
                  {item.priority === 'elevated' && (
                    <span className="text-[10px] px-1.5 py-0.5 rounded-full bg-red-400/10 text-red-400">
                      Elevated
                    </span>
                  )}
                </div>
                {item.engineer_notes && (
                  <p className="text-xs text-secondary">{item.engineer_notes}</p>
                )}
                <div className="flex items-center gap-2 mt-1 text-xs text-muted">
                  <Clock size={10} />
                  <span>{new Date(item.created_at).toLocaleString()}</span>
                  {item.problem_domain && <span>· {item.problem_domain}</span>}
                </div>
              </div>
              <button
                onClick={() => handleClaim(item)}
                className="px-3 py-1.5 text-xs bg-accent text-white rounded-md hover:bg-accent/90"
              >
                Claim
              </button>
            </div>
          ))}
        </div>
      )}
    </div>
  )
}
  • Step 6: Add route in router.tsx

In frontend/src/router.tsx, find the route children array inside the ProtectedRoute/AppLayout wrapper. Add:

{
  path: 'queue',
  lazy: async () => {
    const { default: SessionQueuePage } = await lazyWithRetry(() => import('@/pages/SessionQueuePage'))
    return { Component: SessionQueuePage }
  },
},

Use lazyWithRetry per CLAUDE.md lesson 98.

  • Step 7: Verify build

Run: cd /home/coder/resolutionflow/frontend && npm run build 2>&1 | tail -10
Expected: Build succeeds.

  • Step 8: Commit
git add frontend/src/components/session/BranchTransitionBar.tsx frontend/src/components/session/BranchRevivalCard.tsx frontend/src/hooks/useHandoff.ts frontend/src/hooks/useResolutionOutputs.ts frontend/src/pages/SessionQueuePage.tsx frontend/src/router.tsx
git commit -m "feat: add remaining frontend components — transition bar, revival card, queue page, hooks"

Task 27: Final Integration — Run Full Test Suite and Build

Files: No new files — verification only.

  • Step 1: Run all backend tests

Run: cd /home/coder/resolutionflow/backend && pytest --override-ini="addopts=" -v --no-header 2>&1 | tail -30
Expected: All tests pass, including the existing ones.

  • Step 2: Run frontend build

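Run: cd /home/coder/resolutionflow/frontend && npm run build 2>&1 | tail -20
Expected: Build succeeds with no errors. (Per CLAUDE.md lesson 92, the tsc -b run inside the build is stricter than the standalone tsc --noEmit check used in earlier tasks, so errors can surface here for the first time.)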

  • Step 3: Fix any errors found in steps 1-2

Address any type errors, import issues, or test failures.

  • Step 4: Final commit
git add -A
git commit -m "feat: conversational branching — complete backend + frontend implementation"