Skip to content

Commit

Permalink
[ApiServer] WIP finish experiment json parser
Browse files Browse the repository at this point in the history
  • Loading branch information
ohad123 committed Feb 16, 2024
1 parent ab220ef commit 5ee34a6
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 17 deletions.
2 changes: 1 addition & 1 deletion src_py/apiServer/apiServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def send_data_to_sources(self, experiment_phase: ExperimentPhase): # Todo cheack
globe.set_receiver_wait_for_ack() # send ack according to state of ack
globe.ack_debug_print()
# genrate csvs for each source
# do fort on the list of sources pieces
# do for on the list of sources pieces
# use generate_source_pieceDs_csv_file
self.transmitter.update_csv()
globe.waitForAck()
Expand Down
40 changes: 28 additions & 12 deletions src_py/apiServer/experiment_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from workerResult import *
from collections import OrderedDict
from NerlComDB import *
from NerlDB import *
from NerlDatasetDB import *
from NerlModelDB import *
from nerl_csv_dataSet_db import *
# Todo check imports and remove unused ones

PARAM_CSV_DB_PATH = "csv_db_path"
Expand All @@ -26,8 +26,9 @@ def __init__(self, phase : str):
def get_phase(self):
return self.phase

def get_name ():
def get_name(self):
    # TODO(review): stub — presumably should return this phase's name/identifier;
    # confirm the intended contract before implementing.
    pass

def get_sources_str_list(self):
return ",".join(self.source_pieces_dict.keys())

Expand All @@ -38,10 +39,10 @@ def add_source_piece(self, source_piece : SourcePieceDS):
LOG_ERROR(f"Source piece with name {source_piece.source_name} already exists in phase { self.phase}")

def get_sources_pieces(self):
return list(self.source_pieces_dict)
return list(self.source_pieces_dict.values())

def remove_source_piece(self, source_name: str): # Todo Ohad and Noa
pass
def remove_source_piece(self, source_name: str):
    """Remove the source piece registered under source_name from this phase.

    Uses pop with a default so removing a name that was never added is a
    no-op rather than a KeyError, mirroring add_source_piece's non-raising
    handling of duplicates.
    """
    self.source_pieces_dict.pop(source_name, None)


class ExperimentFlow():
Expand Down Expand Up @@ -70,30 +71,45 @@ def get_exp_phase_list(self):

def generate_stats(self, experiment_phase: ExperimentPhase):
pass
# Todo implement this function : accuracy, confusion matrix, loss, etc.

def parse_experiment_flow_json(self, json_path : str):
    """Load an experiment-flow JSON file (nerlPlanner output) and build the
    experiment: dataset metadata, then one ExperimentPhase per phase entry,
    each populated with its SourcePieceDS instances.
    """
    # read json file from nerlPlanner output
    with open(json_path) as json_file:
        self.exp_flow_json = json.load(json_file)
    # parse json and create experiment phases
    self.exp_name = self.exp_flow_json[EXPFLOW_EXPERIMENT_NAME_FIELD]
    self.batch_size = self.exp_flow_json[EXPFLOW_BATCH_SIZE_FIELD]
    csv_file_path = self.exp_flow_json[EXPFLOW_CSV_FILE_PATH_FIELD]
    headers_row = self.exp_flow_json[EXPFLOW_HEADERS_NAMES_FIELD]
    num_of_features = self.exp_flow_json[EXPFLOW_NUM_OF_FEATURES_FIELD]
    num_of_labels = self.exp_flow_json[EXPFLOW_NUM_OF_LABELS_FIELD]
    self.set_csv_dataset(csv_file_path, num_of_features, num_of_labels, headers_row)
    phases_list = self.exp_flow_json[EXPFLOW_PHASES_FIELD]
    for phase in phases_list:
        phase_name = phase[EXPFLOW_PHASES_PHASE_NAME_FIELD]
        phase_type = phase[EXPFLOW_PHASES_PHASE_TYPE_FIELD]
        source_pieces_inst_list = []
        for source_piece in phase[EXPFLOW_PHASES_PHASE_SOURCE_PIECES_FIELD]:
            source_name = source_piece[EXPFLOW_PHASE_SOURCE_PIECES_SOURCE_NAME_FIELD]
            starting_sample = source_piece[EXPFLOW_PHASE_SOURCE_PIECES_STRATING_SAMPLE_FIELD]
            num_of_batches = source_piece[EXPFLOW_PHASE_SOURCE_PIECES_NUM_OF_BATCHES_FIELD]
            workers = source_piece[EXPFLOW_PHASE_SOURCE_PIECES_WORKERS_FIELD]
            # BUG FIX: SourcePieceDS.__init__ is (source_name, batch_size, phase,
            # starting_offset, num_of_batches); the old call passed the starting
            # sample as batch_size, the batch count as phase and workers as the
            # offset. Pass the experiment batch size and the phase type instead.
            # NOTE(review): assumes the JSON 'phaseType' values match the phase
            # strings SourcePieceDS expects — confirm against nerlPlanner output.
            source_piece_inst = SourcePieceDS(source_name, self.batch_size, phase_type,
                                              starting_sample, num_of_batches)
            # workers are not a constructor argument; store them on the existing
            # workers_target attribute of the piece.
            source_piece_inst.workers_target = workers
            source_pieces_inst_list.append(source_piece_inst)
        self.add_phase(phase_name, phase_type, source_pieces_inst_list)



