feat(pilot): Phase 3 — Suggested fix tracking + Resolve preview with state_version cache

Adds the AI-proposed resolution path and the inline preview of the
markdown that will be posted to the customer ticket on Resolve. The
preview is keyed on (session_id, ai_sessions.state_version) so back-to-
back fetches against unchanged state hit an in-process cache instead
of paying for a Sonnet call.

Backend:
- preview_cache: in-process LRU keyed on (kind, session_id, state_version).
  No TTL — state_version is the source of truth. Soft-cap 5000 entries.
- unified_chat_service: [SUGGEST_FIX] parser (last-block-wins, JSON
  payload, confidence clamped 0-100), supersession persistence (sets
  superseded_at on prior active row), atomic state_version bump.
- ResolutionNoteGeneratorService: pulls session, facts, active fix, and
  redacted script_generations into a structured input bundle for Sonnet;
  produces the four-section markdown (Problem / What we confirmed /
  Root cause / Resolution). Sensitive script parameters redacted via
  ScriptTemplateEngine.redact_sensitive driven by the template's
  parameters_schema.
- /api/v1/ai-sessions/{id}/suggested-fixes/active — 200 with the active
  fix or 404.
- /api/v1/ai-sessions/{id}/suggested-fixes/{fix_id}/decision — records
  one_off / draft_template / build_template / dismissed; dismiss
  supersedes; bumps state_version. 409 on dismissing an already-
  superseded fix.
- /api/v1/ai-sessions/{id}/resolution-note/preview — generates or returns
  cached markdown; from_cache flag in payload signals cache hit.
- scripts.py POST /generate now bumps state_version on the linked
  ai_session_id when present (third source of preview-cache invalidation
  per Section 5.5).
- ASSISTANT_SYSTEM_PROMPT documents [SUGGEST_FIX] (when to/not to emit,
  format, supersession semantics).
- 12 tests covering the parser (well-formed, last-wins, malformed,
  confidence clamping), supersession + state_version invariant, all
  decision branches, preview cache hit-on-no-change + miss-after-write.

Frontend:
- src/components/pilot/sections/SuggestedFix.tsx — amber-accented card
  with confidence badge; dismiss action wired to the decision endpoint.
- src/components/pilot/ResolutionNotePreview.tsx — popover with refresh,
  loading state, cached/fresh indicator, ticket-ref display.
- src/api/sessionSuggestedFixes.ts — typed client; getActive normalizes
  404 to null so callers don't have to special-case.
- TaskLane gains suggestedFixSlot + bottomSlot props (rendered after
  Diagnostic Checks; bottomSlot anchors the Resolve action).
- AssistantChatPage: refreshSessionDerived helper batches fact + fix
  refresh; fact mutations and chat sends both schedule a 500ms-debounced
  preview refresh per the Section 5.5 spec.

Verified end-to-end against the dev stack with a real Sonnet call:
- /active 404 → fact create → preview generates four-section markdown
  grounded only in provided facts → second preview call hits cache
  (from_cache=true, no LLM call) → fact write 2 → cache miss, regenerates.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-21 21:45:52 -04:00
parent 625dba7548
commit 66e592096c
16 changed files with 1617 additions and 22 deletions

View File

@@ -5,7 +5,7 @@ import re
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, or_, literal
from sqlalchemy import select, func, or_, literal, update as sa_update
from app.core.database import get_db
from app.api.deps import get_current_active_user
@@ -374,6 +374,20 @@ async def generate_script(
)
db.add(generation)
template.usage_count += 1
# FlowPilot Phase 3: bump the linked AI session's state_version so the
# resolution-note preview cache invalidates. One-off scripts run outside
# any FlowPilot session — in that case the UPDATE matches zero rows.
if data.ai_session_id is not None:
# Local import: scripts endpoint stays independent of AI-session
# imports for non-AI generation paths.
from app.models.ai_session import AISession
await db.execute(
sa_update(AISession)
.where(AISession.id == data.ai_session_id)
.values(state_version=AISession.state_version + 1)
)
await db.commit()
await db.refresh(generation)

View File

