# FastAPI-Backend-Models/routes/voice_chat_routes.py
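"""
Voice chat routes (English only).

Chains the STT, chat, and TTS services behind the /voice-chat prefix:
audio or text comes in, a chatbot reply is generated, and the synthesized
audio answer is cached in memory and served back from
GET /voice-chat/audio/{conversation_id}.
"""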
from fastapi import APIRouter, UploadFile, File, HTTPException, Query
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional
import tempfile
import os
from pathlib import Path
import uuid
import io
import traceback
from services.stt_service import speech_to_text
from services.tts_service import text_to_speech
from services.chat_service import generate_chat_response
from models.voice_chat import TextChatRequest, VoiceChatResponse
router = APIRouter(prefix="/voice-chat", tags=["Voice Chat"])
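# This router is mounted by the application entry point (typically via
# app.include_router(router)); the exact wiring lives outside this module.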
# Temporary audio cache
audio_cache = {}
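# NOTE: this dict lives in process memory and is keyed by conversation_id;
# entries are never evicted, so they are lost on restart and accumulate for
# as long as the process runs.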
@router.post("/voice", response_model=VoiceChatResponse)
async def voice_chat_endpoint(
file: UploadFile = File(...),
conversation_id: Optional[str] = Query(None)
):
"""
Complete voice chat endpoint (English only):
    1. STT: Audio → Text
    2. Chatbot: Text → Response
    3. TTS: Response → Audio
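
    Example request (assuming the app is served locally on port 8000):
        curl -X POST "http://localhost:8000/voice-chat/voice" \
             -F "file=@question.wav;type=audio/wav"
    Append "?conversation_id=<id>" to continue an existing conversation.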
"""
# 1. Check audio file
if not file.content_type or not file.content_type.startswith('audio/'):
raise HTTPException(
status_code=400,
detail=f"File must be an audio file. Received: {file.content_type}"
)
# 2. Create conversation ID if not provided
if not conversation_id:
conversation_id = str(uuid.uuid4())
    # 3. Save audio temporarily (reject empty uploads first, so we never have
    #    to delete a temp file that is still open)
    content = await file.read()
    if len(content) == 0:
        raise HTTPException(status_code=400, detail="Audio file is empty")
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_file:
        temp_file.write(content)
        temp_path = temp_file.name
try:
# 4. STT: Audio β†’ Text (English)
user_text = speech_to_text(temp_path)
if not user_text or user_text.strip() == "":
raise HTTPException(
status_code=400,
detail="No speech detected in audio."
)
print(f"🎀 STT Result: {user_text}")
# 5. Generate chatbot response (English)
chatbot_response = generate_chat_response(
user_input=user_text,
conversation_id=conversation_id
)
print(f"πŸ€– Chatbot Response: {chatbot_response}")
# 6. TTS: Response text β†’ Audio (English voice)
audio_path = text_to_speech(
text=chatbot_response,
voice="Aaliyah-PlayAI", # English voice
fmt="wav"
)
# 7. Read and store audio
with open(audio_path, "rb") as audio_file:
audio_data = audio_file.read()
audio_cache[conversation_id] = {
"audio": audio_data,
"text": chatbot_response
}
# 8. Clean up temporary files
os.unlink(temp_path)
if Path(audio_path).exists():
os.unlink(audio_path)
# 9. Return response
return VoiceChatResponse(
text_response=chatbot_response,
audio_url=f"/voice-chat/audio/{conversation_id}",
conversation_id=conversation_id
)
    except HTTPException:
        # Clean up the temp file before re-raising client errors (e.g. no speech detected)
        if os.path.exists(temp_path):
            os.unlink(temp_path)
        raise
    except Exception as e:
        # Clean up on error
        if os.path.exists(temp_path):
            os.unlink(temp_path)
        error_details = traceback.format_exc()
        print(f"❌ Error in voice_chat_endpoint: {error_details}")
        raise HTTPException(
            status_code=500,
            detail=f"Error during voice processing: {str(e)}"
        )
@router.post("/text", response_model=VoiceChatResponse)
async def text_chat_endpoint(request: TextChatRequest):
"""
Text chat with audio response (English only)
    For users who prefer to type but want to hear the response.
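
    Example request (assuming the app is served locally on port 8000):
        curl -X POST "http://localhost:8000/voice-chat/text" \
             -H "Content-Type: application/json" \
             -d '{"text": "Hello, how are you?"}'
    An optional "conversation_id" field continues an existing conversation.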
"""
try:
        # 1. Create conversation ID if not provided
        conversation_id = request.conversation_id or str(uuid.uuid4())
# 2. Validate text
if not request.text or request.text.strip() == "":
raise HTTPException(status_code=400, detail="Text cannot be empty")
print(f"πŸ“ Text received: {request.text}")
# 3. Generate chatbot response
chatbot_response = generate_chat_response(
user_input=request.text,
conversation_id=conversation_id
)
print(f"πŸ€– Chatbot Response: {chatbot_response}")
# 4. TTS with English voice
audio_path = text_to_speech(
text=chatbot_response,
voice="Aaliyah-PlayAI",
fmt="wav"
)
# 5. Read and store audio
with open(audio_path, "rb") as audio_file:
audio_data = audio_file.read()
audio_cache[conversation_id] = {
"audio": audio_data,
"text": chatbot_response
}
# 6. Clean up
if Path(audio_path).exists():
os.unlink(audio_path)
# 7. Return response
return VoiceChatResponse(
text_response=chatbot_response,
audio_url=f"/voice-chat/audio/{conversation_id}",
conversation_id=conversation_id
)
except HTTPException:
raise
except Exception as e:
        error_details = traceback.format_exc()
print(f"❌ Error in text_chat_endpoint: {error_details}")
raise HTTPException(
status_code=500,
detail=f"Error during chat: {str(e)}"
)
@router.get("/audio/{conversation_id}")
async def get_audio_stream(conversation_id: str):
"""
Stream audio of the last response
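
    Example (assuming the app is served locally on port 8000):
        curl -o reply.wav "http://localhost:8000/voice-chat/audio/<conversation_id>"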
"""
if conversation_id not in audio_cache:
raise HTTPException(
status_code=404,
detail=f"No audio found for conversation {conversation_id}"
)
audio_data = audio_cache[conversation_id]["audio"]
return StreamingResponse(
io.BytesIO(audio_data),
media_type="audio/wav",
headers={
"Content-Disposition": f"attachment; filename=response_{conversation_id[:8]}.wav"
}
)
@router.get("/test")
async def test_endpoint():
"""
Test endpoint to verify API is working
"""
return {
"status": "ok",
"message": "Voice Chat API is working (English only)",
"endpoints": {
"POST /voice-chat/voice": "Voice input β†’ Voice response",
"POST /voice-chat/text": "Text input β†’ Voice response",
"GET /voice-chat/audio/{id}": "Get audio response",
"POST /stt/": "Speech to text",
"POST /tts/": "Text to speech"
}
}