# sanchitshaleen
# Initial deployment of RAG with Gemma-3 to Hugging Face Spaces
# 4aec76b
"""Ingest new documents into the vector database.

- Currently supports file ingestion for txt, pdf, and md files.
- Planned: support for more file types.
- Planned: web-based ingestion.
- Supports multimodal ingestion: image embeddings and OCR text extraction from images.
"""
from typing import List
from pathlib import Path
import uuid
from llm_system.utils.loader import load_file
from llm_system.utils.splitter import split_text
from llm_system.core.llm import get_embeddings
from llm_system.core import colpali_embeddings
from llm_system.core import colpali_storage
# For type hinting
from llm_system.core.qdrant_database import VectorDB
from langchain_core.embeddings import Embeddings
from langchain_core.documents import Document
from PIL import Image
# OCR is an optional feature: ingestion must keep working when pytesseract is
# not installed, so the import failure is recorded in a flag instead of raised.
try:
    import pytesseract
    PYTESSERACT_AVAILABLE = True
except ImportError:
    # Keep the module name bound so that `pytesseract is None` checks (and
    # `from <this module> import pytesseract`) never hit a NameError.
    pytesseract = None
    PYTESSERACT_AVAILABLE = False
from logger import get_logger
log = get_logger(name="core_ingestion")
def _extract_text_from_image(image_path: str) -> str:
    """Extract text from an image using OCR (Tesseract).

    This is a best-effort helper: it never raises. Any failure while opening
    the image or running OCR is logged as a warning and reported as "no text".

    Args:
        image_path: Path to the image file.

    Returns:
        The recognised text with surrounding whitespace stripped, or an empty
        string when pytesseract is not installed, no text was recognised, or
        OCR failed.
    """
    if not PYTESSERACT_AVAILABLE:
        return ""

    try:
        # The context manager releases the underlying file handle as soon as
        # OCR is done; without it, one handle stays open per processed image
        # until garbage collection.
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(image).strip()
    except Exception as e:
        # Deliberately broad: OCR must never abort the surrounding ingestion.
        log.warning(f"⚠️ Failed to extract OCR text from {image_path}: {e}")
        return ""

    if not text:
        log.debug(f"No text found in image via OCR: {image_path}")
        return ""

    # The reported length is that of the stripped text, i.e. what is returned.
    log.info(f"✅ Extracted {len(text)} characters of OCR text from {Path(image_path).name}")
    return text
