Add is_branching guard to unified_chat_service.send_chat_message() that routes messages through BranchAwarePromptBuilder when a session has active branching. Add branch_id to all AISessionStep constructor calls in flowpilot_engine.py via optional branch_id param on _create_step_from_parsed. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
180 lines
6.1 KiB
Python
180 lines
6.1 KiB
Python
"""Unified chat service — chat sessions on ai_sessions table.
|
|
|
|
Replaces assistant_chat_service for new chat sessions. Messages are stored
|
|
in ai_sessions.conversation_messages JSONB. Reuses the same AI calling
|
|
infrastructure and system prompt from assistant_chat_service.
|
|
"""
|
|
import logging
|
|
from typing import Any
|
|
from uuid import UUID
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.models.ai_session import AISession
|
|
from app.services.assistant_chat_service import (
|
|
ASSISTANT_SYSTEM_PROMPT,
|
|
_call_ai,
|
|
_auto_title,
|
|
)
|
|
from app.services.rag_service import search as rag_search, build_rag_context, extract_suggested_flows
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def create_chat_session(
    user_id: UUID,
    account_id: UUID,
    team_id: UUID | None,
    intake_content: dict[str, Any],
    db: AsyncSession,
) -> AISession:
    """Build a new chat-type ``AISession``, add it to *db* and flush.

    The title is derived from ``intake_content["text"]`` via ``_auto_title``
    when that text is present and non-empty; otherwise it is ``"New Chat"``.
    The session starts ``active`` with an empty message list. Only a flush is
    issued here; no commit happens in this function.

    Returns:
        The newly added ``AISession`` instance.
    """
    opening_text = intake_content.get("text", "")
    if opening_text:
        chat_title = _auto_title(opening_text)
    else:
        chat_title = "New Chat"

    new_session = AISession(
        # Ownership
        user_id=user_id,
        account_id=account_id,
        team_id=team_id,
        # What kind of session this is and how it was started
        session_type="chat",
        intake_type="free_text",
        intake_content=intake_content,
        title=chat_title,
        # Initial state
        status="active",
        confidence_tier="discovery",
        confidence_score=0.0,
        conversation_messages=[],
    )
    db.add(new_session)
    await db.flush()
    return new_session
|
|
|
|
|
|
def _history_for_ai(
    stored_messages: list[dict[str, Any]] | None,
) -> list[dict[str, Any]]:
    """Return the user/assistant turns of a stored conversation as role/content dicts."""
    return [
        {"role": entry["role"], "content": entry["content"]}
        for entry in (stored_messages or [])
        if entry.get("role") in ("user", "assistant")
    ]


def _with_exchange(
    stored_messages: list[dict[str, Any]] | None,
    message: str,
    ai_content: str,
) -> list[dict[str, Any]]:
    """Return a new list: the stored conversation plus one user/assistant exchange."""
    return [
        *(stored_messages or []),
        {"role": "user", "content": message},
        {"role": "assistant", "content": ai_content},
    ]


def _record_exchange(session: AISession, input_tokens: int, output_tokens: int) -> None:
    """Update token totals and the message counter for one exchange; resume a paused session."""
    session.total_input_tokens += input_tokens
    session.total_output_tokens += output_tokens
    session.step_count += 2  # message count for display: one user + one assistant
    if session.status == "paused":
        session.status = "active"


