import logging
import os
import shutil
import tempfile
import time
import uuid
from pathlib import Path
from typing import Dict

import PyPDF2
from PIL import Image

from config.settings import settings


class FileHandler:
    """Validate, store, preview and clean up uploaded files.

    Saved uploads live under ``<temp_dir>/<session_id>/input``, where
    ``temp_dir`` comes from ``settings.TEMP_DIR``.
    """

    # Maximum number of characters returned by get_file_preview().
    _PREVIEW_CHARS = 500
    _logger = logging.getLogger(__name__)

    def __init__(self):
        self.temp_dir = Path(settings.TEMP_DIR)
        self.max_size_mb = settings.MAX_FILE_SIZE_MB

    @staticmethod
    def _size_in_bytes(uploaded_file) -> int:
        """Return the upload size, for in-memory or path-backed objects."""
        if hasattr(uploaded_file, "getbuffer"):
            return len(uploaded_file.getbuffer())
        # Objects without getbuffer() are assumed to expose the on-disk
        # location in ``.name`` (the same assumption save_uploaded_file makes).
        return os.path.getsize(uploaded_file.name)

    @staticmethod
    def _extension_of(filename: str) -> str:
        """Return the lower-cased extension without the dot, or ``""``."""
        # Work on the base name so dots in directory names are ignored.
        _, dot, extension = os.path.basename(filename).rpartition(".")
        return extension.lower() if dot else ""

    @classmethod
    def _truncate(cls, text: str) -> str:
        """Cut *text* to the preview length, marking truncation with '...'."""
        limit = cls._PREVIEW_CHARS
        return text[:limit] + "..." if len(text) > limit else text

    def validate_file(self, uploaded_file) -> Dict:
        """Check an upload's presence, size and extension.

        Args:
            uploaded_file: Object with a ``name`` attribute and either a
                ``getbuffer()`` method or a ``name`` that is a readable path.

        Returns:
            A dict with keys ``valid`` (bool), ``error`` (message or None)
            and ``file_info`` (``name``/``size_mb``/``type`` dict when valid,
            otherwise None).
        """
        validation = {"valid": False, "error": None, "file_info": None}
        if not uploaded_file:
            validation["error"] = "No file"
            return validation

        try:
            size_bytes = self._size_in_bytes(uploaded_file)
        except OSError:
            # The backing file vanished or cannot be stat'ed.
            validation["error"] = "Unreadable file"
            return validation

        file_size_mb = size_bytes / (1024 * 1024)
        if file_size_mb > self.max_size_mb:
            validation["error"] = "File too large"
            return validation

        file_extension = self._extension_of(uploaded_file.name)
        if not file_extension or file_extension not in settings.SUPPORTED_FILE_TYPES:
            validation["error"] = "Unsupported type"
            return validation

        validation["valid"] = True
        validation["file_info"] = {
            "name": os.path.basename(uploaded_file.name),
            "size_mb": file_size_mb,
            "type": file_extension,
        }
        return validation

    def save_uploaded_file(self, uploaded_file, session_id: str) -> str:
        """Copy an upload into the session's input directory.

        Args:
            uploaded_file: Object with a ``name`` attribute; content is taken
                from ``getbuffer()``, else ``read()``, else the file at
                ``name``.
            session_id: Sub-directory name; when falsy, a random 8-character
                id is generated.

        Returns:
            The path of the saved file as a string.
        """
        if not session_id:
            session_id = str(uuid.uuid4())[:8]

        session_dir = self.temp_dir / session_id / "input"
        session_dir.mkdir(parents=True, exist_ok=True)

        # Keep only the base name so the upload cannot choose its directory.
        file_path = session_dir / os.path.basename(uploaded_file.name)

        self._logger.info("Moving file from Gradio temp: %s", uploaded_file.name)
        self._logger.info("To session directory: %s", file_path)

        with open(file_path, "wb") as destination:
            if hasattr(uploaded_file, "getbuffer"):
                destination.write(uploaded_file.getbuffer())
            elif hasattr(uploaded_file, "read"):
                # Rewind first: a handle that was already consumed would
                # otherwise be saved as an empty file, unlike the getbuffer
                # branch, which always stores the full content.
                seek = getattr(uploaded_file, "seek", None)
                if callable(seek):
                    try:
                        seek(0)
                    except (OSError, ValueError):
                        # Non-seekable stream: copy from the current position.
                        pass
                destination.write(uploaded_file.read())
            else:
                with open(uploaded_file.name, "rb") as source:
                    # Stream in chunks instead of loading the whole file.
                    shutil.copyfileobj(source, destination)
        return str(file_path)

    def get_file_preview(self, file_path: str, file_type: str) -> str:
        """Return up to 500 characters of preview text for a file.

        Args:
            file_path: Location of the file to preview.
            file_type: ``'pdf'`` or ``'txt'``; any other value has no preview.

        Returns:
            The (possibly truncated, ``...``-suffixed) text, or a
            "... not available" message when no preview can be produced.
        """
        if file_type == "pdf":
            try:
                with open(file_path, "rb") as file:
                    reader = PyPDF2.PdfReader(file)
                    if len(reader.pages) > 0:
                        return self._truncate(reader.pages[0].extract_text())
            except Exception:
                # Best-effort: any failure while opening or parsing the PDF
                # degrades to a placeholder instead of propagating.
                return "PDF preview not available"
        elif file_type == "txt":
            try:
                with open(file_path, "r", encoding="utf-8") as file:
                    # One extra character is enough to detect truncation;
                    # no need to load the whole file.
                    text = file.read(self._PREVIEW_CHARS + 1)
            except (OSError, ValueError):
                # ValueError covers UnicodeDecodeError and malformed paths.
                return "Text preview not available"
            return self._truncate(text)

        return "Preview not available"

    def cleanup_temp_files(self, max_age_seconds: float = 24 * 3600) -> None:
        """Delete session directories not modified within *max_age_seconds*.

        Best-effort: failures are logged and never raised, and a directory
        that cannot be removed does not stop the remaining ones from being
        processed.

        Args:
            max_age_seconds: Age threshold based on the directory's mtime;
                defaults to 24 hours.
        """
        now = time.time()
        try:
            entries = list(self.temp_dir.iterdir())
        except OSError as exc:
            self._logger.warning("Cannot list temp directory %s: %s", self.temp_dir, exc)
            return

        for entry in entries:
            try:
                if entry.is_dir() and now - entry.stat().st_mtime > max_age_seconds:
                    shutil.rmtree(entry)
            except OSError as exc:
                self._logger.warning("Could not remove %s: %s", entry, exc)