from os import remove, makedirs
from os.path import join
import platform
from copy import deepcopy
import cpuinfo
import numpy as np
from logging import getLogger, Logger
from random import seed
from shutil import rmtree
from glob import glob
from datetime import datetime
from torch import (device, cuda, backends, randn, cat, Tensor, is_tensor, zeros, unique, argwhere, float32,
Generator, manual_seed, use_deterministic_algorithms)
from torch.utils.data import DataLoader, SubsetRandomSampler
from torchinfo import summary
from sklearn.model_selection import KFold
from denspp.offline import get_path_to_project_start, check_elem_unique
from denspp.offline.dnn.pytorch_config_data import SettingsDataset
from denspp.offline.dnn.pytorch_config_model import ConfigPytorch
from denspp.offline.structure_builder import init_dnn_folder
from denspp.offline.yaml_handler import YamlHandler
class PyTorchHandler:
deterministic_generator: Generator
used_hw_dev: device
used_hw_cpu: str
used_hw_gpu: str
used_hw_num: int
train_loader: list
valid_loader: list
selected_samples: dict
cell_classes: list
_metric_methods: dict
_ptq_do_validation: bool = False
_ptq_level: list = [12, 8]
_logger: Logger
def __init__(self, config_train: ConfigPytorch, config_dataset: SettingsDataset, do_train: bool=True) -> None:
"""Class for Handling Training of Deep Neural Networks in PyTorch
Args:
config_train: Configuration settings for the PyTorch Training
config_dataset: Configuration settings for dataset handling
do_train: Whether the class is used for training (default: True)
Returns:
None
"""
init_dnn_folder()
self._logger = getLogger(__name__)
# --- Preparing Neural Network
self.os_type = platform.system()
self.model = None
self.loss_fn = None
self.optimizer = None
# --- Preparing options
self.config_available = False
self._kfold_do = False
self._shuffle_do = config_train.data_do_shuffle
self._kfold_run = 0
# --- Saving options
self.settings_train = config_train
self.settings_data = config_dataset
self._index_folder = 'train' if do_train else 'inference'
self._model_addon = str()
# --- Logging paths for saving
self.__check_start_folder()
self._path2save = str()
self._path2log = str()
self._path2temp = str()
self._path2config = str()
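# A minimal usage sketch (hypothetical field values; the actual fields of
# ConfigPytorch and SettingsDataset are defined in their own modules):
#   config_train = ConfigPytorch(...)    # training hyperparameters
#   config_data = SettingsDataset(...)   # dataset handling options
#   handler = PyTorchHandler(config_train=config_train, config_dataset=config_data, do_train=True)
# The constructor only prepares folders and logging; device, data and model are
# set up later via load_data() and load_model().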
def __check_start_folder(self, new_folder: str='runs'):
"""Checking for starting folder to generate"""
self._path2run = get_path_to_project_start(new_folder)
makedirs(self._path2run, exist_ok=True)
def __setup_device(self) -> None:
"""Setup PyTorch for Training"""
self.used_hw_cpu = (f"{cpuinfo.get_cpu_info()['brand_raw']} "
f"(@ {1e-9 * cpuinfo.get_cpu_info()['hz_actual'][0]:.3f} GHz)")
if cuda.is_available():
# Using GPU
self.used_hw_gpu = cuda.get_device_name()
self.used_hw_dev = device("cuda")
self.used_hw_num = cuda.device_count()
device0 = self.used_hw_gpu
cuda.empty_cache()
elif backends.mps.is_available() and backends.mps.is_built() and self.os_type == "Darwin":
# Using Apple Silicon GPU (MPS backend)
self.used_hw_gpu = 'None'
self.used_hw_num = 1  # cuda.device_count() reports 0 without CUDA
self.used_hw_dev = device("mps")
device0 = self.used_hw_cpu
else:
# Using normal CPU
self.used_hw_gpu = 'None'
self.used_hw_dev = device("cpu")
self.used_hw_num = 1 # cpuinfo.get_cpu_info()['count']
device0 = self.used_hw_cpu
self._logger.debug(f"\nUsing PyTorch with {device0} on {self.os_type}")
def _init_train(self, path2save: str='', addon: str='') -> None:
"""Do init of class for training"""
if not path2save:
folder_name = f'{datetime.now().strftime("%Y%m%d_%H%M%S")}_{self._index_folder}_{self.model.__class__.__name__}'
self._path2save = join(self._path2run, folder_name)
else:
self._path2save = path2save
self._path2temp = join(self._path2save, 'temp')
# --- Generate folders
makedirs(self._path2run, exist_ok=True)
makedirs(self._path2save, exist_ok=True)
makedirs(self._path2temp, exist_ok=True)
# --- Transfer model to hardware
self.model.to(device=self.used_hw_dev)
# --- Copy settings to YAML file
YamlHandler(
template=self.settings_data,
path=self._path2save,
file_name='Config_Dataset'
)
YamlHandler(
template=self.settings_train,
path=self._path2save,
file_name=f'Config_Training{addon}'
)
def __deterministic_training_preparation(self) -> None:
"""Preparing the CUDA hardware for deterministic training"""
if self.settings_train.deterministic_do:
np.random.seed(self.settings_train.deterministic_seed)
manual_seed(self.settings_train.deterministic_seed)
if cuda.is_available():
cuda.manual_seed_all(self.settings_train.deterministic_seed)
seed(self.settings_train.deterministic_seed)
backends.cudnn.deterministic = True
use_deterministic_algorithms(True)
self._logger.info(f"=== DL Training with Deterministic @seed: {self.settings_train.deterministic_seed} ===")
else:
use_deterministic_algorithms(False)
self._logger.info(f"=== Normal DL Training ===")
def __deterministic_get_dataloader_params(self) -> dict:
"""Getting the parameters for preparing the Training and Validation DataLoader for Deterministic Training"""
if self.settings_train.deterministic_do:
self.deterministic_generator = Generator()
self.deterministic_generator.manual_seed(self.settings_train.deterministic_seed)
worker_init_fn = lambda worker_id: np.random.seed(self.settings_train.deterministic_seed)
return {'worker_init_fn': worker_init_fn, 'generator': self.deterministic_generator}
else:
return {}
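# Sketch of how the returned dict feeds into DataLoader construction (this is
# what load_data() below does); shown here only to illustrate the mechanism:
#   params = self.__deterministic_get_dataloader_params()
#   loader = DataLoader(data_set, batch_size=..., sampler=..., **params)
# With deterministic_do=True, the seeded torch.Generator and worker_init_fn make
# the sampling order and worker RNG state reproducible across runs.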
def load_data(self, data_set, num_workers: int=0) -> None:
"""Loading data for training and validation in DataLoader format into class
Args:
data_set: DataLoader of used dataset
num_workers: Number of workers for calculation [Default: 0 --> single core]
Return:
None
"""
self.__setup_device()
self._kfold_do = self.settings_train.num_kfold > 1
self._model_addon = data_set.get_topology_type
self.cell_classes = data_set.get_dictionary
params_deterministic = self.__deterministic_get_dataloader_params()
# --- Preparing datasets
out_train = list()
out_valid = list()
if self._kfold_do:
kfold = KFold(n_splits=self.settings_train.num_kfold,
shuffle=self._shuffle_do and not self.settings_train.deterministic_do)
for idx_train, idx_valid in kfold.split(np.arange(len(data_set))):
subsamps_train = SubsetRandomSampler(idx_train)
subsamps_valid = SubsetRandomSampler(idx_valid)
out_train.append(DataLoader(data_set,
batch_size=self.settings_train.batch_size,
sampler=subsamps_train,
**params_deterministic))
out_valid.append(DataLoader(data_set,
batch_size=self.settings_train.batch_size,
sampler=subsamps_valid,
**params_deterministic))
else:
idx = np.arange(len(data_set))
if self._shuffle_do and not self.settings_train.deterministic_do:
np.random.shuffle(idx)
split_pos = int(len(data_set) * (1 - self.settings_train.data_split_ratio))
idx_train = idx[0:split_pos]
idx_valid = idx[split_pos:]
subsamps_train = SubsetRandomSampler(idx_train)
subsamps_valid = SubsetRandomSampler(idx_valid)
out_train.append(DataLoader(data_set,
batch_size=self.settings_train.batch_size,
sampler=subsamps_train,
**params_deterministic))
out_valid.append(DataLoader(data_set,
batch_size=self.settings_train.batch_size,
sampler=subsamps_valid,
**params_deterministic))
# --- CUDA support for dataset
if cuda.is_available():
for idx in range(len(out_train)):
out_train[idx].pin_memory = True
out_train[idx].pin_memory_device = self.used_hw_dev.type
out_train[idx].num_workers = num_workers
out_valid[idx].pin_memory = True
out_valid[idx].pin_memory_device = self.used_hw_dev.type
out_valid[idx].num_workers = num_workers
# --- Output: Data
self.train_loader = out_train
self.valid_loader = out_valid
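# Hypothetical call, assuming `dataset` is a map-style dataset that also exposes
# the get_topology_type and get_dictionary properties used above:
#   handler.load_data(dataset, num_workers=2)
#   first_train_loader = handler.train_loader[0]
# With num_kfold > 1, one DataLoader pair is created per fold; otherwise a single
# train/validation split based on data_split_ratio is used.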
def load_model(self, model, learn_rate: float=0.1) -> None:
"""Loading optimizer, loss_fn into class
Args:
model: PyTorch Neural Network for Training / Inference
learn_rate: Learning rate used by the optimizer (default: 0.1)
Returns:
None
"""
self.model = model
self.optimizer = self.settings_train.load_optimizer(model, learn_rate=learn_rate)
self.loss_fn = self.settings_train.get_loss_func()
# --- Init. hardware for deterministic training
if self.settings_train.deterministic_do:
self.__deterministic_training_preparation()
# --- Print model
self._logger.info("\nPrint summary of model")
self._logger.info(str(summary(self.model, input_size=self.model.model_shape)))
self._logger.info("\n\n")
def _save_train_results(self, last_metric_train: float | np.ndarray,
last_metric_valid: float | np.ndarray, loss_type: str='Loss') -> None:
"""Writing some training metrics into txt-file"""
if self.config_available:
with open(self._path2config, 'a') as txt_handler:
txt_handler.write(f'\n--- Metrics of last epoch in fold #{self._kfold_run} ---')
txt_handler.write(f'\nTraining {loss_type} = {last_metric_train}')
txt_handler.write(f'\nValidation {loss_type} = {last_metric_valid}\n')
def get_saving_path(self) -> str:
"""Getting the path for saving files in aim folder"""
return self._path2save
def get_best_model(self, type_model: str) -> list:
"""Getting the path to the best trained model"""
return glob(join(self._path2save, f'*{type_model}*.pt'))
def _end_training_routine(self, timestamp_start: datetime, do_delete_temps: bool=True) -> None:
"""Doing the last step of training routine"""
timestamp_end = datetime.now()
timestamp_string = timestamp_end.strftime('%H:%M:%S')
diff_time = timestamp_end - timestamp_start
self._logger.info(f'\nTraining ends on: {timestamp_string}')
self._logger.info(f'Training duration: {diff_time}')
# Delete init model
init_model = glob(join(self._path2save, '*_reset.pt'))
for file in init_model:
remove(file)
# Delete log folders
if do_delete_temps:
folder_logs = glob(join(self._path2save, 'temp*'))
for folder in folder_logs:
rmtree(folder, ignore_errors=True)
def __get_data_points(self, only_getting_labels: bool=False, use_train_dataloader: bool=False) -> dict:
"""Getting data from DataLoader for Plotting Results
Args:
only_getting_labels: Option for taking only labels
use_train_dataloader: Mode for selecting datatype (True=Training, False=Validation)
Returns:
Dict with data for plotting
"""
used_dataset = self.train_loader[-1] if use_train_dataloader else self.valid_loader[-1]
# --- Getting the keys
keys = list()
for data in used_dataset:
keys = list(data.keys())
break
if only_getting_labels:
keys.pop(0)
# --- Extracting data
data_extract = [randn(32, 1) for _ in keys]  # dummy placeholders, overwritten on the first batch
first_run = True
for data in used_dataset:
for idx, key in enumerate(keys):
if first_run:
data_extract[idx] = data[key]
else:
data_extract[idx] = cat((data_extract[idx], data[key]), dim=0)
first_run = False
# --- Prepare output
mdict = dict()
for idx, data in enumerate(data_extract):
mdict.update({keys[idx]: data.numpy()})
return mdict
def _getting_data_for_plotting(self, valid_input: np.ndarray, valid_label: np.ndarray,
results=None, addon: str='cl') -> dict:
"""Getting the raw data for plotting results"""
# --- Producing and Saving the output
if results is None:
results = dict()
self._logger.info(f"... preparing results for plot generation")
data_train = self.__get_data_points(only_getting_labels=True, use_train_dataloader=True)
output = dict()
output.update({'settings': self.settings_train, 'date': datetime.now().strftime('%d/%m/%Y, %H:%M:%S')})
output.update({'train_clus': data_train['class'] if addon == 'ae' else data_train['out'], 'cl_dict': self.cell_classes})
output.update({'input': valid_input, 'valid_clus': valid_label})
output.update(results)
data2save = join(self.get_saving_path(), f'results_{addon}.npy')
self._logger.debug(f"... saving results: {data2save}")
np.save(data2save, output)
return output
def _determine_epoch_metrics(self, do_metrics: str):
"""Determination of additional metrics during training
Args:
do_metrics: String with index for calculating epoch metric
Return:
Function for metric calculation
"""
# Return the registered metric function; fall back to the Tensor constructor if the name is unknown
return self._metric_methods.get(do_metrics, Tensor)
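# Illustrative sketch of the metric registry assumed by this lookup; the actual
# entries of _metric_methods (names and callables) are defined elsewhere:
#   self._metric_methods = {'accuracy': accuracy_fn, 'f1': f1_fn}   # hypothetical
#   metric_fn = self._determine_epoch_metrics('accuracy')
#   value = metric_fn(pred, true)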
def _separate_classes_from_label(self, pred: Tensor, true: Tensor, label: str, *args) -> tuple[Tensor, Tensor]:
"""Separating the classes for further metric processing
Args:
pred: Torch Tensor from prediction
true: Torch Tensor from labeled dataset (ground-truth)
label: String key of the processed metric
*args: Optional metric function applied per class
Return:
Calculated metric results in Tensor array and total samples of each class
"""
if args or "cl" not in label:
metric_out = zeros((len(self.cell_classes),), dtype=float32)
else:
metric_out = [zeros((1,)) for _ in self.cell_classes]
length_out = zeros((len(self.cell_classes),), dtype=float32)
for idx, class_id in enumerate(unique(true)):
xpos = argwhere(true == class_id).flatten()
length_out[idx] = len(xpos)
if args:
metric_out[idx] += args[0](pred[xpos], true[xpos])
else:
metric_out[idx] = pred[xpos]
return metric_out, length_out
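# Worked example (hypothetical values): with cell_classes = ['A', 'B'],
# true = tensor([0, 0, 1]) and pred = tensor([0, 1, 1]), calling
# _separate_classes_from_label(pred, true, 'acc', accuracy_fn) applies the
# passed accuracy_fn per class and returns length_out = tensor([2., 1.]),
# i.e. the number of ground-truth samples of each class.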
@staticmethod
def _converting_tensor_to_numpy(metric_used: dict) -> dict:
"""Converting tensor array to numpy for later processing
:param metric_used: Dictionary of used metric
:return: Dictionary with calculated metrics
"""
# --- Metric out for saving (converting from tensor to numpy)
metric_save = deepcopy(metric_used)
for key0, data0 in metric_used.items():
for key1, data1 in data0.items():
for idx2, data2 in enumerate(data1):
if isinstance(data2, list):
for idx3, data3 in enumerate(data2):
if is_tensor(data3):
metric_save[key0][key1][idx2][idx3] = data3.cpu().detach().numpy()
else:
if is_tensor(data2):
metric_save[key0][key1][idx2] = data2.cpu().detach().numpy()
return metric_save
def get_epoch_metric_custom_methods(self) -> list:
"""Getting an overview of available methods for custom-written metric calculation in each epoch during training
:return: List with metrics name to call
"""
return list(self._metric_methods.keys())
@property
def get_number_parameters_from_model(self) -> int:
"""Getting the number of used parameters of used DNN model"""
return int(sum(p.numel() for p in self.model.parameters()))
def define_ptq_level(self, total_bitwidth: int, frac_bitwidth: int) -> None:
"""Function for defining the post-training quantization level of the model
:param total_bitwidth: Total bitwidth of the model
:param frac_bitwidth: Number of fractional bits of the quantized representation
:return: None
"""
self._ptq_level = [total_bitwidth, frac_bitwidth]
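# Worked example of the fixed-point interpretation (assuming a signed
# two's-complement Qm.n reading of [total_bitwidth, frac_bitwidth]):
#   handler.define_ptq_level(total_bitwidth=12, frac_bitwidth=8)
# gives 12 - 8 = 4 integer bits (incl. sign), a resolution of 2**-8 ≈ 0.0039,
# and a representable range of roughly [-8.0, 8.0 - 2**-8].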
@staticmethod
def logic_combination(true_labels: np.ndarray, pred_labels: np.ndarray, translate_list: list) -> tuple[np.ndarray, np.ndarray]:
"""Combination of logic for Reducing Label Classes
:param true_labels: Numpy array with true labels
:param pred_labels: Numpy array with predicted labels
:param translate_list: List with label ids to combine (e.g. [[1, 2], [0, 3]] -> [0, 1])
:returns: Two numpy arrays with true_labels_new and pred_labels_new
"""
assert true_labels.shape == pred_labels.shape, "Shape of labels are not equal"
assert len(translate_list), "List with new translation is empty"
assert check_elem_unique(translate_list), "Not all key elements in sublists are unique"
true_labels_new = np.zeros_like(true_labels, dtype=np.uint8)
pred_labels_new = np.zeros_like(pred_labels, dtype=np.uint8)
for idx, cluster in enumerate(translate_list):
for label_id in cluster:
pos = np.argwhere(true_labels == label_id).flatten()
true_labels_new[pos] = idx
pos = np.argwhere(pred_labels == label_id).flatten()
pred_labels_new[pos] = idx
return true_labels_new, pred_labels_new
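# Usage sketch with the example from the docstring: translate_list = [[1, 2], [0, 3]]
# maps original labels 1 and 2 to new class 0, and labels 0 and 3 to new class 1:
#   true = np.array([0, 1, 2, 3])
#   pred = np.array([1, 1, 3, 3])
#   new_true, new_pred = PyTorchHandler.logic_combination(true, pred, [[1, 2], [0, 3]])
#   # new_true -> [1, 0, 0, 1], new_pred -> [0, 0, 1, 1]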