"""Suggested-fix + resolution-note / escalation-package preview-and-post endpoints. Phase 3: active suggested fix lookup + decision recording, resolution-note preview with state_version cache. Phase 4: resolution-note POST (writeback to PSA + mark resolved), escalation package preview + POST (writeback + mark escalated). Local-only path when the session has no linked PSA ticket: markdown is stored on the session and the status flipped, no external call. Per FLOWPILOT-MIGRATION.md Sections 5.2 + 5.4. """ import logging from datetime import datetime, timezone from typing import Annotated from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession from app.api.deps import get_current_active_user, get_db, require_engineer_or_admin from app.models.ai_session import AISession from app.models.session_suggested_fix import SessionSuggestedFix from app.models.user import User from app.schemas.session_suggested_fix import ( EscalationPackagePostRequest, ResolutionNotePostRequest, ResolutionNotePreviewResponse, ResolutionPostResponse, SessionSuggestedFixDecisionRequest, SessionSuggestedFixDecisionResponse, SessionSuggestedFixResponse, ) from app.services.escalation_package_generator import EscalationPackageGeneratorService from app.services.preview_cache import preview_cache from app.services.psa_writeback_service import ( PSAStatusVerificationError, PSAWritebackService, ) from app.services.resolution_note_generator import ResolutionNoteGeneratorService logger = logging.getLogger(__name__) router = APIRouter(prefix="/ai-sessions/{session_id}", tags=["session-suggested-fixes"]) async def _load_session_or_404(db: AsyncSession, session_id: UUID) -> AISession: """RLS-scoped session load. 404 covers both missing and cross-tenant.""" result = await db.execute(select(AISession).where(AISession.id == session_id)) session = result.scalar_one_or_none() if session is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found") return session # ── Suggested fix: active ────────────────────────────────────────────────── @router.get( "/suggested-fixes/active", response_model=SessionSuggestedFixResponse, ) async def get_active_suggested_fix( session_id: UUID, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_engineer_or_admin), ) -> SessionSuggestedFixResponse: """Return the current active suggested fix (`superseded_at IS NULL`) or 404. A session has at most one active fix. Multiple historical rows persist for audit, but only the most-recent un-superseded one is returned here. """ await _load_session_or_404(db, session_id) result = await db.execute( select(SessionSuggestedFix) .where( SessionSuggestedFix.session_id == session_id, SessionSuggestedFix.superseded_at.is_(None), ) .order_by(SessionSuggestedFix.created_at.desc()) ) fix = result.scalars().first() if fix is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="No active suggested fix for this session", ) return SessionSuggestedFixResponse.model_validate(fix) # ── Suggested fix: decision ──────────────────────────────────────────────── @router.post( "/suggested-fixes/{fix_id}/decision", response_model=SessionSuggestedFixDecisionResponse, ) async def record_decision( session_id: UUID, fix_id: UUID, body: SessionSuggestedFixDecisionRequest, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_engineer_or_admin), ) -> SessionSuggestedFixDecisionResponse: """Record the engineer's path choice on a suggested fix. Phase 3 only persists the decision and (for `dismissed`) supersedes the row. Side effects — script generation for `one_off` / `draft_template`, redirect for `build_template` — land in Phase 5 alongside the inline Script Generator integration. The response shape is forward-compatible. """ await _load_session_or_404(db, session_id) result = await db.execute( select(SessionSuggestedFix).where( SessionSuggestedFix.id == fix_id, SessionSuggestedFix.session_id == session_id, ) ) fix = result.scalar_one_or_none() if fix is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Suggested fix not found" ) # Once a fix has been superseded we still record the engineer's # decision (it's a historical signal — "engineer dismissed the # interim hypothesis"), but `dismissed` on a superseded row would # be redundant noise. if fix.superseded_at is not None and body.decision == "dismissed": raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail="This fix is already superseded by a newer suggestion", ) fix.user_decision = body.decision if body.decision == "dismissed" and fix.superseded_at is None: fix.superseded_at = datetime.now(timezone.utc) # Engineer's choice changes the bundle the resolution-note preview sees, # so bump state_version too. await db.execute( update(AISession) .where(AISession.id == session_id) .values(state_version=AISession.state_version + 1) ) await db.commit() await db.refresh(fix) return SessionSuggestedFixDecisionResponse( id=fix.id, user_decision=fix.user_decision, # type: ignore[arg-type] ) # ── Resolution note preview ──────────────────────────────────────────────── @router.post( "/resolution-note/preview", response_model=ResolutionNotePreviewResponse, ) async def resolution_note_preview( session_id: UUID, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_engineer_or_admin), ) -> ResolutionNotePreviewResponse: """Generate (or return cached) draft markdown for the Resolve note. Cache key: `(resolution_note, session_id, state_version)`. State_version is bumped by every fact / suggested-fix / script-generation write, so two consecutive calls with no intervening writes return the same cached payload (and won't pay for a Sonnet call). Posted to PSA in Phase 4. Until then, this endpoint is read-only. """ await _load_session_or_404(db, session_id) gen = ResolutionNoteGeneratorService(db) try: payload = await gen.generate_or_get_cached(session_id) except ValueError as e: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) except Exception as e: logger.exception("Resolution note preview failed for session %s", session_id) raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"Resolution-note generator error ({type(e).__name__})", ) return ResolutionNotePreviewResponse(**payload) # ── Phase 4: escalation-package preview ──────────────────────────────────── @router.post( "/escalation-package/preview", response_model=ResolutionNotePreviewResponse, ) async def escalation_package_preview( session_id: UUID, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_engineer_or_admin), ) -> ResolutionNotePreviewResponse: """Generate (or return cached) draft markdown for the Escalate handoff package. Same caching story as the resolution-note preview: keyed on `(session_id, state_version)`. Separate cache kind so a Resolve preview and an Escalate preview for the same state can coexist. """ await _load_session_or_404(db, session_id) gen = EscalationPackageGeneratorService(db) try: payload = await gen.generate_or_get_cached(session_id) except ValueError as e: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e)) except Exception as e: logger.exception("Escalation package preview failed for session %s", session_id) raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"Escalation-package generator error ({type(e).__name__})", ) return ResolutionNotePreviewResponse(**payload) # ── Phase 4: Resolve & post ──────────────────────────────────────────────── @router.post( "/resolution-note/post", response_model=ResolutionPostResponse, ) async def post_resolution_note( session_id: UUID, body: ResolutionNotePostRequest, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_engineer_or_admin), ) -> ResolutionPostResponse: """Commit the engineer-edited resolution note and close the session. Three outcomes: - **External post + status verified** — session.status='resolved', markdown + external_id + posted_at persisted, CW status flipped to the configured Resolved status ID and re-fetch-verified. - **External post only** — markdown posted, but no cw_resolved_status_id configured → session.status='resolved', `status_transition_skipped_reason` explains the skip. Not an error — posting the note is meaningful. - **Local-only** — session has no linked PSA ticket → markdown stored on `resolution_note_markdown`, session.status='resolved', outcome = 'resolved_local'. No external call. Status verification failure raises 502: the engineer intended to close the ticket but we cannot confirm it actually closed. Surfacing silent success would be a footgun. """ session_obj = await _load_session_or_404(db, session_id) if session_obj.status not in ("active", "paused", "requesting_escalation", "escalated"): # Already-resolved sessions shouldn't be re-posted; caller should # query first. escalated→resolved is allowed (engineer revised course). raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail=f"Session is already {session_obj.status}", ) service = PSAWritebackService(db) summary = (body.resolution_summary or body.markdown.strip().splitlines()[0])[:500] # Local-only path — no PSA ticket linked, nothing to post. if not session_obj.psa_ticket_id or not session_obj.psa_connection_id: session_obj.resolution_note_markdown = body.markdown.strip() session_obj.status = "resolved" session_obj.resolved_at = datetime.now(timezone.utc) session_obj.resolution_summary = summary await db.commit() return ResolutionPostResponse( outcome="resolved_local", session_status=session_obj.status, ) try: posted = await service.post_resolution_note(session_obj, body.markdown) except Exception as e: logger.exception("post_resolution_note failed for session %s", session_id) await db.rollback() raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"PSA post failed ({type(e).__name__})", ) # Attempt the status transition if configured; failed verification is # surfaced loudly (status_code 502) per the ConnectWise anti-silent- # success principle. Not configured → skip with a reason, not an error. target_status_id = await service.resolved_status_id_for_account(session_obj.account_id) verified_status_id: int | None = None verified_status_name: str | None = None skipped_reason: str | None = None if target_status_id is None: skipped_reason = ( "No cw_resolved_status_id configured in account_settings.preferences — " "note posted, status unchanged." ) else: try: result = await service.transition_ticket_status(session_obj, target_status_id) verified_status_id = result["verified_status_id"] verified_status_name = result["verified_status_name"] except PSAStatusVerificationError as e: logger.error("Status verification failed for session %s: %s", session_id, e) # Note was already posted — roll that partial side effect back in # the session record (the CW note itself can't be un-posted). await db.rollback() raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=str(e), ) except Exception as e: logger.exception("Status transition failed for session %s", session_id) await db.rollback() raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"PSA status transition error ({type(e).__name__})", ) session_obj.status = "resolved" session_obj.resolved_at = datetime.now(timezone.utc) session_obj.resolution_summary = summary await db.commit() return ResolutionPostResponse( outcome="resolved", session_status=session_obj.status, external_id=posted["external_id"], posted_at=posted["posted_at"], verified_status_id=verified_status_id, verified_status_name=verified_status_name, status_transition_skipped_reason=skipped_reason, ) # ── Phase 4: Escalate & post ────────────────────────────────────────────── @router.post( "/escalation-package/post", response_model=ResolutionPostResponse, ) async def post_escalation_package( session_id: UUID, body: EscalationPackagePostRequest, current_user: Annotated[User, Depends(get_current_active_user)], db: Annotated[AsyncSession, Depends(get_db)], _: None = Depends(require_engineer_or_admin), ) -> ResolutionPostResponse: """Commit the engineer-edited escalation package and mark the session escalated. Structure mirrors post_resolution_note: - Local-only when no PSA ticket: markdown stored, session.status='escalated'. - PSA post: internal-analysis note (handoff is for the next engineer, not the customer), optional status transition via cw_escalated_status_id, re-fetch verified. """ session_obj = await _load_session_or_404(db, session_id) if session_obj.status not in ("active", "paused", "resolved"): # resolved→escalated is allowed (engineer realized they need help # after closing); escalated→escalated would be a no-op, block it. raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail=f"Session is already {session_obj.status}", ) service = PSAWritebackService(db) reason = body.escalation_reason or body.markdown.strip().splitlines()[0][:500] if not session_obj.psa_ticket_id or not session_obj.psa_connection_id: session_obj.escalation_package_markdown = body.markdown.strip() session_obj.status = "escalated" session_obj.escalation_reason = reason await db.commit() return ResolutionPostResponse( outcome="escalated_local", session_status=session_obj.status, ) try: posted = await service.post_escalation_package(session_obj, body.markdown) except Exception as e: logger.exception("post_escalation_package failed for session %s", session_id) await db.rollback() raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"PSA post failed ({type(e).__name__})", ) target_status_id = await service.escalated_status_id_for_account(session_obj.account_id) verified_status_id: int | None = None verified_status_name: str | None = None skipped_reason: str | None = None if target_status_id is None: skipped_reason = ( "No cw_escalated_status_id configured — package posted, status unchanged." ) else: try: result = await service.transition_ticket_status(session_obj, target_status_id) verified_status_id = result["verified_status_id"] verified_status_name = result["verified_status_name"] except PSAStatusVerificationError as e: logger.error("Status verification failed for session %s: %s", session_id, e) await db.rollback() raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(e)) except Exception as e: logger.exception("Status transition failed for session %s", session_id) await db.rollback() raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"PSA status transition error ({type(e).__name__})", ) session_obj.status = "escalated" session_obj.escalation_reason = reason await db.commit() return ResolutionPostResponse( outcome="escalated", session_status=session_obj.status, external_id=posted["external_id"], posted_at=posted["posted_at"], verified_status_id=verified_status_id, verified_status_name=verified_status_name, status_transition_skipped_reason=skipped_reason, ) # ── Helper used by tests ─────────────────────────────────────────────────── def _clear_preview_cache_for_tests() -> None: """Reset the singleton cache between tests.""" preview_cache._store.clear() # noqa: SLF001 — test-only access