diff --git a/src_py/apiServer/apiServer.py b/src_py/apiServer/apiServer.py
index 4d9b50665..9cf1ebdbb 100644
--- a/src_py/apiServer/apiServer.py
+++ b/src_py/apiServer/apiServer.py
@@ -200,7 +200,7 @@ def send_data_to_sources(self, experiment_phase: ExperimentPhase): # Todo cheack
         globe.set_receiver_wait_for_ack() # send ack according to state of ack
         globe.ack_debug_print()
         # genrate csvs for each source
-        # do fort on the list of sources pieces
+        # loop over the list of source pieces
         # use generate_source_pieceDs_csv_file
         self.transmitter.update_csv()
         globe.waitForAck()
diff --git a/src_py/apiServer/experiment_flow.py b/src_py/apiServer/experiment_flow.py
index 14453f161..48eaabb78 100644
--- a/src_py/apiServer/experiment_flow.py
+++ b/src_py/apiServer/experiment_flow.py
@@ -8,8 +8,8 @@
 from workerResult import *
 from collections import OrderedDict
 from NerlComDB import *
-from NerlDB import *
-from NerlDatasetDB import *
+from NerlModelDB import *
+from nerl_csv_dataSet_db import *
 # Todo check imports and remove unused ones
 
 PARAM_CSV_DB_PATH = "csv_db_path"
@@ -26,8 +26,9 @@ def __init__(self, phase : str):
     def get_phase(self):
         return self.phase
 
-    def get_name ():
+    def get_name(self):
         pass
+
     def get_sources_str_list(self):
         return ",".join(self.source_pieces_dict.keys())
 
@@ -38,10 +39,10 @@ def add_source_piece(self, source_piece : SourcePieceDS):
             LOG_ERROR(f"Source piece with name {source_piece.source_name} already exists in phase { self.phase}")
 
     def get_sources_pieces(self):
-        return list(self.source_pieces_dict)
+        return list(self.source_pieces_dict.values())
 
-    def remove_source_piece(self, source_name: str): # Todo Ohad and Noa
-        pass
+    def remove_source_piece(self, source_name: str):
+        self.source_pieces_dict.pop(source_name)
 
 
 class ExperimentFlow():
@@ -70,12 +71,14 @@ def get_exp_phase_list(self):
 
     def generate_stats(self, experiment_phase: ExperimentPhase):
         pass
+        # Todo implement this function : accuracy, confusion matrix, loss, etc.
 
     def parse_experiment_flow_json(self, json_path : str):
         # read json file from nerlPlanner output
         with open(json_path) as json_file:
             self.exp_flow_json = json.load(json_file)
         # parse json and create experiment phases
+        self.exp_name = self.exp_flow_json[EXPFLOW_EXPERIMENT_NAME_FIELD]
         self.batch_size = self.exp_flow_json[EXPFLOW_BATCH_SIZE_FIELD]
         csv_file_path = self.exp_flow_json[EXPFLOW_CSV_FILE_PATH_FIELD]
         headers_row = self.exp_flow_json[EXPFLOW_HEADERS_NAMES_FIELD]
@@ -83,17 +86,30 @@ def parse_experiment_flow_json(self, json_path : str):
         num_of_labels = self.exp_flow_json[EXPFLOW_NUM_OF_LABELS_FIELD]
         self.set_csv_dataset(csv_file_path, num_of_features, num_of_labels, headers_row)
         phases_list = self.exp_flow_json[EXPFLOW_PHASES_FIELD]
-        # Todo complete phases loop
+        for phase in phases_list:
+            phase_name = phase[EXPFLOW_PHASES_PHASE_NAME_FIELD]
+            phase_type = phase[EXPFLOW_PHASES_PHASE_TYPE_FIELD]
+            sourcePieces = phase[EXPFLOW_PHASES_PHASE_SOURCE_PIECES_FIELD]
+            source_pieces_inst_list = []
+            for source_piece in sourcePieces:
+                source_name = source_piece[EXPFLOW_PHASE_SOURCE_PIECES_SOURCE_NAME_FIELD]
+                strating_sample = source_piece[EXPFLOW_PHASE_SOURCE_PIECES_STRATING_SAMPLE_FIELD]
+                num_of_batches = source_piece[EXPFLOW_PHASE_SOURCE_PIECES_NUM_OF_BATCHES_FIELD]
+                workers = source_piece[EXPFLOW_PHASE_SOURCE_PIECES_WORKERS_FIELD]
+                # SourcePieceDS expects (source_name, batch_size, phase, starting_offset, num_of_batches)
+                source_piece_inst = SourcePieceDS(source_name, self.batch_size, phase_type, strating_sample, num_of_batches)
+                source_piece_inst.workers_target = workers
+                source_pieces_inst_list.append(source_piece_inst)
+            self.add_phase(phase_name, phase_type, source_pieces_inst_list)
+
     def set_csv_dataset(self, csv_file_path : str, num_of_features : int, num_of_labels : int, headers_row : bool):
-        self.csv_dataset = CSVDataSet(csv_file_path, self.batch_size, num_of_features, num_of_labels, headers_row) # Todo get num of features and labels from csv file
+        self.csv_dataset = CsvDataSet(csv_file_path, self.batch_size, num_of_features, num_of_labels, headers_row) # Todo get num of features and labels from csv file
 
-    def add_phase(self, phase_name : str, phase_type : str, sourcePieces : list):
-        exp_phase = ExperimentPhase(phase_name)
-        for source_piece in sourcePieces:
-            exp_phase.add_source_piece(source_piece)
-        self.exp_phase_list.append(exp_phase)
+    def add_phase(self, phase_name : str, phase_type : str, source_pieces_inst_list : list):
+        exp_phase_inst = ExperimentPhase(phase_name)
+        for source_piece_inst in source_pieces_inst_list:
+            exp_phase_inst.add_source_piece(source_piece_inst)
+        self.exp_phase_list.append(exp_phase_inst)
diff --git a/src_py/apiServer/experiment_flow_defs.py b/src_py/apiServer/experiment_flow_defs.py
index 5e4cc2d48..dd7f3dd00 100644
--- a/src_py/apiServer/experiment_flow_defs.py
+++ b/src_py/apiServer/experiment_flow_defs.py
@@ -11,9 +11,12 @@
 EXPFLOW_PHASES_PHASE_NAME_FIELD = 'phaseName'
 EXPFLOW_PHASES_PHASE_TYPE_FIELD = 'phaseType'
-EXPFLOW_PHASES_STRATING_SAMPLE_FIELD = 'stratingSample'
-EXPFLOW_PHASES_NUM_OF_BATCHES_FIELD = 'numOfBatches'
-EXPFLOW_PHASES_WORKERS_FIELD = 'workers'
-EXPFLOW_PHASES_SOURCES_FIELD = 'sources'
+EXPFLOW_PHASES_PHASE_SOURCE_PIECES_FIELD = 'sourcePieces'
+
+EXPFLOW_PHASE_SOURCE_PIECES_SOURCE_NAME_FIELD = 'sourceName'
+EXPFLOW_PHASE_SOURCE_PIECES_STRATING_SAMPLE_FIELD = 'stratingSample'
+EXPFLOW_PHASE_SOURCE_PIECES_NUM_OF_BATCHES_FIELD = 'numOfBatches'
+EXPFLOW_PHASE_SOURCE_PIECES_WORKERS_FIELD = 'workers'
+
diff --git a/src_py/apiServer/nerl_csv_dataSet_db.py b/src_py/apiServer/nerl_csv_dataSet_db.py
new file mode 100644
index 000000000..ac35836e3
--- /dev/null
+++ b/src_py/apiServer/nerl_csv_dataSet_db.py
@@ -0,0 +1,67 @@
+
+from definitions import PHASE_TRAINING_STR, PHASE_PREDICTION_STR
+import pandas as pd
+# "import math.floor as floor" is not valid python; import the function instead
+from math import floor
+
+class SourcePieceDS():
+    def __init__(self, source_name : str, batch_size, phase : str, starting_offset = 0, num_of_batches = 0):
+        self.source_name = source_name
+        self.batch_size = batch_size
+        self.phase = phase
+        self.starting_offset = starting_offset # given as index of csv rows
+        self.num_of_batches = num_of_batches
+        self.workers_target = None
+        self.pointer_to_CsvDataSet = None # which CsvDataSet this piece was cut from
+
+    def get_source_name(self):
+        return self.source_name
+
+    def get_batch_size(self):
+        return self.batch_size
+
+    def get_phase(self):
+        return self.phase
+
+    def get_starting_offset(self):
+        return self.starting_offset
+
+
+class CsvDataSet():
+    def __init__(self, csv_path, batch_size, num_of_features, num_of_labels, headers_row: bool):
+        self.csv_path = csv_path
+        self.batch_size = batch_size
+        self.num_of_features = num_of_features
+        self.num_of_labels = num_of_labels
+        self.headers_row = headers_row
+
+    def get_csv_path(self):
+        return self.csv_path
+
+    def get_batch_size(self):
+        return self.batch_size
+
+    def get_num_of_features(self):
+        return self.num_of_features
+
+    def get_num_of_labels(self):
+        return self.num_of_labels
+
+    def get_total_num_of_batches(self):
+        # pandas expects a header row index or None, not a bool
+        header = 0 if self.headers_row else None
+        return floor(pd.read_csv(self.csv_path, header = header).shape[0] / self.batch_size)
+
+    def generate_source_pieceDS(self, source_name : str, batch_size : int, phase : str, starting_offset : int = 0, num_of_batches : int = 0):
+        assert batch_size > 0
+        assert num_of_batches >= 0
+        assert starting_offset >= 0
+        assert phase == PHASE_TRAINING_STR or phase == PHASE_PREDICTION_STR
+        # bounds check is in samples: offset + requested samples must fit in the dataset
+        assert (starting_offset + num_of_batches * batch_size) <= self.get_total_num_of_batches() * self.batch_size
+        return SourcePieceDS(source_name, batch_size, phase, starting_offset, num_of_batches)
+
+    def generate_source_pieceDs_csv_file(self, csv_file_path : str, source_pieceDS_inst: SourcePieceDS):
+        # Todo Ohad&Noa
+        # df_train = pd.read_csv(self.csv_dataset_path, skiprows=starting_offset_index_train, nrows=number_of_samples_train)
+        pass
+
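Note on the remaining Todo in `generate_source_pieceDs_csv_file`: a minimal sketch of the slicing it could do, following the commented-out `read_csv` hint above. The header-row skip and the headerless output file are assumptions, not part of this patch:

```python
import pandas as pd

def generate_source_pieceDs_csv_file(self, csv_file_path: str, source_pieceDS_inst: SourcePieceDS):
    # number of data rows this piece covers
    num_of_samples = source_pieceDS_inst.num_of_batches * source_pieceDS_inst.get_batch_size()
    skip_rows = source_pieceDS_inst.get_starting_offset()
    if self.headers_row:
        skip_rows += 1  # assumption: starting_offset indexes data rows, so also skip the header row
    df = pd.read_csv(self.csv_path, skiprows=skip_rows, nrows=num_of_samples, header=None)
    df.to_csv(csv_file_path, index=False, header=False)  # sources consume raw rows, no header
```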
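For reference, how the new classes compose once the phases loop runs; the dataset path and sizes below are made up for illustration:

```python
from nerl_csv_dataSet_db import CsvDataSet
from definitions import PHASE_TRAINING_STR

# hypothetical dataset: 1000 samples, 8 features, 2 label columns, with a header row
csv_dataset = CsvDataSet("/tmp/dataset.csv", batch_size=50, num_of_features=8, num_of_labels=2, headers_row=True)
print(csv_dataset.get_total_num_of_batches())   # floor(1000 / 50) == 20

# split the first 20 batches between two sources; offsets are csv row indices
piece_s1 = csv_dataset.generate_source_pieceDS("s1", 50, PHASE_TRAINING_STR, starting_offset=0, num_of_batches=10)
piece_s2 = csv_dataset.generate_source_pieceDS("s2", 50, PHASE_TRAINING_STR, starting_offset=500, num_of_batches=10)
```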
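The `generate_stats` Todo lists accuracy, confusion matrix, and loss. A sketch of the metrics part only, assuming scikit-learn is available and that a phase's true and predicted labels can be collected as flat lists; how to pull them out of the workers' result DBs is left open by this patch:

```python
from sklearn.metrics import accuracy_score, confusion_matrix

def generate_stats_sketch(true_labels: list, predicted_labels: list) -> dict:
    # metrics over one experiment phase; loss would have to come from the workers themselves
    return {
        "accuracy": accuracy_score(true_labels, predicted_labels),
        "confusion_matrix": confusion_matrix(true_labels, predicted_labels),
    }
```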