Files
resolutionflow/backend/app/services/session_embedding_service.py
chihlasm e356103408 feat(search): add semantic similar session matching via Voyage AI embeddings
Adds vector-based similar session discovery using the existing Voyage AI
embedding infrastructure and pgvector cosine similarity search.

- New AISessionEmbedding model with vector(1024) column
- session_embedding_service: generate + upsert embeddings, find similar sessions
- Embeddings generated on session create (from problem_summary/domain) and
  updated on resolve (adds resolution_summary)
- GET /ai-sessions/{id}/similar endpoint returns top-N similar sessions
- Migration a7c9e3b1f402 creates ai_session_embeddings table

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 03:48:09 +00:00

166 lines
5.5 KiB
Python

"""Generate and store embeddings for AI sessions for similar-session matching.
Uses Voyage AI (voyage-3.5, 1024 dims) via the shared embedding_service to
create vector representations of session content. Enables cosine similarity
search across sessions within the same account.
"""
import logging
from uuid import UUID
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.ai_session import AISession
from app.models.ai_session_embedding import AISessionEmbedding
from app.services.embedding_service import get_embedding
# Module logger; best-effort embedding failures are reported here as warnings.
logger = logging.getLogger(__name__)
# Model label stored alongside each embedding row.
_EMBEDDING_MODEL = "voyage-3.5"


def _build_chunk_text(session) -> str:
    """Combine a session's non-empty text fields into the string to embed.

    Order: problem summary, then "Resolution: ...", "Domain: ..." and
    "Escalation: ...", joined by single spaces. Returns "" when every field
    is empty.
    """
    labelled_fields = (
        ("", session.problem_summary),
        ("Resolution: ", session.resolution_summary),
        ("Domain: ", session.problem_domain),
        ("Escalation: ", session.escalation_reason),
    )
    return " ".join(f"{label}{value}" for label, value in labelled_fields if value)


async def _upsert_embedding(
    db: AsyncSession,
    session_id: UUID,
    account_id: UUID,
    chunk_text: str,
    embedding_str: str,
) -> None:
    """Insert or update the ai_session_embeddings row for *session_id*.

    The vector column is written with raw SQL using ``CAST(:param AS vector)``.
    The PostgreSQL shorthand ``:param::vector`` must NOT be used inside
    ``text()``: SQLAlchemy's bind-parameter parser rejects a name directly
    followed by ``:`` and ends up binding a truncated name instead.
    """
    existing = await db.execute(
        select(AISessionEmbedding).where(AISessionEmbedding.session_id == session_id)
    )
    embed_record = existing.scalar_one_or_none()
    if embed_record is not None:
        # Keep the ORM object in sync; the vector itself is written via SQL.
        embed_record.chunk_text = chunk_text
        await db.execute(
            text(
                "UPDATE ai_session_embeddings "
                "SET embedding = CAST(:emb AS vector), embedding_model = :model, "
                "updated_at = now() "
                "WHERE session_id = :sid"
            ),
            {"emb": embedding_str, "model": _EMBEDDING_MODEL, "sid": str(session_id)},
        )
    else:
        # Raw SQL so the vector column can be populated in the same statement.
        await db.execute(
            text(
                "INSERT INTO ai_session_embeddings "
                "(id, session_id, account_id, chunk_text, embedding_model, "
                "embedding, created_at, updated_at) "
                "VALUES (gen_random_uuid(), :session_id, :account_id, :chunk_text, "
                ":model, CAST(:embedding AS vector), now(), now())"
            ),
            {
                "session_id": str(session_id),
                "account_id": str(account_id),
                "chunk_text": chunk_text,
                "model": _EMBEDDING_MODEL,
                "embedding": embedding_str,
            },
        )
    await db.flush()


