import numpy as np
import scipy.signal as scft
import matplotlib.pyplot as plt
from logging import getLogger, Logger
from dataclasses import dataclass
from fxpmath import Fxp
from denspp.offline.analog.common_func import CommonDigitalFunctions
from denspp.offline.plot_helper import save_figure, get_plot_color
[docs]
@dataclass
class SettingsFilter:
"""Configuration class for defining the filter processor
Attributes:
gain: Integer with applied amplification factor [V/V]
fs: Sampling rate [Hz]
n_order: Integer with number of filter order or delay values in FIR-allpass
f_filt: List with filter frequencies [Hz] (low/high-pass: only one value - rest: two values)
type: String with selected filter algorithm ['iir', 'fir']
f_type: String with selected filter structure ['butter', 'cheby1', 'cheby2', 'ellip', 'bessel']
b_type: String with selected filter type ['lowpass', 'highpass', 'bandpass', 'bandstop', 'notch', 'allpass']
"""
gain: int
fs: float
n_order: int | float
f_filt: list
type: str
f_type: str
b_type: str
RecommendedSettingsFilter = SettingsFilter(
gain=1, fs=0.3e3,
n_order=2, f_filt=[0.1, 100],
type='iir', f_type='butter', b_type='bandpass'
)
[docs]
class DSP(CommonDigitalFunctions):
__logger: Logger
_type_supported: list = ['fir', 'iir']
_btype_supported: list = ['lowpass', 'highpass', 'bandpass', 'bandstop', 'notch', 'allpass']
_ftype_supported: list = ['butter', 'bessel', 'cheby1', 'cheby2', 'ellip']
_coeff_a: np.ndarray
_coeff_b: np.ndarray
def __init__(self, setting: SettingsFilter, use_filtfilt: bool=False):
"""Class for Emulating Digital Signal Processing on FPGA
:param setting: Class for handling the filter stage (using SettingsFilter)
:param use_filtfilt: Boolean for applying zero-phase filtering
:return: None
"""
super().__init__()
self.__logger = getLogger(__name__)
self.settings = setting
self.__use_filtfilt = use_filtfilt
self.__process_filter()
[docs]
def get_coeffs(self) -> dict:
"""Getting the filter coefficients"""
return {'b': self._coeff_b.tolist(), 'a': self._coeff_a.tolist()}
[docs]
def get_coeffs_quantized(self, bit_size: int, bit_frac: int, signed: bool=True) -> dict:
"""Quantize the coefficients with given bit fraction for adding into hardware designs
:param bit_size: Integer with total bitwidth
:param bit_frac: Integer with fraction width
:param signed: Boolean with whether to sign the coefficients
:return: Dictionary with quantized coefficients
"""
self.define_limits(bit_signed=signed, total_bitwidth=bit_size, frac_bitwidth=bit_frac)
quant_a = Fxp(self._coeff_a, signed=signed, n_word=bit_size, n_frac=bit_frac)
error_a = self._coeff_a - quant_a.all()
quant_b = Fxp(self._coeff_b, signed=signed, n_word=bit_size, n_frac=bit_frac)
error_b = self._coeff_b - quant_b.all()
return {'b': quant_b, 'a': quant_a, 'error_b': error_b, 'error_a': error_a}
def __extract_filter_coeffs_iir(self) -> None:
frange = 2 * np.array(self.settings.f_filt) / self.settings.fs
match self.settings.b_type.lower():
case 'notch':
filter = scft.iirnotch(
w0=float(self.settings.f_filt[0]),
Q=self.settings.n_order,
fs=self.settings.fs
)
self._coeff_b = filter[0]
self._coeff_a = filter[1]
case 'allpass':
if self.settings.n_order == 1:
assert len(self.settings.f_filt) == 1, "f_filt should have length of 1 with [f_b] value"
val = np.tan(np.pi * frange[0] / self.settings.fs)
iir_c0 = (val - 1) / (val + 1)
self._coeff_b = np.array([iir_c0, 1.0])
self._coeff_a = np.array([1.0, iir_c0])
elif self.settings.n_order == 2:
assert len(self.settings.f_filt) == 2, "f_filt should have length of 2 with [f_b, bandwidth] value"
val = np.tan(np.pi * frange[1] / self.settings.fs)
iir_c0 = (val - 1) / (val + 1)
iir_c1 = -np.cos(2 * np.pi * frange[0] / self.settings.fs)
self._coeff_b = np.array([-iir_c0, iir_c1 * (1 - iir_c0), 1.0])
self._coeff_a = np.array([1.0, iir_c1 * (1 - iir_c0), -iir_c0])
else:
raise NotImplementedError
case _:
filter = scft.iirfilter(
N=self.settings.n_order, Wn=frange,
ftype=self.settings.f_type.lower(), btype=self.settings.b_type.lower(),
analog=False, output='ba'
)
self._coeff_b = filter[0]
self._coeff_a = filter[1]
def __extract_filter_coeffs_fir(self) -> None:
frange = np.array(self.settings.f_filt)
self._coeff_a = np.array(1.0)
match self.settings.b_type.lower():
case 'notch':
assert len(self.settings.f_filt) == 2, "Size of f_filt should be 2 with [f_notch, bandwidth]"
freq = [0, frange[0] - frange[1], frange[0], frange[0] + frange[1], self.settings.fs / 2]
gain = [1, 1, 0, 1, 1]
self._coeff_b = scft.firwin2(
numtaps=self.settings.n_order,
freq=freq,
gain=gain,
fs=self.settings.fs
)
case 'allpass':
self._coeff_b = self._coeff_a
case _:
self._coeff_b = scft.firwin(
numtaps=self.settings.n_order,
cutoff=frange,
fs=self.settings.fs,
pass_zero=self.settings.b_type.lower()
)
def __process_filter(self) -> None:
assert self.settings.type.lower() in self._type_supported, f"Type {self.settings.type} is not supported from {self._type_supported}"
assert self.settings.f_type.lower() in self._ftype_supported, f"Filter type {self.settings.f_type} is not supported from {self._ftype_supported}"
assert self.settings.b_type.lower() in self._btype_supported, f"Structure type {self.settings.b_type} is not supported from {self._btype_supported}"
self.__logger.debug(f'Build {self.settings.type.upper()} filter: {self.settings.b_type}, {self.settings.f_type}')
if self.settings.type.lower() == 'iir':
self.__extract_filter_coeffs_iir()
elif self.settings.type.lower() == 'fir':
self.__extract_filter_coeffs_fir()
[docs]
def filter(self, xin: np.ndarray) -> np.ndarray:
"""Apply configured filter structure on transient data
:param xin: Numpy array with transient data
:return: Numpy array with filtered data
"""
if self.settings.type.lower() == 'fir' and self.settings.b_type.lower() == 'allpass':
mat = np.zeros(shape=(self.settings.n_order,), dtype=float)
xout = np.concatenate((mat, xin[0:xin.size - self.settings.n_order]), axis=None)
elif not self.__use_filtfilt:
xout = self.settings.gain * scft.lfilter(
b=self._coeff_b,
a=self._coeff_a,
x=xin
)
else:
xout = self.settings.gain * scft.filtfilt(
b=self._coeff_b,
a=self._coeff_a,
x=xin
)
return xout
def __get_frequency_behaviour(self, num_points: int=1001) -> tuple[np.ndarray, np.ndarray]:
if not self._coeff_a.size > 1:
return scft.freqz(
b=self._coeff_b,
a=self._coeff_a,
worN=num_points,
fs=2 * np.pi * self.settings.fs,
include_nyquist=True
)
else:
return scft.freqs(
b=self._coeff_b,
a=1,
worN=num_points
)
[docs]
def plot_freq_response(self, num_points: int=1001,
show_plot: bool=True, path2save: str='') -> None:
"""Function for plotting the frequency response of desired filter type
:param num_points: Number of points to plot
:param show_plot: Boolean for showing plot
:param path2save: Path to save figure
"""
w, h = self.__get_frequency_behaviour(num_points=num_points)
f = w / (2 * np.pi)
fig1, ax11 = plt.subplots()
plt.title('Frequency response')
amplit_log = 20 * np.log10(np.abs(h))
amplit_ref = np.zeros_like(amplit_log) + amplit_log.max() - 3 * self.settings.n_order
plt.semilogx(f, amplit_log, color=get_plot_color(0), label='Gain')
plt.semilogx(f, amplit_ref, linestyle='--', color=get_plot_color(0), label='Ref.')
plt.ylabel(r'Amplitude |$H(\omega)$| (dB)', color=get_plot_color(0))
plt.xlabel(r'Frequency $f_\mathrm{sig}$ (Hz)')
plt.ylim([amplit_log.max()-20, amplit_log.max()+3])
plt.xlim([f[0], f[-1]])
ax11.grid(True, which="both", ls="--")
ax21 = ax11.twinx()
phase = np.angle(h, deg=True)
plt.semilogx(f, phase, color=get_plot_color(1), label='Phase')
plt.ylabel(r'Phase $\alpha$ (°)', color=get_plot_color(1))
plt.tight_layout()
if path2save:
save_figure(plt, path2save, 'freq_response')
plt.show(block=show_plot)
[docs]
def plot_grp_delay(self, num_points: int=1001, show_plot: bool=False) -> None:
""" Plotting the Group Delay of filter
:param num_points: Number of points to plot
:param show_plot: Boolean for showing plot
:return: None
"""
w, h = self.__get_frequency_behaviour(num_points=num_points)
f = w / (2 * np.pi)
phase = np.unwrap(np.angle(h)) / np.pi * 180
grp_dly = - np.diff(phase) / np.diff(w)
plt.figure()
plt.semilogx(f[2:], grp_dly[1:], 'k', linewidth=1)
plt.ylabel(r'Group Delay $\tau_\mathrm{grp}$ (s)')
plt.xlabel(r'Frequency $f_\mathrm{sig}$ (Hz)')
plt.grid()
plt.tight_layout()
plt.show(block=show_plot)