load-balancer / LoadBalancer.py
ChandimaPrabath's picture
patch2
b8a5d63
import os
from indexer import indexer
from threading import Event, Thread
import asyncio
import time
import logging
from utils import convert_to_gb
from api import InstancesAPI
CACHE_DIR = os.getenv("CACHE_DIR")
class LoadBalancer:
def __init__(self, cache_dir, token, repo, polling_interval=4, max_retries=3, initial_delay=1):
self.version = "0.0.2 Alpha"
self.instances = []
self.instances_health = {}
self.polling_interval = polling_interval
self.max_retries = max_retries
self.initial_delay = initial_delay
self.stop_event = Event()
self.instances_api = InstancesAPI(self.instances)
self.CACHE_DIR = cache_dir
self.TOKEN = token
self.REPO = repo
self.MUSIC_STORE = {}
self.file_structure = None
self.category_files_map = {}
# Ensure CACHE_DIR exists
if not os.path.exists(self.CACHE_DIR):
os.makedirs(self.CACHE_DIR)
# Initialize file structure and start prefetching
self.update_file_structure()
# Start polling and file checking in separate threads
polling_thread = Thread(target=self.start_polling)
polling_thread.daemon = True
polling_thread.start()
# Start periodic tasks
asyncio.create_task(self.run_periodic_tasks())
def update_file_structure(self):
"""Update the file structure and the category-files map."""
self.file_structure = indexer() # Assume this re-fetches the file structure
self.category_files_map = {} # Reset the map
for directory in self.file_structure:
if directory['type'] == 'directory':
# Map category to its files
self.category_files_map[directory['path']] = [
file['path'] for file in directory['contents'] if file['type'] == 'file'
]
async def run_periodic_tasks(self):
"""Run indexer and prefetch functions every 5 minutes."""
while not self.stop_event.is_set():
self.update_file_structure() # Re-run indexer and update the map
await asyncio.sleep(300) # Sleep for 5 minutes
def get_reports(self):
reports = self.instances_api.fetch_reports()
temp_music_store = {}
for instance_url in self.instances[:]:
if instance_url in reports:
report = reports[instance_url]
logging.info(f"Report from {instance_url}: {report}")
self.process_report(instance_url, report, temp_music_store)
else:
logging.error(f"Failed to get report from {instance_url}. Removing instance.")
self.remove_instance(instance_url)
self.MUSIC_STORE = temp_music_store
def process_report(self, instance_url, report, temp_music_store):
music_store = report.get('music_store', {})
cache_size = report.get('cache_size')
logging.info(f"Processing report from {instance_url}")
# Update temporary music store
for title, path in music_store.items():
url = f"{instance_url}/api/get/music/{title.replace(' ', '%20')}"
temp_music_store[title] = url
logging.info("Music Store processed successfully.")
self.update_instances_health(instance=instance_url, cache_size=cache_size)
def start_polling(self):
logging.info("Starting polling.")
while not self.stop_event.is_set():
self.get_reports()
time.sleep(self.polling_interval)
logging.info("Polling stopped.")
def stop_polling(self):
logging.info("Stopping polling.")
self.stop_event.set()
def register_instance(self, instance_url):
if instance_url not in self.instances:
self.instances.append(instance_url)
logging.info(f"Registered instance {instance_url}")
else:
logging.info(f"Instance {instance_url} is already registered.")
def remove_instance(self, instance_url):
if instance_url in self.instances:
self.instances.remove(instance_url)
self.instances_health.pop(instance_url, None)
logging.info(f"Removed instance {instance_url}")
else:
logging.info(f"Instance {instance_url} not found for removal.")
def update_instances_health(self, instance, cache_size):
self.instances_health[instance] = {"used": cache_size["cache_size"], "total": "50 GB"}
logging.info(f"Updated instance {instance} with cache size {cache_size}")
def download_music_to_best_instance(self, file_name):
"""Downloads a music file to the instance with the most free space in self.instance_health."""
best_instance = None
max_free_space = -1
# Determine the instance with the most free space
for instance_url, space_info in self.instances_health.items():
total_space = convert_to_gb(space_info.get('total', 0))
used_space = convert_to_gb(space_info.get('used', 0))
free_space = total_space - used_space
if free_space > max_free_space:
max_free_space = free_space
best_instance = instance_url
if not best_instance:
logging.error("No suitable instance found for downloading the music.")
return {"error": "No suitable instance found for downloading the music."}
# Attempt to download music to the best instance
try:
result = self.instances_api.download_music(best_instance, file_name)
# Check if the response is as expected
if not result or "music_id" not in result or "status" not in result:
logging.error(f"Unexpected response from instance {best_instance}: {result}")
return {
"error": "Failed to retrieve valid download data from the instance.",
"details": result if result else "Empty response"
}
# Prepare response with download progress URL
music_id = self.get_music_id(file_name)
status = result["status"]
progress_url = f'{best_instance}/api/get/progress/{music_id}'
response = {
"music_id": music_id,
"status": status,
"progress_url": progress_url
}
return response
except Exception as e:
# Log network or API call-related errors
logging.error(f"Error downloading music to {best_instance}: {str(e)}")
return {
"error": "Error occurred during music download.",
"details": str(e)
}
def find_music_path(self, title):
"""Find the path of the music in the indexed data based on the title."""
for directory in self.file_structure:
if directory['type'] == 'directory':
for sub_directory in directory['contents']:
if sub_directory['type'] == 'file' and title.lower() in sub_directory['path'].lower():
return sub_directory['path']
return None
def get_music_id(self, title):
"""Generate a unique music ID based on the title."""
return title.replace(" ", "_").lower()
def get_all_music(self):
"""Get all music files from the indexed file structure."""
music_files = []
for directory in self.file_structure:
if directory['type'] == 'directory':
for sub_directory in directory['contents']:
if sub_directory['type'] == 'file':
music_files.append(sub_directory['path'])
return music_files
def get_all_categories(self):
"""Get a list of all category folders."""
return list(self.category_files_map.keys())
def get_files_from_category(self, category):
"""Get all files from a specified category."""
return self.category_files_map.get(category, [])