File size: 8,076 Bytes
836f75b
 
 
0b7f1e4
836f75b
 
 
 
 
 
 
 
 
b8db82b
836f75b
 
 
 
 
 
 
 
 
 
65e40ba
836f75b
b8db82b
836f75b
 
 
 
 
733f7a8
b8db82b
836f75b
 
 
 
 
 
733f7a8
 
 
b8db82b
 
 
 
 
 
 
 
 
 
 
 
733f7a8
c137f8e
733f7a8
b8db82b
c137f8e
e0c041b
836f75b
 
65e40ba
836f75b
e0c041b
836f75b
 
 
65e40ba
836f75b
 
 
 
65e40ba
836f75b
65e40ba
 
836f75b
 
 
 
65e40ba
 
 
 
836f75b
65e40ba
836f75b
 
 
 
 
 
 
 
 
 
 
 
 
733f7a8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
836f75b
65e40ba
836f75b
 
65e40ba
aa7ee0b
836f75b
 
 
aa7ee0b
836f75b
aa7ee0b
 
836f75b
65e40ba
836f75b
 
 
65e40ba
aa7ee0b
 
 
 
 
 
65e40ba
aa7ee0b
 
 
 
 
 
 
 
 
 
b8a5d63
836f75b
65e40ba
aa7ee0b
836f75b
65e40ba
 
 
836f75b
 
aa7ee0b
 
 
 
 
 
 
 
836f75b
65e40ba
 
836f75b
65e40ba
836f75b
65e40ba
836f75b
 
 
65e40ba
 
836f75b
 
65e40ba
 
 
836f75b
65e40ba
836f75b
65e40ba
 
 
b8db82b
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import os
from indexer import indexer
from threading import Event, Thread
import asyncio
import time
import logging
from utils import convert_to_gb
from api import InstancesAPI

CACHE_DIR = os.getenv("CACHE_DIR")

