File size: 18,695 Bytes
c53e4c1
87f1f7d
 
 
 
 
 
d296c7b
87f1f7d
 
 
 
 
 
6f523af
11c4a5a
 
87f1f7d
 
935d736
 
 
0c03412
799a0f6
87f1f7d
fc64c8b
8417fa3
 
 
 
 
 
 
4f560c0
 
 
 
 
87f1f7d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4f560c0
87f1f7d
 
 
 
935d736
 
87f1f7d
 
 
935d736
87f1f7d
935d736
87f1f7d
 
11c4a5a
4f560c0
87f1f7d
935d736
4f560c0
87f1f7d
4f560c0
 
935d736
87f1f7d
4f560c0
 
 
 
87f1f7d
4f560c0
 
 
 
 
 
 
 
 
87f1f7d
4f560c0
 
fc64c8b
4f560c0
 
 
 
 
 
87f1f7d
4f560c0
 
 
 
 
d296c7b
4f560c0
87f1f7d
4f560c0
 
d296c7b
 
 
 
 
87f1f7d
4f560c0
87f1f7d
d296c7b
4f560c0
 
87f1f7d
 
935d736
87f1f7d
935d736
d296c7b
4f560c0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
87f1f7d
4f560c0
 
 
 
87f1f7d
 
fc64c8b
 
8417fa3
4f560c0
 
 
 
 
 
 
 
 
 
 
 
0c03412
935d736
8417fa3
 
 
 
 
 
 
 
 
935d736
8417fa3
935d736
8417fa3
 
799a0f6
935d736
8417fa3
 
935d736
 
8417fa3
d296c7b
8417fa3
935d736
8417fa3
 
 
d296c7b
8417fa3
 
 
 
 
 
 
 
 
 
 
 
 
 
935d736
d296c7b
4f560c0
d296c7b
 
4f560c0
8417fa3
d296c7b
 
 
 
4f560c0
8417fa3
 
 
935d736
8417fa3
 
d296c7b
8417fa3
d296c7b
8417fa3
 
0c03412
 
935d736
d296c7b
8417fa3
 
0c03412
935d736
 
8417fa3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
935d736
8417fa3
935d736
8417fa3
 
799a0f6
935d736
8417fa3
 
935d736
 
8417fa3
 
 
935d736
8417fa3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
935d736
8417fa3
935d736
8417fa3
 
 
 
935d736
8417fa3
935d736
8417fa3
 
 
 
 
799a0f6
 
8417fa3
 
 
 
 
935d736
8417fa3
 
935d736
8417fa3
 
 
0c03412
8417fa3
 
 
 
935d736
8417fa3
935d736
8417fa3
 
935d736
8417fa3
 
 
87f1f7d
 
 
 
11c4a5a
 
87f1f7d
 
 
11c4a5a
87f1f7d
 
 
 
 
 
935d736
87f1f7d
935d736
 
 
 
 
 
 
 
 
 
 
fc64c8b
935d736
 
 
fc64c8b
 
 
87f1f7d
d296c7b
 
 
 
87f1f7d
799a0f6
87f1f7d
 
 
 
 
 
c53e4c1
f2d7035
c53e4c1
 
f2d7035
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c53e4c1
f2d7035
 
 
 
aaaa3df
 
 
 
f2d7035
 
 
 
41380c4
 
75c9c9a
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
from app.logger_config import logger as logging
from fastrtc.utils import AdditionalOutputs
from pydub import AudioSegment
import asyncio
import os
import time
import numpy as np
import zipfile
import spaces
import hmac
import hashlib
import base64
import os
import time
import random
import torch
from app.streaming_audio_processor import StreamingAudioProcessor
from app.session_utils import (
    get_active_task_flag_file,
    get_active_stream_flag_file,
    remove_active_stream_flag_file,
    remove_active_task_flag_file,
    remove_chunk_folder,
    get_session_hashe_chunks_dir
)
from app.supported_languages import (
    SUPPORTED_LANGS_MAP
)
from app.canary_speech_engine import CanarySpeechEngine,CanaryConfig
from app.silero_vad_engine import Silero_Vad_Engine
from app.streaming_audio_processor import StreamingAudioProcessor,StreamingAudioProcessorConfig
import nemo.collections.asr as nemo_asr
READ_SIZE=4000
import gradio as gr
from typing import Generator
from typing import Generator, Tuple, Any, Optional
GradioAudioYield = Tuple[int, np.ndarray]
StreamYield = Generator[Tuple[GradioAudioYield, AdditionalOutputs], None, None]

