feat: AI marker system prompt fixes, TaskLane activation, and FlowPilot updates

- Fix system prompt to ensure [QUESTIONS]/[ACTIONS] markers in AI responses
- Add format reminder injection to user messages for marker compliance
- Wire TaskLane activation in prefill and resume paths
- Add ActionCardGroup component for structured question/action rendering
- Update FlowPilot session and step card components
- Update ai-session schemas and types for marker data

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
chihlasm
2026-03-26 19:57:39 +00:00
parent 37d217b12a
commit 3c0a29115c
14 changed files with 913 additions and 42 deletions

View File

@@ -4,7 +4,9 @@ Replaces assistant_chat_service for new chat sessions. Messages are stored
in ai_sessions.conversation_messages JSONB. Reuses the same AI calling
infrastructure and system prompt from assistant_chat_service.
"""
import json
import logging
import re
from typing import Any
from uuid import UUID
@@ -22,6 +24,129 @@ from app.services.rag_service import search as rag_search, build_rag_context, ex
logger = logging.getLogger(__name__)
def _parse_fork_marker(ai_content: str) -> tuple[str, dict[str, Any] | None]:
"""Extract [FORK]...[/FORK] JSON from AI response.
Returns (cleaned_content, fork_data_or_None).
The fork marker is stripped from the display text.
"""
match = re.search(r'\[FORK\]\s*([\s\S]*?)\s*\[/FORK\]', ai_content)
if not match:
return ai_content, None
try:
raw = match.group(1).strip()
# Strip markdown fences if AI wrapped it
if raw.startswith("```"):
raw = re.sub(r'^```(?:json)?\s*', '', raw)
raw = re.sub(r'\s*```$', '', raw)
fork_data = json.loads(raw)
except (json.JSONDecodeError, ValueError) as e:
logger.warning("Failed to parse [FORK] marker: %s", e)
return ai_content, None
# Validate structure
if not isinstance(fork_data, dict) or "options" not in fork_data:
logger.warning("Invalid [FORK] data — missing 'options'")
return ai_content, None
options = fork_data["options"]
if not isinstance(options, list) or len(options) < 2:
logger.warning("Invalid [FORK] data — need at least 2 options")
return ai_content, None
# Strip the marker from display text
cleaned = ai_content[:match.start()] + ai_content[match.end():]
cleaned = cleaned.strip()
return cleaned, fork_data
def _parse_actions_marker(ai_content: str) -> tuple[str, list[dict[str, Any]] | None]:
"""Extract [ACTIONS]...[/ACTIONS] JSON from AI response.
Returns (cleaned_content, actions_list_or_None).
The actions marker is stripped from the display text.
"""
match = re.search(r'\[ACTIONS\]\s*([\s\S]*?)\s*\[/ACTIONS\]', ai_content)
if not match:
return ai_content, None
try:
raw = match.group(1).strip()
if raw.startswith("```"):
raw = re.sub(r'^```(?:json)?\s*', '', raw)
raw = re.sub(r'\s*```$', '', raw)
actions = json.loads(raw)
except (json.JSONDecodeError, ValueError) as e:
logger.warning("Failed to parse [ACTIONS] marker: %s", e)
return ai_content, None
if not isinstance(actions, list) or len(actions) == 0:
logger.warning("Invalid [ACTIONS] data — need at least 1 action")
return ai_content, None
# Validate each action has at minimum a label
valid_actions = []
for a in actions:
if isinstance(a, dict) and a.get("label"):
valid_actions.append({
"label": a["label"],
"command": a.get("command"),
"description": a.get("description", ""),
})
if not valid_actions:
return ai_content, None
cleaned = ai_content[:match.start()] + ai_content[match.end():]
cleaned = cleaned.strip()
return cleaned, valid_actions
def _parse_questions_marker(ai_content: str) -> tuple[str, list[dict[str, Any]] | None]:
"""Extract [QUESTIONS]...[/QUESTIONS] JSON from AI response.
Returns (cleaned_content, questions_list_or_None).
The questions marker is stripped from the display text.
"""
match = re.search(r'\[QUESTIONS\]\s*([\s\S]*?)\s*\[/QUESTIONS\]', ai_content)
if not match:
return ai_content, None
try:
raw = match.group(1).strip()
if raw.startswith("```"):
raw = re.sub(r'^```(?:json)?\s*', '', raw)
raw = re.sub(r'\s*```$', '', raw)
questions = json.loads(raw)
except (json.JSONDecodeError, ValueError) as e:
logger.warning("Failed to parse [QUESTIONS] marker: %s", e)
return ai_content, None
if not isinstance(questions, list) or len(questions) == 0:
logger.warning("Invalid [QUESTIONS] data — need at least 1 question")
return ai_content, None
# Validate each question has at minimum a text field
valid_questions = []
for q in questions:
if isinstance(q, dict) and q.get("text"):
valid_questions.append({
"text": q["text"],
"context": q.get("context", ""),
})
if not valid_questions:
return ai_content, None
cleaned = ai_content[:match.start()] + ai_content[match.end():]
cleaned = cleaned.strip()
return cleaned, valid_questions
async def create_chat_session(
user_id: UUID,
account_id: UUID,
@@ -58,14 +183,14 @@ async def send_chat_message(
message: str,
db: AsyncSession,
images: list[dict[str, Any]] | None = None,
) -> tuple[str, list[dict[str, Any]], AISession]:
) -> tuple[str, list[dict[str, Any]], AISession, dict[str, Any] | None, list[dict[str, Any]] | None, list[dict[str, Any]] | None]:
"""Send a message in a chat session and get AI response.
Args:
images: Optional list of {"media_type": str, "data": str (base64)}
for vision content attached to this message.
Returns (ai_content, suggested_flows, session).
Returns (ai_content, suggested_flows, session, fork_metadata, actions_data, questions_data).
"""
result = await db.execute(
select(AISession).where(
@@ -124,10 +249,47 @@ async def send_chat_message(
if session.status == "paused":
session.status = "active"
# Check for fork, actions, and questions markers in branch response too
branch_display, branch_fork_data = _parse_fork_marker(ai_content)
branch_display, branch_actions_data = _parse_actions_marker(branch_display)
branch_display, branch_questions_data = _parse_questions_marker(branch_display)
if branch_display != ai_content:
# Store stripped content in branch history
msgs[-1] = {"role": "assistant", "content": branch_display}
branch.conversation_messages = msgs
branch_fork_metadata = None
if branch_fork_data:
try:
fork_point, new_branches = await manager.create_fork(
session_id=session.id,
parent_branch_id=branch.id,
trigger_step_id=None,
fork_reason=branch_fork_data.get("fork_reason", ""),
options=[
{"label": o["label"], "description": o.get("description", "")}
for o in branch_fork_data["options"]
],
)
first_branch = new_branches[0]
await manager.switch_branch(session.id, first_branch.id)
branch_fork_metadata = {
"fork_point_id": str(fork_point.id),
"fork_reason": branch_fork_data.get("fork_reason", ""),
"branches": [
{"branch_id": str(b.id), "label": b.label}
for b in new_branches
],
"active_branch_id": str(first_branch.id),
}
await db.flush()
except Exception:
logger.exception("Failed to create fork within branch for session %s", session.id)
suggested_flows = extract_suggested_flows(
await rag_search(query=message, account_id=account_id, db=db, limit=8)
)
return ai_content, suggested_flows, session
return branch_display, suggested_flows, session, branch_fork_metadata, branch_actions_data, branch_questions_data
# Auto-title from first message if still default
if session.step_count == 0 and message.strip():
@@ -161,10 +323,27 @@ async def send_chat_message(
images=images,
)
# Append messages to conversation_messages
# Check for fork marker in AI response
display_content, fork_data = _parse_fork_marker(ai_content)
# Check for actions marker in AI response
display_content, actions_data = _parse_actions_marker(display_content)
# Check for questions marker in AI response
display_content, questions_data = _parse_questions_marker(display_content)
logger.info(
"Marker parsing results — actions: %s, questions: %s, fork: %s, raw_length: %d, display_length: %d",
bool(actions_data), bool(questions_data), bool(fork_data),
len(ai_content), len(display_content),
)
# Store DISPLAY content (markers stripped) in conversation_messages.
# The format reminder in the user message + system prompt final reminder
# are sufficient to keep the AI emitting markers on subsequent turns.
msgs = list(session.conversation_messages or [])
msgs.append({"role": "user", "content": message})
msgs.append({"role": "assistant", "content": ai_content})
msgs.append({"role": "assistant", "content": display_content})
session.conversation_messages = msgs
session.step_count += 2 # message count for display
session.total_input_tokens += input_tokens
@@ -174,6 +353,46 @@ async def send_chat_message(
if session.status == "paused":
session.status = "active"
# If fork was detected, create branches
fork_metadata = None
if fork_data:
try:
from app.services.branch_manager import BranchManager
mgr = BranchManager(db)
# Create root branch if this is the first fork
if not session.is_branching:
await mgr.create_root_branch(session.id)
fork_point, new_branches = await mgr.create_fork(
session_id=session.id,
parent_branch_id=session.active_branch_id,
trigger_step_id=None,
fork_reason=fork_data.get("fork_reason", ""),
options=[
{"label": o["label"], "description": o.get("description", "")}
for o in fork_data["options"]
],
)
# Don't auto-switch — conversation continues on current branch.
# Branches appear in sidebar. User switches when ready.
fork_metadata = {
"fork_point_id": str(fork_point.id),
"fork_reason": fork_data.get("fork_reason", ""),
"branches": [
{"branch_id": str(b.id), "label": b.label}
for b in new_branches
],
"active_branch_id": str(session.active_branch_id) if session.active_branch_id else None,
}
await db.flush()
logger.info("Created fork with %d branches for session %s", len(new_branches), session_id)
except Exception:
logger.exception("Failed to create fork for session %s", session_id)
# Fork failed but chat message still sent — don't break the response
suggested_flows = extract_suggested_flows(rag_results)
return ai_content, suggested_flows, session
return display_content, suggested_flows, session, fork_metadata, actions_data, questions_data