Adds vector-based similar session discovery using the existing Voyage AI
embedding infrastructure and pgvector cosine similarity search.
- New AISessionEmbedding model with vector(1024) column
- session_embedding_service: generate + upsert embeddings, find similar sessions
- Embeddings generated on session create (from problem_summary/domain) and
updated on resolve (adds resolution_summary)
- GET /ai-sessions/{id}/similar endpoint returns top-N similar sessions
- Migration a7c9e3b1f402 creates ai_session_embeddings table
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
166 lines
5.5 KiB
Python
166 lines
5.5 KiB
Python
"""Generate and store embeddings for AI sessions for similar-session matching.
|
|
|
|
Uses Voyage AI (voyage-3.5, 1024 dims) via the shared embedding_service to
|
|
create vector representations of session content. Enables cosine similarity
|
|
search across sessions within the same account.
|
|
"""
|
|
import logging
|
|
from uuid import UUID
|
|
|
|
from sqlalchemy import select, text
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.models.ai_session import AISession
|
|
from app.models.ai_session_embedding import AISessionEmbedding
|
|
from app.services.embedding_service import get_embedding
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def generate_session_embedding(session_id: UUID, db: AsyncSession) -> None:
    """Generate and upsert the embedding for an AI session (best effort).

    Builds a text chunk from the session's problem summary, resolution
    summary, domain, and escalation reason, embeds it via ``get_embedding``,
    and inserts or updates the session's row in ``ai_session_embeddings``.

    The function returns without writing anything when the session does not
    exist, when none of the source fields are populated, or when
    ``get_embedding`` returns an empty result. Any exception raised while
    embedding or writing is logged as a warning and swallowed, so callers are
    never interrupted by an embedding failure.

    Args:
        session_id: Primary key of the AI session to embed.
        db: Async SQLAlchemy session used for both reads and writes. The
            function flushes but never commits; the caller owns the
            transaction.
    """
    result = await db.execute(select(AISession).where(AISession.id == session_id))
    session = result.scalar_one_or_none()
    if not session:
        return

    # Build text to embed — combine whichever session metadata is available.
    parts = []
    if session.problem_summary:
        parts.append(session.problem_summary)
    if session.resolution_summary:
        parts.append(f"Resolution: {session.resolution_summary}")
    if session.problem_domain:
        parts.append(f"Domain: {session.problem_domain}")
    if session.escalation_reason:
        parts.append(f"Escalation: {session.escalation_reason}")

    if not parts:
        return

    chunk_text = " ".join(parts)

    try:
        embedding_vector = await get_embedding(chunk_text, input_type="document")
        if not embedding_vector:
            return

        # pgvector accepts the textual form "[v1,v2,...]" when cast to vector.
        embedding_str = "[" + ",".join(str(v) for v in embedding_vector) + "]"

        # Run the writes inside a SAVEPOINT: this function swallows errors, and
        # without a savepoint a failed statement would leave the caller's
        # PostgreSQL transaction aborted even though no exception propagates.
        async with db.begin_nested():
            existing = await db.execute(
                select(AISessionEmbedding).where(
                    AISessionEmbedding.session_id == session_id
                )
            )
            embed_record = existing.scalar_one_or_none()

            if embed_record:
                # chunk_text goes through the ORM; the vector column is written
                # with raw SQL.
                embed_record.chunk_text = chunk_text
                # NOTE: use CAST(:emb AS vector) rather than ":emb::vector".
                # text() does not treat a ":name" immediately followed by ":"
                # as a bind parameter, so the "::" shorthand breaks binding.
                await db.execute(
                    text(
                        "UPDATE ai_session_embeddings "
                        "SET embedding = CAST(:emb AS vector), updated_at = now() "
                        "WHERE session_id = :sid"
                    ),
                    {"emb": embedding_str, "sid": str(session_id)},
                )
            else:
                # Insert new row via raw SQL so the vector column can be set.
                await db.execute(
                    text("""
                        INSERT INTO ai_session_embeddings
                            (id, session_id, account_id, chunk_text, embedding_model, embedding, created_at, updated_at)
                        VALUES
                            (gen_random_uuid(), :session_id, :account_id, :chunk_text, :model, CAST(:embedding AS vector), now(), now())
                    """),
                    {
                        "session_id": str(session_id),
                        "account_id": str(session.account_id),
                        "chunk_text": chunk_text,
                        "model": "voyage-3.5",
                        "embedding": embedding_str,
                    },
                )

            await db.flush()
    except Exception:
        # Deliberate best-effort boundary: embedding failures must not break
        # session creation/resolution, so log with traceback and carry on.
        logger.warning(
            "Failed to generate embedding for session %s", session_id, exc_info=True
        )
