"""Source code for denspp.offline.digital.cluster."""

from dataclasses import dataclass
from os import environ
from os.path import join, splitext

import joblib
import numpy as np
from psutil import cpu_count
from sklearn.cluster import KMeans, DBSCAN
from sklearn.metrics import accuracy_score
from sklearn.mixture import GaussianMixture
from sklearn.neighbors import KNeighborsClassifier


@dataclass
class SettingsCluster:
    """Individual data class to configure clustering

    Attributes:
        type:           String with used cluster/classification method [kMeans, GMM, DBSCAN, kNN]
        no_cluster:     Integer with number of clusters
        max_iter:       Integer with maximum number of iterations
        tolerance:      Float with tolerance between iterations
        random_state:   Definition of the state to start
    """
    type: str
    no_cluster: int
    max_iter: int = 1000
    tolerance: float = 1e-9
    # NOTE(review): without an annotation this is a plain class attribute shared by
    # all instances, NOT a dataclass field (no __init__ parameter is generated).
    # Confirm that is intended; it is forwarded to sklearn as random_state, which
    # accepts None, an int seed, or np.random.RandomState.
    random_state = None  # np.random.RandomState(seed=1234)
# Default configuration: three clusters found via kMeans, dataclass defaults otherwise.
RecommendedSettingsCluster = SettingsCluster(
    type="kMeans",
    no_cluster=3,
)
class Clustering:
    """Wrapper around several sklearn cluster/classification methods selected by name."""

    @staticmethod
    def __define_number_physical_cores() -> int:
        """Determine the number of physical CPU cores and export it via OMP_NUM_THREADS.

        Returns:
            Integer with the number of cores used for parallel work
        """
        # Bugfix: the method promises *physical* cores but previously queried
        # logical ones (cpu_count(logical=True)). psutil can return None for the
        # physical count on some platforms, so fall back to logical cores, then 1.
        number_cores = cpu_count(logical=False) or cpu_count(logical=True) or 1
        environ['OMP_NUM_THREADS'] = str(number_cores)
        return number_cores

    def __init__(self, settings: SettingsCluster) -> None:
        """Initialization of module for clustering

        Args:
            settings:   Settings for setting-up the clustering pipeline
        Returns:
            None
        """
        self.__define_number_physical_cores()
        self._settings = settings
        self._cluster = None
        self._cluster_init_done = False
        self.__method_used = ''
        self.__method_avai_checked = False
        # Registry mapping method name -> [init handler, predict handler]
        self.__method_bib = dict()
        self.__method_bib.update({'kMeans': [self.__kmeans_init, self.__kmeans_predict]})
        self.__method_bib.update({'GMM': [self.__gmm_init, self.__gmm_predict]})
        self.__method_bib.update({'DBSCAN': [self.__dbscan_init, self.__dbscan_predict]})
        self.__method_bib.update({'kNN': [self.__knn_init, self.__knn_predict]})
