Files
resolutionflow/backend/app/services/session_embedding_service.py
chihlasm eed771cb27 fix: prevent InFailedSQLTransactionError in session creation
Root cause: embedding generation could break the DB transaction via a failed
SQL statement. The except block caught the Python error but left the transaction
in a failed state. Subsequent queries (_record_usage → subscription lookup)
then failed with InFailedSQLTransactionError.

Fixes:
- session_embedding_service: use begin_nested() savepoint so failures don't
  poison the parent transaction
- ai_sessions.py: add db.rollback() before _record_usage in all 3 error
  handlers (create, respond, pickup) to recover from broken transactions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 04:36:12 +00:00

166 lines
5.7 KiB
Python

"""Generate and store embeddings for AI sessions for similar-session matching.
Uses Voyage AI (voyage-3.5, 1024 dims) via the shared embedding_service to
create vector representations of session content. Enables cosine similarity
search across sessions within the same account.
"""
import logging
from uuid import UUID
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.ai_session import AISession
from app.models.ai_session_embedding import AISessionEmbedding
from app.services.embedding_service import get_embedding
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
# Label stored in ai_session_embeddings.embedding_model for every vector written
# here. NOTE(review): presumably must match the model get_embedding() actually
# uses (module docstring says voyage-3.5) — confirm against embedding_service.
_EMBEDDING_MODEL = "voyage-3.5"


def _build_chunk_text(session: "AISession") -> str:
    """Join a session's descriptive fields into the text chunk that gets embedded.

    Empty/None fields are skipped. Returns "" when nothing is available.
    """
    parts = []
    if session.problem_summary:
        parts.append(session.problem_summary)
    if session.resolution_summary:
        parts.append(f"Resolution: {session.resolution_summary}")
    if session.problem_domain:
        parts.append(f"Domain: {session.problem_domain}")
    if session.escalation_reason:
        parts.append(f"Escalation: {session.escalation_reason}")
    return " ".join(parts)


async def generate_session_embedding(session_id: UUID, db: AsyncSession) -> None:
    """Generate and store the embedding for one AI session.

    Loads the session, builds a text chunk from its problem summary,
    resolution, domain and escalation reason, embeds it with
    ``get_embedding(..., input_type="document")`` and upserts the vector into
    ``ai_session_embeddings`` (update if a row for the session exists,
    otherwise insert).

    Best-effort: returns without writing when the session does not exist,
    has no text to embed, or the embedding call returns nothing. Errors
    raised while embedding or writing are logged as warnings and swallowed.
    The writes run inside a SAVEPOINT (``db.begin_nested()``) so a failed
    statement is rolled back without aborting the caller's transaction.

    Args:
        session_id: ID of the AISession to embed.
        db: Async session whose transaction the write joins (not committed here).
    """
    result = await db.execute(select(AISession).where(AISession.id == session_id))
    session = result.scalar_one_or_none()
    if session is None:
        return

    chunk_text = _build_chunk_text(session)
    if not chunk_text:
        return

    try:
        embedding_vector = await get_embedding(chunk_text, input_type="document")
        if not embedding_vector:
            return
        # pgvector text literal form: "[v1,v2,...]".
        embedding_str = "[" + ",".join(str(v) for v in embedding_vector) + "]"

        # Savepoint so a failing statement does not poison the parent transaction.
        # CAST(:x AS vector) is used instead of ":x::vector" because a "::"
        # written directly after a bind name breaks SQLAlchemy text()'s
        # ":name" parameter parsing.
        async with db.begin_nested():
            existing = await db.execute(
                select(AISessionEmbedding).where(
                    AISessionEmbedding.session_id == session_id
                )
            )
            embed_record = existing.scalar_one_or_none()
            if embed_record is not None:
                # Keep the ORM object in sync; the vector column itself is
                # written with raw SQL.
                embed_record.chunk_text = chunk_text
                await db.execute(
                    text(
                        "UPDATE ai_session_embeddings "
                        "SET embedding = CAST(:emb AS vector), "
                        "embedding_model = :model, updated_at = now() "
                        "WHERE session_id = :sid"
                    ),
                    {
                        "emb": embedding_str,
                        "model": _EMBEDDING_MODEL,
                        "sid": str(session_id),
                    },
                )
            else:
                # Raw SQL insert so the vector column can be written directly.
                await db.execute(
                    text(
                        """
                        INSERT INTO ai_session_embeddings
                            (id, session_id, account_id, chunk_text, embedding_model,
                             embedding, created_at, updated_at)
                        VALUES
                            (gen_random_uuid(), :session_id, :account_id, :chunk_text,
                             :model, CAST(:embedding AS vector), now(), now())
                        """
                    ),
                    {
                        "session_id": str(session_id),
                        "account_id": str(session.account_id),
                        "chunk_text": chunk_text,
                        "model": _EMBEDDING_MODEL,
                        "embedding": embedding_str,
                    },
                )
    except Exception:
        logger.warning(
            "Failed to generate embedding for session %s", session_id, exc_info=True
        )
