Source code for denspp.offline.pipeline.multicore

import numpy as np
from multiprocessing import Process, Queue
from os import cpu_count
from tqdm import tqdm
from logging import getLogger, Logger


def _core_task(rawdata: np.ndarray, id: int, func, result: Queue) -> None:
    # Run the processing function in a worker process and post the
    # result to the shared queue, keyed by the channel id.
    result.put({id: func(rawdata)})
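
The helper is the target of each worker ``Process``; results travel back to the parent through a ``multiprocessing.Queue``. A minimal standalone sketch of this pattern follows (the ``square`` function is hypothetical, for illustration only):

# Sketch of the Process/Queue pattern used by _core_task.
# `square` is a hypothetical processing function, not part of the module.
def square(x: np.ndarray) -> np.ndarray:
    return x ** 2

if __name__ == '__main__':
    queue = Queue()
    proc = Process(target=_core_task,
                   kwargs=dict(rawdata=np.arange(4), id=0, func=square, result=queue))
    proc.start()
    print(queue.get())  # {0: array([0, 1, 4, 9])}
    proc.join()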


class MultiprocessingHandler:
    _logger: Logger
    _num_cores: int = cpu_count()

    def __init__(self, num_workers: int) -> None:
        """Process-based handler for analyzing data in parallel
        :param num_workers: Integer with the number of parallel workers
        :returns: None
        """
        super().__init__()
        self._logger = getLogger(__name__)
        self._max_num_workers = num_workers
        # Instance-level state (avoids sharing mutable class-level defaults between instances)
        self.__threads_core: list = list()
        self._results: dict = dict()

    def __perform_single_threads(self, func, rawdata: np.ndarray | list, chnnl_id: list) -> None:
        self._logger.info('... processing data sequentially in a single worker process')
        self._results = dict()
        rslt = Queue()
        for chnnl, data in tqdm(zip(chnnl_id, rawdata), total=len(chnnl_id), ncols=100, desc='Progress Threads: '):
            proc = Process(
                target=_core_task,
                kwargs=dict(rawdata=data, id=chnnl, func=func, result=rslt)
            )
            proc.start()
            # Fetch the result before joining so the queue never blocks the child
            self._results.update(rslt.get())
            proc.join()

    def __perform_multi_threads(self, func, data: np.ndarray | list, chnnl_id: list) -> None:
        # Launch at most min(num_workers, cpu_count) worker processes per batch
        group_size = min(self._max_num_workers, self._num_cores)
        split_groups = [list(range(i, min(i + group_size, len(chnnl_id))))
                        for i in range(0, len(chnnl_id), group_size)]
        self._logger.info(f"... processing data with {group_size} parallel workers on {self._num_cores} CPU cores")
        self._results = dict()
        for group in tqdm(split_groups, ncols=100, desc='Progress Threads: '):
            self.__threads_core = list()
            # --- Starting all worker processes of the current batch
            rslt = Queue()
            for pos in group:
                proc = Process(
                    target=_core_task,
                    kwargs=dict(rawdata=data[pos], id=chnnl_id[pos], func=func, result=rslt)
                )
                self.__threads_core.append(proc)
                proc.start()
            # --- Collecting one result per worker, then waiting until all workers are done
            # (draining the queue before joining avoids blocking on a full queue buffer)
            for _ in self.__threads_core:
                self._results.update(rslt.get())
            for proc in self.__threads_core:
                proc.join()
    def do_save_results(self, path2save: str) -> None:
        """Save the results dictionary as 'results.npy' in the given folder
        :param path2save: String with the path to the output folder
        :returns: None
        """
        np.save(f'{path2save}/results.npy', self._results)
    def get_results(self) -> dict:
        """Return the signals after processing (dictionary keyed by channel id)"""
        return self._results
    def do_processing(self, func, data: np.ndarray | list, chnnl_id: list) -> None:
        """Performing the data processing
        :param func: Function applied to each channel in a parallel worker process
        :param data: Numpy array with data to process (shape=[num. channels, num. samples])
        :param chnnl_id: List with integer channel IDs
        :returns: None
        """
        if self._max_num_workers > 1 and len(chnnl_id) > 1:
            self.__perform_multi_threads(func, data, chnnl_id)
        else:
            self.__perform_single_threads(func, data, chnnl_id)
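
A minimal usage sketch, assuming a picklable top-level processing function (the ``mean_abs`` feature below is hypothetical) and data shaped [num. channels, num. samples]; the ``__main__`` guard is needed because multiprocessing may spawn fresh interpreter processes:

# Usage sketch (hypothetical example, not part of the module):
# `mean_abs` stands in for any picklable, top-level processing function.
def mean_abs(chan: np.ndarray) -> float:
    return float(np.mean(np.abs(chan)))

if __name__ == '__main__':
    data = np.random.randn(8, 1000)   # shape=[num. channels, num. samples]
    handler = MultiprocessingHandler(num_workers=4)
    handler.do_processing(func=mean_abs, data=data, chnnl_id=list(range(8)))
    print(handler.get_results())      # dict keyed by channel id
    handler.do_save_results('.')      # writes ./results.npy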