Source code for denspp.offline.data_generator.waveform_generator

import numpy as np
from logging import getLogger
from scipy import signal
from denspp.offline.analog.dev_noise import ProcessNoise, SettingsNoise, RecommendedSettingsNoise


[docs] class WaveformGenerator: __handler_noise: ProcessNoise def __init__(self, sampling_rate: float, add_noise: bool=False, settings_noise: SettingsNoise=RecommendedSettingsNoise): """Class for generating the transient stimulation signal :param sampling_rate: Sampling rate of the signal :param add_noise: Boolean for adding noise to output :param settings_noise: Settings noise to add to output """ self._logger = getLogger(__name__) self.__handler_noise = ProcessNoise(settings_noise, sampling_rate) self.__add_noise = add_noise self._sampling_rate = sampling_rate self.__func_dict = {'RECT': self.__generate_rectangular} self.__func_dict.update({'LIN_RISE': self.__generate_linear_rising}) self.__func_dict.update({'LIN_FALL': self.__generate_linear_falling}) self.__func_dict.update({'SINE_HALF': self.__generate_sinusoidal_half}) self.__func_dict.update({'SINE_HALF_INV': self.__generate_sinusoidal_half_inverse}) self.__func_dict.update({'SINE_FULL': self.__generate_sinusoidal_full}) self.__func_dict.update({'TRI_HALF': self.__generate_triangle_half}) self.__func_dict.update({'TRI_FULL': self.__generate_triangle_full}) self.__func_dict.update({'SAW_POS': self.__generate_sawtooth_positive}) self.__func_dict.update({'SAW_NEG': self.__generate_sawtooth_negative}) self.__func_dict.update({'GAUSS': self.__generate_gaussian}) self.__func_dict.update({'NOISE_ABS_RAND': self.__generate_sawtooth_negative}) self.__func_dict.update({'ZERO': self.__generate_zero}) @staticmethod def __switching_polarity(signal_in: np.ndarray, do_cathodic: bool) -> np.ndarray: """Switching the polarity for cathodic-first (True) or anodic-first (False) waveform""" return signal_in if not do_cathodic else (-1) * signal_in def __get_charge_balancing_factor(self, waveforms: list) -> float: """Getting the coefficient for area-related comparison for charge balancing the biphasic waveform""" if not len(waveforms) == 2 and not len(waveforms) == 3: self._logger.info("It is not a biphasic waveform available - Please check!") return 1.0 else: area_first = np.trapezoid(waveforms[0]) area_second = np.trapezoid(waveforms[-1]) return np.abs(area_first / area_second)
[docs] def check_charge_balancing(self, signal: np.ndarray) -> float: """Checking if stimulation signal is charge balanced""" dq = np.trapezoid(signal) self._logger.info(f"... waveform has an error of {dq:.6f}") return dq
def __generate_zero(self, time_duration: float) -> np.ndarray: """Creating an output array with zero value""" num_samples = int(time_duration * self._sampling_rate) out = np.zeros((num_samples,), dtype=float) return out def __generate_rectangular(self, time_duration: float) -> np.ndarray: """Creating an output array with constant value""" return 1.0 + self.__generate_zero(time_duration) def __generate_linear_rising(self, time_duration: float) -> np.ndarray: """Creating an output array with linear positive slope""" num_samples = int(time_duration * self._sampling_rate) out = np.linspace(0.0, 1.0, num_samples, dtype=float) return out def __generate_linear_falling(self, time_duration: float) -> np.ndarray: """Creating an output array with linear negative slope""" num_samples = int(time_duration * self._sampling_rate) out = np.linspace(1.0, 0.0, num_samples, dtype=float) return out def __generate_sinusoidal_half(self, time_duration: float) -> np.ndarray: """Creating an output array with half sinusoidal waveform""" num_samples = int(time_duration * self._sampling_rate) out = np.sin(np.pi * np.linspace(0.0, num_samples, num_samples, endpoint=False) / num_samples, dtype=float) return out def __generate_sinusoidal_half_inverse(self, time_duration: float) -> np.ndarray: """Creating an output array with half sinusoidal waveform in inverse manner""" num_samples = int(time_duration * self._sampling_rate) out = 1.0 - np.sin(np.pi * np.linspace(0.0, num_samples, num_samples, endpoint=False) / num_samples, dtype=float) return out def __generate_sinusoidal_full(self, time_duration: float) -> np.ndarray: """Creating an output array with full sinusoidal waveform""" num_samples = int(time_duration * self._sampling_rate) out = np.sin(2 * np.pi * np.linspace(0.0, num_samples, num_samples, endpoint=False) / num_samples, dtype=float) return out def __generate_triangle_half(self, time_duration: float) -> np.ndarray: """Creating an output array with half triangular waveform""" out0 = self.__generate_linear_rising(0.5 * time_duration) out1 = self.__generate_linear_falling(0.5 * time_duration) return np.concatenate((out0, out1), axis=0) def __generate_triangle_full(self, time_duration: float) -> np.ndarray: """Creating an output array with full triangular waveform""" out0 = self.__generate_linear_rising(0.25 * time_duration) out1 = self.__generate_linear_falling(0.25 * time_duration) return np.concatenate((out0, out1, -out0, -out1), axis=0) def __generate_sawtooth_positive(self, time_duration: float) -> np.ndarray: """Creating an output array with linear positive sawtooth""" return 2 * self.__generate_linear_rising(time_duration) - 1.0 def __generate_sawtooth_negative(self, time_duration: float) -> np.ndarray: """Creating an output array with linear negative sawtooth""" return 2 * self.__generate_linear_falling(time_duration) - 1.0 def __generate_gaussian(self, time_duration: float) -> np.ndarray: """Creating an output array with gaussian pulse""" time = 2 * self.__generate_sawtooth_positive(time_duration) out = signal.gausspulse(time, 1.05 * np.sqrt(2)/(np.pi* time_duration), retenv=True)[1] scale_amp = (out.max()+out.min())/(out.max()) return out * scale_amp - out.min()
[docs] def get_dictionary_classes(self) -> list: """Getting a list with class names / labels of waveforms :return: List with class names """ return [val for val in self.__func_dict.keys()]
[docs] def print_dictionary_classes(self) -> None: """Printing the available waveform types available in class :return: List with class names """ out_list = self.get_dictionary_classes() self._logger.info("\nGetting information about signal types") self._logger.info("\n====================================================") for idx, type_id in enumerate(out_list): self._logger.info(f"Class {idx:02d} = {type_id}") self._logger.info("====================================================")
def __select_waveform_template(self, time_duration: float, sel_wfg: str, do_cathodic: bool=False) -> np.ndarray: """Selection for generating a waveform template Args: time_duration: Time window for the waveform sel_wfg: Selected waveform type [0: rect., 1: linear-rising, 2: linear-falling, 3: half-sinusoidal, 4: half-sinusoidal (inverse), 5: full-sinusoidal, 6: half-triangular, 7: full-triangular, 8: positive sawtooth, 9: negative sawtooth, 10: gaussian] do_cathodic: Boolean for cathodic-first impulse Returns: Numpy array with selected waveform """ if sel_wfg in self.__func_dict.keys(): signal = self.__func_dict[sel_wfg](time_duration) waveform = self.__switching_polarity(signal, do_cathodic) else: raise NotImplementedError("Waveform is not implemented!") return waveform
[docs] def generate_waveform(self, time_points: list, time_duration: list, waveform_select: list, polarity_cathodic: list) -> dict: """Generating the signal with waveforms for stimulation Args: time_points: List of time points for applying a stimulation waveform time_duration: List of stimulation waveform duration waveform_select: List of selected waveforms polarity_cathodic: List for performing cathodic-first generation Returns: List with three numpy arrays (time, output_signal, true rms value) """ if not len(time_points) == len(waveform_select) == len(time_duration): raise RuntimeError("Please check input! --> Length is not equal") else: # Generate dummy out = self.__generate_zero(2 * time_points[-1] + time_duration[-1]) noise = np.zeros_like(out) time = np.linspace(0, out.size, out.size, endpoint=False) / self._sampling_rate rms_value = 0.0 # Create waveform for idx, time_sec in enumerate(time_points): do_polarity = polarity_cathodic[idx] if not len(polarity_cathodic) == 0 else False time_xpos = int(time_sec * self._sampling_rate) waveform = self.__select_waveform_template(time_duration[idx], waveform_select[idx], do_polarity) out[time_xpos:time_xpos+waveform.size] = waveform noise = self.__handler_noise.gen_noise_real_pwr(out.size) if self.__add_noise else np.zeros_like(out) rms_value = np.sqrt(np.sum(np.square(waveform)) / waveform.size) return {'time': time, 'sig': out + noise, 'rms': rms_value}
[docs] def generate_biphasic_waveform(self, anodic_mode: int, anodic_duration: float, cathodic_mode: int, cathodic_duration: float, intermediate_duration: float=0.0, do_cathodic_first: bool=False, do_charge_balancing: bool=False) -> dict: """Generating the waveform for stimulation Args: anodic_mode: Mode of the anodic phase anodic_duration: Time window of the anodic phase cathodic_mode: Mode of cathodic phase cathodic_duration: Time window of the cathodic phase intermediate_duration: Time window for the intermediate idle time during anodic and cathodic phase do_cathodic_first: Starting with cathodic phase do_charge_balancing: Performing a charge balancing on second phase (same area) Returns: Two numpy arrays (time, output_signal) """ width = [anodic_duration, cathodic_duration] if not do_cathodic_first else [cathodic_duration, anodic_duration] mode = [anodic_mode, cathodic_mode] if not do_cathodic_first else [cathodic_mode, anodic_mode] poly = [False, True] if not do_cathodic_first else [True, False] waveforms = list() # --- Creating the waveforms for idx, window in enumerate(width): if idx == 1 and not intermediate_duration == 0.0: waveforms.append(self.__generate_zero(intermediate_duration)) waveforms.append(self.__select_waveform_template(window, mode[idx], poly[idx])) if do_charge_balancing: waveform = self.__get_charge_balancing_factor(waveforms) * waveforms[-1] waveforms[-1] = waveform # --- Creating the output signal out = np.concatenate([waveform for waveform in waveforms], axis=0) noise = self.__handler_noise.gen_noise_real_pwr(out.size) if self.__add_noise else np.zeros_like(out) time = np.linspace(0, out.size, out.size) / self._sampling_rate return {'t': time, 'y': out + noise}