Source code for denspp.offline.pipeline.pipeline_handler

from dataclasses import dataclass
from logging import getLogger
from denspp.offline.logger import define_logger_runtime
from denspp.offline.data_call import SettingsData, DefaultSettingsData, MergeDataset
from denspp.offline.data_format import YamlHandler
from denspp.offline.pipeline import DataloaderLibrary, PipelineLibrary, MultithreadHandler


def _start_processing_pipeline(sets_load_data: SettingsData = DefaultSettingsData):
    """Function for preparing the processing pipeline
    :param sets_load_data:      Dataclass with settings for getting the data to analyse
    :return:                    Tuple with (0) settings class for the data, (1) DataLoader object, (2) Pipeline object
    """
    # --- Getting the YAML files
    logger = getLogger(__name__)
    if sets_load_data == DefaultSettingsData:
        settings_data: SettingsData = YamlHandler(
            template=sets_load_data,
            path='config',
            file_name='Config_PipelineData'
        ).get_class(SettingsData)
    else:
        settings_data = sets_load_data
    logger.debug("Load YAML configs")

    # --- Getting the Pipeline and DataLoader
    datalib = DataloaderLibrary().get_registry()
    matches0 = [item for item in datalib.get_library_overview() if 'DataLoader' in item]
    assert len(matches0), "No DataLoader found"
    logger.debug("Found DataLoader")
    data_handler = datalib.build_object(matches0[0])

    pipelib = PipelineLibrary().get_registry()
    search_name = settings_data.pipeline + ('_Merge' if settings_data.do_merge else '')
    matches1 = [item for item in pipelib.get_library_overview() if search_name in item]
    assert len(matches1), "No Pipeline found"
    logger.debug("Found Pipeline")
    pipe = pipelib.build_object(matches1[0])

    return settings_data, data_handler, pipe
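
# Illustrative only: a minimal 'config/Config_PipelineData.yaml' read by the
# YamlHandler call above. Only the 'pipeline' and 'do_merge' keys appear in this
# module; any further fields of SettingsData are not shown here, and the value
# 'PipelineV0' is a hypothetical pipeline name.
#
#   pipeline: "PipelineV0"    # class name searched in the PipelineLibrary registry
#   do_merge: false           # false -> signal processing, true -> dataset merging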


def select_process_pipeline(object_dataloader, object_pipeline, sets_load_data: SettingsData) -> None:
    """Function for handling the processing pipeline
    :param object_dataloader:   DataLoader object
    :param object_pipeline:     Pipeline object
    :param sets_load_data:      Dataclass with settings for getting the data to analyse
    :return:                    None
    """
    logger = getLogger(__name__)

    # ----- Preparation: Module calling -----
    logger.info("Running framework for end-to-end neural signal processing (DeNSPP)")
    logger.info("Step #1: Loading data")
    logger.info("==================================================================")
    datahand = object_dataloader(sets_load_data)
    datahand.do_call()
    datahand.do_cut()
    datahand.do_resample()
    datahand.build_mapping()
    datahand.output_meta()
    dataIn = datahand.get_data()
    del datahand

    # --- Thread Preparation: Processing data
    logger.info("Step #2: Processing data")
    logger.info("==================================================================")
    thr_station = MultithreadHandler(num_workers=1)
    dut = object_pipeline(dataIn.fs_used)
    thr_station.do_processing(
        data=dataIn.data_raw,
        chnnl_id=dataIn.electrode_id,
        func=dut.run
    )

    # --- Plot all plots and save results
    logger.info("Step #3: Saving results and plotting")
    logger.info("==================================================================")
    thr_station.do_save_results(path2save=dut.path2save)
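
# A minimal sketch of the duck-typed interface that select_process_pipeline()
# expects from 'object_pipeline'. The class name and run() body are hypothetical;
# only the constructor argument (the used sampling rate), the run() callable
# handed to MultithreadHandler.do_processing(), and the 'path2save' attribute
# are taken from the function above. The run() signature is an assumption
# derived from the do_processing() keyword arguments.
class _SketchPipeline:
    def __init__(self, fs_used: float):
        self.fs = fs_used                           # sampling rate handed over by the framework
        self.path2save = 'runs/sketch_pipeline'     # results folder used by do_save_results()

    def run(self, data, chnnl_id: int) -> dict:
        # Placeholder per-channel processing: pass the raw samples through unchanged.
        return {'channel': chnnl_id, 'output': data}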
def select_process_merge(object_dataloader, object_pipeline, sets_load_data: SettingsData,
                         frames_xoffset: int = 0, list_merging_files: list = (),
                         do_label_concatenation: bool = False) -> None:
    """Function for preparing and starting the merge process for generating datasets
    :param object_dataloader:           DataLoader object
    :param object_pipeline:             Pipeline object
    :param sets_load_data:              SettingsData object
    :param frames_xoffset:              Integer with sample offset applied to the frame/window extraction
    :param list_merging_files:          Data points of the selected dataset to process (empty -> all)
    :param do_label_concatenation:      Concatenate the class number with an increasing id number (useful for non-biological clusters)
    :return:                            None
    """
    logger = getLogger(__name__)
    logger.info("Running framework for end-to-end neural signal processing (DeNSPP)")
    logger.info("Building datasets from transient data")

    # ---- Merging spike frames from several files into one file
    merge_handler = MergeDataset(
        pipeline=object_pipeline,
        dataloader=object_dataloader,
        settings_data=sets_load_data,
        concatenate_id=do_label_concatenation,
    )
    merge_handler.get_frames_from_dataset(
        process_points=list_merging_files,
        xpos_offset=frames_xoffset
    )
    merge_handler.merge_data_from_all_iteration()

    # --- Merging the frames into the new clusters
    logger.info("=========================================================")
    logger.info("Final step: merging the clusters has to be done separately using SortDataset")
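
# Usage sketch for a direct merge run. The loader/pipeline objects would normally
# come from _start_processing_pipeline(); the offset and data point indices below
# are purely illustrative values, not defaults of the framework.
#
#   settings, loader, pipe = _start_processing_pipeline()
#   select_process_merge(
#       object_dataloader=loader,
#       object_pipeline=pipe,
#       sets_load_data=settings,
#       frames_xoffset=10,                # hypothetical frame offset in samples
#       list_merging_files=[0, 1, 2],     # hypothetical data point indices
#       do_label_concatenation=False,
#   )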
@dataclass
class SettingsMerging:
    """Class for defining the properties for merging datasets
    Attributes:
        taking_datapoints:  List with data points to process [Default: [] -> taking all]
        do_label_concat:    Boolean for concatenating the class number with an increasing id number
        xoffset:            Integer with delayed positions applied on the frame/window extraction
    """
    taking_datapoints: list[int]
    do_label_concat: bool
    xoffset: int
DefaultSettingsMerging = SettingsMerging(
    taking_datapoints=[],
    do_label_concat=False,
    xoffset=0,
)
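
# Illustrative only: a matching 'config/Config_Merging.yaml' read in
# run_transient_data_processing() below mirrors the SettingsMerging fields;
# the values shown are the defaults from DefaultSettingsMerging.
#
#   taking_datapoints: []    # empty list -> taking all data points
#   do_label_concat: false
#   xoffset: 0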
def run_transient_data_processing() -> None:
    """Function for running the offline data analysis of transient use-specific data
    :return: None
    """
    define_logger_runtime(False)
    settings_data, data_handler, pipe = _start_processing_pipeline()
    if settings_data.do_merge:
        sets_merge: SettingsMerging = YamlHandler(
            template=DefaultSettingsMerging,
            path='config',
            file_name='Config_Merging'
        ).get_class(SettingsMerging)
        select_process_merge(
            object_dataloader=data_handler,
            object_pipeline=pipe,
            sets_load_data=settings_data,
            list_merging_files=sets_merge.taking_datapoints,
            do_label_concatenation=sets_merge.do_label_concat,
            frames_xoffset=sets_merge.xoffset,
        )
    else:
        select_process_pipeline(
            object_dataloader=data_handler,
            object_pipeline=pipe,
            sets_load_data=settings_data
        )
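
# Hypothetical entry point (not part of the original module) showing how the
# handler is typically started from a run script.
if __name__ == "__main__":
    run_transient_data_processing()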