Source code for elasticai.preprocessor._common_func
from copy import deepcopy
from typing import overload
import numpy as np
from elasticai.creator.arithmetic import FxpArithmetic, FxpParams
[docs]
class CommonDigitalFunctions:
_digital_border: np.ndarray
_bitwidth: list = [2, 0]
_bitsigned: bool = False
[docs]
def define_limits(self, bit_signed: bool, total_bitwidth: int, frac_bitwidth: int) -> np.ndarray:
"""Defining the digital limitation values
:param bit_signed: Integer data type (unsigned: False, signed: True)
:param total_bitwidth: Total bitwidth
:param frac_bitwidth: Fraction bitwidth
:return: Numpy array with range (min, max)
"""
if total_bitwidth < 0 or frac_bitwidth < 0:
raise ValueError("total_bitwidth and frac_bitwidth must be positive")
else:
self._bitwidth = [total_bitwidth, frac_bitwidth]
self._bitsigned = bit_signed
self._digital_border = self._quantize_fxp(xin=np.array([-np.inf, np.inf]))
return self._digital_border
def _clamp_digital(self, xin: np.ndarray) -> np.ndarray:
"""Do digital clamping of input data values
:param xin: Input data stream
:return: Output data stream
"""
xout = deepcopy(xin)
np.clip(xout, a_min=self._digital_border[0], a_max=self._digital_border[1], out=xout)
return xout
@overload
def _quantize_fxp(self, xin: float) -> np.ndarray: ...
@overload
def _quantize_fxp(self, xin: np.ndarray) -> np.ndarray: ...
@overload
def _quantize_fxp(self, xin: list[int | float]) -> np.ndarray: ...
def _quantize_fxp(self, xin: float | list[int | float] | np.ndarray) -> np.ndarray:
"""Do signed quantization of input with full precision
:param xin: Input data stream
:return: Quantized output data stream
"""
arith = FxpArithmetic(
FxpParams(
total_bits=self._bitwidth[0],
frac_bits=self._bitwidth[1],
signed=self._bitsigned,
)
)
if isinstance(xin, (float, int)):
values = [xin]
else:
values = xin
out = list()
for val in values:
if val == np.inf:
out.append(arith.maximum_as_rational)
elif val == -np.inf:
out.append(arith.minimum_as_rational)
else:
out.append(arith.cut_as_integer(val) * arith._config.minimum_step_as_rational)
return np.asarray(out)
@staticmethod
def _extract_rising_edge(trigger: np.ndarray) -> list:
"""Extracting the rising edges of a boolean array (e.g. output signal of a comparator)
:param trigger: Numpy array with trigger signal (transient)
:return: List with index of rising edges
"""
trgg_evnt = np.flatnonzero((~trigger[:-1]) & (trigger[1:])) + 1
return trgg_evnt.tolist()
@staticmethod
def _extract_falling_edge(trigger: np.ndarray) -> list:
"""Extracting the falling edges of a boolean array (e.g. output signal of a comparator)
:param trigger: Numpy array with trigger signal (transient)
:return: List with index of rising edges
"""
trgg_evnt = np.flatnonzero((trigger[:-1]) & (~trigger[1:])) + 1
return trgg_evnt.tolist()