def _create_image_documents(documents: List[Document], user_id: str) -> List[Document]:
    """Create one Document per unique image referenced in ``documents``.

    Image descriptors are read from ``doc.metadata['images']`` (a list of
    dicts). For every image that has not been seen before (keyed by its
    ``image_id``) this function:

    1. runs OCR on the image file, when a path is present;
    2. writes the OCR text next to the image as ``<image>.txt`` (best effort,
       for auditing/reference);
    3. asks ``colpali_embeddings`` for an embedding when ColPali reports itself
       available, and persists it through ``colpali_storage``;
    4. builds a ``Document`` whose ``page_content`` holds the OCR text (or a
       placeholder sentence) and whose metadata describes the image.

    Args:
        documents: Documents whose metadata may contain an ``'images'`` list.
        user_id: User ID stored in each image document's metadata and passed
            to the ColPali storage layer.

    Returns:
        Image documents in first-seen order; an empty list when no document
        carries images.
    """
    image_documents: List[Document] = []
    seen_image_ids = set()  # The same image may be attached to several text chunks.

    for doc in documents:
        for img in doc.metadata.get('images') or []:
            # NOTE(review): images without an 'image_id' all share the key
            # 'unknown', so only the first of them is kept — confirm that the
            # loader always assigns ids.
            image_id = img.get('image_id', 'unknown')
            if image_id in seen_image_ids:
                continue
            seen_image_ids.add(image_id)

            image_path = img.get('image_path')
            # Normalise to str for serialization, but keep a missing path as
            # None instead of the misleading literal string "None".
            path_str = str(image_path) if image_path else None

            ocr_text = _extract_text_from_image(path_str) if path_str else ""
            if ocr_text:
                # Sidecar file is a convenience only; failing to write it must
                # not prevent the image from being indexed.
                try:
                    ocr_file_path = Path(path_str).with_suffix('.txt')
                    ocr_file_path.write_text(ocr_text, encoding='utf-8')
                    log.debug(f"Saved OCR text to: {ocr_file_path}")
                except (OSError, ValueError) as e:
                    log.warning(f"Could not save OCR text to file: {e}")

            if ocr_text:
                page_content = f"[IMAGE] {image_id}\n\nOCR Text:\n{ocr_text}"
            else:
                page_content = f"[IMAGE] {image_id}\n\nImage extracted from document. Use visual understanding to analyze."

            # Only the patch count goes into the document metadata; the full
            # embedding is handed to colpali_storage for persistence.
            colpali_patches = None
            if colpali_embeddings.is_colpali_available() and path_str:
                colpali_embedding = colpali_embeddings.embed_image(path_str)
                if colpali_embedding:
                    colpali_patches = colpali_embedding.get('num_patches', 0)
                    colpali_storage.save_colpali_embedding(image_id, user_id, colpali_embedding)

            # `or {}` also covers an explicit `'metadata': None` entry.
            extra = img.get('metadata') or {}
            image_documents.append(Document(
                page_content=page_content,
                metadata={
                    'user_id': user_id,
                    'type': 'image',
                    'image_id': image_id,
                    'image_path': path_str,
                    'page_number': img.get('page_number'),
                    'position': img.get('position'),
                    'source_pdf': extra.get('source_pdf'),
                    'extractor': extra.get('extractor'),
                    'has_ocr_text': bool(ocr_text),
                    'colpali_available': colpali_patches is not None,
                    'colpali_patches': colpali_patches or 0,  # Number of patch embeddings
                },
            ))

    if image_documents:
        ocr_count = sum(1 for d in image_documents if d.metadata.get('has_ocr_text'))
        colpali_count = sum(1 for d in image_documents if d.metadata.get('colpali_available'))
        log.info(f"✅ Created {len(image_documents)} unique image documents ({ocr_count} with OCR, {colpali_count} with ColPali embeddings)")

    return image_documents
def ingest_file(user_id: str, file_path: str, vectorstore: VectorDB,
                embeddings: Embeddings = None) -> tuple[bool, List[str], str]:
    """Ingest a file into the vector database with multimodal support.

    Pipeline: load the file, split it into text chunks, build image documents
    from any images referenced in the loaded documents' metadata, add
    everything to ``vectorstore.db`` and finally persist the store to disk.

    Args:
        user_id: User identifier for document tracking.
        file_path: The absolute path to the file to be ingested.
        vectorstore: The vector database instance.
        embeddings: Optional embeddings model. If None, ``get_embeddings()``
            is called to obtain one.

    Returns:
        tuple[bool, List[str], str]: A tuple containing:
            - bool: True if ingestion was successful (including the case where
              the file had nothing to ingest), False otherwise.
            - List[str]: IDs of the stored documents; empty on failure or when
              nothing was ingested.
            - str: Message indicating the result of the ingestion.
    """
    log.info(f"🔄 [INGESTION START] user_id={user_id}, file_path={file_path}")

    # NOTE(review): the resolved `embeddings` object is not passed on anywhere
    # in this function; the actual embedding happens inside
    # `vectorstore.db.add_documents`. The factory call is kept because it may
    # initialise shared state — confirm whether it is still needed.
    if embeddings is None:
        log.info("📊 Using embeddings from factory (will use Jina if configured)")
        embeddings = get_embeddings()

    # Load the file and get its content as Document objects:
    log.info(f"📂 [STEP 1] Loading file from: {file_path}")
    status, documents, message = load_file(user_id, file_path)
    log.info(f"📂 [STEP 1 RESULT] status={status}, docs_count={len(documents) if documents else 0}, message={message}")
    if not status:
        log.error(f"❌ [STEP 1 FAILED] load_file returned False: {message}")
        return False, [], message

    # Split the documents into smaller chunks:
    log.info(f"✂️ [STEP 2] Splitting {len(documents)} documents into chunks")
    status, split_docs, message = split_text(documents)
    log.info(f"✂️ [STEP 2 RESULT] status={status}, chunks_count={len(split_docs) if split_docs else 0}, message={message}")
    if not status:
        log.error(f"❌ [STEP 2 FAILED] split_text returned False: {message}")
        return False, [], message
    split_docs = split_docs or []

    # Image documents are built BEFORE deciding that the file is empty, so a
    # file that yields no text chunks but does contain images (e.g. a scanned
    # PDF) still gets its images ingested.
    log.info(f"🖼️ [STEP 2.5] Extracting image documents for multimodal embedding")
    image_docs = _create_image_documents(documents, user_id)

    all_docs_to_embed = split_docs + image_docs
    total_docs_count = len(all_docs_to_embed)
    log.info(f"📦 [STEP 2.5 RESULT] text_chunks={len(split_docs)}, image_docs={len(image_docs)}, total={total_docs_count}")

    if not all_docs_to_embed:
        log.warning(f"⚠️ [STEP 2 WARNING] No content found in the file: {file_path}")
        return True, [], f"No content found in the file: {file_path}"

    # Add the documents to the vector database:
    log.info(f"💾 [STEP 3] Adding {total_docs_count} documents (text + images) to vector database")
    try:
        doc_ids = vectorstore.db.add_documents(all_docs_to_embed)
        log.info(f"💾 [STEP 3 RESULT] doc_ids={len(doc_ids) if doc_ids else 0}")

        log.info(f"💿 [STEP 4] Saving vector database to disk")
        if vectorstore.save_db_to_disk():
            log.info(f"✅ [INGESTION SUCCESS] Ingested {total_docs_count} items ({len(split_docs)} text chunks + {len(image_docs)} images) from {file_path}")
            return True, doc_ids, f"Ingested {total_docs_count} items successfully ({len(split_docs)} text chunks + {len(image_docs)} images)."

        log.error("❌ [STEP 4 FAILED] Failed to save the vector database to disk after ingestion.")
        return False, [], "Failed to save the vector database to disk after ingestion."
    except Exception as e:
        # Boundary handler: callers rely on the (status, ids, message) tuple
        # rather than on exceptions, so everything is converted here.
        log.error(f"❌ [STEP 3 EXCEPTION] Failed to add documents to vector database: {e}")
        import traceback
        log.error(f"Traceback: {traceback.format_exc()}")
        return False, [], f"Failed to ingest documents: {e}"
