Source code for elasticai.preprocessor._common_func

from copy import deepcopy
from typing import overload

import numpy as np
from elasticai.creator.arithmetic import FxpArithmetic, FxpParams


[docs] class CommonDigitalFunctions: _digital_border: np.ndarray _bitwidth: list = [2, 0] _bitsigned: bool = False
[docs] def define_limits(self, bit_signed: bool, total_bitwidth: int, frac_bitwidth: int) -> np.ndarray: """Defining the digital limitation values :param bit_signed: Integer data type (unsigned: False, signed: True) :param total_bitwidth: Total bitwidth :param frac_bitwidth: Fraction bitwidth :return: Numpy array with range (min, max) """ if total_bitwidth < 0 or frac_bitwidth < 0: raise ValueError("total_bitwidth and frac_bitwidth must be positive") else: self._bitwidth = [total_bitwidth, frac_bitwidth] self._bitsigned = bit_signed self._digital_border = self._quantize_fxp(xin=np.array([-np.inf, np.inf])) return self._digital_border
def _clamp_digital(self, xin: np.ndarray) -> np.ndarray: """Do digital clamping of input data values :param xin: Input data stream :return: Output data stream """ xout = deepcopy(xin) np.clip(xout, a_min=self._digital_border[0], a_max=self._digital_border[1], out=xout) return xout @overload def _quantize_fxp(self, xin: float) -> np.ndarray: ... @overload def _quantize_fxp(self, xin: np.ndarray) -> np.ndarray: ... @overload def _quantize_fxp(self, xin: list[int | float]) -> np.ndarray: ... def _quantize_fxp(self, xin: float | list[int | float] | np.ndarray) -> np.ndarray: """Do signed quantization of input with full precision :param xin: Input data stream :return: Quantized output data stream """ arith = FxpArithmetic( FxpParams( total_bits=self._bitwidth[0], frac_bits=self._bitwidth[1], signed=self._bitsigned, ) ) if isinstance(xin, (float, int)): values = [xin] else: values = xin out = list() for val in values: if val == np.inf: out.append(arith.maximum_as_rational) elif val == -np.inf: out.append(arith.minimum_as_rational) else: out.append(arith.cut_as_integer(val) * arith._config.minimum_step_as_rational) return np.asarray(out) @staticmethod def _extract_rising_edge(trigger: np.ndarray) -> list: """Extracting the rising edges of a boolean array (e.g. output signal of a comparator) :param trigger: Numpy array with trigger signal (transient) :return: List with index of rising edges """ trgg_evnt = np.flatnonzero((~trigger[:-1]) & (trigger[1:])) + 1 return trgg_evnt.tolist() @staticmethod def _extract_falling_edge(trigger: np.ndarray) -> list: """Extracting the falling edges of a boolean array (e.g. output signal of a comparator) :param trigger: Numpy array with trigger signal (transient) :return: List with index of rising edges """ trgg_evnt = np.flatnonzero((trigger[:-1]) & (~trigger[1:])) + 1 return trgg_evnt.tolist()