Source code for denspp.offline.pipeline.multithread
import numpy as np
from os import cpu_count
from logging import getLogger, Logger
from threading import Thread
from tqdm import tqdm


class _ThreadProcess(Thread):
    def __init__(self, rawdata: np.ndarray, chnnl_name: int | str, func) -> None:
        """Worker thread which applies the processing function to one channel
        :param rawdata:     Numpy array with the raw data of one channel
        :param chnnl_name:  Integer or string identifying the channel
        :param func:        Processing function applied to the raw data
        :returns:           None
        """
        super().__init__()
        self._logger = getLogger(__name__)
        self.input = rawdata
        self.id = chnnl_name
        self.pipeline = func
        self.results: dict = {}

    def run(self) -> None:
        # Store the processed output under the channel identifier
        self.results = {self.id: self.pipeline(self.input)}


class MultithreadHandler:
    _logger: Logger
    __threads_worker: list
    _results: dict
    _num_cores: int

    def __init__(self, num_workers: int) -> None:
        """Thread processor for analyzing data
        :param num_workers: Integer with the number of parallel workers
        :returns: None
        """
        self._logger = getLogger(__name__)
        self._max_num_workers = num_workers
        # Instance-level containers (class-level mutable defaults would be
        # shared between all instances of the handler)
        self.__threads_worker = list()
        self._results = dict()
        # cpu_count() can return None on some platforms
        self._num_cores = cpu_count() or 1

    def __perform_single_threads(self, func, rawdata: np.ndarray | list, chnnl_id: list) -> None:
        self.__threads_worker = list()
        self._logger.info('... processing data via single threading')
        # Each thread is started and joined immediately, i.e. the channels
        # are processed strictly one after another
        for chnnl, data in tqdm(zip(chnnl_id, rawdata), total=len(chnnl_id), ncols=100, desc='Progress: '):
            thread = _ThreadProcess(
                rawdata=data,
                chnnl_name=chnnl,
                func=func
            )
            self.__threads_worker.append(thread)
            thread.start()
            thread.join()
            self._results.update(thread.results)

    def __perform_multi_threads(self, func, data: np.ndarray | list, chnnl_id: list) -> None:
        # Channels are processed in batches of at most
        # min(num_workers, num_cores) concurrent threads
        group_size = min(self._max_num_workers, self._num_cores)
        split_channels = [chnnl_id[i:i + group_size] for i in range(0, len(chnnl_id), group_size)]
        split_data = [data[i:i + group_size] for i in range(0, len(chnnl_id), group_size)]
        self._logger.info(f"... processing data with {self._max_num_workers} threading workers on {self._num_cores} cores")
        for group_channels, group_data in tqdm(zip(split_channels, split_data), total=len(split_channels), ncols=100, desc='Progress: '):
            self.__threads_worker = list()
            # --- Starting all threads of the current batch
            for chnnl, raw in zip(group_channels, group_data):
                thread = _ThreadProcess(
                    rawdata=raw,
                    chnnl_name=chnnl,
                    func=func
                )
                self.__threads_worker.append(thread)
                thread.start()
            # --- Waiting until all threads of the batch have finished
            for thread in self.__threads_worker:
                thread.join()
                self._results.update(thread.results)
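
    # Worked example of the batching above (hypothetical numbers): with ten
    # channels, num_workers=4 and at least four cores, group_size is 4 and the
    # channel list is split into the groups [0..3], [4..7], [8, 9]. The threads
    # of one group run concurrently; the handler joins the whole group before
    # it starts the next one.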

    def do_save_results(self, path2save: str) -> None:
        """Saving the results in numpy format
        :param path2save: Path to the folder in which results.npy is written
        :returns: None
        """
        # The dict is stored as a pickled object array; load it again with
        # np.load(..., allow_pickle=True).item()
        np.save(f'{path2save}/results.npy', self._results)

    def get_results(self) -> dict:
        """Return the processed signals as a dict {channel id: result}"""
        return self._results

    def do_processing(self, func, data: np.ndarray | list, chnnl_id: list) -> None:
        """Performing the data processing
        :param func:     Function which each Thread Processor applies to one channel
        :param data:     Numpy array with the data to process (shape=[num. channels, num. samples])
        :param chnnl_id: List with the channel identifiers (one entry per channel)
        :returns: None
        """
        if self._max_num_workers > 1 and len(chnnl_id) > 1:
            self.__perform_multi_threads(func, data, chnnl_id)
        else:
            self.__perform_single_threads(func, data, chnnl_id)
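

# --- Usage sketch (illustrative, not part of the shipped module), assuming a
# --- processing function that takes one channel as a 1D numpy array and
# --- returns its result. The function name `mean_filter`, the data shape and
# --- the channel names below are hypothetical.
if __name__ == '__main__':
    def mean_filter(channel: np.ndarray) -> float:
        # Dummy pipeline: reduce each channel to its mean value
        return float(np.mean(channel))

    rawdata = np.random.randn(8, 1000)   # 8 channels with 1000 samples each
    channel_ids = [f'ch{idx:02d}' for idx in range(rawdata.shape[0])]

    handler = MultithreadHandler(num_workers=4)
    handler.do_processing(func=mean_filter, data=rawdata, chnnl_id=channel_ids)
    print(handler.get_results())         # {'ch00': ..., ..., 'ch07': ...}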