import concurrent.futures
import random
from typing import List, Tuple
from doe_xstock.end_use_load_profiles import EndUseLoadProfiles
import math
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.metrics import cluster as cluster_metrics
from sklearn.preprocessing import MinMaxScaler
from citylearn.base import Environment
class Clustering:
    """Cluster buildings by their end-use load profiles using k-means.

    k-means is fit for every candidate cluster count in
    ``[2, maximum_clusters]`` (fits run in parallel threads), each solution is
    scored with the sum-of-squares error (inertia), Calinski-Harabasz,
    silhouette and Davies-Bouldin metrics, and an optimal cluster count is
    estimated from those scores. Subclasses must implement :meth:`set_data`
    to supply the feature matrix that is clustered.
    """

    # Smallest sensible problem size: maximum_clusters must satisfy
    # 2 <= k < len(bldg_ids), so at least 3 buildings are needed.
    __MINIMUM_BUILDING_COUNT = 3

    def __init__(self, end_use_load_profiles: EndUseLoadProfiles, bldg_ids: List[int], maximum_clusters: int = None, sum_of_squares_error_minimum_percent_change: float = None, random_seed: int = None) -> None:
        """Initialize :py:class:`Clustering`.

        Parameters
        ----------
        end_use_load_profiles : EndUseLoadProfiles
            Load-profile source, stored for use by subclasses in :meth:`set_data`.
        bldg_ids : List[int]
            IDs of the buildings to cluster. At least 3 are required.
        maximum_clusters : int, optional
            Largest cluster count to evaluate. Defaults to
            ``ceil(len(bldg_ids)/2)`` and must satisfy
            ``2 <= maximum_clusters < len(bldg_ids)``.
        sum_of_squares_error_minimum_percent_change : float, optional
            Elbow threshold (in percent) for the SSE criterion. Defaults to 10.0.
        random_seed : int, optional
            Seed passed to ``KMeans`` for reproducibility. Defaults to a random
            draw from ``Environment.DEFAULT_RANDOM_SEED_RANGE``.
        """

        self.end_use_load_profiles = end_use_load_profiles
        self.bldg_ids = bldg_ids
        self.maximum_clusters = maximum_clusters
        self.sum_of_squares_error_minimum_percent_change = sum_of_squares_error_minimum_percent_change
        self.random_seed = random_seed

    @property
    def maximum_clusters(self) -> int:
        """Largest cluster count evaluated by :meth:`cluster`."""

        return self.__maximum_clusters

    @property
    def bldg_ids(self) -> List[int]:
        """Sorted IDs of the buildings to cluster."""

        return self.__bldg_ids

    @property
    def sum_of_squares_error_minimum_percent_change(self) -> float:
        """Elbow threshold (in percent) for the SSE criterion."""

        return self.__sum_of_squares_error_minimum_percent_change

    @property
    def random_seed(self) -> int:
        """Seed used by ``KMeans`` for reproducible clustering."""

        return self.__random_seed

    @bldg_ids.setter
    def bldg_ids(self, value: List[int]):
        # NOTE(fix): was `>` which demanded 4+ buildings while the message
        # promised "at least 3"; `>=` matches the message and still allows a
        # valid maximum_clusters (2 <= k < len(bldg_ids)).
        assert len(value) >= self.__MINIMUM_BUILDING_COUNT, f'Provide at least {self.__MINIMUM_BUILDING_COUNT} bldg_ids.'
        self.__bldg_ids = sorted(value)

    @maximum_clusters.setter
    def maximum_clusters(self, value: int):
        value = math.ceil(len(self.bldg_ids)/2) if value is None else value
        # NOTE(fix): message previously claimed "> 2" while the condition
        # enforces >= 2; the message now matches the enforced condition.
        assert 2 <= value < len(self.bldg_ids), 'maximum_clusters must be at least 2 and less than the number of bldg_ids.'
        self.__maximum_clusters = value

    @sum_of_squares_error_minimum_percent_change.setter
    def sum_of_squares_error_minimum_percent_change(self, value: float):
        self.__sum_of_squares_error_minimum_percent_change = 10.0 if value is None else value

    @random_seed.setter
    def random_seed(self, value: int):
        self.__random_seed = random.randint(*Environment.DEFAULT_RANDOM_SEED_RANGE) if value is None else value

    def cluster(self) -> Tuple[int, pd.DataFrame, pd.DataFrame]:
        """Fit k-means for all candidate cluster counts and score each solution.

        The feature matrix from :meth:`set_data` is min-max scaled, then one
        k-means model is fit per candidate cluster count in
        ``[2, maximum_clusters]``.

        Returns
        -------
        Tuple[int, pd.DataFrame, pd.DataFrame]
            ``(optimal_clusters, scores, labels)`` where ``scores`` has one row
            of metrics per candidate cluster count and ``labels`` maps each
            ``bldg_id`` to its cluster label for every candidate count.
        """

        data = self.set_data()
        scaler = MinMaxScaler()
        scaler.fit(data)
        data = scaler.transform(data)
        scores = []
        labels = []

        # Threads suffice here: the heavy lifting happens inside scikit-learn's
        # native code, and each candidate cluster count is independent.
        with concurrent.futures.ThreadPoolExecutor() as executor:
            results = {executor.submit(self.__cluster, *(data, c)): c for c in range(2, self.maximum_clusters + 1)}

            for future in concurrent.futures.as_completed(results):
                # NOTE(fix): dropped the pointless `except Exception as e:
                # raise(e)` wrapper; exceptions from future.result() propagate
                # unchanged either way.
                clusters = results[future]
                _labels, sum_of_squares_error, calinski_harabasz_score, silhouette_score, davies_bouldin_score = future.result()
                scores.append({
                    'clusters': clusters,
                    'sum_of_square_error': sum_of_squares_error,
                    'calinski_harabasz_score': calinski_harabasz_score,
                    'silhouette_score': silhouette_score,
                    'davies_bouldin_score': davies_bouldin_score
                })
                labels.append(pd.DataFrame({
                    'clusters': clusters,
                    'bldg_id': self.bldg_ids,
                    'label': _labels
                }))

        scores = pd.DataFrame(scores).sort_values('clusters')
        labels = pd.concat(labels, ignore_index=True)
        optimal_clusters = self.get_optimal_clusters(*scores.T.values)

        return optimal_clusters, scores, labels

    def get_optimal_clusters(self, clusters: List[int], sum_of_squares_error: List[float], calinski_harabasz_score: List[float], silhouette_score: List[float], davies_bouldin_score: List[float]) -> int:
        """Estimate the optimal cluster count from per-count metric lists.

        The estimate is the floor of the mean of the cluster counts preferred
        by the Calinski-Harabasz (max), silhouette (max) and Davies-Bouldin
        (min) criteria. The SSE elbow criterion is computed but currently
        excluded from the mean (see NOTE below).

        Parameters
        ----------
        clusters : List[int]
            Candidate cluster counts, in ascending order.
        sum_of_squares_error : List[float]
            k-means inertia per candidate count.
        calinski_harabasz_score : List[float]
            Calinski-Harabasz score per candidate count (higher is better).
        silhouette_score : List[float]
            Silhouette score per candidate count (higher is better).
        davies_bouldin_score : List[float]
            Davies-Bouldin score per candidate count (lower is better).

        Returns
        -------
        int
            Estimated optimal cluster count.
        """

        # NOTE(fix): silhouette_score was missing from this length check even
        # though it is indexed below.
        assert len(clusters) == len(sum_of_squares_error) == len(calinski_harabasz_score) == len(silhouette_score) == len(davies_bouldin_score), \
            'clusters and scores lists must have equal lengths.'

        # Percent drop in SSE from each cluster count to the next (elbow test).
        sum_of_squares_error_change = (
            np.array(sum_of_squares_error, dtype=float)[:-1]
            - np.array(sum_of_squares_error, dtype=float)[1:]
        )*100/np.array(sum_of_squares_error, dtype=float)[:-1]
        sse_candidates = np.array(clusters[:-1], dtype=int)[
            sum_of_squares_error_change < self.sum_of_squares_error_minimum_percent_change
        ]
        optimal_clusters = np.nanmean([
            # NOTE: the SSE elbow candidate is deliberately excluded from the
            # mean for now; re-enable the line below to include it.
            # sse_candidates.min() if len(sse_candidates) > 0 else np.nan,
            clusters[np.array(calinski_harabasz_score, dtype=float).argmax()],
            clusters[np.array(silhouette_score, dtype=float).argmax()],
            clusters[np.array(davies_bouldin_score, dtype=float).argmin()],
        ], dtype=float)
        optimal_clusters = math.floor(optimal_clusters)

        return optimal_clusters

    def __cluster(self, data: np.ndarray, clusters: int):
        # Fit one k-means model and return its labels plus the four
        # quality metrics used by get_optimal_clusters.
        model = KMeans(clusters, random_state=self.random_seed).fit(data)
        labels = model.labels_
        sum_of_squares_error = model.inertia_
        calinski_harabasz_score = cluster_metrics.calinski_harabasz_score(data, model.labels_)
        davies_bouldin_score = cluster_metrics.davies_bouldin_score(data, model.labels_)
        silhouette_score = cluster_metrics.silhouette_score(data, model.labels_)

        return labels, sum_of_squares_error, calinski_harabasz_score, silhouette_score, davies_bouldin_score

    def set_data(self) -> pd.DataFrame:
        """Return the feature matrix to cluster, one row per building in
        :attr:`bldg_ids`. Must be implemented by subclasses."""

        raise NotImplementedError