""" A script which will deal with ingestion of new documents into the vector database. - Currently has file ingestion which supports txt, pdf, and md files. - Plan to add more file types in the future. - Plan to add web based ingestion in the future. - Now supports multimodal ingestion with image embeddings and OCR text extraction from images """ from typing import List from pathlib import Path import uuid from llm_system.utils.loader import load_file from llm_system.utils.splitter import split_text from llm_system.core.llm import get_embeddings from llm_system.core import colpali_embeddings from llm_system.core import colpali_storage # For type hinting from llm_system.core.qdrant_database import VectorDB from langchain_core.embeddings import Embeddings from langchain_core.documents import Document from PIL import Image # Try to import pytesseract for OCR try: import pytesseract PYTESSERACT_AVAILABLE = True except ImportError: PYTESSERACT_AVAILABLE = False from logger import get_logger log = get_logger(name="core_ingestion") def _extract_text_from_image(image_path: str) -> str: """Extract text from an image using OCR (Tesseract). Args: image_path: Path to the image file Returns: Extracted text from the image, or empty string if OCR fails or unavailable """ if not PYTESSERACT_AVAILABLE: return "" try: image = Image.open(image_path) # Extract text using Tesseract OCR text = pytesseract.image_to_string(image) if text.strip(): log.info(f"✅ Extracted {len(text)} characters of OCR text from {Path(image_path).name}") return text.strip() else: log.debug(f"No text found in image via OCR: {image_path}") return "" except Exception as e: log.warning(f"⚠️ Failed to extract OCR text from {image_path}: {e}") return "" def _create_image_documents(documents: List[Document], user_id: str) -> List[Document]: """Create Document objects for extracted images to be embedded. Extracts image references from document metadata and creates separate documents for each image that can be embedded with Jina multimodal embeddings. Automatically extracts text from images using OCR to enhance searchability. Only creates unique images (avoids duplicates when same images attached to multiple text chunks). 
def _create_image_documents(documents: List[Document], user_id: str) -> List[Document]:
    """Create Document objects for extracted images to be embedded.

    Extracts image references from document metadata and creates a separate
    document for each image so it can be embedded with Jina multimodal
    embeddings. Automatically extracts text from images using OCR to enhance
    searchability. Only creates one document per unique image (avoids
    duplicates when the same image is attached to multiple text chunks).

    Args:
        documents: List of documents that may contain extracted images in metadata.
        user_id: User ID for tracking.

    Returns:
        List of Document objects for images (empty if no images found).
    """
    image_documents = []
    seen_image_ids = set()  # Track already-created images to avoid duplicates

    # Collect all image references from all documents
    all_images = []
    for doc in documents:
        if 'images' in doc.metadata and doc.metadata['images']:
            all_images.extend(doc.metadata['images'])

    # Create documents for unique images only
    for img in all_images:
        image_id = img.get('image_id', 'unknown')

        # Skip if we've already created a document for this image
        if image_id in seen_image_ids:
            continue
        seen_image_ids.add(image_id)

        # Extract text from image using OCR
        image_path = img.get('image_path')
        ocr_text = ""
        if image_path:
            ocr_text = _extract_text_from_image(str(image_path))

            # Save OCR text to file alongside image (for auditing/reference)
            if ocr_text:
                try:
                    ocr_file_path = Path(str(image_path)).with_suffix('.txt')
                    ocr_file_path.write_text(ocr_text, encoding='utf-8')
                    log.debug(f"Saved OCR text to: {ocr_file_path}")
                except Exception as e:
                    log.warning(f"Could not save OCR text to file: {e}")

        # Create page_content with OCR text if available, otherwise minimal content
        if ocr_text:
            page_content = f"[IMAGE] {image_id}\n\nOCR Text:\n{ocr_text}"
        else:
            page_content = f"[IMAGE] {image_id}\n\nImage extracted from document. Use visual understanding to analyze."

        # Try to generate ColPali embeddings for visual understanding
        colpali_embedding = None
        colpali_embedding_summary = None
        if colpali_embeddings.is_colpali_available() and image_path:
            colpali_embedding = colpali_embeddings.embed_image(str(image_path))
            # Store a summary (num_patches) for Qdrant persistence
            if colpali_embedding:
                colpali_embedding_summary = colpali_embedding.get('num_patches', 0)
                # Persist full embedding to disk for later retrieval
                colpali_storage.save_colpali_embedding(image_id, user_id, colpali_embedding)

        # Create a document for this image
        image_doc = Document(
            page_content=page_content,
            metadata={
                'user_id': user_id,
                'type': 'image',
                'image_id': image_id,
                'image_path': str(image_path),  # Ensure string for serialization
                'page_number': img.get('page_number'),
                'position': img.get('position'),
                'source_pdf': img.get('metadata', {}).get('source_pdf'),
                'extractor': img.get('metadata', {}).get('extractor'),
                'has_ocr_text': bool(ocr_text),
                'colpali_available': colpali_embedding_summary is not None,
                'colpali_patches': colpali_embedding_summary or 0,  # Number of patch embeddings
            }
        )
        image_documents.append(image_doc)

    if image_documents:
        ocr_count = sum(1 for doc in image_documents if doc.metadata.get('has_ocr_text'))
        colpali_count = sum(1 for doc in image_documents if doc.metadata.get('colpali_available'))
        log.info(f"✅ Created {len(image_documents)} unique image documents ({ocr_count} with OCR, {colpali_count} with ColPali embeddings)")

    return image_documents
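
# Illustrative shape of one entry in doc.metadata['images'], inferred from the
# keys read in _create_image_documents above. The exact structure is produced
# upstream by the loader, so treat this as an assumption (all values below are
# hypothetical), not a contract:
#
#   {
#       'image_id': 'page3_img1',
#       'image_path': '/tmp/extracted/page3_img1.png',
#       'page_number': 3,
#       'position': 0,
#       'metadata': {'source_pdf': 'report.pdf', 'extractor': 'pymupdf'},
#   }
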
def ingest_file(user_id: str, file_path: str, vectorstore: VectorDB, embeddings: Optional[Embeddings] = None) -> tuple[bool, List[str], str]:
    """Ingest a file into the vector database with multimodal support.

    Ingests both text chunks and extracted images, using Jina embeddings if
    available for multimodal support. Returns the ids of the vector embeddings
    stored in the database.

    Args:
        user_id: User identifier for document tracking.
        file_path (str): The absolute path to the file to be ingested.
        vectorstore (VectorDB): The vector database instance.
        embeddings (Embeddings): Optional embeddings model. If None, uses the get_embeddings() factory.

    Returns:
        tuple[bool, List[str], str]: A tuple containing:
            - bool: True if ingestion was successful, False otherwise.
            - List[str]: List of document IDs that were ingested.
            - str: Message indicating the result of the ingestion.
    """
    log.info(f"🔄 [INGESTION START] user_id={user_id}, file_path={file_path}")

    # Use provided embeddings or get from factory (supports Jina multimodal)
    if embeddings is None:
        log.info("📊 Using embeddings from factory (will use Jina if configured)")
        embeddings = get_embeddings()

    # Load the file and get its content as Document objects:
    log.info(f"📂 [STEP 1] Loading file from: {file_path}")
    status, documents, message = load_file(user_id, file_path)
    log.info(f"📂 [STEP 1 RESULT] status={status}, docs_count={len(documents) if documents else 0}, message={message}")
    if not status:
        log.error(f"❌ [STEP 1 FAILED] load_file returned False: {message}")
        return False, [], message

    # Split the documents into smaller chunks:
    log.info(f"✂️ [STEP 2] Splitting {len(documents)} documents into chunks")
    status, split_docs, message = split_text(documents)
    log.info(f"✂️ [STEP 2 RESULT] status={status}, chunks_count={len(split_docs) if split_docs else 0}, message={message}")
    if status and not split_docs:
        log.warning(f"⚠️ [STEP 2 WARNING] No content found in the file: {file_path}")
        return True, [], f"No content found in the file: {file_path}"
    if not status:
        log.error(f"❌ [STEP 2 FAILED] split_text returned False: {message}")
        return False, [], message

    # Create image documents for multimodal embedding
    log.info("🖼️ [STEP 2.5] Extracting image documents for multimodal embedding")
    image_docs = _create_image_documents(documents, user_id)

    # Combine text chunks and image documents
    all_docs_to_embed = split_docs + image_docs
    total_docs_count = len(all_docs_to_embed)
    log.info(f"📦 [STEP 2.5 RESULT] text_chunks={len(split_docs)}, image_docs={len(image_docs)}, total={total_docs_count}")

    # Add the documents to the vector database:
    log.info(f"💾 [STEP 3] Adding {total_docs_count} documents (text + images) to vector database")
    try:
        doc_ids = vectorstore.db.add_documents(all_docs_to_embed)
        log.info(f"💾 [STEP 3 RESULT] doc_ids={len(doc_ids) if doc_ids else 0}")

        log.info("💿 [STEP 4] Saving vector database to disk")
        if vectorstore.save_db_to_disk():
            log.info(f"✅ [INGESTION SUCCESS] Ingested {total_docs_count} items ({len(split_docs)} text chunks + {len(image_docs)} images) from {file_path}")
            return True, doc_ids, f"Ingested {total_docs_count} items successfully ({len(split_docs)} text chunks + {len(image_docs)} images)."
        else:
            log.error("❌ [STEP 4 FAILED] Failed to save the vector database to disk after ingestion.")
            return False, [], "Failed to save the vector database to disk after ingestion."
    except Exception as e:
        log.error(f"❌ [STEP 3 EXCEPTION] Failed to add documents to vector database: {e}")
        import traceback
        log.error(f"Traceback: {traceback.format_exc()}")
        return False, [], f"Failed to ingest documents: {e}"
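
# Illustrative post-retrieval handling (a sketch that mirrors the __main__ demo
# below; the query string and k value are placeholders). Image documents
# created above carry type='image' and an 'image_path' in their metadata, so a
# caller can route them to a vision model instead of treating them as text:
#
#   docs = vector_db.retriever.invoke(input="...", filter={"user_id": user}, k=4)
#   for doc in docs:
#       if doc.metadata.get("type") == "image":
#           img = Image.open(doc.metadata["image_path"])  # hand off to a VLM
#       else:
#           context = doc.page_content  # plain text chunk
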
if __name__ == "__main__":
    from dotenv import load_dotenv
    from langchain.callbacks.tracers.langchain import wait_for_all_tracers

    load_dotenv()

    # Example usage
    user = "test_user"
    # example_file_path = "../../../GenAI/Data/attention_is_all_you_need_1706.03762v7.pdf"
    example_file_path = "/Users/neetikasaxena/Documents/sanchit/sample_code/chat-with-your-data/test_data/resume_sanchit_imo_health.pdf"
    # example_file_path = "../../../GenAI/Data/speech.md"

    vector_db = VectorDB(embed_model="mxbai-embed-large:latest", persist_path=None)

    # Ingest with multimodal support (embeddings auto-selected from factory)
    status, doc_ids, message = ingest_file(user, example_file_path, vector_db)
    if status:
        print(doc_ids)
    else:
        print(f"Error: {message}")

    # Retrieve the documents to verify ingestion:
    relevant_docs = vector_db.retriever.invoke(
        input="what contribution did Sanchit make for the retail pharmacy client?",
        filter={"user_id": user},
        k=2,
        search_type="similarity"
    )
    print("\n\n.............................\n\n")

    # Print the retrieved documents
    print("Query 1")
    for i, doc in enumerate(relevant_docs):
        print(f"\nChunk {i+1}:")
        print(doc.page_content)
        print("\n\n.............................\n\n")
        print(f"Metadata: {doc.metadata}")
        print("\n\n.............................\n\n")

    relevant_docs = vector_db.retriever.invoke(
        input="what was the impact for the tech client?",
        filter={"user_id": "random"},
        k=2,
        search_type="similarity"
    )
    print("\n\n.............................\n\n")

    # Retrieve the documents to verify ingestion:
    relevant_docs = vector_db.retriever.invoke(
        input="what did Sanchit do during his employment with Samsung ?",
        filter={"user_id": user},
        k=2,
        search_type="similarity"
    )

    # Print the retrieved documents
    print("Query 2")
    for i, doc in enumerate(relevant_docs):
        print(f"\nChunk {i+1}:")
        print(doc.page_content)
        print("\n\n.............................\n\n")
        print(f"Metadata: {doc.metadata}")

    wait_for_all_tracers()