[docs] def methods_available(self, do_lower: bool=False) -> list: """Getting the list already methods""" dict_keys = self.__method_bib.keys() list_keys = [key if not do_lower else key.lower() for key in dict_keys] return list_keys
def __check_for_available_method(self) -> None: """Function for checking if clustering method is implemented""" if not self.__method_avai_checked: list_keys = self.methods_available(True) input_key = self._settings.type.lower() if input_key not in list_keys: raise TypeError("Wrong defined cluster method! - It is not defined in class") else: self.__method_avai_checked = True sel_pos = [idx for idx, key in enumerate(list_keys) if key == input_key] self.__method_used = self.methods_available()[sel_pos[0]] else: pass
[docs] def init(self, features: np.ndarray, true_label=None) -> np.ndarray: """Initialization of used clustering method Args: features: Numpy array with features true_label: Optional array with true_labeled features Returns: Numpy arrays with cluster results """ self.__check_for_available_method() pred_label = self.__method_bib[self.__method_used][0](features, true_label) self._cluster_init_done = True self.__determine_accuracy(pred_label, true_label) return pred_label
[docs] def predict(self, features: np.ndarray) -> np.ndarray: """Prediction of features with defined clustering method Args: features: Numpy array with features Returns: Numpy arrays with cluster results """ self.__check_for_available_method() return self.__method_bib[self.__method_used][1](features)
[docs] def get_cluster_model(self): """Getting the cluster model""" if not self._cluster_init_done: raise Warning("No cluster model is defined and trained") else: return self._cluster
[docs] def save_model_to_file(self, filename: str, path: str='') -> None: """Saving model to an external *.joblib file""" model2save = self.get_cluster_model() path2save = join(path, filename.split('.')[0]) + '.joblib' joblib.dump(model2save, path2save, compress=4)
[docs] def load_model_from_file(self, path2model: str) -> None: """Loading an already pre-trained model with *.joblib file""" self._cluster = joblib.load(path2model.split('.')[0] + '.joblib') self._cluster_init_done = True
[docs] def create_dummy_data(self, num_samples=1000, noise_std=0.6): """Function for generating dummy data for testing""" from sklearn.datasets import make_blobs from sklearn.preprocessing import StandardScaler X, labels_true = make_blobs( n_samples=num_samples, centers=self._settings.no_cluster, cluster_std=noise_std, random_state=0 ) return StandardScaler().fit_transform(X), labels_true
    def sort_pred2label_data(self, pred_label: np.ndarray, true_label: np.ndarray, features: np.ndarray,
                             take_num_samples: int=-1) -> np.ndarray:
        """Sorting predicted labels with true labels for getting the right-/similiar ID representation
        Args:
            pred_label:         Array with predicted labels
            true_label:         Array with true labels
            features:           Array with features
            take_num_samples:   Integer value of taking samples for each class [-1 --> all]
        Returns:
            Numpy array with sorted output
        """
        # Each class's samples are predicted twice so the majority vote below is
        # more stable against non-deterministic predictors.
        num_repeat_process = 2
        # -1 marks "not yet assigned" in both the output and the mapping array.
        label_out = np.zeros(pred_label.shape, dtype=int) - 1
        true_order = np.unique(true_label)
        new_order = np.zeros((self._settings.no_cluster, ), dtype=int) - 1
        for idx, true_id in enumerate(true_order):
            # Sample positions belonging to the current true class.
            true_pos_id = np.argwhere(true_label == true_id).flatten()
            if not take_num_samples == -1:
                # Take a random subset of this class's samples (in-place shuffle).
                np.random.shuffle(true_pos_id)
                true_pos_id = true_pos_id[:take_num_samples]
            pred_class = list()
            for i0 in range(num_repeat_process):
                for i1 in true_pos_id:
                    # Predict one sample at a time (row reshaped to (1, n_features)).
                    pred_class.append(self._cluster.predict(features[i1, :].reshape((1, -1)))[0])
            pred_class = np.array(pred_class, dtype=int)
            # NOTE(review): raises NameError when true_pos_id is empty, since the
            # loop variables are never bound — confirm inputs always cover every class.
            del i0, i1
            # --- Decision
            ids, cnt = np.unique(pred_class, return_counts=True)
            if ids.size == 1:
                # Unanimous vote: map this true id directly.
                new_order[idx] = pred_class[0]
            else:
                new_pos = np.argmax(cnt)
                new_class = ids[new_pos]
                if new_class in new_order:
                    # NOTE(review): the 'or cnt.size > 1' keeps deleting candidates
                    # even after an unused class has been found, and the expression
                    # after the loop assigns -1 whenever exactly one candidate is
                    # left — this looks unintended (possible IndexError on empty
                    # cnt as well). Verify against the original author's intent.
                    while(new_class in new_order or cnt.size > 1):
                        cnt = np.delete(cnt, new_pos, 0)
                        ids = np.delete(ids, new_pos, 0)
                        new_pos = np.argmax(cnt)
                        new_class = ids[new_pos]
                    new_order[idx] = -1 if cnt.size == 1 else new_class
                else:
                    new_order[idx] = new_class
        # --- Decision: Check for ids with value -1
        # If exactly one true id never got a mapping, assign it to the unused slot(s).
        set_difference = set(true_order.tolist()) - set(new_order)
        list_difference_result = list(set_difference)
        if len(list_difference_result) == 1:
            pos0 = np.argwhere(new_order == -1).flatten()
            new_order[pos0] = list_difference_result[0]
        # --- Transform
        # Re-label the predictions: cluster id new_order[idx] becomes true_order[idx].
        for idx, id in enumerate(new_order):
            pos = np.argwhere(pred_label == id).flatten()
            label_out[pos] = true_order[idx]
        return label_out
