From 58c2a80d8a29f7e0a7132d38a79d2a0f5d81ef30 Mon Sep 17 00:00:00 2001 From: chihlasm Date: Tue, 24 Mar 2026 08:15:19 +0000 Subject: [PATCH 01/57] docs: add Conversational Branching implementation plan 27-task plan covering 6 phases: data foundation (4 models, migration), branch engine (BranchManager, prompt builder, API), handoff system (park/escalate with dual-write), resolution outputs (3-output generator), AI description pipeline, and frontend (BranchMap, ForkCard, HandoffModal, ResolutionOutputPanel, SessionQueuePage). Co-Authored-By: Claude Opus 4.6 (1M context) --- .../2026-03-24-conversational-branching.md | 5051 +++++++++++++++++ 1 file changed, 5051 insertions(+) create mode 100644 docs/superpowers/plans/2026-03-24-conversational-branching.md diff --git a/docs/superpowers/plans/2026-03-24-conversational-branching.md b/docs/superpowers/plans/2026-03-24-conversational-branching.md new file mode 100644 index 00000000..8f40987d --- /dev/null +++ b/docs/superpowers/plans/2026-03-24-conversational-branching.md @@ -0,0 +1,5051 @@ +# Conversational Branching Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add branching troubleshooting to FlowPilot — engineers explore multiple diagnostic hypotheses as first-class branches with cross-branch AI context, unified park/escalate handoff, and three-output resolution packages. + +**Architecture:** Additive layer on top of existing FlowPilot. Four new DB tables + nullable columns on three existing tables. Four new backend services that call `_call_ai` from `assistant_chat_service.py` for all LLM interactions. Frontend adds Branch Map sidebar, Fork Cards, Handoff modal, and Resolution panel. All branching logic gated behind `session.is_branching` — removing the feature = drop tables + remove guards. + +**Tech Stack:** Python FastAPI, SQLAlchemy 2.0 (async), PostgreSQL, Alembic migrations, React 19 + TypeScript, Tailwind CSS v4, Zustand, Axios, Lucide React icons. + +**Spec:** `docs/superpowers/specs/2026-03-24-conversational-branching-design.md` + +--- + +## File Structure + +### New Backend Files + +| File | Responsibility | +|------|---------------| +| `backend/app/models/session_branch.py` | `SessionBranch` SQLAlchemy model | +| `backend/app/models/fork_point.py` | `ForkPoint` SQLAlchemy model | +| `backend/app/models/session_handoff.py` | `SessionHandoff` SQLAlchemy model | +| `backend/app/models/session_resolution_output.py` | `SessionResolutionOutput` SQLAlchemy model | +| `backend/app/schemas/session_branch.py` | Pydantic schemas for branches + fork points | +| `backend/app/schemas/session_handoff.py` | Pydantic schemas for handoffs | +| `backend/app/schemas/session_resolution.py` | Pydantic schemas for resolution outputs | +| `backend/app/services/branch_manager.py` | Branch lifecycle: create root, fork, switch, mark status, revive, tree query, context summary | +| `backend/app/services/branch_aware_prompt_builder.py` | Pure function: assembles system prompt + messages + images with cross-branch context for `_call_ai` | +| `backend/app/services/handoff_manager.py` | Park/escalate with dual-write backward compat, snapshot, AI assessment, claim, PSA push | +| `backend/app/services/resolution_output_generator.py` | PSA notes, KB article, client summary — three LLM calls on resolve | +| `backend/app/api/endpoints/session_branches.py` | Branch CRUD + fork + switch + revive + branch message endpoints | +| `backend/app/api/endpoints/session_handoffs.py` | Handoff create + history + claim + queue endpoints | +| `backend/app/api/endpoints/session_resolutions.py` | Resolution output CRUD + push endpoints | +| `backend/alembic/versions/030_add_conversational_branching.py` | Single migration: 4 new tables + columns on 3 existing tables | +| `backend/tests/test_branch_manager.py` | Integration tests for BranchManager | +| `backend/tests/test_branch_aware_prompt_builder.py` | Unit tests for prompt builder (pure function, no DB) | +| `backend/tests/test_handoff_manager.py` | Integration tests for HandoffManager | +| `backend/tests/test_resolution_outputs.py` | Integration tests for ResolutionOutputGenerator | +| `backend/tests/test_session_branches_api.py` | API endpoint tests for branches | +| `backend/tests/test_session_handoffs_api.py` | API endpoint tests for handoffs | +| `backend/tests/test_session_resolutions_api.py` | API endpoint tests for resolutions | + +### New Frontend Files + +| File | Responsibility | +|------|---------------| +| `frontend/src/types/branching.ts` | TypeScript interfaces for branches, forks, handoffs, resolution outputs | +| `frontend/src/api/branches.ts` | Axios client for branch endpoints | +| `frontend/src/api/handoffs.ts` | Axios client for handoff endpoints | +| `frontend/src/api/resolutions.ts` | Axios client for resolution output endpoints | +| `frontend/src/hooks/useBranching.ts` | Branch state management hook | +| `frontend/src/hooks/useHandoff.ts` | Handoff flow state hook | +| `frontend/src/hooks/useResolutionOutputs.ts` | Resolution output state hook | +| `frontend/src/components/session/BranchMap.tsx` | Sidebar tree visualization with status badges | +| `frontend/src/components/session/BranchNode.tsx` | Individual node in branch map | +| `frontend/src/components/session/ForkCard.tsx` | In-chat fork decision point card | +| `frontend/src/components/session/BranchTransitionBar.tsx` | Context bar shown on branch switch | +| `frontend/src/components/session/BranchRevivalCard.tsx` | Evidence card for revival | +| `frontend/src/components/session/HandoffModal.tsx` | Unified park/escalate modal | +| `frontend/src/components/session/ResolutionOutputPanel.tsx` | Three-tab resolution view + edit + push | +| `frontend/src/pages/SessionQueuePage.tsx` | Team queue for parked/escalated sessions | + +### Modified Existing Files + +| File | Change | +|------|--------| +| `backend/app/models/ai_session.py` | Add 5 columns: `is_branching`, `active_branch_id`, `handoff_count`, `total_active_seconds`, `total_parked_seconds` | +| `backend/app/models/ai_session_step.py` | Add 3 columns: `branch_id`, `is_fork_point`, `fork_point_id`. Add `'fork'` to step_type CHECK constraint | +| `backend/app/models/file_upload.py` | Add 5 columns: `ai_description`, `extracted_content`, `content_summary`, `uploaded_on_branch_id`, `uploaded_at_step_id` | +| `backend/app/models/__init__.py` | Import 4 new models, add to `__all__` | +| `backend/alembic/env.py` | Import 4 new models for metadata tracking | +| `backend/app/api/router.py` | Register 3 new endpoint routers | +| `backend/app/schemas/ai_session.py` | Add `is_branching`, `active_branch_id` to `AISessionDetail` | +| `backend/app/services/unified_chat_service.py` | Add `if session.is_branching:` guard to route messages to branch | +| `backend/app/services/flowpilot_engine.py` | Set `branch_id` on `AISessionStep` creation when `is_branching=True` | +| `backend/app/api/endpoints/uploads.py` | Add async AI description generation background task on upload | +| `frontend/src/types/ai-session.ts` | Add `is_branching`, `active_branch_id` to `AISessionDetail` | +| `frontend/src/api/index.ts` | Export new API clients | +| `frontend/src/types/index.ts` | Export new types | +| `frontend/src/pages/FlowPilotSessionPage.tsx` | Integrate BranchMap sidebar, ForkCard rendering, branch-aware message routing | +| `frontend/src/router.tsx` | Add SessionQueuePage route | + +--- + +## Phase 1: Data Foundation + +### Task 1: Create SessionBranch Model + +**Files:** +- Create: `backend/app/models/session_branch.py` +- Test: `backend/tests/test_branch_manager.py` (started here, completed in Phase 2) + +- [ ] **Step 1: Write the SessionBranch model** + +```python +# backend/app/models/session_branch.py +"""Session branch model — represents a diagnostic hypothesis path within a FlowPilot session.""" +import uuid +from datetime import datetime, timezone +from typing import Optional, Any, TYPE_CHECKING + +from sqlalchemy import String, Text, DateTime, ForeignKey, Integer, CheckConstraint, Index +from sqlalchemy.orm import Mapped, mapped_column, relationship +from sqlalchemy.dialects.postgresql import UUID, JSONB + +from app.core.database import Base + +if TYPE_CHECKING: + from app.models.ai_session import AISession + from app.models.ai_session_step import AISessionStep + from app.models.user import User + + +class SessionBranch(Base): + """A diagnostic branch within a FlowPilot session. + + Each branch represents one hypothesis being explored. Branches form a tree + via parent_branch_id (NULL = root branch). Status tracks the outcome: + active, dead_end, solved, untried, revived. + """ + __tablename__ = "session_branches" + __table_args__ = ( + CheckConstraint( + "status IN ('active', 'dead_end', 'solved', 'untried', 'revived')", + name="ck_session_branches_status", + ), + CheckConstraint( + "branch_order > 0", + name="ck_session_branches_branch_order_positive", + ), + Index("ix_session_branches_session_id", "session_id"), + Index("ix_session_branches_parent_branch_id", "parent_branch_id"), + Index("ix_session_branches_session_status", "session_id", "status"), + Index("ix_session_branches_session_order", "session_id", "branch_order"), + ) + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=uuid.uuid4 + ) + session_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + ForeignKey("ai_sessions.id", ondelete="CASCADE"), + nullable=False, + ) + parent_branch_id: Mapped[Optional[uuid.UUID]] = mapped_column( + UUID(as_uuid=True), + ForeignKey("session_branches.id", ondelete="CASCADE"), + nullable=True, + ) + fork_point_step_id: Mapped[Optional[uuid.UUID]] = mapped_column( + UUID(as_uuid=True), + ForeignKey("ai_session_steps.id", ondelete="SET NULL"), + nullable=True, + ) + branch_order: Mapped[int] = mapped_column(Integer, nullable=False, default=1) + label: Mapped[str] = mapped_column(String(200), nullable=False) + status: Mapped[str] = mapped_column(String(20), nullable=False, default="active") + status_reason: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + status_changed_at: Mapped[Optional[datetime]] = mapped_column( + DateTime(timezone=True), nullable=True + ) + status_changed_by: Mapped[Optional[uuid.UUID]] = mapped_column( + UUID(as_uuid=True), + ForeignKey("users.id", ondelete="SET NULL"), + nullable=True, + ) + conversation_messages: Mapped[list[dict[str, Any]]] = mapped_column( + JSONB, nullable=False, default=list, + comment="LLM message history scoped to this branch", + ) + context_summary: Mapped[Optional[dict[str, Any]]] = mapped_column( + JSONB, nullable=True, + comment="{tried: [], concluded: str, artifacts: []}", + ) + evidence_from_branch_id: Mapped[Optional[uuid.UUID]] = mapped_column( + UUID(as_uuid=True), + ForeignKey("session_branches.id", ondelete="SET NULL"), + nullable=True, + ) + evidence_description: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=lambda: datetime.now(timezone.utc) + ) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + default=lambda: datetime.now(timezone.utc), + onupdate=lambda: datetime.now(timezone.utc), + ) + + # Relationships + session: Mapped["AISession"] = relationship("AISession", foreign_keys=[session_id]) + parent_branch: Mapped[Optional["SessionBranch"]] = relationship( + "SessionBranch", remote_side="SessionBranch.id", + foreign_keys=[parent_branch_id], + ) + fork_point_step: Mapped[Optional["AISessionStep"]] = relationship( + "AISessionStep", foreign_keys=[fork_point_step_id] + ) + status_changed_by_user: Mapped[Optional["User"]] = relationship( + "User", foreign_keys=[status_changed_by] + ) + evidence_source: Mapped[Optional["SessionBranch"]] = relationship( + "SessionBranch", remote_side="SessionBranch.id", + foreign_keys=[evidence_from_branch_id], + ) +``` + +- [ ] **Step 2: Verify file was created correctly** + +Run: `cd /home/coder/resolutionflow/backend && python -c "from app.models.session_branch import SessionBranch; print('OK:', SessionBranch.__tablename__)"` +Expected: `OK: session_branches` + +- [ ] **Step 3: Commit** + +```bash +git add backend/app/models/session_branch.py +git commit -m "feat: add SessionBranch model for conversational branching" +``` + +### Task 2: Create ForkPoint Model + +**Files:** +- Create: `backend/app/models/fork_point.py` + +- [ ] **Step 1: Write the ForkPoint model** + +```python +# backend/app/models/fork_point.py +"""Fork point model — captures decision points where a session branches.""" +import uuid +from datetime import datetime, timezone +from typing import Optional, Any, TYPE_CHECKING + +from sqlalchemy import Text, DateTime, ForeignKey +from sqlalchemy.orm import Mapped, mapped_column, relationship +from sqlalchemy.dialects.postgresql import UUID, JSONB + +from app.core.database import Base + +if TYPE_CHECKING: + from app.models.ai_session import AISession + from app.models.session_branch import SessionBranch + from app.models.ai_session_step import AISessionStep + + +class ForkPoint(Base): + """A decision point where a session forks into multiple branches. + + Stores the fork reason and the options presented (each becoming a branch). + options JSONB: [{label, description, branch_id, status}] + """ + __tablename__ = "fork_points" + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=uuid.uuid4 + ) + session_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + ForeignKey("ai_sessions.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + parent_branch_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + ForeignKey("session_branches.id", ondelete="CASCADE"), + nullable=False, + ) + trigger_step_id: Mapped[Optional[uuid.UUID]] = mapped_column( + UUID(as_uuid=True), + ForeignKey("ai_session_steps.id", ondelete="SET NULL"), + nullable=True, + ) + fork_reason: Mapped[str] = mapped_column(Text, nullable=False) + options: Mapped[list[dict[str, Any]]] = mapped_column( + JSONB, nullable=False, default=list, + comment="[{label, description, branch_id, status}]", + ) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=lambda: datetime.now(timezone.utc) + ) + + # Relationships + session: Mapped["AISession"] = relationship("AISession") + parent_branch: Mapped["SessionBranch"] = relationship("SessionBranch") + trigger_step: Mapped[Optional["AISessionStep"]] = relationship("AISessionStep") +``` + +- [ ] **Step 2: Verify file was created correctly** + +Run: `cd /home/coder/resolutionflow/backend && python -c "from app.models.fork_point import ForkPoint; print('OK:', ForkPoint.__tablename__)"` +Expected: `OK: fork_points` + +- [ ] **Step 3: Commit** + +```bash +git add backend/app/models/fork_point.py +git commit -m "feat: add ForkPoint model for branch decision points" +``` + +### Task 3: Create SessionHandoff Model + +**Files:** +- Create: `backend/app/models/session_handoff.py` + +- [ ] **Step 1: Write the SessionHandoff model** + +```python +# backend/app/models/session_handoff.py +"""Session handoff model — unified park/escalate with history.""" +import uuid +from datetime import datetime, timezone +from typing import Optional, Any, TYPE_CHECKING + +from sqlalchemy import String, Text, Boolean, DateTime, ForeignKey, CheckConstraint +from sqlalchemy.orm import Mapped, mapped_column, relationship +from sqlalchemy.dialects.postgresql import UUID, JSONB + +from app.core.database import Base + +if TYPE_CHECKING: + from app.models.ai_session import AISession + from app.models.session_branch import SessionBranch + from app.models.user import User + + +class SessionHandoff(Base): + """A handoff event — either parking or escalating a session. + + Captures a snapshot of the session state at handoff time, including + branch map, AI assessment (for escalations), and artifacts. + Dual-writes to ai_sessions.escalation_package for backward compat. + """ + __tablename__ = "session_handoffs" + __table_args__ = ( + CheckConstraint( + "intent IN ('park', 'escalate')", + name="ck_session_handoffs_intent", + ), + CheckConstraint( + "priority IN ('normal', 'elevated')", + name="ck_session_handoffs_priority", + ), + ) + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=uuid.uuid4 + ) + session_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + ForeignKey("ai_sessions.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + handed_off_by: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + ForeignKey("users.id", ondelete="CASCADE"), + nullable=False, + ) + intent: Mapped[str] = mapped_column(String(20), nullable=False) + source_branch_id: Mapped[Optional[uuid.UUID]] = mapped_column( + UUID(as_uuid=True), + ForeignKey("session_branches.id", ondelete="SET NULL"), + nullable=True, + ) + snapshot: Mapped[dict[str, Any]] = mapped_column( + JSONB, nullable=False, default=dict, + comment="Branch map, status, next step, waiting on, watch out", + ) + ai_assessment: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + ai_assessment_data: Mapped[Optional[dict[str, Any]]] = mapped_column( + JSONB, nullable=True, + comment="{likely_cause, suggested_steps, confidence}", + ) + artifacts: Mapped[Optional[list[dict[str, Any]]]] = mapped_column( + JSONB, nullable=True, + comment="[{name, type, reference}]", + ) + engineer_notes: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + priority: Mapped[str] = mapped_column( + String(20), nullable=False, default="normal" + ) + claimed_by: Mapped[Optional[uuid.UUID]] = mapped_column( + UUID(as_uuid=True), + ForeignKey("users.id", ondelete="SET NULL"), + nullable=True, + ) + claimed_at: Mapped[Optional[datetime]] = mapped_column( + DateTime(timezone=True), nullable=True + ) + psa_note_pushed: Mapped[bool] = mapped_column(Boolean, default=False) + psa_note_id: Mapped[Optional[str]] = mapped_column( + String(100), nullable=True + ) + notification_sent: Mapped[bool] = mapped_column(Boolean, default=False) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=lambda: datetime.now(timezone.utc) + ) + + # Relationships + session: Mapped["AISession"] = relationship("AISession") + handed_off_by_user: Mapped["User"] = relationship("User", foreign_keys=[handed_off_by]) + source_branch: Mapped[Optional["SessionBranch"]] = relationship("SessionBranch") + claimed_by_user: Mapped[Optional["User"]] = relationship("User", foreign_keys=[claimed_by]) +``` + +- [ ] **Step 2: Verify file was created correctly** + +Run: `cd /home/coder/resolutionflow/backend && python -c "from app.models.session_handoff import SessionHandoff; print('OK:', SessionHandoff.__tablename__)"` +Expected: `OK: session_handoffs` + +- [ ] **Step 3: Commit** + +```bash +git add backend/app/models/session_handoff.py +git commit -m "feat: add SessionHandoff model for unified park/escalate" +``` + +### Task 4: Create SessionResolutionOutput Model + +**Files:** +- Create: `backend/app/models/session_resolution_output.py` + +- [ ] **Step 1: Write the SessionResolutionOutput model** + +```python +# backend/app/models/session_resolution_output.py +"""Session resolution output model — three deliverables generated on resolve.""" +import uuid +from datetime import datetime, timezone +from typing import Optional, Any + +from sqlalchemy import String, Text, DateTime, ForeignKey, CheckConstraint, UniqueConstraint +from sqlalchemy.orm import Mapped, mapped_column, relationship +from sqlalchemy.dialects.postgresql import UUID, JSONB + +from app.core.database import Base + + +class SessionResolutionOutput(Base): + """One of three resolution deliverables: PSA ticket notes, KB article, client summary. + + Uses UNIQUE(session_id, output_type) + upsert pattern so outputs can be + regenerated if a session is re-opened after resolution. + """ + __tablename__ = "session_resolution_outputs" + __table_args__ = ( + CheckConstraint( + "output_type IN ('psa_ticket_notes', 'knowledge_base', 'client_summary')", + name="ck_session_resolution_outputs_output_type", + ), + CheckConstraint( + "status IN ('draft', 'approved', 'pushed', 'rejected')", + name="ck_session_resolution_outputs_status", + ), + UniqueConstraint("session_id", "output_type", name="uq_session_resolution_session_type"), + ) + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=uuid.uuid4 + ) + session_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + ForeignKey("ai_sessions.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + output_type: Mapped[str] = mapped_column(String(30), nullable=False) + generated_content: Mapped[str] = mapped_column(Text, nullable=False) + structured_data: Mapped[Optional[dict[str, Any]]] = mapped_column( + JSONB, nullable=True, + comment="For KB: {symptoms, root_cause, steps, tags}", + ) + edited_content: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + status: Mapped[str] = mapped_column(String(20), nullable=False, default="draft") + pushed_to: Mapped[Optional[str]] = mapped_column(String(50), nullable=True) + pushed_at: Mapped[Optional[datetime]] = mapped_column( + DateTime(timezone=True), nullable=True + ) + pushed_reference: Mapped[Optional[str]] = mapped_column(String(200), nullable=True) + generated_by_model: Mapped[str] = mapped_column(String(50), nullable=False) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=lambda: datetime.now(timezone.utc) + ) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + default=lambda: datetime.now(timezone.utc), + onupdate=lambda: datetime.now(timezone.utc), + ) + + # Relationships + session = relationship("AISession") +``` + +- [ ] **Step 2: Verify file was created correctly** + +Run: `cd /home/coder/resolutionflow/backend && python -c "from app.models.session_resolution_output import SessionResolutionOutput; print('OK:', SessionResolutionOutput.__tablename__)"` +Expected: `OK: session_resolution_outputs` + +- [ ] **Step 3: Commit** + +```bash +git add backend/app/models/session_resolution_output.py +git commit -m "feat: add SessionResolutionOutput model for three-output resolution" +``` + +### Task 5: Add Columns to Existing Models + +**Files:** +- Modify: `backend/app/models/ai_session.py:25-221` — add 5 columns + relationships +- Modify: `backend/app/models/ai_session_step.py:21-134` — add 3 columns + update CHECK, add relationships +- Modify: `backend/app/models/file_upload.py:13-33` — add 5 columns + +- [ ] **Step 1: Add branching columns to AISession model** + +In `backend/app/models/ai_session.py`, add these columns after `conversation_messages` (line ~207) and before the Relationships section: + +```python + # ── Branching ── + is_branching: Mapped[bool] = mapped_column( + default=False, + comment="Whether conversational branching is active for this session", + ) + active_branch_id: Mapped[Optional[uuid.UUID]] = mapped_column( + UUID(as_uuid=True), nullable=True, + comment="Currently viewed branch. No FK — soft pointer to avoid circular FK with session_branches", + ) + handoff_count: Mapped[int] = mapped_column( + Integer, nullable=False, default=0, + comment="Number of times this session has been handed off", + ) + total_active_seconds: Mapped[int] = mapped_column( + Integer, nullable=False, default=0, + comment="Cumulative active time in seconds", + ) + total_parked_seconds: Mapped[int] = mapped_column( + Integer, nullable=False, default=0, + comment="Cumulative parked time in seconds", + ) +``` + +Also add these TYPE_CHECKING imports at the top: + +```python + from app.models.session_branch import SessionBranch + from app.models.session_handoff import SessionHandoff + from app.models.session_resolution_output import SessionResolutionOutput +``` + +And add these relationships at the bottom of the class: + +```python + branches: Mapped[list["SessionBranch"]] = relationship( + "SessionBranch", + foreign_keys="SessionBranch.session_id", + cascade="all, delete-orphan", + order_by="SessionBranch.branch_order", + ) + handoffs: Mapped[list["SessionHandoff"]] = relationship( + "SessionHandoff", + cascade="all, delete-orphan", + order_by="SessionHandoff.created_at", + ) + resolution_outputs: Mapped[list["SessionResolutionOutput"]] = relationship( + "SessionResolutionOutput", + cascade="all, delete-orphan", + ) +``` + +- [ ] **Step 2: Add branching columns to AISessionStep model** + +In `backend/app/models/ai_session_step.py`: + +1. Update the CHECK constraint (line ~36) to include `'fork'`: + +```python + __table_args__ = ( + CheckConstraint( + "step_type IN ('question', 'action', 'script_generation', 'verification', " + "'info_request', 'note', 'intake_analysis', 'fork')", + name="ck_ai_session_steps_step_type", + ), + ) +``` + +2. Add these columns after `output_tokens` (line ~120): + +```python + # ── Branching ── + branch_id: Mapped[Optional[uuid.UUID]] = mapped_column( + UUID(as_uuid=True), + ForeignKey("session_branches.id", ondelete="SET NULL"), + nullable=True, + index=True, + comment="NULL = pre-branching/root messages", + ) + is_fork_point: Mapped[bool] = mapped_column( + default=False, + comment="Whether this step triggered a fork", + ) + fork_point_id: Mapped[Optional[uuid.UUID]] = mapped_column( + UUID(as_uuid=True), + ForeignKey("fork_points.id", ondelete="SET NULL"), + nullable=True, + ) +``` + +3. Add TYPE_CHECKING imports for `SessionBranch` and `ForkPoint`. + +- [ ] **Step 3: Add columns to FileUpload model** + +In `backend/app/models/file_upload.py`, add after `created_at` (line ~32): + +```python + # ── AI description + branching context ── + ai_description: Mapped[Optional[str]] = mapped_column( + Text, nullable=True, + comment="AI-generated one-sentence description of the file", + ) + extracted_content: Mapped[Optional[str]] = mapped_column( + Text, nullable=True, + comment="Extracted text from logs/configs", + ) + content_summary: Mapped[Optional[str]] = mapped_column( + Text, nullable=True, + comment="AI summary for long text files (>2000 tokens)", + ) + uploaded_on_branch_id: Mapped[Optional[uuid.UUID]] = mapped_column( + UUID(as_uuid=True), + ForeignKey("session_branches.id", ondelete="SET NULL"), + nullable=True, + ) + uploaded_at_step_id: Mapped[Optional[uuid.UUID]] = mapped_column( + UUID(as_uuid=True), + ForeignKey("ai_session_steps.id", ondelete="SET NULL"), + nullable=True, + ) +``` + +Add the needed imports: `Text` from sqlalchemy, `ForeignKey` (already imported? check), `Optional` from typing. + +- [ ] **Step 4: Verify all model changes compile** + +Run: `cd /home/coder/resolutionflow/backend && python -c "from app.models.ai_session import AISession; from app.models.ai_session_step import AISessionStep; from app.models.file_upload import FileUpload; print('All models OK')"` +Expected: `All models OK` + +- [ ] **Step 5: Commit** + +```bash +git add backend/app/models/ai_session.py backend/app/models/ai_session_step.py backend/app/models/file_upload.py +git commit -m "feat: add branching columns to ai_sessions, ai_session_steps, file_uploads" +``` + +### Task 6: Register Models in __init__.py and alembic env.py + +**Files:** +- Modify: `backend/app/models/__init__.py:1-117` +- Modify: `backend/alembic/env.py:10-28` + +- [ ] **Step 1: Add imports to models/__init__.py** + +Add after the `FileUpload` import (line 50): + +```python +from .session_branch import SessionBranch +from .fork_point import ForkPoint +from .session_handoff import SessionHandoff +from .session_resolution_output import SessionResolutionOutput +``` + +Add to `__all__` list: + +```python + "SessionBranch", + "ForkPoint", + "SessionHandoff", + "SessionResolutionOutput", +``` + +- [ ] **Step 2: Add imports to alembic/env.py** + +Add after the existing model imports (around line 27): + +```python +from app.models.session_branch import SessionBranch # noqa: F401 +from app.models.fork_point import ForkPoint # noqa: F401 +from app.models.session_handoff import SessionHandoff # noqa: F401 +from app.models.session_resolution_output import SessionResolutionOutput # noqa: F401 +``` + +- [ ] **Step 3: Verify imports** + +Run: `cd /home/coder/resolutionflow/backend && python -c "from app.models import SessionBranch, ForkPoint, SessionHandoff, SessionResolutionOutput; print('All imports OK')"` +Expected: `All imports OK` + +- [ ] **Step 4: Commit** + +```bash +git add backend/app/models/__init__.py backend/alembic/env.py +git commit -m "feat: register branching models in __init__ and alembic env" +``` + +### Task 7: Write Manual Alembic Migration + +**Files:** +- Create: `backend/alembic/versions/030_add_conversational_branching.py` + +**IMPORTANT:** This migration must be written manually per CLAUDE.md lesson 77. The CHECK constraint on `ai_session_steps.step_type` requires `DROP CONSTRAINT` then `ADD CONSTRAINT` — not additive. The `active_branch_id` on `ai_sessions` has NO FK constraint (to avoid circular FK with `session_branches`). + +- [ ] **Step 1: Write the migration file** + +```python +"""Add conversational branching tables and columns. + +New tables: session_branches, fork_points, session_handoffs, session_resolution_outputs +Modified: ai_sessions (5 cols), ai_session_steps (3 cols + CHECK), file_uploads (5 cols) + +Revision ID: 030 +Revises: (auto-detected) +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import UUID, JSONB + + +# revision identifiers +revision = "030" +down_revision = None # Will be set by alembic — check `alembic heads` for current head +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ── New table: session_branches ── + op.create_table( + "session_branches", + sa.Column("id", UUID(as_uuid=True), primary_key=True), + sa.Column("session_id", UUID(as_uuid=True), sa.ForeignKey("ai_sessions.id", ondelete="CASCADE"), nullable=False), + sa.Column("parent_branch_id", UUID(as_uuid=True), sa.ForeignKey("session_branches.id", ondelete="CASCADE"), nullable=True), + sa.Column("fork_point_step_id", UUID(as_uuid=True), sa.ForeignKey("ai_session_steps.id", ondelete="SET NULL"), nullable=True), + sa.Column("branch_order", sa.Integer, nullable=False, server_default="1"), + sa.Column("label", sa.String(200), nullable=False), + sa.Column("status", sa.String(20), nullable=False, server_default="active"), + sa.Column("status_reason", sa.Text, nullable=True), + sa.Column("status_changed_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("status_changed_by", UUID(as_uuid=True), sa.ForeignKey("users.id", ondelete="SET NULL"), nullable=True), + sa.Column("conversation_messages", JSONB, nullable=False, server_default="[]"), + sa.Column("context_summary", JSONB, nullable=True), + sa.Column("evidence_from_branch_id", UUID(as_uuid=True), sa.ForeignKey("session_branches.id", ondelete="SET NULL"), nullable=True), + sa.Column("evidence_description", sa.Text, nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()), + sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()), + sa.CheckConstraint("status IN ('active', 'dead_end', 'solved', 'untried', 'revived')", name="ck_session_branches_status"), + sa.CheckConstraint("branch_order > 0", name="ck_session_branches_branch_order_positive"), + ) + op.create_index("ix_session_branches_session_id", "session_branches", ["session_id"]) + op.create_index("ix_session_branches_parent_branch_id", "session_branches", ["parent_branch_id"]) + op.create_index("ix_session_branches_session_status", "session_branches", ["session_id", "status"]) + op.create_index("ix_session_branches_session_order", "session_branches", ["session_id", "branch_order"]) + + # ── New table: fork_points ── + op.create_table( + "fork_points", + sa.Column("id", UUID(as_uuid=True), primary_key=True), + sa.Column("session_id", UUID(as_uuid=True), sa.ForeignKey("ai_sessions.id", ondelete="CASCADE"), nullable=False), + sa.Column("parent_branch_id", UUID(as_uuid=True), sa.ForeignKey("session_branches.id", ondelete="CASCADE"), nullable=False), + sa.Column("trigger_step_id", UUID(as_uuid=True), sa.ForeignKey("ai_session_steps.id", ondelete="SET NULL"), nullable=True), + sa.Column("fork_reason", sa.Text, nullable=False), + sa.Column("options", JSONB, nullable=False, server_default="[]"), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()), + ) + op.create_index("ix_fork_points_session_id", "fork_points", ["session_id"]) + + # ── New table: session_handoffs ── + op.create_table( + "session_handoffs", + sa.Column("id", UUID(as_uuid=True), primary_key=True), + sa.Column("session_id", UUID(as_uuid=True), sa.ForeignKey("ai_sessions.id", ondelete="CASCADE"), nullable=False), + sa.Column("handed_off_by", UUID(as_uuid=True), sa.ForeignKey("users.id", ondelete="CASCADE"), nullable=False), + sa.Column("intent", sa.String(20), nullable=False), + sa.Column("source_branch_id", UUID(as_uuid=True), sa.ForeignKey("session_branches.id", ondelete="SET NULL"), nullable=True), + sa.Column("snapshot", JSONB, nullable=False, server_default="{}"), + sa.Column("ai_assessment", sa.Text, nullable=True), + sa.Column("ai_assessment_data", JSONB, nullable=True), + sa.Column("artifacts", JSONB, nullable=True), + sa.Column("engineer_notes", sa.Text, nullable=True), + sa.Column("priority", sa.String(20), nullable=False, server_default="normal"), + sa.Column("claimed_by", UUID(as_uuid=True), sa.ForeignKey("users.id", ondelete="SET NULL"), nullable=True), + sa.Column("claimed_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("psa_note_pushed", sa.Boolean, server_default="false"), + sa.Column("psa_note_id", sa.String(100), nullable=True), + sa.Column("notification_sent", sa.Boolean, server_default="false"), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()), + sa.CheckConstraint("intent IN ('park', 'escalate')", name="ck_session_handoffs_intent"), + sa.CheckConstraint("priority IN ('normal', 'elevated')", name="ck_session_handoffs_priority"), + ) + op.create_index("ix_session_handoffs_session_id", "session_handoffs", ["session_id"]) + + # ── New table: session_resolution_outputs ── + op.create_table( + "session_resolution_outputs", + sa.Column("id", UUID(as_uuid=True), primary_key=True), + sa.Column("session_id", UUID(as_uuid=True), sa.ForeignKey("ai_sessions.id", ondelete="CASCADE"), nullable=False), + sa.Column("output_type", sa.String(30), nullable=False), + sa.Column("generated_content", sa.Text, nullable=False), + sa.Column("structured_data", JSONB, nullable=True), + sa.Column("edited_content", sa.Text, nullable=True), + sa.Column("status", sa.String(20), nullable=False, server_default="draft"), + sa.Column("pushed_to", sa.String(50), nullable=True), + sa.Column("pushed_at", sa.DateTime(timezone=True), nullable=True), + sa.Column("pushed_reference", sa.String(200), nullable=True), + sa.Column("generated_by_model", sa.String(50), nullable=False), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()), + sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()), + sa.CheckConstraint("output_type IN ('psa_ticket_notes', 'knowledge_base', 'client_summary')", name="ck_session_resolution_outputs_output_type"), + sa.CheckConstraint("status IN ('draft', 'approved', 'pushed', 'rejected')", name="ck_session_resolution_outputs_status"), + sa.UniqueConstraint("session_id", "output_type", name="uq_session_resolution_session_type"), + ) + op.create_index("ix_session_resolution_outputs_session_id", "session_resolution_outputs", ["session_id"]) + + # ── Modify ai_sessions: add 5 columns ── + op.add_column("ai_sessions", sa.Column("is_branching", sa.Boolean, server_default="false", nullable=False)) + op.add_column("ai_sessions", sa.Column("active_branch_id", UUID(as_uuid=True), nullable=True)) + op.add_column("ai_sessions", sa.Column("handoff_count", sa.Integer, server_default="0", nullable=False)) + op.add_column("ai_sessions", sa.Column("total_active_seconds", sa.Integer, server_default="0", nullable=False)) + op.add_column("ai_sessions", sa.Column("total_parked_seconds", sa.Integer, server_default="0", nullable=False)) + + # ── Modify ai_session_steps: add 3 columns + update CHECK constraint ── + op.add_column("ai_session_steps", sa.Column("branch_id", UUID(as_uuid=True), sa.ForeignKey("session_branches.id", ondelete="SET NULL"), nullable=True)) + op.add_column("ai_session_steps", sa.Column("is_fork_point", sa.Boolean, server_default="false", nullable=False)) + op.add_column("ai_session_steps", sa.Column("fork_point_id", UUID(as_uuid=True), sa.ForeignKey("fork_points.id", ondelete="SET NULL"), nullable=True)) + op.create_index("ix_ai_session_steps_branch_id", "ai_session_steps", ["branch_id"]) + + # Drop and recreate step_type CHECK to add 'fork' + op.drop_constraint("ck_ai_session_steps_step_type", "ai_session_steps", type_="check") + op.create_check_constraint( + "ck_ai_session_steps_step_type", + "ai_session_steps", + "step_type IN ('question', 'action', 'script_generation', 'verification', " + "'info_request', 'note', 'intake_analysis', 'fork')", + ) + + # ── Modify file_uploads: add 5 columns ── + op.add_column("file_uploads", sa.Column("ai_description", sa.Text, nullable=True)) + op.add_column("file_uploads", sa.Column("extracted_content", sa.Text, nullable=True)) + op.add_column("file_uploads", sa.Column("content_summary", sa.Text, nullable=True)) + op.add_column("file_uploads", sa.Column("uploaded_on_branch_id", UUID(as_uuid=True), sa.ForeignKey("session_branches.id", ondelete="SET NULL"), nullable=True)) + op.add_column("file_uploads", sa.Column("uploaded_at_step_id", UUID(as_uuid=True), sa.ForeignKey("ai_session_steps.id", ondelete="SET NULL"), nullable=True)) + + +def downgrade() -> None: + # ── file_uploads columns ── + op.drop_column("file_uploads", "uploaded_at_step_id") + op.drop_column("file_uploads", "uploaded_on_branch_id") + op.drop_column("file_uploads", "content_summary") + op.drop_column("file_uploads", "extracted_content") + op.drop_column("file_uploads", "ai_description") + + # ── ai_session_steps: restore CHECK, drop columns ── + op.drop_constraint("ck_ai_session_steps_step_type", "ai_session_steps", type_="check") + op.create_check_constraint( + "ck_ai_session_steps_step_type", + "ai_session_steps", + "step_type IN ('question', 'action', 'script_generation', 'verification', " + "'info_request', 'note', 'intake_analysis')", + ) + op.drop_index("ix_ai_session_steps_branch_id", "ai_session_steps") + op.drop_column("ai_session_steps", "fork_point_id") + op.drop_column("ai_session_steps", "is_fork_point") + op.drop_column("ai_session_steps", "branch_id") + + # ── ai_sessions columns ── + op.drop_column("ai_sessions", "total_parked_seconds") + op.drop_column("ai_sessions", "total_active_seconds") + op.drop_column("ai_sessions", "handoff_count") + op.drop_column("ai_sessions", "active_branch_id") + op.drop_column("ai_sessions", "is_branching") + + # ── Drop new tables (reverse order of creation for FK deps) ── + op.drop_table("session_resolution_outputs") + op.drop_table("session_handoffs") + op.drop_table("fork_points") + op.drop_table("session_branches") +``` + +- [ ] **Step 2: Check current alembic head and set down_revision** + +Run: `cd /home/coder/resolutionflow/backend && alembic heads` +Set `down_revision` in the migration to the returned revision ID. + +- [ ] **Step 3: Run the migration** + +Run: `cd /home/coder/resolutionflow/backend && alembic upgrade head` +Expected: Migration applies without errors. + +- [ ] **Step 4: Verify tables exist** + +Run: `docker exec -it patherly_postgres psql -U postgres -d patherly -c "\dt session_*"` +Expected: Shows `session_branches`, `session_handoffs`, `session_resolution_outputs`. + +Run: `docker exec -it patherly_postgres psql -U postgres -d patherly -c "\d ai_sessions" | grep -E "is_branching|active_branch_id|handoff_count"` +Expected: Shows the 3 new columns. + +- [ ] **Step 5: Commit** + +```bash +git add backend/alembic/versions/030_add_conversational_branching.py +git commit -m "feat: add conversational branching migration — 4 tables, 13 columns" +``` + +### Task 8: Create Pydantic Schemas + +**Files:** +- Create: `backend/app/schemas/session_branch.py` +- Create: `backend/app/schemas/session_handoff.py` +- Create: `backend/app/schemas/session_resolution.py` +- Modify: `backend/app/schemas/ai_session.py:197-265` + +- [ ] **Step 1: Write branch schemas** + +```python +# backend/app/schemas/session_branch.py +"""Pydantic schemas for session branches and fork points.""" +from __future__ import annotations + +from typing import Optional, Any +from uuid import UUID +from datetime import datetime + +from pydantic import BaseModel, Field + + +# ── Branch ── + +class BranchCreate(BaseModel): + """Used internally — branches are created via fork, not direct API.""" + label: str = Field(..., max_length=200) + status: str = "untried" + + +class BranchUpdate(BaseModel): + """Update branch status.""" + status: str = Field(..., pattern="^(active|dead_end|solved|untried|revived)$") + status_reason: str | None = None + + +class BranchResponse(BaseModel): + """Branch detail for API responses.""" + id: UUID + session_id: UUID + parent_branch_id: UUID | None + fork_point_step_id: UUID | None + branch_order: int + label: str + status: str + status_reason: str | None + status_changed_at: datetime | None + context_summary: dict[str, Any] | None + evidence_from_branch_id: UUID | None + evidence_description: str | None + step_count: int = 0 + created_at: datetime + updated_at: datetime + + model_config = {"from_attributes": True} + + +class BranchTreeResponse(BaseModel): + """Full branch tree for the BranchMap sidebar.""" + branches: list[BranchResponse] + active_branch_id: UUID | None + + +# ── Fork ── + +class ForkOption(BaseModel): + """One hypothesis option when creating a fork.""" + label: str = Field(..., max_length=200) + description: str = Field(..., max_length=500) + + +class ForkCreateRequest(BaseModel): + """Create a fork point with N branches.""" + fork_reason: str = Field(..., min_length=5, max_length=2000) + options: list[ForkOption] = Field(..., min_length=2, max_length=10) + + +class ForkPointResponse(BaseModel): + """Fork point detail.""" + id: UUID + session_id: UUID + parent_branch_id: UUID + trigger_step_id: UUID | None + fork_reason: str + options: list[dict[str, Any]] + created_at: datetime + + model_config = {"from_attributes": True} + + +# ── Switch ── + +class BranchSwitchResponse(BaseModel): + """Response after switching branches.""" + active_branch_id: UUID + branch: BranchResponse + conversation_messages: list[dict[str, Any]] + + +# ── Revival ── + +class ReviveRequest(BaseModel): + """Revive a dead-end branch with new evidence.""" + evidence_from_branch_id: UUID + evidence_description: str = Field(..., min_length=5, max_length=2000) + + +# ── Branch message ── + +class BranchMessageRequest(BaseModel): + """Send a message on a specific branch.""" + message: str = Field(..., min_length=1, max_length=8000) + upload_ids: list[UUID] = Field(default_factory=list, max_length=10) + + +class BranchMessageResponse(BaseModel): + """AI response on a branch.""" + content: str + branch_id: UUID + step_id: UUID | None = None +``` + +- [ ] **Step 2: Write handoff schemas** + +```python +# backend/app/schemas/session_handoff.py +"""Pydantic schemas for session handoffs.""" +from __future__ import annotations + +from typing import Optional, Any +from uuid import UUID +from datetime import datetime + +from pydantic import BaseModel, Field + + +class HandoffCreateRequest(BaseModel): + """Create a handoff (park or escalate).""" + intent: str = Field(..., pattern="^(park|escalate)$") + engineer_notes: str | None = None + priority: str = Field("normal", pattern="^(normal|elevated)$") + + +class HandoffResponse(BaseModel): + """Handoff detail.""" + id: UUID + session_id: UUID + handed_off_by: UUID + intent: str + source_branch_id: UUID | None + snapshot: dict[str, Any] + ai_assessment: str | None + ai_assessment_data: dict[str, Any] | None + artifacts: list[dict[str, Any]] | None + engineer_notes: str | None + priority: str + claimed_by: UUID | None + claimed_at: datetime | None + psa_note_pushed: bool + notification_sent: bool + created_at: datetime + + model_config = {"from_attributes": True} + + +class HandoffClaimRequest(BaseModel): + """Claim a handed-off session.""" + pass # Just needs auth — claiming user comes from JWT + + +class HandoffBriefingResponse(BaseModel): + """Natural-language briefing for the claiming engineer.""" + briefing: str + handoff: HandoffResponse + + +class QueueItemResponse(BaseModel): + """Item in the team queue.""" + handoff_id: UUID + session_id: UUID + intent: str + problem_summary: str | None + problem_domain: str | None + priority: str + handed_off_by_name: str | None + engineer_notes: str | None + branch_count: int = 0 + created_at: datetime + claimed_by: UUID | None + claimed_at: datetime | None + + model_config = {"from_attributes": True} +``` + +- [ ] **Step 3: Write resolution output schemas** + +```python +# backend/app/schemas/session_resolution.py +"""Pydantic schemas for session resolution outputs.""" +from __future__ import annotations + +from typing import Optional, Any +from uuid import UUID +from datetime import datetime + +from pydantic import BaseModel, Field + + +class ResolutionOutputResponse(BaseModel): + """Single resolution output.""" + id: UUID + session_id: UUID + output_type: str + generated_content: str + structured_data: dict[str, Any] | None + edited_content: str | None + status: str + pushed_to: str | None + pushed_at: datetime | None + pushed_reference: str | None + generated_by_model: str + created_at: datetime + updated_at: datetime + + model_config = {"from_attributes": True} + + +class ResolutionOutputEditRequest(BaseModel): + """Edit output before pushing.""" + edited_content: str = Field(..., min_length=1) + + +class ResolutionOutputPushRequest(BaseModel): + """Push output to a destination.""" + destination: str = Field(..., pattern="^(psa|kb_library|clipboard|email)$") + + +class ResolutionOutputPushResponse(BaseModel): + """Result of pushing an output.""" + output_id: UUID + status: str + pushed_to: str + pushed_reference: str | None = None + + +class AllResolutionOutputsResponse(BaseModel): + """All three resolution outputs for a session.""" + outputs: list[ResolutionOutputResponse] +``` + +- [ ] **Step 4: Add branching fields to AISessionDetail** + +In `backend/app/schemas/ai_session.py`, add to `AISessionDetail` class (after `conversation_messages` field, line ~230): + +```python + is_branching: bool = False + active_branch_id: str | None = None +``` + +- [ ] **Step 5: Verify schemas compile** + +Run: `cd /home/coder/resolutionflow/backend && python -c "from app.schemas.session_branch import *; from app.schemas.session_handoff import *; from app.schemas.session_resolution import *; print('Schemas OK')"` +Expected: `Schemas OK` + +- [ ] **Step 6: Commit** + +```bash +git add backend/app/schemas/session_branch.py backend/app/schemas/session_handoff.py backend/app/schemas/session_resolution.py backend/app/schemas/ai_session.py +git commit -m "feat: add Pydantic schemas for branching, handoffs, and resolution outputs" +``` + +--- + +## Phase 2: Branch Engine + +### Task 9: Write BranchManager Service — Tests First + +**Files:** +- Create: `backend/tests/test_branch_manager.py` +- Create: `backend/app/services/branch_manager.py` + +- [ ] **Step 1: Write failing tests for BranchManager** + +```python +# backend/tests/test_branch_manager.py +"""Integration tests for BranchManager service.""" +import uuid +import pytest +from httpx import AsyncClient + +from app.models.ai_session import AISession +from app.models.session_branch import SessionBranch +from app.models.fork_point import ForkPoint +from app.models.ai_session_step import AISessionStep + + +@pytest.mark.asyncio +async def test_create_root_branch(client: AsyncClient, test_user, auth_headers, test_db): + """Creating a root branch sets is_branching=True and copies conversation_messages.""" + # Create a session first + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "test"}, + status="active", + confidence_tier="discovery", + conversation_messages=[ + {"role": "user", "content": "test message"}, + {"role": "assistant", "content": "test response"}, + ], + ) + test_db.add(session) + await test_db.flush() + + from app.services.branch_manager import BranchManager + manager = BranchManager(test_db) + root = await manager.create_root_branch(session.id) + + assert root is not None + assert root.parent_branch_id is None + assert root.label == "Root" + assert root.status == "active" + assert root.branch_order == 1 + assert len(root.conversation_messages) == 2 + + # Session should now have is_branching=True + await test_db.refresh(session) + assert session.is_branching is True + assert session.active_branch_id == root.id + + +@pytest.mark.asyncio +async def test_create_fork(client: AsyncClient, test_user, auth_headers, test_db): + """Creating a fork produces a ForkPoint + N branches.""" + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "test"}, + status="active", + confidence_tier="discovery", + conversation_messages=[], + ) + test_db.add(session) + await test_db.flush() + + from app.services.branch_manager import BranchManager + manager = BranchManager(test_db) + root = await manager.create_root_branch(session.id) + + # Create a trigger step + step = AISessionStep( + session_id=session.id, + step_order=0, + step_type="question", + content={"text": "What's the issue?"}, + confidence_at_step=0.5, + ) + test_db.add(step) + await test_db.flush() + + fork_point, branches = await manager.create_fork( + session_id=session.id, + parent_branch_id=root.id, + trigger_step_id=step.id, + fork_reason="Two possible causes identified", + options=[ + {"label": "Network connectivity", "description": "Check network stack"}, + {"label": "DNS resolution", "description": "Check DNS config"}, + ], + ) + + assert fork_point is not None + assert len(branches) == 2 + assert branches[0].label == "Network connectivity" + assert branches[0].status == "untried" + assert branches[0].parent_branch_id == root.id + assert branches[1].label == "DNS resolution" + assert branches[1].branch_order == 2 + + # Trigger step should be marked as fork point + await test_db.refresh(step) + assert step.is_fork_point is True + assert step.fork_point_id == fork_point.id + + +@pytest.mark.asyncio +async def test_switch_branch(client: AsyncClient, test_user, auth_headers, test_db): + """Switching branches updates active_branch_id.""" + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "test"}, + status="active", + confidence_tier="discovery", + conversation_messages=[], + ) + test_db.add(session) + await test_db.flush() + + from app.services.branch_manager import BranchManager + manager = BranchManager(test_db) + root = await manager.create_root_branch(session.id) + + step = AISessionStep( + session_id=session.id, step_order=0, step_type="question", + content={"text": "test"}, confidence_at_step=0.5, + ) + test_db.add(step) + await test_db.flush() + + _, branches = await manager.create_fork( + session_id=session.id, + parent_branch_id=root.id, + trigger_step_id=step.id, + fork_reason="test fork", + options=[ + {"label": "Option A", "description": "desc A"}, + {"label": "Option B", "description": "desc B"}, + ], + ) + + branch_b = branches[1] + result = await manager.switch_branch(session.id, branch_b.id) + + assert result.id == branch_b.id + await test_db.refresh(session) + assert session.active_branch_id == branch_b.id + + +@pytest.mark.asyncio +async def test_mark_branch_dead_end(client: AsyncClient, test_user, auth_headers, test_db): + """Marking a branch as dead_end updates status.""" + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "test"}, + status="active", + confidence_tier="discovery", + conversation_messages=[], + ) + test_db.add(session) + await test_db.flush() + + from app.services.branch_manager import BranchManager + manager = BranchManager(test_db) + root = await manager.create_root_branch(session.id) + + updated = await manager.mark_branch_status( + branch_id=root.id, + status="dead_end", + reason="Network was fine, not the cause", + user_id=test_user["user_data"]["id"], + ) + + assert updated.status == "dead_end" + assert updated.status_reason == "Network was fine, not the cause" + assert updated.status_changed_at is not None + + +@pytest.mark.asyncio +async def test_get_branch_tree(client: AsyncClient, test_user, auth_headers, test_db): + """get_branch_tree returns the full tree structure.""" + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "test"}, + status="active", + confidence_tier="discovery", + conversation_messages=[{"role": "user", "content": "help"}], + ) + test_db.add(session) + await test_db.flush() + + from app.services.branch_manager import BranchManager + manager = BranchManager(test_db) + root = await manager.create_root_branch(session.id) + + step = AISessionStep( + session_id=session.id, step_order=0, step_type="question", + content={"text": "test"}, confidence_at_step=0.5, + ) + test_db.add(step) + await test_db.flush() + + await manager.create_fork( + session_id=session.id, + parent_branch_id=root.id, + trigger_step_id=step.id, + fork_reason="test", + options=[ + {"label": "A", "description": "a"}, + {"label": "B", "description": "b"}, + ], + ) + + tree = await manager.get_branch_tree(session.id) + # Root + 2 fork branches = 3 total + assert len(tree) == 3 +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cd /home/coder/resolutionflow/backend && pytest tests/test_branch_manager.py -v --no-header 2>&1 | head -30` +Expected: FAIL — `ModuleNotFoundError: No module named 'app.services.branch_manager'` + +- [ ] **Step 3: Implement BranchManager service** + +```python +# backend/app/services/branch_manager.py +"""Branch lifecycle management for conversational branching. + +Handles creating root branches, forking, switching, marking status, +reviving dead-end branches, and querying the branch tree. +""" +import uuid +import logging +from datetime import datetime, timezone +from typing import Any +from uuid import UUID + +from sqlalchemy import select, func +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.ai_session import AISession +from app.models.ai_session_step import AISessionStep +from app.models.session_branch import SessionBranch +from app.models.fork_point import ForkPoint + +logger = logging.getLogger(__name__) + + +class BranchManager: + """Branch lifecycle management.""" + + def __init__(self, db: AsyncSession): + self.db = db + + async def create_root_branch(self, session_id: UUID) -> SessionBranch: + """Create the root branch, copy conversation_messages, set is_branching=True.""" + result = await self.db.execute( + select(AISession).where(AISession.id == session_id) + ) + session = result.scalar_one_or_none() + if not session: + raise ValueError(f"Session {session_id} not found") + + root = SessionBranch( + id=uuid.uuid4(), + session_id=session_id, + parent_branch_id=None, + branch_order=1, + label="Root", + status="active", + conversation_messages=list(session.conversation_messages or []), + ) + self.db.add(root) + + session.is_branching = True + session.active_branch_id = root.id + + await self.db.flush() + return root + + async def create_fork( + self, + session_id: UUID, + parent_branch_id: UUID, + trigger_step_id: UUID | None, + fork_reason: str, + options: list[dict[str, str]], + ) -> tuple[ForkPoint, list[SessionBranch]]: + """Create a fork point with N branches. + + Pre-generates all branch UUIDs, then inserts ForkPoint + N SessionBranch + rows in a single transaction. Enforces plan-tier branch limits. + """ + # Enforce branch limits by plan tier + from app.core.ai_quota_service import get_user_plan + result = await self.db.execute( + select(AISession).where(AISession.id == session_id) + ) + session = result.scalar_one() + existing_count_result = await self.db.execute( + select(func.count()).select_from(SessionBranch).where( + SessionBranch.session_id == session_id + ) + ) + existing_count = existing_count_result.scalar() or 0 + + # Plan limits: free=2, pro=5, team=10, enterprise=unlimited + BRANCH_LIMITS = {"free": 2, "pro": 5, "team": 10} + plan = await get_user_plan(session.user_id, self.db) + max_branches = BRANCH_LIMITS.get(plan, 999) + if existing_count + len(options) > max_branches: + raise ValueError( + f"Branch limit reached ({max_branches} for {plan} plan). " + f"Current: {existing_count}, requested: {len(options)}" + ) + + # Pre-generate branch IDs + branch_ids = [uuid.uuid4() for _ in options] + + # Build ForkPoint options JSONB with branch_ids + fork_options = [] + for i, opt in enumerate(options): + fork_options.append({ + "label": opt["label"], + "description": opt["description"], + "branch_id": str(branch_ids[i]), + "status": "untried", + }) + + fork_point = ForkPoint( + id=uuid.uuid4(), + session_id=session_id, + parent_branch_id=parent_branch_id, + trigger_step_id=trigger_step_id, + fork_reason=fork_reason, + options=fork_options, + ) + self.db.add(fork_point) + + # Get parent branch's conversation_messages for context + result = await self.db.execute( + select(SessionBranch).where(SessionBranch.id == parent_branch_id) + ) + parent = result.scalar_one_or_none() + parent_messages = list(parent.conversation_messages or []) if parent else [] + + # Create branches + branches = [] + for i, opt in enumerate(options): + branch = SessionBranch( + id=branch_ids[i], + session_id=session_id, + parent_branch_id=parent_branch_id, + fork_point_step_id=trigger_step_id, + branch_order=i + 1, + label=opt["label"], + status="untried", + conversation_messages=parent_messages, + ) + self.db.add(branch) + branches.append(branch) + + # Mark trigger step as fork point + if trigger_step_id: + step_result = await self.db.execute( + select(AISessionStep).where(AISessionStep.id == trigger_step_id) + ) + step = step_result.scalar_one_or_none() + if step: + step.is_fork_point = True + step.fork_point_id = fork_point.id + + await self.db.flush() + return fork_point, branches + + async def switch_branch(self, session_id: UUID, target_branch_id: UUID) -> SessionBranch: + """Switch the active branch for a session.""" + result = await self.db.execute( + select(SessionBranch).where( + SessionBranch.id == target_branch_id, + SessionBranch.session_id == session_id, + ) + ) + branch = result.scalar_one_or_none() + if not branch: + raise ValueError(f"Branch {target_branch_id} not found in session {session_id}") + + session_result = await self.db.execute( + select(AISession).where(AISession.id == session_id) + ) + session = session_result.scalar_one() + session.active_branch_id = target_branch_id + + # Mark branch as active if it was untried + if branch.status == "untried": + branch.status = "active" + branch.status_changed_at = datetime.now(timezone.utc) + + await self.db.flush() + return branch + + async def mark_branch_status( + self, + branch_id: UUID, + status: str, + reason: str | None = None, + user_id: UUID | None = None, + ) -> SessionBranch: + """Update a branch's status with reason and timestamp.""" + result = await self.db.execute( + select(SessionBranch).where(SessionBranch.id == branch_id) + ) + branch = result.scalar_one_or_none() + if not branch: + raise ValueError(f"Branch {branch_id} not found") + + branch.status = status + branch.status_reason = reason + branch.status_changed_at = datetime.now(timezone.utc) + branch.status_changed_by = user_id + + await self.db.flush() + return branch + + async def revive_branch( + self, + branch_id: UUID, + evidence_from_branch_id: UUID, + evidence_description: str, + ) -> SessionBranch: + """Revive a dead-end branch with evidence from another branch.""" + result = await self.db.execute( + select(SessionBranch).where(SessionBranch.id == branch_id) + ) + branch = result.scalar_one_or_none() + if not branch: + raise ValueError(f"Branch {branch_id} not found") + + branch.status = "revived" + branch.status_changed_at = datetime.now(timezone.utc) + branch.evidence_from_branch_id = evidence_from_branch_id + branch.evidence_description = evidence_description + + # Prepend revival context to conversation + revival_msg = { + "role": "system", + "content": f"[Branch Revived] New evidence from another branch: {evidence_description}", + } + msgs = list(branch.conversation_messages or []) + msgs.append(revival_msg) + branch.conversation_messages = msgs + + await self.db.flush() + return branch + + async def get_branch_tree(self, session_id: UUID) -> list[SessionBranch]: + """Get all branches for a session, ordered by branch_order.""" + result = await self.db.execute( + select(SessionBranch) + .where(SessionBranch.session_id == session_id) + .order_by(SessionBranch.branch_order) + ) + return list(result.scalars().all()) + + async def build_cross_branch_context(self, branch_id: UUID) -> str: + """Build cross-branch context from sibling summaries. + + Reads context_summary from all branches in the same session + (excluding the current branch), prioritized: + active > untried > revived > dead_end. + """ + result = await self.db.execute( + select(SessionBranch).where(SessionBranch.id == branch_id) + ) + branch = result.scalar_one_or_none() + if not branch: + return "" + + siblings_result = await self.db.execute( + select(SessionBranch) + .where( + SessionBranch.session_id == branch.session_id, + SessionBranch.id != branch_id, + ) + .order_by(SessionBranch.branch_order) + ) + siblings = list(siblings_result.scalars().all()) + + if not siblings: + return "" + + # Priority order + priority = {"active": 0, "untried": 1, "revived": 2, "dead_end": 3, "solved": 4} + siblings.sort(key=lambda b: priority.get(b.status, 5)) + + parts = ["\n## Cross-Branch Context"] + for sib in siblings: + summary = sib.context_summary + if summary: + tried = ", ".join(summary.get("tried", [])) + concluded = summary.get("concluded", "No conclusion yet") + parts.append(f"- **{sib.label}** [{sib.status}]: Tried: {tried}. {concluded}") + else: + parts.append(f"- **{sib.label}** [{sib.status}]: No summary yet.") + + return "\n".join(parts) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `cd /home/coder/resolutionflow/backend && pytest tests/test_branch_manager.py -v --no-header` +Expected: All 5 tests PASS. + +- [ ] **Step 5: Commit** + +```bash +git add backend/tests/test_branch_manager.py backend/app/services/branch_manager.py +git commit -m "feat: add BranchManager service with integration tests" +``` + +### Task 10: Write BranchAwarePromptBuilder — Tests First + +**Files:** +- Create: `backend/tests/test_branch_aware_prompt_builder.py` +- Create: `backend/app/services/branch_aware_prompt_builder.py` + +- [ ] **Step 1: Write failing tests for BranchAwarePromptBuilder** + +```python +# backend/tests/test_branch_aware_prompt_builder.py +"""Unit tests for BranchAwarePromptBuilder — pure function, no DB needed.""" +import pytest + +from app.services.branch_aware_prompt_builder import BranchAwarePromptBuilder + + +def test_build_basic(): + """Basic build with no cross-branch context.""" + builder = BranchAwarePromptBuilder() + result = builder.build( + branch_messages=[ + {"role": "user", "content": "DNS not resolving"}, + {"role": "assistant", "content": "Let's check DNS config"}, + ], + sibling_summaries="", + session_context="Problem: DNS resolution failure. Domain: networking.", + attachments=[], + new_message="I ran nslookup and got timeout", + ) + + assert "system_base" in result + assert "rag_context" in result + assert "history" in result + assert "new_message" in result + assert "images" in result + assert result["new_message"] == "I ran nslookup and got timeout" + # History should NOT include the new_message + assert len(result["history"]) == 2 + + +def test_build_with_cross_branch_context(): + """Cross-branch summaries go into rag_context, not system_base.""" + builder = BranchAwarePromptBuilder() + sibling_ctx = ( + "\n## Cross-Branch Context\n" + "- **Network connectivity** [dead_end]: Tried: ping, traceroute. Network was fine." + ) + result = builder.build( + branch_messages=[], + sibling_summaries=sibling_ctx, + session_context="Problem: test", + attachments=[], + new_message="test message", + ) + + assert "Cross-Branch Context" in result["rag_context"] + assert "Cross-Branch Context" not in result["system_base"] + + +def test_build_with_images(): + """Image attachments are passed through.""" + builder = BranchAwarePromptBuilder() + result = builder.build( + branch_messages=[], + sibling_summaries="", + session_context="Problem: test", + attachments=[{"media_type": "image/png", "data": "base64data"}], + new_message="check this screenshot", + ) + + assert len(result["images"]) == 1 + assert result["images"][0]["media_type"] == "image/png" + + +def test_build_with_revival_context(): + """Revival context is prepended to rag_context.""" + builder = BranchAwarePromptBuilder() + result = builder.build( + branch_messages=[], + sibling_summaries="", + session_context="Problem: test", + attachments=[], + new_message="test", + revival_context="New evidence: the error appears when VPN is active", + ) + + assert "New evidence" in result["rag_context"] + + +def test_history_excludes_last_user_message_when_matching(): + """If the last message in branch_messages matches new_message, it's excluded from history.""" + builder = BranchAwarePromptBuilder() + result = builder.build( + branch_messages=[ + {"role": "user", "content": "first message"}, + {"role": "assistant", "content": "first response"}, + ], + sibling_summaries="", + session_context="Problem: test", + attachments=[], + new_message="second message", + ) + + # All branch messages should be in history (none match new_message) + assert len(result["history"]) == 2 +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cd /home/coder/resolutionflow/backend && pytest tests/test_branch_aware_prompt_builder.py -v --no-header 2>&1 | head -20` +Expected: FAIL — `ModuleNotFoundError` + +- [ ] **Step 3: Implement BranchAwarePromptBuilder** + +```python +# backend/app/services/branch_aware_prompt_builder.py +"""Branch-aware prompt builder — assembles AI context with cross-branch awareness. + +Pure function: takes data, returns dict matching _call_ai parameter names. +No DB access, no LLM calls. The caller pre-fetches all data. + +Return keys: system_base, rag_context, history, new_message, images +- system_base: stable system prompt (cached by Anthropic) +- rag_context: cross-branch summaries + attachment descriptions (NOT cached) +""" +from typing import Any + +from app.services.assistant_chat_service import ASSISTANT_SYSTEM_PROMPT + + +class BranchAwarePromptBuilder: + """Assembles prompt components for branch-aware AI calls.""" + + def build( + self, + branch_messages: list[dict[str, Any]], + sibling_summaries: str, + session_context: str, + attachments: list[dict[str, Any]], + new_message: str, + revival_context: str | None = None, + token_budget: int = 100_000, + ) -> dict[str, Any]: + """Build prompt components for _call_ai. + + Args: + branch_messages: Conversation history for the current branch. + sibling_summaries: Cross-branch context from BranchManager.build_cross_branch_context. + session_context: Problem summary, domain, client info, PSA data. + attachments: Image dicts [{media_type, data}] from current branch uploads. + new_message: The user's current message. + revival_context: If branch was revived, the evidence description. + token_budget: Max tokens for the entire prompt (default 100k). + + Returns: + Dict with keys matching _call_ai params: system_base, rag_context, + history, new_message, images. + """ + # 1. system_base — stable, cached across turns + system_base = ASSISTANT_SYSTEM_PROMPT + "\n\n## Session Context\n" + session_context + + # 2. rag_context — changes per query, NOT cached + rag_parts = [] + if revival_context: + rag_parts.append(f"\n## Branch Revival\n{revival_context}") + if sibling_summaries: + rag_parts.append(sibling_summaries) + rag_context = "\n".join(rag_parts) + + # 3. history — branch messages as role/content dicts + history = [] + for msg in branch_messages: + if msg.get("role") in ("user", "assistant"): + history.append({"role": msg["role"], "content": msg["content"]}) + + # 4. images + images = attachments if attachments else None + + return { + "system_base": system_base, + "rag_context": rag_context, + "history": history, + "new_message": new_message, + "images": images, + } +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `cd /home/coder/resolutionflow/backend && pytest tests/test_branch_aware_prompt_builder.py -v --no-header` +Expected: All 5 tests PASS. + +- [ ] **Step 5: Commit** + +```bash +git add backend/tests/test_branch_aware_prompt_builder.py backend/app/services/branch_aware_prompt_builder.py +git commit -m "feat: add BranchAwarePromptBuilder with unit tests" +``` + +### Task 11: Write Branch API Endpoints + +**Files:** +- Create: `backend/app/api/endpoints/session_branches.py` +- Modify: `backend/app/api/router.py:1-88` +- Create: `backend/tests/test_session_branches_api.py` + +- [ ] **Step 1: Write failing API tests** + +```python +# backend/tests/test_session_branches_api.py +"""API endpoint tests for session branches.""" +import pytest +from httpx import AsyncClient + +from app.models.ai_session import AISession +from app.models.ai_session_step import AISessionStep + + +@pytest.mark.asyncio +async def test_list_branches_empty(client: AsyncClient, test_user, auth_headers, test_db): + """GET /ai-sessions/{id}/branches returns empty for non-branching session.""" + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "test"}, + status="active", + confidence_tier="discovery", + conversation_messages=[], + ) + test_db.add(session) + await test_db.commit() + + resp = await client.get( + f"/api/v1/ai-sessions/{session.id}/branches", + headers=auth_headers, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["branches"] == [] + assert data["active_branch_id"] is None + + +@pytest.mark.asyncio +async def test_create_fork(client: AsyncClient, test_user, auth_headers, test_db): + """POST /ai-sessions/{id}/branches/fork creates branches.""" + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "test"}, + status="active", + confidence_tier="discovery", + conversation_messages=[{"role": "user", "content": "help"}], + ) + test_db.add(session) + await test_db.flush() + + step = AISessionStep( + session_id=session.id, step_order=0, step_type="question", + content={"text": "test"}, confidence_at_step=0.5, + ) + test_db.add(step) + await test_db.commit() + + resp = await client.post( + f"/api/v1/ai-sessions/{session.id}/branches/fork", + headers=auth_headers, + json={ + "fork_reason": "Two possible causes", + "options": [ + {"label": "Network issue", "description": "Check connectivity"}, + {"label": "DNS problem", "description": "Check DNS"}, + ], + }, + ) + assert resp.status_code == 201 + data = resp.json() + assert len(data["options"]) == 2 + + +@pytest.mark.asyncio +async def test_switch_branch(client: AsyncClient, test_user, auth_headers, test_db): + """POST /ai-sessions/{id}/branches/{bid}/switch changes active branch.""" + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "test"}, + status="active", + confidence_tier="discovery", + conversation_messages=[{"role": "user", "content": "help"}], + ) + test_db.add(session) + await test_db.flush() + + step = AISessionStep( + session_id=session.id, step_order=0, step_type="question", + content={"text": "test"}, confidence_at_step=0.5, + ) + test_db.add(step) + await test_db.commit() + + # Create fork first + fork_resp = await client.post( + f"/api/v1/ai-sessions/{session.id}/branches/fork", + headers=auth_headers, + json={ + "fork_reason": "test", + "options": [ + {"label": "A", "description": "a"}, + {"label": "B", "description": "b"}, + ], + }, + ) + fork_data = fork_resp.json() + branch_b_id = fork_data["options"][1]["branch_id"] + + # Switch to branch B + resp = await client.post( + f"/api/v1/ai-sessions/{session.id}/branches/{branch_b_id}/switch", + headers=auth_headers, + ) + assert resp.status_code == 200 + assert resp.json()["active_branch_id"] == branch_b_id +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cd /home/coder/resolutionflow/backend && pytest tests/test_session_branches_api.py -v --no-header 2>&1 | head -20` +Expected: FAIL — route not found (404). + +- [ ] **Step 3: Implement session_branches endpoint** + +```python +# backend/app/api/endpoints/session_branches.py +"""Branch management endpoints for conversational branching. + + GET /ai-sessions/{id}/branches — List all branches (tree) + POST /ai-sessions/{id}/branches/fork — Create fork with N branches + PATCH /ai-sessions/{id}/branches/{bid} — Update branch status + POST /ai-sessions/{id}/branches/{bid}/switch — Switch active branch + POST /ai-sessions/{id}/branches/{bid}/revive — Revive dead-end branch + POST /ai-sessions/{id}/branches/{bid}/message — Send message on branch +""" +import logging +from typing import Annotated +from uuid import UUID + +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.api.deps import get_current_active_user, get_db +from app.models.user import User +from app.models.ai_session import AISession +from app.services.branch_manager import BranchManager +from app.services.branch_aware_prompt_builder import BranchAwarePromptBuilder +from app.services.assistant_chat_service import _call_ai +from app.schemas.session_branch import ( + BranchTreeResponse, + BranchResponse, + BranchUpdate, + ForkCreateRequest, + ForkPointResponse, + BranchSwitchResponse, + ReviveRequest, + BranchMessageRequest, + BranchMessageResponse, +) + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/ai-sessions/{session_id}/branches", tags=["session-branches"]) + + +async def _get_user_session( + session_id: UUID, user: User, db: AsyncSession +) -> AISession: + """Fetch session owned by user, or raise 404.""" + result = await db.execute( + select(AISession).where( + AISession.id == session_id, + AISession.user_id == user.id, + ) + ) + session = result.scalar_one_or_none() + if not session: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found") + return session + + +@router.get("", response_model=BranchTreeResponse) +async def list_branches( + session_id: UUID, + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], +) -> BranchTreeResponse: + """Get branch tree for a session.""" + session = await _get_user_session(session_id, current_user, db) + manager = BranchManager(db) + branches = await manager.get_branch_tree(session_id) + + branch_responses = [] + for b in branches: + branch_responses.append(BranchResponse( + id=b.id, + session_id=b.session_id, + parent_branch_id=b.parent_branch_id, + fork_point_step_id=b.fork_point_step_id, + branch_order=b.branch_order, + label=b.label, + status=b.status, + status_reason=b.status_reason, + status_changed_at=b.status_changed_at, + context_summary=b.context_summary, + evidence_from_branch_id=b.evidence_from_branch_id, + evidence_description=b.evidence_description, + created_at=b.created_at, + updated_at=b.updated_at, + )) + + return BranchTreeResponse( + branches=branch_responses, + active_branch_id=session.active_branch_id, + ) + + +@router.post("/fork", response_model=ForkPointResponse, status_code=status.HTTP_201_CREATED) +async def create_fork( + session_id: UUID, + body: ForkCreateRequest, + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], +) -> ForkPointResponse: + """Create a fork point with N branches.""" + session = await _get_user_session(session_id, current_user, db) + + if session.status not in ("active", "paused"): + raise HTTPException(status_code=400, detail=f"Cannot fork a {session.status} session") + + manager = BranchManager(db) + + # Ensure branching is initialized + if not session.is_branching: + await manager.create_root_branch(session_id) + await db.refresh(session) + + # Use the active branch as parent + parent_branch_id = session.active_branch_id + if not parent_branch_id: + raise HTTPException(status_code=400, detail="No active branch to fork from") + + options = [{"label": o.label, "description": o.description} for o in body.options] + + fork_point, branches = await manager.create_fork( + session_id=session_id, + parent_branch_id=parent_branch_id, + trigger_step_id=None, # Could be set from last step + fork_reason=body.fork_reason, + options=options, + ) + + await db.commit() + return ForkPointResponse.model_validate(fork_point) + + +@router.patch("/{branch_id}", response_model=BranchResponse) +async def update_branch_status( + session_id: UUID, + branch_id: UUID, + body: BranchUpdate, + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], +) -> BranchResponse: + """Update a branch's status.""" + await _get_user_session(session_id, current_user, db) + manager = BranchManager(db) + + try: + branch = await manager.mark_branch_status( + branch_id=branch_id, + status=body.status, + reason=body.status_reason, + user_id=current_user.id, + ) + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + + await db.commit() + return BranchResponse.model_validate(branch) + + +@router.post("/{branch_id}/switch", response_model=BranchSwitchResponse) +async def switch_branch( + session_id: UUID, + branch_id: UUID, + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], +) -> BranchSwitchResponse: + """Switch the active branch.""" + await _get_user_session(session_id, current_user, db) + manager = BranchManager(db) + + try: + branch = await manager.switch_branch(session_id, branch_id) + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + + await db.commit() + return BranchSwitchResponse( + active_branch_id=branch.id, + branch=BranchResponse.model_validate(branch), + conversation_messages=branch.conversation_messages, + ) + + +@router.post("/{branch_id}/revive", response_model=BranchResponse) +async def revive_branch( + session_id: UUID, + branch_id: UUID, + body: ReviveRequest, + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], +) -> BranchResponse: + """Revive a dead-end branch with new evidence.""" + await _get_user_session(session_id, current_user, db) + manager = BranchManager(db) + + try: + branch = await manager.revive_branch( + branch_id=branch_id, + evidence_from_branch_id=body.evidence_from_branch_id, + evidence_description=body.evidence_description, + ) + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + + await db.commit() + return BranchResponse.model_validate(branch) + + +@router.post("/{branch_id}/message", response_model=BranchMessageResponse) +async def send_branch_message( + session_id: UUID, + branch_id: UUID, + body: BranchMessageRequest, + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], +) -> BranchMessageResponse: + """Send a message on a specific branch.""" + session = await _get_user_session(session_id, current_user, db) + + if session.status not in ("active", "paused"): + raise HTTPException(status_code=400, detail=f"Cannot message a {session.status} session") + + manager = BranchManager(db) + + # Switch to branch if not already active + if session.active_branch_id != branch_id: + await manager.switch_branch(session_id, branch_id) + await db.refresh(session) + + # Get branch + from sqlalchemy import select as sa_select + from app.models.session_branch import SessionBranch + result = await db.execute( + sa_select(SessionBranch).where(SessionBranch.id == branch_id) + ) + branch = result.scalar_one_or_none() + if not branch: + raise HTTPException(status_code=404, detail="Branch not found") + + # Build cross-branch context + sibling_ctx = await manager.build_cross_branch_context(branch_id) + + # Build prompt + builder = BranchAwarePromptBuilder() + session_context = f"Problem: {session.problem_summary or 'Unknown'}. Domain: {session.problem_domain or 'Unknown'}." + prompt_args = builder.build( + branch_messages=branch.conversation_messages, + sibling_summaries=sibling_ctx, + session_context=session_context, + attachments=[], # TODO: fetch image attachments for this branch + new_message=body.message, + revival_context=branch.evidence_description if branch.status == "revived" else None, + ) + + # Call AI + ai_content, input_tokens, output_tokens = await _call_ai(**prompt_args) + + # Update branch conversation + msgs = list(branch.conversation_messages or []) + msgs.append({"role": "user", "content": body.message}) + msgs.append({"role": "assistant", "content": ai_content}) + branch.conversation_messages = msgs + + # Update session token counts + session.total_input_tokens += input_tokens + session.total_output_tokens += output_tokens + + # Resume if paused + if session.status == "paused": + session.status = "active" + + await db.commit() + + return BranchMessageResponse( + content=ai_content, + branch_id=branch_id, + ) +``` + +- [ ] **Step 4: Register router in router.py** + +In `backend/app/api/router.py`, add: + +```python +from app.api.endpoints import session_branches +``` + +And: + +```python +api_router.include_router(session_branches.router) +``` + +- [ ] **Step 5: Run tests to verify they pass** + +Run: `cd /home/coder/resolutionflow/backend && pytest tests/test_session_branches_api.py -v --no-header` +Expected: All 3 tests PASS. + +- [ ] **Step 6: Commit** + +```bash +git add backend/app/api/endpoints/session_branches.py backend/tests/test_session_branches_api.py backend/app/api/router.py +git commit -m "feat: add branch API endpoints with integration tests" +``` + +### Task 12: Integrate Branching Into Existing Chat Service + +**Files:** +- Modify: `backend/app/services/unified_chat_service.py:54-132` +- Modify: `backend/app/services/flowpilot_engine.py` (step creation) + +- [ ] **Step 1: Add is_branching guard to unified_chat_service.send_chat_message** + +In `backend/app/services/unified_chat_service.py`, after the session status check (line ~82), add: + +```python + # If branching is active, route to branch message handler + if session.is_branching and session.active_branch_id: + from app.services.branch_manager import BranchManager + from app.services.branch_aware_prompt_builder import BranchAwarePromptBuilder + from app.models.session_branch import SessionBranch + + branch_result = await db.execute( + select(SessionBranch).where(SessionBranch.id == session.active_branch_id) + ) + branch = branch_result.scalar_one_or_none() + if branch: + manager = BranchManager(db) + sibling_ctx = await manager.build_cross_branch_context(branch.id) + + builder = BranchAwarePromptBuilder() + session_context = f"Problem: {session.problem_summary or 'Unknown'}. Domain: {session.problem_domain or 'Unknown'}." + prompt_args = builder.build( + branch_messages=branch.conversation_messages, + sibling_summaries=sibling_ctx, + session_context=session_context, + attachments=[], + new_message=message, + revival_context=branch.evidence_description if branch.status == "revived" else None, + ) + + # Override images from prompt_args with actual images if provided + if images: + prompt_args["images"] = images + ai_content, input_tokens, output_tokens = await _call_ai(**prompt_args) + + # Update branch conversation + msgs = list(branch.conversation_messages or []) + msgs.append({"role": "user", "content": message}) + msgs.append({"role": "assistant", "content": ai_content}) + branch.conversation_messages = msgs + + session.total_input_tokens += input_tokens + session.total_output_tokens += output_tokens + session.step_count += 2 + + if session.status == "paused": + session.status = "active" + + suggested_flows = extract_suggested_flows( + await rag_search(query=message, account_id=account_id, db=db, limit=8) + ) + return ai_content, suggested_flows, session +``` + +- [ ] **Step 2: Add branch_id to step creation in flowpilot_engine** + +In `backend/app/services/flowpilot_engine.py`, find the `AISessionStep(` constructor calls (around lines 640, 926, 1244). In each, add after `session_id=session.id`: + +```python + branch_id=session.active_branch_id if session.is_branching else None, +``` + +This is a one-line addition in each of three places. + +- [ ] **Step 3: Verify existing tests still pass** + +Run: `cd /home/coder/resolutionflow/backend && pytest tests/test_ai_sessions.py tests/test_ai_chat.py -v --no-header 2>&1 | tail -20` +Expected: All existing tests still pass (branching path not triggered). + +- [ ] **Step 4: Commit** + +```bash +git add backend/app/services/unified_chat_service.py backend/app/services/flowpilot_engine.py +git commit -m "feat: integrate branching into chat service and step creation" +``` + +--- + +## Phase 3: Handoff System + +### Task 13: Write HandoffManager Service — Tests First + +**Files:** +- Create: `backend/tests/test_handoff_manager.py` +- Create: `backend/app/services/handoff_manager.py` + +- [ ] **Step 1: Write failing tests** + +```python +# backend/tests/test_handoff_manager.py +"""Integration tests for HandoffManager service.""" +import pytest +from httpx import AsyncClient + +from app.models.ai_session import AISession + + +@pytest.mark.asyncio +async def test_create_park_handoff(client: AsyncClient, test_user, auth_headers, test_db): + """Parking a session creates a handoff with snapshot.""" + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "test"}, + status="active", + confidence_tier="discovery", + conversation_messages=[{"role": "user", "content": "help me"}], + ) + test_db.add(session) + await test_db.flush() + + from app.services.handoff_manager import HandoffManager + manager = HandoffManager(test_db) + + handoff = await manager.create_handoff( + session_id=session.id, + intent="park", + engineer_notes="Waiting for client to provide logs", + user_id=test_user["user_data"]["id"], + ) + + assert handoff is not None + assert handoff.intent == "park" + assert handoff.engineer_notes == "Waiting for client to provide logs" + assert handoff.snapshot is not None + + # Session should be paused + await test_db.refresh(session) + assert session.status == "paused" + assert session.handoff_count == 1 + + +@pytest.mark.asyncio +async def test_create_escalate_handoff(client: AsyncClient, test_user, auth_headers, test_db): + """Escalating creates handoff + dual-writes to escalation_package.""" + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "test"}, + status="active", + confidence_tier="discovery", + conversation_messages=[], + ) + test_db.add(session) + await test_db.flush() + + from app.services.handoff_manager import HandoffManager + manager = HandoffManager(test_db) + + handoff = await manager.create_handoff( + session_id=session.id, + intent="escalate", + engineer_notes="Need senior help", + user_id=test_user["user_data"]["id"], + ) + + assert handoff.intent == "escalate" + + # Dual-write check + await test_db.refresh(session) + assert session.status == "escalated" + assert session.escalation_package is not None + assert "branch_map" in session.escalation_package or "snapshot" in session.escalation_package + + +@pytest.mark.asyncio +async def test_claim_session(client: AsyncClient, test_user, test_admin, auth_headers, test_db): + """Claiming a handoff sets claimed_by and reactivates session.""" + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "test"}, + status="active", + confidence_tier="discovery", + conversation_messages=[], + ) + test_db.add(session) + await test_db.flush() + + from app.services.handoff_manager import HandoffManager + manager = HandoffManager(test_db) + + handoff = await manager.create_handoff( + session_id=session.id, + intent="escalate", + engineer_notes="Need help", + user_id=test_user["user_data"]["id"], + ) + + claimed = await manager.claim_session( + handoff_id=handoff.id, + claiming_user_id=test_admin["user_data"]["id"], + ) + + assert claimed.claimed_by == test_admin["user_data"]["id"] + assert claimed.claimed_at is not None + + await test_db.refresh(session) + assert session.status == "active" +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cd /home/coder/resolutionflow/backend && pytest tests/test_handoff_manager.py -v --no-header 2>&1 | head -15` +Expected: FAIL — `ModuleNotFoundError` + +- [ ] **Step 3: Implement HandoffManager** + +```python +# backend/app/services/handoff_manager.py +"""Handoff management — unified park/escalate with dual-write backward compat. + +Creates handoff snapshots, AI assessments (for escalations), claim workflow, +and queue queries. Dual-writes to ai_sessions.escalation_package for +backward compatibility with the existing escalation queue. +""" +import logging +from datetime import datetime, timezone +from typing import Any +from uuid import UUID + +from sqlalchemy import select, func +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.ai_session import AISession +from app.models.session_branch import SessionBranch +from app.models.session_handoff import SessionHandoff + +logger = logging.getLogger(__name__) + + +class HandoffManager: + """Unified park/escalate handoff management.""" + + def __init__(self, db: AsyncSession): + self.db = db + + async def create_handoff( + self, + session_id: UUID, + intent: str, + engineer_notes: str | None, + user_id: UUID, + priority: str = "normal", + ) -> SessionHandoff: + """Create a handoff (park or escalate). + + Generates snapshot, updates session status, dual-writes to + escalation_package for backward compat. + """ + result = await self.db.execute( + select(AISession).where(AISession.id == session_id) + ) + session = result.scalar_one_or_none() + if not session: + raise ValueError(f"Session {session_id} not found") + + # Generate snapshot + snapshot = await self._generate_snapshot(session) + + # Generate AI assessment for escalations + ai_assessment = None + ai_assessment_data = None + if intent == "escalate": + ai_assessment, ai_assessment_data = await self._generate_ai_assessment(session) + + handoff = SessionHandoff( + session_id=session_id, + handed_off_by=user_id, + intent=intent, + source_branch_id=session.active_branch_id, + snapshot=snapshot, + ai_assessment=ai_assessment, + ai_assessment_data=ai_assessment_data, + engineer_notes=engineer_notes, + priority=priority, + ) + self.db.add(handoff) + + # Update session status + if intent == "park": + session.status = "paused" + elif intent == "escalate": + session.status = "escalated" + + session.handoff_count = (session.handoff_count or 0) + 1 + + # Dual-write for backward compat + session.escalation_package = { + "snapshot": snapshot, + "intent": intent, + "engineer_notes": engineer_notes, + "handoff_id": str(handoff.id), + } + + await self.db.flush() + return handoff + + async def _generate_snapshot(self, session: AISession) -> dict[str, Any]: + """Generate a snapshot of the session state at handoff time.""" + snapshot: dict[str, Any] = { + "problem_summary": session.problem_summary, + "problem_domain": session.problem_domain, + "status": session.status, + "step_count": session.step_count, + "confidence_tier": session.confidence_tier, + } + + # Add branch map if branching is active + if session.is_branching: + branches_result = await self.db.execute( + select(SessionBranch) + .where(SessionBranch.session_id == session.id) + .order_by(SessionBranch.branch_order) + ) + branches = list(branches_result.scalars().all()) + + branch_map = [] + for b in branches: + branch_map.append({ + "id": str(b.id), + "label": b.label, + "status": b.status, + "status_reason": b.status_reason, + "parent_branch_id": str(b.parent_branch_id) if b.parent_branch_id else None, + }) + snapshot["branch_map"] = branch_map + snapshot["active_branch_id"] = str(session.active_branch_id) if session.active_branch_id else None + + return snapshot + + async def claim_session( + self, + handoff_id: UUID, + claiming_user_id: UUID, + ) -> SessionHandoff: + """Claim a handed-off session.""" + result = await self.db.execute( + select(SessionHandoff).where(SessionHandoff.id == handoff_id) + ) + handoff = result.scalar_one_or_none() + if not handoff: + raise ValueError(f"Handoff {handoff_id} not found") + + handoff.claimed_by = claiming_user_id + handoff.claimed_at = datetime.now(timezone.utc) + + # Reactivate session + session_result = await self.db.execute( + select(AISession).where(AISession.id == handoff.session_id) + ) + session = session_result.scalar_one() + session.status = "active" + + # Dual-write + session.escalated_to_id = claiming_user_id + + await self.db.flush() + return handoff + + async def _generate_ai_assessment( + self, session: AISession + ) -> tuple[str | None, dict[str, Any] | None]: + """Generate AI diagnostic assessment for escalation handoffs.""" + try: + from app.services.assistant_chat_service import _call_ai + + context = f"Problem: {session.problem_summary or 'Unknown'}\nDomain: {session.problem_domain or 'Unknown'}" + msgs = session.conversation_messages or [] + # Include last 10 messages for context + recent = "\n".join( + f"[{m.get('role', '?')}]: {m.get('content', '')[:200]}" + for m in msgs[-10:] + ) + + assessment_text, _, _ = await _call_ai( + system_base="You are a diagnostic assessment generator for MSP escalations.", + rag_context="", + history=[], + new_message=( + f"Generate a brief diagnostic assessment for this escalation.\n" + f"{context}\n\nRecent conversation:\n{recent}\n\n" + f"Return: 1) Most likely cause, 2) Suggested next steps, 3) Confidence (low/medium/high)" + ), + max_tokens=500, + ) + + assessment_data = { + "likely_cause": "See assessment text", + "suggested_steps": [], + "confidence": "medium", + } + + return assessment_text, assessment_data + except Exception: + logger.exception("Failed to generate AI assessment") + return None, None + + async def generate_briefing( + self, handoff_id: UUID, claiming_user_id: UUID + ) -> str: + """Generate a natural-language briefing for the engineer claiming the session.""" + result = await self.db.execute( + select(SessionHandoff).where(SessionHandoff.id == handoff_id) + ) + handoff = result.scalar_one_or_none() + if not handoff: + raise ValueError(f"Handoff {handoff_id} not found") + + session_result = await self.db.execute( + select(AISession).where(AISession.id == handoff.session_id) + ) + session = session_result.scalar_one() + + from app.services.assistant_chat_service import _call_ai + + snapshot_text = str(handoff.snapshot)[:2000] + briefing, _, _ = await _call_ai( + system_base="You are a handoff briefing generator for MSP teams.", + rag_context="", + history=[], + new_message=( + f"Generate a concise briefing for an engineer picking up this session.\n" + f"Problem: {session.problem_summary}\n" + f"Intent: {handoff.intent}\n" + f"Engineer notes: {handoff.engineer_notes or 'None'}\n" + f"Snapshot: {snapshot_text}\n" + f"AI Assessment: {handoff.ai_assessment or 'None'}" + ), + max_tokens=500, + ) + return briefing + + async def push_to_psa(self, handoff_id: UUID) -> SessionHandoff: + """Push handoff notes to PSA via existing psa_documentation_service.""" + result = await self.db.execute( + select(SessionHandoff).where(SessionHandoff.id == handoff_id) + ) + handoff = result.scalar_one_or_none() + if not handoff: + raise ValueError(f"Handoff {handoff_id} not found") + + # Route to existing PSA documentation service + try: + from app.services.psa_documentation_service import push_session_notes + session_result = await self.db.execute( + select(AISession).where(AISession.id == handoff.session_id) + ) + session = session_result.scalar_one() + if session.psa_ticket_id and session.psa_connection_id: + note_id = await push_session_notes( + session=session, + notes_content=handoff.ai_assessment or str(handoff.snapshot), + db=self.db, + ) + handoff.psa_note_pushed = True + handoff.psa_note_id = note_id + except Exception: + logger.exception(f"Failed to push handoff {handoff_id} to PSA") + + await self.db.flush() + return handoff + + async def get_queue( + self, + team_id: UUID | None = None, + account_id: UUID | None = None, + ) -> list[dict[str, Any]]: + """Get team queue of parked + escalated sessions.""" + query = ( + select(SessionHandoff, AISession) + .join(AISession, SessionHandoff.session_id == AISession.id) + .where(SessionHandoff.claimed_by.is_(None)) + .order_by(SessionHandoff.created_at.desc()) + ) + + if team_id: + query = query.where(AISession.team_id == team_id) + elif account_id: + query = query.where(AISession.account_id == account_id) + + result = await self.db.execute(query) + rows = result.all() + + queue_items = [] + for handoff, session in rows: + queue_items.append({ + "handoff_id": handoff.id, + "session_id": session.id, + "intent": handoff.intent, + "problem_summary": session.problem_summary, + "problem_domain": session.problem_domain, + "priority": handoff.priority, + "engineer_notes": handoff.engineer_notes, + "created_at": handoff.created_at, + "claimed_by": handoff.claimed_by, + "claimed_at": handoff.claimed_at, + }) + + return queue_items +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `cd /home/coder/resolutionflow/backend && pytest tests/test_handoff_manager.py -v --no-header` +Expected: All 3 tests PASS. + +- [ ] **Step 5: Commit** + +```bash +git add backend/tests/test_handoff_manager.py backend/app/services/handoff_manager.py +git commit -m "feat: add HandoffManager service with dual-write and integration tests" +``` + +### Task 14: Write Handoff API Endpoints + +**Files:** +- Create: `backend/app/api/endpoints/session_handoffs.py` +- Modify: `backend/app/api/router.py` +- Create: `backend/tests/test_session_handoffs_api.py` + +- [ ] **Step 1: Write failing API tests** + +```python +# backend/tests/test_session_handoffs_api.py +"""API endpoint tests for session handoffs.""" +import pytest +from httpx import AsyncClient + +from app.models.ai_session import AISession + + +@pytest.mark.asyncio +async def test_create_park_handoff_api(client: AsyncClient, test_user, auth_headers, test_db): + """POST /ai-sessions/{id}/handoff with intent=park.""" + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "test"}, + status="active", + confidence_tier="discovery", + conversation_messages=[], + ) + test_db.add(session) + await test_db.commit() + + resp = await client.post( + f"/api/v1/ai-sessions/{session.id}/handoff", + headers=auth_headers, + json={"intent": "park", "engineer_notes": "Waiting for logs"}, + ) + assert resp.status_code == 201 + data = resp.json() + assert data["intent"] == "park" + + +@pytest.mark.asyncio +async def test_get_queue(client: AsyncClient, test_user, auth_headers, test_db): + """GET /ai-sessions/queue returns unclaimed handoffs.""" + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "test"}, + status="active", + confidence_tier="discovery", + conversation_messages=[], + ) + test_db.add(session) + await test_db.commit() + + # Create a handoff + await client.post( + f"/api/v1/ai-sessions/{session.id}/handoff", + headers=auth_headers, + json={"intent": "escalate", "engineer_notes": "Need help"}, + ) + + resp = await client.get("/api/v1/ai-sessions/queue", headers=auth_headers) + assert resp.status_code == 200 + data = resp.json() + assert len(data) >= 1 +``` + +- [ ] **Step 2: Implement session_handoffs endpoint** + +```python +# backend/app/api/endpoints/session_handoffs.py +"""Handoff endpoints — unified park/escalate. + + POST /ai-sessions/{id}/handoff — Create handoff + GET /ai-sessions/{id}/handoffs — Handoff history + POST /ai-sessions/{id}/handoffs/{hid}/claim — Claim session + GET /ai-sessions/queue — Team queue +""" +import logging +from typing import Annotated +from uuid import UUID + +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.api.deps import get_current_active_user, get_db +from app.models.user import User +from app.models.ai_session import AISession +from app.models.session_handoff import SessionHandoff +from app.services.handoff_manager import HandoffManager +from app.schemas.session_handoff import ( + HandoffCreateRequest, + HandoffResponse, +) + +logger = logging.getLogger(__name__) + +# Queue endpoint needs its own router (no session_id prefix) +queue_router = APIRouter(prefix="/ai-sessions", tags=["session-handoffs"]) + +# Session-scoped endpoints +router = APIRouter(prefix="/ai-sessions/{session_id}", tags=["session-handoffs"]) + + +@router.post("/handoff", response_model=HandoffResponse, status_code=status.HTTP_201_CREATED) +async def create_handoff( + session_id: UUID, + body: HandoffCreateRequest, + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], +) -> HandoffResponse: + """Create a handoff (park or escalate).""" + result = await db.execute( + select(AISession).where( + AISession.id == session_id, + AISession.user_id == current_user.id, + ) + ) + session = result.scalar_one_or_none() + if not session: + raise HTTPException(status_code=404, detail="Session not found") + + manager = HandoffManager(db) + try: + handoff = await manager.create_handoff( + session_id=session_id, + intent=body.intent, + engineer_notes=body.engineer_notes, + user_id=current_user.id, + priority=body.priority, + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + await db.commit() + return HandoffResponse.model_validate(handoff) + + +@router.get("/handoffs", response_model=list[HandoffResponse]) +async def list_handoffs( + session_id: UUID, + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], +) -> list[HandoffResponse]: + """Get handoff history for a session.""" + result = await db.execute( + select(SessionHandoff) + .where(SessionHandoff.session_id == session_id) + .order_by(SessionHandoff.created_at.desc()) + ) + handoffs = result.scalars().all() + return [HandoffResponse.model_validate(h) for h in handoffs] + + +@router.post("/handoffs/{handoff_id}/claim", response_model=HandoffResponse) +async def claim_handoff( + session_id: UUID, + handoff_id: UUID, + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], +) -> HandoffResponse: + """Claim a handed-off session.""" + manager = HandoffManager(db) + try: + handoff = await manager.claim_session( + handoff_id=handoff_id, + claiming_user_id=current_user.id, + ) + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + + await db.commit() + return HandoffResponse.model_validate(handoff) + + +@queue_router.get("/queue") +async def get_queue( + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], +) -> list[dict]: + """Get team queue of parked + escalated sessions.""" + manager = HandoffManager(db) + return await manager.get_queue( + team_id=current_user.team_id, + account_id=current_user.account_id, + ) +``` + +- [ ] **Step 3: Register both routers in router.py** + +Add to `backend/app/api/router.py`: + +```python +from app.api.endpoints import session_handoffs +``` + +```python +api_router.include_router(session_handoffs.router) +api_router.include_router(session_handoffs.queue_router) +``` + +- [ ] **Step 4: Run tests** + +Run: `cd /home/coder/resolutionflow/backend && pytest tests/test_session_handoffs_api.py -v --no-header` +Expected: All 2 tests PASS. + +- [ ] **Step 5: Commit** + +```bash +git add backend/app/api/endpoints/session_handoffs.py backend/tests/test_session_handoffs_api.py backend/app/api/router.py +git commit -m "feat: add handoff API endpoints with queue and integration tests" +``` + +--- + +## Phase 4: Resolution Outputs + +### Task 15: Write ResolutionOutputGenerator Service — Tests First + +**Files:** +- Create: `backend/tests/test_resolution_outputs.py` +- Create: `backend/app/services/resolution_output_generator.py` + +- [ ] **Step 1: Write failing tests** + +```python +# backend/tests/test_resolution_outputs.py +"""Integration tests for ResolutionOutputGenerator.""" +import pytest +from unittest.mock import AsyncMock, patch +from httpx import AsyncClient + +from app.models.ai_session import AISession +from app.models.session_resolution_output import SessionResolutionOutput + + +@pytest.mark.asyncio +@patch("app.services.resolution_output_generator._call_ai") +async def test_generate_all_creates_three_outputs( + mock_call_ai, client: AsyncClient, test_user, auth_headers, test_db +): + """generate_all creates PSA notes, KB article, and client summary.""" + mock_call_ai.return_value = ("Generated content here", 100, 50) + + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "test"}, + status="resolved", + confidence_tier="guided", + conversation_messages=[ + {"role": "user", "content": "DNS not working"}, + {"role": "assistant", "content": "Fixed by flushing DNS cache"}, + ], + resolution_summary="Flushed DNS cache", + ) + test_db.add(session) + await test_db.flush() + + from app.services.resolution_output_generator import ResolutionOutputGenerator + gen = ResolutionOutputGenerator(test_db) + outputs = await gen.generate_all(session.id) + + assert len(outputs) == 3 + types = {o.output_type for o in outputs} + assert types == {"psa_ticket_notes", "knowledge_base", "client_summary"} + assert all(o.status == "draft" for o in outputs) + assert mock_call_ai.call_count == 3 + + +@pytest.mark.asyncio +async def test_edit_output(client: AsyncClient, test_user, auth_headers, test_db): + """Editing an output stores edited_content.""" + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "test"}, + status="resolved", + confidence_tier="guided", + conversation_messages=[], + resolution_summary="Fixed it", + ) + test_db.add(session) + await test_db.flush() + + output = SessionResolutionOutput( + session_id=session.id, + output_type="psa_ticket_notes", + generated_content="Original notes", + status="draft", + generated_by_model="claude-sonnet-4-6", + ) + test_db.add(output) + await test_db.flush() + + from app.services.resolution_output_generator import ResolutionOutputGenerator + gen = ResolutionOutputGenerator(test_db) + edited = await gen.edit_output(output.id, "My edited notes") + + assert edited.edited_content == "My edited notes" +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cd /home/coder/resolutionflow/backend && pytest tests/test_resolution_outputs.py -v --no-header 2>&1 | head -15` +Expected: FAIL — `ModuleNotFoundError` + +- [ ] **Step 3: Implement ResolutionOutputGenerator** + +```python +# backend/app/services/resolution_output_generator.py +"""Resolution output generator — three deliverables on session resolve. + +Generates PSA ticket notes, KB article draft, and client-facing summary. +Each is a separate LLM call through _call_ai. +""" +import logging +from typing import Any +from uuid import UUID + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.dialects.postgresql import insert as pg_insert + +from app.core.config import settings +from app.models.ai_session import AISession +from app.models.session_branch import SessionBranch +from app.models.session_resolution_output import SessionResolutionOutput +from app.services.assistant_chat_service import _call_ai + +logger = logging.getLogger(__name__) + +# Model used for resolution output generation +RESOLUTION_MODEL = "claude-sonnet-4-6" + + +class ResolutionOutputGenerator: + """Generates three resolution outputs on session resolve.""" + + def __init__(self, db: AsyncSession): + self.db = db + + async def generate_all(self, session_id: UUID) -> list[SessionResolutionOutput]: + """Generate all three outputs for a resolved session.""" + result = await self.db.execute( + select(AISession).where(AISession.id == session_id) + ) + session = result.scalar_one_or_none() + if not session: + raise ValueError(f"Session {session_id} not found") + + # Build session context for prompts + context = self._build_session_context(session) + + # Generate all three + outputs = [] + for output_type, prompt in [ + ("psa_ticket_notes", self._psa_notes_prompt(context)), + ("knowledge_base", self._kb_article_prompt(context)), + ("client_summary", self._client_summary_prompt(context)), + ]: + content, _, _ = await _call_ai( + system_base="You are a technical documentation assistant for MSP teams.", + rag_context="", + history=[], + new_message=prompt, + max_tokens=2048, + ) + + output = SessionResolutionOutput( + session_id=session_id, + output_type=output_type, + generated_content=content, + status="draft", + generated_by_model=RESOLUTION_MODEL, + ) + self.db.add(output) + outputs.append(output) + + await self.db.flush() + return outputs + + async def edit_output(self, output_id: UUID, edited_content: str) -> SessionResolutionOutput: + """Save engineer's edited version of an output.""" + result = await self.db.execute( + select(SessionResolutionOutput).where(SessionResolutionOutput.id == output_id) + ) + output = result.scalar_one_or_none() + if not output: + raise ValueError(f"Output {output_id} not found") + + output.edited_content = edited_content + await self.db.flush() + return output + + async def push_output( + self, output_id: UUID, destination: str + ) -> SessionResolutionOutput: + """Push output to a destination (psa, kb_library, clipboard, email).""" + result = await self.db.execute( + select(SessionResolutionOutput).where(SessionResolutionOutput.id == output_id) + ) + output = result.scalar_one_or_none() + if not output: + raise ValueError(f"Output {output_id} not found") + + # Use edited content if available, otherwise generated + content = output.edited_content or output.generated_content + + if destination == "psa": + # Route to existing PSA documentation service + # TODO: Wire up psa_documentation_service.push_note() + pass + elif destination == "kb_library": + # TODO: Wire up flow/step library creation + pass + + from datetime import datetime, timezone + output.status = "pushed" + output.pushed_to = destination + output.pushed_at = datetime.now(timezone.utc) + + await self.db.flush() + return output + + def _build_session_context(self, session: AISession) -> str: + """Build a text summary of the session for prompts.""" + parts = [ + f"Problem: {session.problem_summary or 'Unknown'}", + f"Domain: {session.problem_domain or 'Unknown'}", + f"Resolution: {session.resolution_summary or 'Not specified'}", + f"Steps taken: {session.step_count}", + ] + # Include conversation highlights + msgs = session.conversation_messages or [] + if msgs: + parts.append("\nConversation highlights:") + for msg in msgs[-10:]: # Last 10 messages + role = msg.get("role", "unknown") + content = msg.get("content", "")[:200] + parts.append(f" [{role}]: {content}") + + return "\n".join(parts) + + def _psa_notes_prompt(self, context: str) -> str: + return ( + f"Generate professional PSA ticket notes for this resolved troubleshooting session.\n" + f"Format as structured markdown with: Problem, Diagnostic Steps, Resolution, Recommendations.\n\n" + f"{context}" + ) + + def _kb_article_prompt(self, context: str) -> str: + return ( + f"Generate a knowledge base article draft from this resolved session.\n" + f"Include: Symptoms, Root Cause, Resolution Steps, Things to Rule Out First.\n" + f"Dead-end branches (if any) should become 'Rule Out First' guidance.\n\n" + f"{context}" + ) + + def _client_summary_prompt(self, context: str) -> str: + return ( + f"Generate a non-technical summary for the end user/client.\n" + f"Explain what was wrong and what was done to fix it in plain language.\n" + f"No jargon. 2-3 paragraphs max.\n\n" + f"{context}" + ) +``` + +- [ ] **Step 4: Run tests** + +Run: `cd /home/coder/resolutionflow/backend && pytest tests/test_resolution_outputs.py -v --no-header` +Expected: All 2 tests PASS. + +- [ ] **Step 5: Commit** + +```bash +git add backend/tests/test_resolution_outputs.py backend/app/services/resolution_output_generator.py +git commit -m "feat: add ResolutionOutputGenerator with three-output generation" +``` + +### Task 16: Write Resolution API Endpoints + +**Files:** +- Create: `backend/app/api/endpoints/session_resolutions.py` +- Create: `backend/tests/test_session_resolutions_api.py` +- Modify: `backend/app/api/router.py` + +- [ ] **Step 1: Write failing API tests** + +```python +# backend/tests/test_session_resolutions_api.py +"""API tests for resolution output endpoints.""" +import pytest +from unittest.mock import patch +from httpx import AsyncClient + +from app.models.ai_session import AISession +from app.models.session_resolution_output import SessionResolutionOutput + + +@pytest.mark.asyncio +@patch("app.services.resolution_output_generator._call_ai") +async def test_resolve_generates_outputs(mock_call_ai, client: AsyncClient, test_user, auth_headers, test_db): + """POST /ai-sessions/{id}/resolve generates 3 outputs.""" + mock_call_ai.return_value = ("Generated content", 100, 50) + + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "test"}, + status="active", + confidence_tier="guided", + conversation_messages=[{"role": "user", "content": "help"}], + ) + test_db.add(session) + await test_db.commit() + + resp = await client.get( + f"/api/v1/ai-sessions/{session.id}/outputs", + headers=auth_headers, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["outputs"] == [] + + +@pytest.mark.asyncio +async def test_edit_output_api(client: AsyncClient, test_user, auth_headers, test_db): + """PATCH /ai-sessions/{id}/outputs/{oid} edits content.""" + session = AISession( + user_id=test_user["user_data"]["id"], + account_id=test_user["user_data"]["account_id"], + session_type="guided", + intake_type="free_text", + intake_content={"text": "test"}, + status="resolved", + confidence_tier="guided", + conversation_messages=[], + resolution_summary="Fixed", + ) + test_db.add(session) + await test_db.flush() + + output = SessionResolutionOutput( + session_id=session.id, + output_type="psa_ticket_notes", + generated_content="Original", + status="draft", + generated_by_model="claude-sonnet-4-6", + ) + test_db.add(output) + await test_db.commit() + + resp = await client.patch( + f"/api/v1/ai-sessions/{session.id}/outputs/{output.id}", + headers=auth_headers, + json={"edited_content": "My edited version"}, + ) + assert resp.status_code == 200 + assert resp.json()["edited_content"] == "My edited version" +``` + +- [ ] **Step 2: Implement session_resolutions endpoint** + +```python +# backend/app/api/endpoints/session_resolutions.py +"""Resolution output endpoints. + + GET /ai-sessions/{id}/outputs — Get all resolution outputs + PATCH /ai-sessions/{id}/outputs/{oid} — Edit output + POST /ai-sessions/{id}/outputs/{oid}/push — Push to destination +""" +import logging +from typing import Annotated +from uuid import UUID + +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.api.deps import get_current_active_user, get_db +from app.models.user import User +from app.models.ai_session import AISession +from app.models.session_resolution_output import SessionResolutionOutput +from app.services.resolution_output_generator import ResolutionOutputGenerator +from app.schemas.session_resolution import ( + ResolutionOutputResponse, + ResolutionOutputEditRequest, + ResolutionOutputPushRequest, + ResolutionOutputPushResponse, + AllResolutionOutputsResponse, +) + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/ai-sessions/{session_id}", tags=["session-resolutions"]) + + +@router.get("/outputs", response_model=AllResolutionOutputsResponse) +async def get_outputs( + session_id: UUID, + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], +) -> AllResolutionOutputsResponse: + """Get all resolution outputs for a session.""" + result = await db.execute( + select(SessionResolutionOutput) + .where(SessionResolutionOutput.session_id == session_id) + .order_by(SessionResolutionOutput.output_type) + ) + outputs = result.scalars().all() + return AllResolutionOutputsResponse( + outputs=[ResolutionOutputResponse.model_validate(o) for o in outputs] + ) + + +@router.patch("/outputs/{output_id}", response_model=ResolutionOutputResponse) +async def edit_output( + session_id: UUID, + output_id: UUID, + body: ResolutionOutputEditRequest, + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], +) -> ResolutionOutputResponse: + """Edit an output before pushing.""" + gen = ResolutionOutputGenerator(db) + try: + output = await gen.edit_output(output_id, body.edited_content) + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + + await db.commit() + return ResolutionOutputResponse.model_validate(output) + + +@router.post("/outputs/{output_id}/push", response_model=ResolutionOutputPushResponse) +async def push_output( + session_id: UUID, + output_id: UUID, + body: ResolutionOutputPushRequest, + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], +) -> ResolutionOutputPushResponse: + """Push output to destination.""" + gen = ResolutionOutputGenerator(db) + try: + output = await gen.push_output(output_id, body.destination) + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + + await db.commit() + return ResolutionOutputPushResponse( + output_id=output.id, + status=output.status, + pushed_to=output.pushed_to or body.destination, + pushed_reference=output.pushed_reference, + ) +``` + +- [ ] **Step 3: Register router** + +Add to `backend/app/api/router.py`: + +```python +from app.api.endpoints import session_resolutions +``` + +```python +api_router.include_router(session_resolutions.router) +``` + +- [ ] **Step 4: Run tests** + +Run: `cd /home/coder/resolutionflow/backend && pytest tests/test_session_resolutions_api.py -v --no-header` +Expected: All 2 tests PASS. + +- [ ] **Step 5: Commit** + +```bash +git add backend/app/api/endpoints/session_resolutions.py backend/tests/test_session_resolutions_api.py backend/app/api/router.py +git commit -m "feat: add resolution output API endpoints" +``` + +--- + +## Phase 5: AI Description Pipeline + +### Task 17: Extend Upload Endpoint with AI Description + +**Files:** +- Modify: `backend/app/api/endpoints/uploads.py:38-125` + +- [ ] **Step 1: Add background AI description task to upload endpoint** + +In `backend/app/api/endpoints/uploads.py`, after the `await db.commit()` (line ~113), add: + +```python + # Kick off async AI description generation (non-blocking) + import asyncio + asyncio.create_task( + _generate_ai_description(upload.id, file_data, content_type) + ) +``` + +And add this function at module level (after the `_check_storage_configured` function): + +```python +async def _generate_ai_description(upload_id: UUID, file_data: bytes, content_type: str) -> None: + """Background task: generate AI description for uploaded file. + + Catches all exceptions — upload is still usable if this fails. + """ + try: + from app.core.database import async_session_maker + from app.services.assistant_chat_service import _call_ai + import base64 + + async with async_session_maker() as db: + result = await db.execute( + select(FileUpload).where(FileUpload.id == upload_id) + ) + upload = result.scalar_one_or_none() + if not upload: + return + + if content_type.startswith("image/"): + # Image: send to AI with vision + b64_data = base64.b64encode(file_data).decode("utf-8") + description, _, _ = await _call_ai( + system_base="You are a technical image analyst for IT troubleshooting.", + rag_context="", + history=[], + new_message="Describe this image in one sentence for a troubleshooting context log.", + images=[{"media_type": content_type, "data": b64_data}], + max_tokens=100, + ) + upload.ai_description = description + elif content_type.startswith("text/") or content_type in ( + "application/json", "application/xml", "application/yaml", + ): + # Text file: extract content directly + try: + text_content = file_data.decode("utf-8") + except UnicodeDecodeError: + text_content = file_data.decode("latin-1") + + upload.extracted_content = text_content[:10000] # Cap at 10k chars + + # Summarize if long + if len(text_content) > 2000: + summary, _, _ = await _call_ai( + system_base="You are a technical log/config analyst.", + rag_context="", + history=[], + new_message=f"Summarize this file content in 2-3 sentences:\n\n{text_content[:5000]}", + max_tokens=200, + ) + upload.content_summary = summary + upload.ai_description = summary + else: + upload.ai_description = f"Text file: {upload.filename}" + + await db.commit() + except Exception: + logger.exception(f"Failed to generate AI description for upload {upload_id}") +``` + +Also add `from app.core.database import async_session_maker` to the imports if not already present. Check if `async_session_maker` exists in `core/database.py` — if not, add it there. + +- [ ] **Step 2: Verify existing upload tests still pass** + +Run: `cd /home/coder/resolutionflow/backend && pytest tests/ -k "upload" -v --no-header 2>&1 | tail -10` +Expected: Existing tests pass (background task is fire-and-forget). + +- [ ] **Step 3: Commit** + +```bash +git add backend/app/api/endpoints/uploads.py +git commit -m "feat: add async AI description generation on file upload" +``` + +--- + +## Phase 6: Frontend + +### Task 18: Add TypeScript Types for Branching + +**Files:** +- Create: `frontend/src/types/branching.ts` +- Modify: `frontend/src/types/ai-session.ts:185-198` +- Modify: `frontend/src/types/index.ts` + +- [ ] **Step 1: Create branching types** + +```typescript +// frontend/src/types/branching.ts + +// ── Branch ── + +export interface BranchResponse { + id: string + session_id: string + parent_branch_id: string | null + fork_point_step_id: string | null + branch_order: number + label: string + status: 'active' | 'dead_end' | 'solved' | 'untried' | 'revived' + status_reason: string | null + status_changed_at: string | null + context_summary: { tried: string[]; concluded: string; artifacts: string[] } | null + evidence_from_branch_id: string | null + evidence_description: string | null + step_count: number + created_at: string + updated_at: string +} + +export interface BranchTreeResponse { + branches: BranchResponse[] + active_branch_id: string | null +} + +// ── Fork ── + +export interface ForkOption { + label: string + description: string +} + +export interface ForkCreateRequest { + fork_reason: string + options: ForkOption[] +} + +export interface ForkPointResponse { + id: string + session_id: string + parent_branch_id: string + trigger_step_id: string | null + fork_reason: string + options: Array<{ label: string; description: string; branch_id: string; status: string }> + created_at: string +} + +// ── Switch ── + +export interface BranchSwitchResponse { + active_branch_id: string + branch: BranchResponse + conversation_messages: Array<{ role: string; content: string }> +} + +// ── Revival ── + +export interface ReviveRequest { + evidence_from_branch_id: string + evidence_description: string +} + +// ── Branch message ── + +export interface BranchMessageRequest { + message: string + upload_ids?: string[] +} + +export interface BranchMessageResponse { + content: string + branch_id: string + step_id: string | null +} + +// ── Handoff ── + +export interface HandoffCreateRequest { + intent: 'park' | 'escalate' + engineer_notes?: string + priority?: 'normal' | 'elevated' +} + +export interface HandoffResponse { + id: string + session_id: string + handed_off_by: string + intent: 'park' | 'escalate' + source_branch_id: string | null + snapshot: Record + ai_assessment: string | null + ai_assessment_data: { likely_cause: string; suggested_steps: string[]; confidence: number } | null + artifacts: Array<{ name: string; type: string; reference: string }> | null + engineer_notes: string | null + priority: 'normal' | 'elevated' + claimed_by: string | null + claimed_at: string | null + psa_note_pushed: boolean + notification_sent: boolean + created_at: string +} + +export interface QueueItemResponse { + handoff_id: string + session_id: string + intent: 'park' | 'escalate' + problem_summary: string | null + problem_domain: string | null + priority: 'normal' | 'elevated' + handed_off_by_name: string | null + engineer_notes: string | null + branch_count: number + created_at: string + claimed_by: string | null + claimed_at: string | null +} + +// ── Resolution Outputs ── + +export type ResolutionOutputType = 'psa_ticket_notes' | 'knowledge_base' | 'client_summary' +export type ResolutionOutputStatus = 'draft' | 'approved' | 'pushed' | 'rejected' + +export interface ResolutionOutputResponse { + id: string + session_id: string + output_type: ResolutionOutputType + generated_content: string + structured_data: Record | null + edited_content: string | null + status: ResolutionOutputStatus + pushed_to: string | null + pushed_at: string | null + pushed_reference: string | null + generated_by_model: string + created_at: string + updated_at: string +} + +export interface AllResolutionOutputsResponse { + outputs: ResolutionOutputResponse[] +} + +export interface ResolutionOutputEditRequest { + edited_content: string +} + +export type PushDestination = 'psa' | 'kb_library' | 'clipboard' | 'email' + +export interface ResolutionOutputPushRequest { + destination: PushDestination +} +``` + +- [ ] **Step 2: Add branching fields to AISessionDetail** + +In `frontend/src/types/ai-session.ts`, add to `AISessionDetail` interface (after `conversation_messages`): + +```typescript + is_branching: boolean + active_branch_id: string | null +``` + +- [ ] **Step 3: Export from types/index.ts** + +Add `export * from './branching'` to `frontend/src/types/index.ts` (or equivalent). + +- [ ] **Step 4: Verify build** + +Run: `cd /home/coder/resolutionflow/frontend && npx tsc --noEmit 2>&1 | head -20` +Expected: No type errors. + +- [ ] **Step 5: Commit** + +```bash +git add frontend/src/types/branching.ts frontend/src/types/ai-session.ts frontend/src/types/index.ts +git commit -m "feat: add TypeScript types for branching, handoffs, and resolution outputs" +``` + +### Task 19: Add Frontend API Clients + +**Files:** +- Create: `frontend/src/api/branches.ts` +- Create: `frontend/src/api/handoffs.ts` +- Create: `frontend/src/api/resolutions.ts` +- Modify: `frontend/src/api/index.ts` + +- [ ] **Step 1: Create branches API client** + +```typescript +// frontend/src/api/branches.ts +import apiClient from './client' +import type { + BranchTreeResponse, + BranchResponse, + ForkCreateRequest, + ForkPointResponse, + BranchSwitchResponse, + ReviveRequest, + BranchMessageRequest, + BranchMessageResponse, +} from '@/types/branching' + +export const branchesApi = { + async getBranches(sessionId: string): Promise { + const resp = await apiClient.get(`/ai-sessions/${sessionId}/branches`) + return resp.data + }, + + async createFork(sessionId: string, data: ForkCreateRequest): Promise { + const resp = await apiClient.post(`/ai-sessions/${sessionId}/branches/fork`, data) + return resp.data + }, + + async updateBranchStatus(sessionId: string, branchId: string, data: { status: string; status_reason?: string }): Promise { + const resp = await apiClient.patch(`/ai-sessions/${sessionId}/branches/${branchId}`, data) + return resp.data + }, + + async switchBranch(sessionId: string, branchId: string): Promise { + const resp = await apiClient.post(`/ai-sessions/${sessionId}/branches/${branchId}/switch`) + return resp.data + }, + + async reviveBranch(sessionId: string, branchId: string, data: ReviveRequest): Promise { + const resp = await apiClient.post(`/ai-sessions/${sessionId}/branches/${branchId}/revive`, data) + return resp.data + }, + + async sendBranchMessage(sessionId: string, branchId: string, data: BranchMessageRequest): Promise { + const resp = await apiClient.post(`/ai-sessions/${sessionId}/branches/${branchId}/message`, data) + return resp.data + }, +} +``` + +- [ ] **Step 2: Create handoffs API client** + +```typescript +// frontend/src/api/handoffs.ts +import apiClient from './client' +import type { + HandoffCreateRequest, + HandoffResponse, + QueueItemResponse, +} from '@/types/branching' + +export const handoffsApi = { + async createHandoff(sessionId: string, data: HandoffCreateRequest): Promise { + const resp = await apiClient.post(`/ai-sessions/${sessionId}/handoff`, data) + return resp.data + }, + + async listHandoffs(sessionId: string): Promise { + const resp = await apiClient.get(`/ai-sessions/${sessionId}/handoffs`) + return resp.data + }, + + async claimHandoff(sessionId: string, handoffId: string): Promise { + const resp = await apiClient.post(`/ai-sessions/${sessionId}/handoffs/${handoffId}/claim`) + return resp.data + }, + + async getQueue(): Promise { + const resp = await apiClient.get('/ai-sessions/queue') + return resp.data + }, +} +``` + +- [ ] **Step 3: Create resolutions API client** + +```typescript +// frontend/src/api/resolutions.ts +import apiClient from './client' +import type { + AllResolutionOutputsResponse, + ResolutionOutputResponse, + ResolutionOutputEditRequest, + ResolutionOutputPushRequest, +} from '@/types/branching' + +export const resolutionsApi = { + async getOutputs(sessionId: string): Promise { + const resp = await apiClient.get(`/ai-sessions/${sessionId}/outputs`) + return resp.data + }, + + async editOutput(sessionId: string, outputId: string, data: ResolutionOutputEditRequest): Promise { + const resp = await apiClient.patch(`/ai-sessions/${sessionId}/outputs/${outputId}`, data) + return resp.data + }, + + async pushOutput(sessionId: string, outputId: string, data: ResolutionOutputPushRequest): Promise<{ output_id: string; status: string; pushed_to: string; pushed_reference: string | null }> { + const resp = await apiClient.post(`/ai-sessions/${sessionId}/outputs/${outputId}/push`, data) + return resp.data + }, +} +``` + +- [ ] **Step 4: Export from api/index.ts** + +Add to `frontend/src/api/index.ts`: + +```typescript +export { branchesApi } from './branches' +export { handoffsApi } from './handoffs' +export { resolutionsApi } from './resolutions' +``` + +- [ ] **Step 5: Verify build** + +Run: `cd /home/coder/resolutionflow/frontend && npx tsc --noEmit 2>&1 | head -10` +Expected: No errors. + +- [ ] **Step 6: Commit** + +```bash +git add frontend/src/api/branches.ts frontend/src/api/handoffs.ts frontend/src/api/resolutions.ts frontend/src/api/index.ts +git commit -m "feat: add frontend API clients for branches, handoffs, and resolutions" +``` + +### Task 20: Create useBranching Hook + +**Files:** +- Create: `frontend/src/hooks/useBranching.ts` + +- [ ] **Step 1: Implement the useBranching hook** + +```typescript +// frontend/src/hooks/useBranching.ts +import { useState, useCallback } from 'react' +import { branchesApi } from '@/api' +import type { + BranchResponse, + BranchTreeResponse, + ForkCreateRequest, + ForkPointResponse, + BranchSwitchResponse, + BranchMessageResponse, +} from '@/types/branching' +import { toast } from '@/lib/toast' + +export interface UseBranching { + branches: BranchResponse[] + activeBranchId: string | null + isLoading: boolean + + loadBranches: (sessionId: string) => Promise + createFork: (sessionId: string, data: ForkCreateRequest) => Promise + switchBranch: (sessionId: string, branchId: string) => Promise + updateStatus: (sessionId: string, branchId: string, status: string, reason?: string) => Promise + reviveBranch: (sessionId: string, branchId: string, evidenceFromId: string, description: string) => Promise + sendMessage: (sessionId: string, branchId: string, message: string) => Promise +} + +export function useBranching(): UseBranching { + const [branches, setBranches] = useState([]) + const [activeBranchId, setActiveBranchId] = useState(null) + const [isLoading, setIsLoading] = useState(false) + + const loadBranches = useCallback(async (sessionId: string) => { + setIsLoading(true) + try { + const data = await branchesApi.getBranches(sessionId) + setBranches(data.branches) + setActiveBranchId(data.active_branch_id) + } catch { + toast.error('Failed to load branches') + } finally { + setIsLoading(false) + } + }, []) + + const createFork = useCallback(async (sessionId: string, data: ForkCreateRequest) => { + try { + const result = await branchesApi.createFork(sessionId, data) + await loadBranches(sessionId) + return result + } catch { + toast.error('Failed to create fork') + return null + } + }, [loadBranches]) + + const switchBranch = useCallback(async (sessionId: string, branchId: string) => { + try { + const result = await branchesApi.switchBranch(sessionId, branchId) + setActiveBranchId(result.active_branch_id) + // Update branches list + setBranches(prev => prev.map(b => + b.id === branchId ? result.branch : b + )) + return result + } catch { + toast.error('Failed to switch branch') + return null + } + }, []) + + const updateStatus = useCallback(async (sessionId: string, branchId: string, status: string, reason?: string) => { + try { + const updated = await branchesApi.updateBranchStatus(sessionId, branchId, { status, status_reason: reason }) + setBranches(prev => prev.map(b => b.id === branchId ? updated : b)) + } catch { + toast.error('Failed to update branch status') + } + }, []) + + const reviveBranch = useCallback(async (sessionId: string, branchId: string, evidenceFromId: string, description: string) => { + try { + await branchesApi.reviveBranch(sessionId, branchId, { + evidence_from_branch_id: evidenceFromId, + evidence_description: description, + }) + await loadBranches(sessionId) + toast.success('Branch revived') + } catch { + toast.error('Failed to revive branch') + } + }, [loadBranches]) + + const sendMessage = useCallback(async (sessionId: string, branchId: string, message: string) => { + try { + return await branchesApi.sendBranchMessage(sessionId, branchId, { message }) + } catch { + toast.error('Failed to send message') + return null + } + }, []) + + return { + branches, + activeBranchId, + isLoading, + loadBranches, + createFork, + switchBranch, + updateStatus, + reviveBranch, + sendMessage, + } +} +``` + +- [ ] **Step 2: Verify build** + +Run: `cd /home/coder/resolutionflow/frontend && npx tsc --noEmit 2>&1 | head -10` +Expected: No errors. + +- [ ] **Step 3: Commit** + +```bash +git add frontend/src/hooks/useBranching.ts +git commit -m "feat: add useBranching hook for branch state management" +``` + +### Task 21: Create BranchMap Sidebar Component + +**Files:** +- Create: `frontend/src/components/session/BranchMap.tsx` +- Create: `frontend/src/components/session/BranchNode.tsx` + +- [ ] **Step 1: Create BranchNode component** + +```tsx +// frontend/src/components/session/BranchNode.tsx +import { cn } from '@/lib/utils' +import { GitBranch, CheckCircle2, XCircle, CircleDot, RotateCcw, Circle } from 'lucide-react' +import type { BranchResponse } from '@/types/branching' + +const STATUS_CONFIG = { + active: { icon: CircleDot, color: 'text-accent', bg: 'bg-accent-dim', label: 'Active' }, + solved: { icon: CheckCircle2, color: 'text-green-400', bg: 'bg-green-400/10', label: 'Solved' }, + dead_end: { icon: XCircle, color: 'text-red-400', bg: 'bg-red-400/10', label: 'Dead End' }, + untried: { icon: Circle, color: 'text-muted', bg: 'bg-elevated', label: 'Untried' }, + revived: { icon: RotateCcw, color: 'text-yellow-400', bg: 'bg-yellow-400/10', label: 'Revived' }, +} as const + +interface BranchNodeProps { + branch: BranchResponse + isActive: boolean + depth: number + onClick: (branchId: string) => void +} + +export function BranchNode({ branch, isActive, depth, onClick }: BranchNodeProps) { + const config = STATUS_CONFIG[branch.status] || STATUS_CONFIG.untried + const Icon = config.icon + + return ( + + ) +} +``` + +- [ ] **Step 2: Create BranchMap component** + +```tsx +// frontend/src/components/session/BranchMap.tsx +import { useEffect } from 'react' +import { GitBranch } from 'lucide-react' +import type { BranchResponse } from '@/types/branching' +import { BranchNode } from './BranchNode' + +interface BranchMapProps { + branches: BranchResponse[] + activeBranchId: string | null + onSwitchBranch: (branchId: string) => void +} + +interface TreeNode { + branch: BranchResponse + children: TreeNode[] + depth: number +} + +function buildTree(branches: BranchResponse[]): TreeNode[] { + const byId = new Map(branches.map(b => [b.id, b])) + const childrenMap = new Map() + + for (const b of branches) { + const parentId = b.parent_branch_id + if (!childrenMap.has(parentId)) childrenMap.set(parentId, []) + childrenMap.get(parentId)!.push(b) + } + + function buildNode(branch: BranchResponse, depth: number): TreeNode { + const children = (childrenMap.get(branch.id) || []) + .sort((a, b) => a.branch_order - b.branch_order) + .map(child => buildNode(child, depth + 1)) + return { branch, children, depth } + } + + const roots = (childrenMap.get(null) || []) + .sort((a, b) => a.branch_order - b.branch_order) + + return roots.map(r => buildNode(r, 0)) +} + +function flattenTree(nodes: TreeNode[]): TreeNode[] { + const result: TreeNode[] = [] + for (const node of nodes) { + result.push(node) + result.push(...flattenTree(node.children)) + } + return result +} + +export function BranchMap({ branches, activeBranchId, onSwitchBranch }: BranchMapProps) { + if (branches.length === 0) return null + + const tree = buildTree(branches) + const flat = flattenTree(tree) + + return ( +
+
+ + + Branch Map + +
+ {flat.map(node => ( + + ))} +
+ ) +} +``` + +- [ ] **Step 3: Verify build** + +Run: `cd /home/coder/resolutionflow/frontend && npx tsc --noEmit 2>&1 | head -10` +Expected: No errors. + +- [ ] **Step 4: Commit** + +```bash +git add frontend/src/components/session/BranchMap.tsx frontend/src/components/session/BranchNode.tsx +git commit -m "feat: add BranchMap sidebar with BranchNode tree visualization" +``` + +### Task 22: Create ForkCard Component + +**Files:** +- Create: `frontend/src/components/session/ForkCard.tsx` + +- [ ] **Step 1: Implement ForkCard** + +```tsx +// frontend/src/components/session/ForkCard.tsx +import { GitFork } from 'lucide-react' +import { cn } from '@/lib/utils' + +interface ForkOption { + label: string + description: string + branch_id: string + status: string +} + +interface ForkCardProps { + forkReason: string + options: ForkOption[] + activeBranchId: string | null + onSelectBranch: (branchId: string) => void +} + +export function ForkCard({ forkReason, options, activeBranchId, onSelectBranch }: ForkCardProps) { + return ( +
+
+ + Fork Point +
+

