from typing import Any, Generator, Optional, Tuple

import asyncio
import base64
import hashlib
import hmac
import os
import random
import time
import zipfile

import gradio as gr
import nemo.collections.asr as nemo_asr
import numpy as np
import spaces
import torch
from fastrtc.utils import AdditionalOutputs
from pydub import AudioSegment

from app.logger_config import logger as logging
from app.canary_speech_engine import CanarySpeechEngine, CanaryConfig
from app.session_utils import (
    get_active_task_flag_file,
    get_active_stream_flag_file,
    remove_active_stream_flag_file,
    remove_active_task_flag_file,
    remove_chunk_folder,
    get_session_hashe_chunks_dir,
)
from app.silero_vad_engine import Silero_Vad_Engine
from app.streaming_audio_processor import StreamingAudioProcessor, StreamingAudioProcessorConfig
from app.supported_languages import SUPPORTED_LANGS_MAP

# Number of samples read per streamed chunk (0.25 s of audio at 16 kHz).
READ_SIZE = 4000

GradioAudioYield = Tuple[int, np.ndarray]
StreamYield = Generator[Tuple[GradioAudioYield, AdditionalOutputs], None, None]
# --------------------------------------------------------
# Utility functions
# --------------------------------------------------------
def generate_coturn_config():
"""
Génère une configuration Coturn complète avec authentification dynamique (use-auth-secret).
Returns:
dict: Objet coturn_config prêt à être utilisé côté client WebRTC.
"""
secret_key = os.getenv("TURN_SECRET_KEY", "your_secret_key")
ttl = int(os.getenv("TURN_TTL", 3600))
turn_url = os.getenv("TURN_URL", "turn:*******")
turn_s_url = os.getenv("TURN_S_URL", "turns:*****")
user = os.getenv("TURN_USER", "client")
timestamp = int(time.time()) + ttl
username = f"{timestamp}:{user}"
password = base64.b64encode(
hmac.new(secret_key.encode(), username.encode(), hashlib.sha1).digest()
).decode()
coturn_config = {
"iceServers": [
{
"urls": [
f"{turn_url}",
f"{turn_s_url}",
],
"username": username,
"credential": password,
}
]
}
return coturn_config
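# The credential above follows Coturn's `use-auth-secret` scheme. As a hedged
# illustration (not part of the app's runtime path), the check a TURN server
# performs can be reproduced locally, which is handy for unit-testing
# generate_coturn_config without a live Coturn instance:
def verify_turn_credential(username: str, credential: str, secret_key: str) -> bool:
    """Return True if `credential` is a valid, unexpired HMAC-SHA1 credential."""
    expected = base64.b64encode(
        hmac.new(secret_key.encode(), username.encode(), hashlib.sha1).digest()
    ).decode()
    # Constant-time comparison avoids leaking information through timing.
    if not hmac.compare_digest(expected, credential):
        return False
    # The username embeds the expiry timestamp ("<timestamp>:<user>").
    try:
        expiry = int(username.split(":", 1)[0])
    except ValueError:
        return False
    return expiry > int(time.time())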
def read_and_stream_audio(filepath_to_stream: str, session_hash_code: str,
                          read_size: int = 8000, sample_rate: int = 16000) -> StreamYield:
    """
    Read an audio file and stream it chunk by chunk (read_size samples per
    chunk: 500 ms at the default 8000 samples / 16 kHz).
    Handles errors safely and reports structured messages to the client.
    """
if not session_hash_code:
yield from handle_stream_error("unknown", "No session_hash_code provided.")
return
if not filepath_to_stream or not os.path.exists(filepath_to_stream):
yield from handle_stream_error(session_hash_code, f"Audio file not found: {filepath_to_stream}")
return
task_active_flag = get_active_task_flag_file(session_hash_code)
try:
segment = AudioSegment.from_file(filepath_to_stream)
        chunk_duration_ms = int((read_size / sample_rate) * 1000)
        total_duration_ms = len(segment)
        # Ceiling division so the final, possibly shorter chunk is counted.
        total_chunks = -(-total_duration_ms // chunk_duration_ms)
start_streaming(session_hash_code)
logging.info(f"[{session_hash_code}] Starting stream: {filepath_to_stream} ({total_chunks} chunks, {chunk_duration_ms}ms steps).")
chunk_dir = get_session_hashe_chunks_dir(session_hash_code)
for i, start_ms in enumerate(range(0, total_duration_ms, chunk_duration_ms)):
end_ms = min(start_ms + chunk_duration_ms, total_duration_ms)
chunk = segment[start_ms:end_ms]
frame_rate = chunk.frame_rate
samples_int16 = np.array(chunk.get_array_of_samples(), dtype=np.int16)
samples_float = (samples_int16 / 32768.0).astype(np.float32)
            # Mono vs. stereo handling for Gradio
if chunk.channels > 1:
samples_reshaped = samples_float.reshape(-1, chunk.channels)
else:
samples_reshaped = samples_float.reshape(1, -1)
progress = round(((i + 1) / total_chunks) * 100, 2)
            # Send the chunk to the client
            if is_stop_requested(session_hash_code):
                logging.info(f"[{session_hash_code}] Stop signal received.")
                yield (
                    (sample_rate, samples_reshaped),
                    # The "stoped" key spelling is kept as-is for client-side compatibility.
                    AdditionalOutputs({"stoped": True, "value": "STREAM_STOPPED", "session_hash_code": session_hash_code})
                )
                break
yield (
(frame_rate, samples_reshaped),
AdditionalOutputs({"progressed": True, "value": progress, "session_hash_code": session_hash_code})
)
            if is_active_task(session_hash_code):
                os.makedirs(chunk_dir, exist_ok=True)
                npz_path = os.path.join(chunk_dir, f"chunk_{i:05d}.npz")
                # Compression is enabled; note that it is slow (CPU intensive).
                try:
                    np.savez_compressed(npz_path, data=samples_int16, rate=frame_rate)
                    logging.debug(f"[{session_hash_code}] Saved chunk {i} to {npz_path}")
                except Exception as e:
                    logging.error(f"[{session_hash_code}] Error saving chunk {i} to {npz_path}: {e}")
time.sleep(chunk_duration_ms/1000)
# raise_error() # Optional injected test exception
logging.info(f"[{session_hash_code}] Streaming completed.")
except Exception as e:
yield from handle_stream_error(session_hash_code, e)
finally:
remove_active_stream_flag_file(session_hash_code)
remove_active_task_flag_file(session_hash_code)
logging.info(f"[{session_hash_code}] Cleanup done.")
def handle_stream_error(session_hash_code: str, error):
    """
    Handle streaming errors:
    - Log the error (accepts an Exception or a plain message string)
    - Send structured info to the client
    - Remove the active-stream flag
    """
    msg = f"{type(error).__name__}: {error}" if isinstance(error, Exception) else str(error)
logging.error(f"[{session_hash_code}] Stream Error: {msg}", exc_info=True)
remove_active_stream_flag_file(session_hash_code)
empty_audio = np.zeros((1, 16000), dtype=np.float32)
yield (
(16000, empty_audio),
AdditionalOutputs({"errored": True, "value": msg, "session_hash_code": session_hash_code})
)
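# Illustrative, hedged usage of the stream generator in a plain loop (e.g. for
# a local test); the actual wiring into fastrtc/Gradio lives elsewhere in the app:
#
#     for (rate, samples), extra in read_and_stream_audio("sample.wav", "test-session"):
#         print(rate, samples.shape)  # one ~0.5 s chunk per iteration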
# asr_model = nemo_asr.models.ASRModel.from_pretrained("nvidia/canary-1b-v2")
asr_model = None
# @spaces.cache
# def load_model():
#     logging.info("Loading the NeMo ASR/AST model...")
#     # Replace with your model-loading logic
#     model = nemo_asr.models.EncDecRNNTModel.restore_from("path/to/model.nemo")
#     logging.info("Model loaded.")
#     return model
# # Load it once at script startup
# ASR_MODEL = load_model()
@spaces.GPU(duration=10)
def task_fake(session_hash_code: str,
task_type, lang_source, lang_target,
chunk_secs, left_context_secs, right_context_secs,
streaming_policy, alignatt_thr, waitk_lagging,
exclude_sink_frames, xatt_scores_layer, hallucinations_detector
):
"""Continuously read and delete .npz chunks while task is active."""
global asr_model
yield ("initializing the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
yield ("initialized the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
yield (f"Task started for session {session_hash_code}", "info", None)
active_flag = get_active_task_flag_file(session_hash_code)
with open(active_flag, "w") as f:
f.write("1")
chunk_dir = get_session_hashe_chunks_dir(session_hash_code)
logging.info(f"[{session_hash_code}] task started. {chunk_dir}")
try:
logging.info(f"[{session_hash_code}] task loop started.")
yield (f"Task started for session {session_hash_code}", "info", None)
        while os.path.exists(active_flag):
if not os.path.exists(chunk_dir):
logging.warning(f"[{session_hash_code}] No chunk directory found for task.")
yield ("No audio chunks yet... waiting for stream.", "warning", None)
time.sleep(0.1)
continue
# raise_error()
files = sorted(f for f in os.listdir(chunk_dir) if f.endswith(".npz"))
if not files:
time.sleep(0.1)
continue
for fname in files:
fpath = os.path.join(chunk_dir, fname)
try:
npz = np.load(fpath)
samples = npz["data"]
rate = int(npz["rate"])
text = f"Transcribed {fname}: {len(samples)} samples @ {rate}Hz\n"
yield (text, "success", fname)
os.remove(fpath)
logging.debug(f"[{session_hash_code}] Deleted processed chunk: {fname}")
# raise_error()
                except (EOFError, OSError, zipfile.BadZipFile, ValueError) as e:
                    logging.warning(f"[{session_hash_code}] Warning processing {fname}: {e}")
                    yield (f"{type(e).__name__} processing {fname}: {e}", "warning", fname)
                except Exception as e:
                    logging.error(f"[{session_hash_code}] Error processing {fname}: {type(e).__name__}: {e}")
                    raise  # Re-raise with the original traceback intact.
time.sleep(0.1)
yield ("DONE", "done", None)
logging.info(f"[{session_hash_code}] task loop ended (flag removed).")
except Exception as e:
logging.error(f"[{session_hash_code}] Unexpected task error: {e}", exc_info=True)
yield (f"Unexpected error: {e}", "error", None)
return
finally:
remove_active_task_flag_file(session_hash_code)
remove_chunk_folder(session_hash_code)
logging.info(f"[{session_hash_code}] Exiting task loop.")
# yield ("Task finished and cleaned up.", "done", None)
@spaces.GPU
def task(session_hash_code: str,
task_type, lang_source, lang_target,
chunk_secs, left_context_secs, right_context_secs,
streaming_policy, alignatt_thr, waitk_lagging,
exclude_sink_frames, xatt_scores_layer, hallucinations_detector
):
"""Continuously read and delete .npz chunks while task is active."""
global asr_model
yield ("initializing the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
    conf = CanaryConfig.from_params(
        task_type, SUPPORTED_LANGS_MAP.get(lang_source), SUPPORTED_LANGS_MAP.get(lang_target),
        chunk_secs, left_context_secs, right_context_secs,
        streaming_policy, alignatt_thr, waitk_lagging,
        exclude_sink_frames, xatt_scores_layer, hallucinations_detector
    )
    canary_speech_engine = CanarySpeechEngine(asr_model, conf)
silero_vad_engine = Silero_Vad_Engine()
streaming_audio_processor_config = StreamingAudioProcessorConfig(
read_size=READ_SIZE,
silence_threshold_chunks=1
)
    streamer = StreamingAudioProcessor(
        speech_engine=canary_speech_engine,
        vad_engine=silero_vad_engine,
        cfg=streaming_audio_processor_config,
    )
yield ("initialized the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
yield (f"Task started for session {session_hash_code}", "info", None)
active_flag = get_active_task_flag_file(session_hash_code)
with open(active_flag, "w") as f:
f.write("1")
chunk_dir = get_session_hashe_chunks_dir(session_hash_code)
logging.info(f"[{session_hash_code}] task started. {chunk_dir}")
try:
logging.info(f"[{session_hash_code}] task loop started.")
yield (f"Task started for session {session_hash_code}", "info", None)
while os.path.exists(active_flag):
if not os.path.exists(chunk_dir):
logging.warning(f"[{session_hash_code}] No chunk directory found for task.")
yield ("No audio chunks yet... waiting for stream.", "warning", None)
time.sleep(0.1)
continue
files = sorted(f for f in os.listdir(chunk_dir) if f.endswith(".npz"))
if not files:
time.sleep(0.1)
continue
for fname in files:
fpath = os.path.join(chunk_dir, fname)
try:
npz = np.load(fpath)
samples = npz["data"]
rate = int(npz["rate"])
                    for text in streamer.process_chunk(samples):
                        yield (text, "success", text)
                        logging.debug(f"[{session_hash_code}] {text}")
os.remove(fpath)
logging.debug(f"[{session_hash_code}] Deleted processed chunk: {fname}")
except Exception as e:
logging.warning(f"[{session_hash_code}] Error processing {fname}: {e}")
yield (f"Error processing {fname}: {e}", "warning", fname)
continue
time.sleep(0.1)
        # Flush any text still buffered in the streaming processor.
        for final_text in streamer.finalize_stream():
            yield (final_text, "success", final_text)
yield ("DONE", "done", None)
logging.info(f"[{session_hash_code}] task loop ended (flag removed).")
except Exception as e:
logging.error(f"[{session_hash_code}] task error: {e}", exc_info=True)
yield (f"Unexpected error: {e}", "error", None)
finally:
remove_active_task_flag_file(session_hash_code)
try:
if os.path.exists(chunk_dir) and not os.listdir(chunk_dir):
os.rmdir(chunk_dir)
logging.debug(f"[{session_hash_code}] Cleaned up empty chunk dir.")
except Exception as e:
logging.error(f"[{session_hash_code}] Cleanup error: {e}")
yield (f"Cleanup error: {e}", "error", None)
logging.info(f"[{session_hash_code}] Exiting task loop.")
yield ("Task finished and cleaned up.", "done", None)
# --- Decorator compatibility layer ---
if os.environ.get("SPACE_ID", "").startswith("zero-gpu"):
    logging.warning("Running on ZeroGPU: substituting a no-op for @spaces.GPU")
    def gpu_decorator(f):
        return f
else:
    gpu_decorator = spaces.GPU
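# Usage sketch for the shim above (illustrative): decorate GPU entry points
# with `gpu_decorator` so the same code runs on ZeroGPU and classic GPU Spaces:
#
#     @gpu_decorator
#     def transcribe(...):
#         ...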
# --- Audio Stream Function ---
def stop_streaming(session_hash_code: str):
"""Trigger the stop flag for active streaming."""
logging.info(f"[{session_hash_code}] Stop button clicked — sending stop signal.")
remove_active_stream_flag_file(session_hash_code)
remove_active_task_flag_file(session_hash_code)
def start_streaming(session_hash_code: str):
"""Trigger the start flag for active streaming."""
logging.info(f"[{session_hash_code}] Start button clicked — sending start signal.")
active_stream_flag = get_active_stream_flag_file(session_hash_code)
with open(active_stream_flag, "w") as f:
f.write("1")
def is_stop_requested(session_hash_code) -> bool:
    """Check if the stop signal was requested (stream flag file removed)."""
    return not os.path.exists(get_active_stream_flag_file(session_hash_code))
def is_active_task(session_hash_code) -> bool:
    """Check if a processing task is active for this session."""
    return os.path.exists(get_active_task_flag_file(session_hash_code))
def is_active_stream(session_hash_code) -> bool:
    """Check if a stream is active for this session."""
    return os.path.exists(get_active_stream_flag_file(session_hash_code))
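# The start/stop mechanism is purely file-based: `start_streaming` creates the
# stream flag, `stop_streaming` removes both flags, and the producer polls
# `is_stop_requested` once per chunk. Hedged sketch of the lifecycle:
#
#     start_streaming(session)    # flag created  -> stream may run
#     assert not is_stop_requested(session)
#     stop_streaming(session)     # flags removed -> next poll exits the loop
#     assert is_stop_requested(session)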
def raise_error():
"""Raise an error randomly (1 out of 10 times)."""
if random.randint(1, 10) == 1:
raise RuntimeError("Random failure triggered!")
def debug_current_device():
"""Safely logs GPU or CPU information without crashing on stateless GPU."""
logging.debug("=== Debugging current device ===")
try:
if torch.cuda.is_available():
device_name = torch.cuda.get_device_name(0)
memory_allocated = torch.cuda.memory_allocated(0) / (1024 ** 2)
memory_reserved = torch.cuda.memory_reserved(0) / (1024 ** 2)
memory_total = torch.cuda.get_device_properties(0).total_memory / (1024 ** 2)
capability = torch.cuda.get_device_capability(0)
current_device = torch.cuda.current_device()
logging.debug(f"GPU name : {device_name}")
logging.debug(f"Current device ID : {current_device}")
logging.debug(f"CUDA capability : {capability}")
logging.debug(f"Memory allocated : {memory_allocated:.2f} MB")
logging.debug(f"Memory reserved : {memory_reserved:.2f} MB")
logging.debug(f"Total memory : {memory_total:.2f} MB")
else:
logging.debug("No GPU detected, running on CPU")
except RuntimeError as e:
# Handles Hugging Face Spaces “Stateless GPU” restriction
if "CUDA must not be initialized" in str(e):
logging.warning("⚠️ Skipping CUDA info: Stateless GPU environment detected.")
else:
logging.error(f"Unexpected CUDA error: {e}")
def get_current_device():
"""Returns the current device safely."""
try:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device_name = torch.cuda.get_device_name(0) if torch.cuda.is_available() else "CPU"
        # Touch the device to force initialization and surface restrictions early.
        torch.tensor([0], dtype=torch.float32, device=device)
if torch.cuda.is_available():
torch.cuda.empty_cache()
logging.debug("GPU cache cleared")
except RuntimeError as e:
if "CUDA must not be initialized" in str(e):
device = torch.device("cpu")
device_name = "CPU (stateless GPU mode)"
        else:
            raise  # Unknown CUDA error: re-raise rather than return undefined values.
    return device, device_name
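# Usage sketch (hedged): resolve the device once at startup and move the model
# onto it after loading; `asr_model` is only populated elsewhere in this app.
#
#     device, device_name = get_current_device()
#     logging.info(f"Running on {device_name}")
#     # asr_model = asr_model.to(device)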