from app.logger_config import logger as logging
from fastrtc.utils import AdditionalOutputs
from pydub import AudioSegment
import asyncio
import os
import time
import numpy as np
import zipfile
import spaces
import hmac
import hashlib
import base64
import random
import torch
from app.session_utils import (
    get_active_task_flag_file,
    get_active_stream_flag_file,
    remove_active_stream_flag_file,
    remove_active_task_flag_file,
    remove_chunk_folder,
    get_session_hashe_chunks_dir,
)
from app.supported_languages import SUPPORTED_LANGS_MAP
from app.canary_speech_engine import CanarySpeechEngine, CanaryConfig
from app.silero_vad_engine import Silero_Vad_Engine
from app.streaming_audio_processor import StreamingAudioProcessor, StreamingAudioProcessorConfig
import nemo.collections.asr as nemo_asr
import gradio as gr
from typing import Generator, Tuple, Any, Optional, Union

READ_SIZE = 4000

GradioAudioYield = Tuple[int, np.ndarray]
StreamYield = Generator[Tuple[GradioAudioYield, AdditionalOutputs], None, None]
# --------------------------------------------------------
# Utility functions
# --------------------------------------------------------
def generate_coturn_config():
    """
    Generate a complete Coturn configuration with dynamic authentication (use-auth-secret).
    Returns:
        dict: coturn_config object ready to be used by the WebRTC client.
    """
secret_key = os.getenv("TURN_SECRET_KEY", "your_secret_key")
ttl = int(os.getenv("TURN_TTL", 3600))
turn_url = os.getenv("TURN_URL", "turn:*******")
turn_s_url = os.getenv("TURN_S_URL", "turns:*****")
user = os.getenv("TURN_USER", "client")
timestamp = int(time.time()) + ttl
username = f"{timestamp}:{user}"
password = base64.b64encode(
hmac.new(secret_key.encode(), username.encode(), hashlib.sha1).digest()
).decode()
coturn_config = {
"iceServers": [
{
"urls": [
f"{turn_url}",
f"{turn_s_url}",
],
"username": username,
"credential": password,
}
]
}
return coturn_config
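# A minimal verification sketch (illustrative only, not called by the app):
# under coturn's `use-auth-secret` scheme the server recomputes
# HMAC-SHA1(static-auth-secret, username) and checks the expiry timestamp
# embedded in the username, which is exactly what generate_coturn_config()
# produces above.
def _verify_turn_credential(username: str, credential: str, secret_key: str) -> bool:
    """Recompute the ephemeral TURN credential and check it has not expired."""
    expected = base64.b64encode(
        hmac.new(secret_key.encode(), username.encode(), hashlib.sha1).digest()
    ).decode()
    expiry_ts = int(username.split(":", 1)[0])
    return hmac.compare_digest(expected, credential) and expiry_ts > int(time.time())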
def read_and_stream_audio(filepath_to_stream: str, session_hash_code: str, read_size: int = 8000, sample_rate: int = 16000) -> StreamYield:
    """
    Read an audio file and stream it chunk by chunk (read_size / sample_rate
    seconds per chunk, i.e. 500 ms with the defaults).
    Handles errors safely and reports structured messages to the client.
    """
if not session_hash_code:
yield from handle_stream_error("unknown", "No session_hash_code provided.")
return
if not filepath_to_stream or not os.path.exists(filepath_to_stream):
yield from handle_stream_error(session_hash_code, f"Audio file not found: {filepath_to_stream}")
return
try:
segment = AudioSegment.from_file(filepath_to_stream)
        chunk_duration_ms = int((read_size / sample_rate) * 1000)
        total_duration_ms = len(segment)
        # Ceiling division: the final chunk may be shorter than chunk_duration_ms.
        total_chunks = -(-total_duration_ms // chunk_duration_ms)
start_streaming(session_hash_code)
logging.info(f"[{session_hash_code}] Starting stream: {filepath_to_stream} ({total_chunks} chunks, {chunk_duration_ms}ms steps).")
chunk_dir = get_session_hashe_chunks_dir(session_hash_code)
for i, start_ms in enumerate(range(0, total_duration_ms, chunk_duration_ms)):
end_ms = min(start_ms + chunk_duration_ms, total_duration_ms)
chunk = segment[start_ms:end_ms]
frame_rate = chunk.frame_rate
samples_int16 = np.array(chunk.get_array_of_samples(), dtype=np.int16)
samples_float = (samples_int16 / 32768.0).astype(np.float32)
            # Mono vs. stereo handling for Gradio
if chunk.channels > 1:
samples_reshaped = samples_float.reshape(-1, chunk.channels)
else:
samples_reshaped = samples_float.reshape(1, -1)
progress = round(((i + 1) / total_chunks) * 100, 2)
            # Send to the client ("stoped" is kept verbatim: it is the key the client checks)
            if is_stop_requested(session_hash_code):
                logging.info(f"[{session_hash_code}] Stop signal received.")
                yield (
                    (sample_rate, samples_reshaped),
                    AdditionalOutputs({"stoped": True, "value": "STREAM_STOPPED", "session_hash_code": session_hash_code})
                )
                break
yield (
(frame_rate, samples_reshaped),
AdditionalOutputs({"progressed": True, "value": progress, "session_hash_code": session_hash_code})
)
            if is_active_task(session_hash_code):
                os.makedirs(chunk_dir, exist_ok=True)
                npz_path = os.path.join(chunk_dir, f"chunk_{i:05d}.npz")
                # Compression enabled; beware, it is slow (CPU-intensive).
                try:
                    np.savez_compressed(npz_path, data=samples_int16, rate=frame_rate)
                    logging.debug(f"[{session_hash_code}] Saved chunk {i} to {npz_path}")
                except Exception as e:
                    logging.error(f"[{session_hash_code}] Error saving chunk {i} to {npz_path}: {e}")
time.sleep(chunk_duration_ms/1000)
# raise_error() # Optional injected test exception
logging.info(f"[{session_hash_code}] Streaming completed.")
except Exception as e:
yield from handle_stream_error(session_hash_code, e)
finally:
remove_active_stream_flag_file(session_hash_code)
remove_active_task_flag_file(session_hash_code)
logging.info(f"[{session_hash_code}] Cleanup done.")
def handle_stream_error(session_hash_code: str, error: Union[Exception, str]) -> StreamYield:
    """
    Handle streaming errors:
    - Log the error
    - Send structured info to the client
    - Reset the stop flag
    """
    msg = f"{type(error).__name__}: {error}" if isinstance(error, Exception) else str(error)
logging.error(f"[{session_hash_code}] Stream Error: {msg}", exc_info=True)
remove_active_stream_flag_file(session_hash_code)
empty_audio = np.zeros((1, 16000), dtype=np.float32)
yield (
(16000, empty_audio),
AdditionalOutputs({"errored": True, "value": msg, "session_hash_code": session_hash_code})
)
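# A minimal consumption sketch (illustrative; in production fastrtc drains this
# generator and forwards the audio over WebRTC). Assumes AdditionalOutputs
# stores its positional arguments on `.args`, hence the defensive getattr.
def _demo_consume_stream(filepath: str, session_hash_code: str) -> None:
    """Drain read_and_stream_audio and print each progress/stop/error payload."""
    for (rate, samples), extra in read_and_stream_audio(filepath, session_hash_code):
        payload = extra.args[0] if getattr(extra, "args", None) else {}
        print(f"{rate} Hz, shape {samples.shape} -> {payload}")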
# asr_model = nemo_asr.models.ASRModel.from_pretrained("nvidia/canary-1b-v2")
asr_model = None
# @spaces.cache
# def load_model():
#     logging.info("Loading the NeMo ASR/AST model...")
#     # Replace with your model-loading logic
#     model = nemo_asr.models.EncDecRNNTModel.restore_from("path/to/model.nemo")
#     logging.info("Model loaded.")
#     return model
# # Load it once at script startup
# ASR_MODEL = load_model()
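# A minimal lazy-loading sketch (assumption: "nvidia/canary-1b-v2" is the
# intended checkpoint, per the commented line above). Caching the model in the
# module-level global avoids re-downloading it on every GPU call.
def get_asr_model():
    """Load the NeMo ASR model on first use and reuse it afterwards."""
    global asr_model
    if asr_model is None:
        logging.info("Loading NeMo ASR model (first call only)...")
        asr_model = nemo_asr.models.ASRModel.from_pretrained("nvidia/canary-1b-v2")
    return asr_model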
@spaces.GPU(duration=10)
def task_fake(session_hash_code: str,
task_type, lang_source, lang_target,
chunk_secs, left_context_secs, right_context_secs,
streaming_policy, alignatt_thr, waitk_lagging,
exclude_sink_frames, xatt_scores_layer, hallucinations_detector
):
"""Continuously read and delete .npz chunks while task is active."""
global asr_model
yield ("initializing the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
yield ("initialized the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
yield (f"Task started for session {session_hash_code}", "info", None)
active_flag = get_active_task_flag_file(session_hash_code)
with open(active_flag, "w") as f:
f.write("1")
chunk_dir = get_session_hashe_chunks_dir(session_hash_code)
logging.info(f"[{session_hash_code}] task started. {chunk_dir}")
try:
logging.info(f"[{session_hash_code}] task loop started.")
yield (f"Task started for session {session_hash_code}", "info", None)
        while os.path.exists(active_flag):
if not os.path.exists(chunk_dir):
logging.warning(f"[{session_hash_code}] No chunk directory found for task.")
yield ("No audio chunks yet... waiting for stream.", "warning", None)
time.sleep(0.1)
continue
# raise_error()
files = sorted(f for f in os.listdir(chunk_dir) if f.endswith(".npz"))
if not files:
time.sleep(0.1)
continue
for fname in files:
fpath = os.path.join(chunk_dir, fname)
try:
npz = np.load(fpath)
samples = npz["data"]
rate = int(npz["rate"])
text = f"Transcribed {fname}: {len(samples)} samples @ {rate}Hz\n"
yield (text, "success", fname)
os.remove(fpath)
logging.debug(f"[{session_hash_code}] Deleted processed chunk: {fname}")
# raise_error()
                except (EOFError, OSError, zipfile.BadZipFile, ValueError) as e:
                    logging.warning(f"[{session_hash_code}] Warning processing {fname}: {e}")
                    yield (f"Warning processing {fname}: {e}", "warning", fname)
                except Exception as e:
                    error_type = type(e).__name__
                    logging.error(f"[{session_hash_code}] Error processing {fname}: {e} ({error_type})")
                    # Re-raise with the original traceback instead of wrapping in a new Exception.
                    raise
time.sleep(0.1)
yield ("DONE", "done", None)
logging.info(f"[{session_hash_code}] task loop ended (flag removed).")
except Exception as e:
logging.error(f"[{session_hash_code}] Unexpected task error: {e}", exc_info=True)
yield (f"Unexpected error: {e}", "error", None)
return
finally:
remove_active_task_flag_file(session_hash_code)
remove_chunk_folder(session_hash_code)
logging.info(f"[{session_hash_code}] Exiting task loop.")
# yield ("Task finished and cleaned up.", "done", None)
@spaces.GPU
def task(session_hash_code: str,
task_type, lang_source, lang_target,
chunk_secs, left_context_secs, right_context_secs,
streaming_policy, alignatt_thr, waitk_lagging,
exclude_sink_frames, xatt_scores_layer, hallucinations_detector
):
"""Continuously read and delete .npz chunks while task is active."""
global asr_model
yield ("initializing the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
    conf = CanaryConfig.from_params(
        task_type, SUPPORTED_LANGS_MAP.get(lang_source), SUPPORTED_LANGS_MAP.get(lang_target),
        chunk_secs, left_context_secs, right_context_secs,
        streaming_policy, alignatt_thr, waitk_lagging,
        exclude_sink_frames, xatt_scores_layer, hallucinations_detector
    )
    canary_speech_engine = CanarySpeechEngine(asr_model, conf)
    silero_vad_engine = Silero_Vad_Engine()
    streaming_audio_processor_config = StreamingAudioProcessorConfig(
        read_size=READ_SIZE,
        silence_threshold_chunks=1
    )
    streamer = StreamingAudioProcessor(
        speech_engine=canary_speech_engine,
        vad_engine=silero_vad_engine,
        cfg=streaming_audio_processor_config,
    )
yield ("initialized the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
yield (f"Task started for session {session_hash_code}", "info", None)
active_flag = get_active_task_flag_file(session_hash_code)
with open(active_flag, "w") as f:
f.write("1")
chunk_dir = get_session_hashe_chunks_dir(session_hash_code)
logging.info(f"[{session_hash_code}] task started. {chunk_dir}")
try:
logging.info(f"[{session_hash_code}] task loop started.")
yield (f"Task started for session {session_hash_code}", "info", None)
while os.path.exists(active_flag):
if not os.path.exists(chunk_dir):
logging.warning(f"[{session_hash_code}] No chunk directory found for task.")
yield ("No audio chunks yet... waiting for stream.", "warning", None)
time.sleep(0.1)
continue
files = sorted(f for f in os.listdir(chunk_dir) if f.endswith(".npz"))
if not files:
time.sleep(0.1)
continue
for fname in files:
fpath = os.path.join(chunk_dir, fname)
try:
npz = np.load(fpath)
samples = npz["data"]
rate = int(npz["rate"])
                    for text in streamer.process_chunk(samples):
                        yield (text, "success", text)
                        logging.debug(f"[{session_hash_code}] {text}")
                    os.remove(fpath)
                    logging.debug(f"[{session_hash_code}] Deleted processed chunk: {fname}")
except Exception as e:
logging.warning(f"[{session_hash_code}] Error processing {fname}: {e}")
yield (f"Error processing {fname}: {e}", "warning", fname)
continue
time.sleep(0.1)
        # Flush any audio still buffered in the streamer once the flag is gone.
        for final_text in streamer.finalize_stream():
            yield (final_text, "success", final_text)
        yield ("DONE", "done", None)
logging.info(f"[{session_hash_code}] task loop ended (flag removed).")
except Exception as e:
logging.error(f"[{session_hash_code}] task error: {e}", exc_info=True)
yield (f"Unexpected error: {e}", "error", None)
finally:
remove_active_task_flag_file(session_hash_code)
try:
if os.path.exists(chunk_dir) and not os.listdir(chunk_dir):
os.rmdir(chunk_dir)
logging.debug(f"[{session_hash_code}] Cleaned up empty chunk dir.")
except Exception as e:
logging.error(f"[{session_hash_code}] Cleanup error: {e}")
yield (f"Cleanup error: {e}", "error", None)
logging.info(f"[{session_hash_code}] Exiting task loop.")
yield ("Task finished and cleaned up.", "done", None)
# --- Decorator compatibility layer ---
if os.environ.get("SPACE_ID", "").startswith("zero-gpu"):
    logging.warning("Running on ZeroGPU - substituting a no-op decorator for @spaces.GPU")
    def gpu_decorator(f): return f
else:
    gpu_decorator = spaces.GPU
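# Usage sketch (hypothetical function, illustrative only): wrapping GPU work
# with the compatibility decorator keeps the module importable in both
# environments, since the ZeroGPU branch degrades to a plain pass-through.
@gpu_decorator
def _gpu_smoke_test() -> str:
    """Tiny end-to-end check of the decorator path."""
    return "cuda" if torch.cuda.is_available() else "cpu"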
# --- Audio Stream Function ---
def stop_streaming(session_hash_code: str):
    """Remove the stream and task flag files to signal active loops to stop."""
    logging.info(f"[{session_hash_code}] Stop button clicked, sending stop signal.")
    remove_active_stream_flag_file(session_hash_code)
    remove_active_task_flag_file(session_hash_code)
def start_streaming(session_hash_code: str):
    """Create the stream flag file to mark the stream as active."""
    logging.info(f"[{session_hash_code}] Start button clicked, sending start signal.")
    active_stream_flag = get_active_stream_flag_file(session_hash_code)
    with open(active_stream_flag, "w") as f:
        f.write("1")
def is_stop_requested(session_hash_code) -> bool:
    """Check whether a stop was requested (the stream flag file is gone)."""
    return not os.path.exists(get_active_stream_flag_file(session_hash_code))
def is_active_task(session_hash_code) -> bool:
    """Check whether a processing task is active for this session."""
    return os.path.exists(get_active_task_flag_file(session_hash_code))
def is_active_stream(session_hash_code) -> bool:
    """Check whether an audio stream is active for this session."""
    return os.path.exists(get_active_stream_flag_file(session_hash_code))
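# A minimal lifecycle sketch (hypothetical session id, illustrative only):
# starting a stream creates its flag file, stopping removes it, and the
# streaming loop polls is_stop_requested() between chunks.
def _demo_flag_lifecycle(session_hash_code: str = "demo-session") -> None:
    """Exercise the flag-file handshake used by the streaming loop."""
    start_streaming(session_hash_code)
    assert not is_stop_requested(session_hash_code)
    stop_streaming(session_hash_code)
    assert is_stop_requested(session_hash_code)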
def raise_error():
"""Raise an error randomly (1 out of 10 times)."""
if random.randint(1, 10) == 1:
raise RuntimeError("Random failure triggered!")
def debug_current_device():
"""Safely logs GPU or CPU information without crashing on stateless GPU."""
logging.debug("=== Debugging current device ===")
try:
if torch.cuda.is_available():
device_name = torch.cuda.get_device_name(0)
memory_allocated = torch.cuda.memory_allocated(0) / (1024 ** 2)
memory_reserved = torch.cuda.memory_reserved(0) / (1024 ** 2)
memory_total = torch.cuda.get_device_properties(0).total_memory / (1024 ** 2)
capability = torch.cuda.get_device_capability(0)
current_device = torch.cuda.current_device()
logging.debug(f"GPU name : {device_name}")
logging.debug(f"Current device ID : {current_device}")
logging.debug(f"CUDA capability : {capability}")
logging.debug(f"Memory allocated : {memory_allocated:.2f} MB")
logging.debug(f"Memory reserved : {memory_reserved:.2f} MB")
logging.debug(f"Total memory : {memory_total:.2f} MB")
else:
logging.debug("No GPU detected, running on CPU")
except RuntimeError as e:
# Handles Hugging Face Spaces “Stateless GPU” restriction
if "CUDA must not be initialized" in str(e):
logging.warning("⚠️ Skipping CUDA info: Stateless GPU environment detected.")
else:
logging.error(f"Unexpected CUDA error: {e}")
def get_current_device():
    """Return the current device and its name, tolerating stateless-GPU mode."""
    device = torch.device("cpu")
    device_name = "CPU"
    try:
        if torch.cuda.is_available():
            device = torch.device("cuda")
            device_name = torch.cuda.get_device_name(0)
        # Tiny allocation to force device initialization early.
        torch.tensor([0], dtype=torch.float32, device=device)
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            logging.debug("GPU cache cleared")
    except RuntimeError as e:
        if "CUDA must not be initialized" in str(e):
            device = torch.device("cpu")
            device_name = "CPU (stateless GPU mode)"
        else:
            logging.error(f"Unexpected CUDA error: {e}")
    return device, device_name
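# Usage sketch (illustrative; the app itself relies on @spaces.GPU to place
# work on the GPU): resolve the device once at startup and log it.
def _log_startup_device() -> None:
    """Log which device get_current_device() resolved to."""
    device, device_name = get_current_device()
    logging.info(f"Startup device: {device_name} ({device})")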