{forkReason}

+
+ {options.map((opt) => ( + + ))} +
+
+ ) +} +``` + +- [ ] **Step 2: Verify build** + +Run: `cd /home/coder/resolutionflow/frontend && npx tsc --noEmit 2>&1 | head -5` +Expected: No errors. + +- [ ] **Step 3: Commit** + +```bash +git add frontend/src/components/session/ForkCard.tsx +git commit -m "feat: add ForkCard component for in-chat fork decision points" +``` + +### Task 23: Create HandoffModal Component + +**Files:** +- Create: `frontend/src/components/session/HandoffModal.tsx` + +- [ ] **Step 1: Implement HandoffModal** + +```tsx +// frontend/src/components/session/HandoffModal.tsx +import { useState } from 'react' +import { Pause, ArrowUpRight, X } from 'lucide-react' +import { cn } from '@/lib/utils' +import type { HandoffCreateRequest } from '@/types/branching' + +interface HandoffModalProps { + isOpen: boolean + onClose: () => void + onSubmit: (data: HandoffCreateRequest) => Promise +} + +export function HandoffModal({ isOpen, onClose, onSubmit }: HandoffModalProps) { + const [intent, setIntent] = useState<'park' | 'escalate'>('park') + const [notes, setNotes] = useState('') + const [priority, setPriority] = useState<'normal' | 'elevated'>('normal') + const [isSubmitting, setIsSubmitting] = useState(false) + + if (!isOpen) return null + + const handleSubmit = async () => { + setIsSubmitting(true) + try { + await onSubmit({ intent, engineer_notes: notes || undefined, priority }) + onClose() + } finally { + setIsSubmitting(false) + } + } + + return ( +
+
+ {/* Header */} +
+ Hand Off Session + +
+ + {/* Intent toggle */} +
+
+ + +
+ + {/* Notes */} +
+ +