Source code for denspp.offline.pipeline.pipeline_handler
from logging import getLogger
from denspp.offline.logger import define_logger_runtime
from denspp.offline.data_format.yaml import YamlHandler
from denspp.offline.data_call import SettingsData, DefaultSettingsData
from denspp.offline.data_call.merge_datasets_frames import MergeDatasets
from denspp.offline.pipeline.pipeline_cmds import DataloaderLibrary, PipelineLibrary
from denspp.offline.pipeline.multithread import MultithreadHandler
[docs]
def start_processing_pipeline(sets_load_data: SettingsData=DefaultSettingsData):
"""Function for preparing the pipeline preprocessing
:param sets_load_data: Dataclass with settings for getting data to analyse
:return: Tuple with (0) Settings class for data, (1) Settings class for thread handling, (2) DataLoader, (3) Pipeline
"""
# --- Getting the YAML files
logger = getLogger(__name__)
if sets_load_data == DefaultSettingsData:
settings_data = YamlHandler(
template=sets_load_data,
path='config',
file_name='Config_PipelineData'
).get_class(SettingsData)
else:
settings_data = sets_load_data
logger.debug("Load YAML configs")
# --- Getting the Pipeline and DataLoader
datalib = DataloaderLibrary().get_registry()
matches0 = [item for item in datalib.get_library_overview() if 'DataLoader' in item]
assert len(matches0), "No DataLoader found"
logger.debug("Found DataLoader")
data_handler = datalib.build_object('DataLoader')
pipelib = PipelineLibrary().get_registry()
search_name = sets_load_data.pipeline + ('_Merge' if sets_load_data.do_merge else '')
matches1 = [item for item in pipelib.get_library_overview() if search_name in item]
assert len(matches1), "No Pipeline found"
logger.debug("Found Pipeline")
pipe = pipelib.build_object(matches1[0])
return settings_data, data_handler, pipe
[docs]
def select_process_pipeline(object_dataloader, object_pipeline, sets_load_data: SettingsData) -> None:
"""Function for handling the processing pipeline
:param object_dataloader: object dataloader
:param object_pipeline: object pipeline
:param sets_load_data: Dataclass with settings for getting data to analyse
:return: None
"""
define_logger_runtime(False)
logger = getLogger(__name__)
# ----- Preparation: Module calling -----
logger.info("Running framework for end-to-end neural signal processing (DeNSPP)")
logger.info("Step #1: Loading data")
logger.info("==================================================================")
datahand = object_dataloader(sets_load_data)
datahand.do_call()
datahand.do_cut()
datahand.do_resample()
datahand.build_mapping()
datahand.output_meta()
dataIn = datahand.get_data()
del datahand
# --- Thread Preparation: Processing data
logger.info("Step #2: Processing data")
logger.info("==================================================================")
thr_station = MultithreadHandler(
num_workers=1
)
dut = object_pipeline(dataIn.fs_used)
thr_station.do_processing(
data=dataIn.data_raw,
chnnl_id=dataIn.electrode_id,
func=dut.run
)
# --- Plot all plots and save results
logger.info("Step #3: Saving results and plotting")
logger.info("==================================================================")
thr_station.do_save_results(path2save=dut.path2save)
[docs]
def select_process_merge(object_dataloader, object_pipeline, sets_load_data: SettingsData) -> None:
"""Function for preparing and starting the merge process for generating datasets
:param object_dataloader: DataLoader object
:param object_pipeline: Pipeline object
:param sets_load_data: SettingsData object
:return: None
"""
logger = getLogger(__name__)
logger.info("Running framework for end-to-end neural signal processing (DeNSPP)")
logger.info("Building datasets from transient data")
# ---- Merging spike frames from several files to one file
merge_handler = MergeDatasets(object_pipeline, object_dataloader, sets_load_data, True)
merge_handler.get_frames_from_dataset(
concatenate_id=False,
process_points=list()
)
merge_handler.merge_data_from_diff_files()
merge_handler.save_merged_data_in_npyfile()
# --- Merging the frames to new cluster device
logger.info("=========================================================")
logger.info("Final Step with merging cluster have to be done in MATLAB")