Spaces:
Sleeping
Sleeping
File size: 8,076 Bytes
836f75b 0b7f1e4 836f75b b8db82b 836f75b 65e40ba 836f75b b8db82b 836f75b 733f7a8 b8db82b 836f75b 733f7a8 b8db82b 733f7a8 c137f8e 733f7a8 b8db82b c137f8e e0c041b 836f75b 65e40ba 836f75b e0c041b 836f75b 65e40ba 836f75b 65e40ba 836f75b 65e40ba 836f75b 65e40ba 836f75b 65e40ba 836f75b 733f7a8 836f75b 65e40ba 836f75b 65e40ba aa7ee0b 836f75b aa7ee0b 836f75b aa7ee0b 836f75b 65e40ba 836f75b 65e40ba aa7ee0b 65e40ba aa7ee0b b8a5d63 836f75b 65e40ba aa7ee0b 836f75b 65e40ba 836f75b aa7ee0b 836f75b 65e40ba 836f75b 65e40ba 836f75b 65e40ba 836f75b 65e40ba 836f75b 65e40ba 836f75b 65e40ba 836f75b 65e40ba b8db82b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
import os
from indexer import indexer
from threading import Event, Thread
import asyncio
import time
import logging
from utils import convert_to_gb
from api import InstancesAPI
CACHE_DIR = os.getenv("CACHE_DIR")
class LoadBalancer:
def __init__(self, cache_dir, token, repo, polling_interval=4, max_retries=3, initial_delay=1):
self.version = "0.0.2 Alpha"
self.instances = []
self.instances_health = {}
self.polling_interval = polling_interval
self.max_retries = max_retries
self.initial_delay = initial_delay
self.stop_event = Event()
self.instances_api = InstancesAPI(self.instances)
self.CACHE_DIR = cache_dir
self.TOKEN = token
self.REPO = repo
self.MUSIC_STORE = {}
self.file_structure = None
self.category_files_map = {}
# Ensure CACHE_DIR exists
if not os.path.exists(self.CACHE_DIR):
os.makedirs(self.CACHE_DIR)
# Initialize file structure and start prefetching
self.update_file_structure()
# Start polling and file checking in separate threads
polling_thread = Thread(target=self.start_polling)
polling_thread.daemon = True
polling_thread.start()
# Start periodic tasks
asyncio.create_task(self.run_periodic_tasks())
def update_file_structure(self):
"""Update the file structure and the category-files map."""
self.file_structure = indexer() # Assume this re-fetches the file structure
self.category_files_map = {} # Reset the map
for directory in self.file_structure:
if directory['type'] == 'directory':
# Map category to its files
self.category_files_map[directory['path']] = [
file['path'] for file in directory['contents'] if file['type'] == 'file'
]
async def run_periodic_tasks(self):
"""Run indexer and prefetch functions every 5 minutes."""
while not self.stop_event.is_set():
self.update_file_structure() # Re-run indexer and update the map
await asyncio.sleep(300) # Sleep for 5 minutes
def get_reports(self):
reports = self.instances_api.fetch_reports()
temp_music_store = {}
for instance_url in self.instances[:]:
if instance_url in reports:
report = reports[instance_url]
logging.info(f"Report from {instance_url}: {report}")
self.process_report(instance_url, report, temp_music_store)
else:
logging.error(f"Failed to get report from {instance_url}. Removing instance.")
self.remove_instance(instance_url)
self.MUSIC_STORE = temp_music_store
def process_report(self, instance_url, report, temp_music_store):
music_store = report.get('music_store', {})
cache_size = report.get('cache_size')
logging.info(f"Processing report from {instance_url}")
# Update temporary music store
for title, path in music_store.items():
url = f"{instance_url}/api/get/music/{title.replace(' ', '%20')}"
temp_music_store[title] = url
logging.info("Music Store processed successfully.")
self.update_instances_health(instance=instance_url, cache_size=cache_size)
def start_polling(self):
logging.info("Starting polling.")
while not self.stop_event.is_set():
self.get_reports()
time.sleep(self.polling_interval)
logging.info("Polling stopped.")
def stop_polling(self):
logging.info("Stopping polling.")
self.stop_event.set()
def register_instance(self, instance_url):
if instance_url not in self.instances:
self.instances.append(instance_url)
logging.info(f"Registered instance {instance_url}")
else:
logging.info(f"Instance {instance_url} is already registered.")
def remove_instance(self, instance_url):
if instance_url in self.instances:
self.instances.remove(instance_url)
self.instances_health.pop(instance_url, None)
logging.info(f"Removed instance {instance_url}")
else:
logging.info(f"Instance {instance_url} not found for removal.")
def update_instances_health(self, instance, cache_size):
self.instances_health[instance] = {"used": cache_size["cache_size"], "total": "50 GB"}
logging.info(f"Updated instance {instance} with cache size {cache_size}")
def download_music_to_best_instance(self, file_name):
"""Downloads a music file to the instance with the most free space in self.instance_health."""
best_instance = None
max_free_space = -1
# Determine the instance with the most free space
for instance_url, space_info in self.instances_health.items():
total_space = convert_to_gb(space_info.get('total', 0))
used_space = convert_to_gb(space_info.get('used', 0))
free_space = total_space - used_space
if free_space > max_free_space:
max_free_space = free_space
best_instance = instance_url
if not best_instance:
logging.error("No suitable instance found for downloading the music.")
return {"error": "No suitable instance found for downloading the music."}
# Attempt to download music to the best instance
try:
result = self.instances_api.download_music(best_instance, file_name)
# Check if the response is as expected
if not result or "music_id" not in result or "status" not in result:
logging.error(f"Unexpected response from instance {best_instance}: {result}")
return {
"error": "Failed to retrieve valid download data from the instance.",
"details": result if result else "Empty response"
}
# Prepare response with download progress URL
music_id = self.get_music_id(file_name)
status = result["status"]
progress_url = f'{best_instance}/api/get/progress/{music_id}'
response = {
"music_id": music_id,
"status": status,
"progress_url": progress_url
}
return response
except Exception as e:
# Log network or API call-related errors
logging.error(f"Error downloading music to {best_instance}: {str(e)}")
return {
"error": "Error occurred during music download.",
"details": str(e)
}
def find_music_path(self, title):
"""Find the path of the music in the indexed data based on the title."""
for directory in self.file_structure:
if directory['type'] == 'directory':
for sub_directory in directory['contents']:
if sub_directory['type'] == 'file' and title.lower() in sub_directory['path'].lower():
return sub_directory['path']
return None
def get_music_id(self, title):
"""Generate a unique music ID based on the title."""
return title.replace(" ", "_").lower()
def get_all_music(self):
"""Get all music files from the indexed file structure."""
music_files = []
for directory in self.file_structure:
if directory['type'] == 'directory':
for sub_directory in directory['contents']:
if sub_directory['type'] == 'file':
music_files.append(sub_directory['path'])
return music_files
def get_all_categories(self):
"""Get a list of all category folders."""
return list(self.category_files_map.keys())
def get_files_from_category(self, category):
"""Get all files from a specified category."""
return self.category_files_map.get(category, [])
|