voicebot / services /gemini_realtime_service.py
datbkpro's picture
Update services/gemini_realtime_service.py
27b3d88 verified
import asyncio
import base64
import json
import os
from typing import Callable, Optional, AsyncGenerator
import numpy as np
from google import genai
from google.genai.types import (
LiveConnectConfig,
PrebuiltVoiceConfig,
SpeechConfig,
VoiceConfig,
)
from google.api_core import exceptions as google_exceptions
class GeminiRealtimeService:
"""Dịch vụ Gemini Realtime API với audio streaming thực"""
def __init__(self, api_key: str = None):
self.api_key = api_key or os.getenv("GEMINI_API_KEY")
self.client = None
self.session = None
self.is_active = False
self.callback = None
self.voice_name = "Puck"
self.input_queue = asyncio.Queue()
self.output_queue = asyncio.Queue()
self._session_task = None
async def initialize(self):
"""Khởi tạo client Gemini"""
if not self.api_key:
raise ValueError("Gemini API key is required")
try:
self.client = genai.Client(
api_key=self.api_key,
http_options={"api_version": "v1alpha"},
)
return True
except Exception as e:
raise Exception(f"Không thể khởi tạo Gemini client: {str(e)}")
def encode_audio(self, data: np.ndarray) -> str:
"""Encode audio data to base64"""
return base64.b64encode(data.tobytes()).decode("UTF-8")
async def start_session(self, voice_name: str = "Puck", callback: Callable = None):
"""Bắt đầu session Gemini Realtime với audio streaming"""
try:
if not self.client:
await self.initialize()
self.voice_name = voice_name
self.callback = callback
# Tạo config cho realtime session
config = LiveConnectConfig(
response_modalities=["AUDIO"],
speech_config=SpeechConfig(
voice_config=VoiceConfig(
prebuilt_voice_config=PrebuiltVoiceConfig(
voice_name=voice_name,
)
)
),
)
# SỬA LỖI: Sử dụng async with đúng cách
self.session = self.client.aio.live.connect(
model="gemini-2.0-flash-exp",
config=config
)
self.is_active = True
if self.callback:
await self.callback({
'type': 'status',
'message': f'✅ Đã kết nối Gemini Audio Streaming - Giọng: {voice_name}',
'status': 'connected'
})
print("✅ Gemini Realtime Audio session started")
# Khởi động background task để xử lý session
self._session_task = asyncio.create_task(self._handle_session())
return True
except Exception as e:
error_msg = f"❌ Lỗi khởi động Gemini Audio: {e}"
if self.callback:
await self.callback({
'type': 'error',
'message': error_msg,
'status': 'error'
})
print(error_msg)
return False
async def _handle_session(self):
"""Xử lý session realtime với async with đúng cách"""
try:
# SỬA LỖI: Sử dụng async with đúng cách
async with self.session as session:
# Khởi động sender và receiver
sender_task = asyncio.create_task(self._audio_sender(session))
receiver_task = asyncio.create_task(self._audio_receiver(session))
# Chờ cả hai task hoàn thành
await asyncio.gather(sender_task, receiver_task)
except Exception as e:
error_msg = f"❌ Lỗi trong session: {e}"
if self.callback:
await self.callback({
'type': 'error',
'message': error_msg,
'status': 'error'
})
print(error_msg)
async def _audio_sender(self, session):
"""Gửi audio data đến Gemini"""
try:
async for audio_chunk in self._audio_stream_generator():
await session.send(audio_chunk)
except Exception as e:
error_msg = f"❌ Lỗi gửi audio: {e}"
if self.callback:
await self.callback({
'type': 'error',
'message': error_msg,
'status': 'error'
})
async def _audio_stream_generator(self) -> AsyncGenerator[bytes, None]:
"""Generator cho audio streaming"""
while self.is_active:
try:
audio_data = await asyncio.wait_for(self.input_queue.get(), timeout=1.0)
yield audio_data
except asyncio.TimeoutError:
continue
except Exception as e:
print(f"❌ Lỗi audio generator: {e}")
break
async def _audio_receiver(self, session):
"""Nhận audio response từ Gemini"""
try:
async for response in session:
if hasattr(response, 'data') and response.data:
# Audio response
audio_data = np.frombuffer(response.data, dtype=np.int16)
if self.callback:
await self.callback({
'type': 'audio',
'audio_data': audio_data,
'sample_rate': 24000,
'status': 'audio_streaming'
})
# Đưa vào output queue để phát ra loa
self.output_queue.put_nowait((24000, audio_data))
elif hasattr(response, 'text') and response.text:
# Text response
if self.callback:
await self.callback({
'type': 'text',
'content': response.text,
'role': 'assistant',
'status': 'text_response'
})
except Exception as e:
error_msg = f"❌ Lỗi nhận audio: {e}"
if self.callback:
await self.callback({
'type': 'error',
'message': error_msg,
'status': 'error'
})
async def send_audio_chunk(self, audio_chunk: np.ndarray, sample_rate: int = 16000):
"""Gửi audio chunk đến Gemini"""
if not self.is_active:
return False
try:
# Resample nếu cần (Gemini cần 16kHz)
if sample_rate != 16000:
audio_chunk = self._resample_audio(audio_chunk, sample_rate, 16000)
# Encode và gửi
audio_bytes = audio_chunk.tobytes()
await self.input_queue.put(audio_bytes)
return True
except Exception as e:
print(f"❌ Lỗi gửi audio chunk: {e}")
return False
async def receive_audio(self) -> tuple[int, np.ndarray] | None:
"""Nhận audio từ Gemini"""
try:
return await asyncio.wait_for(self.output_queue.get(), timeout=1.0)
except asyncio.TimeoutError:
return None
def _resample_audio(self, audio_chunk: np.ndarray, original_rate: int, target_rate: int):
"""Resample audio chunk (đơn giản)"""
if original_rate == target_rate:
return audio_chunk
ratio = target_rate / original_rate
new_length = int(len(audio_chunk) * ratio)
return np.interp(
np.linspace(0, len(audio_chunk) - 1, new_length),
np.arange(len(audio_chunk)),
audio_chunk
).astype(np.int16)
async def send_text(self, text: str):
"""Gửi text message (fallback)"""
if not self.client:
return None
try:
response = await self.client.aio.models.generate_content(
model="gemini-2.0-flash-exp",
contents=text
)
if self.callback:
await self.callback({
'type': 'text',
'content': response.text,
'role': 'assistant',
'status': 'text_response'
})
return response.text
except google_exceptions.ResourceExhausted:
error_msg = "❌ Quota Gemini đã hết. Vui lòng kiểm tra billing."
if self.callback:
await self.callback({
'type': 'error',
'message': error_msg,
'status': 'quota_exceeded'
})
return error_msg
except Exception as e:
error_msg = f"❌ Lỗi gửi text: {e}"
if self.callback:
await self.callback({
'type': 'error',
'message': error_msg,
'status': 'error'
})
return error_msg
async def close(self):
"""Đóng kết nối"""
self.is_active = False
# Hủy task nếu đang chạy
if self._session_task:
self._session_task.cancel()
try:
await self._session_task
except asyncio.CancelledError:
pass
if self.callback:
await self.callback({
'type': 'status',
'message': '🛑 Đã đóng kết nối Gemini Audio',
'status': 'disconnected'
})
print("🛑 Gemini Audio session closed")