# omniasr-transcriptions/server/audio_transcription.py
# Omnilingual ASR transcription demo
from __future__ import annotations
# Standard library imports
import logging
import os
import tempfile
from typing import Dict, List, Optional, Tuple
# Third-party imports
import librosa
import numpy as np
import soundfile as sf
import torch
import uroman
# Inference package imports
from inference.align_utils import get_uroman_tokens
from inference.audio_chunker import AudioChunker
from inference.audio_reading_tools import wav_to_bytes
# AudioAlignment for forced alignment
from inference.audio_sentence_alignment import AudioAlignment
from inference.mms_model_pipeline import MMSModel
from inference.text_normalization import text_normalize
from transcription_status import transcription_status
from env_vars import USE_CHUNKING
# Constants
SAMPLE_RATE = 16000
logger = logging.getLogger(__name__)
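

# Illustrative helper (not part of the original pipeline): the functions below all
# expect a 1-D float waveform at SAMPLE_RATE. This is a minimal sketch of producing
# such a tensor from a file on disk using the librosa/torch imports above;
# "example.wav" is a placeholder path.
def _example_load_audio_tensor(path: str = "example.wav") -> torch.Tensor:
    """Load an audio file as a mono, 16 kHz, 1-D float tensor (illustration only)."""
    waveform, _ = librosa.load(path, sr=SAMPLE_RATE, mono=True)  # resample + downmix
    return torch.from_numpy(waveform).float()
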
def transcribe_single_chunk(
    audio_tensor: torch.Tensor,
    sample_rate: int = 16000,
    language_with_script: Optional[str] = None,
) -> str:
"""
Basic transcription pipeline for a single audio chunk using MMS model pipeline.
This is the lowest-level transcription function that handles individual audio segments.
Args:
audio_tensor (torch.Tensor): Audio tensor (1D waveform)
sample_rate (int): Sample rate of the audio tensor
language_with_script (str): language_with_script for transcription (3-letter ISO codes like "eng", "spa") with script
Returns:
str: Transcribed text
"""
logger.info("Starting complete audio transcription pipeline...")
try:
logger.info("Using pipeline transcription...")
# Use the singleton model instance
model = MMSModel.get_instance()
# Transcribe using pipeline - convert tensor to list format
lang_list = [language_with_script] if language_with_script else None
results = model.transcribe_audio(audio_tensor, batch_size=1, language_with_scripts=lang_list)
result = results[0] if results else {}
# Convert pipeline result to expected format
if isinstance(result, dict) and 'text' in result:
transcription_text = result['text']
elif isinstance(result, str):
transcription_text = result
else:
transcription_text = str(result)
if not transcription_text.strip():
logger.warning("Pipeline returned empty transcription")
return ""
logger.info(f"✓ Pipeline transcription successful: '{transcription_text}'")
# Return the transcription text
return transcription_text
except Exception as e:
logger.error(f"Error in transcription pipeline: {str(e)}", exc_info=True)
raise
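

# Usage sketch for transcribe_single_chunk (assumptions: the MMS model assets are
# available to MMSModel.get_instance(), and "eng_Latn" is a supported
# language-with-script code for this deployment). Defined but never called at import
# time; it only illustrates the expected call shape for one short clip.
def _example_transcribe_single_chunk() -> str:
    audio = _example_load_audio_tensor("example.wav")  # placeholder path
    return transcribe_single_chunk(
        audio, sample_rate=SAMPLE_RATE, language_with_script="eng_Latn"  # assumed code
    )
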
def perform_forced_alignment(
    audio_tensor: torch.Tensor,
    transcription_tokens: List[str],
    device: torch.device,
    sample_rate: int = 16000,
) -> List[Dict]:
    """
    Perform forced alignment using the AudioAlignment class from audio_sentence_alignment.py.

    Uses the provided audio tensor directly.

    Args:
        audio_tensor (torch.Tensor): Audio tensor (1D waveform)
        transcription_tokens (List[str]): List of tokens from transcription
        device (torch.device): Device for computation (currently unused by this implementation)
        sample_rate (int): Audio sample rate

    Returns:
        List[Dict]: List of segments with timestamps and text
    """
try:
logger.info(f"Starting forced alignment with audio tensor")
logger.info(f"Audio shape: {audio_tensor.shape}, sample_rate: {sample_rate}")
logger.info(f"Tokens to align: {transcription_tokens}")
# Use the provided audio tensor directly
# Convert to the format expected by AudioAlignment.get_one_row_alignments
if hasattr(audio_tensor, "cpu"):
# If it's a torch tensor, use it directly
alignment_tensor = audio_tensor.float()
else:
# If it's numpy, convert to tensor
alignment_tensor = torch.from_numpy(audio_tensor).float()
# Ensure it's 1D (flatten if needed)
if len(alignment_tensor.shape) > 1:
alignment_tensor = alignment_tensor.flatten()
# Convert audio tensor to bytes format expected by AudioAlignment
# Use wav_to_bytes to create proper audio bytes
# Move tensor to CPU first to avoid CUDA tensor to numpy conversion error
audio_tensor_cpu = alignment_tensor.cpu() if alignment_tensor.is_cuda else alignment_tensor
audio_arr = wav_to_bytes(
audio_tensor_cpu, sample_rate=sample_rate, format="wav"
)
logger.info(
f"Converted audio to bytes: shape={audio_arr.shape}, dtype={audio_arr.dtype}"
)
# Preprocess tokens for MMS alignment model using the same approach as TextRomanizer
# The MMS alignment model expects romanized tokens in the same format as text_sentences_tokens
        # uroman_instance is also reused by the per-word boundary computation further
        # below, so pre-initialize it defensively.
        uroman_instance = None
        try:
            # Join tokens back to text for uroman processing
            transcription_text = " ".join(transcription_tokens)
            # Create uroman instance and process the text the same way as TextRomanizer
            uroman_instance = uroman.Uroman()
# Step 1: Normalize the text first using text_normalize function (same as TextRomanizer)
normalized_text = text_normalize(transcription_text.strip(), "en")
# Step 2: Get uroman tokens using the same function as TextRomanizer
# This creates character-level tokens with spaces between characters
uroman_tokens_str = get_uroman_tokens(
[normalized_text], uroman_instance, "en"
)[0]
# Step 3: Split by spaces to get individual character tokens (same as real MMS pipeline)
alignment_tokens = uroman_tokens_str.split()
logger.info(f"Original tokens: {transcription_tokens}")
logger.info(f"Original text: '{transcription_text}'")
logger.info(f"Normalized text: '{normalized_text}'")
logger.info(f"Uroman tokens string: '{uroman_tokens_str}'")
logger.info(
f"Alignment tokens (count={len(alignment_tokens)}): {alignment_tokens[:20]}..."
)
# Additional debugging - check for any unusual characters
for i, token in enumerate(alignment_tokens[:10]): # Check first 10 tokens
logger.info(
f"Token {i}: '{token}' (length={len(token)}, chars={[c for c in token]})"
)
except Exception as e:
logger.warning(
f"Failed to preprocess tokens with TextRomanizer approach: {e}"
)
logger.exception("Full error traceback:")
# Fallback: use simple character-level tokenization
transcription_text = " ".join(transcription_tokens).lower()
# Simple character-level tokenization as fallback
alignment_tokens = []
for char in transcription_text:
if char == " ":
alignment_tokens.append(" ")
else:
alignment_tokens.append(char)
logger.info(f"Using fallback character tokens: {alignment_tokens[:20]}...")
logger.info(
f"Using {len(alignment_tokens)} alignment tokens for forced alignment"
)
# Create AudioAlignment instance
logger.info("Creating AudioAlignment instance...")
alignment = AudioAlignment()
# Perform alignment using get_one_row_alignments
logger.info("Performing alignment...")
logger.info(f"About to call get_one_row_alignments with:")
logger.info(f" audio_arr type: {type(audio_arr)}, shape: {audio_arr.shape}")
logger.info(
f" alignment_tokens type: {type(alignment_tokens)}, length: {len(alignment_tokens)}"
)
logger.info(
f" First 10 tokens: {alignment_tokens[:10] if len(alignment_tokens) >= 10 else alignment_tokens}"
)
# Check for any problematic characters in tokens
for i, token in enumerate(alignment_tokens[:5]):
token_chars = [ord(c) for c in str(token)]
logger.info(f" Token {i} '{token}' char codes: {token_chars}")
# Check if tokens contain any RTL characters that might cause the LTR assertion
rtl_chars = []
for i, token in enumerate(alignment_tokens):
for char in str(token):
# Check for Arabic, Hebrew, and other RTL characters
if (
"\u0590" <= char <= "\u08ff"
or "\ufb1d" <= char <= "\ufdff"
or "\ufe70" <= char <= "\ufeff"
):
rtl_chars.append((i, token, char, ord(char)))
if rtl_chars:
logger.warning(f"Found RTL characters in tokens: {rtl_chars[:10]}...")
try:
audio_segments = alignment.get_one_row_alignments(
audio_arr, sample_rate, alignment_tokens
)
except Exception as alignment_error:
logger.error(f"Alignment failed with error: {alignment_error}")
logger.error(f"Error type: {type(alignment_error)}")
# Try to provide more context about the error
if "ltr" in str(alignment_error).lower():
logger.error("LTR assertion error detected. This might be due to:")
logger.error("1. RTL characters in the input tokens")
logger.error(
"2. Incorrect token format - tokens should be individual characters"
)
logger.error("3. Unicode normalization issues")
# Try a simple ASCII-only fallback
logger.info("Attempting ASCII-only fallback...")
ascii_tokens = []
for token in alignment_tokens:
# Keep only ASCII characters
ascii_token = "".join(c for c in str(token) if ord(c) < 128)
if ascii_token:
ascii_tokens.append(ascii_token)
logger.info(
f"ASCII tokens (count={len(ascii_tokens)}): {ascii_tokens[:20]}..."
)
try:
                    audio_segments = alignment.get_one_row_alignments(
                        audio_arr, sample_rate, ascii_tokens
                    )
alignment_tokens = ascii_tokens # Update for later use
logger.info("ASCII fallback successful!")
except Exception as ascii_error:
logger.error(f"ASCII fallback also failed: {ascii_error}")
raise alignment_error
else:
raise
logger.info(
f"Alignment completed, got {len(audio_segments)} character segments"
)
# Debug: Log the actual structure of audio_segments
if audio_segments:
logger.info("=== Audio Segments Debug Info ===")
logger.info(f"Total segments: {len(audio_segments)}")
# Print ALL audio segments for complete debugging
logger.info("=== ALL AUDIO SEGMENTS ===")
for i, segment in enumerate(audio_segments):
logger.info(f"Segment {i}: {segment}")
if i > 0 and i % 20 == 0: # Print progress every 20 segments
logger.info(
f"... printed {i+1}/{len(audio_segments)} segments so far..."
)
logger.info("=== End All Audio Segments ===")
logger.info("=== End Audio Segments Debug ===")
# Convert character-level segments back to word-level segments
# Use the actual alignment timings to preserve silence and natural timing
aligned_segments = []
logger.info(
f"Converting {len(audio_segments)} character segments to word segments"
)
logger.info(f"Original tokens: {transcription_tokens}")
logger.info(f"Alignment tokens: {alignment_tokens[:20]}...")
# Validate that we have segments and tokens
if not audio_segments or not transcription_tokens:
logger.warning("No audio segments or transcription tokens available")
return []
        # Get actual timing from character segments (audio_segments is guaranteed
        # non-empty here because of the early return above)
        # Use the known segment keys from audio_sentence_alignment
        start_key, duration_key = "segment_start_sec", "segment_duration"
        last_segment = audio_segments[-1]
        total_audio_duration = last_segment.get(start_key, 0) + last_segment.get(
            duration_key, 0
        )
        logger.info(
            f"Total audio duration from segments: {total_audio_duration:.3f}s"
        )
# Strategy: Group character segments by words using the actual alignment timing
# This preserves the natural timing including silences from the forced alignment
# First, reconstruct the alignment character sequence
alignment_char_sequence = "".join(alignment_tokens)
transcription_text = "".join(
transcription_tokens
) # Remove spaces for character matching
logger.info(f"Alignment sequence length: {len(alignment_char_sequence)}")
logger.info(f"Transcription length: {len(transcription_text)}")
# Create word boundaries based on romanized alignment tokens
# We need to map each original word to its position in the romanized sequence
word_boundaries = []
alignment_pos = 0
# Process each word individually to get its romanized representation
for word in transcription_tokens:
try:
# Get romanized version of this individual word
normalized_word = text_normalize(word.strip(), "en")
uroman_word_str = get_uroman_tokens([normalized_word], uroman_instance, "en")[0]
romanized_word_tokens = uroman_word_str.split()
word_start = alignment_pos
word_end = alignment_pos + len(romanized_word_tokens)
word_boundaries.append((word_start, word_end))
alignment_pos = word_end
logger.info(f"Word '{word}' -> romanized tokens {romanized_word_tokens} -> positions {word_start}-{word_end}")
except Exception as e:
logger.warning(f"Failed to romanize word '{word}': {e}")
# Fallback: estimate based on character length ratio
estimated_length = max(1, int(len(word) * len(alignment_tokens) / len(transcription_text)))
word_start = alignment_pos
word_end = min(alignment_pos + estimated_length, len(alignment_tokens))
word_boundaries.append((word_start, word_end))
alignment_pos = word_end
logger.info(f"Word '{word}' (fallback) -> estimated positions {word_start}-{word_end}")
logger.info(f"Word boundaries (romanized): {word_boundaries[:5]}...")
logger.info(f"Total alignment tokens used: {alignment_pos}/{len(alignment_tokens)}")
# Map each word to its character segments using the boundaries
for word_idx, (word, (word_start, word_end)) in enumerate(
zip(transcription_tokens, word_boundaries)
):
# Find character segments that belong to this word
word_segments = []
# Map word character range to alignment token indices
# Since alignment_tokens might be slightly different due to normalization,
# we'll be flexible and use a range around the expected positions
start_idx = max(0, min(word_start, len(audio_segments) - 1))
end_idx = min(word_end, len(audio_segments))
# Ensure we don't go beyond available segments
for seg_idx in range(start_idx, end_idx):
if seg_idx < len(audio_segments):
word_segments.append(audio_segments[seg_idx])
if word_segments:
# Use actual timing from the character segments for this word
start_times = [seg.get(start_key, 0) for seg in word_segments]
end_times = [
seg.get(start_key, 0) + seg.get(duration_key, 0)
for seg in word_segments
]
start_time = min(start_times) if start_times else 0
end_time = max(end_times) if end_times else start_time + 0.1
duration = end_time - start_time
# Ensure minimum duration
if duration < 0.05: # Minimum 50ms
duration = 0.05
end_time = start_time + duration
logger.debug(
f"Word '{word}' (segments {start_idx}-{end_idx}, {len(word_segments)} segs): {start_time:.3f}s - {end_time:.3f}s ({duration:.3f}s)"
)
else:
logger.warning(
f"No segments found for word '{word}' at position {word_start}-{word_end}"
)
# Fallback: use proportional timing if no segments found
if total_audio_duration > 0 and len(transcription_text) > 0:
start_proportion = word_start / len(transcription_text)
end_proportion = word_end / len(transcription_text)
start_time = start_proportion * total_audio_duration
end_time = end_proportion * total_audio_duration
duration = end_time - start_time
else:
# Ultimate fallback
word_duration = 0.5
start_time = word_idx * word_duration
end_time = start_time + word_duration
duration = word_duration
logger.debug(
f"Word '{word}' (fallback): {start_time:.3f}s - {end_time:.3f}s"
)
aligned_segments.append(
{
"text": word,
"start": start_time,
"end": end_time,
"duration": duration,
}
)
# Validate segments don't overlap but preserve natural gaps/silences
for i in range(1, len(aligned_segments)):
prev_end = aligned_segments[i - 1]["end"]
current_start = aligned_segments[i]["start"]
if current_start < prev_end:
# Only fix actual overlaps, don't force adjacency
gap = prev_end - current_start
logger.debug(
f"Overlap detected: segment {i-1} ends at {prev_end:.3f}s, segment {i} starts at {current_start:.3f}s (overlap: {gap:.3f}s)"
)
# Fix overlap by adjusting current segment start to previous end
aligned_segments[i]["start"] = prev_end
aligned_segments[i]["duration"] = (
aligned_segments[i]["end"] - aligned_segments[i]["start"]
)
logger.debug(
f"Fixed overlap for segment {i}: adjusted start to {prev_end:.3f}s"
)
else:
# Log natural gaps (this is normal and expected)
gap = current_start - prev_end
if gap > 0.1: # Log gaps > 100ms
logger.debug(
f"Natural gap preserved: {gap:.3f}s between segments {i-1} and {i}"
)
logger.info(f"Forced alignment completed: {len(aligned_segments)} segments")
return aligned_segments
except Exception as e:
logger.error(f"Error in forced alignment: {str(e)}", exc_info=True)
# Fallback: create uniform timestamps based on audio tensor length
logger.info("Using fallback uniform timestamps")
try:
# Calculate duration from the audio tensor
total_duration = (
len(audio_tensor) / sample_rate
if len(audio_tensor) > 0
else len(transcription_tokens) * 0.5
)
        except Exception:
total_duration = len(transcription_tokens) * 0.5 # Fallback
segment_duration = (
total_duration / len(transcription_tokens) if transcription_tokens else 1.0
)
fallback_segments = []
for i, token in enumerate(transcription_tokens):
start_time = i * segment_duration
end_time = (i + 1) * segment_duration
fallback_segments.append(
{
"text": token,
"start": start_time,
"end": end_time,
"duration": segment_duration,
}
)
logger.info(
f"Using fallback uniform timestamps: {len(fallback_segments)} segments"
)
return fallback_segments
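

# Usage sketch for perform_forced_alignment: it returns word-level dicts with
# "text", "start", "end" and "duration" keys, in seconds relative to the start of
# the given tensor. The token list below is made up for the example; in the real
# pipeline the tokens come from transcribe_single_chunk. Assumes alignment model
# assets are available to AudioAlignment.
def _example_forced_alignment() -> List[Dict]:
    audio = _example_load_audio_tensor("example.wav")  # placeholder path
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    tokens = ["hello", "world"]  # hypothetical transcription tokens
    segments = perform_forced_alignment(audio, tokens, device, sample_rate=SAMPLE_RATE)
    # e.g. [{"text": "hello", "start": 0.12, "end": 0.48, "duration": 0.36}, ...]
    return segments
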
def transcribe_with_word_alignment(
    audio_tensor: torch.Tensor,
    sample_rate: int = 16000,
    language_with_script: Optional[str] = None,
) -> Dict:
"""
Transcription pipeline that includes word-level timing through forced alignment.
Adds precise word-level timestamps to the basic transcription capability.
Args:
audio_tensor (torch.Tensor): Audio tensor (1D waveform)
sample_rate (int): Sample rate of the audio tensor
language_with_script (str): language_with_script code for transcription (3-letter ISO codes like "eng", "spa") with script
Returns:
Dict: Transcription results with alignment information including word-level timestamps
"""
try:
# Get model and device first
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Get the transcription results
transcription_text = transcribe_single_chunk(audio_tensor, sample_rate=sample_rate, language_with_script=language_with_script)
if not transcription_text:
return {
"transcription": "",
"tokens": [],
"aligned_segments": [],
"total_duration": 0.0,
}
# Tokenize the transcription for alignment
tokens = transcription_text.split()
# Perform forced alignment using the original audio tensor
logger.info("Performing forced alignment with original audio tensor...")
aligned_segments = perform_forced_alignment(audio_tensor, tokens, device, sample_rate)
# Calculate total duration
total_duration = aligned_segments[-1]["end"] if aligned_segments else 0.0
result = {
"transcription": transcription_text,
"tokens": tokens,
"aligned_segments": aligned_segments,
"total_duration": total_duration,
"num_segments": len(aligned_segments),
}
logger.info(
f"Transcription with alignment completed: {len(aligned_segments)} segments, {total_duration:.2f}s total"
)
return result
except Exception as e:
logger.error(f"Error in transcription with alignment: {str(e)}", exc_info=True)
# Return basic transcription without alignment
try:
transcription_text = transcribe_single_chunk(audio_tensor, sample_rate=sample_rate, language_with_script=language_with_script)
tokens = transcription_text.split() if transcription_text else []
return {
"transcription": transcription_text,
"tokens": tokens,
"aligned_segments": [],
"total_duration": 0.0,
"alignment_error": str(e),
}
except Exception as e2:
logger.error(f"Error in fallback transcription: {str(e2)}", exc_info=True)
return {
"transcription": "",
"tokens": [],
"aligned_segments": [],
"total_duration": 0.0,
"error": str(e2),
}
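

# Usage sketch for transcribe_with_word_alignment: the combined pipeline returns a
# dict with the full transcription, its whitespace tokens, and per-word timestamps.
# Same assumptions as above (model assets available, placeholder path, assumed
# language code).
def _example_transcribe_with_word_alignment() -> Dict:
    audio = _example_load_audio_tensor("example.wav")  # placeholder path
    result = transcribe_with_word_alignment(
        audio, sample_rate=SAMPLE_RATE, language_with_script="eng_Latn"  # assumed code
    )
    # Expected keys: "transcription", "tokens", "aligned_segments", "total_duration",
    # "num_segments"
    return result
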
def _validate_and_adjust_segments(
aligned_segments: List[Dict],
chunk_start_time: float,
chunk_audio_tensor: torch.Tensor,
chunk_sample_rate: int,
chunk_duration: float,
chunk_index: int
) -> List[Dict]:
"""
Private helper function to validate and adjust segment timestamps to global timeline.
Args:
aligned_segments: Raw segments from forced alignment (local chunk timeline)
chunk_start_time: Start time of this chunk in global timeline
chunk_audio_tensor: Audio tensor for this chunk (to get actual duration)
chunk_sample_rate: Sample rate of the chunk
chunk_duration: Reported duration of the chunk
chunk_index: Index of this chunk for debugging
Returns:
List of validated segments with global timeline timestamps
"""
adjusted_segments = []
# Get the actual audio duration from the chunk tensor instead of the potentially incorrect chunk duration
actual_chunk_duration = len(chunk_audio_tensor) / chunk_sample_rate if len(chunk_audio_tensor) > 0 else chunk_duration
for segment in aligned_segments:
original_start = segment["start"]
original_end = segment["end"]
# Validate that segment timestamps are within chunk boundaries
if original_start < 0:
logger.warning(
f"Segment '{segment['text']}' has negative start time {original_start:.3f}s, clipping to 0"
)
original_start = 0
if original_end > actual_chunk_duration + 1.0: # Allow 1s buffer for alignment errors
logger.warning(
f"Segment '{segment['text']}' end time {original_end:.3f}s exceeds actual chunk duration {actual_chunk_duration:.3f}s, clipping"
)
original_end = actual_chunk_duration
if original_start >= original_end:
logger.warning(
f"Segment '{segment['text']}' has invalid timing {original_start:.3f}s-{original_end:.3f}s, using fallback"
)
# Use proportional timing based on segment position using actual chunk duration
segment_index = len(adjusted_segments)
total_segments = len(aligned_segments)
if total_segments > 0:
segment_proportion = segment_index / total_segments
next_proportion = (segment_index + 1) / total_segments
original_start = segment_proportion * actual_chunk_duration
original_end = next_proportion * actual_chunk_duration
else:
original_start = 0
original_end = 0.5
# Create segment with absolute timeline
adjusted_segment = {
"text": segment["text"],
"start": original_start + chunk_start_time, # Global timeline
"end": original_end + chunk_start_time, # Global timeline
"duration": original_end - original_start,
"chunk_index": chunk_index,
"original_start": original_start, # Local chunk time
"original_end": original_end, # Local chunk time
}
adjusted_segments.append(adjusted_segment)
logger.debug(
f"Segment '{segment['text']}': {original_start:.3f}-{original_end:.3f} -> {adjusted_segment['start']:.3f}-{adjusted_segment['end']:.3f}"
)
logger.info(
f"Adjusted {len(adjusted_segments)} segments to absolute timeline (chunk starts at {chunk_start_time:.2f}s)"
)
return adjusted_segments
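

# Illustration of the timeline adjustment: a segment at 0.2-0.7 s inside a chunk
# that starts at 30.0 s of the original audio ends up at 30.2-30.7 s on the global
# timeline. All inputs below are made up for the example; the chunk tensor is 5 s of
# silence used only to supply a duration.
def _example_adjust_segments() -> List[Dict]:
    local_segments = [{"text": "hello", "start": 0.2, "end": 0.7, "duration": 0.5}]
    chunk_audio = torch.zeros(int(5.0 * SAMPLE_RATE))
    return _validate_and_adjust_segments(
        aligned_segments=local_segments,
        chunk_start_time=30.0,
        chunk_audio_tensor=chunk_audio,
        chunk_sample_rate=SAMPLE_RATE,
        chunk_duration=5.0,
        chunk_index=1,
    )
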
def transcribe_full_audio_with_chunking(
    audio_tensor: torch.Tensor,
    sample_rate: int = 16000,
    chunk_duration: float = 30.0,
    language_with_script: Optional[str] = None,
    progress_callback=None,
) -> Dict:
"""
Complete audio transcription pipeline that handles any length audio with intelligent chunking.
This is the full-featured transcription function that can process both short and long audio files.
Chunking mode is controlled by USE_CHUNKING environment variable:
- USE_CHUNKING=false: No chunking (single chunk mode)
- USE_CHUNKING=true (default): VAD-based intelligent chunking
Args:
audio_tensor: Audio tensor (1D waveform)
sample_rate: Sample rate of the audio tensor
chunk_duration: Target chunk duration in seconds (for static chunking)
        language_with_script: {Language code}_{script} for transcription
        progress_callback: Optional callback for progress updates (currently unused;
            progress is reported through transcription_status)
Returns:
Dict with full transcription and segment information including word-level timestamps
"""
try:
logger.info(f"Starting long-form transcription: tensor shape {audio_tensor.shape} at {sample_rate}Hz")
logger.info(f"USE_CHUNKING = {USE_CHUNKING}")
# Initialize chunker
chunker = AudioChunker()
# Determine chunking mode based on USE_CHUNKING setting
chunking_mode = "vad" if USE_CHUNKING else "none"
# Chunk the audio using the new unified interface
# Ensure tensor is 1D before chunking (squeeze any extra dimensions)
if len(audio_tensor.shape) > 1:
logger.info(f"Squeezing audio tensor from {audio_tensor.shape} to 1D")
audio_tensor_1d = audio_tensor.squeeze()
else:
audio_tensor_1d = audio_tensor
chunks = chunker.chunk_audio(audio_tensor_1d, sample_rate=sample_rate, mode=chunking_mode, chunk_duration=chunk_duration)
if not chunks:
logger.warning("No audio chunks created")
return {
"transcription": "",
"chunks": [],
"total_duration": 0.0,
"error": "No audio content detected",
}
logger.info(f"Processing {len(chunks)} audio chunks (mode: {chunking_mode})")
# Validate chunk continuity
for i, chunk in enumerate(chunks):
logger.info(
f"Chunk {i+1}: {chunk['start_time']:.2f}s - {chunk['end_time']:.2f}s ({chunk['duration']:.2f}s)"
)
if i > 0:
prev_end = chunks[i - 1]["end_time"]
current_start = chunk["start_time"]
gap = current_start - prev_end
if abs(gap) > 0.1: # More than 100ms gap/overlap
logger.warning(
f"Gap/overlap between chunks {i} and {i+1}: {gap:.3f}s"
)
# Process each chunk - now all chunks have uniform format!
all_segments = []
full_transcription_parts = []
total_duration = 0.0
chunk_details = []
for i, chunk in enumerate(chunks):
logger.info(
f"Processing chunk {i+1}/{len(chunks)} ({chunk['duration']:.1f}s, {chunk['start_time']:.1f}s-{chunk['end_time']:.1f}s)"
)
try:
# Process this chunk using tensor-based transcription pipeline
# Use the chunk's audio_data tensor directly - no more file operations!
chunk_audio_tensor = chunk["audio_data"]
chunk_sample_rate = chunk["sample_rate"]
chunk_result = transcribe_with_word_alignment(
audio_tensor=chunk_audio_tensor,
sample_rate=chunk_sample_rate,
language_with_script=language_with_script
)
# Process alignment results - uniform handling for all chunk types
chunk_segments = []
chunk_start_time = chunk["start_time"]
chunk_duration = chunk["duration"]
if chunk_result.get("aligned_segments"):
logger.info(
f"Chunk {i+1} has {len(chunk_result['aligned_segments'])} segments"
)
chunk_segments = _validate_and_adjust_segments(
aligned_segments=chunk_result["aligned_segments"],
chunk_start_time=chunk_start_time,
chunk_audio_tensor=chunk_audio_tensor,
chunk_sample_rate=chunk_sample_rate,
chunk_duration=chunk_duration,
chunk_index=i
)
all_segments.extend(chunk_segments)
logger.info(f"Chunk {i+1} processed {len(chunk_segments)} valid segments")
# Add to full transcription
chunk_transcription = ""
if chunk_result.get("transcription"):
chunk_transcription = chunk_result["transcription"]
full_transcription_parts.append(chunk_transcription)
# Store detailed chunk information
chunk_detail = {
"chunk_index": i,
"start_time": chunk["start_time"],
"end_time": chunk["end_time"],
"duration": chunk["duration"],
"transcription": chunk_transcription,
"num_segments": len(chunk_segments),
"segments": chunk_segments,
}
chunk_details.append(chunk_detail)
total_duration = max(total_duration, chunk["end_time"])
# Update progress linearly from 0.1 to 0.9 based on chunk processing
progress = 0.1 + (0.8 * (i + 1) / len(chunks))
transcription_status.update_progress(progress)
logger.info(
f"Chunk {i+1} processed: '{chunk_transcription}' ({len(chunk_segments)} segments)"
)
except Exception as chunk_error:
logger.error(f"Error processing chunk {i+1}: {chunk_error}")
# Continue with next chunk
# Combine results
full_transcription = " ".join(full_transcription_parts)
# Validate segment continuity
logger.info("Validating segment continuity...")
for i in range(1, len(all_segments)):
prev_end = all_segments[i - 1]["end"]
current_start = all_segments[i]["start"]
gap = current_start - prev_end
if abs(gap) > 1.0: # More than 1 second gap
logger.warning(f"Large gap between segments {i-1} and {i}: {gap:.3f}s")
result = {
"transcription": full_transcription,
"aligned_segments": all_segments,
"chunks": [
{
"chunk_index": chunk_detail["chunk_index"],
"start_time": chunk_detail["start_time"],
"end_time": chunk_detail["end_time"],
"duration": chunk_detail["duration"],
"transcription": chunk_detail["transcription"],
"num_segments": chunk_detail["num_segments"],
}
for chunk_detail in chunk_details
],
"chunk_details": chunk_details, # Full details including segments per chunk
"total_duration": total_duration,
"num_chunks": len(chunks),
"num_segments": len(all_segments),
"status": "success",
}
logger.info(
f"Long-form transcription completed: {len(chunks)} chunks, {total_duration:.1f}s total"
)
logger.info(f"Total segments: {len(all_segments)}")
# Log chunk timing summary
for chunk_detail in chunk_details:
logger.info(
f"Chunk {chunk_detail['chunk_index']}: {chunk_detail['start_time']:.2f}-{chunk_detail['end_time']:.2f}s, {chunk_detail['num_segments']} segments"
)
return result
except Exception as e:
logger.error(f"Error in long-form transcription: {str(e)}", exc_info=True)
return {
"transcription": "",
"chunks": [],
"total_duration": 0.0,
"error": str(e),
}
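

# End-to-end sketch (assumptions: model assets are available, "example.wav" is a
# placeholder path, and "eng_Latn" is an assumed language-with-script code). Runs
# the chunked pipeline on one file and prints word timings when the module is
# executed directly.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    example_audio = _example_load_audio_tensor("example.wav")
    output = transcribe_full_audio_with_chunking(
        example_audio,
        sample_rate=SAMPLE_RATE,
        chunk_duration=30.0,
        language_with_script="eng_Latn",
    )
    print(output.get("transcription", ""))
    for seg in output.get("aligned_segments", []):
        print(f"{seg['start']:7.2f}-{seg['end']:7.2f}  {seg['text']}")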