|
|
""" A script which will deal with ingestion of new documents into the vector database. |
|
|
- Currently has file ingestion which supports txt, pdf, and md files. |
|
|
- Plan to add more file types in the future. |
|
|
- Plan to add web based ingestion in the future. |
|
|
- Now supports multimodal ingestion with image embeddings and OCR text extraction from images |
|
|
""" |
|
|
|
|
|
import uuid
from pathlib import Path
from typing import List, Optional

from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from PIL import Image

from llm_system.core import colpali_embeddings
from llm_system.core import colpali_storage
from llm_system.core.llm import get_embeddings
from llm_system.core.qdrant_database import VectorDB
from llm_system.utils.loader import load_file
from llm_system.utils.splitter import split_text
|
|
|
|
|
|
|
|
# OCR support is optional: pytesseract wraps the external Tesseract binary and
# may not be installed. Callers check PYTESSERACT_AVAILABLE before using OCR.
try:
    import pytesseract

    PYTESSERACT_AVAILABLE = True
except ImportError:
    PYTESSERACT_AVAILABLE = False

# Module-level logger for the ingestion pipeline.
from logger import get_logger

log = get_logger(name="core_ingestion")
|
|
|
|
|
|
|
|
def _extract_text_from_image(image_path: str) -> str:
    """Extract text from an image using OCR (Tesseract).

    Args:
        image_path: Path to the image file

    Returns:
        Extracted text from the image (stripped), or an empty string if OCR
        is unavailable, finds no text, or fails.
    """
    # OCR is an optional capability; degrade gracefully when pytesseract
    # is not installed.
    if not PYTESSERACT_AVAILABLE:
        return ""

    try:
        # Context manager closes the underlying file handle; Image.open is
        # lazy and would otherwise keep the file open (resource leak).
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(image)

        if text.strip():
            log.info(f"✅ Extracted {len(text)} characters of OCR text from {Path(image_path).name}")
            return text.strip()

        log.debug(f"No text found in image via OCR: {image_path}")
        return ""
    except Exception as e:
        # Best-effort: a corrupt/unreadable image must not abort ingestion.
        log.warning(f"β οΈ Failed to extract OCR text from {image_path}: {e}")
        return ""
|
|
|
|
|
|
|
|
def _create_image_documents(documents: List[Document], user_id: str) -> List[Document]:
    """Create Document objects for extracted images to be embedded.

    Extracts image references from document metadata and creates separate
    documents for each image that can be embedded with Jina multimodal embeddings.
    Automatically extracts text from images using OCR to enhance searchability.
    Only creates unique images (avoids duplicates when the same images are
    attached to multiple text chunks).

    Args:
        documents: List of documents that may contain extracted images in metadata
        user_id: User ID for tracking

    Returns:
        List of Document objects for images (empty if no images found)
    """
    image_documents: List[Document] = []
    seen_image_ids = set()

    # Gather every image reference attached to any source document.
    all_images = []
    for doc in documents:
        if doc.metadata.get('images'):
            all_images.extend(doc.metadata['images'])

    for img in all_images:
        image_id = img.get('image_id', 'unknown')

        # The same image may be referenced by several text chunks; embed once.
        if image_id in seen_image_ids:
            continue
        seen_image_ids.add(image_id)

        # Best-effort OCR pass to make the image text-searchable.
        image_path = img.get('image_path')
        ocr_text = _extract_text_from_image(str(image_path)) if image_path else ""

        if ocr_text:
            # Persist the OCR text next to the image as a .txt sidecar for
            # inspection/debugging; failure here is non-fatal.
            try:
                ocr_file_path = Path(str(image_path)).with_suffix('.txt')
                ocr_file_path.write_text(ocr_text, encoding='utf-8')
                log.debug(f"Saved OCR text to: {ocr_file_path}")
            except Exception as e:
                log.warning(f"Could not save OCR text to file: {e}")
            page_content = f"[IMAGE] {image_id}\n\nOCR Text:\n{ocr_text}"
        else:
            page_content = f"[IMAGE] {image_id}\n\nImage extracted from document. Use visual understanding to analyze."

        # Optional ColPali visual embedding, persisted out-of-band via
        # colpali_storage (multi-vector patch embeddings do not fit the
        # regular vector-store schema).
        colpali_embedding = None
        colpali_embedding_summary = None
        if colpali_embeddings.is_colpali_available() and image_path:
            colpali_embedding = colpali_embeddings.embed_image(str(image_path))
            if colpali_embedding:
                colpali_embedding_summary = colpali_embedding.get('num_patches', 0)
                colpali_storage.save_colpali_embedding(image_id, user_id, colpali_embedding)

        image_doc = Document(
            page_content=page_content,
            metadata={
                'user_id': user_id,
                'type': 'image',
                'image_id': image_id,
                # Store None (not the literal string "None") when there is no path.
                'image_path': str(image_path) if image_path else None,
                'page_number': img.get('page_number'),
                'position': img.get('position'),
                'source_pdf': img.get('metadata', {}).get('source_pdf'),
                'extractor': img.get('metadata', {}).get('extractor'),
                'has_ocr_text': bool(ocr_text),
                'colpali_available': colpali_embedding_summary is not None,
                'colpali_patches': colpali_embedding_summary or 0,
            }
        )
        image_documents.append(image_doc)

    if image_documents:
        ocr_count = sum(1 for doc in image_documents if doc.metadata.get('has_ocr_text'))
        colpali_count = sum(1 for doc in image_documents if doc.metadata.get('colpali_available'))
        log.info(f"✅ Created {len(image_documents)} unique image documents ({ocr_count} with OCR, {colpali_count} with ColPali embeddings)")

    return image_documents