async def generate_session_embedding(session_id: UUID, db: AsyncSession) -> None:
    """Generate (or refresh) the stored embedding for one AI session.

    Builds a text chunk from the session's problem summary, resolution
    summary, domain and escalation reason, embeds it via ``get_embedding``
    and upserts it into ``ai_session_embeddings``.

    Best-effort: nothing is written when the session does not exist, has no
    text to embed, or the embedding service returns an empty vector. Any
    exception while embedding or storing is logged as a warning rather than
    raised. Database writes run inside a SAVEPOINT, so a failure here rolls
    back only this work and leaves the caller's transaction usable.

    Args:
        session_id: ID of the AISession to embed.
        db: Active async session; changes are flushed, not committed.
    """
    result = await db.execute(select(AISession).where(AISession.id == session_id))
    session = result.scalar_one_or_none()
    if session is None:
        return

    chunk_text = _build_chunk_text(session)
    if not chunk_text:
        return

    try:
        embedding_vector = await get_embedding(chunk_text, input_type="document")
        if not embedding_vector:
            return
        # pgvector accepts the textual form "[v1,v2,...]".
        embedding_str = "[" + ",".join(str(v) for v in embedding_vector) + "]"
        # SAVEPOINT: without it, a failed statement would leave the outer
        # PostgreSQL transaction aborted even though the error is caught below.
        async with db.begin_nested():
            await _upsert_embedding(
                db, session_id, session.account_id, chunk_text, embedding_str
            )
    except Exception:
        logger.warning(
            "Failed to generate embedding for session %s", session_id, exc_info=True
        )
async def find_similar_sessions(
    session_id: UUID,
    account_id: UUID,
    db: AsyncSession,
    limit: int = 5,
) -> list[dict]:
    """Find sessions in the same account whose embeddings are closest to *session_id*'s.

    Similarity is ``1 - cosine distance`` computed with pgvector's ``<=>``
    operator. The source session itself is excluded from the results.
    Both the source embedding and the candidates are restricted to
    *account_id*, so a session belonging to another account yields ``[]``
    instead of being used as a query vector across tenants.

    Args:
        session_id: Session whose embedding is the query vector.
        account_id: Account that scopes both the source and the candidates.
        db: Active async session.
        limit: Maximum number of results; values <= 0 return ``[]``.

    Returns:
        Up to *limit* dicts with keys ``id``, ``problem_summary``,
        ``problem_domain``, ``status``, ``resolution_summary``,
        ``created_at`` (ISO-8601 string or None) and ``similarity``
        (float rounded to 3 places), highest similarity first. Returns
        ``[]`` when the source session has no embedding in this account.
    """
    if limit <= 0:
        # PostgreSQL rejects a negative LIMIT, and zero results need no query.
        return []

    params = {"sid": str(session_id), "account_id": str(account_id)}

    # Without a source embedding every distance below would be NULL.
    check = await db.execute(
        text(
            "SELECT 1 FROM ai_session_embeddings "
            "WHERE session_id = :sid AND account_id = :account_id "
            "AND embedding IS NOT NULL"
        ),
        params,
    )
    if check.first() is None:
        return []

    # The source vector is taken once (newest row, in case duplicates exist,
    # which would otherwise make the scalar subquery error out).
    result = await db.execute(
        text("""
            WITH src AS (
                SELECT embedding
                FROM ai_session_embeddings
                WHERE session_id = :sid
                  AND account_id = :account_id
                  AND embedding IS NOT NULL
                ORDER BY updated_at DESC
                LIMIT 1
            )
            SELECT
                e.session_id,
                s.problem_summary,
                s.problem_domain,
                s.status,
                s.resolution_summary,
                s.created_at,
                1 - (e.embedding <=> (SELECT embedding FROM src)) AS similarity
            FROM ai_session_embeddings e
            JOIN ai_sessions s ON s.id = e.session_id
            WHERE e.account_id = :account_id
              AND e.session_id != :sid
              AND e.embedding IS NOT NULL
            ORDER BY e.embedding <=> (SELECT embedding FROM src)
            LIMIT :lim
        """),
        {**params, "lim": limit},
    )
    return [
        {
            "id": str(row["session_id"]),
            "problem_summary": row["problem_summary"],
            "problem_domain": row["problem_domain"],
            "status": row["status"],
            "resolution_summary": row["resolution_summary"],
            "created_at": row["created_at"].isoformat() if row["created_at"] else None,
            "similarity": (
                round(float(row["similarity"]), 3)
                if row["similarity"] is not None
                else 0.0
            ),
        }
        for row in result.mappings().all()
    ]