@@ -0,0 +1,183 @@
"""Suggested-fix and resolution-note preview endpoints (Phase 3).
Per FLOWPILOT-MIGRATION.md Sections 5.2 + 5.4. The preview is keyed on
`(session_id, ai_sessions.state_version)` so repeat fetches against the same
state hit the in-process cache instead of paying for a Sonnet call.
"""
import logging
from datetime import datetime, timezone
from typing import Annotated
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_current_active_user, get_db, require_engineer_or_admin
from app.models.ai_session import AISession
from app.models.session_suggested_fix import SessionSuggestedFix
from app.models.user import User
from app.schemas.session_suggested_fix import (
ResolutionNotePreviewResponse,
SessionSuggestedFixDecisionRequest,
SessionSuggestedFixDecisionResponse,
SessionSuggestedFixResponse,
)
from app.services.preview_cache import preview_cache
from app.services.resolution_note_generator import ResolutionNoteGeneratorService
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/ai-sessions/{session_id}", tags=["session-suggested-fixes"])
async def _load_session_or_404(db: AsyncSession, session_id: UUID) -> AISession:
"""RLS-scoped session load. 404 covers both missing and cross-tenant."""
result = await db.execute(select(AISession).where(AISession.id == session_id))
session = result.scalar_one_or_none()
if session is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session not found")
return session
# ── Suggested fix: active ──────────────────────────────────────────────────
@router.get(
"/suggested-fixes/active",
response_model=SessionSuggestedFixResponse,
)
async def get_active_suggested_fix(
session_id: UUID,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_engineer_or_admin),
) -> SessionSuggestedFixResponse:
"""Return the current active suggested fix (`superseded_at IS NULL`) or 404.
A session has at most one active fix. Multiple historical rows persist
for audit, but only the most-recent un-superseded one is returned here.
"""
await _load_session_or_404(db, session_id)
result = await db.execute(
select(SessionSuggestedFix)
.where(
SessionSuggestedFix.session_id == session_id,
SessionSuggestedFix.superseded_at.is_(None),
)
.order_by(SessionSuggestedFix.created_at.desc())
)
fix = result.scalars().first()
if fix is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No active suggested fix for this session",
)
return SessionSuggestedFixResponse.model_validate(fix)
# ── Suggested fix: decision ────────────────────────────────────────────────
@router.post(
"/suggested-fixes/{fix_id}/decision",
response_model=SessionSuggestedFixDecisionResponse,
)
async def record_decision(
session_id: UUID,
fix_id: UUID,
body: SessionSuggestedFixDecisionRequest,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_engineer_or_admin),
) -> SessionSuggestedFixDecisionResponse:
"""Record the engineer's path choice on a suggested fix.
Phase 3 only persists the decision and (for `dismissed`) supersedes the
row. Side effects — script generation for `one_off` / `draft_template`,
redirect for `build_template` — land in Phase 5 alongside the inline
Script Generator integration. The response shape is forward-compatible.
"""
await _load_session_or_404(db, session_id)
result = await db.execute(
select(SessionSuggestedFix).where(
SessionSuggestedFix.id == fix_id,
SessionSuggestedFix.session_id == session_id,
)
)
fix = result.scalar_one_or_none()
if fix is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Suggested fix not found"
)
# Once a fix has been superseded we still record the engineer's
# decision (it's a historical signal — "engineer dismissed the
# interim hypothesis"), but `dismissed` on a superseded row would
# be redundant noise.
if fix.superseded_at is not None and body.decision == "dismissed":
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="This fix is already superseded by a newer suggestion",
)
fix.user_decision = body.decision
if body.decision == "dismissed" and fix.superseded_at is None:
fix.superseded_at = datetime.now(timezone.utc)
# Engineer's choice changes the bundle the resolution-note preview sees,
# so bump state_version too.
await db.execute(
update(AISession)
.where(AISession.id == session_id)
.values(state_version=AISession.state_version + 1)
)
await db.commit()
await db.refresh(fix)
return SessionSuggestedFixDecisionResponse(
id=fix.id,
user_decision=fix.user_decision, # type: ignore[arg-type]
)
# ── Resolution note preview ────────────────────────────────────────────────
@router.post(
"/resolution-note/preview",
response_model=ResolutionNotePreviewResponse,
)
async def resolution_note_preview(
session_id: UUID,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)],
_: None = Depends(require_engineer_or_admin),
) -> ResolutionNotePreviewResponse:
"""Generate (or return cached) draft markdown for the Resolve note.
Cache key: `(resolution_note, session_id, state_version)`. State_version is
bumped by every fact / suggested-fix / script-generation write, so two
consecutive calls with no intervening writes return the same cached
payload (and won't pay for a Sonnet call).
Posted to PSA in Phase 4. Until then, this endpoint is read-only.
"""
await _load_session_or_404(db, session_id)
gen = ResolutionNoteGeneratorService(db)
try:
payload = await gen.generate_or_get_cached(session_id)
except ValueError as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
except Exception as e:
logger.exception("Resolution note preview failed for session %s", session_id)
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Resolution-note generator error ({type(e).__name__})",
)
return ResolutionNotePreviewResponse(**payload)
# ── Helper used by tests ───────────────────────────────────────────────────
def _clear_preview_cache_for_tests() -> None:
"""Reset the singleton cache between tests."""
preview_cache._store.clear() # noqa: SLF001 — test-only access

