from app.logger_config import logger as logging
from fastrtc.utils import AdditionalOutputs
from pydub import AudioSegment
import asyncio
import os
import time
import numpy as np
import zipfile
import spaces
import hmac
import hashlib
import base64
import os
import time
import random
import torch
from app.streaming_audio_processor import StreamingAudioProcessor
from app.session_utils import (
    get_active_task_flag_file,
    get_active_stream_flag_file,
    remove_active_stream_flag_file,
    remove_active_task_flag_file,
    remove_chunk_folder,
    get_session_hashe_chunks_dir
)
from app.supported_languages import (
    SUPPORTED_LANGS_MAP
)
from app.canary_speech_engine import CanarySpeechEngine, CanaryConfig
from app.silero_vad_engine import Silero_Vad_Engine
from app.streaming_audio_processor import StreamingAudioProcessor, StreamingAudioProcessorConfig
import nemo.collections.asr as nemo_asr

# read_size handed to StreamingAudioProcessorConfig in task().
READ_SIZE = 4000

import gradio as gr
from typing import Generator
from typing import Generator, Tuple, Any, Optional

# (sample_rate, samples) pair yielded as the audio part of each stream item.
GradioAudioYield = Tuple[int, np.ndarray]
# Generator of (audio, AdditionalOutputs) items produced by read_and_stream_audio.
StreamYield = Generator[Tuple[GradioAudioYield, AdditionalOutputs], None, None]


# --------------------------------------------------------
# Utility functions
# --------------------------------------------------------
def generate_coturn_config():
    """
    Build a Coturn ICE-server configuration with time-limited credentials
    (coturn ``use-auth-secret`` scheme).

    The username is ``"<expiry_unix_timestamp>:<user>"`` and the credential is
    the base64-encoded HMAC-SHA1 of that username, keyed with TURN_SECRET_KEY.

    Environment variables:
        TURN_SECRET_KEY: shared secret (falls back to an insecure placeholder).
        TURN_TTL: credential lifetime in seconds (default 3600).
        TURN_URL / TURN_S_URL: ``turn:`` and ``turns:`` server URLs.
        TURN_USER: user label embedded in the username (default "client").

    Returns:
        dict: ``{"iceServers": [...]}`` ready to be used by a WebRTC client.
    """
    if "TURN_SECRET_KEY" not in os.environ:
        # The placeholder secret will not match a real TURN server; make that visible.
        logging.warning("TURN_SECRET_KEY is not set; using the placeholder secret.")
    secret_key = os.getenv("TURN_SECRET_KEY", "your_secret_key")
    ttl = int(os.getenv("TURN_TTL", 3600))
    turn_url = os.getenv("TURN_URL", "turn:*******")
    turn_s_url = os.getenv("TURN_S_URL", "turns:*****")
    user = os.getenv("TURN_USER", "client")

    timestamp = int(time.time()) + ttl
    username = f"{timestamp}:{user}"
    password = base64.b64encode(
        hmac.new(secret_key.encode(), username.encode(), hashlib.sha1).digest()
    ).decode()

    coturn_config = {
        "iceServers": [
            {
                "urls": [
                    f"{turn_url}",
                    f"{turn_s_url}",
                ],
                "username": username,
                "credential": password,
            }
        ]
    }
    return coturn_config


def _save_chunk_npz(npz_path: str, samples: np.ndarray, rate: int) -> None:
    """Write one chunk as a compressed ``.npz`` and publish it with a rename.

    The archive is first written to ``<npz_path>.tmp``. The task loop ignores
    that name because it does not end in ``.npz``. The file is then moved into
    place with ``os.replace``, so readers do not pick up a half-written archive.
    """
    tmp_path = npz_path + ".tmp"
    try:
        # Passing a file object prevents numpy from appending ".npz" to the name.
        with open(tmp_path, "wb") as fh:
            # Compression is CPU-intensive (slow) but keeps chunk files small.
            np.savez_compressed(fh, data=samples, rate=rate)
        os.replace(tmp_path, npz_path)
    except Exception:
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise


