File size: 7,664 Bytes
11c4a5a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
935d736
 
11c4a5a
 
 
 
 
935d736
 
11c4a5a
 
 
 
 
 
 
 
 
 
935d736
11c4a5a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
935d736
11c4a5a
 
935d736
 
 
11c4a5a
 
 
 
 
 
 
935d736
11c4a5a
 
935d736
 
 
 
 
11c4a5a
 
 
 
 
 
 
799a0f6
 
 
11c4a5a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import numpy as np
from app.logger_config import (
    logger as logging,
    DEBUG
) 
from app.interfaces import IStreamingSpeechEngine,IVoiceActivityEngine

from dataclasses import dataclass

@dataclass()
class StreamingAudioProcessorConfig:
    """
    Tunable parameters consumed by ``StreamingAudioProcessor``.

    Attributes:
        read_size: Number of samples in one physical chunk read from the source.
        silence_threshold_chunks: How many consecutive silent physical chunks
            (as judged by the VAD) trigger a segment flush and reset.
        sample_rate: Audio sample rate, in Hz.
    """

    read_size: int = 8000  # samples per physical read
    silence_threshold_chunks: int = 2  # consecutive silent reads before a reset
    sample_rate: int = 16000  # Hz
    # Further streaming options (e.g. a VAD padding length) belong here as new fields.


