|
|
import json
import time
from typing import List, Optional

from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage

from llm_system import config
from logger import get_logger
|
|
|
|
|
# Module-level logger used by the history classes defined in this file.
log = get_logger(name="core_history")
|
|
|
|
|
|
|
|
class RedisChatMessageHistory(BaseChatMessageHistory):
    """A Redis-backed chat history implementation.

    Stores messages in a Redis list keyed by `chat_history:{session_id}` as JSON
    objects with fields: `role` ("human"|"ai"), `content`, `ts` (Unix epoch
    seconds at which the entry was written).

    This implementation lazily imports `redis` so the module can be imported
    in environments where `redis` is not installed.
    """

    def __init__(self, session_id: str, redis_url: Optional[str] = None, ttl_seconds: int = 0):
        """Connect to Redis and bind this history to `session_id`.

        Args:
            session_id: Identifier used to build the Redis key.
            redis_url: Connection URL; when falsy, `redis.Redis()` is created
                with the client's default connection settings.
            ttl_seconds: Expiry applied to the key after every write. Values
                that are falsy or not greater than zero disable expiry.

        Raises:
            Exception: Re-raised unchanged if `redis` cannot be imported or
                the initial `ping()` fails.
        """
        try:
            import redis
        except Exception:
            log.error("Redis package not available. Install `redis` to use Redis history backend.")
            raise

        self._redis = redis.from_url(redis_url) if redis_url else redis.Redis()
        self.session_id = session_id
        self.key = f"chat_history:{session_id}"
        # Normalise None (e.g. an unset config value) so `> 0` checks cannot raise.
        self.ttl_seconds = ttl_seconds or 0

        # Fail fast: surface connection problems at construction time rather
        # than on the first read or write.
        try:
            self._redis.ping()
        except Exception as e:
            log.error(f"Unable to connect to Redis at {redis_url}: {e}")
            raise

    @staticmethod
    def _serialize(message: BaseMessage) -> str:
        """Encode `message` as the JSON payload stored in the Redis list."""
        is_ai = (
            getattr(message, "type", None) == "ai"
            or message.__class__.__name__.lower().startswith("aimessage")
        )
        payload = {
            # Anything that is not recognised as an AI message is stored as "human".
            "role": "ai" if is_ai else "human",
            "content": getattr(message, "content", str(message)),
            "ts": time.time(),
        }
        return json.dumps(payload)

    @property
    def messages(self) -> List[BaseMessage]:
        """Return the list of messages for this session as BaseMessage objects.

        Entries that cannot be decoded or converted are skipped (with a
        warning) so one corrupt item does not make the whole history unreadable.
        """
        raw = self._redis.lrange(self.key, 0, -1)
        msgs: List[BaseMessage] = []
        for index, item in enumerate(raw):
            try:
                obj = json.loads(item)
                content = obj.get("content", "")
                if obj.get("role") == "ai":
                    msg = AIMessage(content=content)
                else:
                    msg = HumanMessage(content=content)
            except Exception:
                log.warning(f"Skipping malformed history entry {index} in `{self.key}`.")
                continue
            msgs.append(msg)
        return msgs

    def add_message(self, message: BaseMessage) -> None:
        """Append a message to the Redis list."""
        self._redis.rpush(self.key, self._serialize(message))
        if self.ttl_seconds > 0:
            self._redis.expire(self.key, self.ttl_seconds)

    def add_messages(self, messages: List[BaseMessage]) -> None:
        """Append several messages through a single Redis pipeline.

        Does nothing when `messages` is empty.
        """
        if not messages:
            return
        pipe = self._redis.pipeline()
        for message in messages:
            pipe.rpush(self.key, self._serialize(message))
        if self.ttl_seconds > 0:
            pipe.expire(self.key, self.ttl_seconds)
        pipe.execute()

    def clear(self) -> None:
        """Delete this session's Redis key, removing all stored messages."""
        self._redis.delete(self.key)
|
|
|
|
|
|
|
|
class HistoryStore:
    """A class to manage chat message histories for different sessions/users.

    This store can be backed by in-memory `ChatMessageHistory` (default) or
    by `RedisChatMessageHistory` when `llm_system.config.HISTORY_BACKEND == 'redis'`.
    """

    def __init__(self) -> None:
        """Read backend settings from `config` and start with an empty cache."""
        # session_id -> history object created by `get_session_history`.
        self.histories = {}
        self.backend = getattr(config, "HISTORY_BACKEND", "memory")
        self.redis_url = getattr(config, "REDIS_URL", None)
        log.info(f"Initialized HistoryStore (backend={self.backend}).")

    def get_session_history(self, session_id: str) -> BaseChatMessageHistory:
        """Retrieve or create the chat history for `session_id`.

        Returns a `BaseChatMessageHistory` implementation appropriate for the
        configured backend. If the Redis-backed history cannot be constructed,
        an in-memory history is created (and cached) for the session instead.
        """
        if session_id in self.histories:
            log.info(f"Retrieved existing history for session: `{session_id}`")
            return self.histories[session_id]

        if self.backend == "redis":
            try:
                ttl = getattr(config, "REDIS_HISTORY_TTL_SECONDS", 0)
                hist = RedisChatMessageHistory(session_id=session_id, redis_url=self.redis_url, ttl_seconds=ttl)
            except Exception:
                # Deliberate best-effort: keep the session usable without Redis.
                log.exception("Failed to initialize RedisChatMessageHistory, falling back to in-memory.")
                hist = ChatMessageHistory()
        else:
            hist = ChatMessageHistory()

        self.histories[session_id] = hist
        log.info(f"Created history for session: `{session_id}` (backend={self.backend})")
        return hist

    def clear_session_history(self, session_id: str) -> bool:
        """Clear and forget the cached history for `session_id`.

        Only sessions present in this store's cache are affected.

        Returns:
            True if a cached history existed (it is removed from the cache even
            when clearing the underlying storage fails), False otherwise.
        """
        if session_id not in self.histories:
            log.warning(f"No history found for session: `{session_id}` to clear.")
            return False

        hist = self.histories.pop(session_id)
        try:
            hist.clear()
        except Exception:
            # Best-effort: record the failure instead of hiding it, but still
            # drop the cache entry and report the session as handled.
            log.exception(f"Failed to clear stored messages for session: `{session_id}`")
        else:
            log.info(f"Cleared history for session: `{session_id}`")
        return True
|
|
|