Source code for denspp.offline.data_call.waveform_generator
import numpy as np
from logging import getLogger, Logger
from scipy import signal
from fxpmath import Fxp, Config
from denspp.offline import check_keylist_elements_any
from denspp.offline.analog.dev_noise import ProcessNoise, SettingsNoise, RecommendedSettingsNoise
[docs]
class WaveformGenerator:
__handler_noise: ProcessNoise
_logger: Logger
def __init__(self, sampling_rate: float, add_noise: bool=False, settings_noise: SettingsNoise=RecommendedSettingsNoise):
"""Class for generating the transient stimulation signal
:param sampling_rate: Sampling rate of the signal
:param add_noise: Boolean for adding noise to output
:param settings_noise: Settings noise to add to output
"""
self._logger = getLogger(__name__)
self.__handler_noise = ProcessNoise(settings_noise, sampling_rate)
self.__add_noise: bool = add_noise
self._sampling_rate: float = sampling_rate
self._time_duration: float = 1.0
self.__func_dict = {'RECT_HALF': self.__generate_rectangular_half}
self.__func_dict.update({'RECT_FULL': self.__generate_rectangular_full})
self.__func_dict.update({'LIN_RISE': self.__generate_linear_rising})
self.__func_dict.update({'LIN_FALL': self.__generate_linear_falling})
self.__func_dict.update({'SINE_HALF': self.__generate_sinusoidal_half})
self.__func_dict.update({'SINE_HALF_INV': self.__generate_sinusoidal_half_inverse})
self.__func_dict.update({'SINE_FULL': self.__generate_sinusoidal_full})
self.__func_dict.update({'TRI_HALF': self.__generate_triangle_half})
self.__func_dict.update({'TRI_FULL': self.__generate_triangle_full})
self.__func_dict.update({'SAW_POS': self.__generate_sawtooth_positive})
self.__func_dict.update({'SAW_NEG': self.__generate_sawtooth_negative})
self.__func_dict.update({'GAUSS': self.__generate_gaussian})
self.__func_dict.update({'ZERO': self.__generate_zero})
@property
def _num_samples(self) -> int:
"""Calculating the number of samples of the transient window"""
return int(self._time_duration * self._sampling_rate)
@property
def _build_time_cycle(self) -> np.ndarray:
return np.linspace(start=0.0, stop=2 * np.pi, num=self._num_samples, endpoint=False, dtype=float)
@staticmethod
def __switching_polarity(signal_in: np.ndarray, do_cathodic: bool) -> np.ndarray:
"""Switching the polarity for cathodic-first (True) or anodic-first (False) waveform"""
return signal_in if not do_cathodic else (-1) * signal_in
def __get_charge_balancing_factor(self, waveforms: list) -> float:
"""Getting the coefficient for area-related comparison for charge balancing the biphasic waveform"""
if not len(waveforms) == 2 and not len(waveforms) == 3:
self._logger.info("It is not a biphasic waveform available - Please check!")
return 1.0
else:
area_first = np.trapezoid(waveforms[0])
area_second = np.trapezoid(waveforms[-1])
return np.abs(area_first / area_second)
[docs]
def check_charge_balancing(self, signal: np.ndarray) -> float:
"""Checking if stimulation signal is charge balanced"""
dq = np.trapezoid(signal)
self._logger.info(f"... waveform has an error of {dq:.6f}")
return dq
def __generate_zero(self) -> np.ndarray:
"""Creating an output array with zero value"""
out = np.zeros((self._num_samples,), dtype=float)
return out
def __generate_rectangular_half(self) -> np.ndarray:
"""Creating an output array with constant value"""
return 1.0 + self.__generate_zero()
def __generate_rectangular_full(self) -> np.ndarray:
"""Creating an output array with constant value"""
return signal.square(self._build_time_cycle, duty=0.5)
def __generate_linear_rising(self) -> np.ndarray:
"""Creating an output array with linear positive slope"""
return np.linspace(0.0, 1.0, self._num_samples, endpoint=True, dtype=float)
def __generate_linear_falling(self) -> np.ndarray:
"""Creating an output array with linear negative slope"""
return np.linspace(1.0, 0.0, self._num_samples, endpoint=True, dtype=float)
def __generate_sinusoidal_half(self) -> np.ndarray:
"""Creating an output array with half sinusoidal waveform"""
return np.sin(0.5* self._build_time_cycle, dtype=float)
def __generate_sinusoidal_half_inverse(self) -> np.ndarray:
"""Creating an output array with half sinusoidal waveform in inverse manner"""
return 1.0 - np.sin(0.5* self._build_time_cycle, dtype=float)
def __generate_sinusoidal_full(self) -> np.ndarray:
"""Creating an output array with full sinusoidal waveform"""
return np.sin(self._build_time_cycle, dtype=float)
def __generate_triangle_half(self) -> np.ndarray:
"""Creating an output array with half triangular waveform"""
return signal.sawtooth(0.5*self._build_time_cycle + np.pi/2, width=0.5)
def __generate_triangle_full(self) -> np.ndarray:
"""Creating an output array with full triangular waveform"""
return signal.sawtooth(self._build_time_cycle + np.pi/2, width=0.5)
def __generate_sawtooth_positive(self) -> np.ndarray:
"""Creating an output array with linear positive sawtooth"""
return 2 * self.__generate_linear_rising() - 1.0
def __generate_sawtooth_negative(self) -> np.ndarray:
"""Creating an output array with linear negative sawtooth"""
return 2 * self.__generate_linear_falling() - 1.0
def __generate_gaussian(self) -> np.ndarray:
"""Creating an output array with gaussian pulse"""
time = self.__generate_sawtooth_positive()
out = signal.gausspulse(time, fc=np.pi, retenv=True)[1]
scale_amp = (out.max()+out.min())/(out.max())
return out * scale_amp - out.min()
[docs]
def get_dictionary_classes(self) -> list:
"""Getting a list with class names / labels of waveforms
:return: List with class names
"""
return [val for val in self.__func_dict.keys()]
def __select_waveform_template(self, time_duration: float, sel_wfg: str, do_cathodic: bool=False) -> np.ndarray:
"""Selection for generating a waveform template
Args:
time_duration: Time window for the waveform
sel_wfg: Selected waveform type [0: rect., 1: linear-rising, 2: linear-falling, 3: half-sinusoidal,
4: half-sinusoidal (inverse), 5: full-sinusoidal, 6: half-triangular, 7: full-triangular,
8: positive sawtooth, 9: negative sawtooth, 10: gaussian]
do_cathodic: Boolean for cathodic-first impulse
Returns:
Numpy array with selected waveform
"""
if sel_wfg in self.__func_dict.keys():
self._time_duration = time_duration
signal = self.__func_dict[sel_wfg]()
waveform = self.__switching_polarity(signal, do_cathodic)
self._logger.debug(f"Selected waveform type {sel_wfg} is generated with shape {waveform.shape}")
return waveform
else:
raise NotImplementedError("Waveform is not implemented!")
[docs]
def generate_noise(self, shape: tuple) -> np.ndarray:
"""Generating a transient signal with noise
:param shape: Numpy shape of the transient signal
:return: Numpy array with noise signal
"""
if len(shape) == 2:
noise = np.zeros(shape)
for dim0 in range(shape[0]):
noise[dim0,:] = self.__handler_noise.gen_noise_real_pwr(shape[1])
else:
noise = self.__handler_noise.gen_noise_real_pwr(shape[0])
return noise
[docs]
def generate_waveform(self, time_points: list, time_duration: list,
waveform_select: list, polarity_cathodic: list) -> dict:
"""Generating the signal with waveforms for stimulation
:param time_points: List of time points for applying a stimulation waveform
:param time_duration: List of stimulation waveform duration
:param waveform_select: List of selected waveforms
:param polarity_cathodic: List for performing cathodic-first generation
:returns: List with three numpy arrays (time, output_signal, true rms value)
"""
if not len(time_points) == len(waveform_select) == len(time_duration):
raise RuntimeError("Please check input! --> Length is not equal")
else:
self._time_duration = 2 * time_points[-1] + time_duration[-1]
out = self.__generate_zero()
rms_value = 0.0
for idx, (time_off, time_sec, wvf_type) in enumerate(zip(time_points, time_duration, waveform_select)):
time_xpos = int(time_off * self._sampling_rate)
do_polarity = polarity_cathodic[idx] if not len(polarity_cathodic) == 0 else False
waveform = self.__select_waveform_template(time_sec, wvf_type, do_polarity)
out[time_xpos:time_xpos+waveform.size] += waveform
rms_value = np.sqrt(np.sum(np.square(waveform)) / waveform.size)
noise = self.__handler_noise.gen_noise_real_pwr(out.size) if self.__add_noise else np.zeros_like(out)
time = np.linspace(0, out.size, out.size, endpoint=False) / self._sampling_rate
return {'time': time, 'sig': out + noise, 'rms': rms_value}
[docs]
def generate_waveform_quant_fxp(self, time_points: list, time_duration: list,
waveform_select: list, polarity_cathodic: list,
bitwidth: int, bitfrac: int, signed: bool, do_opt: bool=False) -> dict:
"""Generating the signal with waveforms for stimulation in quantized matter
:param time_points: List of time points for applying a stimulation waveform
:param time_duration: List of stimulation waveform duration
:param waveform_select: List of selected waveforms
:param polarity_cathodic: List for performing cathodic-first generation
:param bitwidth: Integer with total bitwidth
:param bitfrac: Integer with fraction bitwidth
:param signed: If quantized output should be signed integer
:param do_opt: Boolean for taking quarter signal (optimzed version for hardware implementation)
:returns: List with three numpy arrays (time, output_signal, true rms value)
"""
assert check_keylist_elements_any(waveform_select, ['SINE_FULL', 'RECT_FULL', 'TRI_FULL']), "Only 'waveform_select' with ['SINE_FULL', 'RECT_FULL', 'TRI_FULL'] are allowed!"
wvf_norm = self.generate_waveform(
time_points=time_points,
time_duration=time_duration,
waveform_select=waveform_select,
polarity_cathodic=polarity_cathodic
)
wvf_used = wvf_norm['sig'] / (wvf_norm['sig'].max() - wvf_norm['sig'].min()) + (0 if signed or do_opt else 0.5)
wvf_used = np.array(wvf_used * (2**(bitwidth-bitfrac)), dtype=np.int32)
if do_opt:
wvf_used = wvf_used[:wvf_norm['sig'].argmax() + 1]
config_fxp = Config()
config_fxp.rounding = "around"
config_fxp.overflow = "saturate"
config_fxp.underflow = "saturate"
wvf_quant = Fxp(val=wvf_used, signed=signed, n_word=bitwidth, n_frac=bitfrac, config=config_fxp).get_val()
return {'time': wvf_norm['time'], 'sig': wvf_quant, 'rms': wvf_norm['rms']}
[docs]
def generate_biphasic_waveform(self, anodic_wvf: str, anodic_duration: float,
cathodic_wvf: str, cathodic_duration: float,
intermediate_duration: float=0.0, do_cathodic_first: bool=False,
do_charge_balancing: bool=False) -> dict:
"""Generating the waveform for stimulation
Args:
anodic_wvf: String with waveform type for anodic phase
anodic_duration: Time window of the anodic phase
cathodic_wvf: String with waveform type for cathodic phase
cathodic_duration: Time window of the cathodic phase
intermediate_duration: Time window for the intermediate idle time during anodic and cathodic phase
do_cathodic_first: Starting with cathodic phase
do_charge_balancing: Performing a charge balancing on second phase (same area)
Returns:
Two numpy arrays (time, output_signal)
"""
width = [anodic_duration, cathodic_duration] if not do_cathodic_first else [cathodic_duration, anodic_duration]
mode = [anodic_wvf, cathodic_wvf] if not do_cathodic_first else [cathodic_wvf, anodic_wvf]
poly = [False, True] if not do_cathodic_first else [True, False]
waveforms = list()
# --- Creating the waveforms
for idx, (window, wvf_type, inverter) in enumerate(zip(width, mode, poly)):
if idx == 1 and not intermediate_duration == 0.0:
self._time_duration = intermediate_duration
waveforms.append(self.__generate_zero())
waveforms.append(self.__select_waveform_template(window, wvf_type, inverter))
if do_charge_balancing:
waveform = self.__get_charge_balancing_factor(waveforms) * waveforms[-1]
waveforms[-1] = waveform
# --- Creating the output signal
out = np.concatenate([waveform for waveform in waveforms], axis=0)
out = np.concatenate((out, np.zeros((1,))), axis=0)
noise = self.__handler_noise.gen_noise_real_pwr(out.size) if self.__add_noise else np.zeros_like(out)
time = np.linspace(0, out.size, out.size) / self._sampling_rate
return {'t': time, 'y': out + noise}