Spaces:
Running
on
Zero
Running
on
Zero
File size: 7,664 Bytes
11c4a5a 935d736 11c4a5a 935d736 11c4a5a 935d736 11c4a5a 935d736 11c4a5a 935d736 11c4a5a 935d736 11c4a5a 935d736 11c4a5a 799a0f6 11c4a5a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
import numpy as np
from app.logger_config import (
logger as logging,
DEBUG
)
from app.interfaces import IStreamingSpeechEngine,IVoiceActivityEngine
from dataclasses import dataclass
@dataclass
class StreamingAudioProcessorConfig:
    """Tunable settings consumed by the StreamingAudioProcessor.

    Every field has a default, so the config can be built with no arguments
    and individual values overridden by keyword.
    """

    # Number of samples in one physical chunk read from the audio source.
    read_size: int = 8000
    # How many silent chunks in a row trigger a flush/reset of the segment.
    silence_threshold_chunks: int = 2
    # Audio sample rate in Hz.
    sample_rate: int = 16000

    # Further streaming-related parameters belong here,
    # e.g. vad_padding_chunks: int = 0
class StreamingAudioProcessor:
    """
    Manages streaming transcription by combining a speech engine
    and a voice activity detector (VAD).

    Physical chunks are accumulated in an internal NumPy buffer and handed to
    the speech engine in "logical" chunks whose size comes from the engine's
    ``context_samples``. A run of silent physical chunks flushes the buffer
    and resets both the engine and the processor state.
    """

    def __init__(
        self,
        speech_engine: "IStreamingSpeechEngine",
        vad_engine: "IVoiceActivityEngine",
        cfg: "StreamingAudioProcessorConfig",
    ):
        """
        Initializes the streaming processor.

        Args:
            speech_engine: The ASR speech engine. Must expose
                ``context_samples`` (with ``chunk`` and ``right``),
                ``transcribe_chunk()`` and ``reset()``.
            vad_engine: Callable VAD engine; called with a chunk and expected
                to return a truthy value when the chunk contains speech.
            cfg: The configuration object for this processor.
        """
        logging.info("Initializing StreamingAudioProcessor...")
        self.speech_engine = speech_engine
        self.vad_engine = vad_engine

        # Store config
        self.VAD_SAMPLE_RATE = cfg.sample_rate
        self.read_size = cfg.read_size
        self.SILENCE_THRESHOLD_CHUNKS = cfg.silence_threshold_chunks

        # Internal buffer state (a NumPy array of pending samples)
        self.internal_buffer = np.array([], dtype='int16')

        # Internal logic state: the first logical chunk of a segment is
        # larger, since it also includes the engine's right-context samples.
        self.is_first_logical_chunk = True
        context = self.speech_engine.context_samples
        self.logical_chunk_size = context.chunk
        self.initial_logical_chunk_size = context.chunk + context.right

        # Internal VAD state
        self.silent_chunks_count = 0
        self.chunks_count = 0

        logging.info(f" Config: VAD Sample Rate={self.VAD_SAMPLE_RATE}Hz")
        logging.info(f" Config: Physical Read Size={self.read_size} samples")
        logging.info(f" Config: Silence Threshold={self.SILENCE_THRESHOLD_CHUNKS} chunks")
        logging.info(f" Config: Initial Logical Chunk={self.initial_logical_chunk_size} samples")
        logging.info(f" Config: Subsequent Logical Chunk={self.logical_chunk_size} samples")

    def _take_logical_chunk(self, asr_chunk_len):
        """
        Removes and returns the first ``asr_chunk_len`` samples of the
        internal buffer, or returns None when fewer samples are buffered.
        """
        if len(self.internal_buffer) < asr_chunk_len:
            logging.debug(f"Buffer size ({len(self.internal_buffer)}) < target ({asr_chunk_len}). Holding.")
            return None

        asr_signal_chunk = self.internal_buffer[:asr_chunk_len]
        self.internal_buffer = self.internal_buffer[asr_chunk_len:]
        logging.debug(f"Extracted logical chunk of {len(asr_signal_chunk)} samples. Buffer remaining: {len(self.internal_buffer)}.")
        return asr_signal_chunk

    def _append_to_buffer(self, chunk_np, asr_chunk_len):
        """
        Appends a NumPy chunk to the internal buffer and returns a logical
        chunk of ``asr_chunk_len`` samples if one is ready, else None.
        """
        logging.debug(f"Appending {len(chunk_np)} samples to internal buffer (current size: {len(self.internal_buffer)}).")
        self.internal_buffer = np.concatenate((self.internal_buffer, chunk_np))
        return self._take_logical_chunk(asr_chunk_len)

    def _flush_and_reset(self):
        """
        Flushes the remaining buffer to the transcriber and resets all state.

        Yields:
            Each text produced by the final ``transcribe_chunk`` call
            (``is_last_chunk=True``), followed by one empty string after the
            engine, buffer and VAD state have been reset.
        """
        if len(self.internal_buffer) > 0:
            # Buffer is already a numpy array
            final_segment_chunk = self.internal_buffer
            logging.info(f"Flushing segment remainder of {len(final_segment_chunk)} samples.")
        else:
            # Buffer is empty, but send a silent "flush"
            # to force the transcriber to finalize its internal state.
            logging.info("Buffer empty, sending silent flush to finalize segment.")
            final_segment_chunk = np.zeros(self.logical_chunk_size, dtype='int16')

        for _seg, new_text in self.speech_engine.transcribe_chunk(final_segment_chunk, is_last_chunk=True):
            yield new_text

        # Full state reset
        logging.debug("Resetting speech engine state...")
        self.speech_engine.reset()  # Resets the speech engine (decoder state)
        logging.debug("Resetting internal buffer and VAD state.")
        self.internal_buffer = np.array([], dtype='int16')  # Reset buffer
        self.is_first_logical_chunk = True
        self.silent_chunks_count = 0
        yield ""

    def process_chunk(self, chunk: np.ndarray):
        """
        Processes a single physical chunk (e.g., 8000 samples).
        Manages VAD, buffering, and transcription.

        This is a generator: nothing happens until it is iterated, and it
        must be fully consumed for the buffering/VAD state to stay consistent.

        Args:
            chunk (np.ndarray): The audio chunk (int16).

        Yields:
            str: New transcribed text as the engine produces it. Empty
            strings are also yielded as filler (after each transcription
            pass, after a reset, and once at the very end); consumers should
            ignore them.
        """
        self.chunks_count += 1
        logging.debug(f"--- Processing Physical Chunk {self.chunks_count} ---")

        # --- 1. VAD Logic ---
        has_speech = self.vad_engine(chunk)
        logging.debug(f"VAD result: {'SPEECH' if has_speech else 'SILENCE'}")
        if has_speech:
            self.silent_chunks_count = 0
        else:
            self.silent_chunks_count += 1
            logging.debug(f"Silent chunks count: {self.silent_chunks_count}/{self.SILENCE_THRESHOLD_CHUNKS}")
        silence_reset = self.silent_chunks_count >= self.SILENCE_THRESHOLD_CHUNKS

        # --- 2. Buffering & Transcription Logic ---
        target_size = self.initial_logical_chunk_size if self.is_first_logical_chunk else self.logical_chunk_size
        asr_chunk_np = self._append_to_buffer(chunk, target_size)
        while asr_chunk_np is not None:
            logging.debug(f"Sending logical chunk (size: {len(asr_chunk_np)}) to speech engine...")
            for _seg, new_text in self.speech_engine.transcribe_chunk(asr_chunk_np, is_last_chunk=False):
                logging.info(f"Received new text segment: '{new_text}'")
                yield new_text
            # NOTE(review): reconstructed as for/else (the loop never breaks,
            # so the filler is always emitted after a transcription pass).
            yield ""
            self.is_first_logical_chunk = False

            # Drain every further complete logical chunk. Without this, a
            # physical read larger than the logical chunk size leaves a
            # backlog that grows on every call. The size guard prevents an
            # endless loop should the engine report a non-positive chunk size.
            if self.logical_chunk_size > 0:
                asr_chunk_np = self._take_logical_chunk(self.logical_chunk_size)
            else:
                asr_chunk_np = None

        # --- 3. VAD Reset Logic ---
        # Only reset once at least one logical chunk of the current segment
        # has been sent to the engine.
        if silence_reset and not self.is_first_logical_chunk:
            logging.info(f"\n[VAD RESET: SILENCE detected ({self.silent_chunks_count} empty chunks) at {(self.chunks_count * (self.read_size/self.VAD_SAMPLE_RATE)):.2f}s]")
            # Flush the buffer, reset state, and get final text
            for reset_text in self._flush_and_reset():
                logging.info(f"Received final reset text: '{reset_text}'")
                yield reset_text
            yield ""

        # Trailing filler so the generator always yields at least one item.
        yield ""

    def finalize_stream(self):
        """
        Must be called at the very end of the stream (after the loop breaks).
        Flushes anything remaining in the buffer.

        Yields:
            str: The texts produced by the final flush, followed by an empty
            string once the state has been reset.
        """
        logging.info("Finalizing stream. Flushing final buffer...")
        for reset_text in self._flush_and_reset():
            logging.info(f"Received final flushed text: '{reset_text}'")
            yield reset_text
|