|
|
|
|
|
|
|
|
def ingest_file(user_id: str, file_path: str, vectorstore: VectorDB,
                embeddings: Optional[Embeddings] = None) -> tuple[bool, List[str], str]:
    """Ingest a file into the vector database with multimodal support.

    Ingests both text chunks and extracted images, using Jina embeddings
    if available for multimodal support.

    Returns the ids of vector embeddings stored in database.

    Args:
        user_id: User identifier for document tracking
        file_path (str): The absolute path to the file to be ingested.
        vectorstore (VectorDB): The vector database instance.
        embeddings (Embeddings): Optional embeddings model. If None, uses get_embeddings() factory.

    Returns:
        tuple[bool, List[str], str]: A tuple containing:
            - bool: True if ingestion was successful, False otherwise.
            - List[str]: List of document IDs that were ingested.
            - str: Message indicating the result of the ingestion.
    """
    log.info(f"π [INGESTION START] user_id={user_id}, file_path={file_path}")

    # Fall back to the configured embeddings factory when none was supplied.
    if embeddings is None:
        log.info("π Using embeddings from factory (will use Jina if configured)")
        embeddings = get_embeddings()

    # STEP 1: load the raw file into Document objects.
    log.info(f"π [STEP 1] Loading file from: {file_path}")
    status, documents, message = load_file(user_id, file_path)
    log.info(f"π [STEP 1 RESULT] status={status}, docs_count={len(documents) if documents else 0}, message={message}")

    if not status:
        log.error(f"β [STEP 1 FAILED] load_file returned False: {message}")
        return False, [], message

    # STEP 2: split documents into embedding-sized chunks.
    log.info(f"βοΈ [STEP 2] Splitting {len(documents)} documents into chunks")
    status, split_docs, message = split_text(documents)
    log.info(f"βοΈ [STEP 2 RESULT] status={status}, chunks_count={len(split_docs) if split_docs else 0}, message={message}")

    # An empty-but-successful split means the file had no extractable text;
    # this is reported as success with no ids.
    if status and not split_docs:
        log.warning(f"β οΈ [STEP 2 WARNING] No content found in the file: {file_path}")
        return True, [], f"No content found in the file: {file_path}"

    if not status:
        log.error(f"β [STEP 2 FAILED] split_text returned False: {message}")
        return False, [], message

    # STEP 2.5: build image documents (note: from the ORIGINAL documents,
    # whose metadata carries the extracted image references).
    log.info("πΌοΈ [STEP 2.5] Extracting image documents for multimodal embedding")
    image_docs = _create_image_documents(documents, user_id)

    all_docs_to_embed = split_docs + image_docs
    total_docs_count = len(all_docs_to_embed)

    log.info(f"π¦ [STEP 2.5 RESULT] text_chunks={len(split_docs)}, image_docs={len(image_docs)}, total={total_docs_count}")

    # STEP 3/4: embed + store, then persist the database.
    log.info(f"πΎ [STEP 3] Adding {total_docs_count} documents (text + images) to vector database")
    try:
        doc_ids = vectorstore.db.add_documents(all_docs_to_embed)
        log.info(f"πΎ [STEP 3 RESULT] doc_ids={len(doc_ids) if doc_ids else 0}")

        log.info("πΏ [STEP 4] Saving vector database to disk")
        if vectorstore.save_db_to_disk():
            log.info(f"✅ [INGESTION SUCCESS] Ingested {total_docs_count} items ({len(split_docs)} text chunks + {len(image_docs)} images) from {file_path}")
            return True, doc_ids, f"Ingested {total_docs_count} items successfully ({len(split_docs)} text chunks + {len(image_docs)} images)."
        else:
            log.error("β [STEP 4 FAILED] Failed to save the vector database to disk after ingestion.")
            return False, [], "Failed to save the vector database to disk after ingestion."

    except Exception as e:
        # log.exception records the full traceback automatically.
        log.exception(f"β [STEP 3 EXCEPTION] Failed to add documents to vector database: {e}")
        return False, [], f"Failed to ingest documents: {e}"
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
from dotenv import load_dotenv |
|
|
from langchain.callbacks.tracers.langchain import wait_for_all_tracers |
|
|
load_dotenv() |
|
|
|
|
|
|
|
|
user = "test_user" |
|
|
|
|
|
example_file_path="/Users/neetikasaxena/Documents/sanchit/sample_code/chat-with-your-data/test_data/resume_sanchit_imo_health.pdf" |
|
|
|
|
|
|
|
|
vector_db = VectorDB(embed_model="mxbai-embed-large:latest", persist_path=None) |
|
|
|
|
|
|
|
|
status, doc_ids, message = ingest_file(user, example_file_path, vector_db) |
|
|
if status: |
|
|
print(doc_ids) |
|
|
else: |
|
|
print(f"Error: {message}") |
|
|
|
|
|
|
|
|
relevant_docs= vector_db.retriever.invoke( |
|
|
input="what contribution did Sanchit make for the retail pharmacy client?", |
|
|
filter={"user_id": user}, |
|
|
k=2, |
|
|
search_type="similarity" |
|
|
) |
|
|
|
|
|
print("\n\n.............................\n\n") |
|
|
|
|
|
print(f"Query 1") |
|
|
for i, doc in enumerate(relevant_docs): |
|
|
print(f"\nChunk {i+1}:") |
|
|
print(doc.page_content) |
|
|
print("\n\n.............................\n\n") |
|
|
print(f"Metadata: {doc.metadata}") |
|
|
|
|
|
print("\n\n.............................\n\n") |
|
|
|
|
|
relevant_docs= vector_db.retriever.invoke( |
|
|
input="what was the impact for the tech client?", |
|
|
filter={"user_id": "random"}, |
|
|
k=2, |
|
|
search_type="similarity" |
|
|
) |
|
|
|
|
|
print("\n\n.............................\n\n") |
|
|
|
|
|
|
|
|
|
|
|
relevant_docs= vector_db.retriever.invoke( |
|
|
input="what did Sanchit do during his employment with Samsung ?", |
|
|
filter={"user_id": user}, |
|
|
k=2, |
|
|
search_type="similarity" |
|
|
) |
|
|
|
|
|
|
|
|
print(f"Query 2") |
|
|
for i, doc in enumerate(relevant_docs): |
|
|
print(f"\nChunk {i+1}:") |
|
|
print(doc.page_content) |
|
|
print("\n\n.............................\n\n") |
|
|
print(f"Metadata: {doc.metadata}") |
|
|
|
|
|
wait_for_all_tracers() |
|
|
|