# voicebot/services/openai_realtime_service.py
import asyncio
import base64
import json
from pathlib import Path
import gradio as gr
import numpy as np
import openai
from dotenv import load_dotenv
from fastapi import FastAPI
from typing import Callable
from core.silero_vad import SileroVAD
from services.streaming_voice_service import VoskStreamingASR
from fastapi.responses import HTMLResponse, StreamingResponse
# Additional real-time streaming imports (fastrtc)
from fastrtc import (
AdditionalOutputs,
AsyncStreamHandler,
Stream,
get_twilio_turn_credentials,
wait_for_item,
)
from gradio.utils import get_space
class OpenAIRealtimeService:
    """OpenAI Realtime API service for high-quality audio streaming.

    Wraps an async Realtime connection: opens a session configured for
    Vietnamese transcription, streams PCM16 audio chunks in, and yields
    transcription/audio events out.
    """

    def __init__(self, api_key: str):
        self.client = openai.AsyncOpenAI(api_key=api_key)
        self.connection = None   # live Realtime connection, set by start_session()
        self.is_active = False   # True while a session is open

    async def start_session(self) -> bool:
        """Open a Realtime session configured with server-side VAD and Whisper.

        Returns:
            True on success, False on any failure (the error is printed).
        """
        try:
            self.connection = await self.client.beta.realtime.connect(
                model="gpt-4o-mini-realtime-preview-2024-12-17"
            )
            # Configure the session: server-side turn detection and
            # Whisper-based input transcription in Vietnamese.
            await self.connection.session.update(
                session={
                    "turn_detection": {"type": "server_vad"},
                    "input_audio_transcription": {
                        "model": "whisper-1",
                        "language": "vi",  # Vietnamese input transcription
                    },
                }
            )
            self.is_active = True
            print("✅ OpenAI Realtime session started")
            return True
        except Exception as e:
            print(f"❌ Lỗi khởi động OpenAI Realtime: {e}")
            return False

    async def process_audio_chunk(self, audio_chunk: np.ndarray, sample_rate: int = 24000):
        """Send one audio chunk to the Realtime input buffer.

        Chunks at rates other than 24 kHz are resampled first.
        NOTE(review): assumes `audio_chunk` is int16 PCM (the bytes are sent
        as-is) — confirm against callers.
        """
        if not self.connection or not self.is_active:
            return None
        try:
            if sample_rate != 24000:
                audio_chunk = self._resample_audio(audio_chunk, sample_rate, 24000)
            # The Realtime API takes base64-encoded raw PCM bytes.
            audio_message = base64.b64encode(audio_chunk.tobytes()).decode("utf-8")
            await self.connection.input_audio_buffer.append(audio=audio_message)
        except Exception as e:
            print(f"❌ Lỗi xử lý audio với OpenAI: {e}")

    def _resample_audio(self, audio: np.ndarray, src_rate: int, dst_rate: int) -> np.ndarray:
        """Linearly resample int16 PCM audio from src_rate to dst_rate.

        Fix: the original code called this method without ever defining it,
        so any chunk not already at 24 kHz raised AttributeError.
        """
        if src_rate == dst_rate or audio.size == 0:
            return audio
        n_out = int(round(audio.size * dst_rate / src_rate))
        # Sample positions in the source signal for each output sample.
        positions = np.linspace(0, audio.size - 1, num=n_out)
        resampled = np.interp(positions, np.arange(audio.size), audio.astype(np.float64))
        return resampled.astype(np.int16)

    async def get_responses(self):
        """Yield events from the Realtime connection as plain dicts.

        Event types yielded: 'speech_started', 'user_transcription',
        'assistant_transcription', and 'audio_delta' (int16 samples at 24 kHz).
        """
        if not self.connection:
            return
        async for event in self.connection:
            if event.type == "input_audio_buffer.speech_started":
                yield {"type": "speech_started"}
            elif event.type == "conversation.item.input_audio_transcription.completed":
                yield {
                    "type": "user_transcription",
                    "content": event.transcript,
                    "role": "user"
                }
            elif event.type == "response.audio_transcript.done":
                yield {
                    "type": "assistant_transcription",
                    "content": event.transcript,
                    "role": "assistant"
                }
            elif event.type == "response.audio.delta":
                # Audio deltas arrive base64-encoded; decode to int16 samples.
                audio_data = np.frombuffer(
                    base64.b64decode(event.delta), dtype=np.int16
                )
                yield {
                    "type": "audio_delta",
                    "audio": audio_data,
                    "sample_rate": 24000
                }

    async def close(self):
        """Close the Realtime connection and mark the session inactive."""
        if self.connection:
            await self.connection.close()
            self.is_active = False
            print("🛑 OpenAI Realtime session closed")
