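"""Cache instance node: indexes a local media cache, registers itself with a
load balancer, serves film and TV episode downloads, and tracks download
progress. (Summary docstring added for orientation; behavior is defined by
the code below.)"""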
import os
import requests
import json
import urllib.request
import time
from threading import Thread, Event
from requests.exceptions import RequestException
from tqdm import tqdm
from indexer import indexer
import logging

CACHE_DIR = os.getenv("CACHE_DIR")
download_progress = {}

class Instance:
    def __init__(self, id, url, cache_dir, index_file, token, repo, load_balancer_api, max_retries=20, initial_delay=1):
        self.version = "0.2.6 V Alpha"
        self.id = id
        self.url = url
        self.CACHE_DIR = cache_dir
        self.INDEX_FILE = index_file
        self.TOKEN = token
        self.REPO = repo
        self.FILM_STORE_JSON_PATH = os.path.join(cache_dir, "film_store.json")
        self.TV_STORE_JSON_PATH = os.path.join(cache_dir, "tv_store.json")
        self.download_threads = {}
        self.file_structure = None
        self.load_balancer_api = load_balancer_api
        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.last_report_time = time.time()  # Initialize the last report time
        self.re_register_event = Event()

        # Ensure CACHE_DIR exists
        if not os.path.exists(self.CACHE_DIR):
            os.makedirs(self.CACHE_DIR)

        for path in [self.FILM_STORE_JSON_PATH, self.TV_STORE_JSON_PATH]:
            if not os.path.exists(path):
                with open(path, 'w') as json_file:
                    json.dump({}, json_file)

        # Index the file structure and load it
        self.run_indexer_and_load()

        # Register with the load balancer and monitor the registration
        self.register_to_load_balancer()
        registration_thread = Thread(target=self.monitor_registration)
        registration_thread.daemon = True
        registration_thread.start()

        # Start the thread to re-index every 2 minutes
        indexer_thread = Thread(target=self.run_indexer_periodically)
        indexer_thread.daemon = True
        indexer_thread.start()

    def run_indexer_and_load(self):
        """Runs the indexer and loads the file structure from INDEX_FILE."""
        indexer()
        if not os.path.exists(self.INDEX_FILE):
            raise FileNotFoundError(f"{self.INDEX_FILE} not found. Please make sure the file exists.")
        with open(self.INDEX_FILE, 'r') as f:
            self.file_structure = json.load(f)
        logging.info("File structure reloaded successfully.")

    def run_indexer_periodically(self):
        """Periodically reruns the indexer and reloads the file structure."""
        while True:
            time.sleep(120)  # Wait for 2 minutes
            logging.info("Re-running indexer and reloading file structure.")
            try:
                self.run_indexer_and_load()
            except Exception as e:
                # Keep the daemon thread alive across transient indexing failures
                logging.error(f"Periodic re-index failed: {e}")

    def compile_report(self):
        self.last_report_time = time.time()  # Update the last report time
        report = {
            "instance_id": self.id,
            "instance_url": self.url,
            "film_store": self.read_json(self.FILM_STORE_JSON_PATH),
            "tv_store": self.read_json(self.TV_STORE_JSON_PATH),
            "cache_size": self.get_cache_size()
        }
        return report

    def monitor_registration(self):
        while True:
            if time.time() - self.last_report_time > 60:  # Check if 1 minute has passed
                logging.info('1 minute passed since last report. Re-registering...')
                self.register_to_load_balancer()
                self.last_report_time = time.time()  # Reset the last report time
            time.sleep(30)  # Check every 30 seconds

    def get_cache_size(self):
        total_size = 0
        for dirpath, dirnames, filenames in os.walk(self.CACHE_DIR):
            for f in filenames:
                fp = os.path.join(dirpath, f)
                total_size += os.path.getsize(fp)
        return {"cache_size": f"{total_size / (1024 * 1024 * 1024):.2f} GB"}

    def read_json(self, file_path):
        if os.path.exists(file_path):
            with open(file_path, 'r') as json_file:
                return json.load(json_file)
        return {}

    def get_system_proxies(self):
        """
        Retrieves the system's HTTP and HTTPS proxies.

        Returns:
            dict: A dictionary containing the proxies.
        """
        try:
            proxies = urllib.request.getproxies()
            print("System proxies:", proxies)
            return {
                "http": proxies.get("http"),
                "https": proxies.get("https")
            }
        except Exception as e:
            print(f"Error getting system proxies: {e}")
            return {}

    def download_film(self, file_url, token, cache_path, proxies, film_id, title, chunk_size=100 * 1024 * 1024):
        """
        Downloads a film from the specified URL and saves it to the cache path.
        Tracks the download progress.

        Args:
            file_url (str): The URL of the file to download.
            token (str): The authorization token for the request.
            cache_path (str): The path to save the downloaded file.
            proxies (dict): Proxies for the request.
            film_id (str): Unique identifier for the film download.
            title (str): The title of the film.
            chunk_size (int): Size of each chunk to download.
        """
        print(f"Downloading file from URL: {file_url} to {cache_path} with proxies: {proxies}")
        headers = {'Authorization': f'Bearer {token}'}
        # Create the progress entry up front so the error handlers below can
        # update it even if the request itself fails.
        download_progress[film_id] = {"total": 0, "downloaded": 0, "status": "Downloading", "start_time": time.time()}
        try:
            response = requests.get(file_url, headers=headers, proxies=proxies, stream=True)
            response.raise_for_status()
            total_size = int(response.headers.get('content-length', 0))
            download_progress[film_id]["total"] = total_size
            os.makedirs(os.path.dirname(cache_path), exist_ok=True)
            with open(cache_path, 'wb') as file, tqdm(total=total_size, unit='B', unit_scale=True, desc=cache_path) as pbar:
                for data in response.iter_content(chunk_size=chunk_size):
                    file.write(data)
                    pbar.update(len(data))
                    download_progress[film_id]["downloaded"] += len(data)
            print(f'File cached to {cache_path} successfully.')
            self.update_film_store_json(title, cache_path)
            download_progress[film_id]["status"] = "Completed"
        except RequestException as e:
            print(f"Error downloading file: {e}")
            download_progress[film_id]["status"] = "Failed"
        except IOError as e:
            print(f"Error writing file {cache_path}: {e}")
            download_progress[film_id]["status"] = "Failed"
        finally:
            if download_progress[film_id]["status"] != "Downloading":
                download_progress[film_id]["end_time"] = time.time()

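    # A download is typically launched in a background thread so callers can
    # poll progress while it runs. A sketch (the self.download_threads
    # bookkeeping is an assumption; nothing in this file populates it):
    #   t = Thread(target=instance.download_film,
    #              args=(url, token, path, proxies, film_id, title))
    #   instance.download_threads[film_id] = t
    #   t.start()
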
    def get_download_progress(self, download_id):
        """
        Gets the download progress for a specific film or episode.

        Args:
            download_id (str): The unique identifier for the download.

        Returns:
            dict: A dictionary containing the total size, downloaded size, progress percentage, status, and ETA.
        """
        if download_id in download_progress:
            total = download_progress[download_id]["total"]
            downloaded = download_progress[download_id]["downloaded"]
            status = download_progress[download_id].get("status", "In Progress")
            progress = (downloaded / total) * 100 if total > 0 else 0
            eta = None
            if status == "Downloading" and downloaded > 0:
                elapsed_time = time.time() - download_progress[download_id]["start_time"]
                estimated_total_time = elapsed_time * (total / downloaded)
                eta = estimated_total_time - elapsed_time
            elif status == "Completed":
                eta = 0
            return {"total": total, "downloaded": downloaded, "progress": progress, "status": status, "eta": eta}
        return {"total": 0, "downloaded": 0, "progress": 0, "status": "Not Found", "eta": None}

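    # Illustrative return value while a download is in flight (e.g. 25% done
    # after 30 seconds gives an ETA of 90 seconds):
    #   {"total": 1000000, "downloaded": 250000, "progress": 25.0,
    #    "status": "Downloading", "eta": 90.0}
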
    def update_film_store_json(self, title, cache_path):
        """
        Updates the film store JSON with the new file.

        Args:
            title (str): The title of the film.
            cache_path (str): The local path where the file is saved.
        """
        film_store_data = {}
        if os.path.exists(self.FILM_STORE_JSON_PATH):
            with open(self.FILM_STORE_JSON_PATH, 'r') as json_file:
                film_store_data = json.load(json_file)
        film_store_data[title] = cache_path
        with open(self.FILM_STORE_JSON_PATH, 'w') as json_file:
            json.dump(film_store_data, json_file, indent=2)
        print(f'Film store updated with {title}.')

    def download_episode(self, file_url, token, cache_path, proxies, episode_id, title, chunk_size=100 * 1024 * 1024):
        """
        Downloads an episode from the specified URL and saves it to the cache path.
        Tracks the download progress.

        Args:
            file_url (str): The URL of the file to download.
            token (str): The authorization token for the request.
            cache_path (str): The path to save the downloaded file.
            proxies (dict): Proxies for the request.
            episode_id (str): Unique identifier for the episode download.
            title (str): The title of the TV show.
            chunk_size (int): Size of each chunk to download.
        """
        print(f"Downloading file from URL: {file_url} to {cache_path} with proxies: {proxies}")
        headers = {'Authorization': f'Bearer {token}'}
        # Create the progress entry up front so the error handlers below can
        # update it even if the request itself fails.
        download_progress[episode_id] = {"total": 0, "downloaded": 0, "status": "Downloading", "start_time": time.time()}
        try:
            response = requests.get(file_url, headers=headers, proxies=proxies, stream=True)
            response.raise_for_status()
            total_size = int(response.headers.get('content-length', 0))
            download_progress[episode_id]["total"] = total_size
            os.makedirs(os.path.dirname(cache_path), exist_ok=True)
            with open(cache_path, 'wb') as file, tqdm(total=total_size, unit='B', unit_scale=True, desc=cache_path) as pbar:
                for data in response.iter_content(chunk_size=chunk_size):
                    file.write(data)
                    pbar.update(len(data))
                    download_progress[episode_id]["downloaded"] += len(data)
            print(f'File cached to {cache_path} successfully.')
            self.update_tv_store_json(title, cache_path)
            download_progress[episode_id]["status"] = "Completed"
        except RequestException as e:
            print(f"Error downloading file: {e}")
            download_progress[episode_id]["status"] = "Failed"
        except IOError as e:
            print(f"Error writing file {cache_path}: {e}")
            download_progress[episode_id]["status"] = "Failed"
        finally:
            if download_progress[episode_id]["status"] != "Downloading":
                download_progress[episode_id]["end_time"] = time.time()

    def update_tv_store_json(self, title, cache_path):
        """
        Updates the TV store JSON with the new file, organizing by title, season, and episode.

        Args:
            title (str): The title of the TV show.
            cache_path (str): The local path where the file is saved.
        """
        tv_store_data = {}
        if os.path.exists(self.TV_STORE_JSON_PATH):
            with open(self.TV_STORE_JSON_PATH, 'r') as json_file:
                tv_store_data = json.load(json_file)

        # Extract season and episode information from the cache_path
        season_part = os.path.basename(os.path.dirname(cache_path))  # e.g. 'Season 1'
        episode_part = os.path.basename(cache_path)  # e.g. 'Grand Blue Dreaming - S01E01 - Deep Blue HDTV-720p.mp4'

        # Create the structure if not already present
        if title not in tv_store_data:
            tv_store_data[title] = {}
        if season_part not in tv_store_data[title]:
            tv_store_data[title][season_part] = {}

        # Assuming episode_part is unique for each episode within a season
        tv_store_data[title][season_part][episode_part] = cache_path

        with open(self.TV_STORE_JSON_PATH, 'w') as json_file:
            json.dump(tv_store_data, json_file, indent=2)
        print(f'TV store updated with {title}, {season_part}, {episode_part}.')

    def load_json(self, file_path):
        """Load JSON data from a file."""
        with open(file_path, 'r') as file:
            return json.load(file)

    def find_movie_path(self, title):
        """Find the path of the movie in the JSON data based on the title."""
        for directory in self.file_structure:
            if directory['type'] == 'directory' and directory['path'] == 'films':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'directory':
                        for item in sub_directory['contents']:
                            if item['type'] == 'file' and title.lower() in item['path'].lower():
                                return item['path']
        return None

    def find_tv_path(self, title):
        """Find the path of the TV show in the JSON data based on the title."""
        for directory in self.file_structure:
            if directory['type'] == 'directory' and directory['path'] == 'tv':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'directory' and title.lower() in sub_directory['path'].lower():
                        return sub_directory['path']
        return None

    def get_tv_structure(self, title):
        """Return the directory subtree of the TV show matching the title."""
        for directory in self.file_structure:
            if directory['type'] == 'directory' and directory['path'] == 'tv':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'directory' and title.lower() in sub_directory['path'].lower():
                        return sub_directory
        return None

    def get_film_id(self, title):
        """Generate a film ID based on the title."""
        return title.replace(" ", "_").lower()

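    # e.g. get_film_id("Grand Blue Dreaming") -> "grand_blue_dreaming"
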
| def bytes_to_human_readable(self, num, suffix="B"): | |
| for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]: | |
| if abs(num) < 1024.0: | |
| return f"{num:3.1f} {unit}{suffix}" | |
| num /= 1024.0 | |
| return f"{num:.1f} Y{suffix}" | |
    def encode_episodeid(self, title, season, episode):
        return f"{title}_{season}_{episode}"

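    # e.g. encode_episodeid("Grand Blue Dreaming", "Season 1", "S01E01")
    #   -> "Grand Blue Dreaming_Season 1_S01E01"
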
    def get_all_tv_shows(self):
        """Get all TV shows from the indexed cache structure JSON file."""
        tv_shows = {}
        for directory in self.file_structure:
            if directory['type'] == 'directory' and directory['path'] == 'tv':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'directory':
                        show_title = sub_directory['path'].split('/')[-1]
                        tv_shows[show_title] = []
                        for season_directory in sub_directory['contents']:
                            if season_directory['type'] == 'directory':
                                season = season_directory['path'].split('/')[-1]
                                for episode in season_directory['contents']:
                                    if episode['type'] == 'file':
                                        tv_shows[show_title].append({
                                            "season": season,
                                            "episode": episode['path'].split('/')[-1],
                                            "path": episode['path']
                                        })
        return tv_shows

    def get_all_films(self):
        """Get all films from the indexed cache structure JSON file."""
        films = []
        for directory in self.file_structure:
            if directory['type'] == 'directory' and directory['path'] == 'films':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'directory':
                        films.append(sub_directory['path'])
        return films

    def register_to_load_balancer(self):
        retries = 0
        delay = self.initial_delay
        max_delay = 120
        while retries < self.max_retries:
            try:
                result = self.load_balancer_api.register_instance(self.id, self.url)
                if result:
                    logging.info(f'Successfully registered instance {self.id} to load balancer.')
                    return result
            except Exception as e:
                logging.error(f'Error during registration: {e}')
            retries += 1
            logging.warning(f'Attempt {retries} to register instance {self.id} failed. Retrying in {delay} seconds...')
            time.sleep(delay)
            delay = min(delay * 2, max_delay)  # Exponential backoff with maximum delay
        logging.error(f'Giving up after {self.max_retries} attempts to register instance {self.id}.')
        return None
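
# --- Minimal usage sketch (illustrative, not part of the service) ---
# _StubLoadBalancerAPI below is a hypothetical stand-in for the real load
# balancer client; the only assumption is a register_instance(id, url)
# method. Running this also assumes indexer() can produce the index file
# ("index.json" here) in the working directory.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    class _StubLoadBalancerAPI:
        def register_instance(self, instance_id, url):
            logging.info(f"register_instance({instance_id}, {url})")
            return {"registered": True}

    instance = Instance(
        id="instance-1",
        url="http://localhost:7860",
        cache_dir=os.getenv("CACHE_DIR", "./cache"),
        index_file="index.json",
        token=os.getenv("TOKEN", ""),
        repo="example/repo",
        load_balancer_api=_StubLoadBalancerAPI(),
    )
    print(json.dumps(instance.compile_report(), indent=2))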