async def _send_branch_message(
    session: AISession,
    account_id: UUID,
    message: str,
    db: AsyncSession,
    images: list[dict[str, Any]] | None,
) -> tuple[str, list[dict[str, Any]], AISession] | None:
    """Answer *message* on the session's active branch.

    Returns the same triple as ``send_chat_message``, or ``None`` when no
    ``SessionBranch`` row matches ``session.active_branch_id`` so the caller
    can fall back to the main conversation.
    """
    # Imported lazily, as in the original code path.
    # NOTE(review): presumably this avoids an import cycle — confirm.
    from app.services.branch_manager import BranchManager
    from app.services.branch_aware_prompt_builder import BranchAwarePromptBuilder
    from app.models.session_branch import SessionBranch

    branch_lookup = await db.execute(
        select(SessionBranch).where(SessionBranch.id == session.active_branch_id)
    )
    branch = branch_lookup.scalar_one_or_none()
    if branch is None:
        return None

    sibling_ctx = await BranchManager(db).build_cross_branch_context(branch.id)

    session_context = (
        f"Problem: {session.problem_summary or 'Unknown'}. "
        f"Domain: {session.problem_domain or 'Unknown'}."
    )
    prompt_args = BranchAwarePromptBuilder().build(
        # Normalised to a list, matching the `or []` guard used when the
        # branch conversation is updated below.
        branch_messages=branch.conversation_messages or [],
        sibling_summaries=sibling_ctx,
        session_context=session_context,
        attachments=[],
        new_message=message,
        revival_context=branch.evidence_description if branch.status == "revived" else None,
    )

    # Images attached to this message replace whatever the builder put there.
    if images:
        prompt_args["images"] = images
    ai_content, input_tokens, output_tokens = await _call_ai(**prompt_args)

    # The exchange is stored on the branch, not on the session's own
    # conversation. A new list is assigned rather than mutated in place —
    # presumably so the ORM registers the column as changed; TODO confirm.
    branch.conversation_messages = _with_exchange(
        branch.conversation_messages, message, ai_content
    )
    _record_exchange(session, input_tokens, output_tokens)

    suggested_flows = extract_suggested_flows(
        await rag_search(query=message, account_id=account_id, db=db, limit=8)
    )
    return ai_content, suggested_flows, session


async def send_chat_message(
    session_id: UUID,
    user_id: UUID,
    account_id: UUID,
    message: str,
    db: AsyncSession,
    images: list[dict[str, Any]] | None = None,
) -> tuple[str, list[dict[str, Any]], AISession]:
    """Send a message in a chat session and get AI response.

    When the session is branching and has an active branch, the message is
    answered through the branch-aware prompt builder and stored on that
    branch. Otherwise it is answered with RAG context and the session's own
    conversation history, and stored on the session.

    Args:
        session_id: ID of the chat session to post to.
        user_id: Owner of the session; the lookup filters on it.
        account_id: Account used to scope the RAG search.
        message: The user's new message text.
        db: Async database session. Nothing is committed here.
        images: Optional list of {"media_type": str, "data": str (base64)}
            for vision content attached to this message.

    Returns (ai_content, suggested_flows, session).

    Raises:
        ValueError: If no chat session matches ``session_id``/``user_id``, or
            if the session status is neither "active" nor "paused".
    """
    session_lookup = await db.execute(
        select(AISession).where(
            AISession.id == session_id,
            AISession.user_id == user_id,
            AISession.session_type == "chat",
        )
    )
    session = session_lookup.scalar_one_or_none()
    if session is None:
        raise ValueError("Chat session not found")

    if session.status not in ("active", "paused"):
        raise ValueError(f"Cannot send messages to a {session.status} session")

    # If branching is active, route to the branch message handler.
    if session.is_branching and session.active_branch_id:
        branch_reply = await _send_branch_message(
            session, account_id, message, db, images
        )
        if branch_reply is not None:
            return branch_reply
        # The branch row is missing: keep the original fallback to the main
        # conversation, but leave a trace instead of doing so silently.
        logger.warning(
            "Chat session %s references missing active branch %s; "
            "falling back to the main conversation",
            session.id,
            session.active_branch_id,
        )

    # Auto-title from first message if still default
    if session.step_count == 0 and message.strip():
        session.title = _auto_title(message)

    # Seed the problem summary from the message if none has been set yet
    if not session.problem_summary and message.strip():
        session.problem_summary = _auto_title(message)

    # RAG search for relevant flows
    rag_results = await rag_search(
        query=message,
        account_id=account_id,
        db=db,
        limit=8,
    )
    rag_context = build_rag_context(rag_results)

    # Call AI with the prior user/assistant turns as history
    ai_content, input_tokens, output_tokens = await _call_ai(
        system_base=ASSISTANT_SYSTEM_PROMPT,
        rag_context=rag_context,
        history=_history_for_ai(session.conversation_messages),
        new_message=message,
        images=images,
    )

    # Append the exchange to conversation_messages (assigned as a new list)
    session.conversation_messages = _with_exchange(
        session.conversation_messages, message, ai_content
    )
    _record_exchange(session, input_tokens, output_tokens)

    suggested_flows = extract_suggested_flows(rag_results)

    return ai_content, suggested_flows, session
|