Source code for denspp.offline.preprocessing.window

import numpy as np
from numpy.lib.stride_tricks import sliding_window_view
from dataclasses import dataclass
from denspp.offline.preprocessing.transformation import transformation_window_method


[docs] @dataclass class SettingsWindow: """Class for defining the properties for applying a window sequenzer on transient signals Attributes: sampling_rate: Floating value with sampling rate of the transient signal [Hz] window_sec: Floating value with the size of the window [s] overlap_sec: Floating value with overlapping the sequences [s] """ sampling_rate: float window_sec: float overlap_sec: float @property def window_length(self) -> int: """Returning an integer with total number of samples for building the window sequence""" assert self.window_sec > 0, "Window length must be greater than zero" return int(abs(self.window_sec * self.sampling_rate)) @property def overlap_length(self) -> int: """Returning an integer with total number of samples for overlapping""" assert self.overlap_sec < self.window_sec, "Overlapping size should be smaller than window size" return int(abs(self.overlap_sec * self.sampling_rate))
[docs] class WindowSequencer: _settings: SettingsWindow _window_normalization: np.ndarray
[docs] @staticmethod def get_values_non_incremented_change(data: list) -> list: """Returns values that are not incremented by one from the previous value. Always includes the first element. """ if not data: return [] else: return [data[0]] + [data[i] for i in range(1, len(data)) if data[i] != data[i - 1] + 1]
def __init__(self, settings: SettingsWindow) -> None: """Class for applying a window sequenzer on transient signals :param settings: Class SettingsWindow with definitions for the window sequenzer :return: None """ self._settings = settings self._window_normalization = transformation_window_method( window_size=self._settings.window_length, method='' )
[docs] def sequence(self, signal: np.ndarray) -> np.ndarray: """Building a sequence-to-sequence output array from signal input :param signal: Numpy array with input signal to build the sequence with shape=(N, ) :return: Numpy array of sequence signals with shape=(M, window length) """ num_sequences = int(signal.shape[0] / self._settings.window_length) array_length = num_sequences * self._settings.window_length return signal[0:array_length].reshape( (num_sequences, self._settings.window_length), copy=True )
[docs] def slide(self, signal: np.ndarray) -> np.ndarray: """Building a sliding window sequencer on signal input :param signal: Numpy array with input signal to build the sequence with shape=(N, ) :return: Numpy array of sequence signals with shape=(M, window length) """ delta_steps = self._settings.window_length - self._settings.overlap_length return sliding_window_view( x=signal, axis=0, window_shape=self._settings.window_length, writeable=True )[::delta_steps]
[docs] def window_event_detected(self, signal: np.ndarray, thr: float, pre_time: float) -> np.ndarray: """Building a window sequencer based on an event-detection (absolute input) :param signal: Numpy array with input signal to build the sequence with shape=(N, ) :param thr: Floating value with absolute threshold value :param pre_time: Floating value with pre-time in the window before event is detected :return: Numpy array of sequence signals with shape=(M, window length) """ if thr < 0: raise ValueError("Threshold must be positive") xpos_evnt_dtctd = self.get_values_non_incremented_change( np.argwhere(np.abs(signal) >= thr).flatten().tolist() ) if not xpos_evnt_dtctd: return np.empty((1, 1)) else: sequence_window = np.zeros((len(xpos_evnt_dtctd), self._settings.window_length)) num_samples_pre = int(pre_time * self._settings.sampling_rate) for ite, idx in enumerate(xpos_evnt_dtctd): start_xpos = idx - num_samples_pre if idx - num_samples_pre > 0 else idx num_pre_padding = 0 if idx - num_samples_pre > 0 else abs(idx - num_samples_pre) stop_xpos = start_xpos + self._settings.window_length if start_xpos + self._settings.window_length < signal.size else -1 num_post_padding = 0 if start_xpos + self._settings.window_length < signal.size else abs(signal.size - start_xpos) cutted_signal = signal[start_xpos+num_pre_padding:stop_xpos] if num_pre_padding: pre_padding = np.zeros((self._settings.window_length-cutted_signal.size, )) + cutted_signal[0] cutted_signal = np.concatenate((pre_padding, cutted_signal)) if num_post_padding: post_padding = np.zeros((self._settings.window_length-cutted_signal.size, )) + cutted_signal[-1] cutted_signal = np.concatenate((cutted_signal, post_padding)) sequence_window[ite, :] = cutted_signal return sequence_window