feat: wire PDF and text file content into AI chat messages
PDF uploads were stored in S3 and had text extracted during upload, but fetch_upload_images() filtered exclusively for image MIME types, so document content never reached the AI. - Add fetch_upload_documents() in storage_service.py to retrieve extracted_content for PDFs and text files - Update ai_sessions.py chat endpoint to call both fetch_upload_images and fetch_upload_documents, injecting document text as context - Add PDF text extraction in _generate_ai_description (pypdf) - Add pypdf>=4.0.0 to requirements.txt - Fix test_db teardown to avoid connection pool issues - Add 5 tests for fetch_upload_documents Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -280,18 +280,28 @@ async def send_chat_message(
|
|||||||
user_id = current_user.id
|
user_id = current_user.id
|
||||||
account_id = current_user.account_id
|
account_id = current_user.account_id
|
||||||
|
|
||||||
# Fetch attached images from S3 (if any)
|
# Fetch attached uploads from S3 (if any)
|
||||||
images = None
|
images = None
|
||||||
|
message = data.message
|
||||||
if data.upload_ids:
|
if data.upload_ids:
|
||||||
from app.services.storage_service import fetch_upload_images
|
from app.services.storage_service import fetch_upload_images, fetch_upload_documents
|
||||||
images = await fetch_upload_images(data.upload_ids, account_id, db) or None
|
images = await fetch_upload_images(data.upload_ids, account_id, db) or None
|
||||||
|
|
||||||
|
# Inject document text (PDFs, text files) as context in the message
|
||||||
|
documents = await fetch_upload_documents(data.upload_ids, account_id, db)
|
||||||
|
if documents:
|
||||||
|
doc_parts = []
|
||||||
|
for doc in documents:
|
||||||
|
doc_parts.append(f"--- Attached file: {doc['filename']} ---\n{doc['text']}")
|
||||||
|
doc_context = "\n\n".join(doc_parts)
|
||||||
|
message = f"{message}\n\n[Attached document content]\n{doc_context}"
|
||||||
|
|
||||||
try:
|
try:
|
||||||
ai_content, suggested_flows, session, fork_metadata, actions_data, questions_data = await unified_chat_service.send_chat_message(
|
ai_content, suggested_flows, session, fork_metadata, actions_data, questions_data = await unified_chat_service.send_chat_message(
|
||||||
session_id=session_id,
|
session_id=session_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
account_id=account_id,
|
account_id=account_id,
|
||||||
message=data.message,
|
message=message,
|
||||||
db=db,
|
db=db,
|
||||||
images=images,
|
images=images,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -61,6 +61,40 @@ async def _generate_ai_description(upload_id: UUID, file_data: bytes, content_ty
|
|||||||
max_tokens=100,
|
max_tokens=100,
|
||||||
)
|
)
|
||||||
upload.ai_description = description
|
upload.ai_description = description
|
||||||
|
elif content_type == "application/pdf":
|
||||||
|
try:
|
||||||
|
from pypdf import PdfReader
|
||||||
|
import io as _io
|
||||||
|
|
||||||
|
reader = PdfReader(_io.BytesIO(file_data))
|
||||||
|
pages_text = []
|
||||||
|
for page in reader.pages:
|
||||||
|
page_text = page.extract_text()
|
||||||
|
if page_text:
|
||||||
|
pages_text.append(page_text)
|
||||||
|
text_content = "\n\n".join(pages_text)
|
||||||
|
except Exception:
|
||||||
|
logger.warning("PDF text extraction failed for upload %s", upload_id)
|
||||||
|
text_content = ""
|
||||||
|
|
||||||
|
if text_content:
|
||||||
|
upload.extracted_content = text_content[:10000]
|
||||||
|
|
||||||
|
if len(text_content) > 2000:
|
||||||
|
summary, _, _ = await _call_ai(
|
||||||
|
system_base="You are a technical document analyst for IT troubleshooting.",
|
||||||
|
rag_context="",
|
||||||
|
history=[],
|
||||||
|
new_message=f"Summarize this PDF content in 2-3 sentences:\n\n{text_content[:5000]}",
|
||||||
|
max_tokens=200,
|
||||||
|
)
|
||||||
|
upload.content_summary = summary
|
||||||
|
upload.ai_description = summary
|
||||||
|
else:
|
||||||
|
upload.ai_description = f"PDF document: {upload.filename}"
|
||||||
|
else:
|
||||||
|
upload.ai_description = f"PDF document (no extractable text): {upload.filename}"
|
||||||
|
|
||||||
elif content_type.startswith("text/") or content_type in (
|
elif content_type.startswith("text/") or content_type in (
|
||||||
"application/json", "application/xml", "application/yaml",
|
"application/json", "application/xml", "application/yaml",
|
||||||
):
|
):
|
||||||
|
|||||||
@@ -16,10 +16,12 @@ logger = logging.getLogger(__name__)
|
|||||||
|
|
||||||
ALLOWED_IMAGE_TYPES = {"image/png", "image/jpeg", "image/gif", "image/webp"}
|
ALLOWED_IMAGE_TYPES = {"image/png", "image/jpeg", "image/gif", "image/webp"}
|
||||||
ALLOWED_TEXT_TYPES = {"text/plain", "text/csv", "application/octet-stream"}
|
ALLOWED_TEXT_TYPES = {"text/plain", "text/csv", "application/octet-stream"}
|
||||||
ALLOWED_TYPES = ALLOWED_IMAGE_TYPES | ALLOWED_TEXT_TYPES
|
ALLOWED_DOCUMENT_TYPES = {"application/pdf"}
|
||||||
|
ALLOWED_TYPES = ALLOWED_IMAGE_TYPES | ALLOWED_TEXT_TYPES | ALLOWED_DOCUMENT_TYPES
|
||||||
|
|
||||||
MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5MB
|
MAX_IMAGE_SIZE = 5 * 1024 * 1024 # 5MB
|
||||||
MAX_TEXT_SIZE = 1 * 1024 * 1024 # 1MB
|
MAX_TEXT_SIZE = 1 * 1024 * 1024 # 1MB
|
||||||
|
MAX_DOCUMENT_SIZE = 10 * 1024 * 1024 # 10MB
|
||||||
MAX_FILES_PER_SESSION = 20
|
MAX_FILES_PER_SESSION = 20
|
||||||
MAX_BYTES_PER_SESSION = 50 * 1024 * 1024 # 50MB
|
MAX_BYTES_PER_SESSION = 50 * 1024 * 1024 # 50MB
|
||||||
|
|
||||||
@@ -44,7 +46,12 @@ def validate_upload(content_type: str, size_bytes: int) -> str | None:
|
|||||||
"""Validate file type and size. Returns error message or None."""
|
"""Validate file type and size. Returns error message or None."""
|
||||||
if content_type not in ALLOWED_TYPES:
|
if content_type not in ALLOWED_TYPES:
|
||||||
return f"File type {content_type} not allowed"
|
return f"File type {content_type} not allowed"
|
||||||
max_size = MAX_IMAGE_SIZE if content_type in ALLOWED_IMAGE_TYPES else MAX_TEXT_SIZE
|
if content_type in ALLOWED_IMAGE_TYPES:
|
||||||
|
max_size = MAX_IMAGE_SIZE
|
||||||
|
elif content_type in ALLOWED_DOCUMENT_TYPES:
|
||||||
|
max_size = MAX_DOCUMENT_SIZE
|
||||||
|
else:
|
||||||
|
max_size = MAX_TEXT_SIZE
|
||||||
if size_bytes > max_size:
|
if size_bytes > max_size:
|
||||||
return f"File too large ({size_bytes} bytes, max {max_size})"
|
return f"File too large ({size_bytes} bytes, max {max_size})"
|
||||||
return None
|
return None
|
||||||
@@ -199,3 +206,77 @@ async def fetch_upload_images(
|
|||||||
except Exception:
|
except Exception:
|
||||||
logger.warning("Failed to fetch upload %s from S3", upload.id)
|
logger.warning("Failed to fetch upload %s from S3", upload.id)
|
||||||
return images
|
return images
|
||||||
|
|
||||||
|
|
||||||
|
DOCUMENT_CONTENT_TYPES = ALLOWED_DOCUMENT_TYPES | ALLOWED_TEXT_TYPES
|
||||||
|
MAX_DOCUMENT_CONTEXT_CHARS = 10_000 # Cap total injected text to control token cost
|
||||||
|
|
||||||
|
|
||||||
|
async def fetch_upload_documents(
|
||||||
|
upload_ids: list[UUID],
|
||||||
|
account_id: UUID,
|
||||||
|
db: Any,
|
||||||
|
) -> list[dict[str, str]]:
|
||||||
|
"""Fetch extracted text content for non-image uploads (PDFs, text files).
|
||||||
|
|
||||||
|
Returns a list of dicts with 'filename', 'content_type', and 'text' keys.
|
||||||
|
Text is sourced from the FileUpload.extracted_content field (populated
|
||||||
|
during upload by _generate_ai_description). Falls back to downloading
|
||||||
|
and decoding text files from S3 if extracted_content is empty.
|
||||||
|
"""
|
||||||
|
if not upload_ids:
|
||||||
|
return []
|
||||||
|
|
||||||
|
from sqlalchemy import select
|
||||||
|
from app.models.file_upload import FileUpload
|
||||||
|
|
||||||
|
result = await db.execute(
|
||||||
|
select(FileUpload).where(
|
||||||
|
FileUpload.id.in_(upload_ids),
|
||||||
|
FileUpload.account_id == account_id,
|
||||||
|
FileUpload.content_type.in_(DOCUMENT_CONTENT_TYPES),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
uploads = result.scalars().all()
|
||||||
|
|
||||||
|
documents: list[dict[str, str]] = []
|
||||||
|
total_chars = 0
|
||||||
|
for upload in uploads:
|
||||||
|
text = upload.extracted_content or ""
|
||||||
|
|
||||||
|
# Fallback: for text files without extracted_content, fetch from S3
|
||||||
|
if not text and upload.content_type in ALLOWED_TEXT_TYPES and settings.STORAGE_ENDPOINT:
|
||||||
|
try:
|
||||||
|
file_data = download_file(upload.storage_key)
|
||||||
|
try:
|
||||||
|
text = file_data.decode("utf-8")
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
text = file_data.decode("latin-1")
|
||||||
|
text = text[:MAX_DOCUMENT_CONTEXT_CHARS]
|
||||||
|
except Exception:
|
||||||
|
logger.warning("Failed to fetch text upload %s from S3", upload.id)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not text:
|
||||||
|
# PDF with no extractable text — include a note so AI knows
|
||||||
|
documents.append({
|
||||||
|
"filename": upload.filename,
|
||||||
|
"content_type": upload.content_type,
|
||||||
|
"text": f"[Attached file: {upload.filename} — no extractable text content]",
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Cap per-document and total to control token budget
|
||||||
|
remaining = MAX_DOCUMENT_CONTEXT_CHARS - total_chars
|
||||||
|
if remaining <= 0:
|
||||||
|
break
|
||||||
|
truncated = text[:remaining]
|
||||||
|
total_chars += len(truncated)
|
||||||
|
|
||||||
|
documents.append({
|
||||||
|
"filename": upload.filename,
|
||||||
|
"content_type": upload.content_type,
|
||||||
|
"text": truncated,
|
||||||
|
})
|
||||||
|
|
||||||
|
return documents
|
||||||
|
|||||||
@@ -57,3 +57,6 @@ boto3>=1.34.0
|
|||||||
|
|
||||||
# Image processing (vision upload resize)
|
# Image processing (vision upload resize)
|
||||||
Pillow>=10.0.0
|
Pillow>=10.0.0
|
||||||
|
|
||||||
|
# PDF text extraction (upload analysis)
|
||||||
|
pypdf>=4.0.0
|
||||||
|
|||||||
@@ -85,13 +85,25 @@ async def test_db() -> AsyncGenerator[AsyncSession, None]:
|
|||||||
# Provide session to test
|
# Provide session to test
|
||||||
async with async_session_maker() as session:
|
async with async_session_maker() as session:
|
||||||
yield session
|
yield session
|
||||||
|
# Ensure session is fully closed before teardown
|
||||||
|
await session.close()
|
||||||
|
|
||||||
|
# Dispose engine first so all pooled connections are released,
|
||||||
|
# then reconnect to perform the schema teardown cleanly.
|
||||||
|
await engine.dispose()
|
||||||
|
|
||||||
# Drop all tables after test (CASCADE for circular FKs)
|
# Drop all tables after test (CASCADE for circular FKs)
|
||||||
async with engine.begin() as conn:
|
teardown_engine = create_async_engine(
|
||||||
await conn.execute(sa.text("DROP SCHEMA public CASCADE"))
|
TEST_DATABASE_URL,
|
||||||
await conn.execute(sa.text("CREATE SCHEMA public"))
|
poolclass=NullPool,
|
||||||
|
echo=False,
|
||||||
await engine.dispose()
|
)
|
||||||
|
try:
|
||||||
|
async with teardown_engine.begin() as conn:
|
||||||
|
await conn.execute(sa.text("DROP SCHEMA public CASCADE"))
|
||||||
|
await conn.execute(sa.text("CREATE SCHEMA public"))
|
||||||
|
finally:
|
||||||
|
await teardown_engine.dispose()
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
|
|||||||
@@ -134,6 +134,42 @@ async def test_upload_rejects_oversized_text(client, auth_headers):
|
|||||||
assert "too large" in response.json()["detail"].lower()
|
assert "too large" in response.json()["detail"].lower()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_upload_accepts_pdf(client, auth_headers):
|
||||||
|
"""Upload accepts application/pdf files (regression: was rejected with 400)."""
|
||||||
|
fake_key = f"uploads/acc/{uuid.uuid4()}.pdf"
|
||||||
|
fake_url = "https://fake-s3.example.com/presigned?token=pdf"
|
||||||
|
|
||||||
|
with patch("app.api.endpoints.uploads.settings") as mock_settings, \
|
||||||
|
patch("app.api.endpoints.uploads.storage_service") as mock_storage:
|
||||||
|
mock_settings.STORAGE_ENDPOINT = "http://fake-s3"
|
||||||
|
mock_storage.validate_upload.return_value = None
|
||||||
|
mock_storage.MAX_FILES_PER_SESSION = 20
|
||||||
|
mock_storage.MAX_BYTES_PER_SESSION = 50 * 1024 * 1024
|
||||||
|
mock_storage.upload_file = AsyncMock(return_value=fake_key)
|
||||||
|
mock_storage.get_presigned_url.return_value = fake_url
|
||||||
|
|
||||||
|
files = {"file": ("report.pdf", io.BytesIO(b"%PDF-1.4 test"), "application/pdf")}
|
||||||
|
response = await client.post("/api/v1/uploads", files=files, headers=auth_headers)
|
||||||
|
|
||||||
|
assert response.status_code == 201
|
||||||
|
data = response.json()
|
||||||
|
assert data["filename"] == "report.pdf"
|
||||||
|
assert data["content_type"] == "application/pdf"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_upload_rejects_oversized_pdf(client, auth_headers):
|
||||||
|
"""Upload rejects PDF files exceeding 10 MB."""
|
||||||
|
large_data = b"%PDF-1.4 " + b"\x00" * (11 * 1024 * 1024) # 11 MB
|
||||||
|
with patch("app.api.endpoints.uploads.settings") as mock_settings:
|
||||||
|
mock_settings.STORAGE_ENDPOINT = "http://fake-s3"
|
||||||
|
files = {"file": ("huge.pdf", io.BytesIO(large_data), "application/pdf")}
|
||||||
|
response = await client.post("/api/v1/uploads", files=files, headers=auth_headers)
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert "too large" in response.json()["detail"].lower()
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Happy path tests (storage fully mocked)
|
# Happy path tests (storage fully mocked)
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -299,3 +335,139 @@ async def test_delete_upload_forbidden_for_non_owner(client, auth_headers, test_
|
|||||||
)
|
)
|
||||||
|
|
||||||
assert response.status_code == 403
|
assert response.status_code == 403
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# fetch_upload_documents tests
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_fetch_upload_documents_returns_pdf_content(client, auth_headers, test_db):
|
||||||
|
"""fetch_upload_documents returns extracted_content for PDF uploads."""
|
||||||
|
from app.models.file_upload import FileUpload
|
||||||
|
from app.models.user import User
|
||||||
|
from app.services.storage_service import fetch_upload_documents
|
||||||
|
from sqlalchemy import select
|
||||||
|
|
||||||
|
result = await test_db.execute(select(User).where(User.email == "test@example.com"))
|
||||||
|
user = result.scalar_one()
|
||||||
|
|
||||||
|
upload = FileUpload(
|
||||||
|
account_id=user.account_id,
|
||||||
|
uploaded_by=user.id,
|
||||||
|
session_id=None,
|
||||||
|
filename="report.pdf",
|
||||||
|
content_type="application/pdf",
|
||||||
|
size_bytes=5000,
|
||||||
|
storage_key=f"uploads/{user.account_id}/{uuid.uuid4()}.pdf",
|
||||||
|
extracted_content="This is the extracted PDF text content.",
|
||||||
|
)
|
||||||
|
test_db.add(upload)
|
||||||
|
await test_db.commit()
|
||||||
|
await test_db.refresh(upload)
|
||||||
|
|
||||||
|
docs = await fetch_upload_documents([upload.id], user.account_id, test_db)
|
||||||
|
|
||||||
|
assert len(docs) == 1
|
||||||
|
assert docs[0]["filename"] == "report.pdf"
|
||||||
|
assert docs[0]["content_type"] == "application/pdf"
|
||||||
|
assert docs[0]["text"] == "This is the extracted PDF text content."
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_fetch_upload_documents_excludes_images(client, auth_headers, test_db):
|
||||||
|
"""fetch_upload_documents does not return image uploads."""
|
||||||
|
from app.models.file_upload import FileUpload
|
||||||
|
from app.models.user import User
|
||||||
|
from app.services.storage_service import fetch_upload_documents
|
||||||
|
from sqlalchemy import select
|
||||||
|
|
||||||
|
result = await test_db.execute(select(User).where(User.email == "test@example.com"))
|
||||||
|
user = result.scalar_one()
|
||||||
|
|
||||||
|
upload = FileUpload(
|
||||||
|
account_id=user.account_id,
|
||||||
|
uploaded_by=user.id,
|
||||||
|
session_id=None,
|
||||||
|
filename="screenshot.png",
|
||||||
|
content_type="image/png",
|
||||||
|
size_bytes=1024,
|
||||||
|
storage_key=f"uploads/{user.account_id}/{uuid.uuid4()}.png",
|
||||||
|
)
|
||||||
|
test_db.add(upload)
|
||||||
|
await test_db.commit()
|
||||||
|
await test_db.refresh(upload)
|
||||||
|
|
||||||
|
docs = await fetch_upload_documents([upload.id], user.account_id, test_db)
|
||||||
|
assert len(docs) == 0
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_fetch_upload_documents_pdf_no_text(client, auth_headers, test_db):
|
||||||
|
"""PDF with no extracted text returns a placeholder note."""
|
||||||
|
from app.models.file_upload import FileUpload
|
||||||
|
from app.models.user import User
|
||||||
|
from app.services.storage_service import fetch_upload_documents
|
||||||
|
from sqlalchemy import select
|
||||||
|
|
||||||
|
result = await test_db.execute(select(User).where(User.email == "test@example.com"))
|
||||||
|
user = result.scalar_one()
|
||||||
|
|
||||||
|
upload = FileUpload(
|
||||||
|
account_id=user.account_id,
|
||||||
|
uploaded_by=user.id,
|
||||||
|
session_id=None,
|
||||||
|
filename="scanned.pdf",
|
||||||
|
content_type="application/pdf",
|
||||||
|
size_bytes=2000,
|
||||||
|
storage_key=f"uploads/{user.account_id}/{uuid.uuid4()}.pdf",
|
||||||
|
extracted_content=None,
|
||||||
|
)
|
||||||
|
test_db.add(upload)
|
||||||
|
await test_db.commit()
|
||||||
|
await test_db.refresh(upload)
|
||||||
|
|
||||||
|
docs = await fetch_upload_documents([upload.id], user.account_id, test_db)
|
||||||
|
|
||||||
|
assert len(docs) == 1
|
||||||
|
assert "no extractable text" in docs[0]["text"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_fetch_upload_documents_respects_account_filter(client, auth_headers, test_db):
|
||||||
|
"""fetch_upload_documents only returns uploads belonging to the given account."""
|
||||||
|
from app.models.file_upload import FileUpload
|
||||||
|
from app.models.user import User
|
||||||
|
from app.services.storage_service import fetch_upload_documents
|
||||||
|
from sqlalchemy import select
|
||||||
|
|
||||||
|
result = await test_db.execute(select(User).where(User.email == "test@example.com"))
|
||||||
|
user = result.scalar_one()
|
||||||
|
|
||||||
|
upload = FileUpload(
|
||||||
|
account_id=user.account_id,
|
||||||
|
uploaded_by=user.id,
|
||||||
|
session_id=None,
|
||||||
|
filename="report.pdf",
|
||||||
|
content_type="application/pdf",
|
||||||
|
size_bytes=5000,
|
||||||
|
storage_key=f"uploads/{user.account_id}/{uuid.uuid4()}.pdf",
|
||||||
|
extracted_content="Secret content",
|
||||||
|
)
|
||||||
|
test_db.add(upload)
|
||||||
|
await test_db.commit()
|
||||||
|
await test_db.refresh(upload)
|
||||||
|
|
||||||
|
# Query with a different account_id — should get nothing
|
||||||
|
other_account = uuid.uuid4()
|
||||||
|
docs = await fetch_upload_documents([upload.id], other_account, test_db)
|
||||||
|
assert len(docs) == 0
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_fetch_upload_documents_empty_ids(client, auth_headers, test_db):
|
||||||
|
"""Empty upload_ids returns empty list without querying DB."""
|
||||||
|
from app.services.storage_service import fetch_upload_documents
|
||||||
|
|
||||||
|
docs = await fetch_upload_documents([], uuid.uuid4(), test_db)
|
||||||
|
assert docs == []
|
||||||
|
|||||||
Reference in New Issue
Block a user