Files
resolutionflow/backend/app/core/kb_extraction_service.py
chihlasm 03390ed59f feat: enable Markdown (.md) file upload in KB Accelerator
Moved md from Phase 2 extensions to allowed formats, added extraction
handler (reuses txt handler), and updated plan_limits defaults to
include md for all plans.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 23:29:51 -04:00

201 lines
6.1 KiB
Python

"""KB Accelerator text extraction service.
Extracts plain text and structural metadata from uploaded KB articles.
Phase 1: txt, md, paste, docx. Phase 2 will add pdf, html.
"""
import io
import logging
from typing import Any, Callable
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# Type aliases for extraction handlers.
# ExtractResult is the pair (plain_text, structural_metadata); the metadata
# slot is None when a handler has nothing structural to report.
ExtractResult = tuple[str, dict[str, Any] | None]
# ExtractHandler is any callable that takes the raw uploaded bytes and returns
# an ExtractResult.
ExtractHandler = Callable[[bytes], ExtractResult]
def _extract_txt(content_bytes: bytes) -> ExtractResult:
"""Extract from plain text — pass through with no metadata."""
text = content_bytes.decode("utf-8", errors="replace")
return text.strip(), None
def _extract_paste(content_bytes: bytes) -> ExtractResult:
    """Handle pasted text by delegating to the plain-text extractor.

    Pasted content is treated exactly like an uploaded txt file, so the
    result is whatever ``_extract_txt`` produces for the same bytes.
    """
    pasted_text, pasted_metadata = _extract_txt(content_bytes)
    return pasted_text, pasted_metadata
def _extract_docx(content_bytes: bytes) -> ExtractResult:
"""Extract text and structural metadata from a DOCX file.
Preserves heading levels, list structures, table content,
and bold/italic emphasis markers.
"""
try:
from docx import Document
from docx.enum.text import WD_ALIGN_PARAGRAPH
except ImportError:
raise RuntimeError(
"python-docx is required for DOCX extraction. "
"Install it with: pip install python-docx"
)
doc = Document(io.BytesIO(content_bytes))
text_parts: list[str] = []
metadata: dict[str, Any] = {
"headings": [],
"lists": [],
"tables": [],
"emphasis": [],
}
list_items: list[dict[str, Any]] = []
current_list_type: str | None = None
for i, para in enumerate(doc.paragraphs):
style_name = para.style.name if para.style else ""
text = para.text.strip()
if not text:
# Flush any accumulated list
if list_items:
metadata["lists"].append({
"type": current_list_type or "unordered",
"items": list_items,
})
list_items = []
current_list_type = None
text_parts.append("")
continue
# Detect headings
if style_name.startswith("Heading"):
try:
level = int(style_name.split()[-1])
except (ValueError, IndexError):
level = 1
metadata["headings"].append({
"level": level,
"text": text,
"paragraph_index": i,
})
text_parts.append(text)
continue
# Detect list items
if style_name.startswith("List"):
is_ordered = "Number" in style_name or "Ordered" in style_name
list_type = "ordered" if is_ordered else "unordered"
if current_list_type is not None and current_list_type != list_type:
# Flush previous list
metadata["lists"].append({
"type": current_list_type,
"items": list_items,
})
list_items = []
current_list_type = list_type
list_items.append({"text": text, "paragraph_index": i})
text_parts.append(text)
continue
# Flush any accumulated list before a non-list paragraph
if list_items:
metadata["lists"].append({
"type": current_list_type or "unordered",
"items": list_items,
})
list_items = []
current_list_type = None
# Detect emphasis (bold/italic runs)
for run in para.runs:
run_text = run.text.strip()
if not run_text:
continue
if run.bold:
metadata["emphasis"].append({
"type": "bold",
"text": run_text,
"paragraph_index": i,
})
if run.italic:
metadata["emphasis"].append({
"type": "italic",
"text": run_text,
"paragraph_index": i,
})
text_parts.append(text)
# Flush trailing list
if list_items:
metadata["lists"].append({
"type": current_list_type or "unordered",
"items": list_items,
})
# Extract tables
for t_idx, table in enumerate(doc.tables):
table_data: list[list[str]] = []
for row in table.rows:
table_data.append([cell.text.strip() for cell in row.cells])
if table_data:
metadata["tables"].append({
"table_index": t_idx,
"rows": table_data,
})
# Also add table content to text
for row in table_data:
text_parts.append(" | ".join(row))
full_text = "\n".join(text_parts).strip()
# Clean up empty metadata sections
metadata = {k: v for k, v in metadata.items() if v}
return full_text, metadata if metadata else None
# Registry mapping a source-format identifier to the handler that extracts it.
# "md" is routed to the plain-text handler, so Markdown is passed through
# as text rather than parsed. Register additional formats here (extend for
# Phase 2).
FORMAT_HANDLERS: dict[str, ExtractHandler] = {
    "txt": _extract_txt,
    "md": _extract_txt,
    "paste": _extract_paste,
    "docx": _extract_docx,
}
def extract_text(
    content_bytes: bytes,
    source_format: str,
) -> ExtractResult:
    """Run the extractor registered for ``source_format`` on the content.

    Args:
        content_bytes: Raw bytes of the uploaded content.
        source_format: Key into ``FORMAT_HANDLERS`` identifying the format
            ('txt', 'md', 'paste', 'docx', ...).

    Returns:
        Tuple of (plain_text, structural_metadata_or_none), exactly as
        produced by the selected handler.

    Raises:
        ValueError: If no handler is registered for the format, or if the
            extracted text is empty or whitespace-only.
        RuntimeError: If a required extraction library is not installed.
    """
    if (extractor := FORMAT_HANDLERS.get(source_format)) is None:
        raise ValueError(f"Unsupported format: {source_format}")

    logger.info("Extracting text from format=%s", source_format)
    plain_text, structure = extractor(content_bytes)

    # Reject results with nothing but whitespace in them.
    is_blank = not plain_text.strip()
    if is_blank:
        raise ValueError(
            "Extracted text is empty — the document may be blank or contain only images."
        )

    metadata_flag = "yes" if structure else "no"
    logger.info(
        "Extraction complete: %d chars, metadata=%s",
        len(plain_text),
        metadata_flag,
    )
    return plain_text, structure