"""Unified chat service — chat sessions on ai_sessions table. Replaces assistant_chat_service for new chat sessions. Messages are stored in ai_sessions.conversation_messages JSONB. Reuses the same AI calling infrastructure and system prompt from assistant_chat_service. ## Markers parsed here - `[QUESTIONS]` / `[ACTIONS]` — task-lane items shown to the engineer - `[FORK]` — diagnostic forking, creates SessionBranch rows - `[PROMOTE]` (Phase 2) — surfaces a fact to the What-we-know section. Items in pending_task_lane carry stable UUIDs (assigned here) so PROMOTE source_refs survive across turns even when the model re-emits the same question/action. - `[SUGGEST_FIX]` (Phase 3) — proposes a resolution path for the session. Each new emission supersedes the previous active row (sets superseded_at) so there's exactly one active fix at a time. """ import json import logging import re import uuid as _uuid from typing import Any from uuid import UUID from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from datetime import datetime, timezone from sqlalchemy import update from app.models.ai_session import AISession from app.models.script_template import ScriptTemplate from app.models.session_suggested_fix import SessionSuggestedFix from app.services.assistant_chat_service import ( ASSISTANT_SYSTEM_PROMPT, _call_ai, _auto_title, ) from app.services.fact_synthesis_service import FactSynthesisService from app.services.rag_service import search as rag_search, build_rag_context, extract_suggested_flows logger = logging.getLogger(__name__) def _parse_fork_marker(ai_content: str) -> tuple[str, dict[str, Any] | None]: """Extract [FORK]...[/FORK] JSON from AI response. Returns (cleaned_content, fork_data_or_None). The fork marker is stripped from the display text. """ match = re.search(r'\[FORK\]\s*([\s\S]*?)\s*\[/FORK\]', ai_content) if not match: return ai_content, None try: raw = match.group(1).strip() # Strip markdown fences if AI wrapped it if raw.startswith("```"): raw = re.sub(r'^```(?:json)?\s*', '', raw) raw = re.sub(r'\s*```$', '', raw) fork_data = json.loads(raw) except (json.JSONDecodeError, ValueError) as e: logger.warning("Failed to parse [FORK] marker: %s", e) return ai_content, None # Validate structure if not isinstance(fork_data, dict) or "options" not in fork_data: logger.warning("Invalid [FORK] data — missing 'options'") return ai_content, None options = fork_data["options"] if not isinstance(options, list) or len(options) < 2: logger.warning("Invalid [FORK] data — need at least 2 options") return ai_content, None # Strip the marker from display text cleaned = ai_content[:match.start()] + ai_content[match.end():] cleaned = cleaned.strip() return cleaned, fork_data def _parse_actions_marker(ai_content: str) -> tuple[str, list[dict[str, Any]] | None]: """Extract [ACTIONS]...[/ACTIONS] JSON from AI response. Returns (cleaned_content, actions_list_or_None). The actions marker is stripped from the display text. """ match = re.search(r'\[ACTIONS\]\s*([\s\S]*?)\s*\[/ACTIONS\]', ai_content) if not match: return ai_content, None try: raw = match.group(1).strip() if raw.startswith("```"): raw = re.sub(r'^```(?:json)?\s*', '', raw) raw = re.sub(r'\s*```$', '', raw) actions = json.loads(raw) except (json.JSONDecodeError, ValueError) as e: logger.warning("Failed to parse [ACTIONS] marker: %s", e) return ai_content, None if not isinstance(actions, list) or len(actions) == 0: logger.warning("Invalid [ACTIONS] data — need at least 1 action") return ai_content, None # Validate each action has at minimum a label valid_actions = [] for a in actions: if isinstance(a, dict) and a.get("label"): valid_actions.append({ "label": a["label"], "command": a.get("command"), "description": a.get("description", ""), }) if not valid_actions: return ai_content, None cleaned = ai_content[:match.start()] + ai_content[match.end():] cleaned = cleaned.strip() return cleaned, valid_actions def _parse_questions_marker(ai_content: str) -> tuple[str, list[dict[str, Any]] | None]: """Extract [QUESTIONS]...[/QUESTIONS] JSON from AI response. Returns (cleaned_content, questions_list_or_None). The questions marker is stripped from the display text. """ match = re.search(r'\[QUESTIONS\]\s*([\s\S]*?)\s*\[/QUESTIONS\]', ai_content) if not match: return ai_content, None try: raw = match.group(1).strip() if raw.startswith("```"): raw = re.sub(r'^```(?:json)?\s*', '', raw) raw = re.sub(r'\s*```$', '', raw) questions = json.loads(raw) except (json.JSONDecodeError, ValueError) as e: logger.warning("Failed to parse [QUESTIONS] marker: %s", e) return ai_content, None if not isinstance(questions, list) or len(questions) == 0: logger.warning("Invalid [QUESTIONS] data — need at least 1 question") return ai_content, None # Validate each question has at minimum a text field valid_questions = [] for q in questions: if isinstance(q, dict) and q.get("text"): valid_questions.append({ "text": q["text"], "context": q.get("context", ""), }) if not valid_questions: return ai_content, None cleaned = ai_content[:match.start()] + ai_content[match.end():] cleaned = cleaned.strip() return cleaned, valid_questions def _parse_promote_marker(ai_content: str) -> tuple[str, list[dict[str, Any]] | None]: """Extract one or more [PROMOTE]...[/PROMOTE] JSON blocks from AI response. Each block contains a JSON object describing a candidate fact: {"source_type": "question"|"diagnostic_check"|"ai_synthesis", "source_ref": "" | null, "text": "", "summary": ""} Returns (cleaned_content, list_of_items_or_None). All matched blocks are stripped from display text. Invalid items are dropped silently with a warning — a malformed PROMOTE should never break the chat response. Per FLOWPILOT-MIGRATION.md Section 8.1, the model emits text + summary inline so no LLM round-trip is needed to persist the fact. """ blocks = list(re.finditer(r"\[PROMOTE\]\s*([\s\S]*?)\s*\[/PROMOTE\]", ai_content)) if not blocks: return ai_content, None items: list[dict[str, Any]] = [] for m in blocks: raw = m.group(1).strip() if raw.startswith("```"): raw = re.sub(r"^```(?:json)?\s*", "", raw) raw = re.sub(r"\s*```$", "", raw) try: data = json.loads(raw) except (json.JSONDecodeError, ValueError) as e: logger.warning("Failed to parse [PROMOTE] block: %s", e) continue if not isinstance(data, dict): logger.warning("[PROMOTE] block must be a JSON object, got %s", type(data).__name__) continue source_type = data.get("source_type") text = (data.get("text") or "").strip() summary = (data.get("summary") or "").strip() or None source_ref_raw = data.get("source_ref") if source_type not in ("question", "diagnostic_check", "ai_synthesis"): # `user_note` is engineer-only, not an AI-emittable type. logger.warning("Invalid [PROMOTE] source_type=%r, skipping", source_type) continue if not text: logger.warning("[PROMOTE] block missing text, skipping") continue source_ref: UUID | None = None if source_ref_raw: try: source_ref = UUID(str(source_ref_raw)) except (ValueError, AttributeError): logger.warning("[PROMOTE] source_ref %r is not a valid UUID, dropping ref", source_ref_raw) source_ref = None # `ai_synthesis` must NEVER carry a source_ref (no question/check item # to point at) — surface mistakes from the model rather than tripping # the FactSynthesisService validation later. if source_type == "ai_synthesis": source_ref = None items.append({ "source_type": source_type, "source_ref": source_ref, "text": text, "summary": summary, }) # Strip all PROMOTE blocks from display content — engineers see facts in # the What-we-know panel, not as raw markers in the chat. cleaned = re.sub(r"\[PROMOTE\]\s*[\s\S]*?\s*\[/PROMOTE\]", "", ai_content).strip() return cleaned, items or None def _assign_stable_task_lane_ids( prev_lane: dict[str, Any] | None, questions: list[dict[str, Any]] | None, actions: list[dict[str, Any]] | None, ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: """Assign stable UUIDs to task-lane items, preserving them across turns. The model often re-emits the same question/action across multiple turns (it is told to keep `_(not yet completed)_` items alive). When the question text matches a prior turn's, we keep the prior UUID so any `session_facts.source_ref` pointing at it stays valid. Match key: - Questions: exact `text` - Actions: exact `label` Returns the questions/actions lists augmented with an `id` field. """ prev_questions = (prev_lane or {}).get("questions") or [] prev_actions = (prev_lane or {}).get("actions") or [] prev_q_ids: dict[str, str] = { str(q.get("text") or "").strip(): str(q["id"]) for q in prev_questions if isinstance(q, dict) and q.get("id") and q.get("text") } prev_a_ids: dict[str, str] = { str(a.get("label") or "").strip(): str(a["id"]) for a in prev_actions if isinstance(a, dict) and a.get("id") and a.get("label") } out_questions: list[dict[str, Any]] = [] for q in questions or []: text = str(q.get("text") or "").strip() existing = prev_q_ids.get(text) if text else None out_questions.append({ **q, "id": existing or str(_uuid.uuid4()), }) out_actions: list[dict[str, Any]] = [] for a in actions or []: label = str(a.get("label") or "").strip() existing = prev_a_ids.get(label) if label else None out_actions.append({ **a, "id": existing or str(_uuid.uuid4()), }) return out_questions, out_actions def _parse_suggest_fix_marker( ai_content: str, ) -> tuple[str, dict[str, Any] | None]: """Extract a single [SUGGEST_FIX]...[/SUGGEST_FIX] JSON block from AI response. The block contains: {"title": "...", "description": "...", "confidence": 0..100, "script_template_slug": "..." | null, "ai_drafted_script": "..." | null, "ai_drafted_parameters": {...} | null} Per FLOWPILOT-MIGRATION.md Section 8.2. Only the LAST block in the response is honored — if the model emits multiple, only its final view of the fix matters; earlier ones in the same turn are stale even before persistence. Returns (cleaned_content, fix_dict_or_None). Marker stripped from display. """ blocks = list(re.finditer(r"\[SUGGEST_FIX\]\s*([\s\S]*?)\s*\[/SUGGEST_FIX\]", ai_content)) if not blocks: return ai_content, None # Take the last block — most-recent intent wins within a single turn. last = blocks[-1] raw = last.group(1).strip() if raw.startswith("```"): raw = re.sub(r"^```(?:json)?\s*", "", raw) raw = re.sub(r"\s*```$", "", raw) try: data = json.loads(raw) except (json.JSONDecodeError, ValueError) as e: logger.warning("Failed to parse [SUGGEST_FIX] block: %s", e) return re.sub(r"\[SUGGEST_FIX\]\s*[\s\S]*?\s*\[/SUGGEST_FIX\]", "", ai_content).strip(), None if not isinstance(data, dict): return re.sub(r"\[SUGGEST_FIX\]\s*[\s\S]*?\s*\[/SUGGEST_FIX\]", "", ai_content).strip(), None title = (data.get("title") or "").strip() description = (data.get("description") or "").strip() confidence = data.get("confidence") if not title or not description or not isinstance(confidence, (int, float)): logger.warning("[SUGGEST_FIX] missing required fields, dropping") return re.sub(r"\[SUGGEST_FIX\]\s*[\s\S]*?\s*\[/SUGGEST_FIX\]", "", ai_content).strip(), None confidence_int = max(0, min(100, int(round(float(confidence))))) parsed = { "title": title[:200], "description": description, "confidence_pct": confidence_int, "script_template_slug": (data.get("script_template_slug") or None), "ai_drafted_script": (data.get("ai_drafted_script") or None), "ai_drafted_parameters": data.get("ai_drafted_parameters") if isinstance(data.get("ai_drafted_parameters"), dict) else None, } cleaned = re.sub(r"\[SUGGEST_FIX\]\s*[\s\S]*?\s*\[/SUGGEST_FIX\]", "", ai_content).strip() return cleaned, parsed def _parse_fix_outcome_marker( ai_content: str, ) -> tuple[str, dict[str, Any] | None]: """Extract a single [FIX_OUTCOME]...[/FIX_OUTCOME] JSON block. Block shape: {"fix_id": "", "outcome": "success"|"failure"|"partial", "reason": ""} Emitted by the AI when the engineer clearly indicates in chat that a prior suggested fix worked, didn't work, or was partially applied. The marker PROPOSES an outcome — the engineer confirms via the UI. Only the last block in a response is honored. """ blocks = list(re.finditer( r"\[FIX_OUTCOME\]\s*([\s\S]*?)\s*\[/FIX_OUTCOME\]", ai_content, )) if not blocks: return ai_content, None last = blocks[-1] raw = last.group(1).strip() if raw.startswith("```"): raw = re.sub(r"^```(?:json)?\s*", "", raw) raw = re.sub(r"\s*```$", "", raw) cleaned = re.sub( r"\[FIX_OUTCOME\]\s*[\s\S]*?\s*\[/FIX_OUTCOME\]", "", ai_content, ).strip() try: data = json.loads(raw) except (json.JSONDecodeError, ValueError) as e: logger.warning("Failed to parse [FIX_OUTCOME] block: %s", e) return cleaned, None if not isinstance(data, dict): return cleaned, None fix_id = str(data.get("fix_id") or "").strip() outcome = str(data.get("outcome") or "").strip().lower() reason = str(data.get("reason") or "").strip() if not fix_id or outcome not in {"success", "failure", "partial"}: logger.warning("[FIX_OUTCOME] missing/invalid fields, dropping") return cleaned, None return cleaned, {"fix_id": fix_id, "outcome": outcome, "reason": reason} async def _persist_suggested_fix( *, db: AsyncSession, session: AISession, fix: dict[str, Any], ) -> None: """Supersede the prior active fix and insert the new one. Bumps state_version. A session has at most one active suggested fix (`superseded_at IS NULL`). Emitting [SUGGEST_FIX] is the only way to introduce a new one; the engineer's user_decision is recorded via the decision endpoint. """ now = datetime.now(timezone.utc) # Mark any prior active rows for this session as superseded. await db.execute( update(SessionSuggestedFix) .where( SessionSuggestedFix.session_id == session.id, SessionSuggestedFix.superseded_at.is_(None), ) .values(superseded_at=now) ) # Resolve script_template_slug → script_template_id if provided. script_template_id = None slug = fix.get("script_template_slug") if slug: result = await db.execute( select(ScriptTemplate).where(ScriptTemplate.slug == slug) ) tpl = result.scalar_one_or_none() if tpl is not None: script_template_id = tpl.id else: logger.warning( "SUGGEST_FIX referenced unknown script_template_slug=%r — " "treating as no template match", slug, ) new_fix = SessionSuggestedFix( session_id=session.id, account_id=session.account_id, title=fix["title"], description=fix["description"], confidence_pct=fix["confidence_pct"], script_template_id=script_template_id, ai_drafted_script=fix.get("ai_drafted_script"), ai_drafted_parameters=fix.get("ai_drafted_parameters"), ) db.add(new_fix) # Bump preview-cache version atomically with the supersession+insert. await db.execute( update(AISession) .where(AISession.id == session.id) .values(state_version=AISession.state_version + 1) ) await db.flush() async def _record_ai_outcome_proposal( *, db: AsyncSession, session: AISession, proposal: dict[str, Any], ) -> None: """Persist the AI's proposed outcome on the active fix. Writes to session_suggested_fixes.ai_outcome_proposal. Frontend polls the active fix and renders the AI-confirming banner state when this is non-null. Does NOT mutate the fix's status — the engineer's confirmation click via PATCH /outcome is what changes the status. Drops silently when the fix_id isn't a valid UUID or doesn't belong to this session. """ try: fix_uuid = UUID(proposal["fix_id"]) except (ValueError, KeyError, TypeError): logger.warning("[FIX_OUTCOME] invalid fix_id, dropping") return await db.execute( update(SessionSuggestedFix) .where( SessionSuggestedFix.id == fix_uuid, SessionSuggestedFix.session_id == session.id, ) .values(ai_outcome_proposal=proposal) ) await db.flush() async def _persist_promote_items( *, db: AsyncSession, session: AISession, user_id: UUID, items: list[dict[str, Any]], ) -> None: """Persist parsed [PROMOTE] items as session_facts. Failures are logged. A malformed PROMOTE must never break the chat response — the engineer still gets the AI's analysis; the missing fact can be added manually. """ if not items: return service = FactSynthesisService(db) for item in items: try: await service.create_fact( session_id=session.id, account_id=session.account_id, user_id=user_id, source_type=item["source_type"], text=item["text"], summary=item["summary"], source_ref=item["source_ref"], ) except ValueError: # Validation failure (e.g. empty text after strip, or # source_ref-on-ai_synthesis race). Log and continue — losing # one fact is better than aborting the whole chat turn. logger.warning( "Skipping invalid PROMOTE item for session %s: %r", session.id, item, exc_info=True, ) except Exception: logger.exception( "Failed to persist PROMOTE item for session %s", session.id ) async def create_chat_session( user_id: UUID, account_id: UUID, team_id: UUID | None, intake_content: dict[str, Any], db: AsyncSession, ) -> AISession: """Create a new chat session on ai_sessions.""" first_message = intake_content.get("text", "") title = _auto_title(first_message) if first_message else "New Chat" session = AISession( user_id=user_id, account_id=account_id, team_id=team_id, session_type="chat", title=title, intake_type="free_text", intake_content=intake_content, status="active", confidence_tier="discovery", confidence_score=0.0, conversation_messages=[], ) db.add(session) await db.flush() return session async def send_chat_message( session_id: UUID, user_id: UUID, account_id: UUID, message: str, db: AsyncSession, images: list[dict[str, Any]] | None = None, ) -> tuple[str, list[dict[str, Any]], AISession, dict[str, Any] | None, list[dict[str, Any]] | None, list[dict[str, Any]] | None]: """Send a message in a chat session and get AI response. Args: images: Optional list of {"media_type": str, "data": str (base64)} for vision content attached to this message. Returns (ai_content, suggested_flows, session, fork_metadata, actions_data, questions_data). """ from sqlalchemy import or_ result = await db.execute( select(AISession).where( AISession.id == session_id, or_( AISession.user_id == user_id, AISession.escalated_to_id == user_id, ), AISession.session_type == "chat", ) ) session = result.scalar_one_or_none() if not session: raise ValueError("Chat session not found") if session.status not in ("active", "paused"): raise ValueError(f"Cannot send messages to a {session.status} session") # If branching is active, route to branch message handler if session.is_branching and session.active_branch_id: from app.services.branch_manager import BranchManager from app.services.branch_aware_prompt_builder import BranchAwarePromptBuilder from app.models.session_branch import SessionBranch branch_result = await db.execute( select(SessionBranch).where(SessionBranch.id == session.active_branch_id) ) branch = branch_result.scalar_one_or_none() if branch: manager = BranchManager(db) sibling_ctx = await manager.build_cross_branch_context(branch.id) builder = BranchAwarePromptBuilder() session_context = f"Problem: {session.problem_summary or 'Unknown'}. Domain: {session.problem_domain or 'Unknown'}." prompt_args = builder.build( branch_messages=branch.conversation_messages, sibling_summaries=sibling_ctx, session_context=session_context, attachments=[], new_message=message, revival_context=branch.evidence_description if branch.status == "revived" else None, ) # Override images from prompt_args with actual images if provided if images: prompt_args["images"] = images ai_content, input_tokens, output_tokens = await _call_ai(**prompt_args) # Update branch conversation # Strip _(not yet completed)_ markers before storage (same reason as main path) stored_message = message.replace("_(not yet completed)_", "(pending)").replace("_(skipped)_", "(skipped)") msgs = list(branch.conversation_messages or []) msgs.append({"role": "user", "content": stored_message}) msgs.append({"role": "assistant", "content": ai_content}) branch.conversation_messages = msgs session.total_input_tokens += input_tokens session.total_output_tokens += output_tokens session.step_count += 2 if session.status == "paused": session.status = "active" # Check for fork, actions, questions, promote, and suggest_fix markers # in branch response too branch_display, branch_fork_data = _parse_fork_marker(ai_content) branch_display, branch_actions_data = _parse_actions_marker(branch_display) branch_display, branch_questions_data = _parse_questions_marker(branch_display) branch_display, branch_promote_items = _parse_promote_marker(branch_display) branch_display, branch_suggest_fix = _parse_suggest_fix_marker(branch_display) branch_display, branch_outcome_proposal = _parse_fix_outcome_marker(branch_display) if branch_display != ai_content: # Store stripped content in branch history msgs[-1] = {"role": "assistant", "content": branch_display} branch.conversation_messages = msgs branch_fork_metadata = None if branch_fork_data: try: fork_point, new_branches = await manager.create_fork( session_id=session.id, parent_branch_id=branch.id, trigger_step_id=None, fork_reason=branch_fork_data.get("fork_reason", ""), options=[ {"label": o["label"], "description": o.get("description", "")} for o in branch_fork_data["options"] ], ) first_branch = new_branches[0] await manager.switch_branch(session.id, first_branch.id) branch_fork_metadata = { "fork_point_id": str(fork_point.id), "fork_reason": branch_fork_data.get("fork_reason", ""), "branches": [ {"branch_id": str(b.id), "label": b.label} for b in new_branches ], "active_branch_id": str(first_branch.id), } await db.flush() except Exception: logger.exception("Failed to create fork within branch for session %s", session.id) # Persist task lane state on session — assign stable UUIDs so any # PROMOTE marker emitted later can reference the same items. if branch_questions_data or branch_actions_data: stable_qs, stable_as = _assign_stable_task_lane_ids( session.pending_task_lane, branch_questions_data, branch_actions_data, ) session.pending_task_lane = { "questions": stable_qs, "actions": stable_as, } else: session.pending_task_lane = None # Persist any PROMOTE items emitted in this turn. Done AFTER the # task-lane write so source_refs to brand-new items would still # land on persisted UUIDs (the model can also reference IDs from # the previous turn, which were already persisted). if branch_promote_items: await _persist_promote_items( db=db, session=session, user_id=user_id, items=branch_promote_items, ) # Persist a [SUGGEST_FIX] if the branch turn included one. if branch_suggest_fix: await _persist_suggested_fix( db=db, session=session, fix=branch_suggest_fix, ) # Persist a [FIX_OUTCOME] proposal if the branch turn included one. if branch_outcome_proposal is not None: await _record_ai_outcome_proposal( db=db, session=session, proposal=branch_outcome_proposal, ) suggested_flows = extract_suggested_flows( await rag_search(query=message, account_id=account_id, db=db, limit=8) ) return branch_display, suggested_flows, session, branch_fork_metadata, branch_actions_data, branch_questions_data # Auto-title from first message if still default if session.step_count == 0 and message.strip(): session.title = _auto_title(message) # Auto-detect problem domain from first message if not session.problem_summary and message.strip(): session.problem_summary = _auto_title(message) # RAG search for relevant flows rag_results = await rag_search( query=message, account_id=account_id, db=db, limit=8, ) rag_context = build_rag_context(rag_results) # Build message history for AI ai_messages: list[dict[str, Any]] = [] for msg in (session.conversation_messages or []): if msg.get("role") in ("user", "assistant"): ai_messages.append({"role": msg["role"], "content": msg["content"]}) # Call AI ai_content, input_tokens, output_tokens = await _call_ai( system_base=ASSISTANT_SYSTEM_PROMPT, rag_context=rag_context, history=ai_messages, new_message=message, images=images, ) # Check for fork marker in AI response display_content, fork_data = _parse_fork_marker(ai_content) # Check for actions marker in AI response display_content, actions_data = _parse_actions_marker(display_content) # Check for questions marker in AI response display_content, questions_data = _parse_questions_marker(display_content) # Check for promote markers — facts the AI is surfacing to What we know. display_content, promote_items = _parse_promote_marker(display_content) # Check for a [SUGGEST_FIX] marker — supersedes the prior active fix. display_content, suggest_fix_data = _parse_suggest_fix_marker(display_content) # Check for a [FIX_OUTCOME] proposal — AI confirms a prior fix's outcome. display_content, outcome_proposal = _parse_fix_outcome_marker(display_content) logger.info( "Marker parsing results — actions: %s, questions: %s, fork: %s, " "promote: %d, suggest_fix: %s, outcome_proposal: %s, " "raw_length: %d, display_length: %d", bool(actions_data), bool(questions_data), bool(fork_data), len(promote_items or []), bool(suggest_fix_data), bool(outcome_proposal), len(ai_content), len(display_content), ) # Store DISPLAY content (markers stripped) in conversation_messages. # The format reminder in the user message + system prompt final reminder # are sufficient to keep the AI emitting markers on subsequent turns. # # Strip _(not yet completed)_ task markers from the stored user message. # The AI processes them correctly on the current turn, but persisting them # into history causes the AI to re-inject stale task lane items from prior # turns — even across unrelated topics in a long session. stored_message = message.replace("_(not yet completed)_", "(pending)").replace("_(skipped)_", "(skipped)") msgs = list(session.conversation_messages or []) msgs.append({"role": "user", "content": stored_message}) msgs.append({"role": "assistant", "content": display_content}) session.conversation_messages = msgs session.step_count += 2 # message count for display session.total_input_tokens += input_tokens session.total_output_tokens += output_tokens # Resume if paused if session.status == "paused": session.status = "active" # If fork was detected, create branches fork_metadata = None if fork_data: try: from app.services.branch_manager import BranchManager mgr = BranchManager(db) # Create root branch if this is the first fork if not session.is_branching: await mgr.create_root_branch(session.id) fork_point, new_branches = await mgr.create_fork( session_id=session.id, parent_branch_id=session.active_branch_id, trigger_step_id=None, fork_reason=fork_data.get("fork_reason", ""), options=[ {"label": o["label"], "description": o.get("description", "")} for o in fork_data["options"] ], ) # Don't auto-switch — conversation continues on current branch. # Branches appear in sidebar. User switches when ready. fork_metadata = { "fork_point_id": str(fork_point.id), "fork_reason": fork_data.get("fork_reason", ""), "branches": [ {"branch_id": str(b.id), "label": b.label} for b in new_branches ], "active_branch_id": str(session.active_branch_id) if session.active_branch_id else None, } await db.flush() logger.info("Created fork with %d branches for session %s", len(new_branches), session_id) except Exception: logger.exception("Failed to create fork for session %s", session_id) # Fork failed but chat message still sent — don't break the response # Persist task lane state on session — assign stable UUIDs so any PROMOTE # marker (this turn or a later one) can reference the same items. if questions_data or actions_data: stable_qs, stable_as = _assign_stable_task_lane_ids( session.pending_task_lane, questions_data, actions_data, ) session.pending_task_lane = { "questions": stable_qs, "actions": stable_as, } else: session.pending_task_lane = None # Persist any PROMOTE items emitted in this turn. Done after task-lane # assignment so source_refs the model invented this turn already exist. if promote_items: await _persist_promote_items( db=db, session=session, user_id=user_id, items=promote_items, ) # Persist a [SUGGEST_FIX] if this turn included one — supersedes prior fix. if suggest_fix_data: await _persist_suggested_fix(db=db, session=session, fix=suggest_fix_data) # Persist a [FIX_OUTCOME] proposal if this turn included one. if outcome_proposal is not None: await _record_ai_outcome_proposal( db=db, session=session, proposal=outcome_proposal, ) suggested_flows = extract_suggested_flows(rag_results) return display_content, suggested_flows, session, fork_metadata, actions_data, questions_data