class HybridStreamingService:
    """Hybrid streaming service combining local VOSK ASR with the OpenAI Realtime API."""
def __init__(self, groq_client, rag_system, tts_service, openai_key: str = None):
    """Wire up the local ASR/VAD pipeline plus an optional OpenAI Realtime backend."""
    # External collaborators handed in by the caller.
    self.groq_client = groq_client
    self.rag_system = rag_system
    self.tts_service = tts_service
    # Local speech pipeline: VOSK recognizer + Silero voice-activity detector.
    self.vosk_asr = VoskStreamingASR()
    self.vad_processor = SileroVAD()
    # The remote backend is only available when an API key was supplied.
    self.openai_service = OpenAIRealtimeService(openai_key) if openai_key else None
    self.current_mode = "local"  # either 'local' or 'openai'
    self.is_listening = False
async def start_listening(self, speech_callback: Callable, mode: str = "auto"):
    """Begin listening, dispatching to the requested backend.

    Uses the OpenAI Realtime backend only when ``mode`` is "openai" AND an
    OpenAI service was configured; otherwise falls back to local VOSK.
    """
    self.current_callback = speech_callback
    use_openai = mode == "openai" and self.openai_service is not None
    if use_openai:
        return await self._start_openai_mode()
    return self._start_local_mode()
async def _start_openai_mode(self):
    """Start the OpenAI Realtime backend.

    Returns:
        True when the Realtime session opened and the response handler was
        launched, False otherwise.
    """
    try:
        success = await self.openai_service.start_session()
        if not success:
            return False
        self.is_listening = True
        self.current_mode = "openai"
        # Fix: keep a reference to the background task.  The event loop only
        # holds weak references to tasks, so an unreferenced create_task()
        # result may be garbage-collected before it finishes.
        self._response_task = asyncio.create_task(self._openai_response_handler())
        if self.current_callback:
            self.current_callback({
                'transcription': "Đã bắt đầu với OpenAI Realtime...",
                'response': "",
                'tts_audio': None,
                'status': 'openai_listening'
            })
        return True
    except Exception as e:
        print(f"❌ Lỗi khởi động OpenAI mode: {e}")
        return False
def _start_local_mode(self):
    """Start the local VOSK + Silero VAD pipeline; returns True on success."""
    try:
        # Start ASR first; only start VAD if ASR came up (short-circuit
        # order matches the original combined condition).
        asr_started = self.vosk_asr.start_stream()
        vad_started = asr_started and self.vad_processor.start_stream(self._on_speech_detected)
        if not vad_started:
            return False
        self.is_listening = True
        self.current_mode = "local"
        # Spin up the background worker threads feeding the pipeline.
        self._start_worker_threads()
        if self.current_callback:
            self.current_callback({
                'transcription': "Đã bắt đầu với VOSK local...",
                'response': "",
                'tts_audio': None,
                'status': 'local_listening'
            })
        return True
    except Exception as e:
        print(f"❌ Lỗi khởi động local mode: {e}")
        return False
async def _openai_response_handler(self):
    """Consume events from the OpenAI Realtime stream and forward them to the UI callback."""
    try:
        async for event in self.openai_service.get_responses():
            # Without a callback there is nowhere to deliver any event.
            if not self.current_callback:
                continue
            kind = event['type']
            if kind == 'user_transcription':
                self.current_callback({
                    'transcription': event['content'],
                    'response': "Đang xử lý...",
                    'tts_audio': None,
                    'status': 'processing'
                })
            elif kind == 'assistant_transcription':
                self.current_callback({
                    'transcription': "",  # keep the previous transcription
                    'response': event['content'],
                    'tts_audio': None,
                    'status': 'completed'
                })
            elif kind == 'audio_delta':
                # Persist the streamed audio chunk so the UI can play it.
                audio_path = self._save_temp_audio(event['audio'], event['sample_rate'])
                self.current_callback({
                    'transcription': "",
                    'response': "",
                    'tts_audio': audio_path,
                    'status': 'audio_streaming'
                })
    except Exception as e:
        print(f"❌ Lỗi OpenAI response handler: {e}")