from app.logger_config import logger as logging
from fastrtc.utils import AdditionalOutputs
from pydub import AudioSegment
import asyncio
import os
import time
import numpy as np
import zipfile
import spaces
import hmac
import hashlib
import base64
import random
import torch
from app.session_utils import (
    get_active_task_flag_file,
    get_active_stream_flag_file,
    remove_active_stream_flag_file,
    remove_active_task_flag_file,
    remove_chunk_folder,
    get_session_hashe_chunks_dir,
)
from app.supported_languages import SUPPORTED_LANGS_MAP
from app.canary_speech_engine import CanarySpeechEngine, CanaryConfig
from app.silero_vad_engine import Silero_Vad_Engine
from app.streaming_audio_processor import StreamingAudioProcessor, StreamingAudioProcessorConfig
import nemo.collections.asr as nemo_asr
import gradio as gr
from typing import Generator, Tuple, Any, Optional

# Number of samples consumed per streaming step (0.25 s at 16 kHz).
READ_SIZE = 4000

GradioAudioYield = Tuple[int, np.ndarray]
StreamYield = Generator[Tuple[GradioAudioYield, AdditionalOutputs], None, None]

# --------------------------------------------------------
# Utility functions
# --------------------------------------------------------
def generate_coturn_config():
    """
    Generate a complete Coturn configuration with dynamic authentication
    (use-auth-secret).

    Returns:
        dict: coturn_config object ready to be used by the WebRTC client.
    """
    secret_key = os.getenv("TURN_SECRET_KEY", "your_secret_key")
    ttl = int(os.getenv("TURN_TTL", 3600))
    turn_url = os.getenv("TURN_URL", "turn:*******")
    turn_s_url = os.getenv("TURN_S_URL", "turns:*****")
    user = os.getenv("TURN_USER", "client")
    # TURN REST API convention: username is "<expiry>:<user>", credential is
    # base64(HMAC-SHA1(secret, username)).
    timestamp = int(time.time()) + ttl
    username = f"{timestamp}:{user}"
    password = base64.b64encode(
        hmac.new(secret_key.encode(), username.encode(), hashlib.sha1).digest()
    ).decode()
    coturn_config = {
        "iceServers": [
            {
                "urls": [turn_url, turn_s_url],
                "username": username,
                "credential": password,
            }
        ]
    }
    return coturn_config
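
# Illustrative only (not used by the app): the check that coturn's
# use-auth-secret mode performs server-side, recomputing the HMAC and
# honoring the embedded expiry. The helper name is an assumption.
def _verify_turn_credential(username: str, credential: str, secret: str) -> bool:
    expected = base64.b64encode(
        hmac.new(secret.encode(), username.encode(), hashlib.sha1).digest()
    ).decode()
    expiry = int(username.split(":", 1)[0])
    return hmac.compare_digest(expected, credential) and expiry > int(time.time())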
def read_and_stream_audio(filepath_to_stream: str, session_hash_code: str,
                          read_size: int = 8000, sample_rate: int = 16000) -> StreamYield:
    """
    Read an audio file and stream it chunk by chunk (read_size samples per chunk).
    Handles errors safely and reports structured messages to the client.
    """
    if not session_hash_code:
        yield from handle_stream_error("unknown", "No session_hash_code provided.")
        return
    if not filepath_to_stream or not os.path.exists(filepath_to_stream):
        yield from handle_stream_error(session_hash_code, f"Audio file not found: {filepath_to_stream}")
        return
    try:
        segment = AudioSegment.from_file(filepath_to_stream)
        chunk_duration_ms = int((read_size / sample_rate) * 1000)
        total_duration_ms = len(segment)
        # Ceiling division so a trailing partial chunk is counted.
        total_chunks = (total_duration_ms + chunk_duration_ms - 1) // chunk_duration_ms
        start_streaming(session_hash_code)
        logging.info(f"[{session_hash_code}] Starting stream: {filepath_to_stream} ({total_chunks} chunks, {chunk_duration_ms}ms steps).")
        chunk_dir = get_session_hashe_chunks_dir(session_hash_code)
        for i, start_ms in enumerate(range(0, total_duration_ms, chunk_duration_ms)):
            end_ms = min(start_ms + chunk_duration_ms, total_duration_ms)
            chunk = segment[start_ms:end_ms]
            frame_rate = chunk.frame_rate
            samples_int16 = np.array(chunk.get_array_of_samples(), dtype=np.int16)
            samples_float = (samples_int16 / 32768.0).astype(np.float32)
            # Mono vs. stereo handling for Gradio
            if chunk.channels > 1:
                samples_reshaped = samples_float.reshape(-1, chunk.channels)
            else:
                samples_reshaped = samples_float.reshape(1, -1)
            progress = round(((i + 1) / total_chunks) * 100, 2)
            # Send to the client
            if is_stop_requested(session_hash_code):
                logging.info(f"[{session_hash_code}] Stop signal received.")
                yield (
                    (frame_rate, samples_reshaped),
                    # NOTE: "stoped" [sic] is the key the client currently expects.
                    AdditionalOutputs({"stoped": True, "value": "STREAM_STOPPED", "session_hash_code": session_hash_code})
                )
                break
            yield (
                (frame_rate, samples_reshaped),
                AdditionalOutputs({"progressed": True, "value": progress, "session_hash_code": session_hash_code})
            )
            if is_active_task(session_hash_code):
                os.makedirs(chunk_dir, exist_ok=True)
                npz_path = os.path.join(chunk_dir, f"chunk_{i:05d}.npz")
                # Compression enabled; beware, it is slow (CPU-intensive).
                try:
                    np.savez_compressed(npz_path, data=samples_int16, rate=frame_rate)
                    logging.debug(f"[{session_hash_code}] Saved chunk {i} to {npz_path}")
                except Exception as e:
                    logging.error(f"[{session_hash_code}] Error saving chunk {i} to {npz_path}: {e}")
            time.sleep(chunk_duration_ms / 1000)
            # raise_error()  # Optional injected test exception
        logging.info(f"[{session_hash_code}] Streaming completed.")
    except Exception as e:
        yield from handle_stream_error(session_hash_code, e)
    finally:
        remove_active_stream_flag_file(session_hash_code)
        remove_active_task_flag_file(session_hash_code)
        logging.info(f"[{session_hash_code}] Cleanup done.")
def handle_stream_error(session_hash_code: str, error) -> StreamYield:
    """
    Handle streaming errors:
    - Log the error
    - Send structured info to client
    - Reset stop flag
    """
    # `error` may be an Exception or a plain message string.
    msg = error if isinstance(error, str) else f"{type(error).__name__}: {error}"
    logging.error(f"[{session_hash_code}] Stream Error: {msg}", exc_info=True)
    remove_active_stream_flag_file(session_hash_code)
    empty_audio = np.zeros((1, 16000), dtype=np.float32)
    yield (
        (16000, empty_audio),
        AdditionalOutputs({"errored": True, "value": msg, "session_hash_code": session_hash_code})
    )
# asr_model = nemo_asr.models.ASRModel.from_pretrained("nvidia/canary-1b-v2")
asr_model = None

# @spaces.cache
# def load_model():
#     logging.info("Loading the NeMo ASR/AST model...")
#     # Replace with your model-loading logic
#     model = nemo_asr.models.EncDecRNNTModel.restore_from("path/to/model.nemo")
#     logging.info("Model loaded.")
#     return model
#
# # Load it only once at script startup
# ASR_MODEL = load_model()
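
# A minimal lazy-loading sketch built from the commented-out lines above; it
# assumes Hub access and enough memory for nvidia/canary-1b-v2, and is not
# wired into the tasks below.
def get_asr_model():
    global asr_model
    if asr_model is None:
        logging.info("Loading the NeMo ASR/AST model...")
        asr_model = nemo_asr.models.ASRModel.from_pretrained("nvidia/canary-1b-v2")
        logging.info("Model loaded.")
    return asr_model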
def task_fake(session_hash_code: str,
              task_type, lang_source, lang_target,
              chunk_secs, left_context_secs, right_context_secs,
              streaming_policy, alignatt_thr, waitk_lagging,
              exclude_sink_frames, xatt_scores_layer, hallucinations_detector
              ):
    """Continuously read and delete .npz chunks while the task is active."""
    global asr_model
    yield ("initializing the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
    yield ("initialized the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
    yield (f"Task started for session {session_hash_code}", "info", None)
    active_flag = get_active_task_flag_file(session_hash_code)
    with open(active_flag, "w") as f:
        f.write("1")
    chunk_dir = get_session_hashe_chunks_dir(session_hash_code)
    logging.info(f"[{session_hash_code}] task started. {chunk_dir}")
    try:
        logging.info(f"[{session_hash_code}] task loop started.")
        yield (f"Task started for session {session_hash_code}", "info", None)
        while os.path.exists(active_flag):
            if not os.path.exists(chunk_dir):
                logging.warning(f"[{session_hash_code}] No chunk directory found for task.")
                yield ("No audio chunks yet... waiting for stream.", "warning", None)
                time.sleep(0.1)
                continue
            # raise_error()
            files = sorted(f for f in os.listdir(chunk_dir) if f.endswith(".npz"))
            if not files:
                time.sleep(0.1)
                continue
            for fname in files:
                fpath = os.path.join(chunk_dir, fname)
                try:
                    npz = np.load(fpath)
                    samples = npz["data"]
                    rate = int(npz["rate"])
                    text = f"Transcribed {fname}: {len(samples)} samples @ {rate}Hz\n"
                    yield (text, "success", fname)
                    os.remove(fpath)
                    logging.debug(f"[{session_hash_code}] Deleted processed chunk: {fname}")
                    # raise_error()
                except (EOFError, OSError, zipfile.BadZipFile, ValueError) as e:
                    logging.warning(f"[{session_hash_code}] Warning processing {fname}: {e}")
                    yield (f"{type(e).__name__} processing {fname}: {e}", "warning", fname)
                except Exception as e:
                    logging.error(f"[{session_hash_code}] Error processing {fname}: {e} {type(e).__name__}")
                    # yield (f"Error processing {fname}: {e}", "error", fname)
                    raise
            time.sleep(0.1)
        yield ("DONE", "done", None)
        logging.info(f"[{session_hash_code}] task loop ended (flag removed).")
    except Exception as e:
        logging.error(f"[{session_hash_code}] Unexpected task error: {e}", exc_info=True)
        yield (f"Unexpected error: {e}", "error", None)
        return
    finally:
        remove_active_task_flag_file(session_hash_code)
        remove_chunk_folder(session_hash_code)
        logging.info(f"[{session_hash_code}] Exiting task loop.")
        # yield ("Task finished and cleaned up.", "done", None)
def task(session_hash_code: str,
         task_type, lang_source, lang_target,
         chunk_secs, left_context_secs, right_context_secs,
         streaming_policy, alignatt_thr, waitk_lagging,
         exclude_sink_frames, xatt_scores_layer, hallucinations_detector
         ):
    """Continuously read and delete .npz chunks while the task is active."""
    global asr_model
    yield ("initializing the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
    conf = CanaryConfig.from_params(
        task_type, SUPPORTED_LANGS_MAP.get(lang_source), SUPPORTED_LANGS_MAP.get(lang_target),
        chunk_secs, left_context_secs, right_context_secs,
        streaming_policy, alignatt_thr, waitk_lagging,
        exclude_sink_frames, xatt_scores_layer, hallucinations_detector
    )
    canary_speech_engine = CanarySpeechEngine(asr_model, conf)
    silero_vad_engine = Silero_Vad_Engine()
    streaming_audio_processor_config = StreamingAudioProcessorConfig(
        read_size=READ_SIZE,
        silence_threshold_chunks=1
    )
    streamer = StreamingAudioProcessor(speech_engine=canary_speech_engine,
                                       vad_engine=silero_vad_engine,
                                       cfg=streaming_audio_processor_config)
    yield ("initialized the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
    yield (f"Task started for session {session_hash_code}", "info", None)
    active_flag = get_active_task_flag_file(session_hash_code)
    with open(active_flag, "w") as f:
        f.write("1")
    chunk_dir = get_session_hashe_chunks_dir(session_hash_code)
    logging.info(f"[{session_hash_code}] task started. {chunk_dir}")
    try:
        logging.info(f"[{session_hash_code}] task loop started.")
        yield (f"Task started for session {session_hash_code}", "info", None)
        while os.path.exists(active_flag):
            if not os.path.exists(chunk_dir):
                logging.warning(f"[{session_hash_code}] No chunk directory found for task.")
                yield ("No audio chunks yet... waiting for stream.", "warning", None)
                time.sleep(0.1)
                continue
            files = sorted(f for f in os.listdir(chunk_dir) if f.endswith(".npz"))
            if not files:
                time.sleep(0.1)
                continue
            for fname in files:
                fpath = os.path.join(chunk_dir, fname)
                try:
                    npz = np.load(fpath)
                    samples = npz["data"]
                    rate = int(npz["rate"])
                    for text in streamer.process_chunk(samples):
                        yield (text, "success", text)
                        logging.debug(f"[{session_hash_code}] {text}")
                    os.remove(fpath)
                    logging.debug(f"[{session_hash_code}] Deleted processed chunk: {fname}")
                except Exception as e:
                    logging.warning(f"[{session_hash_code}] Error processing {fname}: {e}")
                    yield (f"Error processing {fname}: {e}", "warning", fname)
                    continue
            time.sleep(0.1)
        # Flush any text still buffered in the streamer once the flag is gone.
        for final_text in streamer.finalize_stream():
            yield (final_text, "success", final_text)
        yield ("DONE", "done", None)
        logging.info(f"[{session_hash_code}] task loop ended (flag removed).")
    except Exception as e:
        logging.error(f"[{session_hash_code}] task error: {e}", exc_info=True)
        yield (f"Unexpected error: {e}", "error", None)
    finally:
        remove_active_task_flag_file(session_hash_code)
        try:
            if os.path.exists(chunk_dir) and not os.listdir(chunk_dir):
                os.rmdir(chunk_dir)
                logging.debug(f"[{session_hash_code}] Cleaned up empty chunk dir.")
        except Exception as e:
            logging.error(f"[{session_hash_code}] Cleanup error: {e}")
            yield (f"Cleanup error: {e}", "error", None)
        logging.info(f"[{session_hash_code}] Exiting task loop.")
        yield ("Task finished and cleaned up.", "done", None)
# --- Decorator compatibility layer ---
if os.environ.get("SPACE_ID", "").startswith("zero-gpu"):
    logging.warning("Running on ZeroGPU — using a no-op gpu_fork_decorator instead of @spaces.GPU")
    def gpu_fork_decorator(f):
        return f
else:
    gpu_fork_decorator = spaces.GPU
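
# Sketch of how the compatibility decorator would wrap a GPU-bound entry
# point; this trivial function is an illustration only, not used by the app.
@gpu_fork_decorator
def _gpu_smoke_test() -> str:
    return "ok" if torch.cuda.is_available() else "cpu-only"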
# --- Audio Stream Functions ---
def stop_streaming(session_hash_code: str):
    """Trigger the stop flag for active streaming."""
    logging.info(f"[{session_hash_code}] Stop button clicked — sending stop signal.")
    remove_active_stream_flag_file(session_hash_code)
    remove_active_task_flag_file(session_hash_code)

def start_streaming(session_hash_code: str):
    """Trigger the start flag for active streaming."""
    logging.info(f"[{session_hash_code}] Start button clicked — sending start signal.")
    active_stream_flag = get_active_stream_flag_file(session_hash_code)
    with open(active_stream_flag, "w") as f:
        f.write("1")

def is_stop_requested(session_hash_code) -> bool:
    """Check whether the stop signal was requested (stream flag removed)."""
    return not os.path.exists(get_active_stream_flag_file(session_hash_code))

def is_active_task(session_hash_code) -> bool:
    """Check whether the processing task is currently active."""
    return os.path.exists(get_active_task_flag_file(session_hash_code))

def is_active_stream(session_hash_code) -> bool:
    """Check whether the stream is currently active."""
    return os.path.exists(get_active_stream_flag_file(session_hash_code))
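
# Flag-file lifecycle (for reference): start_streaming() creates the stream
# flag, the task writes its own flag, and stop_streaming() removes both;
# each loop notices the missing file on its next check and exits.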
def raise_error():
    """Raise an error randomly (1 out of 10 times)."""
    if random.randint(1, 10) == 1:
        raise RuntimeError("Random failure triggered!")
def debug_current_device():
    """Safely log GPU or CPU information without crashing on stateless GPU."""
    logging.debug("=== Debugging current device ===")
    try:
        if torch.cuda.is_available():
            device_name = torch.cuda.get_device_name(0)
            memory_allocated = torch.cuda.memory_allocated(0) / (1024 ** 2)
            memory_reserved = torch.cuda.memory_reserved(0) / (1024 ** 2)
            memory_total = torch.cuda.get_device_properties(0).total_memory / (1024 ** 2)
            capability = torch.cuda.get_device_capability(0)
            current_device = torch.cuda.current_device()
            logging.debug(f"GPU name          : {device_name}")
            logging.debug(f"Current device ID : {current_device}")
            logging.debug(f"CUDA capability   : {capability}")
            logging.debug(f"Memory allocated  : {memory_allocated:.2f} MB")
            logging.debug(f"Memory reserved   : {memory_reserved:.2f} MB")
            logging.debug(f"Total memory      : {memory_total:.2f} MB")
        else:
            logging.debug("No GPU detected, running on CPU")
    except RuntimeError as e:
        # Handles the Hugging Face Spaces "Stateless GPU" restriction
        if "CUDA must not be initialized" in str(e):
            logging.warning("⚠️ Skipping CUDA info: Stateless GPU environment detected.")
        else:
            logging.error(f"Unexpected CUDA error: {e}")
def get_current_device():
    """Return the current device safely."""
    # Defaults ensure both values are defined on every path.
    device = torch.device("cpu")
    device_name = "CPU"
    try:
        if torch.cuda.is_available():
            device = torch.device("cuda")
            device_name = torch.cuda.get_device_name(0)
            # Probe tensor: forces CUDA initialization so failures surface here.
            torch.tensor([0], dtype=torch.float32, device=device)
            torch.cuda.empty_cache()
            logging.debug("GPU cache cleared")
    except RuntimeError as e:
        if "CUDA must not be initialized" in str(e):
            device = torch.device("cpu")
            device_name = "CPU (stateless GPU mode)"
        # Other CUDA errors fall back to the CPU defaults set above.
    return device, device_name
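
if __name__ == "__main__":
    # Quick manual check (illustration only): report the selected device.
    device, device_name = get_current_device()
    logging.info(f"Running on {device_name}")
    debug_current_device()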