def read_and_stream_audio(filepath_to_stream: str, session_hash_code: str, read_size: int = 8000, sample_rate: int = 16000) -> StreamYield:
    """
    Read an audio file and stream it to the client chunk by chunk, in real time.

    Each chunk lasts ``read_size / sample_rate`` seconds (0.5 s with the
    defaults), and the generator sleeps for that long after each chunk. While
    the session's task flag exists, every streamed chunk is also saved as
    ``chunk_XXXXX.npz`` (int16 ``data`` + ``rate``) in the session chunk
    directory.

    Args:
        filepath_to_stream: path of an audio file that pydub can decode.
        session_hash_code: session id used for flag files and the chunk directory.
        read_size: chunk size in samples, used only to derive the chunk duration.
        sample_rate: rate used to convert ``read_size`` to milliseconds.

    Yields:
        ``((rate, samples), AdditionalOutputs(meta))`` where ``meta`` holds
        ``{"progressed": True, "value": percent}`` per chunk,
        ``{"stoped": True, "value": "STREAM_STOPPED"}`` when a stop was
        requested, or ``{"errored": True, "value": message}`` on failure.
    """
    if not session_hash_code:
        yield from handle_stream_error("unknown", "No session_hash_code provided.")
        return
    if not filepath_to_stream or not os.path.exists(filepath_to_stream):
        yield from handle_stream_error(session_hash_code, f"Audio file not found: {filepath_to_stream}")
        return

    try:
        # Normalise to 16-bit samples so the int16 conversion below is exact
        # whatever the source bit depth is.
        segment = AudioSegment.from_file(filepath_to_stream).set_sample_width(2)
        # Never let the step be 0: range() would raise ValueError.
        chunk_duration_ms = max(1, int((read_size / sample_rate) * 1000))
        total_duration_ms = len(segment)
        # Ceiling division: equals the number of iterations of the loop below,
        # so the last chunk reports exactly 100%.
        total_chunks = -(-total_duration_ms // chunk_duration_ms)

        start_streaming(session_hash_code)
        logging.info(f"[{session_hash_code}] Starting stream: {filepath_to_stream} ({total_chunks} chunks, {chunk_duration_ms}ms steps).")
        chunk_dir = get_session_hashe_chunks_dir(session_hash_code)

        for i, start_ms in enumerate(range(0, total_duration_ms, chunk_duration_ms)):
            end_ms = min(start_ms + chunk_duration_ms, total_duration_ms)
            chunk = segment[start_ms:end_ms]
            frame_rate = chunk.frame_rate
            samples_int16 = np.array(chunk.get_array_of_samples(), dtype=np.int16)
            samples_float = (samples_int16 / 32768.0).astype(np.float32)

            # Mono vs. stereo layout for Gradio.
            # NOTE(review): stereo becomes (samples, channels) while mono becomes
            # (1, samples) — confirm which layout the client expects.
            if chunk.channels > 1:
                samples_reshaped = samples_float.reshape(-1, chunk.channels)
            else:
                samples_reshaped = samples_float.reshape(1, -1)

            progress = round(((i + 1) / total_chunks) * 100, 2)

            # Send to the client.
            if is_stop_requested(session_hash_code):
                logging.info(f"[{session_hash_code}] Stop signal received.")
                yield (
                    (frame_rate, samples_reshaped),
                    AdditionalOutputs({"stoped": True, "value": "STREAM_STOPPED", "session_hash_code": session_hash_code})
                )
                break

            yield (
                (frame_rate, samples_reshaped),
                AdditionalOutputs({"progressed": True, "value": progress, "session_hash_code": session_hash_code})
            )

            # Hand the raw samples to the task loop only while a task is running.
            if is_active_task(session_hash_code):
                os.makedirs(chunk_dir, exist_ok=True)
                npz_path = os.path.join(chunk_dir, f"chunk_{i:05d}.npz")
                try:
                    _save_chunk_npz(npz_path, samples_int16, frame_rate)
                    logging.debug(f"[{session_hash_code}] Saved chunk {i} to {npz_path}")
                except Exception as e:
                    logging.error(f"[{session_hash_code}] Error saving chunk {i} to {npz_path}: {e}")

            # Pace the stream in real time.
            time.sleep(chunk_duration_ms / 1000)
            # raise_error()  # Optional injected test exception

        logging.info(f"[{session_hash_code}] Streaming completed.")

    except Exception as e:
        yield from handle_stream_error(session_hash_code, e)
    finally:
        # Ending the stream also ends the session's task loop (its flag is removed).
        remove_active_stream_flag_file(session_hash_code)
        remove_active_task_flag_file(session_hash_code)
        logging.info(f"[{session_hash_code}] Cleanup done.")


def handle_stream_error(session_hash_code: str, error):
    """
    Report a streaming error to the log and to the client.

    - Log the error (with traceback when ``error`` is an exception).
    - Remove the session's active-stream flag.
    - Yield one second of silence at 16 kHz with
      ``{"errored": True, "value": message}`` metadata.

    Args:
        session_hash_code: session id (``"unknown"`` when none was provided).
        error: an exception instance, or a plain message string.
    """
    if isinstance(error, BaseException):
        msg = f"{type(error).__name__}: {error}"
        # Called from inside an ``except`` block, so the traceback is available.
        logging.error(f"[{session_hash_code}] Stream Error: {msg}", exc_info=True)
    else:
        # Plain string message: no exception type to prefix, no traceback to log.
        msg = str(error)
        logging.error(f"[{session_hash_code}] Stream Error: {msg}")

    remove_active_stream_flag_file(session_hash_code)

    empty_audio = np.zeros((1, 16000), dtype=np.float32)
    yield (
        (16000, empty_audio),
        AdditionalOutputs({"errored": True, "value": msg, "session_hash_code": session_hash_code})
    )


# The ASR model is not loaded in this module (a NeMo
# ASRModel.from_pretrained("nvidia/canary-1b-v2") call was left disabled);
# task() passes this value unchanged to CanarySpeechEngine.
asr_model = None


@spaces.GPU(duration=10)
def task_fake(session_hash_code: str,
              task_type, lang_source, lang_target,
              chunk_secs, left_context_secs, right_context_secs,
              streaming_policy, alignatt_thr, waitk_lagging,
              exclude_sink_frames, xatt_scores_layer, hallucinations_detector
              ):
    """
    Simulated task loop: consume ``.npz`` chunks without running any model.

    Creates the session's active-task flag, then polls the chunk directory
    until that flag is removed. For each chunk it yields a placeholder
    "Transcribed ..." line and deletes the file. All ASR parameters are
    accepted for signature parity with ``task()`` but are unused.

    Yields:
        ``(message, level, tag)`` tuples. ``level`` is one of "info",
        "warning", "success", "error" or "done". ``tag`` is the chunk file
        name or None.
    """
    global asr_model
    yield ("initializing the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
    yield ("initialized the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
    yield (f"Task started for session {session_hash_code}", "info", None)

    active_flag = get_active_task_flag_file(session_hash_code)
    with open(active_flag, "w") as f:
        f.write("1")
    chunk_dir = get_session_hashe_chunks_dir(session_hash_code)
    logging.info(f"[{session_hash_code}] task started. {chunk_dir}")

    try:
        logging.info(f"[{session_hash_code}] task loop started.")
        yield (f"Task started for session {session_hash_code}", "info", None)

        while os.path.exists(active_flag):
            if not os.path.exists(chunk_dir):
                logging.warning(f"[{session_hash_code}] No chunk directory found for task.")
                yield ("No audio chunks yet... waiting for stream.", "warning", None)
                time.sleep(0.1)
                continue
            # raise_error()
            files = sorted(f for f in os.listdir(chunk_dir) if f.endswith(".npz"))
            if not files:
                time.sleep(0.1)
                continue

            for fname in files:
                fpath = os.path.join(chunk_dir, fname)
                try:
                    # Close the archive before deleting the file (no fd leak).
                    with np.load(fpath) as npz:
                        samples = npz["data"]
                        rate = int(npz["rate"])
                    text = f"Transcribed {fname}: {len(samples)} samples @ {rate}Hz\n"
                    yield (text, "success", fname)
                    os.remove(fpath)
                    logging.debug(f"[{session_hash_code}] Deleted processed chunk: {fname}")
                    # raise_error()
                except (EOFError, OSError, zipfile.BadZipFile, ValueError) as e:
                    # Presumably a chunk still being written: keep the file so it
                    # is retried on the next pass.
                    logging.warning(f"[{session_hash_code}] warning processing {fname}: {e}")
                    yield (f"EOFError processing {fname}: {e}", "warning", fname)
                except Exception as e:
                    error_type = type(e).__name__
                    logging.error(f"[{session_hash_code}] Error processing {fname}: {e} {error_type}")
                    # Re-raise unchanged (keeps type and traceback); the outer
                    # handler reports it and ends the task.
                    raise
            time.sleep(0.1)

        yield ("DONE", "done", None)
        logging.info(f"[{session_hash_code}] task loop ended (flag removed).")

    except Exception as e:
        logging.error(f"[{session_hash_code}] Unexpected task error: {e}", exc_info=True)
        yield (f"Unexpected error: {e}", "error", None)
        return

    finally:
        remove_active_task_flag_file(session_hash_code)
        remove_chunk_folder(session_hash_code)
        logging.info(f"[{session_hash_code}] Exiting task loop.")


@spaces.GPU
def task(session_hash_code: str,
         task_type, lang_source, lang_target,
         chunk_secs, left_context_secs, right_context_secs,
         streaming_policy, alignatt_thr, waitk_lagging,
         exclude_sink_frames, xatt_scores_layer, hallucinations_detector
         ):
    """
    Real task loop: feed ``.npz`` chunks to the Canary + Silero streamer.

    Builds a CanaryConfig from the given parameters, with languages mapped
    through SUPPORTED_LANGS_MAP. It then wires CanarySpeechEngine and
    Silero_Vad_Engine into a StreamingAudioProcessor and creates the session's
    active-task flag. While that flag exists, it reads, processes and deletes
    chunk files in name order. When the flag disappears, it flushes the
    streamer with ``finalize_stream()``.

    Yields:
        ``(message, level, tag)`` tuples. For transcription output, ``message``
        and ``tag`` are both the produced text.
    """
    global asr_model
    yield ("initializing the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
    conf = CanaryConfig.from_params(
        task_type, SUPPORTED_LANGS_MAP.get(lang_source), SUPPORTED_LANGS_MAP.get(lang_target),
        chunk_secs, left_context_secs, right_context_secs,
        streaming_policy, alignatt_thr, waitk_lagging,
        exclude_sink_frames, xatt_scores_layer, hallucinations_detector
    )
    canary_speech_engine = CanarySpeechEngine(asr_model, conf)
    silero_vad_engine = Silero_Vad_Engine()
    streaming_audio_processor_config = StreamingAudioProcessorConfig(
        read_size=READ_SIZE,
        silence_threshold_chunks=1
    )
    streamer = StreamingAudioProcessor(speech_engine=canary_speech_engine, vad_engine=silero_vad_engine, cfg=streaming_audio_processor_config)
    yield ("initialized the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
    yield (f"Task started for session {session_hash_code}", "info", None)

    active_flag = get_active_task_flag_file(session_hash_code)
    with open(active_flag, "w") as f:
        f.write("1")
    chunk_dir = get_session_hashe_chunks_dir(session_hash_code)
    logging.info(f"[{session_hash_code}] task started. {chunk_dir}")

    cleanup_error = None
    try:
        logging.info(f"[{session_hash_code}] task loop started.")
        yield (f"Task started for session {session_hash_code}", "info", None)

        while os.path.exists(active_flag):
            if not os.path.exists(chunk_dir):
                logging.warning(f"[{session_hash_code}] No chunk directory found for task.")
                yield ("No audio chunks yet... waiting for stream.", "warning", None)
                time.sleep(0.1)
                continue

            files = sorted(f for f in os.listdir(chunk_dir) if f.endswith(".npz"))
            if not files:
                time.sleep(0.1)
                continue

            for fname in files:
                fpath = os.path.join(chunk_dir, fname)
                try:
                    # Close the archive before deleting the file (no fd leak).
                    with np.load(fpath) as npz:
                        samples = npz["data"]
                    # TODO: the chunk's stored "rate" is not used here; presumably
                    # the streamer assumes a fixed sample rate — confirm.
                    for text in streamer.process_chunk(samples):
                        yield (text, "success", text)
                        logging.debug(f"[{session_hash_code}] {text}")
                    os.remove(fpath)
                    logging.debug(f"[{session_hash_code}] Deleted processed chunk: {fname}")
                except Exception as e:
                    # The file is kept, so it is retried on the next pass.
                    logging.warning(f"[{session_hash_code}] Error processing {fname}: {e}")
                    yield (f"Error processing {fname}: {e}", "warning", fname)
                    continue

            time.sleep(0.1)

        # Flush whatever the streamer still buffers once the task flag is gone.
        for final_text in streamer.finalize_stream():
            yield (final_text, "success", final_text)

        logging.info(f"[{session_hash_code}] task loop ended (flag removed).")

    except Exception as e:
        logging.error(f"[{session_hash_code}] task error: {e}", exc_info=True)
        yield (f"Unexpected error: {e}", "error", None)

    finally:
        # No yield inside finally: if the consumer closes the generator
        # (GeneratorExit), yielding here would raise RuntimeError.
        remove_active_task_flag_file(session_hash_code)
        try:
            if os.path.exists(chunk_dir) and not os.listdir(chunk_dir):
                os.rmdir(chunk_dir)
                logging.debug(f"[{session_hash_code}] Cleaned up empty chunk dir.")
        except Exception as e:
            logging.error(f"[{session_hash_code}] Cleanup error: {e}")
            cleanup_error = e
        logging.info(f"[{session_hash_code}] Exiting task loop.")

    # Reached only when the generator was not closed or aborted early.
    if cleanup_error is not None:
        yield (f"Cleanup error: {cleanup_error}", "error", None)
    yield ("Task finished and cleaned up.", "done", None)


# --- Decorator compatibility layer ---
# Both names are defined in both branches so either can be used safely.
# NOTE(review): on Hugging Face Spaces SPACE_ID is usually "<owner>/<space>";
# confirm this prefix check ever matches.
if os.environ.get("SPACE_ID", "").startswith("zero-gpu"):
    logging.warning("Running on ZeroGPU — gpu_fork_decorator @spaces.GPU")

    def gpu_fork_decorator(f):
        """Identity decorator: return ``f`` unchanged."""
        return f

    gpu_decorator = gpu_fork_decorator
else:
    gpu_decorator = spaces.GPU
    gpu_fork_decorator = gpu_decorator


# --- Audio Stream Function ---

def stop_streaming(session_hash_code: str):
    """Request a stop by removing both the active-stream and active-task flags."""
    logging.info(f"[{session_hash_code}] Stop button clicked — sending stop signal.")
    remove_active_stream_flag_file(session_hash_code)
    remove_active_task_flag_file(session_hash_code)


def start_streaming(session_hash_code: str):
    """Mark the session's stream as active by creating its active-stream flag file."""
    logging.info(f"[{session_hash_code}] Start button clicked — sending start signal.")
    active_stream_flag = get_active_stream_flag_file(session_hash_code)
    with open(active_stream_flag, "w") as f:
        f.write("1")


def is_stop_requested(session_hash_code) -> bool:
    """Return True when the active-stream flag file is absent (stop requested)."""
    return not os.path.exists(get_active_stream_flag_file(session_hash_code))


def is_active_task(session_hash_code) -> bool:
    """Return True while the session's active-task flag file exists."""
    return os.path.exists(get_active_task_flag_file(session_hash_code))


def is_active_stream(session_hash_code) -> bool:
    """Return True while the session's active-stream flag file exists."""
    return os.path.exists(get_active_stream_flag_file(session_hash_code))


def raise_error():
    """Raise a RuntimeError roughly 1 time out of 10 (fault-injection helper)."""
    if random.randint(1, 10) == 1:
        raise RuntimeError("Random failure triggered!")


def debug_current_device():
    """Log GPU or CPU information at debug level without crashing on stateless GPU."""
    logging.debug("=== Debugging current device ===")

    try:
        if torch.cuda.is_available():
            device_name = torch.cuda.get_device_name(0)
            memory_allocated = torch.cuda.memory_allocated(0) / (1024 ** 2)
            memory_reserved = torch.cuda.memory_reserved(0) / (1024 ** 2)
            memory_total = torch.cuda.get_device_properties(0).total_memory / (1024 ** 2)
            capability = torch.cuda.get_device_capability(0)
            current_device = torch.cuda.current_device()

            logging.debug(f"GPU name : {device_name}")
            logging.debug(f"Current device ID : {current_device}")
            logging.debug(f"CUDA capability : {capability}")
            logging.debug(f"Memory allocated : {memory_allocated:.2f} MB")
            logging.debug(f"Memory reserved : {memory_reserved:.2f} MB")
            logging.debug(f"Total memory : {memory_total:.2f} MB")
        else:
            logging.debug("No GPU detected, running on CPU")

    except RuntimeError as e:
        # Handles the Hugging Face Spaces "Stateless GPU" restriction.
        if "CUDA must not be initialized" in str(e):
            logging.warning("⚠️ Skipping CUDA info: Stateless GPU environment detected.")
        else:
            logging.error(f"Unexpected CUDA error: {e}")


def get_current_device():
    """
    Return ``(device, device_name)`` for the torch device to use.

    Selects CUDA when available, probes it by allocating a one-element tensor,
    and clears the CUDA cache. On any CUDA RuntimeError it falls back to CPU.
    The name is "CPU (stateless GPU mode)" for the stateless-GPU restriction
    and "CPU" otherwise.
    """
    try:
        cuda_available = torch.cuda.is_available()
        device = torch.device("cuda" if cuda_available else "cpu")
        device_name = torch.cuda.get_device_name(0) if cuda_available else "CPU"
        torch.tensor([0], dtype=torch.float32, device=device)
        if cuda_available:
            torch.cuda.empty_cache()
            logging.debug("GPU cache cleared")
    except RuntimeError as e:
        if "CUDA must not be initialized" in str(e):
            device = torch.device("cpu")
            device_name = "CPU (stateless GPU mode)"
        else:
            # Any other CUDA failure: fall back to CPU instead of returning a
            # half-initialised (or unbound) device/name pair.
            logging.error(f"Unexpected CUDA error, falling back to CPU: {e}")
            device = torch.device("cpu")
            device_name = "CPU"
    return device, device_name