import platform
import re
import subprocess
from copy import deepcopy
from dataclasses import dataclass
from datetime import datetime
from logging import Logger, getLogger
from os import cpu_count, remove
from pathlib import Path
from random import seed
from shutil import rmtree
from typing import Any
import numpy as np
from sklearn.model_selection import KFold
from torch import (
Generator,
Tensor,
argwhere,
backends,
cat,
cuda,
device,
float32,
is_tensor,
manual_seed,
nn,
optim,
randn,
unique,
use_deterministic_algorithms,
zeros,
)
from torch.utils.data import DataLoader, SubsetRandomSampler
from torchinfo import summary
from denspp.offline import get_path_to_project
from denspp.offline.data_format import JsonHandler
from denspp.offline.dnn import SettingsDataset
from denspp.offline.dnn.model_library import ModelLibrary
from denspp.offline.structure_builder import init_dnn_folder
[docs]
@dataclass
class DataValidation:
"""Dataclass with results from post-training validation phase
Attributes:
input: Numpy array with model input
valid_label: Numpy array with valid label during validation phase
train_label: Numpy array with training labels during training phase
feat: Numpy array with extracted features
mean: Numpy array with mean input signals for each class
output: Numpy array with model output
label_names: List with string names of each label class
"""
input: np.ndarray
valid_label: np.ndarray
train_label: np.ndarray
feat: np.ndarray
mean: np.ndarray
output: np.ndarray
label_names: list[str]
[docs]
@dataclass
class SettingsPytorch:
"""Class for handling the PyTorch training/inference pipeline
Attributes:
model_name: String with the model name
patience: Integer value with number of epochs before early stopping
optimizer: String with PyTorch optimizer name
loss: String with method name for the loss function
deterministic_do: Boolean if deterministic training should be done
deterministic_seed: Integer with the seed for deterministic training
num_kfold: Integer value with applying k-fold cross validation
num_epochs: Integer value with number of epochs
batch_size: Integer value with batch size
data_split_ratio: Float value for splitting the input dataset between training and validation
data_do_shuffle: Boolean if data should be shuffled before training
custom_metrics: List with string of custom metrics to calculate during training
"""
model_name: str
patience: int
optimizer: str
loss: str
deterministic_do: bool
deterministic_seed: int
num_kfold: int
num_epochs: int
batch_size: int
data_split_ratio: float
data_do_shuffle: bool
custom_metrics: list
[docs]
@staticmethod
def get_model_overview(print_overview: bool = False, index: str = "") -> list:
"""Function for getting an overview of existing models inside library"""
models_bib = ModelLibrary().get_registry()
return models_bib.get_library_overview(index, do_print=print_overview)
[docs]
def get_loss_func(self) -> Any:
"""Getting the loss function"""
match self.loss:
case "L1":
loss_func = nn.L1Loss
case "MSE":
loss_func = nn.MSELoss()
case "Cross Entropy":
loss_func = nn.CrossEntropyLoss()
case "Cosine Similarity":
loss_func = nn.CosineSimilarity()
case _:
raise NotImplementedError("Loss function unknown! - Please implement or check!")
return loss_func
[docs]
def load_optimizer(self, model, learn_rate: float = 0.1) -> Any:
"""Loading the optimizer function
:param model: PyTorch Sequential of the model with pre-defined configuration
:param learn_rate: Learning rate of the optimizer
:return: PyTorch Optimizer
"""
if len(list(model.parameters())) == 0:
params = model.model.parameters()
else:
params = model.parameters()
match self.optimizer:
case "Adam":
optim_func = optim.Adam(params)
case "SGD":
optim_func = optim.SGD(params, lr=learn_rate)
case _:
raise NotImplementedError("Optimizer function unknown! - Please implement or check!")
return optim_func
[docs]
def get_model(self, *args, **kwargs):
"""Function for loading the model to train"""
models_bib = ModelLibrary().get_registry()
if not self.model_name:
models_bib.get_library_overview(do_print=True)
raise AttributeError("Please select one model above and type-in the name into yaml file")
else:
if models_bib.check_module_available(self.model_name):
return deepcopy(models_bib.build(self.model_name, *args, **kwargs))
else:
models_bib.get_library_overview(do_print=True)
raise AttributeError("Model is not available - Please check again!")
[docs]
def get_signature(self) -> list:
"""Returning the signature or list with input names of model object"""
models_bib = ModelLibrary().get_registry()
if not self.model_name:
models_bib.get_library_overview(do_print=True)
raise AttributeError("Please select one model above and type-in the name into yaml file")
else:
if models_bib.check_module_available(self.model_name):
return models_bib.get_signature(self.model_name)
else:
models_bib.get_library_overview(do_print=True)
raise AttributeError("Model is not available - Please check again!")
[docs]
class PyTorchHandler:
_deterministic_generator: Generator
_used_hw_dev: device
_used_hw_num: int
_train_loader: list
_valid_loader: list
_selected_samples: dict
_cell_classes: list
_metric_methods: dict
_ptq_do_validation: bool = False
_ptq_level: list = [12, 8]
_logger: Logger
_path2save: Path = Path(".")
_path2log: Path
_path2temp: Path
_path2config: Path
def __init__(
self,
config_train: SettingsPytorch,
config_dataset: SettingsDataset,
do_train: bool = True,
) -> None:
"""Class for Handling Training of Deep Neural Networks in PyTorch
Args:
config_train: Configuration settings for the PyTorch Training
config_dataset: Configuration settings for dataset handling
do_train: Mention if training should be used (default = True)
Returns:
None
"""
init_dnn_folder()
self._logger = getLogger(__name__)
# --- Preparing Neural Network
self._model = None
self._loss_fn = None
self._optimizer = None
# --- Preparing options
self._config_available = False
self._kfold_do = False
self._shuffle_do = config_train.data_do_shuffle
self._kfold_run = 0
# --- Saving options
self._settings_train: SettingsPytorch = config_train
self._settings_data: SettingsDataset = config_dataset
self._index_folder = "train" if do_train else "inference"
self._model_addon = str()
# --- Logging paths for saving
self.__check_start_folder()
@staticmethod
def _get_cpu_name_windows() -> str:
return platform.processor()
@staticmethod
def _get_cpu_name_mac() -> str:
result = subprocess.run(
["sysctl", "-n", "machdep.cpu.brand_string"], capture_output=True, text=True
)
return result.stdout.strip()
@staticmethod
def _get_cpu_name_linux():
result = subprocess.run(["cat", "/proc/cpuinfo"], capture_output=True, text=True)
for line in result.stdout.split("\n"):
if "model name" in line:
return re.sub(".*model name.*:", "", line, 1).strip()
def _get_cpu_name(self) -> str:
match platform.system().lower():
case "windows":
return self._get_cpu_name_windows()
case "linux":
return self._get_cpu_name_linux()
case "darwin":
return self._get_cpu_name_mac()
case _:
return ""
def __check_start_folder(self, new_folder: str = "runs"):
"""Checking for starting folder to generate"""
self._path2run = Path(get_path_to_project(new_folder))
self._path2run.mkdir(parents=True, exist_ok=True)
def __setup_device(self) -> None:
if cuda.is_available():
# Using GPU
used_hw_gpu = cuda.get_device_name()
self._used_hw_dev = device("cuda")
self._used_hw_num = cuda.device_count()
device0 = used_hw_gpu
cuda.empty_cache()
elif (
backends.mps.is_available()
and backends.mps.is_built()
and platform.system().lower() == "darwin"
):
# Using Apple M1 Chip
self._used_hw_dev = device("mps")
self._used_hw_num = cuda.device_count()
device0 = self._get_cpu_name()
else:
# Using normal CPU
self._used_hw_dev = device("cpu")
self._used_hw_num = cpu_count()
device0 = self._get_cpu_name()
self._logger.debug(f"\nUsing PyTorch with {device0} on {platform.system()}")
def _init_train(self, path2save: Path, addon: str) -> None:
"""Do initialization of training routine
:param path2save: Path to the saved folder
:param addon: Addon name for model type ('ae' = Autoencoder or 'cl' = Classifier)
:return: None
"""
folder_name = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{self._index_folder}_{self._model.__class__.__name__}"
if path2save == Path("."):
self._path2save = self._path2run / folder_name
elif path2save.absolute().name == "runs" or path2save.absolute().parent == "runs":
self._path2save = path2save / folder_name
else:
self._path2save = path2save
self._path2temp = self._path2save / "temp"
# --- Generate folders
self._path2run.mkdir(parents=True, exist_ok=True)
self._path2save.mkdir(parents=True, exist_ok=True)
self._path2temp.mkdir(parents=True, exist_ok=True)
# --- Transfer model to computing hardware
self._model.to(device=self._used_hw_dev)
# --- Copy settings to YAML file
JsonHandler(
template=self._settings_data,
path=str(self._path2save),
file_name="Config_Dataset",
)
JsonHandler(
template=self._settings_train,
path=str(self._path2save),
file_name=f"Config_Training{addon}",
)
def __deterministic_training_preparation(self) -> None:
"""Preparing the CUDA hardware for deterministic training"""
if self._settings_train.deterministic_do:
np.random.seed(self._settings_train.deterministic_seed)
manual_seed(self._settings_train.deterministic_seed)
if cuda.is_available():
cuda.manual_seed_all(self._settings_train.deterministic_seed)
seed(self._settings_train.deterministic_seed)
backends.cudnn.deterministic = True
use_deterministic_algorithms(True)
self._logger.info(
f"=== DL Training with Deterministic @seed: {self._settings_train.deterministic_seed} ==="
)
else:
use_deterministic_algorithms(False)
self._logger.info("=== Normal DL Training ===")
def __deterministic_get_dataloader_params(self) -> dict:
"""Getting the parameters for preparing the Training and Validation DataLoader for Deterministic Training"""
if self._settings_train.deterministic_do:
self._deterministic_generator = Generator()
self._deterministic_generator.manual_seed(self._settings_train.deterministic_seed)
def worker_init_fn(worker_id):
return np.random.seed(self._settings_train.deterministic_seed)
return {
"worker_init_fn": worker_init_fn,
"generator": self._deterministic_generator,
}
else:
return {}
def _prepare_dataset_for_training(self, data_set, num_workers: int = 0) -> None:
"""Loading data for training and validation in DataLoader format into class
Args:
data_set: Dataclass DatasetFromFil loaded from file
num_workers: Number of workers for calculation [Default: 0 --> single core]
Return:
None
"""
self.__setup_device()
self._kfold_do = True if self._settings_train.num_kfold > 1 else False
self._model_addon = data_set.get_topology_type
self._cell_classes = data_set.get_dictionary
params_deterministic = self.__deterministic_get_dataloader_params()
# --- Preparing datasets
out_train = list()
out_valid = list()
if self._kfold_do:
kfold = KFold(
n_splits=self._settings_train.num_kfold,
shuffle=self._shuffle_do and not self._settings_train.deterministic_do,
)
for idx_train, idx_valid in kfold.split(np.arange(len(data_set))):
subsamps_train = SubsetRandomSampler(idx_train)
subsamps_valid = SubsetRandomSampler(idx_valid)
out_train.append(
DataLoader(
data_set,
batch_size=self._settings_train.batch_size,
sampler=subsamps_train,
**params_deterministic,
)
)
out_valid.append(
DataLoader(
data_set,
batch_size=self._settings_train.batch_size,
sampler=subsamps_valid,
**params_deterministic,
)
)
else:
idx = np.arange(len(data_set))
if self._shuffle_do and not self._settings_train.deterministic_do:
np.random.shuffle(idx)
split_pos = int(len(data_set) * (1 - self._settings_train.data_split_ratio))
idx_train = idx[0:split_pos]
idx_valid = idx[split_pos:]
subsamps_train = SubsetRandomSampler(idx_train)
subsamps_valid = SubsetRandomSampler(idx_valid)
out_train.append(
DataLoader(
data_set,
batch_size=self._settings_train.batch_size,
sampler=subsamps_train,
**params_deterministic,
)
)
out_valid.append(
DataLoader(
data_set,
batch_size=self._settings_train.batch_size,
sampler=subsamps_valid,
**params_deterministic,
)
)
# --- CUDA support for dataset
if cuda.is_available():
for idx, dataset in enumerate(out_train):
out_train[idx].pin_memory = True
out_train[idx].pin_memory_device = self._used_hw_dev.type
out_train[idx].num_workers = num_workers
out_valid[idx].pin_memory = True
out_valid[idx].pin_memory_device = self._used_hw_dev.type
out_valid[idx].num_workers = num_workers
# --- Output: Data
self._train_loader = out_train
self._valid_loader = out_valid
[docs]
def get_saving_path(self) -> Path:
"""Getting the absolute path for saving files in aim folder"""
return self._path2save.absolute()
[docs]
def get_best_model(self, type_model: str) -> list:
"""Getting the path to the best trained model"""
return [file for file in self._path2save.glob(f"*{type_model}*.pt")]
[docs]
def load_model(self, model, learn_rate: float = 0.1) -> None:
"""Loading optimizer, loss_fn into class
Args:
model: PyTorch Neural Network for Training / Inference
learn_rate: Learning rate used for SGD optimizer
Returns:
None
"""
self._model = model
self._optimizer = self._settings_train.load_optimizer(model, learn_rate=learn_rate)
self._loss_fn = self._settings_train.get_loss_func()
# --- Init. hardware for deterministic training
if self._settings_train.deterministic_do:
self.__deterministic_training_preparation()
# --- Print model
try:
self._logger.info("\nPrint summary of model")
self._logger.info(str(summary(self._model, input_size=self._model.model_shape)))
self._logger.info("\n\n")
except:
self._logger.info("Model summary is not possible due to internal errors (no shape, ...)")
def _save_train_results(
self,
last_metric_train: float | np.ndarray,
last_metric_valid: float | np.ndarray,
loss_type: str = "Loss",
) -> None:
"""Writing some training metrics into txt-file"""
if self._config_available:
with open(self._path2config, "a") as txt_handler:
txt_handler.write(f"\n--- Metrics of last epoch in fold #{self._kfold_run} ---")
txt_handler.write(f"\nTraining {loss_type} = {last_metric_train}")
txt_handler.write(f"\nValidation {loss_type} = {last_metric_valid}\n")
def _end_training_routine(self, timestamp_start: datetime, do_delete_temps: bool = True) -> None:
"""Doing the last step of training routine"""
timestamp_end = datetime.now()
timestamp_string = timestamp_end.strftime("%H:%M:%S")
diff_time = timestamp_end - timestamp_start
diff_string = diff_time
self._logger.info(f"\nTraining ends on: {timestamp_string}")
self._logger.info(f"Training runs: {diff_string}")
# Delete init model
for file in self._path2save.glob("*_reset.pt"):
remove(file)
# Delete log folders
if do_delete_temps:
for folder in self._path2save.glob("temp*"):
rmtree(folder, ignore_errors=True)
def __get_data_points(
self, only_getting_labels: bool = False, use_train_dataloader: bool = False
) -> dict:
"""Getting data from DataLoader for Plotting Results
Args:
only_getting_labels: Option for taking only labels
use_train_dataloader: Mode for selecting datatype (True=Training, False=Validation)
Returns:
Dict with data for plotting
"""
used_dataset = self._train_loader[-1] if use_train_dataloader else self._valid_loader[-1]
# --- Getting the keys
keys = list()
for data in used_dataset:
keys = list(data.keys())
break
if only_getting_labels:
keys.pop(0)
# --- Extracting data
data_extract = [randn(32, 1) for _ in keys]
first_run = True
for data in used_dataset:
for idx, key in enumerate(keys):
if first_run:
data_extract[idx] = data[key]
else:
data_extract[idx] = cat((data_extract[idx], data[key]), dim=0)
first_run = False
# --- Prepare output
mdict = dict()
for idx, data in enumerate(data_extract):
mdict.update({keys[idx]: data.numpy()})
return mdict
def _getting_data_for_plotting(
self, valid_input: np.ndarray, valid_label: np.ndarray, addon: str = ""
) -> DataValidation:
"""Getting the raw data for plotting results
:param valid_input: Numpy array with input data for training validation
:param valid_label: Numpy array with labels for training validation
:return: Dictionary with
"""
self._logger.info("... preparing results for plot generation")
data_train = self.__get_data_points(only_getting_labels=True, use_train_dataloader=True)
return DataValidation(
input=valid_input,
train_label=data_train["class"] if addon == "ae" else data_train["out"],
valid_label=valid_label,
feat=None, # Autoencoder specific value
mean=None, # Autoencoder specific value
output=data_train["out"],
label_names=self._cell_classes,
)
def _determine_epoch_metrics(self, do_metrics: str):
"""Determination of additional metrics during training
Args:
do_metrics: String with index for calculating epoch metric
Return:
Function for metric calculation
"""
func = Tensor
for metric_available, func in self._metric_methods.items():
if metric_available == do_metrics:
break
return func
def _separate_classes_from_label(
self, pred: Tensor, true: Tensor, label: str, *args
) -> tuple[Tensor, Tensor]:
"""Separating the classes for further metric processing
Args:
pred: Torch Tensor from prediction
true: Torch Tensor from labeled dataset (ground-truth)
key: String with processing metric
func: Function for metric calculation
Return:
Calculated metric results in Tensor array and total samples of each class
"""
if args or "cl" not in label:
metric_out = zeros((len(self._cell_classes),), dtype=float32)
else:
metric_out = [zeros((1,)) for _ in self._cell_classes]
length_out = zeros((len(self._cell_classes),), dtype=float32)
for idx, id0 in enumerate(unique(true)):
xpos = argwhere(true == id0).flatten()
length_out[idx] = len(xpos)
if args:
metric_out[idx] += args[0](pred[xpos], true[xpos])
else:
metric_out[idx] = pred[xpos]
return metric_out, length_out
@staticmethod
def _converting_tensor_to_numpy(metric_used: dict) -> dict:
"""Converting tensor array to numpy for later processing
:param metric_used: Dictionary of used metric
:return: Dictionary with calculated metrics
"""
metric_save = deepcopy(metric_used)
for key0, data0 in metric_used.items():
for key1, data1 in data0.items():
for idx2, data2 in enumerate(data1):
if isinstance(data2, list):
for idx3, data3 in enumerate(data2):
if is_tensor(data3):
metric_save[key0][key1][idx2][idx3] = data3.cpu().detach().numpy()
else:
if is_tensor(data2):
metric_save[key0][key1][idx2] = data2.cpu().detach().numpy()
return metric_save
@property
def get_epoch_metric_custom_methods(self) -> list:
"""Getting an overview of available methods for custom-written metric calculation in each epoch during training
:return: List with metrics name to call
"""
return [key for key in self._metric_methods.keys()]
@property
def get_number_parameters_from_model(self) -> int:
"""Getting the number of used parameters of used DNN model"""
return int(sum(p.numel() for p in self._model.parameters()))
[docs]
def define_ptq_level(self, total_bitwidth: int, frac_bitwidth: int) -> None:
"""Function for defining the post-training quantization level of the model
:param total_bitwidth: Total bitwidth of the model
:param frac_bitwidth: Fraction of bitwidth used for quantization
:return: None
"""
if frac_bitwidth < 0 or frac_bitwidth > total_bitwidth:
raise ValueError(f"Fraction of bitwidth must be between 0 and {total_bitwidth}")
if total_bitwidth < 0:
raise ValueError("Total bitwidth must be greater than 0")
self._ptq_level = [total_bitwidth, frac_bitwidth]