Source code for denspp.offline.preprocessing.common_referencing

import numpy as np
from logging import getLogger, Logger
from dataclasses import dataclass
from scipy.signal import convolve2d


[docs] @dataclass class SettingsReferencing: """Class for defining the properties of the common referencing methods Attributes: dim: Integer with applied dimension kernel_size: Kernel size for convolution (must be odd-numbered) """ kernel_size: int
DefaultSettingsReferencing = SettingsReferencing( kernel_size=3, )
[docs] class CommonReferencing: _logger: Logger def __init__(self, settings: SettingsReferencing) -> None: self._logger = getLogger(__name__) self._settings = settings
[docs] def build_dummy_active_mapping(self, signal: np.ndarray) -> np.ndarray: """Function for building a dummy active mapper with True values :param signal: Numpy array with channel-specific signals :return: Numpy array with boolean (default: true) for channels are used """ match len(signal.shape): case 1: return np.ones(shape=(1, ), dtype=bool) case 2: return np.ones(shape=(signal.shape[0], ), dtype=bool) case 3: return np.ones(shape=(signal.shape[0], signal.shape[1]), dtype=bool) case _: raise NotImplementedError
[docs] def get_reference_map(self, signal: np.ndarray, active: np.ndarray) -> np.ndarray: """Building the common reference mapper using CAR algorithm (Common Average Referencing) on input signals :param signal: Input signal of transient analysis with shape [num_channels, num_smaples] or num_channels splitted into electrode design :param active: Overview of active channels used for referencing :return: Numpy array with convolved signal for doing common average referencing """ match len(signal.shape): case 1: return self._calculate_reference_car_1d(signal, active) case 2: return self._calculate_reference_car_1d(signal, active) case 3: return self._calculate_reference_car_2d(signal, active) case _: raise NotImplementedError
[docs] def apply_reference(self, signal: np.ndarray, active: np.ndarray) -> np.ndarray: """""" return signal - self.get_reference_map(signal, active)
@staticmethod def _calculate_reference_car_1d(mea_signal: np.ndarray, mapp_used: np.ndarray) -> np.ndarray: if len(mea_signal.shape) >= 2: mapping = np.repeat(mapp_used[:, np.newaxis], mea_signal.shape[-1], axis=1) else: mapping = np.array([mapp_used] * mea_signal.size) return np.mean(mea_signal * mapping, axis=0) def _calculate_reference_car_2d(self, mea_signal: np.ndarray, mapp_used: np.ndarray) -> np.ndarray: if not len(mea_signal.shape) == 3: raise NotImplementedError("The input numpy array has wrong size - Please check!") if self._settings.kernel_size == 1: raise ValueError("Kernel size must greater then 1") if self._settings.kernel_size % 2 == 0: raise ValueError("Value for building the kernel in CAR algorithm must be odd-numbered!") # --- Generating the kernel kernel = np.ones((self._settings.kernel_size, self._settings.kernel_size), dtype=float) mid_number = int(np.floor(self._settings.kernel_size / 2)) kernel[mid_number, mid_number] = 0.0 kernel = kernel / np.sum(kernel) # --- Do the convolution data_out = np.zeros(mea_signal.shape, dtype=float) for idx in range(0, mea_signal.shape[-1]): data_in = mea_signal[:, :, idx] conv_out = convolve2d(data_in, kernel, mode='same') data_out[:, :, idx] = conv_out # --- Correction of not available channels for row in range(0, mea_signal.shape[0]): for col in range(0, mea_signal.shape[1]): if not mapp_used[row, col]: data_out[row, col, :] = np.zeros((mea_signal.shape[-1], ), dtype=float) return data_out