def set_csv_dataset(self, csv_file_path : str, num_of_features : int, num_of_labels : int, headers_row : bool):
    """Create the CsvDataSet for this experiment using the already-parsed batch size."""
    # BUG FIX: the class defined in nerl_csv_dataSet_db.py is CsvDataSet;
    # the previous spelling CSVDataSet raised NameError at runtime.
    self.csv_dataset = CsvDataSet(csv_file_path, self.batch_size, num_of_features, num_of_labels, headers_row) # Todo get num of features and labels from csv file

def add_phase(self, phase_name : str, phase_type : str, sourcePieces : list):
exp_phase = ExperimentPhase(phase_name)
for source_piece in sourcePieces:
exp_phase.add_source_piece(source_piece)
self.exp_phase_list.append(exp_phase)
def add_phase(self, phase_name : str, phase_type : str, source_pieces_inst_list : list):
    """Create an ExperimentPhase named phase_name, attach the given source
    pieces to it and append it to self.exp_phase_list.
    """
    # NOTE(review): phase_type is currently unused — ExperimentPhase only
    # receives the name; confirm whether the type should be stored as well.
    exp_phase_inst = ExperimentPhase(phase_name)
    for source_piece_inst in source_pieces_inst_list:
        exp_phase_inst.add_source_piece(source_piece_inst)
    self.exp_phase_list.append(exp_phase_inst)



Expand Down
11 changes: 7 additions & 4 deletions src_py/apiServer/experiment_flow_defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@

EXPFLOW_PHASES_PHASE_NAME_FIELD = 'phaseName'
EXPFLOW_PHASES_PHASE_TYPE_FIELD = 'phaseType'
EXPFLOW_PHASES_STRATING_SAMPLE_FIELD = 'stratingSample'
EXPFLOW_PHASES_NUM_OF_BATCHES_FIELD = 'numOfBatches'
EXPFLOW_PHASES_WORKERS_FIELD = 'workers'
EXPFLOW_PHASES_SOURCES_FIELD = 'sources'
EXPFLOW_PHASES_PHASE_SOURCE_PIECES_FIELD = 'sourcePieces'

EXPFLOW_PHASE_SOURCE_PIECES_SOURCE_NAME_FIELD = 'sourceName'
EXPFLOW_PHASE_SOURCE_PIECES_STRATING_SAMPLE_FIELD = 'stratingSample'
EXPFLOW_PHASE_SOURCE_PIECES_NUM_OF_BATCHES_FIELD = 'numOfBatches'
EXPFLOW_PHASE_SOURCE_PIECES_WORKERS_FIELD = 'workers'



67 changes: 67 additions & 0 deletions src_py/apiServer/nerl_csv_dataSet_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@

from math import floor

import pandas as pd

from definitions import PHASE_TRAINING_STR, PHASE_PREDICTION_STR

class SourcePieceDS():
    """Describes a contiguous slice ("piece") of a CSV dataset assigned to a
    single source for one experiment phase.
    """
    def __init__(self, source_name : str, batch_size, phase : str, starting_offset = 0, num_of_batches = 0):
        self.source_name = source_name
        self.batch_size = batch_size
        self.phase = phase  # phase string, e.g. training / prediction
        self.starting_offset = starting_offset # given as index of csv rows
        self.num_of_batches = num_of_batches
        self.workers_target = None  # target workers for this piece; filled in by the experiment-flow parser
        self.pointer_to_CsvDataSet = None # which csvDataSet this piece belongs to

    def get_source_name(self):
        return self.source_name

    def get_batch_size(self):
        return self.batch_size

    def get_phase(self):
        return self.phase

    def get_starting_offset(self):
        return self.starting_offset

    # added for consistency with the getters above
    def get_num_of_batches(self):
        return self.num_of_batches

    def get_workers_target(self):
        return self.workers_target



class CsvDataSet():
def __init__(self, csv_path, batch_size, num_of_features, num_of_labels, headers_row: bool):
self.csv_path = csv_path
self.batch_size = batch_size
self.num_of_features = num_of_features
self.num_of_labels = num_of_labels
self.headers_row = headers_row

def get_csv_path(self):
return self.csv_path

def get_batch_size(self):
return self.batch_size

def get_num_of_features(self):
return self.num_of_features

def get_num_of_labels(self):
return self.num_of_labels

def get_total_num_of_batches(self):
return floor(pd.read_csv(self.csv_path, header = self.headers_row).shape[0] / self.batch_size)

def generate_source_pieceDS(self, source_name : str, batch_size: int, phase : str, starting_offset = 0 : int, num_of_batches = 0 : int):
assert batch_size > 0
assert num_of_batches >= 0
assert offset >= 0
assert phase == PHASE_TRAINING_STR or phase == PHASE_PREDICTION_STR
assert starting_offset >= 0
assert (starting_offset + num_of_batches * batch_size) <= self.get_total_num_of_batches()
return SourcePieceDS(source_name, batch_size, phase, starting_offset, num_of_batches)

def generate_source_pieceDs_csv_file(self, csv_file_path : str, source_pieceDS_inst: SourcePieceDS):
# Todo Ohad&Noa
# df_train = pd.df.read_csv(self.csv_dataset_path, skiprows=starting_offset_index_train, nrows=number_of_samples_train)
pass


0 comments on commit 5ee34a6

Please sign in to comment.