sanchitshaleen
Initial deployment of RAG with Gemma-3 to Hugging Face Spaces
4aec76b
import json
import time
from typing import List, Optional

from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage

from llm_system import config
from logger import get_logger
# Module-level logger obtained from the project's `get_logger` helper under the
# name "core_history"; both history classes in this file log through it.
log = get_logger(name="core_history")
class RedisChatMessageHistory(BaseChatMessageHistory):
    """A Redis-backed chat history implementation.

    Stores messages in a Redis list keyed by `chat_history:{session_id}` as JSON
    objects with fields: `role` ("human"|"ai"), `content`, and `ts` (Unix epoch
    seconds, as returned by `time.time()`, taken when the message is written).

    This implementation lazily imports `redis` so the module can be imported
    in environments where `redis` is not installed.
    """

    def __init__(self, session_id: str, redis_url: Optional[str] = None, ttl_seconds: int = 0):
        """Connect to Redis and bind this history to `session_id`.

        Args:
            session_id: Identifier used to build the Redis key.
            redis_url: Connection URL passed to `redis.from_url`; when falsy a
                default `redis.Redis()` client is created instead.
            ttl_seconds: Expiry applied to the key after every write.
                0 (or any non-positive value) means no expiry.

        Raises:
            ImportError: If the `redis` package is not installed.
            Exception: Whatever the client raises when the initial `ping()`
                fails; it is logged and re-raised unchanged.
        """
        try:
            import redis
        except ImportError:
            log.error("Redis package not available. Install `redis` to use Redis history backend.")
            raise

        self._redis = redis.from_url(redis_url) if redis_url else redis.Redis()
        self.session_id = session_id
        self.key = f"chat_history:{session_id}"
        # 0 = no expiry. Coerced so that a `None` or string value coming from
        # configuration cannot break the `> 0` comparison on every write.
        self.ttl_seconds = int(ttl_seconds or 0)

        # Try a quick ping to validate the connection and fail fast if Redis is unreachable
        try:
            self._redis.ping()
        except Exception as e:
            # NOTE(review): `redis_url` may embed credentials; consider masking
            # it before logging.
            log.error(f"Unable to connect to Redis at {redis_url}: {e}")
            raise

    @staticmethod
    def _encode(message: BaseMessage) -> str:
        """Serialize one message to the JSON string stored in the Redis list."""
        is_ai = (
            getattr(message, "type", None) == "ai"
            or type(message).__name__.lower().startswith("aimessage")
        )
        payload = {
            "role": "ai" if is_ai else "human",
            "content": getattr(message, "content", str(message)),
            "ts": time.time(),
        }
        return json.dumps(payload)

    @property
    def messages(self) -> List[BaseMessage]:
        """Return the list of messages for this session as BaseMessage objects.

        Entries whose role is "ai" become `AIMessage`; every other role becomes
        `HumanMessage`. Entries that cannot be decoded are skipped with a
        warning instead of failing the whole read.
        """
        decoded: List[BaseMessage] = []
        for item in self._redis.lrange(self.key, 0, -1):
            try:
                obj = json.loads(item)
                message_cls = AIMessage if obj.get("role") == "ai" else HumanMessage
                decoded.append(message_cls(content=obj.get("content", "")))
            except (ValueError, TypeError, AttributeError) as exc:
                # ValueError: invalid JSON / undecodable bytes / rejected content;
                # TypeError: entry is not str/bytes; AttributeError: JSON value
                # is not an object.
                log.warning(f"Skipping malformed history entry in `{self.key}`: {exc}")
        return decoded

    def add_message(self, message: BaseMessage) -> None:
        """Append a message to the Redis list."""
        # Delegate so single and bulk writes share one serialization and one
        # pipelined write path.
        self.add_messages([message])

    def add_messages(self, messages: List[BaseMessage]) -> None:
        """Append several messages to the Redis list, preserving their order.

        The push and the optional expiry refresh are queued on one pipeline.
        Does nothing when `messages` is empty.
        """
        # Materialize first so an empty iterable of any kind is detected.
        encoded = [self._encode(message) for message in messages or ()]
        if not encoded:
            return

        pipe = self._redis.pipeline()
        pipe.rpush(self.key, *encoded)
        if self.ttl_seconds > 0:
            pipe.expire(self.key, self.ttl_seconds)
        pipe.execute()

    def clear(self) -> None:
        """Delete this session's Redis key, removing all stored messages."""
        self._redis.delete(self.key)
class HistoryStore:
    """A class to manage chat message histories for different sessions/users.

    This store can be backed by in-memory `ChatMessageHistory` (default) or
    by `RedisChatMessageHistory` when `llm_system.config.HISTORY_BACKEND == 'redis'`.
    History objects are cached per session id in `self.histories`.
    """

    def __init__(self):
        """Read the backend settings from `config` and start with an empty cache."""
        self.histories = {}
        self.backend = getattr(config, "HISTORY_BACKEND", "memory")
        self.redis_url = getattr(config, "REDIS_URL", None)
        log.info(f"Initialized HistoryStore (backend={self.backend}).")

    def get_session_history(self, session_id: str) -> BaseChatMessageHistory:
        """Retrieve or create the chat history for `session_id`.

        Returns a `BaseChatMessageHistory` implementation appropriate for the
        configured backend. If the Redis history cannot be constructed, an
        in-memory history is created (and cached) for the session instead.
        """
        if session_id in self.histories:
            log.info(f"Retrieved existing history for session: `{session_id}`")
            return self.histories[session_id]

        # Tracks what was actually created so the log below is not misleading
        # when the Redis backend falls back to memory.
        active_backend = self.backend

        # Create a new history according to backend
        if self.backend == "redis":
            try:
                ttl = getattr(config, "REDIS_HISTORY_TTL_SECONDS", 0)
                hist = RedisChatMessageHistory(
                    session_id=session_id,
                    redis_url=self.redis_url,
                    ttl_seconds=ttl,
                )
            except Exception:
                log.exception("Failed to initialize RedisChatMessageHistory, falling back to in-memory.")
                hist = ChatMessageHistory()
                active_backend = "memory"
        else:
            hist = ChatMessageHistory()

        self.histories[session_id] = hist
        log.info(f"Created history for session: `{session_id}` (backend={active_backend})")
        return hist

    def clear_session_history(self, session_id: str) -> bool:
        """Clear and forget the cached history for `session_id`.

        Returns:
            True if a history was cached for the session (it is removed from
            the cache even if clearing the underlying storage failed), False
            if no history was cached for it.
        """
        if session_id not in self.histories:
            log.warning(f"No history found for session: `{session_id}` to clear.")
            return False

        try:
            self.histories[session_id].clear()
        except Exception:
            # Best-effort: keep going so the cache entry is still dropped, but
            # leave a trace instead of hiding the failure.
            log.exception(f"Failed to clear stored messages for session: `{session_id}`")

        del self.histories[session_id]
        log.info(f"Cleared history for session: `{session_id}`")
        return True