| | |
| |
|
| | import os |
| | import tempfile |
| | import math |
| | from io import BytesIO |
| | from typing import List, Dict, Tuple, Union |
| | import streamlit as st |
| |
|
| | try: |
| | from pydub import AudioSegment |
| | PYDUB_AVAILABLE = True |
| | except ImportError: |
| | PYDUB_AVAILABLE = False |
| | st.warning("⚠️ Pydub nie jest dostępny. Funkcje kompresji ograniczone.") |
| |
|
| | try: |
| | import librosa |
| | import soundfile as sf |
| | LIBROSA_AVAILABLE = True |
| | except ImportError: |
| | LIBROSA_AVAILABLE = False |
| |
|
| | from config import FILE_PROCESSING, USER_MESSAGES |
| |
|
| | class FileHandler: |
| | """Klasa do obsługi plików audio/video - zoptymalizowana dla Whisper API (max 25MB)""" |
| | |
| | def __init__(self): |
| | self.temp_files = [] |
| | self.processing_stats = {} |
| | |
| | |
| | self.WHISPER_MAX_SIZE_MB = 25 |
| | self.SAFE_CHUNK_SIZE_MB = 20 |
| | |
| | def process_file(self, uploaded_file, max_chunk_size_mb: int = 20, auto_compress: bool = True) -> List[str]: |
| | """ |
| | Główna funkcja przetwarzania pliku dla Whisper API |
| | Returns: Lista ścieżek do plików gotowych do transkrypcji (każdy <25MB) |
| | """ |
| | try: |
| | file_size_mb = uploaded_file.size / (1024 * 1024) |
| | |
| | st.info(f"🔄 Przetwarzam {uploaded_file.name} ({file_size_mb:.1f}MB)") |
| | |
| | |
| | if file_size_mb <= self.WHISPER_MAX_SIZE_MB: |
| | |
| | temp_path = self._save_temp_file(uploaded_file) |
| | if temp_path: |
| | st.success(f"✅ Plik gotowy do transkrypcji ({file_size_mb:.1f}MB)") |
| | return [temp_path] |
| | else: |
| | return [] |
| | |
| | |
| | if file_size_mb > 100: |
| | st.error(f"❌ Plik zbyt duży ({file_size_mb:.1f}MB). Maksymalnie 100MB.") |
| | return [] |
| | |
| | |
| | if auto_compress and file_size_mb > self.WHISPER_MAX_SIZE_MB: |
| | compressed_file = self._compress_audio_for_whisper(uploaded_file) |
| | if compressed_file: |
| | compressed_size_mb = len(compressed_file.getvalue()) / (1024 * 1024) |
| | if compressed_size_mb <= self.WHISPER_MAX_SIZE_MB: |
| | temp_path = self._save_bytesio_to_temp(compressed_file, uploaded_file.name) |
| | if temp_path: |
| | st.success(f"✅ Skompresowano: {file_size_mb:.1f}MB → {compressed_size_mb:.1f}MB") |
| | return [temp_path] |
| | |
| | |
| | return self._split_audio_for_whisper(uploaded_file, max_chunk_size_mb) |
| | |
| | except Exception as e: |
| | st.error(f"❌ Błąd przetwarzania {uploaded_file.name}: {str(e)}") |
| | return [] |
| | |
| | def _compress_audio_for_whisper(self, uploaded_file) -> Union[BytesIO, None]: |
| | """Agresywna kompresja audio dla Whisper API""" |
| | if not PYDUB_AVAILABLE: |
| | st.warning("Pydub niedostępny - pomijam kompresję") |
| | return None |
| | |
| | try: |
| | st.info("🗜️ Kompresuję audio...") |
| | |
| | |
| | audio_data = uploaded_file.read() |
| | uploaded_file.seek(0) |
| | |
| | audio = AudioSegment.from_file(BytesIO(audio_data)) |
| | |
| | |
| | compressed = audio.set_channels(1) |
| | compressed = compressed.set_frame_rate(16000) |
| | |
| | |
| | original_size_mb = uploaded_file.size / (1024 * 1024) |
| | |
| | if original_size_mb > 50: |
| | |
| | bitrate = "32k" |
| | elif original_size_mb > 35: |
| | |
| | bitrate = "48k" |
| | else: |
| | |
| | bitrate = "64k" |
| | |
| | |
| | output = BytesIO() |
| | compressed.export( |
| | output, |
| | format="mp3", |
| | bitrate=bitrate, |
| | parameters=["-ac", "1", "-ar", "16000"] |
| | ) |
| | output.seek(0) |
| | |
| | |
| | compressed_size_mb = len(output.getvalue()) / (1024 * 1024) |
| | |
| | if compressed_size_mb <= self.WHISPER_MAX_SIZE_MB: |
| | return output |
| | else: |
| | st.warning(f"⚠️ Kompresja niewystarczająca ({compressed_size_mb:.1f}MB). Przechodzę do dzielenia.") |
| | return None |
| | |
| | except Exception as e: |
| | st.warning(f"Kompresja nieudana: {str(e)}") |
| | return None |
| | |
| | def _split_audio_for_whisper(self, uploaded_file, max_chunk_size_mb: int) -> List[str]: |
| | """Dzieli plik audio na części <25MB dla Whisper""" |
| | try: |
| | if not PYDUB_AVAILABLE: |
| | st.error("❌ Pydub wymagany do dzielenia plików. Zainstaluj: pip install pydub") |
| | return [] |
| | |
| | st.info("✂️ Dzielę plik na części...") |
| | |
| | |
| | audio_data = uploaded_file.read() |
| | audio = AudioSegment.from_file(BytesIO(audio_data)) |
| | |
| | |
| | total_duration_ms = len(audio) |
| | file_size_mb = uploaded_file.size / (1024 * 1024) |
| | |
| | |
| | safe_chunk_size_mb = min(max_chunk_size_mb, self.SAFE_CHUNK_SIZE_MB) |
| | |
| | |
| | estimated_parts = math.ceil(file_size_mb / safe_chunk_size_mb) |
| | chunk_duration_ms = total_duration_ms // estimated_parts |
| | |
| | |
| | overlap_ms = 10 * 1000 |
| | |
| | st.info(f"📂 Dzielę na {estimated_parts} części (~{chunk_duration_ms//60000:.1f} min każda)") |
| | |
| | parts = [] |
| | base_name = os.path.splitext(uploaded_file.name)[0] |
| | |
| | for i in range(estimated_parts): |
| | start_ms = max(0, i * chunk_duration_ms - (overlap_ms if i > 0 else 0)) |
| | end_ms = min(total_duration_ms, (i + 1) * chunk_duration_ms + overlap_ms) |
| | |
| | |
| | chunk = audio[start_ms:end_ms] |
| | |
| | |
| | chunk = chunk.set_channels(1) |
| | chunk = chunk.set_frame_rate(22050) |
| | |
| | |
| | temp_fd, temp_path = tempfile.mkstemp( |
| | suffix=f"_part{i+1:02d}.mp3", |
| | prefix=f"{base_name}_" |
| | ) |
| | os.close(temp_fd) |
| | |
| | chunk.export(temp_path, format="mp3", bitrate="96k") |
| | |
| | |
| | part_size_mb = os.path.getsize(temp_path) / (1024 * 1024) |
| | |
| | if part_size_mb > self.WHISPER_MAX_SIZE_MB: |
| | st.error(f"❌ Część {i+1} nadal za duża ({part_size_mb:.1f}MB)") |
| | os.remove(temp_path) |
| | continue |
| | |
| | parts.append(temp_path) |
| | self.temp_files.append(temp_path) |
| | |
| | st.success(f"✅ Część {i+1}/{estimated_parts}: {part_size_mb:.1f}MB, {(end_ms-start_ms)//60000:.1f} min") |
| | |
| | if not parts: |
| | st.error("❌ Nie udało się utworzyć żadnej prawidłowej części") |
| | |
| | return parts |
| | |
| | except Exception as e: |
| | st.error(f"❌ Błąd dzielenia pliku: {str(e)}") |
| | return [] |
| | |
| | def _save_temp_file(self, uploaded_file) -> str: |
| | """Zapisuje uploaded file do pliku tymczasowego""" |
| | try: |
| | suffix = f".{uploaded_file.name.split('.')[-1]}" |
| | temp_fd, temp_path = tempfile.mkstemp(suffix=suffix) |
| | |
| | |
| | with os.fdopen(temp_fd, 'wb') as tmp_file: |
| | content = uploaded_file.read() |
| | tmp_file.write(content) |
| | |
| | |
| | uploaded_file.seek(0) |
| | |
| | self.temp_files.append(temp_path) |
| | return temp_path |
| | |
| | except Exception as e: |
| | st.error(f"❌ Błąd zapisu tymczasowego: {str(e)}") |
| | return "" |
| | |
| | def _save_bytesio_to_temp(self, bytes_io: BytesIO, original_name: str) -> str: |
| | """Zapisz BytesIO do pliku tymczasowego""" |
| | try: |
| | suffix = f"_compressed.mp3" |
| | base_name = os.path.splitext(original_name)[0] |
| | |
| | temp_fd, temp_path = tempfile.mkstemp( |
| | suffix=suffix, |
| | prefix=f"{base_name}_" |
| | ) |
| | |
| | with os.fdopen(temp_fd, 'wb') as tmp_file: |
| | tmp_file.write(bytes_io.getvalue()) |
| | |
| | self.temp_files.append(temp_path) |
| | return temp_path |
| | |
| | except Exception as e: |
| | st.error(f"❌ Błąd zapisu skompresowanego: {str(e)}") |
| | return "" |
| | |
| | def validate_file_for_whisper(self, uploaded_file) -> Tuple[bool, str]: |
| | """Walidacja pliku dla Whisper API""" |
| | try: |
| | |
| | file_size_mb = uploaded_file.size / (1024 * 1024) |
| | |
| | if file_size_mb == 0: |
| | return False, "Plik jest pusty" |
| | |
| | if file_size_mb > 100: |
| | return False, f"Plik za duży: {file_size_mb:.1f}MB > 100MB" |
| | |
| | |
| | file_ext = uploaded_file.name.split('.')[-1].lower() |
| | supported_formats = ['mp3', 'wav', 'mp4', 'm4a', 'aac', 'mov', 'avi'] |
| | |
| | if file_ext not in supported_formats: |
| | return False, f"Nieobsługiwany format: .{file_ext}" |
| | |
| | |
| | if file_size_mb > self.WHISPER_MAX_SIZE_MB: |
| | return True, f"Plik wymaga przetwarzania ({file_size_mb:.1f}MB > {self.WHISPER_MAX_SIZE_MB}MB)" |
| | |
| | return True, "OK" |
| | |
| | except Exception as e: |
| | return False, f"Błąd walidacji: {str(e)}" |
| | |
| | def get_audio_duration(self, file_path: str) -> float: |
| | """Pobierz długość pliku audio w sekundach""" |
| | try: |
| | if LIBROSA_AVAILABLE: |
| | duration = librosa.get_duration(filename=file_path) |
| | return duration |
| | elif PYDUB_AVAILABLE: |
| | audio = AudioSegment.from_file(file_path) |
| | return len(audio) / 1000.0 |
| | else: |
| | |
| | file_size = os.path.getsize(file_path) |
| | return file_size / (1024 * 1024) * 60 |
| | except: |
| | file_size = os.path.getsize(file_path) |
| | return file_size / (1024 * 1024) * 60 |
| | |
| | def estimate_processing_time(self, uploaded_files: List) -> Dict: |
| | """Estymuj czas przetwarzania""" |
| | total_size_mb = sum(f.size for f in uploaded_files) / (1024 * 1024) |
| | total_duration_est = total_size_mb * 60 |
| | |
| | |
| | processing_time = 0 |
| | for f in uploaded_files: |
| | file_size_mb = f.size / (1024 * 1024) |
| | if file_size_mb > self.WHISPER_MAX_SIZE_MB: |
| | processing_time += file_size_mb * 2 |
| | |
| | |
| | transcription_time = total_duration_est * 0.1 |
| | |
| | |
| | report_time = len(uploaded_files) * 30 |
| | |
| | return { |
| | 'total_size_mb': total_size_mb, |
| | 'estimated_audio_duration': total_duration_est, |
| | 'estimated_processing_time': processing_time, |
| | 'estimated_transcription_time': transcription_time, |
| | 'estimated_report_time': report_time, |
| | 'total_estimated_time': processing_time + transcription_time + report_time, |
| | 'files_needing_processing': sum(1 for f in uploaded_files |
| | if f.size / (1024 * 1024) > self.WHISPER_MAX_SIZE_MB) |
| | } |
| | |
| | def get_file_info(self, uploaded_file) -> Dict: |
| | """Pobierz szczegółowe informacje o pliku""" |
| | file_size_mb = uploaded_file.size / (1024 * 1024) |
| | file_ext = uploaded_file.name.split('.')[-1].lower() |
| | |
| | return { |
| | 'name': uploaded_file.name, |
| | 'size_mb': file_size_mb, |
| | 'format': file_ext, |
| | 'whisper_ready': file_size_mb <= self.WHISPER_MAX_SIZE_MB, |
| | 'needs_compression': file_size_mb > self.WHISPER_MAX_SIZE_MB and file_size_mb <= 50, |
| | 'needs_splitting': file_size_mb > 50, |
| | 'too_large': file_size_mb > 100, |
| | 'estimated_duration': file_size_mb * 60, |
| | 'estimated_processing_time': max(0, file_size_mb - self.WHISPER_MAX_SIZE_MB) * 2 |
| | } |
| | |
| | def cleanup_temp_files(self): |
| | """Wyczyść pliki tymczasowe""" |
| | cleaned = 0 |
| | errors = 0 |
| | |
| | for temp_file in self.temp_files: |
| | try: |
| | if os.path.exists(temp_file): |
| | os.remove(temp_file) |
| | cleaned += 1 |
| | except Exception as e: |
| | errors += 1 |
| | st.warning(f"Nie można usunąć {temp_file}: {e}") |
| | |
| | self.temp_files = [] |
| | |
| | if cleaned > 0: |
| | st.success(f"🧹 Wyczyszczono {cleaned} plików tymczasowych") |
| | |
| | if errors > 0: |
| | st.warning(f"⚠️ {errors} plików nie udało się usunąć") |
| | |
| | def get_processing_stats(self) -> Dict: |
| | """Zwróć statystyki przetwarzania""" |
| | return { |
| | 'temp_files_count': len(self.temp_files), |
| | 'whisper_max_size_mb': self.WHISPER_MAX_SIZE_MB, |
| | 'safe_chunk_size_mb': self.SAFE_CHUNK_SIZE_MB, |
| | 'processing_stats': self.processing_stats, |
| | 'libraries_available': { |
| | 'pydub': PYDUB_AVAILABLE, |
| | 'librosa': LIBROSA_AVAILABLE |
| | } |
| | } |
| | |
| | def analyze_upload_batch(self, uploaded_files: List) -> Dict: |
| | """Analizuj całą paczkę plików""" |
| | analysis = { |
| | 'total_files': len(uploaded_files), |
| | 'total_size_mb': 0, |
| | 'whisper_ready': 0, |
| | 'need_compression': 0, |
| | 'need_splitting': 0, |
| | 'too_large': 0, |
| | 'estimated_parts': 0, |
| | 'file_details': [] |
| | } |
| | |
| | for file in uploaded_files: |
| | info = self.get_file_info(file) |
| | analysis['file_details'].append(info) |
| | analysis['total_size_mb'] += info['size_mb'] |
| | |
| | if info['whisper_ready']: |
| | analysis['whisper_ready'] += 1 |
| | elif info['needs_compression']: |
| | analysis['need_compression'] += 1 |
| | elif info['needs_splitting']: |
| | analysis['need_splitting'] += 1 |
| | |
| | parts = math.ceil(info['size_mb'] / self.SAFE_CHUNK_SIZE_MB) |
| | analysis['estimated_parts'] += parts |
| | elif info['too_large']: |
| | analysis['too_large'] += 1 |
| | |
| | return analysis |
| | |
| | def create_processing_plan(self, uploaded_files: List) -> str: |
| | """Stwórz plan przetwarzania dla użytkownika""" |
| | analysis = self.analyze_upload_batch(uploaded_files) |
| | |
| | plan = f""" |
| | 📋 **PLAN PRZETWARZANIA** |
| | |
| | 📊 **Podsumowanie:** |
| | - Plików: {analysis['total_files']} ({analysis['total_size_mb']:.1f}MB) |
| | - Gotowych do transkrypcji: {analysis['whisper_ready']} |
| | - Wymagających kompresji: {analysis['need_compression']} |
| | - Wymagających dzielenia: {analysis['need_splitting']} |
| | - Za dużych: {analysis['too_large']} |
| | |
| | """ |
| | |
| | if analysis['estimated_parts'] > 0: |
| | plan += f"- Szacowana liczba części: {analysis['estimated_parts']}\n" |
| | |
| | if analysis['too_large'] > 0: |
| | plan += f"\n❌ **PLIKI ZA DUŻE (>100MB):**\n" |
| | for info in analysis['file_details']: |
| | if info['too_large']: |
| | plan += f"- {info['name']}: {info['size_mb']:.1f}MB\n" |
| | |
| | if analysis['need_splitting'] > 0: |
| | plan += f"\n✂️ **PLIKI DO PODZIELENIA:**\n" |
| | for info in analysis['file_details']: |
| | if info['needs_splitting']: |
| | parts = math.ceil(info['size_mb'] / self.SAFE_CHUNK_SIZE_MB) |
| | plan += f"- {info['name']}: {info['size_mb']:.1f}MB → ~{parts} części\n" |
| | |
| | if analysis['need_compression'] > 0: |
| | plan += f"\n🗜️ **PLIKI DO KOMPRESJI:**\n" |
| | for info in analysis['file_details']: |
| | if info['needs_compression']: |
| | plan += f"- {info['name']}: {info['size_mb']:.1f}MB\n" |
| | |
| | |
| | times = self.estimate_processing_time(uploaded_files) |
| | plan += f""" |
| | ⏱️ **ESTYMACJA CZASÓW:** |
| | - Przetwarzanie plików: ~{times['estimated_processing_time']:.1f}s |
| | - Transkrypcja: ~{times['estimated_transcription_time']:.1f}s |
| | - Generowanie raportu: ~{times['estimated_report_time']:.1f}s |
| | - **ŁĄCZNIE: ~{times['total_estimated_time']:.1f}s ({times['total_estimated_time']/60:.1f} min)** |
| | """ |
| | |
| | return plan |
| |
|
| | |
| | def check_file_size_for_whisper(file_path: str) -> Tuple[bool, float]: |
| | """Sprawdź czy plik mieści się w limicie Whisper""" |
| | try: |
| | size_mb = os.path.getsize(file_path) / (1024 * 1024) |
| | return size_mb <= 25, size_mb |
| | except: |
| | return False, 0 |
| |
|
| | def estimate_compression_ratio(file_ext: str) -> float: |
| | """Estymuj współczynnik kompresji dla różnych formatów""" |
| | ratios = { |
| | 'wav': 0.1, |
| | 'aac': 0.7, |
| | 'mp3': 0.8, |
| | 'm4a': 0.7, |
| | 'mp4': 0.5, |
| | 'mov': 0.5, |
| | 'avi': 0.4 |
| | } |
| | return ratios.get(file_ext.lower(), 0.6) |
| |
|
| | |
| | if __name__ == "__main__": |
| | print("🧪 Test FileHandler") |
| | handler = FileHandler() |
| | |
| | stats = handler.get_processing_stats() |
| | print(f"📊 Biblioteki: {stats['libraries_available']}") |
| | print(f"🎯 Limit Whisper: {stats['whisper_max_size_mb']}MB") |
| | print(f"🔒 Bezpieczny chunk: {stats['safe_chunk_size_mb']}MB") |
| | |
| | print("✅ FileHandler gotowy do użycia") |