Source code for denspp.offline.pipeline.pipeline_handler

from logging import getLogger
from denspp.offline.logger import define_logger_runtime
from denspp.offline.yaml_handler import YamlHandler
from denspp.offline.data_call import SettingsData, DefaultSettingsData
from denspp.offline.data_process.merge_datasets_frames import MergeDatasets
from denspp.offline.pipeline.pipeline_cmds import ProcessingData, SettingsThread, RecommendedSettingsThread, DataloaderLibrary, PipelineLibrary


[docs] def start_processing_pipeline(sets_load_data: SettingsData=DefaultSettingsData, sets_load_thread: SettingsThread=RecommendedSettingsThread): """Function for preparing the pipeline preprocessing :param sets_load_data: Dataclass with settings for getting data to analyse :param sets_load_thread: Dataclass with settings for handling the processing threads :return: Tuple with (0) Settings class for data, (1) Settings class for thread handling, (2) DataLoader, (3) Pipeline, (4) mode """ # --- Getting the YAML files logger = getLogger(__name__) if sets_load_data == DefaultSettingsData: settings_data = YamlHandler( template=sets_load_data, path='config', file_name='Config_PipelineData' ).get_class(SettingsData) settings_thr = YamlHandler( template=sets_load_thread, path='config', file_name='Config_Pipeline' ).get_class(SettingsThread) else: settings_data = sets_load_data settings_thr = sets_load_thread logger.debug("Load YAML configs") # --- Getting the Pipeline and DataLoader datalib = DataloaderLibrary().get_registry() matches0 = [item for item in datalib.get_library_overview() if 'DataLoader' in item] assert len(matches0), "No DataLoader found" logger.debug("Found DataLoader") data_handler = datalib.build_object('DataLoader') pipelib = PipelineLibrary().get_registry() matches1 = [item for item in pipelib.get_library_overview() if settings_data.pipeline in item] assert len(matches1), "No Pipeline found" logger.debug("Found Pipeline") pipe = pipelib.build_object(settings_data.pipeline) mode = 'merge' in settings_data.pipeline.lower() return settings_data, settings_thr, data_handler, pipe, mode
[docs] def select_process_pipeline(object_dataloader, object_pipeline, sets_load_data: SettingsData, sets_load_thread: SettingsThread=RecommendedSettingsThread) -> None: """Function for handling the processing pipeline :param object_dataloader: object dataloader :param object_pipeline: object pipeline :param sets_load_data: Dataclass with settings for getting data to analyse :param sets_load_thread: Dataclass with settings for handling the processing threads :return: None """ define_logger_runtime(False) logger = getLogger(__name__) # ----- Preparation: Module calling ----- logger.info("Running framework for end-to-end neural signal processing (DeNSPP)") logger.info("Step #1: Loading data") logger.info("==================================================================") datahand = object_dataloader(sets_load_data) datahand.do_call() datahand.do_cut() datahand.do_resample() datahand.build_mapping() datahand.output_meta() dataIn = datahand.get_data() del datahand # --- Thread Preparation: Processing data logger.info("Step #2: Processing data") logger.info("==================================================================") thr_station = ProcessingData( pipeline=object_pipeline, settings=sets_load_thread, data_in=dataIn.data_raw, fs=dataIn.fs_used, channel_id=dataIn.electrode_id ) thr_station.do_processing() # --- Plot all plots and save results logger.info("Step #3: Saving results and plotting") logger.info("==================================================================") thr_station.do_save_results() thr_station.do_plot_results()
[docs] def select_process_merge(object_dataloader, object_pipeline) -> None: """Function for preparing and starting the merge process for generating datasets :param object_dataloader: DataLoader object :param object_pipeline: Pipeline object """ logger = getLogger(__name__) logger.info("\nPreparing datasets for AI Training in end-to-end spike-sorting frame-work (MERCUR-project Sp:AI:ke, 2022-2024)") settings = YamlHandler( template=DefaultSettingsData, path='config', file_name='Config_Merge' ).get_class(SettingsData) # ---- Merging spike frames from several files to one file merge_handler = MergeDatasets(object_pipeline, settings, True) merge_handler.get_frames_from_dataset( data_loader=object_dataloader, cluster_class_avai=False, process_points=list() ) merge_handler.merge_data_from_diff_files() merge_handler.save_merged_data_in_npyfile() # --- Merging the frames to new cluster device logger.info("=========================================================") logger.info("Final Step with merging cluster have to be done in MATLAB")