diff --git a/backend/app/api/endpoints/ai_sessions.py b/backend/app/api/endpoints/ai_sessions.py index 3fb43e15..d245b3bc 100644 --- a/backend/app/api/endpoints/ai_sessions.py +++ b/backend/app/api/endpoints/ai_sessions.py @@ -139,13 +139,18 @@ async def create_session( ) except Exception as e: logger.exception("FlowPilot session start failed: %s", e) - await _record_usage( - current_user, db, - generation_type="flowpilot_start", - input_tokens=0, output_tokens=0, - succeeded=False, error_code=type(e).__name__, - ) - await db.commit() + # Rollback the failed transaction before attempting usage recording + await db.rollback() + try: + await _record_usage( + current_user, db, + generation_type="flowpilot_start", + input_tokens=0, output_tokens=0, + succeeded=False, error_code=type(e).__name__, + ) + await db.commit() + except Exception: + logger.warning("Failed to record usage after session start failure", exc_info=True) raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"AI provider error ({type(e).__name__}). Please try again.", @@ -193,15 +198,19 @@ async def respond_to_step( raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) except Exception as e: logger.exception("FlowPilot response failed: %s", e) - await _record_usage( - current_user, db, - generation_type="flowpilot_respond", - input_tokens=0, output_tokens=0, - succeeded=False, - session_id=session_id, - error_code=type(e).__name__, - ) - await db.commit() + await db.rollback() + try: + await _record_usage( + current_user, db, + generation_type="flowpilot_respond", + input_tokens=0, output_tokens=0, + succeeded=False, + session_id=session_id, + error_code=type(e).__name__, + ) + await db.commit() + except Exception: + logger.warning("Failed to record usage after response failure", exc_info=True) raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"AI provider error ({type(e).__name__}). Please try again.", @@ -387,15 +396,19 @@ async def pickup_session( raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) except Exception as e: logger.exception("FlowPilot pickup failed: %s", e) - await _record_usage( - current_user, db, - generation_type="flowpilot_pickup", - input_tokens=0, output_tokens=0, - succeeded=False, - session_id=session_id, - error_code=type(e).__name__, - ) - await db.commit() + await db.rollback() + try: + await _record_usage( + current_user, db, + generation_type="flowpilot_pickup", + input_tokens=0, output_tokens=0, + succeeded=False, + session_id=session_id, + error_code=type(e).__name__, + ) + await db.commit() + except Exception: + logger.warning("Failed to record usage after pickup failure", exc_info=True) raise HTTPException( status_code=status.HTTP_502_BAD_GATEWAY, detail=f"AI provider error ({type(e).__name__}). Please try again.", diff --git a/backend/app/services/session_embedding_service.py b/backend/app/services/session_embedding_service.py index 641185df..65fe27da 100644 --- a/backend/app/services/session_embedding_service.py +++ b/backend/app/services/session_embedding_service.py @@ -54,44 +54,44 @@ async def generate_session_embedding(session_id: UUID, db: AsyncSession) -> None embedding_str = "[" + ",".join(str(v) for v in embedding_vector) + "]" - # Check for existing embedding - existing = await db.execute( - select(AISessionEmbedding).where( - AISessionEmbedding.session_id == session_id + # Use a savepoint so failures don't poison the parent transaction + async with db.begin_nested(): + # Check for existing embedding + existing = await db.execute( + select(AISessionEmbedding).where( + AISessionEmbedding.session_id == session_id + ) ) - ) - embed_record = existing.scalar_one_or_none() + embed_record = existing.scalar_one_or_none() - if embed_record: - # Update existing - embed_record.chunk_text = chunk_text - await db.execute( - text( - "UPDATE ai_session_embeddings " - "SET embedding = :emb::vector, updated_at = now() " - "WHERE session_id = :sid" - ), - {"emb": embedding_str, "sid": str(session_id)}, - ) - else: - # Insert new via raw SQL to include vector column - await db.execute( - text(""" - INSERT INTO ai_session_embeddings - (id, session_id, account_id, chunk_text, embedding_model, embedding, created_at, updated_at) - VALUES - (gen_random_uuid(), :session_id, :account_id, :chunk_text, :model, :embedding::vector, now(), now()) - """), - { - "session_id": str(session_id), - "account_id": str(session.account_id), - "chunk_text": chunk_text, - "model": "voyage-3.5", - "embedding": embedding_str, - }, - ) - - await db.flush() + if embed_record: + # Update existing + embed_record.chunk_text = chunk_text + await db.execute( + text( + "UPDATE ai_session_embeddings " + "SET embedding = :emb::vector, updated_at = now() " + "WHERE session_id = :sid" + ), + {"emb": embedding_str, "sid": str(session_id)}, + ) + else: + # Insert new via raw SQL to include vector column + await db.execute( + text(""" + INSERT INTO ai_session_embeddings + (id, session_id, account_id, chunk_text, embedding_model, embedding, created_at, updated_at) + VALUES + (gen_random_uuid(), :session_id, :account_id, :chunk_text, :model, :embedding::vector, now(), now()) + """), + { + "session_id": str(session_id), + "account_id": str(session.account_id), + "chunk_text": chunk_text, + "model": "voyage-3.5", + "embedding": embedding_str, + }, + ) except Exception: logger.warning( "Failed to generate embedding for session %s", session_id, exc_info=True