|
|
import time |
|
|
import traceback |
|
|
from typing import List, Dict, Any, Tuple, Optional |
|
|
from models.schemas import RAGSearchResult |
|
|
from core.multilingual_manager import MultilingualManager |
|
|
from core.rag_system import EnhancedRAGSystem |
|
|
|
|
|
class ConversationalAgent: |
|
|
"""Lớp xử lý hội thoại với grounding từ RAG""" |
|
|
|
|
|
def __init__(self, rag_system: EnhancedRAGSystem, multilingual_manager: MultilingualManager): |
|
|
self.rag_system = rag_system |
|
|
self.multilingual_manager = multilingual_manager |
|
|
self.conversation_history = [] |
|
|
self.max_history_length = 10 |
|
|
|
|
|
def process_query(self, query: str, chat_history: List[Tuple[str, str]] = None) -> Tuple[str, List[Dict]]: |
|
|
"""Xử lý truy vấn hội thoại với grounding từ RAG""" |
|
|
try: |
|
|
print(f"🤖 CAG: Xử lý truy vấn: {query}") |
|
|
|
|
|
|
|
|
language = self.multilingual_manager.detect_language(query) |
|
|
print(f"🌐 CAG: Ngôn ngữ phát hiện: {language}") |
|
|
|
|
|
|
|
|
search_results = self.rag_system.semantic_search(query, top_k=3) |
|
|
|
|
|
|
|
|
context = self._build_context_from_results(search_results, query, language) |
|
|
|
|
|
|
|
|
self._update_conversation_history("user", query) |
|
|
|
|
|
|
|
|
prompt = self._create_grounded_prompt(query, context, language, chat_history) |
|
|
|
|
|
|
|
|
response = self._generate_response(prompt, language, search_results) |
|
|
|
|
|
|
|
|
self._update_conversation_history("assistant", response) |
|
|
|
|
|
|
|
|
results_dict = [] |
|
|
for result in search_results: |
|
|
results_dict.append({ |
|
|
"id": result.id, |
|
|
"text": result.text[:150] + "..." if len(result.text) > 150 else result.text, |
|
|
"similarity": round(result.similarity, 3), |
|
|
"metadata": result.metadata |
|
|
}) |
|
|
|
|
|
print(f"✅ CAG: Đã xử lý, tìm thấy {len(results_dict)} kết quả") |
|
|
return response, results_dict |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Xin lỗi, đã có lỗi xảy ra: {str(e)}" |
|
|
print(f"❌ CAG Lỗi: {traceback.format_exc()}") |
|
|
return error_msg, [] |
|
|
|
|
|
def _build_context_from_results(self, results: List[RAGSearchResult], query: str, language: str) -> str: |
|
|
"""Xây dựng context từ kết quả tìm kiếm""" |
|
|
if not results: |
|
|
if language == 'vi': |
|
|
return "Không có thông tin liên quan trong cơ sở tri thức." |
|
|
else: |
|
|
return "No relevant information found in the knowledge base." |
|
|
|
|
|
if language == 'vi': |
|
|
context_parts = [f"Thông tin liên quan đến '{query}':"] |
|
|
else: |
|
|
context_parts = [f"Relevant information about '{query}':"] |
|
|
|
|
|
for i, result in enumerate(results[:3]): |
|
|
source = result.metadata.get('source', 'unknown') |
|
|
lang = result.metadata.get('language', 'unknown') |
|
|
|
|
|
if language == 'vi': |
|
|
context_parts.append(f"\n{i+1}. {result.text}") |
|
|
context_parts.append(f" (Nguồn: {source}, Ngôn ngữ: {lang})") |
|
|
else: |
|
|
context_parts.append(f"\n{i+1}. {result.text}") |
|
|
context_parts.append(f" (Source: {source}, Language: {lang})") |
|
|
|
|
|
return "\n".join(context_parts) |
|
|
|
|
|
def _create_grounded_prompt(self, query: str, context: str, language: str, |
|
|
chat_history: List[Tuple[str, str]] = None) -> str: |
|
|
"""Tạo prompt với grounding từ context""" |
|
|
|
|
|
|
|
|
history_text = "" |
|
|
if chat_history and len(chat_history) > 0: |
|
|
if language == 'vi': |
|
|
history_parts = ["Lịch sử hội thoại trước:"] |
|
|
for user_msg, assistant_msg in chat_history[-2:]: |
|
|
history_parts.append(f"Bạn: {user_msg}") |
|
|
history_parts.append(f"Trợ lý: {assistant_msg}") |
|
|
else: |
|
|
history_parts = ["Previous conversation:"] |
|
|
for user_msg, assistant_msg in chat_history[-2:]: |
|
|
history_parts.append(f"You: {user_msg}") |
|
|
history_parts.append(f"Assistant: {assistant_msg}") |
|
|
history_text = "\n".join(history_parts) + "\n\n" |
|
|
|
|
|
|
|
|
if language == 'vi': |
|
|
prompt = f"""{history_text}THÔNG TIN TỪ CƠ SỞ DỮ LIỆU: |
|
|
{context} |
|
|
|
|
|
CÂU HỎI HIỆN TẠI: {query} |
|
|
|
|
|
YÊU CẦU: |
|
|
- Trả lời dựa trên thông tin được cung cấp |
|
|
- Nếu thiếu thông tin, hãy nói rõ |
|
|
- Trả lời bằng tiếng Việt |
|
|
- Ngắn gọn, chính xác |
|
|
- Có thể thêm thông tin bổ sung nếu cần |
|
|
|
|
|
TRẢ LỜI:""" |
|
|
else: |
|
|
prompt = f"""{history_text}INFORMATION FROM DATABASE: |
|
|
{context} |
|
|
|
|
|
CURRENT QUESTION: {query} |
|
|
|
|
|
INSTRUCTIONS: |
|
|
- Answer based on provided information |
|
|
- If information is insufficient, clearly state that |
|
|
- Respond in {language} |
|
|
- Be concise and accurate |
|
|
- You may add supplementary information if needed |
|
|
|
|
|
ANSWER:""" |
|
|
|
|
|
return prompt |
|
|
|
|
|
def _generate_response(self, prompt: str, language: str, search_results: List[RAGSearchResult]) -> str: |
|
|
"""Tạo phản hồi từ prompt (mô phỏng)""" |
|
|
|
|
|
time.sleep(0.3) |
|
|
|
|
|
if not search_results: |
|
|
if language == 'vi': |
|
|
return "Xin lỗi, tôi không tìm thấy thông tin cụ thể về chủ đề này trong cơ sở dữ liệu hiện tại. Bạn có thể thử diễn đạt khác hoặc thêm thông tin mới." |
|
|
else: |
|
|
return "Sorry, I couldn't find specific information about this topic in the current database. You might try rephrasing or adding new information." |
|
|
|
|
|
|
|
|
result_count = len(search_results) |
|
|
|
|
|
|
|
|
if language == 'vi': |
|
|
responses = [ |
|
|
f"Dựa trên {result_count} thông tin liên quan tôi tìm thấy:", |
|
|
f"Theo {result_count} nguồn thông tin từ cơ sở dữ liệu:", |
|
|
f"Căn cứ vào {result_count} kết quả tìm kiếm:" |
|
|
] |
|
|
|
|
|
|
|
|
sources = [] |
|
|
for result in search_results[:2]: |
|
|
source = result.metadata.get('source', '') |
|
|
if source and source not in sources: |
|
|
sources.append(source) |
|
|
|
|
|
if sources: |
|
|
source_text = f" (từ {', '.join(sources)})" |
|
|
else: |
|
|
source_text = "" |
|
|
|
|
|
follow_up = f"\n\nĐây là thông tin hữu ích tôi tìm được{source_text}. Bạn có thể xem chi tiết các nguồn thông tin bên phải." |
|
|
|
|
|
else: |
|
|
responses = [ |
|
|
f"Based on {result_count} relevant information sources:", |
|
|
f"According to {result_count} data sources from the database:", |
|
|
f"Based on the search results ({result_count} found):" |
|
|
] |
|
|
|
|
|
|
|
|
sources = [] |
|
|
for result in search_results[:2]: |
|
|
source = result.metadata.get('source', '') |
|
|
if source and source not in sources: |
|
|
sources.append(source) |
|
|
|
|
|
if sources: |
|
|
source_text = f" (from {', '.join(sources)})" |
|
|
else: |
|
|
source_text = "" |
|
|
|
|
|
follow_up = f"\n\nThis is useful information I found{source_text}. You can view the detailed sources on the right." |
|
|
|
|
|
import random |
|
|
base_response = random.choice(responses) |
|
|
|
|
|
|
|
|
if search_results and len(search_results[0].text) > 50: |
|
|
main_info = search_results[0].text[:200] + "..." if len(search_results[0].text) > 200 else search_results[0].text |
|
|
response = f"{base_response}\n\n{main_info}{follow_up}" |
|
|
else: |
|
|
response = f"{base_response}{follow_up}" |
|
|
|
|
|
return response |
|
|
|
|
|
def _update_conversation_history(self, role: str, message: str): |
|
|
"""Cập nhật lịch sử hội thoại""" |
|
|
self.conversation_history.append({ |
|
|
"role": role, |
|
|
"message": message, |
|
|
"timestamp": time.time() |
|
|
}) |
|
|
|
|
|
|
|
|
if len(self.conversation_history) > self.max_history_length: |
|
|
self.conversation_history = self.conversation_history[-self.max_history_length:] |
|
|
|
|
|
def clear_conversation_history(self): |
|
|
"""Xóa lịch sử hội thoại""" |
|
|
self.conversation_history = [] |
|
|
print("🧹 CAG: Đã xóa lịch sử hội thoại") |
|
|
|
|
|
def get_conversation_stats(self) -> Dict: |
|
|
"""Lấy thống kê hội thoại""" |
|
|
user_msgs = [m for m in self.conversation_history if m["role"] == "user"] |
|
|
assistant_msgs = [m for m in self.conversation_history if m["role"] == "assistant"] |
|
|
|
|
|
return { |
|
|
"total_messages": len(self.conversation_history), |
|
|
"user_messages": len(user_msgs), |
|
|
"assistant_messages": len(assistant_msgs), |
|
|
"current_session": len(self.conversation_history), |
|
|
"last_interaction": time.strftime('%H:%M:%S', time.localtime( |
|
|
self.conversation_history[-1]["timestamp"] if self.conversation_history else time.time() |
|
|
)) |
|
|
} |