View File

@@ -44,6 +44,7 @@ from app.api.endpoints import (
session_facts,
session_handoffs,
session_resolutions,
session_suggested_fixes,
sessions,
shared,
shares,
@@ -139,6 +140,7 @@ api_router.include_router(session_resolutions.router, dependencies=_tenant_deps)
# session_facts mounts under /ai-sessions/{id}/facts — register before ai_sessions
# so the {session_id}/facts subpaths take precedence over any future generic catchalls.
api_router.include_router(session_facts.router, dependencies=_tenant_deps)
api_router.include_router(session_suggested_fixes.router, dependencies=_tenant_deps)
api_router.include_router(ai_sessions.router, dependencies=_tenant_deps)
api_router.include_router(flow_proposals.router, dependencies=_tenant_deps)
api_router.include_router(flowpilot_analytics.router, dependencies=_tenant_deps)

View File

@@ -134,6 +134,11 @@ class Settings(BaseSettings):
# Doc Section 6.6 sets Haiku as the default; instrumentation tracks
# disputed_fact_rate so we can escalate to Sonnet if quality drops.
"fact_synthesis": "fast",
# FlowPilot migration Phase 3 — resolution-note preview that ships to
# the customer ticket. Sonnet because customer-facing artifact quality
# matters more than latency; the in-process state_version cache keeps
# cost manageable.
"resolution_note": "standard",
}
def get_model_for_action(self, action_type: str) -> str:

View File

@@ -0,0 +1,63 @@
"""Pydantic schemas for session suggested fixes (Phase 3).
See FLOWPILOT-MIGRATION.md Section 5.2.
"""
from __future__ import annotations
from datetime import datetime
from typing import Any, Literal
from uuid import UUID
from pydantic import BaseModel, Field
UserDecision = Literal["one_off", "draft_template", "build_template", "dismissed"]
class SessionSuggestedFixResponse(BaseModel):
id: UUID
session_id: UUID
title: str
description: str
confidence_pct: int
script_template_id: UUID | None
ai_drafted_script: str | None
ai_drafted_parameters: dict[str, Any] | None
user_decision: UserDecision | None
superseded_at: datetime | None
created_at: datetime
model_config = {"from_attributes": True}
class SessionSuggestedFixDecisionRequest(BaseModel):
"""Engineer's path choice on a suggested fix.
Server-side side effects per Section 5.2:
- one_off: render the script (Phase 5), no template created.
- draft_template: render + queue a draft_templates row (Phase 5/6).
- build_template: redirect to full template creation (Phase 5).
- dismissed: mark the fix superseded so a fresh suggestion can take over.
"""
decision: UserDecision
class SessionSuggestedFixDecisionResponse(BaseModel):
"""Returned after recording a decision; richer payloads land in Phase 5."""
id: UUID
user_decision: UserDecision
# Set when the decision triggered side effects (e.g. a script generation).
# Phase 3 only records the choice; this stays None until Phase 5 wires it.
rendered_script: str | None = None
redirect_path: str | None = Field(
None,
description="Where to send the engineer next (e.g. /scripts/builder?... for build_template)",
)
# ── Resolution note preview ────────────────────────────────────────────────
class ResolutionNotePreviewResponse(BaseModel):
markdown: str
target_ticket_ref: str | None
state_version: int
from_cache: bool

View File

