# Source code for denspp.offline.preprocessing.downsampling
import numpy as np
from dataclasses import dataclass
# [docs]
@dataclass
class SettingsDownSampling:
    """Settings class for configuring the properties of the downsampling module

    Attributes:
        sampling_rate: Floating value with input sampling rate of the transient data stream
        dsr: Integer with downsampling ratio for reducing the input sampling rate
            (SR_out = SR_in / DSR)
    """
    sampling_rate: float
    dsr: int
# Default configuration: input sampling rate of 1000 and a downsampling ratio of 10
DefaultSettingsDownSampling = SettingsDownSampling(sampling_rate=1000.0, dsr=10)
# [docs]
class DownSampling:
    """Collection of downsampling / decimation methods for transient data streams

    The input sampling rate and the downsampling ratio (DSR) are read from the
    settings object handed over at construction.
    """

    def __init__(self, settings: SettingsDownSampling):
        """Storing the settings used by all downsampling methods
        :param settings: Settings object providing `sampling_rate` and `dsr`
        """
        self._settings = settings

    @property
    def sampling_rate_out(self) -> float:
        """Sampling rate after downsampling (input sampling rate divided by DSR)"""
        return self._settings.sampling_rate / self._settings.dsr

    def do_simple(self, uin: np.ndarray) -> np.ndarray:
        """Performing a simple downsampling of the adc data stream

        The input is cut into consecutive groups of DSR samples and each group
        is replaced by its mean. Trailing samples that do not fill a complete
        group are dropped.

        :param uin: Numpy array with transient signal input (high sampling rate)
        :return: Numpy array with transient signal output (low sampling rate)
        """
        dsr = self._settings.dsr
        samples = np.asarray(uin)
        # Only keep as many samples as fit into complete groups of length DSR
        num_used = samples.size // dsr * dsr
        return samples[:num_used].reshape(-1, dsr).mean(axis=1)

    def do_cic(self, uin: np.ndarray, num_stages: int = 5) -> np.ndarray:
        """Performing the CIC filter at the output of oversampled ADC

        Processing chain: `num_stages` cascaded integrators (running sums) at
        the input rate, keeping every DSR-th sample (starting with the first
        one), `num_stages` cascaded combs (first-order differences with zero
        initial state) at the output rate, and division by DSR ** num_stages.
        With `num_stages=0` the input is only decimated.

        :param uin: Numpy array with transient signal input (high sampling rate)
        :param num_stages: Number of stages to perform the CIC downsampling
        :return: Numpy array with transient signal output (low sampling rate)
        :raises ValueError: If `num_stages` is negative
        """
        if num_stages < 0:
            raise ValueError("num_stages must not be negative")

        dsr = self._settings.dsr
        gain = dsr ** num_stages

        stream = np.asarray(uin)
        # Integrator cascade: each stage is a running sum along the time axis
        for _ in range(num_stages):
            stream = np.cumsum(stream, axis=0)
        # Rate reduction: keep the samples with index 0, DSR, 2*DSR, ...
        stream = stream[::dsr]
        # Comb cascade: y[m] = x[m] - x[m-1] with x[-1] = 0 (hence prepend=0)
        for _ in range(num_stages):
            stream = np.diff(stream, axis=0, prepend=0)
        return stream / gain

    @staticmethod
    def do_decimation_polyphase_order_one(uin: np.ndarray) -> np.ndarray:
        """Performing first order Non-Recursive Polyphase Decimation on input

        Halves the sampling rate: y[m] = x[2m+1] + x[2m]. The output is not
        normalized. A trailing sample without partner is dropped.

        :param uin: Numpy array with transient signal input (high sampling rate)
        :return: Numpy array with transient signal output (low sampling rate)
        """
        samples = np.asarray(uin)
        num_out = samples.shape[0] // 2
        even = samples[0:2 * num_out:2]
        odd = samples[1:2 * num_out:2]
        return odd + even

    @staticmethod
    def do_decimation_polyphase_order_two(uin: np.ndarray) -> np.ndarray:
        """Performing second order Non-Recursive Polyphase Decimation on input

        Halves the sampling rate: y[m] = x[2m+1] + x[2m-1] + 2 * x[2m] with
        x[-1] = 0. The output is not normalized. A trailing sample without
        partner is dropped.

        :param uin: Numpy array with transient signal input (high sampling rate)
        :return: Numpy array with transient signal output (low sampling rate)
        """
        samples = np.asarray(uin)
        num_out = samples.shape[0] // 2
        even = samples[0:2 * num_out:2]
        odd = samples[1:2 * num_out:2]
        # Odd-indexed sample of the previous output step (zero for the first step)
        prev_odd = np.zeros_like(odd)
        prev_odd[1:] = odd[:-1]
        return odd + prev_odd + 2 * even

    def do_decimation_polyphase(self, uin: np.ndarray, take_first_order: bool = False) -> np.ndarray:
        """Performing Non-Recursive Polyphase Decimation on input (depends on DSR)

        Applies log2(DSR) decimate-by-two stages in sequence, so DSR has to be
        a power of two. With DSR = 1 the input is returned unchanged.

        :param uin: Numpy array with transient signal input (high sampling rate)
        :param take_first_order: Use the first order stage if True, otherwise the second order stage
        :return: Numpy array with transient signal output (low sampling rate)
        :raises ValueError: If DSR is not a power of two
        """
        dsr = self._settings.dsr
        # Guard dsr < 1 first, log2 is not defined there
        num_stages = int(round(np.log2(dsr))) if dsr >= 1 else -1
        if num_stages < 0 or 2 ** num_stages != dsr:
            raise ValueError("self._settings.dsr should be 2^x")

        if take_first_order:
            stage = self.do_decimation_polyphase_order_one
        else:
            stage = self.do_decimation_polyphase_order_two

        x = uin
        for _ in range(num_stages):
            x = stage(x)
        return x