from os import remove, cpu_count
from pathlib import Path
import platform
import subprocess
import re
from copy import deepcopy
from dataclasses import dataclass
from typing import Any
import numpy as np
from logging import getLogger, Logger
from random import seed
from shutil import rmtree
from datetime import datetime
from torch import (device, cuda, backends, randn, cat, Tensor, is_tensor, zeros, unique, argwhere, float32,
Generator, manual_seed, use_deterministic_algorithms, nn, optim)
from torch.utils.data import DataLoader, SubsetRandomSampler
from torchinfo import summary
from sklearn.model_selection import KFold
from denspp.offline import get_path_to_project
from denspp.offline.data_format import YamlHandler
from denspp.offline.dnn import SettingsDataset
from denspp.offline.dnn.model_library import ModelLibrary
from denspp.offline.structure_builder import init_dnn_folder
@dataclass
class DataValidation:
"""Dataclass with results from post-training validation phase
Attributes:
input: Numpy array with model input
valid_label: Numpy array with valid label during validation phase
train_label: Numpy array with training labels during training phase
feat: Numpy array with extracted features
mean: Numpy array with mean input signals for each class
output: Numpy array with model output
label_names: List with string names of each label class
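    Note:
        feat and mean are filled by autoencoder runs and may be None for pure classifier models.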
"""
input: np.ndarray
valid_label: np.ndarray
train_label: np.ndarray
feat: np.ndarray
mean: np.ndarray
output: np.ndarray
label_names: list[str]
@dataclass
class SettingsPytorch:
"""Class for handling the PyTorch training/inference pipeline
Attributes:
model_name: String with the model name
patience: Integer value with number of epochs before early stopping
optimizer: String with PyTorch optimizer name
loss: String with method name for the loss function
deterministic_do: Boolean if deterministic training should be done
deterministic_seed: Integer with the seed for deterministic training
        num_kfold: Integer with the number of folds for k-fold cross-validation (values > 1 enable it)
num_epochs: Integer value with number of epochs
batch_size: Integer value with batch size
data_split_ratio: Float value for splitting the input dataset between training and validation
data_do_shuffle: Boolean if data should be shuffled before training
custom_metrics: List with string of custom metrics to calculate during training
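    Example:
        Illustrative configuration (all values are placeholders, not defaults)::

            SettingsPytorch(
                model_name='MyModel', patience=10, optimizer='Adam', loss='MSE',
                deterministic_do=True, deterministic_seed=42, num_kfold=1,
                num_epochs=100, batch_size=32, data_split_ratio=0.2,
                data_do_shuffle=True, custom_metrics=[]
            )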
"""
model_name: str
patience: int
optimizer: str
loss: str
deterministic_do: bool
deterministic_seed: int
num_kfold: int
num_epochs: int
batch_size: int
data_split_ratio: float
data_do_shuffle: bool
custom_metrics: list
@staticmethod
def get_model_overview(print_overview: bool=False, index: str='') -> list:
"""Function for getting an overview of existing models inside library"""
models_bib = ModelLibrary().get_registry()
return models_bib.get_library_overview(index, do_print=print_overview)
def get_loss_func(self) -> Any:
"""Getting the loss function"""
match self.loss:
case 'L1':
                loss_func = nn.L1Loss()
case 'MSE':
loss_func = nn.MSELoss()
case 'Cross Entropy':
loss_func = nn.CrossEntropyLoss()
case 'Cosine Similarity':
loss_func = nn.CosineSimilarity()
case _:
raise NotImplementedError("Loss function unknown! - Please implement or check!")
return loss_func
def load_optimizer(self, model, learn_rate: float=0.1) -> Any:
"""Loading the optimizer function
:param model: PyTorch Sequential of the model with pre-defined configuration
        :param learn_rate: Learning rate of the optimizer (applied to SGD; Adam keeps its library default)
:return: PyTorch Optimizer
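        Example (sketch; ``model`` is any ``nn.Module`` instance)::

            optimizer = settings.load_optimizer(model, learn_rate=0.01)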
"""
match self.optimizer:
case 'Adam':
optim_func = optim.Adam(model.parameters())
case 'SGD':
optim_func = optim.SGD(model.parameters(), lr=learn_rate)
case _:
raise NotImplementedError("Optimizer function unknown! - Please implement or check!")
return optim_func
def get_model(self, *args, **kwargs):
"""Function for loading the model to train"""
models_bib = ModelLibrary().get_registry()
        if not self.model_name:
            models_bib.get_library_overview(do_print=True)
            raise AttributeError("Please select one model from the overview above and type its name into the YAML file")
        elif models_bib.check_module_available(self.model_name):
            return deepcopy(models_bib.build(self.model_name, *args, **kwargs))
        else:
            models_bib.get_library_overview(do_print=True)
            raise AttributeError(f"Model '{self.model_name}' is not available - Please check again!")
def get_signature(self) -> list:
"""Returning the signature or list with input names of model object"""
models_bib = ModelLibrary().get_registry()
        if not self.model_name:
            models_bib.get_library_overview(do_print=True)
            raise AttributeError("Please select one model from the overview above and type its name into the YAML file")
        elif models_bib.check_module_available(self.model_name):
            return models_bib.get_signature(self.model_name)
        else:
            models_bib.get_library_overview(do_print=True)
            raise AttributeError(f"Model '{self.model_name}' is not available - Please check again!")
class PyTorchHandler:
_deterministic_generator: Generator
_used_hw_dev: device
_used_hw_num: int
_train_loader: list
_valid_loader: list
_selected_samples: dict
_cell_classes: list
_metric_methods: dict
_ptq_do_validation: bool = False
_ptq_level: list = [12, 8]
_logger: Logger
_path2save: Path = Path(".")
_path2log: Path
_path2temp: Path
_path2config: Path
def __init__(self, config_train: SettingsPytorch, config_dataset: SettingsDataset, do_train: bool=True) -> None:
"""Class for Handling Training of Deep Neural Networks in PyTorch
Args:
config_train: Configuration settings for the PyTorch Training
config_dataset: Configuration settings for dataset handling
            do_train: Boolean indicating whether the handler is used for training (default = True)
Returns:
None
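        Example:
            Minimal usage sketch (``cfg_train``, ``cfg_data`` and ``model`` are pre-built objects)::

                handler = PyTorchHandler(config_train=cfg_train, config_dataset=cfg_data)
                handler.load_model(model, learn_rate=0.01)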
"""
init_dnn_folder()
self._logger = getLogger(__name__)
# --- Preparing Neural Network
self._model = None
self._loss_fn = None
self._optimizer = None
# --- Preparing options
self._config_available = False
self._kfold_do = False
self._shuffle_do = config_train.data_do_shuffle
self._kfold_run = 0
# --- Saving options
self._settings_train: SettingsPytorch = config_train
self._settings_data: SettingsDataset = config_dataset
self._index_folder = 'train' if do_train else 'inference'
self._model_addon = str()
# --- Logging paths for saving
self.__check_start_folder()
@staticmethod
def _get_cpu_name_windows() -> str:
return platform.processor()
@staticmethod
def _get_cpu_name_mac() -> str:
result = subprocess.run(['sysctl', '-n', 'machdep.cpu.brand_string'], capture_output=True, text=True)
return result.stdout.strip()
    @staticmethod
    def _get_cpu_name_linux() -> str:
        result = subprocess.run(['cat', '/proc/cpuinfo'], capture_output=True, text=True)
        for line in result.stdout.split('\n'):
            if "model name" in line:
                return re.sub(".*model name.*:", "", line, count=1).strip()
        return ''
def _get_cpu_name(self) -> str:
match platform.system().lower():
case 'windows':
return self._get_cpu_name_windows()
case 'linux':
return self._get_cpu_name_linux()
case 'darwin':
return self._get_cpu_name_mac()
case _:
return ''
    def __check_start_folder(self, new_folder: str='runs') -> None:
        """Checking for the starting folder and creating it if missing"""
self._path2run = Path(get_path_to_project(new_folder))
self._path2run.mkdir(parents=True, exist_ok=True)
def __setup_device(self) -> None:
if cuda.is_available():
# Using GPU
used_hw_gpu = cuda.get_device_name()
self._used_hw_dev = device("cuda")
self._used_hw_num = cuda.device_count()
device0 = used_hw_gpu
cuda.empty_cache()
        elif backends.mps.is_available() and backends.mps.is_built() and platform.system().lower() == "darwin":
            # Using Apple Silicon GPU (the MPS backend exposes a single device)
            self._used_hw_dev = device("mps")
            self._used_hw_num = 1
            device0 = self._get_cpu_name()
else:
# Using normal CPU
self._used_hw_dev = device("cpu")
self._used_hw_num = cpu_count()
device0 = self._get_cpu_name()
self._logger.debug(f"\nUsing PyTorch with {device0} on {platform.system()}")
    def _init_train(self, path2save: Path, addon: str) -> None:
        """Initializing the training routine
        :param path2save: Path to the folder for saving results
        :param addon: Addon name for model type ('ae' = Autoencoder or 'cl' = Classifier)
        :return: None
        """
        self._logger.debug(f"Given saving path: {path2save}")
if path2save == Path("."):
folder_name = f'{datetime.now().strftime("%Y%m%d_%H%M%S")}_{self._index_folder}_{self._model.__class__.__name__}'
self._path2save = self._path2run / folder_name
else:
self._path2save = path2save
        self._path2temp = self._path2save / 'temp'
# --- Generate folders
self._path2run.mkdir(parents=True, exist_ok=True)
self._path2save.mkdir(parents=True, exist_ok=True)
self._path2temp.mkdir(parents=True, exist_ok=True)
# --- Transfer model to hardware
self._model.to(device=self._used_hw_dev)
# --- Copy settings to YAML file
YamlHandler(
template=self._settings_data,
path=str(self._path2save),
file_name='Config_Dataset'
)
YamlHandler(
template=self._settings_train,
path=str(self._path2save),
file_name=f'Config_Training{addon}'
)
def __deterministic_training_preparation(self) -> None:
"""Preparing the CUDA hardware for deterministic training"""
if self._settings_train.deterministic_do:
np.random.seed(self._settings_train.deterministic_seed)
manual_seed(self._settings_train.deterministic_seed)
if cuda.is_available():
cuda.manual_seed_all(self._settings_train.deterministic_seed)
seed(self._settings_train.deterministic_seed)
backends.cudnn.deterministic = True
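            # Note: use_deterministic_algorithms(True) on CUDA may additionally require
            # the CUBLAS_WORKSPACE_CONFIG environment variable (e.g. ':4096:8') to be set.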
use_deterministic_algorithms(True)
self._logger.info(f"=== DL Training with Deterministic @seed: {self._settings_train.deterministic_seed} ===")
else:
use_deterministic_algorithms(False)
self._logger.info(f"=== Normal DL Training ===")
def __deterministic_get_dataloader_params(self) -> dict:
"""Getting the parameters for preparing the Training and Validation DataLoader for Deterministic Training"""
if self._settings_train.deterministic_do:
self._deterministic_generator = Generator()
self._deterministic_generator.manual_seed(self._settings_train.deterministic_seed)
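            # Re-seed NumPy in every DataLoader worker so multi-process loading stays reproducible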
worker_init_fn = lambda worker_id: np.random.seed(self._settings_train.deterministic_seed)
return {'worker_init_fn': worker_init_fn, 'generator': self._deterministic_generator}
else:
return {}
def _prepare_dataset_for_training(self, data_set, num_workers: int=0) -> None:
"""Loading data for training and validation in DataLoader format into class
Args:
            data_set: Dataclass DatasetFromFile loaded from file
            num_workers: Number of workers for data loading [Default: 0 --> main process only]
        Returns:
None
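        Note:
            With num_kfold > 1 one DataLoader pair per fold is created; otherwise a single
            train/validation split based on data_split_ratio is used.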
"""
self.__setup_device()
        self._kfold_do = self._settings_train.num_kfold > 1
self._model_addon = data_set.get_topology_type
self._cell_classes = data_set.get_dictionary
params_deterministic = self.__deterministic_get_dataloader_params()
        # --- Preparing DataLoader arguments (pin memory and worker processes when CUDA is available)
        params_loader = dict(params_deterministic)
        if cuda.is_available():
            params_loader.update({
                'pin_memory': True,
                'pin_memory_device': self._used_hw_dev.type,
                'num_workers': num_workers
            })
        # --- Preparing datasets
        out_train = list()
        out_valid = list()
        if self._kfold_do:
            kfold = KFold(n_splits=self._settings_train.num_kfold,
                          shuffle=self._shuffle_do and not self._settings_train.deterministic_do)
            for idx_train, idx_valid in kfold.split(np.arange(len(data_set))):
                out_train.append(DataLoader(data_set,
                                            batch_size=self._settings_train.batch_size,
                                            sampler=SubsetRandomSampler(idx_train),
                                            **params_loader))
                out_valid.append(DataLoader(data_set,
                                            batch_size=self._settings_train.batch_size,
                                            sampler=SubsetRandomSampler(idx_valid),
                                            **params_loader))
        else:
            idx = np.arange(len(data_set))
            if self._shuffle_do and not self._settings_train.deterministic_do:
                np.random.shuffle(idx)
            split_pos = int(len(data_set) * (1 - self._settings_train.data_split_ratio))
            out_train.append(DataLoader(data_set,
                                        batch_size=self._settings_train.batch_size,
                                        sampler=SubsetRandomSampler(idx[:split_pos]),
                                        **params_loader))
            out_valid.append(DataLoader(data_set,
                                        batch_size=self._settings_train.batch_size,
                                        sampler=SubsetRandomSampler(idx[split_pos:]),
                                        **params_loader))
        # --- Output: Data
        self._train_loader = out_train
        self._valid_loader = out_valid
def get_saving_path(self) -> Path:
"""Getting the absolute path for saving files in aim folder"""
return self._path2save.absolute()
def get_best_model(self, type_model: str) -> list:
"""Getting the path to the best trained model"""
        return list(self._path2save.glob(f'*{type_model}*.pt'))
def load_model(self, model, learn_rate: float=0.1) -> None:
"""Loading optimizer, loss_fn into class
Args:
model: PyTorch Neural Network for Training / Inference
learn_rate: Learning rate used for SGD optimizer
Returns:
None
"""
self._model = model
self._optimizer = self._settings_train.load_optimizer(model, learn_rate=learn_rate)
self._loss_fn = self._settings_train.get_loss_func()
# --- Init. hardware for deterministic training
if self._settings_train.deterministic_do:
self.__deterministic_training_preparation()
# --- Print model
try:
self._logger.info("\nPrint summary of model")
self._logger.info(str(summary(self._model, input_size=self._model.model_shape)))
self._logger.info("\n\n")
        except Exception:
            self._logger.info("Model summary is not possible due to internal errors (e.g. missing shape attribute)")
def _save_train_results(self, last_metric_train: float | np.ndarray,
last_metric_valid: float | np.ndarray, loss_type: str='Loss') -> None:
"""Writing some training metrics into txt-file"""
if self._config_available:
with open(self._path2config, 'a') as txt_handler:
txt_handler.write(f'\n--- Metrics of last epoch in fold #{self._kfold_run} ---')
txt_handler.write(f'\nTraining {loss_type} = {last_metric_train}')
txt_handler.write(f'\nValidation {loss_type} = {last_metric_valid}\n')
    def _end_training_routine(self, timestamp_start: datetime, do_delete_temps: bool=True) -> None:
        """Performing the final steps of the training routine"""
        timestamp_end = datetime.now()
        timestamp_string = timestamp_end.strftime('%H:%M:%S')
        diff_time = timestamp_end - timestamp_start
        self._logger.info(f'\nTraining ends on: {timestamp_string}')
        self._logger.info(f'Training duration: {diff_time}')
# Delete init model
for file in self._path2save.glob('*_reset.pt'):
remove(file)
# Delete log folders
if do_delete_temps:
for folder in self._path2save.glob('temp*'):
rmtree(folder, ignore_errors=True)
def __get_data_points(self, only_getting_labels: bool=False, use_train_dataloader: bool=False) -> dict:
"""Getting data from DataLoader for Plotting Results
Args:
only_getting_labels: Option for taking only labels
use_train_dataloader: Mode for selecting datatype (True=Training, False=Validation)
Returns:
Dict with data for plotting
"""
used_dataset = self._train_loader[-1] if use_train_dataloader else self._valid_loader[-1]
# --- Getting the keys
keys = list()
for data in used_dataset:
keys = list(data.keys())
break
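        # With only_getting_labels, drop the first key (assumed to be the model input)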
if only_getting_labels:
keys.pop(0)
# --- Extracting data
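        # Placeholder tensors; overwritten by the first batch, then extended via cat()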
data_extract = [randn(32, 1) for _ in keys]
first_run = True
for data in used_dataset:
for idx, key in enumerate(keys):
if first_run:
data_extract[idx] = data[key]
else:
data_extract[idx] = cat((data_extract[idx], data[key]), dim=0)
first_run = False
# --- Prepare output
mdict = dict()
for idx, data in enumerate(data_extract):
mdict.update({keys[idx]: data.numpy()})
return mdict
def _getting_data_for_plotting(self, valid_input: np.ndarray, valid_label: np.ndarray, addon: str='') -> DataValidation:
"""Getting the raw data for plotting results
:param valid_input: Numpy array with input data for training validation
:param valid_label: Numpy array with labels for training validation
:return: Dictionary with
"""
self._logger.info(f"... preparing results for plot generation")
data_train = self.__get_data_points(
only_getting_labels=True,
use_train_dataloader=True
)
return DataValidation(
input=valid_input,
train_label=data_train['class'] if addon == 'ae' else data_train['out'],
valid_label=valid_label,
feat=None, # Autoencoder specific value
mean=None, # Autoencoder specific value
output=data_train['out'],
label_names=self._cell_classes
)
    def _determine_epoch_metrics(self, do_metrics: str):
        """Determining an additional metric to calculate during training
        Args:
            do_metrics: String with the name of the epoch metric
        Return:
            Function for metric calculation
        """
        # Look up the registered metric; fall back to the Tensor constructor if unknown
        return self._metric_methods.get(do_metrics, Tensor)
    def _separate_classes_from_label(self, pred: Tensor, true: Tensor, label: str, *args) -> tuple[Tensor, Tensor]:
        """Separating the classes for further metric processing
        Args:
            pred: Torch Tensor from prediction
            true: Torch Tensor from labeled dataset (ground-truth)
            label: String with the name of the processed metric
            *args: Optional function for metric calculation
        Return:
            Calculated metric results in Tensor array and total samples of each class
        """
        if args or "cl" not in label:
            metric_out = zeros((len(self._cell_classes),), dtype=float32)
        else:
            metric_out = [zeros((1,)) for _ in self._cell_classes]
length_out = zeros((len(self._cell_classes),), dtype=float32)
for idx, id0 in enumerate(unique(true)):
xpos = argwhere(true == id0).flatten()
length_out[idx] = len(xpos)
if args:
metric_out[idx] += args[0](pred[xpos], true[xpos])
else:
metric_out[idx] = pred[xpos]
return metric_out, length_out
@staticmethod
def _converting_tensor_to_numpy(metric_used: dict) -> dict:
"""Converting tensor array to numpy for later processing
:param metric_used: Dictionary of used metric
:return: Dictionary with calculated metrics
"""
metric_save = deepcopy(metric_used)
for key0, data0 in metric_used.items():
for key1, data1 in data0.items():
for idx2, data2 in enumerate(data1):
if isinstance(data2, list):
for idx3, data3 in enumerate(data2):
if is_tensor(data3):
metric_save[key0][key1][idx2][idx3] = data3.cpu().detach().numpy()
else:
if is_tensor(data2):
metric_save[key0][key1][idx2] = data2.cpu().detach().numpy()
return metric_save
@property
def get_epoch_metric_custom_methods(self) -> list:
"""Getting an overview of available methods for custom-written metric calculation in each epoch during training
:return: List with metrics name to call
"""
        return list(self._metric_methods.keys())
@property
def get_number_parameters_from_model(self) -> int:
"""Getting the number of used parameters of used DNN model"""
return int(sum(p.numel() for p in self._model.parameters()))
def define_ptq_level(self, total_bitwidth: int, frac_bitwidth: int) -> None:
"""Function for defining the post-training quantization level of the model
:param total_bitwidth: Total bitwidth of the model
        :param frac_bitwidth: Number of fractional bits used for quantization (must not exceed total_bitwidth)
:return: None
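        Example (uses the class default of 12 total and 8 fractional bits)::

            handler.define_ptq_level(total_bitwidth=12, frac_bitwidth=8)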
"""
        if total_bitwidth <= 0:
            raise ValueError("Total bitwidth must be greater than 0")
        if frac_bitwidth < 0 or frac_bitwidth > total_bitwidth:
            raise ValueError(f"Fraction of bitwidth must be between 0 and {total_bitwidth}")
self._ptq_level = [total_bitwidth, frac_bitwidth]