class LoadBalancer:
    def __init__(self, cache_dir, token, repo, polling_interval=4, max_retries=3, initial_delay=1):
        self.version = "0.0.2 Alpha"
        self.instances = []
        self.instances_health = {}
        self.polling_interval = polling_interval
        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.stop_event = Event()
        self.instances_api = InstancesAPI(self.instances)
        self.CACHE_DIR = cache_dir
        self.TOKEN = token
        self.REPO = repo
        self.MUSIC_STORE = {}
        self.file_structure = None
        self.category_files_map = {}

        # Ensure CACHE_DIR exists
        if not os.path.exists(self.CACHE_DIR):
            os.makedirs(self.CACHE_DIR)

        # Initialize file structure and start prefetching
        self.update_file_structure()

        # Start polling and file checking in separate threads
        polling_thread = Thread(target=self.start_polling)
        polling_thread.daemon = True
        polling_thread.start()

        # Start periodic tasks
        asyncio.create_task(self.run_periodic_tasks())

    def update_file_structure(self):
        """Update the file structure and the category-files map."""
        self.file_structure = indexer()  # Assume this re-fetches the file structure
        self.category_files_map = {}  # Reset the map

        for directory in self.file_structure:
            if directory['type'] == 'directory':
                # Map category to its files
                self.category_files_map[directory['path']] = [
                    file['path'] for file in directory['contents'] if file['type'] == 'file'
                ]

    async def run_periodic_tasks(self):
        """Run indexer and prefetch functions every 5 minutes."""
        while not self.stop_event.is_set():
            self.update_file_structure()  # Re-run indexer and update the map
            await asyncio.sleep(300)  # Sleep for 5 minutes

    def get_reports(self):
        reports = self.instances_api.fetch_reports()
        temp_music_store = {}

        for instance_url in self.instances[:]:
            if instance_url in reports:
                report = reports[instance_url]
                logging.info(f"Report from {instance_url}: {report}")
                self.process_report(instance_url, report, temp_music_store)
            else:
                logging.error(f"Failed to get report from {instance_url}. Removing instance.")
                self.remove_instance(instance_url)

        self.MUSIC_STORE = temp_music_store

    def process_report(self, instance_url, report, temp_music_store):
        music_store = report.get('music_store', {})
        cache_size = report.get('cache_size')

        logging.info(f"Processing report from {instance_url}")

        # Update temporary music store
        for title, path in music_store.items():
            url = f"{instance_url}/api/get/music/{title.replace(' ', '%20')}"
            temp_music_store[title] = url

        logging.info("Music Store processed successfully.")
        self.update_instances_health(instance=instance_url, cache_size=cache_size)

    def start_polling(self):
        logging.info("Starting polling.")
        while not self.stop_event.is_set():
            self.get_reports()
            time.sleep(self.polling_interval)
        logging.info("Polling stopped.")

    def stop_polling(self):
        logging.info("Stopping polling.")
        self.stop_event.set()

    def register_instance(self, instance_url):
        if instance_url not in self.instances:
            self.instances.append(instance_url)
            logging.info(f"Registered instance {instance_url}")
        else:
            logging.info(f"Instance {instance_url} is already registered.")

    def remove_instance(self, instance_url):
        if instance_url in self.instances:
            self.instances.remove(instance_url)
            self.instances_health.pop(instance_url, None)
            logging.info(f"Removed instance {instance_url}")
        else:
            logging.info(f"Instance {instance_url} not found for removal.")

    def update_instances_health(self, instance, cache_size):
        self.instances_health[instance] = {"used": cache_size["cache_size"], "total": "50 GB"}
        logging.info(f"Updated instance {instance} with cache size {cache_size}")

    def download_music_to_best_instance(self, file_name):
        """Downloads a music file to the instance with the most free space in self.instance_health."""
        best_instance = None
        max_free_space = -1

        # Determine the instance with the most free space
        for instance_url, space_info in self.instances_health.items():
            total_space = convert_to_gb(space_info.get('total', 0))
            used_space = convert_to_gb(space_info.get('used', 0))
            free_space = total_space - used_space

            if free_space > max_free_space:
                max_free_space = free_space
                best_instance = instance_url

        if not best_instance:
            logging.error("No suitable instance found for downloading the music.")
            return {"error": "No suitable instance found for downloading the music."}

        # Attempt to download music to the best instance
        try:
            result = self.instances_api.download_music(best_instance, file_name)

            # Check if the response is as expected
            if not result or "music_id" not in result or "status" not in result:
                logging.error(f"Unexpected response from instance {best_instance}: {result}")
                return {
                    "error": "Failed to retrieve valid download data from the instance.",
                    "details": result if result else "Empty response"
                }

            # Prepare response with download progress URL
            music_id = self.get_music_id(file_name)
            status = result["status"]
            progress_url = f'{best_instance}/api/get/progress/{music_id}'

            response = {
                "music_id": music_id,
                "status": status,
                "progress_url": progress_url
            }
            return response

        except Exception as e:
            # Log network or API call-related errors
            logging.error(f"Error downloading music to {best_instance}: {str(e)}")
            return {
                "error": "Error occurred during music download.",
                "details": str(e)
            }

    def find_music_path(self, title):
        """Find the path of the music in the indexed data based on the title."""
        for directory in self.file_structure:
            if directory['type'] == 'directory':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'file' and title.lower() in sub_directory['path'].lower():
                        return sub_directory['path']
        return None

    def get_music_id(self, title):
        """Generate a unique music ID based on the title."""
        return title.replace(" ", "_").lower()

    def get_all_music(self):
        """Get all music files from the indexed file structure."""
        music_files = []
        for directory in self.file_structure:
            if directory['type'] == 'directory':
                for sub_directory in directory['contents']:
                    if sub_directory['type'] == 'file':
                        music_files.append(sub_directory['path'])
        return music_files

    def get_all_categories(self):
        """Get a list of all category folders."""
        return list(self.category_files_map.keys())

    def get_files_from_category(self, category):
        """Get all files from a specified category."""
        return self.category_files_map.get(category, [])