class StreamingAudioProcessor:
    """
    Manages streaming transcription by combining a speech engine
    and a voice activity detector (VAD).

    This class handles internal audio buffering and VAD state. Physical
    chunks of any size are appended to an internal NumPy buffer and re-cut
    into "logical" chunks for the speech engine. The first logical chunk of a
    segment is ``context_samples.chunk + context_samples.right`` samples;
    every following one is ``context_samples.chunk`` samples. After
    ``silence_threshold_chunks`` consecutive silent physical chunks, the
    buffered remainder is flushed as the segment's last chunk and all state
    is reset.
    """

    def __init__(self, speech_engine: IStreamingSpeechEngine, vad_engine: IVoiceActivityEngine, cfg: StreamingAudioProcessorConfig):
        """
        Initializes the streaming processor.

        Args:
            speech_engine: The ASR speech engine. It must expose
                ``.transcribe_chunk(samples, is_last_chunk=...)`` (iterable of
                ``(segment, text)`` pairs), ``.reset()`` and
                ``.context_samples.chunk`` / ``.context_samples.right``.
            vad_engine: Callable VAD; ``vad_engine(chunk)`` is truthy for speech.
            cfg: The configuration object for this processor.

        Raises:
            ValueError: If the engine's logical chunk sizes are not positive
                (the buffering loop could never make progress).
        """
        logging.info("Initializing StreamingAudioProcessor...")
        self.speech_engine = speech_engine
        self.vad_engine = vad_engine

        # Store config
        self.VAD_SAMPLE_RATE = cfg.sample_rate
        self.read_size = cfg.read_size
        self.SILENCE_THRESHOLD_CHUNKS = cfg.silence_threshold_chunks

        # Pending samples not yet sent to the speech engine (int16 NumPy array).
        self.internal_buffer = np.array([], dtype='int16')

        # Internal logic state: the first logical chunk of each segment also
        # carries the engine's right-context samples.
        self.is_first_logical_chunk = True
        self.logical_chunk_size = self.speech_engine.context_samples.chunk
        self.initial_logical_chunk_size = self.speech_engine.context_samples.chunk + self.speech_engine.context_samples.right
        if self.logical_chunk_size <= 0 or self.initial_logical_chunk_size <= 0:
            raise ValueError(
                "speech_engine.context_samples must yield positive logical chunk sizes "
                f"(got chunk={self.logical_chunk_size}, initial={self.initial_logical_chunk_size})"
            )

        # Internal VAD state
        self.silent_chunks_count = 0  # consecutive silent physical chunks
        self.chunks_count = 0  # physical chunks seen since construction

        logging.info(f"  Config: VAD Sample Rate={self.VAD_SAMPLE_RATE}Hz")
        logging.info(f"  Config: Physical Read Size={self.read_size} samples")
        logging.info(f"  Config: Silence Threshold={self.SILENCE_THRESHOLD_CHUNKS} chunks")
        logging.info(f"  Config: Initial Logical Chunk={self.initial_logical_chunk_size} samples")
        logging.info(f"  Config: Subsequent Logical Chunk={self.logical_chunk_size} samples")

    def _target_chunk_size(self):
        """Return the size of the next logical chunk (larger for a segment's first chunk)."""
        return self.initial_logical_chunk_size if self.is_first_logical_chunk else self.logical_chunk_size

    def _append_to_buffer(self, chunk_np, asr_chunk_len):
        """
        Appends a NumPy chunk to the internal buffer and returns a logical chunk if ready.

        Args:
            chunk_np: Samples to append after the currently buffered ones.
            asr_chunk_len: Size, in samples, of the logical chunk to extract.

        Returns:
            The first ``asr_chunk_len`` buffered samples (removed from the
            buffer), or ``None`` if fewer samples are buffered.
        """
        logging.debug(f"Appending {len(chunk_np)} samples to internal buffer (current size: {len(self.internal_buffer)}).")
        self.internal_buffer = np.concatenate((self.internal_buffer, chunk_np))
        return self._pop_logical_chunk(asr_chunk_len)

    def _pop_logical_chunk(self, asr_chunk_len):
        """Remove and return the first ``asr_chunk_len`` buffered samples, or ``None`` if not enough are buffered."""
        if len(self.internal_buffer) < asr_chunk_len:
            logging.debug(f"Buffer size ({len(self.internal_buffer)}) < target ({asr_chunk_len}). Holding.")
            return None
        asr_signal_chunk = self.internal_buffer[:asr_chunk_len]
        self.internal_buffer = self.internal_buffer[asr_chunk_len:]
        logging.debug(f"Extracted logical chunk of {len(asr_signal_chunk)} samples. Buffer remaining: {len(self.internal_buffer)}.")
        return asr_signal_chunk

    def _flush_and_reset(self):
        """
        Flushes the remaining buffer to the transcriber, then resets the state.

        The buffered remainder (of any length) is sent with ``is_last_chunk=True``.
        If the buffer is empty, one logical chunk of zeros is sent instead so the
        engine still finalizes the segment. Afterwards the speech engine, the
        buffer and the VAD/segment state are reset.

        The reset only happens once this generator has been fully consumed.

        Yields:
            str: Text from the final ``transcribe_chunk`` call, then a trailing
            empty string once the reset has completed.
        """
        if len(self.internal_buffer) > 0:
            final_segment_chunk = self.internal_buffer
            logging.info(f"Flushing segment remainder of {len(final_segment_chunk)} samples.")
        else:
            # Buffer is empty, but send a silent "flush"
            # to force the transcriber to finalize its internal state.
            logging.info("Buffer empty, sending silent flush to finalize segment.")
            final_segment_chunk = np.zeros(self.logical_chunk_size, dtype='int16')

        for _seg, new_text in self.speech_engine.transcribe_chunk(final_segment_chunk, is_last_chunk=True):
            yield new_text

        # Full state reset
        logging.debug("Resetting speech engine state...")
        self.speech_engine.reset()  # Resets the speech engine (decoder state)

        logging.debug("Resetting internal buffer and VAD state.")
        self.internal_buffer = np.array([], dtype='int16')
        self.is_first_logical_chunk = True
        self.silent_chunks_count = 0

        yield ""

    def process_chunk(self, chunk: np.ndarray):
        """
        Processes a single physical chunk (e.g., ``read_size`` samples).

        Runs the VAD on the chunk, buffers it, sends every complete logical
        chunk to the speech engine and, once enough consecutive silent chunks
        have been seen, flushes the buffer and resets the segment.

        This is a generator. Iterate it to exhaustion so the buffering and
        reset logic runs completely.

        Args:
            chunk (np.ndarray): The audio chunk (int16).

        Yields:
            str: Newly transcribed text segments, in order. Empty strings are
            also yielded as "no new text" markers; each call yields at least one.
        """
        self.chunks_count += 1
        logging.debug(f"--- Processing Physical Chunk {self.chunks_count} ---")

        # --- 1. VAD Logic ---
        has_speech = self.vad_engine(chunk)
        logging.debug(f"VAD result: {'SPEECH' if has_speech else 'SILENCE'}")

        if has_speech:
            self.silent_chunks_count = 0
        else:
            self.silent_chunks_count += 1
            logging.debug(f"Silent chunks count: {self.silent_chunks_count}/{self.SILENCE_THRESHOLD_CHUNKS}")

        silence_reset = self.silent_chunks_count >= self.SILENCE_THRESHOLD_CHUNKS

        # --- 2. Buffering & Transcription Logic ---
        asr_chunk_np = self._append_to_buffer(chunk, self._target_chunk_size())
        while asr_chunk_np is not None:
            logging.debug(f"Sending logical chunk (size: {len(asr_chunk_np)}) to speech engine...")
            for _seg, new_text in self.speech_engine.transcribe_chunk(asr_chunk_np, is_last_chunk=False):
                logging.info(f"Received new text segment: '{new_text}'")
                yield new_text
            self.is_first_logical_chunk = False
            # One physical chunk may complete several logical chunks. Drain them
            # all so the buffer (and the latency) cannot grow without bound.
            asr_chunk_np = self._pop_logical_chunk(self._target_chunk_size())

        # --- 3. VAD Reset Logic ---
        if silence_reset and not self.is_first_logical_chunk:
            # Timestamp is approximate: it assumes every physical chunk had read_size samples.
            logging.info(f"\n[VAD RESET: SILENCE detected ({self.silent_chunks_count} empty chunks) at {(self.chunks_count * (self.read_size/self.VAD_SAMPLE_RATE)):.2f}s]")

            # Flush the buffer, reset state, and get final text
            for reset_text in self._flush_and_reset():
                logging.info(f"Received final reset text: '{reset_text}'")
                yield reset_text

        # Heartbeat: guarantee at least one yield per physical chunk.
        yield ""

    def finalize_stream(self):
        """
        Must be called at the very end of the stream (after the loop breaks).
        Flushes anything remaining in the buffer and resets all state.

        Yields:
            str: Text produced by the final flush, followed by a trailing empty string.
        """
        logging.info("Finalizing stream. Flushing final buffer...")
        for reset_text in self._flush_and_reset():
            logging.info(f"Received final flushed text: '{reset_text}'")
            yield reset_text