import asyncio
import base64
import json
from pathlib import Path
from typing import Callable, Optional

import gradio as gr
import numpy as np
import openai
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import HTMLResponse, StreamingResponse
from fastrtc import (
    AdditionalOutputs,
    AsyncStreamHandler,
    Stream,
    get_twilio_turn_credentials,
    wait_for_item,
)
from gradio.utils import get_space

from core.silero_vad import SileroVAD
from services.streaming_voice_service import VoskStreamingASR


class OpenAIRealtimeService:
    """High-quality streaming speech service backed by the OpenAI Realtime API."""

    def __init__(self, api_key: str):
        self.client = openai.AsyncOpenAI(api_key=api_key)
        self.connection = None  # live realtime connection; None until start_session()
        self.is_active = False

    async def start_session(self) -> bool:
        """Open a Realtime session with server-side VAD and Vietnamese transcription.

        Returns:
            True when the session was established and configured, False on any
            failure (the error is printed rather than raised).
        """
        try:
            self.connection = await self.client.beta.realtime.connect(
                model="gpt-4o-mini-realtime-preview-2024-12-17"
            )
            # Configure the session: let the server detect speech turns and
            # transcribe incoming audio with Whisper, input language Vietnamese.
            await self.connection.session.update(
                session={
                    "turn_detection": {"type": "server_vad"},
                    "input_audio_transcription": {
                        "model": "whisper-1",
                        "language": "vi",
                    },
                }
            )
            self.is_active = True
            print("✅ OpenAI Realtime session started")
            return True
        except Exception as e:
            print(f"❌ Lỗi khởi động OpenAI Realtime: {e}")
            return False

    async def process_audio_chunk(self, audio_chunk: np.ndarray, sample_rate: int = 24000):
        """Forward one audio chunk to the Realtime input buffer.

        Resamples to the 24 kHz rate expected by the API when needed, then
        appends the base64-encoded raw PCM bytes to the input buffer.
        Returns None (and does nothing) when no session is active.
        """
        if not self.connection or not self.is_active:
            return None
        try:
            if sample_rate != 24000:
                # _resample_audio is defined elsewhere in this file/class —
                # presumably returns int16 PCM at the target rate; TODO confirm.
                audio_chunk = self._resample_audio(audio_chunk, sample_rate, 24000)
            audio_message = base64.b64encode(audio_chunk.tobytes()).decode("utf-8")
            await self.connection.input_audio_buffer.append(audio=audio_message)
        except Exception as e:
            print(f"❌ Lỗi xử lý audio với OpenAI: {e}")

    async def get_responses(self):
        """Yield normalized event dicts from the Realtime connection.

        Yields dicts with a "type" key:
          - "speech_started": server VAD detected the start of speech
          - "user_transcription" / "assistant_transcription": completed
            transcripts with "content" and "role"
          - "audio_delta": int16 PCM chunk ("audio") at 24 kHz ("sample_rate")
        Returns immediately when no connection exists.
        """
        if not self.connection:
            return
        async for event in self.connection:
            if event.type == "input_audio_buffer.speech_started":
                yield {"type": "speech_started"}
            elif event.type == "conversation.item.input_audio_transcription.completed":
                yield {
                    "type": "user_transcription",
                    "content": event.transcript,
                    "role": "user",
                }
            elif event.type == "response.audio_transcript.done":
                yield {
                    "type": "assistant_transcription",
                    "content": event.transcript,
                    "role": "assistant",
                }
            elif event.type == "response.audio.delta":
                audio_data = np.frombuffer(
                    base64.b64decode(event.delta), dtype=np.int16
                )
                yield {
                    "type": "audio_delta",
                    "audio": audio_data,
                    "sample_rate": 24000,
                }

    async def close(self):
        """Close the Realtime connection and mark the service inactive."""
        if self.connection:
            await self.connection.close()
            self.is_active = False
            print("🛑 OpenAI Realtime session closed")