@@ -159,6 +159,44 @@ words, no period.
the engineer's message confirms the fact, do not emit a [PROMOTE]. Hallucinated \
facts get posted to customer tickets and will erode trust in the system.
## Proposing a fix with [SUGGEST_FIX]
When you have a concrete proposed resolution path with reasonable confidence, \
emit a `[SUGGEST_FIX]` marker. This populates the "Suggested fix" card the \
engineer can act on (run a script, build a template, etc.). A new \
[SUGGEST_FIX] supersedes any prior suggested fix on the session — emit a fresh \
one whenever your top hypothesis changes meaningfully.
**When to emit [SUGGEST_FIX]:**
- You have a concrete resolution path (not just "investigate further")
- Confidence is at least ~50% — below that, keep diagnosing
- Either a known Script Library template applies, OR you can draft a script \
that resolves the issue end-to-end
**When NOT to emit [SUGGEST_FIX]:**
- You're still narrowing causes and the fix depends on the next answer
- The "fix" is just running another diagnostic — that goes in [ACTIONS]
- Two paths are equally likely — fork or ask first, suggest later
**[SUGGEST_FIX] marker format (one block per response, last one wins):**
[SUGGEST_FIX]
{"title": "Clear cached credentials + rebuild Outlook profile", "description": "Stale cached credential in Credential Manager is holding the pre-reset token. Clearing it and recreating the profile completes the password change.", "confidence": 94, "script_template_slug": "clear-outlook-credentials"}
[/SUGGEST_FIX]
- `title`: short imperative summary, ≤ 200 chars
- `description`: one short paragraph explaining the root cause and the fix
- `confidence`: integer 0-100 (what you'd bet this resolves the ticket)
- `script_template_slug`: slug of an existing Script Library template if one \
applies; OMIT or set null otherwise
- `ai_drafted_script`: full script body if no template matches (only when \
`script_template_slug` is null/omitted)
- `ai_drafted_parameters`: optional JSON object of suggested parameter values \
for the drafted script
The marker is stripped from display — the engineer sees the suggested fix as \
an interactive card with confidence badge, not raw JSON.
## Using the Team's Flow Library
Your team has built troubleshooting flows in ResolutionFlow. When relevant flows \
appear in the context below, reference them by name so the engineer can launch them \
@@ -232,6 +270,9 @@ in your markers unless you are ≥75% confident that information is no longer re
[PROMOTE] markers are OPTIONAL and IN ADDITION to the required ones — emit them only \
when the engineer's most recent message confirmed something worth recording, and copy \
the originating item's `id` into `source_ref` verbatim.
[SUGGEST_FIX] is OPTIONAL — emit one at most per response, only when you have a \
concrete proposed resolution at ~50%+ confidence. A new [SUGGEST_FIX] supersedes \
any prior suggested fix.
"""

View File

@@ -0,0 +1,52 @@
"""In-process preview cache for FlowPilot resolution-note / escalation-package previews.
Phase 3 implementation per FLOWPILOT-MIGRATION.md Section 5.5:
- Cache key: `(kind, session_id, state_version)` — no TTL needed, state_version
is the source of truth.
- Invalidation: any write to session_facts, session_suggested_fixes, or
script_generations bumps `ai_sessions.state_version`. Old entries simply
stop being looked up and leak harmlessly until process restart.
- Storage: plain dict, single-process. When Session Sharing brings Redis,
swap the storage without changing the call sites.
Bound: best-effort soft cap of 5000 entries. When exceeded we drop the
oldest insertion. Not a TTL — at current scale, the cap is more about
resident-memory hygiene than correctness.
"""
from __future__ import annotations
from collections import OrderedDict
from typing import Any
from uuid import UUID
_MAX_ENTRIES = 5000
class _PreviewCache:
def __init__(self) -> None:
self._store: OrderedDict[tuple[str, UUID, int], Any] = OrderedDict()
def get(self, kind: str, session_id: UUID, state_version: int) -> Any | None:
key = (kind, session_id, state_version)
if key not in self._store:
return None
# Touch on access so LRU eviction is meaningful.
self._store.move_to_end(key)
return self._store[key]
def set(self, kind: str, session_id: UUID, state_version: int, value: Any) -> None:
key = (kind, session_id, state_version)
self._store[key] = value
self._store.move_to_end(key)
# Evict oldest if over cap. OrderedDict.popitem(last=False) is O(1).
while len(self._store) > _MAX_ENTRIES:
self._store.popitem(last=False)
def invalidate_session(self, session_id: UUID) -> None:
"""Drop all entries for a session — used when the session is deleted."""
keys = [k for k in self._store if k[1] == session_id]
for k in keys:
del self._store[k]
preview_cache = _PreviewCache()

View File

@@ -0,0 +1,320 @@
"""ResolutionNoteGeneratorService — drafts the structured Resolve note for a session.
Produces the four-section markdown that ships to the customer ticket (per
FLOWPILOT-MIGRATION.md Section 6.2):
## Problem
## What we confirmed
## Root cause
## Resolution
The output is the *draft* — engineers review and edit in the preview popover
before clicking Confirm & post (Phase 4). Caching is keyed on
`(session_id, ai_sessions.state_version)` per Section 5.5; the cache lives in
`preview_cache` and invalidates automatically when any fact / suggested fix /
script generation bumps the session's state_version.
Model: Sonnet (`resolution_note` action tier — quality matters because the
output is customer-facing). MCP intentionally disabled — this is a summary
of existing state, not a research task.
Sensitive parameter values in script_generations are redacted using the
script template's `parameters_schema` (`field_type: "password"`). Existing
ScriptTemplateEngine.redact_sensitive handles the substitution.
"""
from __future__ import annotations
import logging
from typing import Any
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.ai_provider import get_ai_provider
from app.core.config import settings
from app.models.ai_session import AISession
from app.models.script_template import ScriptGeneration, ScriptTemplate
from app.models.session_fact import SessionFact
from app.models.session_suggested_fix import SessionSuggestedFix
from app.services.preview_cache import preview_cache
from app.services.script_template_engine import ScriptTemplateEngine
logger = logging.getLogger(__name__)
_RESOLUTION_NOTE_SYSTEM_PROMPT = """\
You produce structured resolution notes for an MSP troubleshooting platform. \
The notes are posted as ticket notes in the customer's PSA, so they must read \
like a competent senior engineer summarized the work — not like an AI \
narration. Your output goes in front of paying customers.
Output exactly this markdown structure, no preamble, no closing remarks, no \
extra headings:
## Problem
<one short paragraph stating the issue the engineer worked on, derived from the \
session's intake/title and the incident header. Past tense. No "user reported" \
hedging — state the problem directly.>
## What we confirmed
<bulleted list of facts from the "What we know" section, each one a short line. \
Group similar facts together; do not invent connecting prose. If there are no \
facts, write "Nothing was confirmed." and skip to Root cause.>
## Root cause
<one short paragraph naming the root cause based on the active suggested fix \
and confirmed facts. If the suggested fix is low-confidence (<60%) or absent, \
say "Root cause not definitively isolated." and explain what is suspected based \
on facts.>
## Resolution
<one short paragraph describing the resolution applied. If a script ran during \
the session, mention it (e.g. "Cleared cached credentials via the \
clear-outlook-credentials script."). If no resolution has been performed yet, \
write "Resolution not yet applied — fix proposed: <fix title>." Pull verbatim \
script names and template references when available.>
Strict rules:
- Use ONLY the facts and state I provide. Never invent specifics that are not \
in the input.
- Do not include placeholder text like "TBD", "TODO", or empty bullets.
- Do not include the engineer's name, the AI's name, internal session IDs, or \
the session's chat transcript.
- Markdown headings exactly as shown (## level), no bolding the headings.
- No trailing whitespace, no double-blank lines, no horizontal rules.
"""
class ResolutionNoteGeneratorService:
"""Generates and caches the four-section Resolve note markdown."""
KIND = "resolution_note"
def __init__(self, db: AsyncSession) -> None:
self.db = db
async def generate_or_get_cached(
self, session_id: UUID, *, force: bool = False,
) -> dict[str, Any]:
"""Return the preview for the session.
Reads `(KIND, session_id, state_version)` from the in-process cache;
on miss, generates fresh markdown and stores under the same key.
`force=True` bypasses the cache and refreshes the cached entry.
Returns `{"markdown": str, "target_ticket_ref": str | None,
"state_version": int, "from_cache": bool}`.
"""
session = await self._load_session(session_id)
cached = preview_cache.get(self.KIND, session.id, session.state_version) if not force else None
if cached is not None:
return {**cached, "from_cache": True}
markdown = await self._render(session)
target = self._target_ticket_ref(session)
payload = {
"markdown": markdown,
"target_ticket_ref": target,
"state_version": session.state_version,
}
preview_cache.set(self.KIND, session.id, session.state_version, payload)
return {**payload, "from_cache": False}
# ── Internals ─────────────────────────────────────────────────────────
async def _load_session(self, session_id: UUID) -> AISession:
result = await self.db.execute(
select(AISession).where(AISession.id == session_id)
)
session = result.scalar_one_or_none()
if session is None:
raise ValueError(f"Session {session_id} not found")
return session
async def _render(self, session: AISession) -> str:
"""Build the prompt input bundle, call the model, return markdown."""
facts = await self._load_facts(session.id)
active_fix = await self._load_active_fix(session.id)
gens = await self._load_redacted_generations(session.id)
bundle = self._build_input_bundle(session, facts, active_fix, gens)
model = settings.get_model_for_action("resolution_note")
provider = get_ai_provider(model=model)
# Cache the system prompt — identical across every preview call for
# every session. Per-session bundle is in the user message, uncached.
system_blocks: list[dict[str, Any]] = [
{
"type": "text",
"text": _RESOLUTION_NOTE_SYSTEM_PROMPT,
"cache_control": {"type": "ephemeral"},
# cacheable: identical across every resolution-note preview call
},
]
try:
text, _in, _out = await provider.generate_text(
system_prompt=system_blocks,
messages=[{"role": "user", "content": bundle}],
max_tokens=1200,
)
except Exception:
logger.exception("Resolution note generation failed for session %s", session.id)
raise
return text.strip()
async def _load_facts(self, session_id: UUID) -> list[SessionFact]:
result = await self.db.execute(
select(SessionFact)
.where(
SessionFact.session_id == session_id,
SessionFact.deleted_at.is_(None),
)
.order_by(SessionFact.created_at.asc())
)
return list(result.scalars().all())
async def _load_active_fix(self, session_id: UUID) -> SessionSuggestedFix | None:
result = await self.db.execute(
select(SessionSuggestedFix)
.where(
SessionSuggestedFix.session_id == session_id,
SessionSuggestedFix.superseded_at.is_(None),
)
.order_by(SessionSuggestedFix.created_at.desc())
)
return result.scalars().first()
async def _load_redacted_generations(
self, session_id: UUID
) -> list[dict[str, Any]]:
"""Pull script_generations for the session, redacting password params.
Password fields are inferred from the linked template's
`parameters_schema` (`field_type: "password"`). The existing
ScriptTemplateEngine.redact_sensitive handles the substitution.
"""
result = await self.db.execute(
select(ScriptGeneration)
.where(ScriptGeneration.ai_session_id == session_id)
.order_by(ScriptGeneration.created_at.asc())
)
gens = list(result.scalars().all())
if not gens:
return []
template_ids = {g.template_id for g in gens}
tpl_result = await self.db.execute(
select(ScriptTemplate).where(ScriptTemplate.id.in_(template_ids))
)
templates_by_id = {t.id: t for t in tpl_result.scalars().all()}
engine = ScriptTemplateEngine()
out: list[dict[str, Any]] = []
for g in gens:
tpl = templates_by_id.get(g.template_id)
sensitive_keys = self._sensitive_keys_from_schema(
(tpl.parameters_schema if tpl else {}) or {}
)
redacted_params = engine.redact_sensitive(g.parameters_used or {}, sensitive_keys)
out.append({
"template_name": tpl.name if tpl else "(unknown template)",
"template_slug": tpl.slug if tpl else None,
"parameters_used": redacted_params,
"created_at": g.created_at.isoformat(),
})
return out
@staticmethod
def _sensitive_keys_from_schema(schema: dict[str, Any]) -> set[str]:
"""Extract password-typed parameter keys from a template's schema.
The schema shape is `{"parameters": [{"key": "...", "field_type": "password", ...}]}`
per the existing Script Generator convention. Tolerate both that shape
and the simpler `{"key": {"field_type": "password"}}` form.
"""
keys: set[str] = set()
params = schema.get("parameters") if isinstance(schema, dict) else None
if isinstance(params, list):
for p in params:
if isinstance(p, dict) and p.get("field_type") == "password":
k = p.get("key") or p.get("variable_name")
if isinstance(k, str):
keys.add(k)
elif isinstance(schema, dict):
for k, v in schema.items():
if isinstance(v, dict) and v.get("field_type") == "password":
keys.add(k)
return keys
@staticmethod
def _target_ticket_ref(session: AISession) -> str | None:
"""Display ref for the linked PSA ticket, e.g. 'CW #48291'.
ConnectWise is the only PSA wired today (per the Phase 1 constraint),
so a CW prefix is reasonable. Other PSAs will need provider-aware
formatting in Phase 4.
"""
if not session.psa_ticket_id:
return None
return f"CW #{session.psa_ticket_id}"
@staticmethod
def _build_input_bundle(
session: AISession,
facts: list[SessionFact],
active_fix: SessionSuggestedFix | None,
generations: list[dict[str, Any]],
) -> str:
"""Compose the structured input the LLM sees for one preview call."""
lines: list[str] = []
lines.append("# Session context")
lines.append(f"Title: {session.title or '(untitled)'}")
if session.problem_summary:
lines.append(f"Problem summary: {session.problem_summary}")
if session.problem_domain:
lines.append(f"Domain: {session.problem_domain}")
intake_text = (session.intake_content or {}).get("text") if isinstance(session.intake_content, dict) else None
if intake_text:
lines.append(f"Intake message: {intake_text}")
if session.psa_ticket_id:
lines.append(f"Linked PSA ticket: CW #{session.psa_ticket_id}")
lines.append("")
lines.append("# Confirmed facts (What we know)")
if not facts:
lines.append("(none)")
else:
for f in facts:
tag = f.source_type
summary = f"{f.source_summary}" if f.source_summary else ""
lines.append(f"- [{tag}] {f.text}{summary}")
lines.append("")
lines.append("# Active suggested fix")
if active_fix is None:
lines.append("(no active suggested fix)")
else:
lines.append(f"Title: {active_fix.title}")
lines.append(f"Confidence: {active_fix.confidence_pct}%")
lines.append(f"Description: {active_fix.description}")
if active_fix.user_decision:
lines.append(f"Engineer decision: {active_fix.user_decision}")
lines.append("")
lines.append("# Scripts run during the session (passwords redacted)")
if not generations:
lines.append("(none)")
else:
for g in generations:
lines.append(f"- {g['template_name']} (slug={g['template_slug']})")
if g["parameters_used"]:
lines.append(f" parameters: {g['parameters_used']}")
lines.append("")
lines.append(
"Produce the four-section resolution note now. Use only the input above."
)
return "\n".join(lines)

View File

@@ -11,6 +11,9 @@ infrastructure and system prompt from assistant_chat_service.
Items in pending_task_lane carry stable UUIDs (assigned here) so PROMOTE
source_refs survive across turns even when the model re-emits the same
question/action.
- `[SUGGEST_FIX]` (Phase 3) — proposes a resolution path for the session.
Each new emission supersedes the previous active row (sets superseded_at)
so there's exactly one active fix at a time.
"""
import json
import logging
@@ -22,7 +25,13 @@ from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import datetime, timezone
from sqlalchemy import update
from app.models.ai_session import AISession
from app.models.script_template import ScriptTemplate
from app.models.session_suggested_fix import SessionSuggestedFix
from app.services.assistant_chat_service import (
ASSISTANT_SYSTEM_PROMPT,
_call_ai,
@@ -287,6 +296,125 @@ def _assign_stable_task_lane_ids(
return out_questions, out_actions
def _parse_suggest_fix_marker(
ai_content: str,
) -> tuple[str, dict[str, Any] | None]:
"""Extract a single [SUGGEST_FIX]...[/SUGGEST_FIX] JSON block from AI response.
The block contains:
{"title": "...", "description": "...", "confidence": 0..100,
"script_template_slug": "..." | null,
"ai_drafted_script": "..." | null,
"ai_drafted_parameters": {...} | null}
Per FLOWPILOT-MIGRATION.md Section 8.2. Only the LAST block in the response
is honored — if the model emits multiple, only its final view of the fix
matters; earlier ones in the same turn are stale even before persistence.
Returns (cleaned_content, fix_dict_or_None). Marker stripped from display.
"""
blocks = list(re.finditer(r"\[SUGGEST_FIX\]\s*([\s\S]*?)\s*\[/SUGGEST_FIX\]", ai_content))
if not blocks:
return ai_content, None
# Take the last block — most-recent intent wins within a single turn.
last = blocks[-1]
raw = last.group(1).strip()
if raw.startswith("```"):
raw = re.sub(r"^```(?:json)?\s*", "", raw)
raw = re.sub(r"\s*```$", "", raw)
try:
data = json.loads(raw)
except (json.JSONDecodeError, ValueError) as e:
logger.warning("Failed to parse [SUGGEST_FIX] block: %s", e)
return re.sub(r"\[SUGGEST_FIX\]\s*[\s\S]*?\s*\[/SUGGEST_FIX\]", "", ai_content).strip(), None
if not isinstance(data, dict):
return re.sub(r"\[SUGGEST_FIX\]\s*[\s\S]*?\s*\[/SUGGEST_FIX\]", "", ai_content).strip(), None
title = (data.get("title") or "").strip()
description = (data.get("description") or "").strip()
confidence = data.get("confidence")
if not title or not description or not isinstance(confidence, (int, float)):
logger.warning("[SUGGEST_FIX] missing required fields, dropping")
return re.sub(r"\[SUGGEST_FIX\]\s*[\s\S]*?\s*\[/SUGGEST_FIX\]", "", ai_content).strip(), None
confidence_int = max(0, min(100, int(round(float(confidence)))))
parsed = {
"title": title[:200],
"description": description,
"confidence_pct": confidence_int,
"script_template_slug": (data.get("script_template_slug") or None),
"ai_drafted_script": (data.get("ai_drafted_script") or None),
"ai_drafted_parameters": data.get("ai_drafted_parameters") if isinstance(data.get("ai_drafted_parameters"), dict) else None,
}
cleaned = re.sub(r"\[SUGGEST_FIX\]\s*[\s\S]*?\s*\[/SUGGEST_FIX\]", "", ai_content).strip()
return cleaned, parsed
async def _persist_suggested_fix(
*,
db: AsyncSession,
session: AISession,
fix: dict[str, Any],
) -> None:
"""Supersede the prior active fix and insert the new one. Bumps state_version.
A session has at most one active suggested fix (`superseded_at IS NULL`).
Emitting [SUGGEST_FIX] is the only way to introduce a new one; the
engineer's user_decision is recorded via the decision endpoint.
"""
now = datetime.now(timezone.utc)
# Mark any prior active rows for this session as superseded.
await db.execute(
update(SessionSuggestedFix)
.where(
SessionSuggestedFix.session_id == session.id,
SessionSuggestedFix.superseded_at.is_(None),
)
.values(superseded_at=now)
)
# Resolve script_template_slug → script_template_id if provided.
script_template_id = None
slug = fix.get("script_template_slug")
if slug:
result = await db.execute(
select(ScriptTemplate).where(ScriptTemplate.slug == slug)
)
tpl = result.scalar_one_or_none()
if tpl is not None:
script_template_id = tpl.id
else:
logger.warning(
"SUGGEST_FIX referenced unknown script_template_slug=%r"
"treating as no template match", slug,
)
new_fix = SessionSuggestedFix(
session_id=session.id,
account_id=session.account_id,
title=fix["title"],
description=fix["description"],
confidence_pct=fix["confidence_pct"],
script_template_id=script_template_id,
ai_drafted_script=fix.get("ai_drafted_script"),
ai_drafted_parameters=fix.get("ai_drafted_parameters"),
)
db.add(new_fix)
# Bump preview-cache version atomically with the supersession+insert.
await db.execute(
update(AISession)
.where(AISession.id == session.id)
.values(state_version=AISession.state_version + 1)
)
await db.flush()
async def _persist_promote_items(
*,
db: AsyncSession,
@@ -431,11 +559,13 @@ async def send_chat_message(
if session.status == "paused":
session.status = "active"
# Check for fork, actions, questions, and promote markers in branch response too
# Check for fork, actions, questions, promote, and suggest_fix markers
# in branch response too
branch_display, branch_fork_data = _parse_fork_marker(ai_content)
branch_display, branch_actions_data = _parse_actions_marker(branch_display)
branch_display, branch_questions_data = _parse_questions_marker(branch_display)
branch_display, branch_promote_items = _parse_promote_marker(branch_display)
branch_display, branch_suggest_fix = _parse_suggest_fix_marker(branch_display)
if branch_display != ai_content:
# Store stripped content in branch history
msgs[-1] = {"role": "assistant", "content": branch_display}
@@ -493,6 +623,12 @@ async def send_chat_message(
db=db, session=session, user_id=user_id, items=branch_promote_items,
)
# Persist a [SUGGEST_FIX] if the branch turn included one.
if branch_suggest_fix:
await _persist_suggested_fix(
db=db, session=session, fix=branch_suggest_fix,
)
suggested_flows = extract_suggested_flows(
await rag_search(query=message, account_id=account_id, db=db, limit=8)
)
@@ -542,10 +678,14 @@ async def send_chat_message(
# Check for promote markers — facts the AI is surfacing to What we know.
display_content, promote_items = _parse_promote_marker(display_content)
# Check for a [SUGGEST_FIX] marker — supersedes the prior active fix.
display_content, suggest_fix_data = _parse_suggest_fix_marker(display_content)
logger.info(
"Marker parsing results — actions: %s, questions: %s, fork: %s, promote: %d, raw_length: %d, display_length: %d",
"Marker parsing results — actions: %s, questions: %s, fork: %s, "
"promote: %d, suggest_fix: %s, raw_length: %d, display_length: %d",
bool(actions_data), bool(questions_data), bool(fork_data),
len(promote_items or []),
len(promote_items or []), bool(suggest_fix_data),
len(ai_content), len(display_content),
)
@@ -630,6 +770,10 @@ async def send_chat_message(
db=db, session=session, user_id=user_id, items=promote_items,
)
# Persist a [SUGGEST_FIX] if this turn included one — supersedes prior fix.
if suggest_fix_data:
await _persist_suggested_fix(db=db, session=session, fix=suggest_fix_data)
suggested_flows = extract_suggested_flows(rag_results)
return display_content, suggested_flows, session, fork_metadata, actions_data, questions_data