From e3561034085cc704dfac6335c4f1ce997f888fb1 Mon Sep 17 00:00:00 2001 From: chihlasm Date: Fri, 20 Mar 2026 03:48:09 +0000 Subject: [PATCH] feat(search): add semantic similar session matching via Voyage AI embeddings Adds vector-based similar session discovery using the existing Voyage AI embedding infrastructure and pgvector cosine similarity search. - New AISessionEmbedding model with vector(1024) column - session_embedding_service: generate + upsert embeddings, find similar sessions - Embeddings generated on session create (from problem_summary/domain) and updated on resolve (adds resolution_summary) - GET /ai-sessions/{id}/similar endpoint returns top-N similar sessions - Migration a7c9e3b1f402 creates ai_session_embeddings table Co-Authored-By: Claude Opus 4.6 (1M context) --- ...3b1f402_add_ai_session_embeddings_table.py | 83 +++++++++ backend/app/api/endpoints/ai_sessions.py | 26 +++ backend/app/models/__init__.py | 2 + backend/app/models/ai_session_embedding.py | 53 ++++++ backend/app/services/flowpilot_engine.py | 14 ++ .../app/services/session_embedding_service.py | 165 ++++++++++++++++++ 6 files changed, 343 insertions(+) create mode 100644 backend/alembic/versions/a7c9e3b1f402_add_ai_session_embeddings_table.py create mode 100644 backend/app/models/ai_session_embedding.py create mode 100644 backend/app/services/session_embedding_service.py diff --git a/backend/alembic/versions/a7c9e3b1f402_add_ai_session_embeddings_table.py b/backend/alembic/versions/a7c9e3b1f402_add_ai_session_embeddings_table.py new file mode 100644 index 00000000..1c9a6cdf --- /dev/null +++ b/backend/alembic/versions/a7c9e3b1f402_add_ai_session_embeddings_table.py @@ -0,0 +1,83 @@ +"""add ai_session_embeddings table for similar-session matching + +Revision ID: a7c9e3b1f402 +Revises: dbf67047d4c8 +Create Date: 2026-03-20 +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + + +# revision identifiers, used by Alembic. +revision: str = "a7c9e3b1f402" +down_revision: Union[str, None] = "dbf67047d4c8" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # pgvector extension should already exist from migration 042 + op.execute("CREATE EXTENSION IF NOT EXISTS vector") + + op.create_table( + "ai_session_embeddings", + sa.Column( + "id", + postgresql.UUID(as_uuid=True), + primary_key=True, + server_default=sa.text("gen_random_uuid()"), + ), + sa.Column( + "session_id", + postgresql.UUID(as_uuid=True), + sa.ForeignKey("ai_sessions.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column( + "account_id", + postgresql.UUID(as_uuid=True), + sa.ForeignKey("accounts.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column("chunk_text", sa.Text(), nullable=False), + sa.Column( + "embedding_model", + sa.String(50), + nullable=False, + server_default="voyage-3.5", + ), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + ), + sa.Column( + "updated_at", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + ), + ) + + # Add vector column via raw SQL (pgvector type not in SA dialect) + op.execute( + "ALTER TABLE ai_session_embeddings ADD COLUMN embedding vector(1024)" + ) + + op.create_index( + "ix_ai_session_embeddings_session_id", + "ai_session_embeddings", + ["session_id"], + unique=True, + ) + op.create_index( + "ix_ai_session_embeddings_account_id", + "ai_session_embeddings", + ["account_id"], + ) + + +def downgrade() -> None: + op.drop_table("ai_session_embeddings") diff --git a/backend/app/api/endpoints/ai_sessions.py b/backend/app/api/endpoints/ai_sessions.py index 8502f926..d2ee73ca 100644 --- a/backend/app/api/endpoints/ai_sessions.py +++ b/backend/app/api/endpoints/ai_sessions.py @@ -493,6 +493,32 @@ async def search_sessions( ] +# ── Similar Sessions ── + +@router.get("/{session_id}/similar") +@limiter.limit("15/minute") +async def get_similar_sessions( + request: Request, + session_id: UUID, + current_user: Annotated[User, Depends(get_current_active_user)], + db: Annotated[AsyncSession, Depends(get_db)], + limit: int = Query(5, ge=1, le=20), +): + """Find sessions semantically similar to this one using vector embeddings.""" + from app.services.session_embedding_service import find_similar_sessions + + if not current_user.account_id: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="No account") + + results = await find_similar_sessions( + session_id=session_id, + account_id=current_user.account_id, + db=db, + limit=limit, + ) + return results + + # ── List sessions ── @router.get("", response_model=list[AISessionSummary]) diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index a01c04c4..a38cc391 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -48,6 +48,7 @@ from .notification_log import NotificationLog from .notification import Notification from .psa_activity_log import PsaActivityLog from .file_upload import FileUpload +from .ai_session_embedding import AISessionEmbedding __all__ = [ "User", @@ -110,4 +111,5 @@ __all__ = [ "Notification", "PsaActivityLog", "FileUpload", + "AISessionEmbedding", ] diff --git a/backend/app/models/ai_session_embedding.py b/backend/app/models/ai_session_embedding.py new file mode 100644 index 00000000..a7baa210 --- /dev/null +++ b/backend/app/models/ai_session_embedding.py @@ -0,0 +1,53 @@ +"""AI session embedding storage for similar-session matching. + +Stores vector embeddings of AI session content (problem summary, resolution, +domain) for cosine similarity search via pgvector. One embedding per session. +""" +import uuid +from datetime import datetime, timezone + +from sqlalchemy import String, Text, DateTime, ForeignKey, Index +from sqlalchemy.orm import Mapped, mapped_column +from sqlalchemy.dialects.postgresql import UUID + +from app.core.database import Base + +try: + from pgvector.sqlalchemy import Vector +except ImportError: + Vector = None + + +class AISessionEmbedding(Base): + __tablename__ = "ai_session_embeddings" + __table_args__ = ( + Index("ix_ai_session_embeddings_account_id", "account_id"), + Index("ix_ai_session_embeddings_session_id", "session_id", unique=True), + ) + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=uuid.uuid4 + ) + session_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + ForeignKey("ai_sessions.id", ondelete="CASCADE"), + nullable=False, + ) + account_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + ForeignKey("accounts.id", ondelete="CASCADE"), + nullable=False, + ) + chunk_text: Mapped[str] = mapped_column(Text, nullable=False) + embedding_model: Mapped[str] = mapped_column( + String(50), nullable=False, default="voyage-3.5" + ) + # The embedding column is created via migration with vector(1024) type + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=lambda: datetime.now(timezone.utc) + ) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + default=lambda: datetime.now(timezone.utc), + onupdate=lambda: datetime.now(timezone.utc), + ) diff --git a/backend/app/services/flowpilot_engine.py b/backend/app/services/flowpilot_engine.py index ad02d1c8..40dbfe87 100644 --- a/backend/app/services/flowpilot_engine.py +++ b/backend/app/services/flowpilot_engine.py @@ -331,6 +331,13 @@ async def start_session( await db.flush() + # Generate session embedding for similar-session matching (fire-and-forget) + try: + from app.services.session_embedding_service import generate_session_embedding + await generate_session_embedding(session.id, db) + except Exception: + logger.warning("Failed to generate session embedding on create", exc_info=True) + return AISessionCreateResponse( session_id=session.id, status=session.status, @@ -492,6 +499,13 @@ async def resolve_session( await db.flush() + # Update session embedding with resolution data for similar-session matching + try: + from app.services.session_embedding_service import generate_session_embedding + await generate_session_embedding(session.id, db) + except Exception: + logger.warning("Failed to update session embedding on resolve", exc_info=True) + # Push documentation to PSA if ticket is linked psa_result = await _push_to_psa(session, user_id, db) diff --git a/backend/app/services/session_embedding_service.py b/backend/app/services/session_embedding_service.py new file mode 100644 index 00000000..641185df --- /dev/null +++ b/backend/app/services/session_embedding_service.py @@ -0,0 +1,165 @@ +"""Generate and store embeddings for AI sessions for similar-session matching. + +Uses Voyage AI (voyage-3.5, 1024 dims) via the shared embedding_service to +create vector representations of session content. Enables cosine similarity +search across sessions within the same account. +""" +import logging +from uuid import UUID + +from sqlalchemy import select, text +from sqlalchemy.ext.asyncio import AsyncSession + +from app.models.ai_session import AISession +from app.models.ai_session_embedding import AISessionEmbedding +from app.services.embedding_service import get_embedding + +logger = logging.getLogger(__name__) + + +async def generate_session_embedding(session_id: UUID, db: AsyncSession) -> None: + """Generate embedding for an AI session's content. + + Builds a text chunk from the session's problem summary, resolution, + domain, and escalation reason, then embeds it via Voyage AI and + upserts into ai_session_embeddings. + """ + result = await db.execute( + select(AISession).where(AISession.id == session_id) + ) + session = result.scalar_one_or_none() + if not session: + return + + # Build text to embed — combine available session metadata + parts = [] + if session.problem_summary: + parts.append(session.problem_summary) + if session.resolution_summary: + parts.append(f"Resolution: {session.resolution_summary}") + if session.problem_domain: + parts.append(f"Domain: {session.problem_domain}") + if session.escalation_reason: + parts.append(f"Escalation: {session.escalation_reason}") + + if not parts: + return + + chunk_text = " ".join(parts) + + try: + embedding_vector = await get_embedding(chunk_text, input_type="document") + if not embedding_vector: + return + + embedding_str = "[" + ",".join(str(v) for v in embedding_vector) + "]" + + # Check for existing embedding + existing = await db.execute( + select(AISessionEmbedding).where( + AISessionEmbedding.session_id == session_id + ) + ) + embed_record = existing.scalar_one_or_none() + + if embed_record: + # Update existing + embed_record.chunk_text = chunk_text + await db.execute( + text( + "UPDATE ai_session_embeddings " + "SET embedding = :emb::vector, updated_at = now() " + "WHERE session_id = :sid" + ), + {"emb": embedding_str, "sid": str(session_id)}, + ) + else: + # Insert new via raw SQL to include vector column + await db.execute( + text(""" + INSERT INTO ai_session_embeddings + (id, session_id, account_id, chunk_text, embedding_model, embedding, created_at, updated_at) + VALUES + (gen_random_uuid(), :session_id, :account_id, :chunk_text, :model, :embedding::vector, now(), now()) + """), + { + "session_id": str(session_id), + "account_id": str(session.account_id), + "chunk_text": chunk_text, + "model": "voyage-3.5", + "embedding": embedding_str, + }, + ) + + await db.flush() + except Exception: + logger.warning( + "Failed to generate embedding for session %s", session_id, exc_info=True + ) + + +async def find_similar_sessions( + session_id: UUID, + account_id: UUID, + db: AsyncSession, + limit: int = 5, +) -> list[dict]: + """Find sessions similar to the given session using cosine similarity. + + Returns a list of dicts with session metadata and similarity score, + ordered by highest similarity first. + """ + # Verify the source session has an embedding + check = await db.execute( + text( + "SELECT 1 FROM ai_session_embeddings " + "WHERE session_id = :sid AND embedding IS NOT NULL" + ), + {"sid": str(session_id)}, + ) + if not check.first(): + return [] + + # Cosine similarity search across all sessions in the account + result = await db.execute( + text(""" + SELECT + e.session_id, + s.problem_summary, + s.problem_domain, + s.status, + s.resolution_summary, + s.created_at, + 1 - (e.embedding <=> ( + SELECT embedding FROM ai_session_embeddings WHERE session_id = :sid + )) as similarity + FROM ai_session_embeddings e + JOIN ai_sessions s ON s.id = e.session_id + WHERE e.account_id = :account_id + AND e.session_id != :sid + AND e.embedding IS NOT NULL + ORDER BY e.embedding <=> ( + SELECT embedding FROM ai_session_embeddings WHERE session_id = :sid + ) + LIMIT :lim + """), + { + "sid": str(session_id), + "account_id": str(account_id), + "lim": limit, + }, + ) + + rows = result.mappings().all() + return [ + { + "id": str(row["session_id"]), + "problem_summary": row["problem_summary"], + "problem_domain": row["problem_domain"], + "status": row["status"], + "resolution_summary": row["resolution_summary"], + "created_at": row["created_at"].isoformat() if row["created_at"] else None, + "similarity": round(float(row["similarity"]), 3) if row["similarity"] else 0, + } + for row in rows + ]