# --------------------------------------------------------
# Utility functions
# --------------------------------------------------------
def generate_coturn_config():
    """
    Build a complete Coturn configuration using dynamic authentication
    (use-auth-secret): a time-limited username plus an HMAC-SHA1 credential
    derived from the shared TURN secret.

    Returns:
        dict: coturn_config ready to be consumed by a WebRTC client.
    """
    shared_secret = os.getenv("TURN_SECRET_KEY", "your_secret_key")
    ttl_seconds = int(os.getenv("TURN_TTL", 3600))
    udp_url = os.getenv("TURN_URL", "turn:*******")
    tls_url = os.getenv("TURN_S_URL", "turns:*****")
    turn_user = os.getenv("TURN_USER", "client")

    # Username embeds its own expiry timestamp, as required by use-auth-secret.
    expiry = int(time.time()) + ttl_seconds
    temp_username = f"{expiry}:{turn_user}"
    digest = hmac.new(shared_secret.encode(), temp_username.encode(), hashlib.sha1).digest()
    temp_password = base64.b64encode(digest).decode()

    return {
        "iceServers": [
            {
                "urls": [
                    udp_url,
                    tls_url,
                ],
                "username": temp_username,
                "credential": temp_password,
            }
        ]
    }



def read_and_stream_audio(filepath_to_stream: str, session_hash_code: str, read_size: int = 8000, sample_rate: int = 16000) -> StreamYield:
    """
    Read an audio file and stream it chunk by chunk.

    Each chunk covers ``read_size / sample_rate`` seconds of audio and is
    yielded as ``((rate, samples), AdditionalOutputs(...))`` so the client
    receives the audio together with a structured progress/stop/error payload.
    While a background task is active for the session, every chunk is also
    persisted to the session's chunk directory as a compressed ``.npz`` file.

    Args:
        filepath_to_stream: Path of the audio file to stream.
        session_hash_code: Session identifier used for flag files and chunk dir.
        read_size: Number of samples per chunk (default 8000).
        sample_rate: Nominal sample rate used to size chunks (default 16000).

    Yields:
        ((frame_rate, np.ndarray), AdditionalOutputs) tuples.
    """
    if not session_hash_code:
        yield from handle_stream_error("unknown", "No session_hash_code provided.")
        return

    if not filepath_to_stream or not os.path.exists(filepath_to_stream):
        yield from handle_stream_error(session_hash_code, f"Audio file not found: {filepath_to_stream}")
        return
    try:
        segment = AudioSegment.from_file(filepath_to_stream)
        chunk_duration_ms = int((read_size / sample_rate) * 1000)
        total_duration_ms = len(segment)
        # Ceiling division: the previous "// ... + 1" over-counted by one chunk
        # whenever the duration was an exact multiple of the chunk size, so the
        # reported progress could never reach 100%.
        total_chunks = max(1, -(-total_duration_ms // chunk_duration_ms))
        start_streaming(session_hash_code)
        logging.info(f"[{session_hash_code}] Starting stream: {filepath_to_stream} ({total_chunks} chunks, {chunk_duration_ms}ms steps).")

        chunk_dir = get_session_hashe_chunks_dir(session_hash_code)
        for i, start_ms in enumerate(range(0, total_duration_ms, chunk_duration_ms)):
            end_ms = min(start_ms + chunk_duration_ms, total_duration_ms)
            chunk = segment[start_ms:end_ms]
            frame_rate = chunk.frame_rate
            samples_int16 = np.array(chunk.get_array_of_samples(), dtype=np.int16)
            samples_float = (samples_int16 / 32768.0).astype(np.float32)

            # Mono vs stereo handling for Gradio: stereo becomes
            # (frames, channels); mono becomes a single row (1, frames).
            if chunk.channels > 1:
                samples_reshaped = samples_float.reshape(-1, chunk.channels)
            else:
                samples_reshaped = samples_float.reshape(1, -1)

            progress = round(((i + 1) / total_chunks) * 100, 2)

            if is_stop_requested(session_hash_code):
                logging.info(f"[{session_hash_code}] Stop signal received.")
                yield (
                    (sample_rate, samples_reshaped),
                    AdditionalOutputs({"stoped": True, "value": "STREAM_STOPPED", "session_hash_code": session_hash_code})
                )
                break

            # Send the chunk to the client.
            yield (
                (frame_rate, samples_reshaped),
                AdditionalOutputs({"progressed": True, "value": progress, "session_hash_code": session_hash_code})
            )

            # Persist the raw chunk for the background task, if one is running.
            # (The previous code checked is_active_task twice in a row.)
            if is_active_task(session_hash_code):
                os.makedirs(chunk_dir, exist_ok=True)
                npz_path = os.path.join(chunk_dir, f"chunk_{i:05d}.npz")
                # Compression enabled — beware, this is CPU-intensive.
                try:
                    np.savez_compressed(npz_path, data=samples_int16, rate=frame_rate)
                    logging.debug(f"[{session_hash_code}] Saved chunk {i} to {npz_path}")
                except Exception as e:
                    # Include the exception: the previous log line dropped it.
                    logging.error(f"[{session_hash_code}] error saving chunk {i} to {npz_path}: {e}")

            # Pace the stream at (near) real time.
            time.sleep(chunk_duration_ms / 1000)

        logging.info(f"[{session_hash_code}] Streaming completed.")

    except Exception as e:
        yield from handle_stream_error(session_hash_code, e)
    finally:
        remove_active_stream_flag_file(session_hash_code)
        remove_active_task_flag_file(session_hash_code)
        logging.info(f"[{session_hash_code}] Cleanup done.")



def handle_stream_error(session_hash_code: str, error: Exception):
    """
    Centralised error path for the audio stream.

    Logs the failure with traceback, clears the active-stream flag, then
    yields one silent frame together with a structured error payload so the
    client is notified. Note: callers sometimes pass a plain string as
    *error*; both work because only type name and str() are used.
    """
    description = f"{type(error).__name__}: {str(error)}"
    logging.error(f"[{session_hash_code}] Stream Error: {description}", exc_info=True)

    remove_active_stream_flag_file(session_hash_code)

    # One second of silence at 16 kHz keeps the audio output well-formed.
    silent_frame = np.zeros((1, 16000), dtype=np.float32)
    payload = AdditionalOutputs({"errored": True, "value": description, "session_hash_code": session_hash_code})
    yield ((16000, silent_frame), payload)


# Global ASR model handle. Loading is currently disabled (set to None); the
# model is expected to be loaded elsewhere before `task` builds a
# CanarySpeechEngine from it — TODO confirm where it is assigned.
# asr_model = nemo_asr.models.ASRModel.from_pretrained("nvidia/canary-1b-v2")
asr_model = None

# Example of a cached loader (kept for reference, currently unused):
# @spaces.cache
# def load_model():
#     logging.info("Loading the NeMo ASR/AST model...")
#     # Replace with your own model-loading logic
#     model = nemo_asr.models.EncDecRNNTModel.restore_from("path/to/model.nemo") 
#     logging.info("Model loaded.")
#     return model

# # Load it once at script startup
# ASR_MODEL = load_model()


@spaces.GPU(duration=10)
def task_fake(session_hash_code: str,
        task_type, lang_source, lang_target,
        chunk_secs, left_context_secs, right_context_secs,
        streaming_policy, alignatt_thr, waitk_lagging,
        exclude_sink_frames, xatt_scores_layer, hallucinations_detector
          ):
    """
    Fake processing task: continuously read and delete .npz chunks while the
    session's task flag file exists.

    Exercises the streaming pipeline without loading the real ASR engines;
    the extra parameters mirror ``task`` and are ignored here.

    Yields:
        (message, level, payload) tuples where level is one of
        "info" / "success" / "warning" / "error" / "done".
    """
    global asr_model
    yield ("initializing the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
    yield ("initialized the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
    yield (f"Task started for session {session_hash_code}", "info", None)

    # Create the flag file that keeps the main loop alive; removing it
    # (e.g. via stop_streaming) terminates the loop.
    active_flag = get_active_task_flag_file(session_hash_code)
    with open(active_flag, "w") as f:
        f.write("1")
    chunk_dir = get_session_hashe_chunks_dir(session_hash_code)
    logging.info(f"[{session_hash_code}] task started. {chunk_dir}")

    try:
        logging.info(f"[{session_hash_code}] task loop started.")
        yield (f"Task started for session {session_hash_code}", "info", None)

        while os.path.exists(active_flag):
            if not os.path.exists(chunk_dir):
                logging.warning(f"[{session_hash_code}] No chunk directory found for task.")
                yield ("No audio chunks yet... waiting for stream.", "warning", None)
                time.sleep(0.1)
                continue
            files = sorted(f for f in os.listdir(chunk_dir) if f.endswith(".npz"))
            if not files:
                time.sleep(0.1)
                continue

            for fname in files:
                fpath = os.path.join(chunk_dir, fname)
                try:
                    npz = np.load(fpath)
                    samples = npz["data"]
                    rate = int(npz["rate"])
                    text = f"Transcribed {fname}: {len(samples)} samples @ {rate}Hz\n"
                    yield (text, "success", fname)
                    os.remove(fpath)
                    logging.debug(f"[{session_hash_code}] Deleted processed chunk: {fname}")
                except (EOFError, OSError, zipfile.BadZipFile, ValueError) as e:
                    # Recoverable read errors (e.g. a chunk still being written).
                    logging.warning(f"[{session_hash_code}] warning processing {fname}: {e}")
                    yield (f"EOFError processing {fname}: {e}", "warning", fname)
                except Exception as e:
                    error_type = type(e).__name__
                    logging.error(f"[{session_hash_code}] Error processing {fname}: {e} {error_type}")
                    # BUG FIX: re-raise the original exception instead of
                    # wrapping it in a bare Exception, which destroyed the
                    # exception type and traceback.
                    raise
                time.sleep(0.1)

        yield ("DONE", "done", None)
        logging.info(f"[{session_hash_code}] task loop ended (flag removed).")

    except Exception as e:
        logging.error(f"[{session_hash_code}] Unexpected task error: {e}", exc_info=True)
        yield (f"Unexpected error: {e}", "error", None)
        return

    finally:
        remove_active_task_flag_file(session_hash_code)
        remove_chunk_folder(session_hash_code)
        logging.info(f"[{session_hash_code}] Exiting task loop.")



@spaces.GPU
def task(session_hash_code: str,
        task_type, lang_source, lang_target,
        chunk_secs, left_context_secs, right_context_secs,
        streaming_policy, alignatt_thr, waitk_lagging,
        exclude_sink_frames, xatt_scores_layer, hallucinations_detector
          ):
    """
    Continuously read and delete .npz audio chunks while the session's task
    flag exists, feeding each chunk through the Canary streaming pipeline
    (VAD + speech engine) and yielding recognised text as it is produced.

    Yields:
        (message, level, payload) tuples where level is one of
        "info" / "success" / "warning" / "error" / "done".
    """
    global asr_model
    yield ("initializing the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
    conf = CanaryConfig.from_params(
        task_type, SUPPORTED_LANGS_MAP.get(lang_source), SUPPORTED_LANGS_MAP.get(lang_target),
        chunk_secs, left_context_secs, right_context_secs,
        streaming_policy, alignatt_thr, waitk_lagging,
        exclude_sink_frames, xatt_scores_layer, hallucinations_detector
    )

    canary_speech_engine = CanarySpeechEngine(asr_model, conf)
    silero_vad_engine = Silero_Vad_Engine()
    streaming_audio_processor_config = StreamingAudioProcessorConfig(
        read_size=READ_SIZE,
        silence_threshold_chunks=1
    )
    streamer = StreamingAudioProcessor(speech_engine=canary_speech_engine, vad_engine=silero_vad_engine, cfg=streaming_audio_processor_config)
    yield ("initialized the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
    yield (f"Task started for session {session_hash_code}", "info", None)

    # Flag file that keeps the processing loop alive; removing it stops the task.
    active_flag = get_active_task_flag_file(session_hash_code)
    with open(active_flag, "w") as f:
        f.write("1")
    chunk_dir = get_session_hashe_chunks_dir(session_hash_code)
    logging.info(f"[{session_hash_code}] task started. {chunk_dir}")

    try:
        logging.info(f"[{session_hash_code}] task loop started.")
        yield (f"Task started for session {session_hash_code}", "info", None)

        while os.path.exists(active_flag):
            if not os.path.exists(chunk_dir):
                logging.warning(f"[{session_hash_code}] No chunk directory found for task.")
                yield ("No audio chunks yet... waiting for stream.", "warning", None)
                time.sleep(0.1)
                continue

            files = sorted(f for f in os.listdir(chunk_dir) if f.endswith(".npz"))
            if not files:
                time.sleep(0.1)
                continue

            for fname in files:
                fpath = os.path.join(chunk_dir, fname)
                try:
                    npz = np.load(fpath)
                    samples = npz["data"]
                    for text in streamer.process_chunk(samples):
                        yield (text, "success", text)
                        logging.debug(f"[{session_hash_code}] {text}")
                    os.remove(fpath)
                    logging.debug(f"[{session_hash_code}] Deleted processed chunk: {fname}")
                except Exception as e:
                    logging.warning(f"[{session_hash_code}] Error processing {fname}: {e}")
                    yield (f"Error processing {fname}: {e}", "warning", fname)
                    continue
                time.sleep(0.1)

        # Flush any text still buffered inside the streaming processor.
        for final_text in streamer.finalize_stream():
            # BUG FIX: previously yielded the stale loop variable `text`
            # instead of `final_text` (NameError if no chunk was processed).
            yield (final_text, "success", final_text)

        yield ("DONE", "done", None)
        logging.info(f"[{session_hash_code}] task loop ended (flag removed).")

    except Exception as e:
        logging.error(f"[{session_hash_code}] task error: {e}", exc_info=True)
        yield (f"Unexpected error: {e}", "error", None)

    finally:
        remove_active_task_flag_file(session_hash_code)

        try:
            if os.path.exists(chunk_dir) and not os.listdir(chunk_dir):
                os.rmdir(chunk_dir)
                logging.debug(f"[{session_hash_code}] Cleaned up empty chunk dir.")
        except Exception as e:
            logging.error(f"[{session_hash_code}] Cleanup error: {e}")
            yield (f"Cleanup error: {e}", "error", None)

        logging.info(f"[{session_hash_code}] Exiting task loop.")
        yield ("Task finished and cleaned up.", "done", None)




# --- Decorator compatibility layer ---
# BUG FIX: the two branches previously defined *different* names
# (`gpu_fork_decorator` on ZeroGPU, `gpu_decorator` otherwise), so one of
# them was always undefined. Both names are now bound in every environment.
if os.environ.get("SPACE_ID", "").startswith("zero-gpu"):
    logging.warning("Running on ZeroGPU — gpu_fork_decorator @spaces.GPU")
    def gpu_decorator(f):
        # No-op decorator: on ZeroGPU the function must not be wrapped.
        return f
else:
    gpu_decorator = spaces.GPU
# Backward-compatible alias for the name used by the ZeroGPU branch.
gpu_fork_decorator = gpu_decorator


# --- Audio Stream Function ---





def stop_streaming(session_hash_code: str):
    """Signal the active stream and background task for this session to stop."""
    logging.info(f"[{session_hash_code}] Stop button clicked — sending stop signal.")
    # Removing both flag files makes the stream and task loops exit.
    for clear_flag in (remove_active_stream_flag_file, remove_active_task_flag_file):
        clear_flag(session_hash_code)

def start_streaming(session_hash_code: str):
    """Mark streaming as active for this session by creating its flag file."""
    logging.info(f"[{session_hash_code}] Start button clicked — sending start signal.")
    flag_path = get_active_stream_flag_file(session_hash_code)
    with open(flag_path, "w") as flag:
        flag.write("1")
    
def is_stop_requested(session_hash_code) -> bool:
    """Return True when the active-stream flag is gone, i.e. a stop was requested."""
    flag_path = get_active_stream_flag_file(session_hash_code)
    return not os.path.exists(flag_path)

def is_active_task(session_hash_code) -> bool:
    """Return True while the background-task flag file exists for this session."""
    return os.path.exists(get_active_task_flag_file(session_hash_code))

def is_active_stream(session_hash_code) -> bool:
    """Return True while the active-stream flag file exists for this session."""
    return os.path.exists(get_active_stream_flag_file(session_hash_code))


def raise_error():
    """Fault-injection helper: raise a RuntimeError with probability 1/10."""
    unlucky = random.randint(1, 10) == 1
    if unlucky:
        raise RuntimeError("Random failure triggered!")



def debug_current_device():
    """Safely logs GPU or CPU information without crashing on stateless GPU."""
    logging.debug("=== Debugging current device ===")

    try:
        if not torch.cuda.is_available():
            logging.debug("No GPU detected, running on CPU")
            return
        mb = 1024 ** 2
        total_mb = torch.cuda.get_device_properties(0).total_memory / mb
        logging.debug(f"GPU name          : {torch.cuda.get_device_name(0)}")
        logging.debug(f"Current device ID : {torch.cuda.current_device()}")
        logging.debug(f"CUDA capability   : {torch.cuda.get_device_capability(0)}")
        logging.debug(f"Memory allocated  : {torch.cuda.memory_allocated(0) / mb:.2f} MB")
        logging.debug(f"Memory reserved   : {torch.cuda.memory_reserved(0) / mb:.2f} MB")
        logging.debug(f"Total memory      : {total_mb:.2f} MB")
    except RuntimeError as e:
        # Handles Hugging Face Spaces "Stateless GPU" restriction
        if "CUDA must not be initialized" in str(e):
            logging.warning("⚠️ Skipping CUDA info: Stateless GPU environment detected.")
        else:
            logging.error(f"Unexpected CUDA error: {e}")


def get_current_device():
    """
    Return ``(device, device_name)`` safely.

    Falls back to CPU in Hugging Face "stateless GPU" environments where
    initialising CUDA raises. BUG FIX: ``device``/``device_name`` are now
    pre-initialised, so an unexpected RuntimeError (the original commented
    out its re-raise) can no longer cause an UnboundLocalError at return.
    """
    device = torch.device("cpu")
    device_name = "CPU"
    try:
        if torch.cuda.is_available():
            device = torch.device("cuda")
            device_name = torch.cuda.get_device_name(0)
        # Tiny allocation probes that the device is actually usable.
        torch.tensor([0], dtype=torch.float32, device=device)
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            logging.debug("GPU cache cleared")
    except RuntimeError as e:
        if "CUDA must not be initialized" in str(e):
            device = torch.device("cpu")
            device_name = "CPU (stateless GPU mode)"
        # Any other RuntimeError falls through to the CPU defaults above,
        # matching the original best-effort behaviour.
    return device, device_name