|
|
|
|
|
|
async def find_similar_sessions(
    session_id: UUID,
    account_id: UUID,
    db: AsyncSession,
    limit: int = 5,
) -> list[dict]:
    """Find sessions similar to the given session using cosine similarity.

    Compares the stored embedding of ``session_id`` against the embeddings of
    every other session in ``account_id`` using pgvector's ``<=>`` operator
    and returns the closest matches.

    Args:
        session_id: Session whose embedding is used as the query vector.
        account_id: Account whose sessions are searched. The source session's
            embedding must belong to this account as well.
        db: Async SQLAlchemy session used to run the queries.
        limit: Maximum number of similar sessions to return. Non-positive
            values yield an empty list without querying the database.

    Returns:
        A list of dicts with keys ``id``, ``problem_summary``,
        ``problem_domain``, ``status``, ``resolution_summary``,
        ``created_at`` (ISO-8601 string or ``None``) and ``similarity``
        (rounded to 3 decimals), ordered by highest similarity first. Empty
        when the source session has no embedding in this account.
    """
    # A negative LIMIT is a database error and LIMIT 0 can never return rows,
    # so short-circuit before touching the database.
    if limit <= 0:
        return []

    params = {"sid": str(session_id), "account_id": str(account_id)}

    # Verify the source session has an embedding, and that it belongs to the
    # requested account so one tenant's session cannot be used as the query
    # vector against another tenant's data.
    check = await db.execute(
        text(
            "SELECT 1 FROM ai_session_embeddings "
            "WHERE session_id = :sid "
            "AND account_id = :account_id "
            "AND embedding IS NOT NULL"
        ),
        params,
    )
    if not check.first():
        return []

    # Cosine similarity search across all other sessions in the account.
    # "<=>" is cosine distance, so similarity = 1 - distance.
    result = await db.execute(
        text("""
            SELECT
                e.session_id,
                s.problem_summary,
                s.problem_domain,
                s.status,
                s.resolution_summary,
                s.created_at,
                1 - (e.embedding <=> (
                    SELECT embedding FROM ai_session_embeddings WHERE session_id = :sid
                )) as similarity
            FROM ai_session_embeddings e
            JOIN ai_sessions s ON s.id = e.session_id
            WHERE e.account_id = :account_id
              AND e.session_id != :sid
              AND e.embedding IS NOT NULL
            ORDER BY e.embedding <=> (
                SELECT embedding FROM ai_session_embeddings WHERE session_id = :sid
            )
            LIMIT :lim
        """),
        {**params, "lim": limit},
    )

    rows = result.mappings().all()
    return [
        {
            "id": str(row["session_id"]),
            "problem_summary": row["problem_summary"],
            "problem_domain": row["problem_domain"],
            "status": row["status"],
            "resolution_summary": row["resolution_summary"],
            "created_at": row["created_at"].isoformat() if row["created_at"] else None,
            # Distinguish "no value" (None) from a legitimate 0.0 similarity.
            "similarity": (
                round(float(row["similarity"]), 3)
                if row["similarity"] is not None
                else 0
            ),
        }
        for row in rows
    ]
|