if __name__ == "__main__":
    # Manual smoke test: ingest one file for a test user, then run a few
    # retrieval queries (including one for a different user_id) and print
    # whatever the retriever returns.
    #
    # Usage: python <this file> [path/to/file]
    import sys

    from dotenv import load_dotenv
    from langchain.callbacks.tracers.langchain import wait_for_all_tracers

    load_dotenv()

    separator = "\n\n.............................\n\n"
    user = "test_user"

    # The default is a machine-specific path kept for backward compatibility;
    # pass a file path as the first command-line argument to override it.
    example_file_path = (
        sys.argv[1] if len(sys.argv) > 1
        else "/Users/neetikasaxena/Documents/sanchit/sample_code/chat-with-your-data/test_data/resume_sanchit_imo_health.pdf"
    )

    vector_db = VectorDB(embed_model="mxbai-embed-large:latest", persist_path=None)

    def _run_query(label: str, question: str, owner: str) -> None:
        """Query the retriever for `question`, filtered to `owner`, and print the hits."""
        relevant_docs = vector_db.retriever.invoke(
            input=question,
            filter={"user_id": owner},
            k=2,
            search_type="similarity",
        )
        print(separator)
        print(label)
        if not relevant_docs:
            print("(no documents returned)")
        for i, doc in enumerate(relevant_docs, start=1):
            print(f"\nChunk {i}:")
            print(doc.page_content)
            print(separator)
            print(f"Metadata: {doc.metadata}")

    try:
        # Ingest with multimodal support (embeddings auto-selected from factory)
        status, doc_ids, message = ingest_file(user, example_file_path, vector_db)
        if not status:
            # Querying a store that received nothing would only produce noise.
            print(f"Error: {message}")
            sys.exit(1)
        print(doc_ids)

        _run_query(
            "Query 1",
            "what contribution did Sanchit make for the retail pharmacy client?",
            user,
        )
        # Different user_id on purpose: shows what the per-user filter lets through.
        _run_query(
            "Isolation check (user_id='random')",
            "what was the impact for the tech client?",
            "random",
        )
        _run_query(
            "Query 2",
            "what did Sanchit do during his employment with Samsung ?",
            user,
        )
    finally:
        # Runs on success, failure and sys.exit alike.
        wait_for_all_tracers()