def __determine_accuracy(self, pred_labels: np.ndarray, true_labels=None) -> None: """Calculating the accuracy for clustering tasks""" if true_labels is None: pass elif pred_labels.size != true_labels.size: print("Accuracy can not be determined due to uncommon size") else: print(f"init. of clustering methods done with accuracy of " f"{accuracy_score(true_labels, pred_labels) * 100:.2f}") # ################################# CLUSTERING METHODS ############################################### def __gmm_init(self, features: np.ndarray, true_labels=None) -> np.ndarray: """Performing the gaussian mixture model for clustering""" self._cluster = GaussianMixture( n_init=1, n_components=self._settings.no_cluster, covariance_type='full', init_params='kmeans', tol=self._settings.tolerance, max_iter=self._settings.max_iter, random_state=self._settings.random_state ).fit(X=features, y=true_labels) return self._cluster.predict(features) def __gmm_predict(self, features: np.ndarray) -> np.ndarray: """Output with predicted classes of given feature array (GMM)""" if not isinstance(self._cluster, GaussianMixture): raise TypeError("Please init GaussianMixture (GMM) for prediction!") else: return self._cluster.predict(features) def __knn_init(self, features: np.ndarray, true_labels: np.ndarray) -> np.ndarray: """Initialization of kNN for clustering""" self._cluster = KNeighborsClassifier( n_neighbors=self._settings.no_cluster ).fit(X=features, y=true_labels) return self._cluster.classes_ def __knn_predict(self, features: np.ndarray) -> np.ndarray: """Output with predicted classes of given feature array (DBSCAN)""" if not isinstance(self._cluster, KNeighborsClassifier): raise TypeError("Please init k Nearest Neighboors for prediction") else: return self._cluster.predict(features) def __dbscan_init(self, features: np.ndarray, true_labels=None) -> np.ndarray: """Initialization of DBSCAN for clustering (Comment: true_label is ignored due to unsupervised learning)""" self._cluster = DBSCAN( 
eps=0.3, min_samples=8 ).fit(X=features, y=true_labels) return self._cluster.labels_ def __dbscan_predict(self, features: np.ndarray) -> np.ndarray: """Output with predicted classes of given feature array (DBSCAN)""" if not isinstance(self._cluster, DBSCAN): raise TypeError("Please init DBSCAN for prediction!") else: return self._cluster.fit_predict(features) def __kmeans_init(self, features: np.ndarray, true_labels=None) -> np.ndarray: """Initialization of kmeans for clustering (Comment: true_label is ignored due to unsupervised learning)""" self._cluster = KMeans( init="k-means++", n_init='auto', max_iter=self._settings.max_iter, random_state=self._settings.random_state, tol=self._settings.tolerance, n_clusters=self._settings.no_cluster ).fit(X=features, y=true_labels) return self._cluster.labels_ def __kmeans_predict(self, features: np.ndarray) -> np.ndarray: """Output with predicted classes of given feature array (kMeans)""" if not isinstance(self._cluster, KMeans): raise TypeError("Please init KMeans for predicting classes!") else: return self._cluster.predict(features)