class HybridStreamingService:
    """Streaming service combining local VOSK ASR with the OpenAI Realtime API."""

    def __init__(self, groq_client, rag_system, tts_service, openai_key: Optional[str] = None):
        self.groq_client = groq_client
        self.rag_system = rag_system
        self.tts_service = tts_service

        # Local ASR pipeline: VOSK recognizer + Silero voice-activity detection.
        self.vosk_asr = VoskStreamingASR()
        self.vad_processor = SileroVAD()

        # Optional OpenAI Realtime backend, enabled only when a key is given.
        self.openai_service: Optional[OpenAIRealtimeService] = None
        if openai_key:
            self.openai_service = OpenAIRealtimeService(openai_key)

        self.current_mode = "local"  # 'local' or 'openai'
        self.is_listening = False
        # Fix: initialize eagerly — several handlers read this attribute and
        # would raise AttributeError if they ran before start_listening().
        self.current_callback: Optional[Callable] = None
        # Fix: hold a reference to the background task so it is not
        # garbage-collected while running (asyncio.create_task pitfall).
        self._openai_task: Optional[asyncio.Task] = None

    async def start_listening(self, speech_callback: Callable, mode: str = "auto"):
        """Start listening with the chosen mode.

        Args:
            speech_callback: called with a status dict on every update.
            mode: "openai" to use the Realtime API (when configured);
                anything else falls back to the local VOSK pipeline.
        Returns:
            True when listening started successfully, False otherwise.
        """
        self.current_callback = speech_callback
        if mode == "openai" and self.openai_service:
            return await self._start_openai_mode()
        return self._start_local_mode()

    async def _start_openai_mode(self):
        """Start the OpenAI Realtime mode; returns True on success."""
        try:
            success = await self.openai_service.start_session()
            if not success:
                return False
            self.is_listening = True
            self.current_mode = "openai"
            # Background consumer for Realtime events; keep the task reference
            # alive on self so the event loop does not drop it mid-run.
            self._openai_task = asyncio.create_task(self._openai_response_handler())
            if self.current_callback:
                self.current_callback({
                    'transcription': "Đã bắt đầu với OpenAI Realtime...",
                    'response': "",
                    'tts_audio': None,
                    'status': 'openai_listening'
                })
            return True
        except Exception as e:
            print(f"❌ Lỗi khởi động OpenAI mode: {e}")
            return False

    def _start_local_mode(self):
        """Start the local VOSK + Silero VAD mode; returns True on success."""
        try:
            started = (
                self.vosk_asr.start_stream()
                and self.vad_processor.start_stream(self._on_speech_detected)
            )
            if not started:
                return False
            self.is_listening = True
            self.current_mode = "local"
            # _start_worker_threads is defined elsewhere in this class —
            # presumably spins up processing threads; TODO confirm.
            self._start_worker_threads()
            if self.current_callback:
                self.current_callback({
                    'transcription': "Đã bắt đầu với VOSK local...",
                    'response': "",
                    'tts_audio': None,
                    'status': 'local_listening'
                })
            return True
        except Exception as e:
            print(f"❌ Lỗi khởi động local mode: {e}")
            return False

    async def _openai_response_handler(self):
        """Consume Realtime events and forward them to the UI callback."""
        try:
            async for response in self.openai_service.get_responses():
                if response['type'] == 'user_transcription' and self.current_callback:
                    self.current_callback({
                        'transcription': response['content'],
                        'response': "Đang xử lý...",
                        'tts_audio': None,
                        'status': 'processing'
                    })
                elif response['type'] == 'assistant_transcription' and self.current_callback:
                    self.current_callback({
                        'transcription': "",  # keep the previous transcription
                        'response': response['content'],
                        'tts_audio': None,
                        'status': 'completed'
                    })
                elif response['type'] == 'audio_delta' and self.current_callback:
                    # Stream audio from OpenAI in real time. _save_temp_audio is
                    # defined elsewhere — presumably writes PCM to a temp file
                    # and returns its path; TODO confirm.
                    audio_path = self._save_temp_audio(
                        response['audio'], response['sample_rate']
                    )
                    self.current_callback({
                        'transcription': "",
                        'response': "",
                        'tts_audio': audio_path,
                        'status': 'audio_streaming'
                    })
        except Exception as e:
            print(f"❌ Lỗi OpenAI response handler: {e}")