| | import gym |
| | import numpy as np |
| | import math |
| | import sys |
| | import os |
| | import functools |
| |
|
| | import pandas as pd |
| |
|
| | |
| | |
| | sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) |
| | from Environment.solar_sys_environment import SolarSys |
| |
|
| |
|
| | def form_clusters(metrics: dict, size: int) -> list: |
| | """ |
| | Forms balanced, heterogeneous clusters by categorizing houses based on their |
| | energy profile and distributing them evenly in a round-robin fashion. |
| | """ |
| | house_ids = list(metrics.keys()) |
| | if not house_ids: |
| | return [] |
| | all_consumption = [m['consumption'] for m in metrics.values()] |
| | all_solar = [m['solar'] for m in metrics.values()] |
| | |
| | median_consumption = np.median(all_consumption) if all_consumption else 0 |
| | median_solar = np.median(all_solar) if all_solar else 0 |
| |
|
| | |
| | producers = [h for h in house_ids if metrics[h]['solar'] >= median_solar and metrics[h]['consumption'] < median_consumption] |
| | consumers = [h for h in house_ids if metrics[h]['solar'] < median_solar and metrics[h]['consumption'] >= median_consumption] |
| | prosumers = [h for h in house_ids if metrics[h]['solar'] >= median_solar and metrics[h]['consumption'] >= median_consumption] |
| | neutrals = [h for h in house_ids if metrics[h]['solar'] < median_solar and metrics[h]['consumption'] < median_consumption] |
| |
|
| | |
| | sorted_categorized_houses = producers + consumers + prosumers + neutrals |
| | |
| | |
| | categorized_set = set(sorted_categorized_houses) |
| | uncategorized = [h for h in house_ids if h not in categorized_set] |
| | final_house_list = sorted_categorized_houses + uncategorized |
| | num_houses = len(house_ids) |
| | num_clusters = math.ceil(num_houses / size) |
| | |
| | clusters = [[] for _ in range(num_clusters)] |
| | |
| | for i, house_id in enumerate(final_house_list): |
| | target_cluster_idx = i % num_clusters |
| | clusters[target_cluster_idx].append(house_id) |
| |
|
| | return clusters |
| |
|
| | class GlobalPriceVecEnvWrapper(gym.vector.VectorEnvWrapper): |
| | def __init__(self, env, clusters: list): |
| | super().__init__(env) |
| | self.clusters = clusters |
| | |
| | |
| | self.cluster_envs = self.env.envs |
| |
|
| | def step(self, actions: np.ndarray, exports: np.ndarray = None, imports: np.ndarray = None): |
| | num_clusters = len(self.cluster_envs) |
| | net_transfers = np.zeros(num_clusters) |
| | if exports is not None and imports is not None: |
| | net_transfers = imports - exports |
| | batched_low_level_actions = actions |
| | batched_transfers = net_transfers.reshape(-1, 1).astype(np.float32) |
| | batched_prices = np.full((num_clusters, 1), -1.0, dtype=np.float32) |
| | final_packed_actions_tuple = (batched_low_level_actions, batched_transfers, batched_prices) |
| | obs_next, rewards, terminateds, truncateds, infos = self.env.step(final_packed_actions_tuple) |
| | dones = terminateds | truncateds |
| | done_all = dones.all() |
| |
|
| |
|
| |
|
| | if done_all: |
| | final_infos = infos['final_info'] |
| | keys = final_infos[0].keys() |
| | infos = {k: np.stack([info[k] for info in final_infos]) for k in keys} |
| |
|
| | info_agg = { |
| | "cluster_dones": dones, |
| | "cluster_infos": infos, |
| | } |
| | |
| | return obs_next, rewards, done_all, info_agg |
| |
|
| | def get_export_capacity(self, cluster_idx: int) -> float: |
| | """Returns the total physically exportable energy from a cluster's batteries and solar in kWh.""" |
| | cluster_env = self.cluster_envs[cluster_idx] |
| | available_from_batt = cluster_env.battery_soc * cluster_env.battery_discharge_efficiency |
| | total_exportable = np.sum(available_from_batt) + cluster_env.current_solar |
| | return float(total_exportable) |
| |
|
| | def get_import_capacity(self, cluster_idx: int) -> float: |
| | """Returns the total physically importable space in a cluster's batteries in kWh.""" |
| | cluster_env = self.cluster_envs[cluster_idx] |
| | free_space = cluster_env.battery_max_capacity - cluster_env.battery_soc |
| | total_storable = np.sum(free_space) |
| | return float(total_storable) |
| |
|
| | def send_energy(self, from_cluster_idx: int, amount: float) -> float: |
| | """Drains 'amount' of energy from the specified cluster (batteries first, then solar).""" |
| | cluster_env = self.cluster_envs[from_cluster_idx] |
| | return cluster_env.send_energy(amount) |
| |
|
| | def receive_energy(self, to_cluster_idx: int, amount: float) -> float: |
| | """Charges batteries in the specified cluster with 'amount' of energy.""" |
| | cluster_env = self.cluster_envs[to_cluster_idx] |
| | return cluster_env.receive_energy(amount) |
| |
|
| |
|
| | def make_vec_env(data_path: str, time_freq: str, cluster_size: int, state: str): |
| | print("--- Pre-loading shared dataset for all environments ---") |
| | try: |
| | shared_df = pd.read_csv(data_path) |
| | shared_df["local_15min"] = pd.to_datetime(shared_df["local_15min"], utc=True) |
| | shared_df.set_index("local_15min", inplace=True) |
| |
|
| | |
| | shared_df = shared_df.resample(time_freq).mean() |
| | |
| |
|
| | except Exception as e: |
| | raise ValueError(f"Failed to pre-load data in make_vec_env: {e}") |
| |
|
| | base_env_for_metrics = SolarSys( |
| | data_path=data_path, |
| | time_freq=time_freq, |
| | preloaded_data=shared_df, |
| | state=state |
| | ) |
| | |
| | |
| | metrics = {} |
| | for hid in base_env_for_metrics.house_ids: |
| | total_consumption = float( |
| | np.clip(base_env_for_metrics.original_no_p2p_import[hid], 0.0, None).sum() |
| | ) |
| | total_solar = float( |
| | base_env_for_metrics.all_data[f"total_solar_{hid}"].clip(lower=0.0).sum() |
| | ) |
| | metrics[hid] = {'consumption': total_consumption, 'solar': total_solar} |
| | |
| | clusters = form_clusters(metrics, cluster_size) |
| | print(f"Formed {len(clusters)} clusters of size up to {cluster_size}.") |
| |
|
| | |
| | env_fns = [] |
| | for cluster_house_ids in clusters: |
| | preset_env_fn = functools.partial( |
| | SolarSys, |
| | data_path=data_path, |
| | time_freq=time_freq, |
| | house_ids_in_cluster=cluster_house_ids, |
| | preloaded_data=shared_df, |
| | state=state |
| | ) |
| | env_fns.append(preset_env_fn) |
| | sync_vec_env = gym.vector.SyncVectorEnv(env_fns) |
| | wrapped_vec_env = GlobalPriceVecEnvWrapper(sync_vec_env, clusters=clusters) |
| |
|
| | return wrapped_vec_env |