async def find_similar_sessions(
    session_id: UUID,
    account_id: UUID,
    db: AsyncSession,
    limit: int = 5,
) -> list[dict]:
    """Find sessions in ``account_id`` whose embeddings are closest to ``session_id``'s.

    Similarity is ``1 - (a <=> b)`` using pgvector's ``<=>`` operator on the
    stored embeddings. The source session itself is excluded, and the source
    embedding is only looked up within the same account.

    Args:
        session_id: Session whose embedding is the query vector.
        account_id: Account whose sessions are searched.
        db: Async database session.
        limit: Maximum number of results. Values <= 0 yield an empty list.

    Returns:
        Up to ``limit`` dicts, ordered most similar first, with keys ``id``,
        ``problem_summary``, ``problem_domain``, ``status``,
        ``resolution_summary``, ``created_at`` (ISO-8601 string or None) and
        ``similarity`` (float rounded to 3 places; 0.0 if NULL). The list is
        empty when the source session has no embedding in this account.
    """
    if limit <= 0:
        # LIMIT 0 returns nothing, and a negative LIMIT is a Postgres error.
        return []

    params = {"sid": str(session_id), "account_id": str(account_id)}

    # Bail out early if the source session has no embedding in this account.
    # Otherwise the distance below would be NULL for every row.
    check = await db.execute(
        text(
            "SELECT 1 FROM ai_session_embeddings "
            "WHERE session_id = :sid AND account_id = :account_id "
            "AND embedding IS NOT NULL"
        ),
        params,
    )
    if check.first() is None:
        return []

    # Fixed SQL fragment (no user-supplied text): the source vector, scoped to
    # the same account. ORDER BY + LIMIT 1 keeps it a single-row scalar
    # subquery even if duplicate rows exist for one session.
    source_vec = (
        "(SELECT embedding FROM ai_session_embeddings "
        "WHERE session_id = :sid AND account_id = :account_id "
        "AND embedding IS NOT NULL "
        "ORDER BY updated_at DESC LIMIT 1)"
    )

    # Order by the raw distance ascending rather than the similarity alias;
    # presumably this is the form a pgvector index on embedding can serve.
    result = await db.execute(
        text(
            f"""
            SELECT
                e.session_id,
                s.problem_summary,
                s.problem_domain,
                s.status,
                s.resolution_summary,
                s.created_at,
                1 - (e.embedding <=> {source_vec}) AS similarity
            FROM ai_session_embeddings e
            JOIN ai_sessions s ON s.id = e.session_id
            WHERE e.account_id = :account_id
              AND e.session_id != :sid
              AND e.embedding IS NOT NULL
            ORDER BY e.embedding <=> {source_vec}
            LIMIT :lim
            """
        ),
        {**params, "lim": limit},
    )

    rows = result.mappings().all()
    return [
        {
            "id": str(row["session_id"]),
            "problem_summary": row["problem_summary"],
            "problem_domain": row["problem_domain"],
            "status": row["status"],
            "resolution_summary": row["resolution_summary"],
            "created_at": (
                row["created_at"].isoformat() if row["created_at"] is not None else None
            ),
            # Check for None explicitly so a genuine 0.0 similarity is kept as a float.
            "similarity": (
                round(float(row["similarity"]), 3)
                if row["similarity"] is not None
                else 0.0
            ),
        }
        for row in rows
    ]