From bbe590bfecd6f367b5c7b20aa2219f639e62f34b Mon Sep 17 00:00:00 2001 From: chihlasm Date: Thu, 19 Mar 2026 01:30:05 +0000 Subject: [PATCH] feat(ai-session): add Phase 2 PSA integration, escalation handoff, and session management MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 2 of the FlowPilot-First Pivot connecting AI sessions to ConnectWise PSA: Slice 1 — PSA Ticket Intake: - FlowPilotEngine accepts psa_ticket intake with graceful CW API fallback - Ticket picker on intake screen (refactored TicketPickerModal for dual-mode) - Ticket context card in session sidebar Slice 2 — Auto Documentation Push: - PSA documentation service with resolution/escalation note formatting - Time entry creation via new ConnectWise provider method - Automatic retry scheduler (APScheduler, 5min interval, 3 retries) - PSA push status indicators in frontend with manual retry button - Member mapping warning when CW member not mapped Slice 3 — Session Pause/Resume & Escalation Handoff: - Pause/resume endpoints for same-engineer session bookmarking - Escalation flow: requesting_escalation status, self-escalation blocked - Enhanced escalation package with LLM-generated hypotheses/suggestions - Pickup endpoint with continue/fresh resume modes and briefing step - Escalation queue (sidebar nav + dedicated page) - SessionBriefing component with continue/fresh choice UI - EscalateModal with PSA-aware button text Slice 4 — Mid-Session Ticket Linking: - Link ticket retroactively with context injection into system prompt - Link Ticket button in session sidebar Slice 5 — FlowPilot PSA Settings: - Settings tab on IntegrationsPage with 7 configurable options - Stored as flowpilot_settings JSONB on PsaConnection Database: 2 migrations (flowpilot_settings, psa_post_log changes, status constraint) Co-Authored-By: Claude Opus 4.6 (1M context) --- ...378a61_phase2_psa_flowpilot_integration.py | 78 ++ ...489b72_add_requesting_escalation_status.py | 35 + backend/app/api/endpoints/ai_sessions.py | 232 ++++- backend/app/api/endpoints/integrations.py | 63 ++ backend/app/main.py | 10 + backend/app/models/ai_session.py | 2 +- backend/app/models/psa_connection.py | 8 +- backend/app/models/psa_post_log.py | 25 +- backend/app/schemas/ai_session.py | 21 + backend/app/services/flowpilot_engine.py | 425 ++++++++- backend/app/services/psa/base.py | 12 + .../app/services/psa/connectwise/provider.py | 32 + backend/app/services/psa/registry.py | 26 + backend/app/services/psa/types.py | 10 + .../app/services/psa_documentation_service.py | 402 ++++++++ backend/app/services/psa_retry_scheduler.py | 52 + ...2026-03-18-flowpilot-first-pivot-phase2.md | 885 ++++++++++++++++++ frontend/src/api/aiSessions.ts | 37 + frontend/src/api/integrations.ts | 6 +- .../components/flowpilot/EscalateModal.tsx | 82 ++ .../components/flowpilot/EscalationQueue.tsx | 140 +++ .../flowpilot/FlowPilotActionBar.tsx | 74 +- .../components/flowpilot/FlowPilotIntake.tsx | 211 ++++- .../components/flowpilot/FlowPilotSession.tsx | 105 ++- .../components/flowpilot/SessionBriefing.tsx | 173 ++++ .../components/flowpilot/SessionDocView.tsx | 100 +- .../flowpilot/SessionTicketCard.tsx | 106 +++ frontend/src/components/flowpilot/index.ts | 4 + frontend/src/components/layout/Sidebar.tsx | 4 +- .../components/session/TicketPickerModal.tsx | 26 +- frontend/src/hooks/useFlowPilotSession.ts | 51 +- frontend/src/pages/EscalationQueuePage.tsx | 20 + frontend/src/pages/FlowPilotSessionPage.tsx | 111 ++- .../src/pages/account/IntegrationsPage.tsx | 225 ++++- frontend/src/router.tsx | 2 + frontend/src/types/ai-session.ts | 14 + frontend/src/types/integrations.ts | 10 + 37 files changed, 3698 insertions(+), 121 deletions(-) create mode 100644 backend/alembic/versions/bb2101378a61_phase2_psa_flowpilot_integration.py create mode 100644 backend/alembic/versions/cc3201489b72_add_requesting_escalation_status.py create mode 100644 backend/app/services/psa_documentation_service.py create mode 100644 backend/app/services/psa_retry_scheduler.py create mode 100644 docs/plans/2026-03-18-flowpilot-first-pivot-phase2.md create mode 100644 frontend/src/components/flowpilot/EscalateModal.tsx create mode 100644 frontend/src/components/flowpilot/EscalationQueue.tsx create mode 100644 frontend/src/components/flowpilot/SessionBriefing.tsx create mode 100644 frontend/src/components/flowpilot/SessionTicketCard.tsx create mode 100644 frontend/src/pages/EscalationQueuePage.tsx diff --git a/backend/alembic/versions/bb2101378a61_phase2_psa_flowpilot_integration.py b/backend/alembic/versions/bb2101378a61_phase2_psa_flowpilot_integration.py new file mode 100644 index 00000000..5605e0af --- /dev/null +++ b/backend/alembic/versions/bb2101378a61_phase2_psa_flowpilot_integration.py @@ -0,0 +1,78 @@ +"""phase2 psa flowpilot integration + +Revision ID: bb2101378a61 +Revises: f1a2b3c4d5e6 +Create Date: 2026-03-18 23:05:01.099910 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = 'bb2101378a61' +down_revision: Union[str, None] = 'f1a2b3c4d5e6' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # 1. Add flowpilot_settings JSONB to psa_connections + op.add_column('psa_connections', sa.Column( + 'flowpilot_settings', + postgresql.JSONB(astext_type=sa.Text()), + nullable=True, + server_default='{}', + comment='FlowPilot-specific settings: auto_push, time_rounding, note_visibility, etc.', + )) + + # 2. Add ai_session_id FK to psa_post_log + op.add_column('psa_post_log', sa.Column( + 'ai_session_id', + sa.Uuid(), + nullable=True, + comment='FK to AI sessions (Phase 2). Original session_id FK remains for legacy sessions.', + )) + op.create_index( + op.f('ix_psa_post_log_ai_session_id'), + 'psa_post_log', + ['ai_session_id'], + ) + op.create_foreign_key( + 'fk_psa_post_log_ai_session_id', + 'psa_post_log', + 'ai_sessions', + ['ai_session_id'], + ['id'], + ondelete='CASCADE', + ) + + # 3. Make original session_id nullable (was NOT NULL — legacy sessions only) + op.alter_column('psa_post_log', 'session_id', nullable=True) + + # 4. Add retry_count and next_retry_at for automatic retries + op.add_column('psa_post_log', sa.Column( + 'retry_count', + sa.Integer(), + nullable=False, + server_default='0', + comment='Number of retry attempts for failed PSA pushes', + )) + op.add_column('psa_post_log', sa.Column( + 'next_retry_at', + sa.DateTime(timezone=True), + nullable=True, + comment='When to attempt the next retry', + )) + + +def downgrade() -> None: + op.drop_column('psa_post_log', 'next_retry_at') + op.drop_column('psa_post_log', 'retry_count') + op.alter_column('psa_post_log', 'session_id', nullable=False) + op.drop_constraint('fk_psa_post_log_ai_session_id', 'psa_post_log', type_='foreignkey') + op.drop_index(op.f('ix_psa_post_log_ai_session_id'), table_name='psa_post_log') + op.drop_column('psa_post_log', 'ai_session_id') + op.drop_column('psa_connections', 'flowpilot_settings') diff --git a/backend/alembic/versions/cc3201489b72_add_requesting_escalation_status.py b/backend/alembic/versions/cc3201489b72_add_requesting_escalation_status.py new file mode 100644 index 00000000..8c521394 --- /dev/null +++ b/backend/alembic/versions/cc3201489b72_add_requesting_escalation_status.py @@ -0,0 +1,35 @@ +"""add requesting_escalation status + +Revision ID: cc3201489b72 +Revises: bb2101378a61 +Create Date: 2026-03-18 23:30:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = 'cc3201489b72' +down_revision: Union[str, None] = 'bb2101378a61' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # Drop old status constraint and recreate with new values + op.drop_constraint('ck_ai_sessions_status', 'ai_sessions', type_='check') + op.create_check_constraint( + 'ck_ai_sessions_status', + 'ai_sessions', + "status IN ('active', 'paused', 'resolved', 'escalated', 'requesting_escalation', 'abandoned')", + ) + + +def downgrade() -> None: + op.drop_constraint('ck_ai_sessions_status', 'ai_sessions', type_='check') + op.create_check_constraint( + 'ck_ai_sessions_status', + 'ai_sessions', + "status IN ('active', 'paused', 'resolved', 'escalated', 'abandoned')", + ) diff --git a/backend/app/api/endpoints/ai_sessions.py b/backend/app/api/endpoints/ai_sessions.py index 7e45bc4d..54bd548b 100644 --- a/backend/app/api/endpoints/ai_sessions.py +++ b/backend/app/api/endpoints/ai_sessions.py @@ -35,12 +35,15 @@ from app.schemas.ai_session import ( SessionCloseResponse, SessionDocumentation, RateSessionRequest, + PickupSessionRequest, + LinkTicketRequest, AISessionSummary, AISessionDetail, AISessionStepResponse, StepOptionSchema, ) from app.services import flowpilot_engine +from app.services.psa_documentation_service import retry_failed_push logger = logging.getLogger(__name__) @@ -272,6 +275,184 @@ async def escalate_session( return result +# ── Pause ── + +@router.post("/{session_id}/pause", status_code=204) +@limiter.limit("15/minute") +async def pause_session( + request: Request, + session_id: UUID, + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], + _: None = Depends(require_engineer_or_admin), +): + """Pause an active FlowPilot session for later resume.""" + try: + await flowpilot_engine.pause_session( + session_id=session_id, + user_id=current_user.id, + db=db, + ) + except ValueError as e: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) + except PermissionError as e: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) + + await db.commit() + + +# ── Resume ── + +@router.post("/{session_id}/resume", status_code=204) +@limiter.limit("15/minute") +async def resume_session( + request: Request, + session_id: UUID, + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], + _: None = Depends(require_engineer_or_admin), +): + """Resume a paused FlowPilot session.""" + try: + await flowpilot_engine.resume_session( + session_id=session_id, + user_id=current_user.id, + db=db, + ) + except ValueError as e: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) + except PermissionError as e: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) + + await db.commit() + + +# ── Escalation Queue ── + +@router.get("/escalation-queue", response_model=list[AISessionSummary]) +@limiter.limit("30/minute") +async def get_escalation_queue( + request: Request, + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], + _: None = Depends(require_engineer_or_admin), +): + """List sessions requesting escalation for the current user's team.""" + if not current_user.team_id: + return [] + + result = await db.execute( + select(AISession) + .where( + AISession.team_id == current_user.team_id, + AISession.status == "requesting_escalation", + AISession.user_id != current_user.id, # Don't show own escalated sessions + ) + .order_by(AISession.created_at.desc()) + ) + sessions = result.scalars().all() + return [AISessionSummary.model_validate(s) for s in sessions] + + +# ── Pickup Escalated Session ── + +@router.post("/{session_id}/pickup", response_model=StepResponseResponse) +@limiter.limit("5/minute") +async def pickup_session( + request: Request, + session_id: UUID, + data: PickupSessionRequest, + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], + _: None = Depends(require_engineer_or_admin), +): + """Pick up an escalated session as a new engineer.""" + _require_ai_enabled() + await _check_quota(current_user, db) + + try: + result = await flowpilot_engine.pickup_session( + session_id=session_id, + resume_mode=data.resume_mode, + additional_context=data.additional_context, + user_id=current_user.id, + team_id=current_user.team_id, + db=db, + ) + except ValueError as e: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) + except PermissionError as e: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) + except Exception as e: + logger.exception("FlowPilot pickup failed: %s", e) + await _record_usage( + current_user, db, + generation_type="flowpilot_pickup", + input_tokens=0, output_tokens=0, + succeeded=False, + session_id=session_id, + error_code=type(e).__name__, + ) + await db.commit() + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail=f"AI provider error ({type(e).__name__}). Please try again.", + ) + + await _record_usage( + current_user, db, + generation_type="flowpilot_pickup", + input_tokens=0, output_tokens=0, + succeeded=True, + session_id=session_id, + ) + await db.commit() + + return result + + +# ── Link Ticket ── + +@router.post("/{session_id}/link-ticket", response_model=AISessionDetail) +@limiter.limit("10/minute") +async def link_ticket_to_session( + request: Request, + session_id: UUID, + data: LinkTicketRequest, + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], + _: None = Depends(require_engineer_or_admin), +): + """Link a PSA ticket to an in-progress session retroactively.""" + try: + await flowpilot_engine.link_ticket( + session_id=session_id, + psa_ticket_id=data.psa_ticket_id, + psa_connection_id=data.psa_connection_id, + user_id=current_user.id, + db=db, + ) + except ValueError as e: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) + except PermissionError as e: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) + + await db.commit() + + # Return updated session detail + result = await db.execute( + select(AISession) + .options(selectinload(AISession.steps)) + .where(AISession.id == session_id) + ) + session = result.scalar_one_or_none() + if not session: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found") + + detail = AISessionDetail.model_validate(session) + return detail + + # ── List sessions ── @router.get("", response_model=list[AISessionSummary]) @@ -323,8 +504,10 @@ async def get_session( if not session: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found") - # Allow access if user is owner or escalation target - if session.user_id != current_user.id and session.escalated_to_id != current_user.id: + # Allow access if user is owner, escalation target, or picked-up handler + pkg = session.escalation_package or {} + is_handler = pkg.get("picked_up_by") == str(current_user.id) + if session.user_id != current_user.id and session.escalated_to_id != current_user.id and not is_handler: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized") # Build step responses @@ -409,3 +592,48 @@ async def rate_session( raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) await db.commit() + + +# ── Retry PSA Push ── + +@router.post("/{session_id}/retry-psa-push") +@limiter.limit("5/minute") +async def retry_psa_push_endpoint( + request: Request, + session_id: UUID, + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], + _: None = Depends(require_engineer_or_admin), +): + """Manually retry a failed PSA documentation push.""" + from app.models.psa_post_log import PsaPostLog + + # Find the latest failed push log for this session + result = await db.execute( + select(PsaPostLog) + .where( + PsaPostLog.ai_session_id == session_id, + PsaPostLog.status.in_(["failed", "pending_retry"]), + ) + .order_by(PsaPostLog.posted_at.desc()) + .limit(1) + ) + log_entry = result.scalar_one_or_none() + + if not log_entry: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="No failed PSA push found for this session", + ) + + # Reset to pending_retry and attempt immediately + log_entry.status = "pending_retry" + log_entry.retry_count = max(0, log_entry.retry_count - 1) # Give one more attempt + + success = await retry_failed_push(log_entry, db) + await db.commit() + + return { + "psa_push_status": "sent" if success else log_entry.status, + "psa_push_error": log_entry.error_message if not success else None, + } diff --git a/backend/app/api/endpoints/integrations.py b/backend/app/api/endpoints/integrations.py index ba78b93c..602de191 100644 --- a/backend/app/api/endpoints/integrations.py +++ b/backend/app/api/endpoints/integrations.py @@ -279,6 +279,69 @@ async def test_connection( return result +# ── FlowPilot PSA Settings ────────────────────────────────────── + + +@router.get("/connections/{connection_id}/flowpilot-settings") +async def get_flowpilot_settings( + connection_id: UUID, + current_user: Annotated[User, Depends(require_account_owner)], + db: Annotated[AsyncSession, Depends(get_db)], +): + """Get FlowPilot-specific settings for a PSA connection.""" + conn = await _get_connection_or_404(connection_id, current_user, db) + # Return settings with defaults filled in + defaults = { + "auto_push": True, + "auto_time_entry": True, + "time_rounding": "15min", + "note_visibility": "internal", + "include_diagnostic_steps": True, + "prompt_status_on_resolution": False, + "prompt_status_on_escalation": False, + } + settings_data = {**defaults, **(conn.flowpilot_settings or {})} + return settings_data + + +@router.put("/connections/{connection_id}/flowpilot-settings") +async def update_flowpilot_settings( + connection_id: UUID, + data: dict, + current_user: Annotated[User, Depends(require_account_owner)], + db: Annotated[AsyncSession, Depends(get_db)], +): + """Update FlowPilot-specific settings for a PSA connection.""" + conn = await _get_connection_or_404(connection_id, current_user, db) + + # Validate allowed keys + allowed_keys = { + "auto_push", "auto_time_entry", "time_rounding", + "note_visibility", "include_diagnostic_steps", + "prompt_status_on_resolution", "prompt_status_on_escalation", + } + filtered = {k: v for k, v in data.items() if k in allowed_keys} + + # Merge with existing + current = conn.flowpilot_settings or {} + current.update(filtered) + conn.flowpilot_settings = current + + await db.commit() + await db.refresh(conn) + + defaults = { + "auto_push": True, + "auto_time_entry": True, + "time_rounding": "15min", + "note_visibility": "internal", + "include_diagnostic_steps": True, + "prompt_status_on_resolution": False, + "prompt_status_on_escalation": False, + } + return {**defaults, **(conn.flowpilot_settings or {})} + + # ── ticket / status / company endpoints ────────────────────────── diff --git a/backend/app/main.py b/backend/app/main.py index b5f310f3..61b5c842 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -180,6 +180,16 @@ async def lifespan(app: FastAPI): replace_existing=True, ) + # PSA push retry (every 5 minutes) + from app.services.psa_retry_scheduler import process_pending_retries + scheduler.add_job( + process_pending_retries, + trigger="interval", + minutes=5, + id="psa_push_retry", + replace_existing=True, + ) + # Auto-seed trees in background on PR environments seed_task = None if settings.SEED_ON_DEPLOY: diff --git a/backend/app/models/ai_session.py b/backend/app/models/ai_session.py index f5898da7..d1c15457 100644 --- a/backend/app/models/ai_session.py +++ b/backend/app/models/ai_session.py @@ -35,7 +35,7 @@ class AISession(Base): name="ck_ai_sessions_intake_type", ), CheckConstraint( - "status IN ('active', 'paused', 'resolved', 'escalated', 'abandoned')", + "status IN ('active', 'paused', 'resolved', 'escalated', 'requesting_escalation', 'abandoned')", name="ck_ai_sessions_status", ), CheckConstraint( diff --git a/backend/app/models/psa_connection.py b/backend/app/models/psa_connection.py index 8cdd609e..c56b26df 100644 --- a/backend/app/models/psa_connection.py +++ b/backend/app/models/psa_connection.py @@ -1,11 +1,11 @@ """PSA connection model — one per account.""" import uuid from datetime import datetime, timezone -from typing import Optional +from typing import Optional, Any from sqlalchemy import String, DateTime, Boolean, Text, ForeignKey from sqlalchemy.orm import Mapped, mapped_column, relationship -from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.dialects.postgresql import UUID, JSONB from app.core.database import Base @@ -43,6 +43,10 @@ class PsaConnection(Base): default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc), ) + flowpilot_settings: Mapped[Optional[dict[str, Any]]] = mapped_column( + JSONB, nullable=True, server_default="{}", + comment="FlowPilot-specific settings: auto_push, time_rounding, note_visibility, etc.", + ) # Relationships account = relationship("Account", back_populates="psa_connection") diff --git a/backend/app/models/psa_post_log.py b/backend/app/models/psa_post_log.py index 54372fe0..14697507 100644 --- a/backend/app/models/psa_post_log.py +++ b/backend/app/models/psa_post_log.py @@ -3,7 +3,7 @@ import uuid from datetime import datetime, timezone from typing import Optional -from sqlalchemy import String, DateTime, Text, ForeignKey +from sqlalchemy import String, DateTime, Text, Integer, ForeignKey from sqlalchemy.orm import Mapped, mapped_column, relationship from sqlalchemy.dialects.postgresql import UUID @@ -16,10 +16,18 @@ class PsaPostLog(Base): id: Mapped[uuid.UUID] = mapped_column( UUID(as_uuid=True), primary_key=True, default=uuid.uuid4 ) - session_id: Mapped[uuid.UUID] = mapped_column( + # Legacy sessions FK (nullable for AI sessions) + session_id: Mapped[Optional[uuid.UUID]] = mapped_column( UUID(as_uuid=True), ForeignKey("sessions.id", ondelete="CASCADE"), - nullable=False, + nullable=True, + index=True, + ) + # AI sessions FK (Phase 2) + ai_session_id: Mapped[Optional[uuid.UUID]] = mapped_column( + UUID(as_uuid=True), + ForeignKey("ai_sessions.id", ondelete="CASCADE"), + nullable=True, index=True, ) psa_connection_id: Mapped[Optional[uuid.UUID]] = mapped_column( @@ -35,8 +43,16 @@ class PsaPostLog(Base): ) status: Mapped[str] = mapped_column( String(20), nullable=False - ) # 'success' or 'failed' + ) # 'success', 'failed', 'pending_retry' error_message: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + retry_count: Mapped[int] = mapped_column( + Integer, nullable=False, default=0, + comment="Number of retry attempts for failed PSA pushes", + ) + next_retry_at: Mapped[Optional[datetime]] = mapped_column( + DateTime(timezone=True), nullable=True, + comment="When to attempt the next retry", + ) status_changed_from: Mapped[Optional[str]] = mapped_column( String(100), nullable=True ) @@ -54,5 +70,6 @@ class PsaPostLog(Base): # Relationships session = relationship("Session", foreign_keys=[session_id]) + ai_session = relationship("AISession", foreign_keys=[ai_session_id]) psa_connection = relationship("PsaConnection", foreign_keys=[psa_connection_id]) user = relationship("User", foreign_keys=[posted_by]) diff --git a/backend/app/schemas/ai_session.py b/backend/app/schemas/ai_session.py index 5cbba3e0..4eb78f53 100644 --- a/backend/app/schemas/ai_session.py +++ b/backend/app/schemas/ai_session.py @@ -42,6 +42,7 @@ class AISessionCreateResponse(BaseModel): matched_flow_name: str | None = None match_score: float | None = None first_step: AISessionStepResponse + psa_context_status: str | None = None # loaded | unavailable | None (no PSA) # ── Step interaction ── @@ -131,6 +132,9 @@ class SessionCloseResponse(BaseModel): session_id: UUID status: str documentation: SessionDocumentation + psa_push_status: str = "no_psa" # sent | pending_retry | no_psa | failed + psa_push_error: str | None = None + member_mapping_warning: str | None = None class RateSessionRequest(BaseModel): @@ -139,6 +143,18 @@ class RateSessionRequest(BaseModel): feedback: str | None = None +class PickupSessionRequest(BaseModel): + """Pick up an escalated session as a new engineer.""" + resume_mode: str = Field("continue", pattern="^(continue|fresh)$") + additional_context: str | None = None + + +class LinkTicketRequest(BaseModel): + """Link a PSA ticket to an in-progress session.""" + psa_ticket_id: str + psa_connection_id: UUID + + # ── List / Detail ── class AISessionSummary(BaseModel): @@ -151,6 +167,8 @@ class AISessionSummary(BaseModel): confidence_tier: str step_count: int session_rating: int | None = None + psa_ticket_id: str | None = None + escalation_reason: str | None = None created_at: datetime resolved_at: datetime | None = None @@ -166,6 +184,9 @@ class AISessionDetail(AISessionSummary): resolution_action: str | None = None escalation_reason: str | None = None session_feedback: str | None = None + psa_ticket_id: str | None = None + psa_connection_id: UUID | None = None + ticket_data: dict[str, Any] | None = None steps: list[AISessionStepResponse] = [] model_config = {"from_attributes": True} diff --git a/backend/app/services/flowpilot_engine.py b/backend/app/services/flowpilot_engine.py index 375a615f..f3f12980 100644 --- a/backend/app/services/flowpilot_engine.py +++ b/backend/app/services/flowpilot_engine.py @@ -170,8 +170,32 @@ async def start_session( ) -> AISessionCreateResponse: """Start a new FlowPilot session: classify intake, match flows, get first step.""" + # 0. Process PSA ticket intake if applicable + ticket_context_block = None + ticket_data = None + psa_context_status = None + + if request.intake_type == "psa_ticket" and request.psa_connection_id and request.psa_ticket_id: + ticket_context_block, ticket_data, psa_context_status = await _process_ticket_intake( + psa_connection_id=request.psa_connection_id, + psa_ticket_id=request.psa_ticket_id, + db=db, + ) + # Enrich intake content with ticket context for classification + if ticket_data: + enriched_content = dict(request.intake_content) + enriched_content["ticket_data"] = { + "summary": ticket_data.get("ticket", {}).get("summary", ""), + "company": ticket_data.get("company", {}).get("name", ""), + "priority": ticket_data.get("ticket", {}).get("priority", ""), + } + request = request.model_copy(update={"intake_content": enriched_content}) + # 1. Classify intake via fast LLM call intake_text = _extract_intake_text(request.intake_content) + # Include ticket context in classification text if available + if ticket_context_block: + intake_text = f"{ticket_context_block}\n\n{intake_text}" classification = await _classify_intake(intake_text) # 2. Try to match existing flows @@ -199,9 +223,14 @@ async def start_session( f"Use it as a guide but adapt to the specific situation." ) + # Include ticket context in system prompt if available + ticket_prompt_section = "" + if ticket_context_block: + ticket_prompt_section = f"\n## PSA TICKET CONTEXT\n{ticket_context_block}\n" + system_prompt = FLOWPILOT_SYSTEM_PROMPT.format( structured_output_schema=STRUCTURED_OUTPUT_SCHEMA, - team_context="", # Phase 2: team-specific context + team_context=ticket_prompt_section, matched_flow_context=matched_flow_context, ) @@ -261,6 +290,7 @@ async def start_session( match_score=match_score, psa_ticket_id=request.psa_ticket_id, psa_connection_id=request.psa_connection_id, + ticket_data=ticket_data, total_input_tokens=input_tokens, total_output_tokens=output_tokens, step_count=1, @@ -294,6 +324,7 @@ async def start_session( matched_flow_name=matched_flow_name, match_score=match_score, first_step=_build_step_response(step, session), + psa_context_status=psa_context_status, ) @@ -419,10 +450,14 @@ async def resolve_session( await db.flush() + # Push documentation to PSA if ticket is linked + psa_result = await _push_to_psa(session, user_id, db) + return SessionCloseResponse( session_id=session.id, status=session.status, documentation=documentation, + **psa_result, ) @@ -432,31 +467,276 @@ async def escalate_session( user_id: UUID, db: AsyncSession, ) -> SessionCloseResponse: - """Escalate a session to another engineer.""" + """Escalate a session — sets status to requesting_escalation for pickup.""" session = await _load_session(session_id, user_id, db) if session.status not in ("active", "paused"): raise ValueError(f"Cannot escalate session in status: {session.status}") - session.status = "escalated" - session.resolved_at = datetime.now(timezone.utc) + # Block self-escalation + if request.escalated_to_id and request.escalated_to_id == user_id: + raise ValueError("Cannot escalate a session to yourself. Use pause instead.") + + session.status = "requesting_escalation" + # Don't set resolved_at — session isn't done yet session.escalation_reason = request.escalation_reason session.escalated_to_id = request.escalated_to_id - # Build escalation package - session.escalation_package = _build_escalation_package(session) + # Build enhanced escalation package + session.escalation_package = await _build_escalation_package_enhanced(session, user_id) documentation = _generate_documentation(session) await db.flush() + # Push documentation to PSA if ticket is linked + psa_result = await _push_to_psa(session, user_id, db) + return SessionCloseResponse( session_id=session.id, status=session.status, documentation=documentation, + **psa_result, ) +async def pickup_session( + session_id: UUID, + resume_mode: str, + additional_context: Optional[str], + user_id: UUID, + team_id: Optional[UUID], + db: AsyncSession, +) -> StepResponseResponse: + """Pick up an escalated session as a new engineer. + + Generates a briefing step summarizing prior work, then either continues + the conversation or starts fresh with the new engineer's context. + """ + session = await _load_session( + session_id, user_id, db, + allow_team_access=True, team_id=team_id, + ) + + if session.status != "requesting_escalation": + raise ValueError(f"Session is {session.status}, not requesting_escalation") + + # Can't pick up your own session + if session.user_id == user_id: + raise ValueError("Cannot pick up your own escalated session") + + # Record the pickup in the escalation package + pkg = session.escalation_package or {} + pkg["picked_up_by"] = str(user_id) + pkg["picked_up_at"] = datetime.now(timezone.utc).isoformat() + session.escalation_package = pkg + + # Reactivate the session + session.status = "active" + + # Build a briefing message for the new engineer + original_user_name = "the previous engineer" + if session.user and hasattr(session.user, 'display_name') and session.user.display_name: + original_user_name = session.user.display_name + + briefing_parts = [ + f"## Escalation Briefing", + f"**Escalated by:** {original_user_name}", + f"**Reason:** {session.escalation_reason or 'Not specified'}", + "", + f"**Problem:** {session.problem_summary or 'Unknown'}", + ] + + steps_tried = pkg.get("steps_tried", []) + if steps_tried: + briefing_parts.append("") + briefing_parts.append("**Steps already taken:**") + for i, step in enumerate(steps_tried, 1): + desc = step.get("description", "") + resp = step.get("response", "") + briefing_parts.append(f"{i}. {desc}") + if resp: + briefing_parts.append(f" → {resp}") + + if hypotheses := pkg.get("remaining_hypotheses"): + briefing_parts.append("") + briefing_parts.append("**Remaining hypotheses:**") + if isinstance(hypotheses, list): + for h in hypotheses: + briefing_parts.append(f"- {h}") + else: + briefing_parts.append(str(hypotheses)) + + if suggestions := pkg.get("suggested_next_steps"): + briefing_parts.append("") + briefing_parts.append("**Suggested next steps:**") + if isinstance(suggestions, list): + for s in suggestions: + briefing_parts.append(f"- {s}") + else: + briefing_parts.append(str(suggestions)) + + briefing_text = "\n".join(briefing_parts) + + # Create a briefing step (special intake_analysis type) + briefing_step = AISessionStep( + id=uuid.uuid4(), + session_id=session.id, + step_order=session.step_count, + step_type="action", + content={ + "text": briefing_text, + "type": "briefing", + "allow_free_text": False, + "allow_skip": False, + }, + context_message="Escalation briefing — here's what was tried before you.", + confidence_at_step=session.confidence_score, + ai_reasoning="Escalation handoff briefing for receiving engineer", + input_tokens=0, + output_tokens=0, + ) + db.add(briefing_step) + session.step_count += 1 + + # Now generate the next step based on resume_mode + if resume_mode == "fresh" and additional_context: + # Engineer B provides their own input + user_message = f"[Picking up escalated session] {additional_context}" + else: + # Continue where A left off + user_message = ( + "[Picking up escalated session] I've reviewed the briefing above. " + "Please continue the diagnosis based on everything tried so far." + ) + + # Append to conversation + session.conversation_messages = session.conversation_messages + [ + {"role": "user", "content": user_message} + ] + + # Call LLM for next step + provider = get_ai_provider(settings.get_model_for_action("open_chat")) + raw_response, input_tokens, output_tokens = await provider.generate_json( + system_prompt=session.system_prompt_snapshot or "", + messages=session.conversation_messages, + max_tokens=2048, + ) + + try: + parsed = _parse_structured_output(raw_response) + except ValueError: + retry_messages = session.conversation_messages + [ + {"role": "assistant", "content": raw_response}, + {"role": "user", "content": "Please respond with ONLY valid JSON matching the required schema."}, + ] + raw_response, retry_in, retry_out = await provider.generate_json( + system_prompt=session.system_prompt_snapshot or "", + messages=retry_messages, + max_tokens=2048, + ) + input_tokens += retry_in + output_tokens += retry_out + parsed = _parse_structured_output(raw_response) + + session.conversation_messages = session.conversation_messages + [ + {"role": "assistant", "content": raw_response} + ] + + confidence = parsed.get("confidence", session.confidence_score) + session.confidence_score = confidence + session.confidence_tier = _confidence_to_tier(confidence) + session.total_input_tokens += input_tokens + session.total_output_tokens += output_tokens + session.step_count += 1 + + next_step = _create_step_from_parsed( + session_id=session.id, + step_order=session.step_count - 1, + parsed=parsed, + input_tokens=input_tokens, + output_tokens=output_tokens, + ) + db.add(next_step) + + await db.flush() + + return StepResponseResponse( + session_id=session.id, + status=session.status, + confidence_tier=session.confidence_tier, + confidence_score=session.confidence_score, + next_step=_build_step_response(next_step, session), + resolution_suggested=parsed["type"] == "resolution_suggestion", + resolution_summary=parsed.get("resolution_summary") if parsed["type"] == "resolution_suggestion" else None, + ) + + +async def link_ticket( + session_id: UUID, + psa_ticket_id: str, + psa_connection_id: UUID, + user_id: UUID, + db: AsyncSession, +) -> None: + """Link a PSA ticket to an in-progress session and inject context.""" + session = await _load_session(session_id, user_id, db) + + if session.status not in ("active", "paused"): + raise ValueError(f"Cannot link ticket to session in status: {session.status}") + + # Store the ticket link + session.psa_ticket_id = psa_ticket_id + session.psa_connection_id = psa_connection_id + + # Try to fetch ticket context + ticket_context_block, ticket_data, _ = await _process_ticket_intake( + psa_connection_id=psa_connection_id, + psa_ticket_id=psa_ticket_id, + db=db, + ) + + if ticket_data: + session.ticket_data = ticket_data + + # Inject ticket context into the system prompt for subsequent steps + if ticket_context_block and session.system_prompt_snapshot: + ticket_section = f"\n\n## PSA TICKET CONTEXT (linked mid-session)\n{ticket_context_block}\n" + session.system_prompt_snapshot = session.system_prompt_snapshot + ticket_section + + await db.flush() + + +async def pause_session( + session_id: UUID, + user_id: UUID, + db: AsyncSession, +) -> None: + """Pause an active session for the same engineer to resume later.""" + session = await _load_session(session_id, user_id, db) + + if session.status != "active": + raise ValueError(f"Cannot pause session in status: {session.status}") + + session.status = "paused" + await db.flush() + + +async def resume_session( + session_id: UUID, + user_id: UUID, + db: AsyncSession, +) -> None: + """Resume a paused session for the same engineer.""" + session = await _load_session(session_id, user_id, db) + + if session.status != "paused": + raise ValueError(f"Cannot resume session in status: {session.status}") + + session.status = "active" + await db.flush() + + async def rate_session( session_id: UUID, rating: int, @@ -487,11 +767,23 @@ async def _load_session( session_id: UUID, user_id: UUID, db: AsyncSession, + allow_team_access: bool = False, + team_id: Optional[UUID] = None, ) -> AISession: - """Load session with steps, verifying ownership.""" + """Load session with steps and user relationships, verifying ownership. + + Args: + allow_team_access: If True, same-team users can access sessions in + 'requesting_escalation' status (for escalation pickup). + team_id: Required when allow_team_access is True. + """ result = await db.execute( select(AISession) - .options(selectinload(AISession.steps)) + .options( + selectinload(AISession.steps), + selectinload(AISession.user), + selectinload(AISession.escalated_to), + ) .where(AISession.id == session_id) ) session = result.scalar_one_or_none() @@ -499,11 +791,21 @@ async def _load_session( if not session: raise ValueError("Session not found") - # Allow access if user is the session owner or the escalation target - if session.user_id != user_id and session.escalated_to_id != user_id: - raise PermissionError("Not authorized to access this session") + # Owner or escalation target always has access + if session.user_id == user_id or session.escalated_to_id == user_id: + return session - return session + # Engineer who picked up an escalated session has access + pkg = session.escalation_package or {} + if pkg.get("picked_up_by") == str(user_id): + return session + + # Team-based access for escalation pickup + if allow_team_access and team_id and session.team_id == team_id: + if session.status == "requesting_escalation": + return session + + raise PermissionError("Not authorized to access this session") async def _classify_intake(intake_text: str) -> dict[str, Any]: @@ -708,8 +1010,67 @@ def _generate_documentation(session: AISession) -> SessionDocumentation: ) -def _build_escalation_package(session: AISession) -> dict[str, Any]: - """Build context package for the receiving engineer.""" +async def _push_to_psa( + session: AISession, + user_id: UUID, + db: AsyncSession, +) -> dict[str, Any]: + """Push documentation to PSA if session has a linked ticket. + + Returns dict with psa_push_status, psa_push_error, member_mapping_warning. + """ + if not session.psa_ticket_id or not session.psa_connection_id: + return {"psa_push_status": "no_psa", "psa_push_error": None, "member_mapping_warning": None} + + try: + from app.services.psa_documentation_service import push_documentation + return await push_documentation(session, user_id, db) + except Exception as e: + logger.warning("PSA documentation push failed for session %s: %s", session.id, e) + return { + "psa_push_status": "failed", + "psa_push_error": str(e)[:200], + "member_mapping_warning": None, + } + + +async def _process_ticket_intake( + psa_connection_id: UUID, + psa_ticket_id: str, + db: AsyncSession, +) -> tuple[Optional[str], Optional[dict[str, Any]], str]: + """Fetch ticket context from PSA and format for AI prompt. + + Returns: + (ticket_context_block, ticket_data_dict, psa_context_status) + - ticket_context_block: formatted text for system prompt, or None on failure + - ticket_data_dict: serialized TicketContext for storage, or None on failure + - psa_context_status: "loaded" or "unavailable" + """ + try: + from app.services.psa.registry import get_provider_for_connection + from app.services.psa.ticket_context import format_ticket_context_for_prompt + + provider = await get_provider_for_connection(psa_connection_id, db) + ticket_context = await provider.get_ticket_context( + int(psa_ticket_id), str(psa_connection_id) + ) + ticket_prompt_block = format_ticket_context_for_prompt(ticket_context) + ticket_data = ticket_context.model_dump(mode="json") + return ticket_prompt_block, ticket_data, "loaded" + except Exception as e: + logger.warning( + "Failed to fetch ticket context for ticket %s (connection %s): %s", + psa_ticket_id, psa_connection_id, e, + ) + return None, None, "unavailable" + + +async def _build_escalation_package_enhanced( + session: AISession, + user_id: UUID, +) -> dict[str, Any]: + """Build enhanced context package with LLM-generated hypotheses.""" steps_tried = [] for step in session.steps: content = step.content or {} @@ -727,7 +1088,8 @@ def _build_escalation_package(session: AISession) -> dict[str, Any]: entry["action_result"] = step.action_result steps_tried.append(entry) - return { + package = { + "original_user_id": str(user_id), "problem_summary": session.problem_summary, "problem_domain": session.problem_domain, "intake_content": session.intake_content, @@ -735,3 +1097,36 @@ def _build_escalation_package(session: AISession) -> dict[str, Any]: "steps_tried": steps_tried, "escalation_reason": session.escalation_reason, } + + # LLM call for remaining hypotheses and suggested next steps (fast model) + try: + conversation_summary = "\n".join( + f"- {s.get('description', '')} → {s.get('response', 'no response')}" + for s in steps_tried + ) + prompt = ( + "Based on this diagnostic conversation for an IT troubleshooting session:\n\n" + f"Problem: {session.problem_summary}\n" + f"Domain: {session.problem_domain}\n\n" + f"Steps taken:\n{conversation_summary}\n\n" + f"Escalation reason: {session.escalation_reason}\n\n" + "Respond with ONLY a JSON object:\n" + '{"remaining_hypotheses": ["hypothesis1", "hypothesis2"], ' + '"suggested_next_steps": ["step1", "step2"], ' + '"steps_ruled_out": ["ruled_out1"]}' + ) + provider = get_ai_provider(settings.get_model_for_action("quick_action")) + raw, _, _ = await provider.generate_json( + system_prompt="You are an expert IT diagnostic assistant. Analyze the escalation context and provide concise insights.", + messages=[{"role": "user", "content": prompt}], + max_tokens=1024, + ) + insights = json.loads(raw.strip().strip("`").lstrip("json\n")) + package["remaining_hypotheses"] = insights.get("remaining_hypotheses", []) + package["suggested_next_steps"] = insights.get("suggested_next_steps", []) + package["steps_ruled_out"] = insights.get("steps_ruled_out", []) + except Exception as e: + logger.warning("Failed to generate escalation insights: %s", e) + # Fall back gracefully — don't block the escalation + + return package diff --git a/backend/app/services/psa/base.py b/backend/app/services/psa/base.py index e2230aa0..f2522e43 100644 --- a/backend/app/services/psa/base.py +++ b/backend/app/services/psa/base.py @@ -11,6 +11,7 @@ from .types import ( PSACompany, PSAMember, PSAConfiguration, + PSATimeEntry, ) @@ -66,3 +67,14 @@ class PSAProvider(ABC): @abstractmethod async def get_ticket_configurations(self, ticket_id: str) -> list[PSAConfiguration]: ... + + @abstractmethod + async def create_time_entry( + self, + ticket_id: str, + member_id: str, + hours: float, + notes: str | None = None, + work_type: str | None = None, + ) -> PSATimeEntry: + ... diff --git a/backend/app/services/psa/connectwise/provider.py b/backend/app/services/psa/connectwise/provider.py index a4aca59b..34b2ecd3 100644 --- a/backend/app/services/psa/connectwise/provider.py +++ b/backend/app/services/psa/connectwise/provider.py @@ -15,6 +15,7 @@ from app.services.psa.types import ( PSACompany, PSAMember, PSAConfiguration, + PSATimeEntry, ) from .client import ConnectWiseClient @@ -514,6 +515,37 @@ class ConnectWiseProvider(PSAProvider): psa_cache.set(cache_key, ctx, ttl_seconds=300) return ctx + async def create_time_entry( + self, + ticket_id: str, + member_id: str, + hours: float, + notes: str | None = None, + work_type: str | None = None, + ) -> PSATimeEntry: + """Create a time entry on a CW ticket via POST /time/entries.""" + payload: dict = { + "chargeToId": int(ticket_id), + "chargeToType": "ServiceTicket", + "member": {"id": int(member_id)}, + "timeStart": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + "actualHours": hours, + } + if notes: + payload["notes"] = notes[:2000] # CW limit + if work_type: + payload["workType"] = {"name": work_type} + + data = await self._client.post("/time/entries", payload) + return PSATimeEntry( + id=str(data["id"]), + ticket_id=ticket_id, + member_id=member_id, + hours=data.get("actualHours", hours), + notes=data.get("notes"), + created_at=data.get("timeStart"), + ) + # ── Private helpers ─────────────────────────────────────────────── @staticmethod diff --git a/backend/app/services/psa/registry.py b/backend/app/services/psa/registry.py index ff84c3cc..ab09222b 100644 --- a/backend/app/services/psa/registry.py +++ b/backend/app/services/psa/registry.py @@ -31,6 +31,32 @@ async def get_provider_for_account( provider="unknown", ) + return _instantiate_provider(connection) + + +async def get_provider_for_connection( + connection_id: UUID, db: AsyncSession +) -> PSAProvider: + """Look up a specific PSA connection by ID, decrypt credentials, instantiate provider.""" + result = await db.execute( + select(PsaConnection).where( + PsaConnection.id == connection_id, + PsaConnection.is_active.is_(True), + ) + ) + connection = result.scalar_one_or_none() + + if not connection: + raise PSAConnectionError( + "PSA connection not found or inactive.", + provider="unknown", + ) + + return _instantiate_provider(connection) + + +def _instantiate_provider(connection: PsaConnection) -> PSAProvider: + """Create the appropriate provider instance from a connection record.""" if connection.provider == "connectwise": from app.services.psa.connectwise.client import ConnectWiseClient from app.services.psa.connectwise.provider import ConnectWiseProvider diff --git a/backend/app/services/psa/types.py b/backend/app/services/psa/types.py index 9515ab6c..49338079 100644 --- a/backend/app/services/psa/types.py +++ b/backend/app/services/psa/types.py @@ -57,6 +57,16 @@ class PSAConfiguration(BaseModel): company_name: str | None = None +class PSATimeEntry(BaseModel): + id: str + ticket_id: str + member_id: str | None = None + hours: float + notes: str | None = None + work_type: str | None = None + created_at: str | None = None + + class NoteType: INTERNAL_ANALYSIS = "internal_analysis" RESOLUTION = "resolution" diff --git a/backend/app/services/psa_documentation_service.py b/backend/app/services/psa_documentation_service.py new file mode 100644 index 00000000..f8cf4a01 --- /dev/null +++ b/backend/app/services/psa_documentation_service.py @@ -0,0 +1,402 @@ +"""PSA Documentation Push Service. + +Generates structured documentation from FlowPilot AI sessions and pushes +it back to ConnectWise as internal notes + optional time entries. +""" +import logging +import math +import uuid +from datetime import datetime, timezone, timedelta +from typing import Optional, Any +from uuid import UUID + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.ai_session import AISession +from app.models.psa_connection import PsaConnection +from app.models.psa_member_mapping import PsaMemberMapping +from app.models.psa_post_log import PsaPostLog +from app.services.psa.registry import get_provider_for_connection +from app.services.psa.types import NoteType +from app.services.redaction_service import apply_redaction_to_text + +logger = logging.getLogger(__name__) + +# Default flowpilot_settings values +DEFAULT_SETTINGS = { + "auto_push": True, + "auto_time_entry": True, + "time_rounding": "15min", # "15min", "30min", "exact", "none" + "note_visibility": "internal", # "internal", "both" + "include_diagnostic_steps": True, +} + + +def _get_setting(connection: PsaConnection, key: str) -> Any: + """Get a flowpilot setting with default fallback.""" + settings = connection.flowpilot_settings or {} + return settings.get(key, DEFAULT_SETTINGS.get(key)) + + +def _round_hours(hours: float, rounding: str) -> float: + """Round hours according to the rounding setting.""" + if rounding == "exact": + return round(hours, 2) + elif rounding == "30min": + return math.ceil(hours * 2) / 2 + else: # default 15min + return math.ceil(hours * 4) / 4 + + +def _format_datetime(dt: datetime | None) -> str: + """Format a datetime for display in notes.""" + if not dt: + return "N/A" + return dt.strftime("%Y-%m-%d %I:%M %p UTC") + + +def format_resolution_note(session: AISession, include_steps: bool = True) -> str: + """Format a resolved session as a plain-text note for CW.""" + lines = [ + "═══ FlowPilot Session Documentation ═══", + f"Session: {session.id}", + ] + + # Engineer name from relationship if loaded, otherwise user_id + engineer_name = getattr(session, 'user', None) + if engineer_name and hasattr(engineer_name, 'display_name'): + lines.append(f"Engineer: {engineer_name.display_name}") + + lines.extend([ + f"Date: {_format_datetime(session.resolved_at)}", + f"Started: {_format_datetime(session.created_at)}", + f"Ended: {_format_datetime(session.resolved_at)}", + ]) + + # Duration + if session.resolved_at and session.created_at: + delta = session.resolved_at - session.created_at + minutes = int(delta.total_seconds() / 60) + if minutes < 60: + lines.append(f"Duration: {minutes}m") + else: + lines.append(f"Duration: {minutes // 60}h {minutes % 60}m") + + lines.append("") + lines.append("── Problem ──") + lines.append(session.problem_summary or "No summary available") + if session.problem_domain: + lines.append(f"Domain: {session.problem_domain}") + + # Diagnostic steps + if include_steps and session.steps: + lines.append("") + lines.append("── Diagnosis Path ──") + for step in session.steps: + content = step.content or {} + step_type = content.get("type", step.step_type).capitalize() + description = content.get("text", "") + + response_text = "" + if step.was_skipped: + response_text = "Skipped" + elif step.selected_option: + # Try to find the label + if step.options_presented: + for opt in step.options_presented: + if opt.get("value") == step.selected_option: + response_text = opt.get("label", step.selected_option) + break + else: + response_text = step.selected_option + else: + response_text = step.selected_option + elif step.free_text_input: + response_text = step.free_text_input + + lines.append(f"{step.step_order + 1}. [{step_type}] {description}") + if response_text: + lines.append(f" → Response: {response_text}") + if step.action_result: + result = step.action_result + outcome = "Succeeded" if result.get("success") else "Did not resolve" + if details := result.get("details"): + outcome += f" — {details}" + lines.append(f" → Result: {outcome}") + + # Resolution + lines.append("") + lines.append("── Resolution ──") + lines.append(session.resolution_summary or "No resolution summary") + if session.resolution_action: + lines.append(session.resolution_action) + + # Confidence + lines.append("") + lines.append("── AI Confidence ──") + lines.append(f"Final confidence: {session.confidence_tier} ({session.confidence_score:.0%})") + + # Timing section (always present) + lines.append("") + lines.append("── Session Timing ──") + lines.append(f"Start: {_format_datetime(session.created_at)}") + lines.append(f"End: {_format_datetime(session.resolved_at)}") + if session.resolved_at and session.created_at: + delta = session.resolved_at - session.created_at + minutes = int(delta.total_seconds() / 60) + lines.append(f"Total: {minutes // 60}h {minutes % 60}m" if minutes >= 60 else f"Total: {minutes}m") + + lines.append("") + lines.append("Generated by ResolutionFlow FlowPilot") + + return "\n".join(lines) + + +def format_escalation_note(session: AISession, include_steps: bool = True) -> str: + """Format an escalated session as a plain-text note for CW.""" + lines = [ + "═══ FlowPilot Escalation Documentation ═══", + f"Session: {session.id}", + ] + + engineer_name = getattr(session, 'user', None) + if engineer_name and hasattr(engineer_name, 'display_name'): + lines.append(f"Escalated by: {engineer_name.display_name}") + + escalated_to = getattr(session, 'escalated_to', None) + if escalated_to and hasattr(escalated_to, 'display_name'): + lines.append(f"Escalated to: {escalated_to.display_name}") + else: + lines.append("Escalated to: Unassigned") + + lines.extend([ + f"Date: {_format_datetime(session.resolved_at or datetime.now(timezone.utc))}", + f"Started: {_format_datetime(session.created_at)}", + ]) + + if session.resolved_at and session.created_at: + delta = session.resolved_at - session.created_at + minutes = int(delta.total_seconds() / 60) + lines.append(f"Duration: {minutes // 60}h {minutes % 60}m" if minutes >= 60 else f"Duration: {minutes}m") + + lines.append("") + lines.append("── Problem ──") + lines.append(session.problem_summary or "No summary available") + + # Work completed + if include_steps and session.steps: + lines.append("") + lines.append("── Work Completed ──") + for step in session.steps: + content = step.content or {} + description = content.get("text", "") + lines.append(f"{step.step_order + 1}. {description}") + + # Escalation reason + lines.append("") + lines.append("── Escalation Reason ──") + lines.append(session.escalation_reason or "No reason provided") + + # Escalation package details + pkg = session.escalation_package or {} + if hypotheses := pkg.get("remaining_hypotheses"): + lines.append("") + lines.append("── Remaining Hypotheses ──") + if isinstance(hypotheses, list): + for h in hypotheses: + lines.append(f"- {h}") + else: + lines.append(str(hypotheses)) + + if suggestions := pkg.get("suggested_next_steps"): + lines.append("") + lines.append("── Suggested Next Steps ──") + if isinstance(suggestions, list): + for s in suggestions: + lines.append(f"- {s}") + else: + lines.append(str(suggestions)) + + # Timing + lines.append("") + lines.append("── Session Timing ──") + lines.append(f"Start: {_format_datetime(session.created_at)}") + escalated_at = session.resolved_at or datetime.now(timezone.utc) + lines.append(f"Escalated: {_format_datetime(escalated_at)}") + if session.created_at: + delta = escalated_at - session.created_at + minutes = int(delta.total_seconds() / 60) + lines.append(f"Total: {minutes // 60}h {minutes % 60}m" if minutes >= 60 else f"Total: {minutes}m") + + lines.append("") + lines.append("Generated by ResolutionFlow FlowPilot") + + return "\n".join(lines) + + +async def push_documentation( + session: AISession, + user_id: UUID, + db: AsyncSession, +) -> dict[str, Any]: + """Push session documentation to PSA ticket. + + Returns: + { + "psa_push_status": "sent" | "pending_retry" | "failed" | "no_psa", + "psa_push_error": str | None, + "member_mapping_warning": str | None, + } + """ + if not session.psa_ticket_id or not session.psa_connection_id: + return {"psa_push_status": "no_psa", "psa_push_error": None, "member_mapping_warning": None} + + # Load connection and check settings + result = await db.execute( + select(PsaConnection).where(PsaConnection.id == session.psa_connection_id) + ) + connection = result.scalar_one_or_none() + if not connection: + return {"psa_push_status": "failed", "psa_push_error": "PSA connection not found", "member_mapping_warning": None} + + if not _get_setting(connection, "auto_push"): + return {"psa_push_status": "no_psa", "psa_push_error": None, "member_mapping_warning": None} + + # Format the note + include_steps = _get_setting(connection, "include_diagnostic_steps") + if session.status == "resolved": + note_text = format_resolution_note(session, include_steps=include_steps) + else: + note_text = format_escalation_note(session, include_steps=include_steps) + + # Redact sensitive data + note_text, _ = apply_redaction_to_text(note_text) + + # Determine note type + visibility = _get_setting(connection, "note_visibility") + note_type = NoteType.INTERNAL_ANALYSIS if visibility == "internal" else NoteType.DESCRIPTION + + # Check member mapping for time entry + member_mapping_warning = None + member_mapping = None + if _get_setting(connection, "auto_time_entry") and _get_setting(connection, "time_rounding") != "none": + mapping_result = await db.execute( + select(PsaMemberMapping).where( + PsaMemberMapping.psa_connection_id == session.psa_connection_id, + PsaMemberMapping.user_id == user_id, + ) + ) + member_mapping = mapping_result.scalar_one_or_none() + if not member_mapping: + member_mapping_warning = "Map your CW account in Settings → Integrations to enable auto-logged time entries." + + # Push to PSA + try: + provider = await get_provider_for_connection(session.psa_connection_id, db) + + # Post the note + posted_note = await provider.post_note( + ticket_id=session.psa_ticket_id, + text=note_text, + note_type=note_type, + ) + + # Create time entry if member mapping exists + if member_mapping and session.resolved_at and session.created_at: + try: + delta = session.resolved_at - session.created_at + hours = delta.total_seconds() / 3600 + rounding = _get_setting(connection, "time_rounding") + rounded_hours = _round_hours(hours, rounding) + if rounded_hours > 0: + await provider.create_time_entry( + ticket_id=session.psa_ticket_id, + member_id=member_mapping.external_member_id, + hours=rounded_hours, + notes=f"FlowPilot session: {session.problem_summary or 'Troubleshooting'}", + ) + except Exception as e: + logger.warning("Failed to create time entry for session %s: %s", session.id, e) + # Don't fail the note push just because time entry failed + + # Log success + log_entry = PsaPostLog( + id=uuid.uuid4(), + ai_session_id=session.id, + psa_connection_id=session.psa_connection_id, + ticket_id=session.psa_ticket_id, + note_type=note_type, + content_posted=note_text[:10000], # Truncate for storage + external_note_id=posted_note.id, + status="success", + posted_by=user_id, + ) + db.add(log_entry) + + return { + "psa_push_status": "sent", + "psa_push_error": None, + "member_mapping_warning": member_mapping_warning, + } + + except Exception as e: + logger.warning("PSA push failed for session %s: %s", session.id, e) + + # Log failure with retry scheduling + log_entry = PsaPostLog( + id=uuid.uuid4(), + ai_session_id=session.id, + psa_connection_id=session.psa_connection_id, + ticket_id=session.psa_ticket_id, + note_type=note_type, + content_posted=note_text[:10000], + status="pending_retry", + error_message=str(e)[:500], + retry_count=0, + next_retry_at=datetime.now(timezone.utc) + timedelta(minutes=5), + posted_by=user_id, + ) + db.add(log_entry) + + return { + "psa_push_status": "pending_retry", + "psa_push_error": str(e)[:200], + "member_mapping_warning": member_mapping_warning, + } + + +async def retry_failed_push( + log_entry: PsaPostLog, + db: AsyncSession, +) -> bool: + """Retry a failed PSA push. Returns True on success.""" + try: + provider = await get_provider_for_connection(log_entry.psa_connection_id, db) + posted_note = await provider.post_note( + ticket_id=log_entry.ticket_id, + text=log_entry.content_posted, + note_type=log_entry.note_type, + ) + log_entry.status = "success" + log_entry.external_note_id = posted_note.id + log_entry.error_message = None + log_entry.next_retry_at = None + return True + except Exception as e: + log_entry.retry_count += 1 + log_entry.error_message = str(e)[:500] + + if log_entry.retry_count >= 3: + log_entry.status = "failed" + log_entry.next_retry_at = None + else: + # Exponential backoff: 5min, 15min, 45min + backoff_minutes = 5 * (3 ** log_entry.retry_count) + log_entry.next_retry_at = datetime.now(timezone.utc) + timedelta(minutes=backoff_minutes) + + logger.warning( + "PSA retry %d failed for log %s: %s", + log_entry.retry_count, log_entry.id, e, + ) + return False diff --git a/backend/app/services/psa_retry_scheduler.py b/backend/app/services/psa_retry_scheduler.py new file mode 100644 index 00000000..f3403eb2 --- /dev/null +++ b/backend/app/services/psa_retry_scheduler.py @@ -0,0 +1,52 @@ +"""Background scheduler for retrying failed PSA documentation pushes. + +Runs every 5 minutes via APScheduler, picks up PsaPostLog entries +with status='pending_retry' and next_retry_at <= now. +""" +import logging +from datetime import datetime, timezone + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.database import async_session_maker +from app.models.psa_post_log import PsaPostLog +from app.services.psa_documentation_service import retry_failed_push + +logger = logging.getLogger(__name__) + + +async def process_pending_retries() -> None: + """Process all pending PSA push retries that are due.""" + async with async_session_maker() as db: + try: + result = await db.execute( + select(PsaPostLog) + .where( + PsaPostLog.status == "pending_retry", + PsaPostLog.next_retry_at <= datetime.now(timezone.utc), + PsaPostLog.retry_count < 3, + ) + .limit(20) # Process in batches + ) + entries = result.scalars().all() + + if not entries: + return + + logger.info("Processing %d pending PSA push retries", len(entries)) + + for entry in entries: + success = await retry_failed_push(entry, db) + if success: + logger.info("PSA retry succeeded for log %s", entry.id) + else: + logger.warning( + "PSA retry %d/%d failed for log %s", + entry.retry_count, 3, entry.id, + ) + + await db.commit() + except Exception as e: + logger.error("PSA retry scheduler error: %s", e) + await db.rollback() diff --git a/docs/plans/2026-03-18-flowpilot-first-pivot-phase2.md b/docs/plans/2026-03-18-flowpilot-first-pivot-phase2.md new file mode 100644 index 00000000..200e82cf --- /dev/null +++ b/docs/plans/2026-03-18-flowpilot-first-pivot-phase2.md @@ -0,0 +1,885 @@ +# FlowPilot-First Pivot — Phase 2: PSA Integration & Escalation Handoff + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Connect FlowPilot to ConnectWise PSA so engineers can start sessions from tickets, and documentation flows back automatically on resolution or escalation. Also implement escalation handoffs with full context briefing, session pause/resume for individual engineers, and in-app escalation notifications. + +**Architecture:** Builds on existing PSA infrastructure (`services/psa/`, `PsaConnection` model, ConnectWise client) and Phase 1 AI session models (`AISession`, `AISessionStep`, `FlowPilotEngine`). Adds PSA ticket intake to sessions, auto-documentation push on close, session pause/resume, and escalation handoff mechanics. + +**Tech Stack:** FastAPI, SQLAlchemy 2.0 (async), httpx (ConnectWise API), React, TypeScript, Tailwind CSS v4, shadcn/ui + +**Prerequisites:** +- Phase 1 complete (AI session core — models, engine, API, frontend) +- Existing PSA integration (`docs/plans/2026-03-14-connectwise-psa-integration-plan.md`) +- Existing models: `PsaConnection`, `PsaMemberMapping`, `PsaPostLog` +- Existing services: `services/psa/base.py`, `services/psa/connectwise/client.py`, `services/psa/connectwise/provider.py` +- Existing service: `services/psa/ticket_context.py` — has `format_ticket_context_for_prompt()` already +- Existing schemas: `schemas/psa_context.py` — has `TicketContext`, `TicketDetails`, `CompanyInfo`, `ConfigItem`, `TicketNote`, etc. +- Existing frontend: `TicketPickerModal.tsx`, `TicketContextPanel.tsx`, `IntegrationsPage.tsx` +- Existing service: `services/redaction_service.py` — has `apply_redaction_to_text()` for password redaction + +**Existing patterns to follow:** +- PSA: `app/services/psa/` — abstract `PSAProvider` interface + ConnectWise implementation +- PSA context: `app/services/psa/connectwise/provider.py` — `get_ticket_context()` already fetches ticket + company + contact + configs + notes + related tickets in parallel with caching +- PSA prompt formatting: `app/services/psa/ticket_context.py` — `format_ticket_context_for_prompt()` already formats `TicketContext` into structured text for AI prompts +- Sessions: `app/api/endpoints/sessions.py` — existing ticket linking patterns +- Phase 1: `app/services/flowpilot_engine.py`, `app/api/endpoints/ai_sessions.py` +- Frontend API pattern: `src/api/aiSessions.ts` uses `aiSessionsApi` object pattern (not standalone exports) +- Frontend ticket UI: `src/components/session/TicketPickerModal.tsx` (note: currently takes `sessionId` prop for old sessions — needs adapter) + +--- + +## Key Design Decisions (from product review) + +These decisions were confirmed during product review before implementation: + +1. **PSA connection scope:** Per-account (one CW connection per MSP). Individual engineers mapped to CW members via `PsaMemberMapping`. +2. **CW API failure at intake:** Graceful degradation — engineer can manually type ticket number and paste ticket notes. Session starts without rich context. Ticket can be linked later to pull in contact/company details. +3. **Missing CW member mapping for time entries:** Show warning "Map your CW account in Settings to enable auto-logged time entries." Always include start time, end time, and total duration in the note text regardless. +4. **PSA push failure retry:** Automatic background retries via APScheduler (up to 3 attempts, exponential backoff). Plus a manual "Retry" button that only appears when auto-retries are exhausted or push is in failed state. +5. **Session ownership on escalation:** Engineer A **keeps ownership** (`session.user_id` unchanged). Session goes to `requesting_escalation` status. Engineer B works within the same session but A remains the originator. Both see it in their history. +6. **Escalation vs Pause:** Two separate features: + - **Pause/Resume** — same engineer, bookmark for later or recover from browser crash. Status: `paused`. + - **Escalation** — handoff to another engineer with context briefing. Status: `requesting_escalation` → `escalated` (when picked up). Self-escalation blocked. +7. **Escalation queue location:** Both — sidebar nav item "Escalations" with badge count AND a tab in session history page. +8. **Escalation pickup UX:** Engineer B sees a briefing card summarizing A's work, then chooses: (a) "Continue where they left off" (picks up same conversation), or (b) "Start fresh with context" (types their own input, but FlowPilot knows everything A tried so it won't repeat steps). +9. **Mid-session ticket linking:** Inject ticket context into system prompt immediately. FlowPilot naturally acknowledges the new context in its next response ("Thanks for linking that ticket. I can see this is for [client]..."). +10. **Ticket status on resolve:** Contextual dropdown pulled dynamically from the linked ticket's board statuses (not a global setting). Admin setting just controls whether engineers are prompted to pick a status. +11. **Tests:** Mocked CW responses based on OpenAPI spec. No sandbox available yet. + +--- + +## Context: What Phase 2 Adds + +Phase 1 delivered FlowPilot with free-text intake only. Phase 2 makes it a ticket workflow tool: + +**PSA Ticket Intake:** Engineer selects a ConnectWise ticket → FlowPilot pulls ticket data (summary, client, priority, history, configuration items) and uses it as rich context for diagnosis. If CW is unavailable, engineer can manually enter ticket number and paste notes — graceful degradation, not a hard block. + +**Auto Documentation Push:** On resolution or escalation, FlowPilot auto-generates documentation and pushes it back to the ConnectWise ticket as internal notes + time entry. Automatic background retries on failure, with manual retry fallback. Notes always include session timing (start, end, duration) even if time entry creation fails due to missing member mapping. + +**Session Pause/Resume:** Engineer can pause a session and come back later, or recover seamlessly from browser crashes and page reloads. Same engineer, same session. + +**Escalation Handoff:** Engineer A hits a wall → clicks Escalate → FlowPilot packages everything tried so far → session goes to "requesting escalation" status → Engineer B sees it in the escalation queue (sidebar + session history tab) → picks it up with a briefing card → chooses to continue where A left off or start fresh with full context. Engineer A retains session ownership throughout. Self-escalation is blocked. + +--- + +## Slice 1: PSA Ticket Intake for AI Sessions + +### Task 1: Extend FlowPilotEngine to accept PSA ticket intake + +**Files:** +- Edit: `backend/app/services/flowpilot_engine.py` + +**What to add:** + +Add a new method `_process_ticket_intake()` that: + +1. Receives `psa_connection_id` and `psa_ticket_id` from the intake request +2. Loads the `PsaConnection` from the database +3. **Attempts** to use `ConnectWiseProvider.get_ticket_context()` — if this fails (API down, bad credentials), catch the error and fall back gracefully (session starts with just the ticket ID stored, no rich context) +4. On success: stores the full ticket context (serialized `TicketContext`) in `session.ticket_data` JSONB +5. Uses the existing `format_ticket_context_for_prompt()` from `services/psa/ticket_context.py` to build the system prompt context block — do NOT rewrite this formatting, it already handles all fields correctly +6. Builds enriched intake content that includes both the formatted ticket context and any additional free-text the engineer provided +7. Passes the enriched context to `_classify_intake()` and the system prompt + +**Graceful degradation on CW failure:** + +If `get_ticket_context()` fails, the session still starts: +- `session.psa_ticket_id` is set (so we know which ticket to push docs to later) +- `session.psa_connection_id` is set +- `session.ticket_data` is null or minimal (just the ticket ID) +- The engineer's free-text intake (which may include pasted ticket notes) is used as the sole context +- A warning is returned in the response: `"psa_context_status": "unavailable"` so the frontend can show "Couldn't pull ticket details — ConnectWise may be unavailable" + +**IMPORTANT — Reuse existing infrastructure:** + +The ConnectWise provider at `services/psa/connectwise/provider.py` already has `get_ticket_context()` which returns a `TicketContext` schema (defined in `schemas/psa_context.py`). And `services/psa/ticket_context.py` already has `format_ticket_context_for_prompt()` that converts a `TicketContext` into a structured text block for AI prompts. Both of these are battle-tested from the existing session/copilot system. Phase 2 should call them directly: + +```python +from app.services.psa.ticket_context import format_ticket_context_for_prompt +from app.services.psa.registry import get_provider_for_connection + +# In _process_ticket_intake(): +try: + provider = await get_provider_for_connection(psa_connection_id, db) + ticket_context = await provider.get_ticket_context(int(psa_ticket_id), str(psa_connection_id)) + ticket_prompt_block = format_ticket_context_for_prompt(ticket_context) + session.ticket_data = ticket_context.model_dump(mode="json") + psa_context_status = "loaded" +except Exception as e: + logger.warning(f"Failed to fetch ticket context: {e}") + ticket_prompt_block = None + psa_context_status = "unavailable" +``` + +**Modify `start_session()`** to detect `intake_type == 'psa_ticket'` and call `_process_ticket_intake()` before the normal flow. The ticket context gets injected into the system prompt alongside any matched flow context. + +**Key detail:** The engineer may also type additional context alongside the ticket pull (e.g., "Ticket #12345 — user called back and said it's also affecting their second monitor"). The intake content should merge both sources. + +**Verification:** Start a session with `intake_type: "psa_ticket"` and a valid ticket ID. Verify FlowPilot's first question references the ticket content. Check `session.ticket_data` is populated. Also test with a bad connection — verify session still starts with a warning. + +``` +git commit -m "feat(ai-session): add PSA ticket intake to FlowPilot Engine" +``` + +### Task 2: Add ticket picker to FlowPilot intake screen + +**Files:** +- Edit: `frontend/src/components/flowpilot/FlowPilotIntake.tsx` + +**What to add:** + +The "Pull from Ticket" button (currently disabled from Phase 1) becomes active when the user's account has a PSA connection configured. + +**IMPORTANT — TicketPickerModal adaptation:** The existing `TicketPickerModal` at `src/components/session/TicketPickerModal.tsx` was built for legacy sessions — it requires a `sessionId` prop and calls `sessionPsaApi.linkTicket()` internally. For the FlowPilot intake screen, you need to either: +- (a) Create a new `FlowPilotTicketPicker` component that reuses the search/display logic but returns the selected ticket data to the parent instead of calling the link API, or +- (b) Refactor `TicketPickerModal` to accept an `onSelect` callback prop as an alternative to `sessionId`, making it usable in both contexts + +Option (b) is preferred since it avoids code duplication. Add an `onSelect?: (ticketId: string, ticket: PSATicketInfo) => void` prop. When provided, the modal calls `onSelect` instead of the internal link API. The existing legacy usage passes `sessionId` + `onLinked` as before (no breaking change). + +On click, open the adapted `TicketPickerModal`. When a ticket is selected: + +1. The ticket summary populates the intake area as a styled ticket card (showing ticket #, summary, client name, priority badge) +2. An additional textarea appears below for "Add context" — optional free text the engineer can add +3. The intake type switches to `psa_ticket` (or `combined` if they also add text) +4. On submit, `createAISession()` is called with `intake_type: "psa_ticket"`, `psa_ticket_id`, `psa_connection_id`, and `intake_content` containing both the ticket reference and any additional text + +**Manual ticket entry fallback:** If the ticket picker fails to connect to CW, or the engineer prefers, they can also manually type a ticket number and paste relevant notes into the free-text area. This still sets `intake_type: "psa_ticket"` with the ticket number, but `psa_connection_id` triggers a context fetch attempt on the backend (which may gracefully fail per Task 1). + +**UX details:** +- Check for active PSA connections via existing `useTicketContext` hook or the integrations API +- If no PSA connection exists, the "Pull from Ticket" button shows a tooltip: "Connect your PSA in Settings → Integrations" +- The ticket card should match the existing `TicketContextPanel` styling — dark glass card with cyan accent border, ticket number prominent +- After ticket selection, "Start Session" button text changes to "Start Session with Ticket #12345" +- If CW fetch fails, show toast: "Couldn't reach ConnectWise — you can still type the ticket details manually" + +**Verification:** Open the FlowPilot intake. Click "Pull from Ticket". Search for a ticket in ConnectWise. Select it. See the ticket card appear. Add optional context. Submit. Verify the session starts with ticket data. + +``` +git commit -m "feat(ai-session): add PSA ticket picker to FlowPilot intake" +``` + +### Task 3: Display ticket context in active session sidebar + +**Files:** +- Edit: `frontend/src/components/flowpilot/FlowPilotSession.tsx` +- Create: `frontend/src/components/flowpilot/SessionTicketCard.tsx` + +**What to add:** + +When the active session has `psa_ticket_id` set, show a `SessionTicketCard` in the right sidebar above the confidence indicator. This card shows: + +- Ticket # (clickable — opens ConnectWise ticket in new tab if URL is available) +- Ticket summary +- Client name +- Priority badge (color-coded) +- Status badge +- Config items list (if any) + +If `ticket_data` is minimal (CW was unavailable at intake), show a simplified card with just the ticket number and a "Refresh from CW" button that attempts to pull context again. + +Reuse styling patterns from the existing `TicketContextPanel` and `TicketLinkIndicator` components. + +**Verification:** Start a ticket-based session. See the ticket card in the sidebar with all relevant info. + +``` +git commit -m "feat(ai-session): display ticket context in FlowPilot session sidebar" +``` + +--- + +## Slice 2: Auto Documentation Push to PSA + +### Task 4: Build PSA documentation push service + +**Files:** +- Create: `backend/app/services/psa_documentation_service.py` + +**Architecture:** + +This service takes a completed `AISession` and pushes structured documentation back to ConnectWise. It handles three operations: + +1. **Internal Note:** Full diagnostic trail posted as an internal note on the ticket +2. **Time Entry:** Auto-create a time entry with the session duration (if CW member mapping exists) +3. **Status Update:** Optionally update ticket status based on contextual selection at resolution time + +**Internal note format:** + +``` +═══ FlowPilot Session Documentation ═══ +Session: {session_id} +Engineer: {user.display_name} +Date: {resolved_at} +Started: {created_at} +Ended: {resolved_at} +Duration: {duration_display} + +── Problem ── +{problem_summary} +Domain: {problem_domain} + +── Diagnosis Path ── +1. [Question] {context_message} + → Response: {selected_option or free_text_input} +2. [Action] {content description} + → Result: {action_result summary} +3. [Question] {context_message} + → Response: {selected_option} +... (all steps) + +── Resolution ── +{resolution_summary} +{resolution_action} + +── AI Confidence ── +Final confidence: {confidence_tier} ({confidence_score}) +Matched flow: {matched_flow_name or "None - new discovery"} + +── Session Timing ── +Start: {created_at formatted} +End: {resolved_at formatted} +Total: {duration_display} + +Generated by ResolutionFlow FlowPilot +``` + +**IMPORTANT — Always include timing:** The "Session Timing" section is always present in the note, even when a time entry can't be created (missing member mapping). This ensures the time data is always on the ticket for manual entry. + +**For escalations, the format changes:** + +``` +═══ FlowPilot Escalation Documentation ═══ +Session: {session_id} +Escalated by: {user.display_name} +Escalated to: {escalated_to.display_name or "Unassigned"} +Date: {resolved_at} +Started: {created_at} +Duration: {duration_display} + +── Problem ── +{problem_summary} + +── Work Completed ── +{numbered list of all steps taken} + +── Escalation Reason ── +{escalation_reason} + +── Remaining Hypotheses ── +{from escalation_package.hypotheses} + +── Suggested Next Steps ── +{from escalation_package.suggestions} + +── Session Timing ── +Start: {created_at formatted} +Escalated: {escalated_at formatted} +Total: {duration_display} + +Generated by ResolutionFlow FlowPilot +``` + +**Key implementation details:** + +- Use the existing PSA provider abstraction (`services/psa/base.py` → `post_note()`) +- **PsaPostLog FK issue:** The existing `PsaPostLog` model has `ForeignKey("sessions.id")` pointing to old sessions, NOT `ai_sessions`. You must add an `ai_session_id` nullable UUID FK column to `PsaPostLog` (via migration) so it can reference AI sessions. Keep the original `session_id` column for backward compatibility — make it nullable if it isn't already. +- **Time entry method missing:** The PSA base class and ConnectWise provider do NOT currently have a `create_time_entry()` method. You must add: (1) `async def create_time_entry(ticket_id, member_id, hours, notes, work_type)` to `services/psa/base.py` as an abstract method, (2) implement it in `services/psa/connectwise/provider.py` using the CW `POST /time/entries` endpoint, (3) add a `PSATimeEntry` type to `services/psa/types.py` +- **Missing member mapping handling:** Before creating a time entry, look up the engineer's CW member ID via `PsaMemberMapping`. If no mapping exists: skip the time entry, include a `member_mapping_warning` in the response ("Map your CW account in Settings → Integrations to enable auto-logged time entries"). The note text always includes timing regardless. +- Use `apply_redaction_to_text()` from `services/redaction_service.py` to scrub passwords and sensitive data before pushing to ConnectWise +- Time entry calculation: `session.resolved_at - session.created_at`, rounded to nearest 15 minutes (configurable via `flowpilot_settings`) +- **Automatic retry on failure:** If the PSA push fails, create a `PsaPostLog` entry with `status='pending_retry'`. APScheduler job runs every 5 minutes, retries failed pushes up to 3 times with exponential backoff (5min, 15min, 45min). After 3 failures, status becomes `failed` and the frontend shows a manual "Retry" button. +- The documentation text should be plain text (ConnectWise notes don't support markdown well) + +**Verification:** Resolve an AI session that has a linked ticket. Check ConnectWise — verify the internal note appeared on the ticket with the full diagnostic trail including timing. Verify a time entry was created (if member mapped). Check `psa_post_logs` table for the audit record. + +``` +git commit -m "feat(ai-session): add PSA documentation push service" +``` + +### Task 5: Wire documentation push into session resolution/escalation + background retry + +**Files:** +- Edit: `backend/app/services/flowpilot_engine.py` +- Edit: `backend/app/api/endpoints/ai_sessions.py` +- Create: `backend/app/services/psa_retry_scheduler.py` + +**What to add:** + +In the `resolve_session()` and `escalate_session()` methods, after generating documentation, check if the session has a `psa_ticket_id` and `psa_connection_id`. If so, call `psa_documentation_service.push_documentation()`. + +**Flow:** +1. Engineer clicks Resolve → `POST /ai-sessions/{id}/resolve` +2. `flowpilot_engine.resolve_session()` generates documentation (existing) +3. **New:** If session has PSA link, call `psa_documentation_service.push_documentation(session, documentation)` +4. Push runs async — don't block the response +5. Return `SessionCloseResponse` with new fields: `psa_push_status`, `member_mapping_warning` + +Same flow for escalation. + +**Background retry scheduler (`psa_retry_scheduler.py`):** + +APScheduler job that runs every 5 minutes: +1. Query `PsaPostLog` for entries with `status='pending_retry'` and `retry_count < 3` +2. For each, attempt the push again via `psa_documentation_service` +3. On success: update `status='sent'` +4. On failure: increment `retry_count`, set next retry with exponential backoff +5. After 3 failures: set `status='failed'` + +Register the scheduler in the FastAPI lifespan (follows existing APScheduler pattern for maintenance flows). + +**Add to response schemas:** + +Edit `backend/app/schemas/ai_session.py` — add PSA fields to `SessionCloseResponse`: + +```python +class SessionCloseResponse(BaseModel): + session_id: UUID + status: str + documentation: SessionDocumentation + psa_push_status: str = "no_psa" # sent | pending_retry | no_psa | failed + psa_push_error: str | None = None + member_mapping_warning: str | None = None # Set when time entry skipped due to missing mapping +``` + +**Add manual retry endpoint:** + +``` +POST /api/v1/ai-sessions/{id}/retry-psa-push +``` + +Only callable when the session's latest `PsaPostLog` entry has `status='failed'`. Resets to `pending_retry` and triggers an immediate push attempt. + +**Verification:** Resolve a ticket-linked session. Verify the response includes `psa_push_status: "sent"`. Check ConnectWise for the note. Resolve a session without a ticket — verify `psa_push_status: "no_psa"`. Test with a user who has no CW member mapping — verify `member_mapping_warning` is present and note still includes timing. + +``` +git commit -m "feat(ai-session): wire PSA documentation push into resolve/escalate with auto-retry" +``` + +### Task 6: Show PSA push status in frontend + +**Files:** +- Edit: `frontend/src/components/flowpilot/SessionDocView.tsx` +- Edit: `frontend/src/types/ai-session.ts` + +**What to add:** + +After resolution/escalation, the documentation view now shows a PSA sync indicator: + +- **"sent":** Green checkmark + "Documentation pushed to ticket #{ticket_id}" +- **"pending_retry":** Amber clock icon + "Documentation queued for push — will sync shortly" +- **"failed":** Red warning + "Failed to push to ticket — {error}" with a "Retry" button that calls `POST /ai-sessions/{id}/retry-psa-push` +- **"no_psa":** No indicator shown (session wasn't linked to a ticket) + +If `member_mapping_warning` is present, show an info banner: "Time entry was not created — [Map your CW account](link to settings) to enable auto-logged time. Session timing is included in the ticket note." + +Update the TypeScript types to include `psa_push_status`, `psa_push_error`, and `member_mapping_warning` on `SessionCloseResponse`. + +**Verification:** Resolve a ticket-linked session. See "Documentation pushed to ticket #12345" in the documentation view. Test retry button with a simulated failure. + +``` +git commit -m "feat(ai-session): show PSA push status in documentation view" +``` + +--- + +## Slice 3: Session Pause/Resume & Escalation Handoff + +### Task 7: Session pause/resume for same engineer + +**Files:** +- Edit: `backend/app/services/flowpilot_engine.py` +- Edit: `backend/app/api/endpoints/ai_sessions.py` +- Edit: `backend/app/schemas/ai_session.py` + +**What to add:** + +Engineers need to pause a session and come back later (lunch break, waiting for info, browser crash recovery). + +**New endpoint — Pause session:** + +``` +POST /api/v1/ai-sessions/{id}/pause +``` + +Flow: +1. Verify session is `active` and belongs to current user +2. Set `session.status = "paused"`, `session.paused_at = utcnow()` +3. Return updated session + +**New endpoint — Resume own paused session:** + +``` +POST /api/v1/ai-sessions/{id}/resume +``` + +Flow: +1. Verify session is `paused` and belongs to current user +2. Set `session.status = "active"`, clear `paused_at` +3. Return the session with all existing steps (engineer picks up exactly where they left off) +4. No briefing step needed — it's the same engineer + +**Browser crash recovery:** + +Sessions in `active` status should be resumable by navigating back to `/pilot/{sessionId}`. The frontend should detect an existing active session and restore it (conversation history is already in `conversation_messages` JSONB). This is mostly a frontend concern — the backend already stores all state. + +**Verification:** Start a session, progress 3 steps. Pause it. Navigate away. Come back. Resume. Verify you're back at step 3 with full context. Also test: close the browser tab while in an active session, reopen, navigate to session — verify it loads correctly. + +``` +git commit -m "feat(ai-session): add session pause/resume for same engineer" +``` + +### Task 8: Build escalation handoff backend + +**Files:** +- Edit: `backend/app/services/flowpilot_engine.py` +- Edit: `backend/app/api/endpoints/ai_sessions.py` +- Edit: `backend/app/schemas/ai_session.py` + +**Session status lifecycle (updated from Phase 1):** + +``` +active → paused (same engineer pause) +paused → active (same engineer resume) +active → requesting_escalation (engineer requests escalation) +requesting_escalation → active (another engineer picks it up) +active → resolved (session completed) +active → escalated (escalation completed — terminal, session was handed off and resolved by another engineer) +requesting_escalation → escalated (escalation expired or cancelled — terminal) +``` + +**Key ownership rule:** `session.user_id` ALWAYS stays as Engineer A (the originator). When Engineer B picks up the session, we track them via a new `current_handler_id` field (or in the `escalation_package` JSONB). Both engineers see the session in their history — A sees "I escalated this" and B sees "I picked this up." + +**Modify the existing `escalate_session()` in `flowpilot_engine.py`:** +1. Change `session.status = "escalated"` → `session.status = "requesting_escalation"` +2. Do NOT set `session.resolved_at` yet (session isn't done — it's waiting for pickup) +3. Store `session.escalation_package["original_user_id"] = str(user_id)` +4. **Block self-escalation:** If `escalated_to_id == current_user.id`, return 400 error + +**Enhance `_build_escalation_package()`:** + +The existing Phase 1 implementation builds a basic package with `problem_summary`, `steps_tried`, and `escalation_reason`. Enhance it to also include: + +- `remaining_hypotheses`: Make a quick LLM call (haiku-tier via `AI_MODEL_TIERS["fast"]`) asking: "Based on this diagnostic conversation, what are the most likely remaining causes that haven't been ruled out?" Pass the conversation_messages as context. +- `suggested_next_steps`: From the same LLM call: "What should the next engineer try first?" +- `steps_ruled_out`: Walk the steps and identify options that were tested and failed +- `environment_context`: Extract any environment-specific info mentioned during the session (server names, IP addresses, software versions, etc.) +- `original_user_id`: The engineer who escalated (for attribution in the briefing) + +This LLM call should use the fast model since it's a summarization task, not a diagnostic one. If the call fails, fall back to the basic package without hypotheses/suggestions — don't block the escalation. + +**New endpoint — Pick up escalated session:** + +``` +POST /api/v1/ai-sessions/{id}/pickup +``` + +Request body: +```python +class PickupSessionRequest(BaseModel): + """Pick up an escalated session as a new engineer.""" + resume_mode: str = "continue" # "continue" or "fresh" + additional_context: str | None = None # New info or question from the receiving engineer +``` + +**Pickup flow:** +1. Verify session status is `requesting_escalation` +2. Verify the current user has permission (same team) and is NOT the original engineer +3. Track the new handler (add to `escalation_package["picked_up_by"] = str(user_id)`, `escalation_package["picked_up_at"] = utcnow()`) +4. Set `session.status = "active"` +5. Generate a "briefing step" — a special step that summarizes everything for the new engineer: + - "Here's what {original_engineer} found so far: ..." + - "They ruled out X, Y, Z" + - "Remaining hypotheses: A, B" + - "Suggested next steps: ..." +6. Based on `resume_mode`: + - `"continue"`: Generate the next diagnostic step as usual (picks up where A left off) + - `"fresh"`: Use `additional_context` as new input, but FlowPilot's system prompt includes all of A's work so it won't repeat steps +7. Return the briefing step + next step + +**New endpoint — List sessions requesting escalation for team:** + +``` +GET /api/v1/ai-sessions/escalation-queue +``` + +Returns sessions with `status = "requesting_escalation"` for the current user's team, sorted by most recent. This is the "pickup queue" for escalated tickets. Includes: problem summary, escalation reason, who escalated, when, ticket # (if linked), step count, assigned-to (if specified). + +**Verification:** Start a session, progress 3-4 steps, escalate with a reason. Verify session is `requesting_escalation`. Log in as another user on the same team. Hit `/ai-sessions/escalation-queue`. See the session. Pick it up with `resume_mode: "continue"`. Verify the briefing step accurately summarizes prior work. Continue diagnosis. Also test `resume_mode: "fresh"` with additional context. + +``` +git commit -m "feat(ai-session): add escalation handoff backend with pickup flow" +``` + +### Task 9: Escalation handoff frontend + in-app notifications + +**Files:** +- Create: `frontend/src/components/flowpilot/EscalateModal.tsx` +- Create: `frontend/src/components/flowpilot/EscalationQueue.tsx` +- Create: `frontend/src/components/flowpilot/SessionBriefing.tsx` +- Edit: `frontend/src/components/flowpilot/FlowPilotActionBar.tsx` +- Edit: `frontend/src/components/flowpilot/FlowPilotSession.tsx` +- Edit: `frontend/src/hooks/useFlowPilotSession.ts` +- Edit: `frontend/src/api/aiSessions.ts` +- Edit: `frontend/src/types/ai-session.ts` +- Edit: `frontend/src/components/layout/Sidebar.tsx` (or equivalent nav component) +- Edit: `frontend/src/router.tsx` + +**EscalateModal:** + +When the engineer clicks "Escalate" in the action bar, this modal opens: + +- Textarea: "Why are you escalating?" (required) +- Dropdown: "Assign to" — list of team members (optional, defaults to unassigned) +- Summary card: auto-generated preview of the escalation package (steps taken, hypotheses remaining) +- "Escalate & Update Ticket" button (if PSA linked) / "Escalate" button (if not) +- **Self-escalation blocked:** Current user excluded from the "Assign to" dropdown + +**EscalationQueue:** + +New component accessible from **both** the sidebar nav and as a tab in session history. + +**Sidebar nav item:** "Escalations" with a badge showing count of sessions in `requesting_escalation` status for the user's team. Badge uses amber-400 color. Positioned below "Sessions" in the nav. + +**Session history tab:** New tab "Escalated" alongside existing tabs. Shows the same queue content. + +Queue content: +- Card for each session in `requesting_escalation`: problem summary, escalation reason, who escalated, when, ticket # (if linked), step count +- "Pick Up" button on each card +- Sort by most recent +- Filter by: assigned to me, unassigned, all + +**SessionBriefing:** + +When an engineer picks up an escalated session, the first thing they see is a styled briefing card (distinct from normal step cards — use an amber/purple accent border to distinguish from regular cyan steps): + +- "Escalation from {original_engineer}" +- Problem summary +- Steps already taken (collapsed list, expandable) +- What was ruled out +- Remaining hypotheses +- Suggested next steps +- Two action buttons: + - **"Continue Where They Left Off"** → calls pickup with `resume_mode: "continue"`, proceeds to FlowPilot's next question + - **"Start Fresh With Context"** → shows a textarea for the engineer to type their own input/question, then calls pickup with `resume_mode: "fresh"` and `additional_context` + +**Pause/Resume UI (from Task 7):** + +- Add "Pause" button to `FlowPilotActionBar` (alongside Resolve and Escalate) +- Paused sessions show in session history with a "Paused" badge +- Clicking a paused session resumes it automatically (or shows a "Resume" button) +- On page load, if navigating to `/pilot/{sessionId}` and session is `active`, restore the full conversation (browser crash recovery) + +**API client additions:** + +Add to the existing `aiSessionsApi` object in `src/api/aiSessions.ts` (follow the same pattern as existing methods): + +```typescript +// Add to aiSessionsApi object: +async pauseSession(sessionId: string): Promise { + const response = await apiClient.post( + `/ai-sessions/${sessionId}/pause` + ) + return response.data +}, + +async resumeSession(sessionId: string): Promise { + const response = await apiClient.post( + `/ai-sessions/${sessionId}/resume` + ) + return response.data +}, + +async pickupSession(sessionId: string, data: { resume_mode: string; additional_context?: string }): Promise { + const response = await apiClient.post( + `/ai-sessions/${sessionId}/pickup`, + data + ) + return response.data +}, + +async getEscalationQueue(): Promise { + const response = await apiClient.get('/ai-sessions/escalation-queue') + return response.data +}, + +async linkTicket(sessionId: string, data: { psa_ticket_id: string; psa_connection_id: string }): Promise { + const response = await apiClient.post( + `/ai-sessions/${sessionId}/link-ticket`, + data + ) + return response.data +}, + +async retryPsaPush(sessionId: string): Promise<{ psa_push_status: string }> { + const response = await apiClient.post<{ psa_push_status: string }>( + `/ai-sessions/${sessionId}/retry-psa-push` + ) + return response.data +}, +``` + +**Hook updates:** + +Add `pauseSession`, `resumeSession`, `pickupSession`, `escalationQueue`, `linkTicket`, `retryPsaPush` to `useFlowPilotSession`. + +**Router updates:** + +- Add route for the escalation queue page (e.g., `/escalations`) +- Ensure `/pilot/{sessionId}` handles all session states (active, paused, requesting_escalation) + +**Verification:** Full escalation flow — Engineer A starts session, progresses, escalates with reason. Engineer B sees it in the sidebar queue (badge count), picks it up via "Continue Where They Left Off", sees the briefing, continues diagnosis, resolves. Also test: Engineer B picks up via "Start Fresh With Context" with their own input. Also test pause/resume for same engineer. + +``` +git commit -m "feat(ai-session): add escalation handoff and pause/resume frontend" +``` + +--- + +## Slice 4: Session-to-Ticket Linking for Existing Sessions + +### Task 10: Link an in-progress session to a ticket retroactively + +**Files:** +- Edit: `backend/app/api/endpoints/ai_sessions.py` +- Edit: `backend/app/services/flowpilot_engine.py` +- Edit: `frontend/src/components/flowpilot/FlowPilotSession.tsx` + +**What to add:** + +Sometimes an engineer starts a session with free-text intake and then realizes "oh, this is ticket #12345." They should be able to link a ticket mid-session. + +**New endpoint:** + +``` +POST /api/v1/ai-sessions/{id}/link-ticket +``` + +Request: +```python +class LinkTicketRequest(BaseModel): + psa_ticket_id: str + psa_connection_id: UUID +``` + +**Flow:** +1. Fetch ticket data from ConnectWise (graceful failure — if CW is down, still store the ticket ID for later doc push) +2. Update `session.psa_ticket_id`, `session.psa_connection_id`, `session.ticket_data` +3. **Inject ticket context into FlowPilot's system prompt** for subsequent steps — append the formatted ticket context to `session.conversation_messages` system prompt. FlowPilot will naturally acknowledge the new context in its next response. +4. Return updated session data + +**Frontend:** + +Add a "Link Ticket" button in the session sidebar (where the ticket card would be, if there isn't one). Opens the adapted `TicketPickerModal` (with `onSelect` prop from Task 2). On selection, calls `linkTicket()` and the `SessionTicketCard` appears in the sidebar. + +**Verification:** Start a free-text session. Progress a few steps. Click "Link Ticket". Select a ticket. Verify ticket card appears in sidebar. Continue diagnosis — verify FlowPilot's next response acknowledges the ticket context. Resolve. Verify documentation pushes to the linked ticket. + +``` +git commit -m "feat(ai-session): add mid-session ticket linking with context injection" +``` + +--- + +## Slice 5: Configuration & Settings + +### Task 11: FlowPilot PSA settings + +**Files:** +- Edit: `frontend/src/pages/account/IntegrationsPage.tsx` (or create a new section) +- Edit: `backend/app/models/psa_connection.py` (add fields if needed) +- Edit: `backend/app/api/endpoints/integrations.py` (settings CRUD) + +**What to add:** + +Under the existing PSA integrations settings, add a "FlowPilot Settings" section: + +- **Auto-push documentation:** Toggle (default: on) — automatically push session documentation to linked tickets on resolution +- **Auto-create time entry:** Toggle (default: on) — automatically create a time entry when resolving (requires CW member mapping) +- **Time rounding:** Dropdown — "Nearest 15 minutes" (default), "Nearest 30 minutes", "Exact", "Don't create time entries" +- **Default note visibility:** Dropdown — "Internal only" (default), "Internal and external" +- **Include diagnostic steps in notes:** Toggle (default: on) — if off, only push the summary, not the full step trail +- **Prompt for ticket status on resolution:** Toggle (default: off) — when on, engineer sees a status dropdown at resolution time, populated dynamically from the linked ticket's board statuses via `get_ticket_statuses(board_id)`. When off, ticket status is not changed. +- **Prompt for ticket status on escalation:** Toggle (default: off) — same as above but for escalation + +**Note on status dropdowns:** These are NOT global dropdowns in settings. The setting is just a toggle for whether the engineer is prompted. The actual status options are pulled dynamically at resolution/escalation time based on the specific ticket's board (using the existing `get_ticket_statuses(board_id)` method). This is board-agnostic — works correctly regardless of which CW board the ticket is on. + +These settings should be stored on the `PsaConnection` model as a `flowpilot_settings` JSONB column (add via migration if needed). + +**Verification:** Navigate to integrations settings. See FlowPilot settings section. Toggle settings. Resolve a session with "prompt for status" enabled — verify the status dropdown shows the correct statuses for that ticket's board. Verify the documentation push respects all configured settings. + +``` +git commit -m "feat(ai-session): add FlowPilot PSA configuration settings" +``` + +--- + +## Summary of All New/Modified Files + +### Backend — New +``` +app/services/psa_documentation_service.py # Documentation push to PSA +app/services/psa_retry_scheduler.py # APScheduler job for retrying failed PSA pushes +``` + +### Backend — Modified +``` +app/services/flowpilot_engine.py # PSA ticket intake, pause/resume, enhanced escalation, pickup +app/api/endpoints/ai_sessions.py # Pause, resume, pickup, escalation queue, link-ticket, retry-push endpoints +app/schemas/ai_session.py # New schemas: PickupSessionRequest, LinkTicketRequest, psa_push_status, member_mapping_warning +app/models/psa_connection.py # Add flowpilot_settings JSONB column +app/models/psa_post_log.py # Add ai_session_id FK, make session_id nullable, add retry_count +app/services/psa/base.py # Add abstract create_time_entry() method +app/services/psa/types.py # Add PSATimeEntry type +app/services/psa/connectwise/provider.py # Implement create_time_entry() for CW API +app/components/session/TicketPickerModal.tsx # Add onSelect callback prop for dual-mode usage +alembic/versions/xxx_phase2_psa_flowpilot.py # Migration: flowpilot_settings, psa_post_log changes +``` + +### Frontend — New +``` +src/components/flowpilot/EscalateModal.tsx # Enhanced escalation dialog with team member dropdown +src/components/flowpilot/EscalationQueue.tsx # Pickup queue for escalated sessions +src/components/flowpilot/SessionBriefing.tsx # Handoff briefing card with continue/fresh options +src/components/flowpilot/SessionTicketCard.tsx # Ticket info in session sidebar +``` + +### Frontend — Modified +``` +src/components/flowpilot/FlowPilotIntake.tsx # Ticket picker integration, manual fallback, PSA connection check +src/components/flowpilot/FlowPilotSession.tsx # Ticket card in sidebar, link ticket button, pause/resume +src/components/flowpilot/FlowPilotActionBar.tsx # Pause button, escalate opens enhanced modal +src/components/flowpilot/SessionDocView.tsx # PSA push status indicator, retry button, member mapping warning +src/components/session/TicketPickerModal.tsx # Add onSelect prop for FlowPilot intake usage +src/components/layout/Sidebar.tsx # Escalation queue nav item with badge count +src/hooks/useFlowPilotSession.ts # Pause, resume, pickup, linkTicket, escalationQueue, retryPsaPush +src/api/aiSessions.ts # New API functions (follow aiSessionsApi object pattern) +src/types/ai-session.ts # New types: psa_push_status, PickupSessionRequest, etc. +src/pages/account/IntegrationsPage.tsx # FlowPilot PSA settings section +src/router.tsx # Escalation queue route +``` + +--- + +## Database Changes + +**Migration:** This phase requires a single migration with multiple changes: + +```python +# 1. Add flowpilot_settings to psa_connections +op.add_column('psa_connections', sa.Column( + 'flowpilot_settings', + sa.dialects.postgresql.JSONB(), + nullable=True, + server_default='{}', + comment='FlowPilot-specific settings: auto_push, time_rounding, note_visibility, etc.' +)) + +# 2. Add ai_session_id FK to psa_post_log (existing table points to old sessions only) +op.add_column('psa_post_log', sa.Column( + 'ai_session_id', + sa.dialects.postgresql.UUID(as_uuid=True), + sa.ForeignKey('ai_sessions.id', ondelete='CASCADE'), + nullable=True, + comment='FK to AI sessions (Phase 2). Original session_id FK remains for legacy sessions.' +)) +op.create_index('ix_psa_post_log_ai_session_id', 'psa_post_log', ['ai_session_id']) + +# 3. Make original session_id nullable (was NOT NULL — legacy sessions only) +op.alter_column('psa_post_log', 'session_id', nullable=True) + +# 4. Add retry_count to psa_post_log for automatic retries +op.add_column('psa_post_log', sa.Column( + 'retry_count', + sa.Integer(), + nullable=False, + server_default='0', + comment='Number of retry attempts for failed PSA pushes' +)) +op.add_column('psa_post_log', sa.Column( + 'next_retry_at', + sa.DateTime(timezone=True), + nullable=True, + comment='When to attempt the next retry' +)) +``` + +**Also update `PsaPostLog` model** (`app/models/psa_post_log.py`): Add the `ai_session_id` mapped column and relationship. Make `session_id` `Optional`. Add `retry_count` and `next_retry_at`. + +**Also update `PsaConnection` model** (`app/models/psa_connection.py`): Add the `flowpilot_settings` JSONB mapped column. + +**Also update PSA abstraction layer:** +- `services/psa/types.py`: Add `PSATimeEntry` model +- `services/psa/base.py`: Add abstract `create_time_entry()` method +- `services/psa/connectwise/provider.py`: Implement `create_time_entry()` using CW `POST /time/entries` + +**Run migration:** +```bash +cd /projects/patherly/backend +DATABASE_URL=postgresql://postgres:postgres@resolutionflow_postgres:5432/resolutionflow \ + venv/bin/alembic upgrade head +``` + +--- + +## Testing Strategy + +All tests use mocked ConnectWise responses based on the OpenAPI spec (no CW sandbox available yet). Mock shapes should match `docs/connectwise/connectwise-psa-resolutionflow-reference.json`. + +### Backend Unit Tests + +**Files:** `backend/tests/test_psa_documentation_service.py` + +- Test documentation formatting for resolved sessions (verify timing section always present) +- Test documentation formatting for escalated sessions +- Test password redaction in documentation +- Test time entry calculation (rounding logic for 15min, 30min, exact) +- Test PSA push with mock ConnectWise client +- Test missing member mapping — verify warning returned and note still includes timing +- Test retry logic — verify exponential backoff scheduling + +**Files:** `backend/tests/test_escalation_handoff.py` + +- Test escalation package generation (including LLM-generated hypotheses) +- Test self-escalation blocked (400 error) +- Test session pickup flow — "continue" mode (new engineer, briefing step) +- Test session pickup flow — "fresh" mode (new engineer provides own context) +- Test ownership preserved (session.user_id stays as Engineer A) +- Test permission enforcement (can't pick up session from another team) +- Test pause/resume for same engineer + +**Files:** `backend/tests/test_ai_sessions_psa.py` + +- Full flow: create ticket-based session → diagnose → resolve → verify PSA push with timing +- Full flow: create session → escalate → pickup by another user → resolve +- Test mid-session ticket linking with context injection +- Test PSA push failure → automatic retry → eventual success +- Test PSA push failure → exhaust retries → manual retry button +- Test graceful degradation when CW API is unavailable at intake + +### Frontend Manual Testing + +1. Start a session from a ticket — verify FlowPilot references ticket context +2. Start a session with CW unavailable — verify manual fallback works +3. Resolve a ticket session — verify ConnectWise shows the note with timing +4. Resolve without CW member mapping — verify warning shown, timing in notes +5. Start a free-text session → link ticket mid-session → verify FlowPilot acknowledges context → resolve → verify push +6. Pause a session → navigate away → come back → resume → verify full context preserved +7. Close browser tab during active session → reopen → navigate to session → verify recovery +8. Full escalation: Engineer A escalates → Engineer B sees badge in sidebar → picks up via "Continue" → sees briefing → resolves +9. Full escalation: Engineer B picks up via "Start Fresh" with own context → FlowPilot doesn't repeat A's steps +10. Verify Engineer A still sees the session in their history after B picks it up +11. Test PSA settings — toggle options, verify behavior changes +12. Test ticket status prompt at resolution — verify correct statuses shown for that ticket's board + +--- + +## What Comes Next (Phase 3 — NOT in scope here) + +For context only — do NOT implement these in Phase 2: + +- **Knowledge Flywheel:** Post-session flow proposal generation +- **Review Queue:** UI for approving AI-generated flow proposals +- **Flow Editor as curation tool:** Repurpose for reviewing AI-generated flows +- **In-session Script Generator:** FlowPilot invokes script generation contextually +- **Knowledge gap detection:** Track free-text escapes, high escalation categories +- **Team analytics:** MTTR, resolution rates, knowledge coverage +- **Escalation notifications:** Push notifications or email alerts for escalation queue (Phase 2 has in-app badge only) diff --git a/frontend/src/api/aiSessions.ts b/frontend/src/api/aiSessions.ts index 4e5f5179..2d74135c 100644 --- a/frontend/src/api/aiSessions.ts +++ b/frontend/src/api/aiSessions.ts @@ -10,6 +10,7 @@ import type { SessionDocumentation, AISessionSummary, AISessionDetail, + PickupSessionRequest, } from '@/types/ai-session' export const aiSessionsApi = { @@ -62,6 +63,42 @@ export const aiSessionsApi = { async rateSession(sessionId: string, data: { rating: number; feedback?: string }): Promise { await apiClient.post(`/ai-sessions/${sessionId}/rate`, data) }, + + async retryPsaPush(sessionId: string): Promise<{ psa_push_status: string; psa_push_error: string | null }> { + const response = await apiClient.post<{ psa_push_status: string; psa_push_error: string | null }>( + `/ai-sessions/${sessionId}/retry-psa-push` + ) + return response.data + }, + + async pauseSession(sessionId: string): Promise { + await apiClient.post(`/ai-sessions/${sessionId}/pause`) + }, + + async resumeSession(sessionId: string): Promise { + await apiClient.post(`/ai-sessions/${sessionId}/resume`) + }, + + async pickupSession(sessionId: string, data: PickupSessionRequest): Promise { + const response = await apiClient.post( + `/ai-sessions/${sessionId}/pickup`, + data + ) + return response.data + }, + + async linkTicket(sessionId: string, data: { psa_ticket_id: string; psa_connection_id: string }): Promise { + const response = await apiClient.post( + `/ai-sessions/${sessionId}/link-ticket`, + data + ) + return response.data + }, + + async getEscalationQueue(): Promise { + const response = await apiClient.get('/ai-sessions/escalation-queue') + return response.data + }, } export default aiSessionsApi diff --git a/frontend/src/api/integrations.ts b/frontend/src/api/integrations.ts index ed90f88d..e63687fd 100644 --- a/frontend/src/api/integrations.ts +++ b/frontend/src/api/integrations.ts @@ -1,6 +1,6 @@ import { apiClient } from './client' import type { PsaConnectionResponse, PsaConnectionCreate, PsaConnectionUpdate, PsaConnectionTestResponse } from '@/types' -import type { TicketLinkResponse, PSATicketSearchResult, PSATicketInfo, PSATicketStatusItem, PsaPreviewResponse, PsaPostResponse, PsaPostLogEntry, PsaMemberResponse, PsaMemberMappingResponse, AutoMatchResult } from '@/types/integrations' +import type { TicketLinkResponse, PSATicketSearchResult, PSATicketInfo, PSATicketStatusItem, PsaPreviewResponse, PsaPostResponse, PsaPostLogEntry, PsaMemberResponse, PsaMemberMappingResponse, AutoMatchResult, FlowpilotSettings } from '@/types/integrations' export const integrationsApi = { getConnection: () => @@ -27,6 +27,10 @@ export const integrationsApi = { apiClient.post('/integrations/psa/member-mappings', mappings).then(r => r.data), autoMatchMembers: () => apiClient.post('/integrations/psa/member-mappings/auto-match').then(r => r.data), + getFlowpilotSettings: (connectionId: string) => + apiClient.get(`/integrations/psa/connections/${connectionId}/flowpilot-settings`).then(r => r.data), + updateFlowpilotSettings: (connectionId: string, data: Partial) => + apiClient.put(`/integrations/psa/connections/${connectionId}/flowpilot-settings`, data).then(r => r.data), } export const sessionPsaApi = { diff --git a/frontend/src/components/flowpilot/EscalateModal.tsx b/frontend/src/components/flowpilot/EscalateModal.tsx new file mode 100644 index 00000000..9346fcab --- /dev/null +++ b/frontend/src/components/flowpilot/EscalateModal.tsx @@ -0,0 +1,82 @@ +import { useState } from 'react' +import { AlertTriangle, Loader2 } from 'lucide-react' +import { Modal } from '@/components/common/Modal' +import type { EscalateSessionRequest } from '@/types/ai-session' + +interface EscalateModalProps { + open: boolean + onClose: () => void + onEscalate: (data: EscalateSessionRequest) => Promise + isProcessing: boolean + hasPsaTicket: boolean +} + +export function EscalateModal({ open, onClose, onEscalate, isProcessing, hasPsaTicket }: EscalateModalProps) { + const [reason, setReason] = useState('') + + const handleSubmit = async () => { + if (!reason.trim() || reason.trim().length < 5) return + await onEscalate({ escalation_reason: reason.trim() }) + setReason('') + onClose() + } + + const handleClose = () => { + if (!isProcessing) { + setReason('') + onClose() + } + } + + return ( + +
+
+ +

+ This will mark the session as requesting escalation. Team members will see it in their escalation queue and can pick it up with full context. +

+
+ +
+ +