""" Database Module for LLM System - Qdrant Vector Store - Contains the `VectorDB` class to manage a vector database using Qdrant and embeddings. - Supports both traditional text embeddings (Ollama) and multimodal embeddings (Jina v4). - Provides methods to initialize the database, retrieve embeddings, and perform similarity searches. - Replaces FAISS with Qdrant for production-ready vector storage """ import os from typing import Tuple, Optional, List from langchain_core.documents import Document from langchain_qdrant import QdrantVectorStore from langchain_core.runnables import ConfigurableField from qdrant_client import QdrantClient from qdrant_client.models import Distance, VectorParams # For type hinting from langchain_core.embeddings import Embeddings from langchain_core.vectorstores import VectorStore from langchain_core.vectorstores import VectorStoreRetriever # For hybrid search from langchain_community.retrievers import BM25Retriever from langchain.retrievers.ensemble import EnsembleRetriever # config: from llm_system.config import QDRANT_URL, QDRANT_COLLECTION_NAME, QDRANT_API_KEY, EMB_MODEL_NAME, OLLAMA_BASE_URL, USE_JINA_EMBEDDINGS # Import embeddings factory from llm_system.core.llm import get_embeddings from logger import get_logger log = get_logger(name="core_database") class VectorDB: """A class to manage the vector database using Qdrant with flexible embeddings. Args: embed_model (str): The name of the embeddings model to use (for Ollama fallback). retriever_num_docs (int): Number of documents to retrieve for similarity search. verify_connection (bool): Whether to verify the connection to the embeddings model. use_jina (bool, optional): Whether to use Jina multimodal embeddings. If None, uses config. qdrant_url (str, optional): Qdrant server URL. Defaults to config value. collection_name (str, optional): Qdrant collection name. Defaults to config value. api_key (str, optional): API key for Qdrant Cloud. Defaults to config value. ## Functions: + `get_embeddings()`: Returns the embeddings model (Jina or Ollama). + `get_vector_store()`: Returns the Qdrant vector store. + `get_retriever()`: Returns the retriever configured for similarity search. + `save_db_to_disk()`: No-op for Qdrant (data persisted automatically on server). """ def __init__( self, embed_model: str, retriever_num_docs: int = 5, verify_connection: bool = False, use_jina: Optional[bool] = None, qdrant_url: Optional[str] = None, collection_name: Optional[str] = None, api_key: Optional[str] = None ): self.qdrant_url: str = qdrant_url or QDRANT_URL self.collection_name: str = collection_name or QDRANT_COLLECTION_NAME self.api_key: Optional[str] = api_key or QDRANT_API_KEY self.retriever_k: int = retriever_num_docs self.embed_model = embed_model log.info( f"Initializing VectorDB with embeddings='{embed_model}', Qdrant URL='{self.qdrant_url}', " f"collection='{self.collection_name}', k={retriever_num_docs} docs." 

        # Initialize embeddings (Jina multimodal or Ollama fallback)
        if use_jina is None:
            use_jina = USE_JINA_EMBEDDINGS

        if use_jina:
            log.info("Using Jina multimodal embeddings")
            self.embeddings = get_embeddings(use_jina=True)
        else:
            log.info(f"Using Ollama embeddings: {embed_model}")
            from langchain_ollama import OllamaEmbeddings
            self.embeddings = OllamaEmbeddings(
                model=embed_model,
                base_url=OLLAMA_BASE_URL,
                num_gpu=0,
                keep_alive=-1,
            )

        if verify_connection:
            try:
                self.embeddings.embed_documents(['a'])
                log.info("Embeddings model initialized and verified.")
            except Exception as e:
                log.error(f"Failed to initialize Embeddings: {e}")
                raise RuntimeError("Couldn't initialize Embeddings model") from e
        else:
            log.warning("Embeddings initialized without connection verification.")

        # Initialize Qdrant client
        try:
            self.qdrant_client = QdrantClient(
                url=self.qdrant_url,
                api_key=self.api_key,
                timeout=30,
            )
            log.info(f"Connected to Qdrant at '{self.qdrant_url}'.")
        except Exception as e:
            log.error(f"Failed to connect to Qdrant: {e}")
            raise RuntimeError(f"Couldn't connect to Qdrant at '{self.qdrant_url}'") from e

        # Get embedding dimension from a sample query
        sample_embedding = self.embeddings.embed_query("test")
        embedding_dim = len(sample_embedding)
        log.info(f"Embedding dimension: {embedding_dim}")

        # Check if collection exists, create if not
        collections = self.qdrant_client.get_collections()
        collection_names = [c.name for c in collections.collections]

        if self.collection_name not in collection_names:
            log.info(f"Collection '{self.collection_name}' not found. Creating new collection.")
            self.qdrant_client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(size=embedding_dim, distance=Distance.COSINE),
            )
            log.info(f"Created collection '{self.collection_name}' with dimension {embedding_dim}.")
        else:
            log.info(f"Using existing collection '{self.collection_name}'.")

        # Initialize Qdrant vector store
        self.db = QdrantVectorStore(
            client=self.qdrant_client,
            collection_name=self.collection_name,
            embedding=self.embeddings,
        )
        log.info("Qdrant vector store initialized.")

        # Create semantic search retriever (similarity)
        semantic_retriever = self.db.as_retriever(
            search_kwargs={
                "k": self.retriever_k,
            }
        )

        # Create BM25 keyword-based retriever for hybrid search.
        # Note: BM25 needs an in-memory corpus; building it from an empty document
        # list raises, so this falls back to semantic-only search until the
        # retriever is rebuilt from actual documents.
        try:
            bm25_retriever = BM25Retriever.from_documents([], k=self.retriever_k)
            self.bm25_retriever = bm25_retriever
            log.info("Initialized BM25 retriever for hybrid search")
        except Exception as e:
            log.warning(f"Could not initialize BM25 retriever: {e}. Using semantic search only.")
            self.bm25_retriever = None

        self.retriever = semantic_retriever
        self.semantic_retriever = semantic_retriever

    def get_embeddings(self) -> Embeddings:
        """Returns the embeddings model instance (Jina or Ollama)."""
        log.info("Returning the Embeddings model instance.")
        return self.embeddings

    def get_vector_store(self) -> VectorStore:
        """Returns the Qdrant vector store instance."""
        log.info("Returning the Qdrant vector store instance.")
        return self.db

    def get_retriever(self) -> VectorStoreRetriever:
        """Returns the retriever configured for similarity search."""
        log.info("Returning the retriever for similarity search.")
        return self.retriever  # type: ignore[return-value]

    def get_hybrid_retriever(self, use_hybrid: bool = True):
        """Returns a hybrid retriever combining semantic and keyword search.

        Args:
            use_hybrid: If True, returns an ensemble of semantic + BM25 retrieval.
                If False, returns semantic search only.

        Returns:
            Retriever that combines semantic and keyword-based search.
        """
        if not use_hybrid or self.bm25_retriever is None:
            log.info("Returning semantic search only")
            return self.semantic_retriever

        try:
            ensemble = EnsembleRetriever(
                retrievers=[self.semantic_retriever, self.bm25_retriever],
                weights=[0.6, 0.4],
            )
            log.info("Returning hybrid retriever (semantic + BM25 ensemble)")
            return ensemble
        except Exception as e:
            log.warning(f"Could not create ensemble retriever: {e}. Using semantic search only.")
            return self.semantic_retriever

    def save_db_to_disk(self) -> bool:
        """No-op for Qdrant (data is automatically persisted on the server).

        Colpali embeddings are stored in document metadata and automatically
        persisted with the documents in Qdrant.

        Returns:
            bool: Always returns True, as Qdrant handles persistence automatically.
        """
        log.info("Qdrant automatically persists data on server. No manual save needed.")
        return True

    def get_collection_info(self) -> dict:
        """Get information about the current collection.

        Returns:
            dict: Collection information including status and vector/point counts.
        """
        try:
            info = self.qdrant_client.get_collection(self.collection_name)
            log.info(f"Retrieved collection info for '{self.collection_name}'.")
            return {
                "name": self.collection_name,
                "vectors_count": info.vectors_count,
                "points_count": info.points_count,
                "status": info.status,
            }
        except Exception as e:
            log.error(f"Failed to get collection info: {e}")
            return {}

    def delete_by_filter(self, filter_dict: dict) -> bool:
        """Delete documents matching the filter criteria.

        Args:
            filter_dict: Simple key/value filter (e.g., {"user_id": "test_user"})
                that is converted to a Qdrant `Filter`.

        Returns:
            bool: True if deletion was successful.
        """
        try:
            from qdrant_client.models import Filter, FieldCondition, MatchValue

            # Convert the simple dict into Qdrant filter conditions
            conditions = []
            for key, value in filter_dict.items():
                conditions.append(
                    FieldCondition(
                        key=key,
                        match=MatchValue(value=value),
                    )
                )

            filter_obj = Filter(must=conditions)
            self.qdrant_client.delete(
                collection_name=self.collection_name,
                points_selector=filter_obj,
            )
            log.info(f"Deleted documents matching filter: {filter_dict}")
            return True
        except Exception as e:
            log.error(f"Failed to delete documents: {e}")
            return False
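
    # Illustrative note (not part of the original module): the wrapper returned by
    # create_hybrid_retriever_wrapper() below exposes .invoke()/.ainvoke(), so it
    # can stand in for a plain retriever wherever only those methods are needed, e.g.
    #   hybrid = vdb.create_hybrid_retriever_wrapper(vdb.get_retriever())
    #   docs = hybrid.invoke("query text")
    # where `vdb` is assumed to be a VectorDB instance.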

    def create_hybrid_retriever_wrapper(self, base_retriever):
        """Create a hybrid retriever that combines semantic and keyword search.

        This wraps the base retriever to add keyword-based (BM25) results to
        semantic search results, prioritizing nutrition facts and specific terms.

        Args:
            base_retriever: The base semantic search retriever from Qdrant.

        Returns:
            A wrapper retriever that performs hybrid search.
        """
        def hybrid_invoke(query: str, **kwargs):
            # Get semantic results
            semantic_docs = base_retriever.invoke(query, **kwargs)

            # Try to enhance with keyword search if BM25 is available
            if self.bm25_retriever is None:
                log.debug("BM25 not available, returning semantic results only")
                return semantic_docs

            try:
                # Get keyword results
                keyword_docs = self.bm25_retriever.invoke(query)

                # Combine and deduplicate (prioritize semantic, add unique keyword results).
                # Fall back to page_content as the dedup key when no 'id' is present in the
                # metadata, so documents without ids are not all collapsed together.
                def dedup_key(doc):
                    return doc.metadata.get('id') or doc.page_content

                seen_ids = {dedup_key(doc) for doc in semantic_docs}
                combined = list(semantic_docs)

                for doc in keyword_docs:
                    if dedup_key(doc) not in seen_ids:
                        combined.append(doc)
                        seen_ids.add(dedup_key(doc))
                        if len(combined) >= self.retriever_k:
                            break

                log.debug(
                    f"Hybrid search returned {len(semantic_docs)} semantic + "
                    f"{len(combined) - len(semantic_docs)} keyword results"
                )
                return combined[:self.retriever_k]
            except Exception as e:
                log.warning(f"Error in hybrid search, falling back to semantic: {e}")
                return semantic_docs

        # Simple wrapper exposing the retriever-style invoke/ainvoke interface
        class HybridRetriever:
            def __init__(self, invoke_fn):
                self.invoke_fn = invoke_fn

            def invoke(self, query, **kwargs):
                return self.invoke_fn(query, **kwargs)

            async def ainvoke(self, query, **kwargs):
                return self.invoke_fn(query, **kwargs)

        return HybridRetriever(hybrid_invoke)
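

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the original module). It
# assumes a reachable Qdrant instance and embedding backend as configured in
# llm_system.config; the query string is a placeholder.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    vdb = VectorDB(embed_model=EMB_MODEL_NAME, retriever_num_docs=5)
    log.info(f"Collection info: {vdb.get_collection_info()}")

    # Hybrid retrieval falls back to semantic-only search when BM25 is unavailable.
    retriever = vdb.get_hybrid_retriever(use_hybrid=True)
    docs = retriever.invoke("example query")
    log.info(f"Retrieved {len(docs)} documents.")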