From ac97d5692d5f93ad81c3e251b2887839e1e1925d Mon Sep 17 00:00:00 2001 From: NoaShapira8 Date: Wed, 3 Jul 2024 14:20:39 +0000 Subject: [PATCH 1/4] [fix_phase_called_twice] fix bug that ensure when activate an experiment - don't allow calling the same phase twice --- src_py/apiServer/apiServer.py | 10 +++++++--- src_py/apiServer/experiment_flow_debug.py | 5 ++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src_py/apiServer/apiServer.py b/src_py/apiServer/apiServer.py index e50d7126..5dfac542 100644 --- a/src_py/apiServer/apiServer.py +++ b/src_py/apiServer/apiServer.py @@ -28,6 +28,7 @@ def __init__(self): self.experiments_dict = {} self.current_exp = None self.apiserver_event_sync = EventSync() # pay attention! there are two kinds of syncs one for experiment phase events and one for api-server events + self.next_expertiment_phase_exist = True # flag to check if there are more phases to run # Create a new folder for the results: Path(EXPERIMENT_RESULTS_PATH).mkdir(parents=True, exist_ok=True) @@ -153,6 +154,7 @@ def send_data_to_sources(self, csv_dataset: CsvDataSet, experiment_phase: Experi LOG_INFO("Data is ready in sources") def run_current_experiment_phase(self): + assert self.next_expertiment_phase_exist, "experiment override is not supported!" # don't allow calling the same phase twice current_exp_phase = self.current_exp.get_current_experiment_phase() LOG_INFO(f"Experiment phase: {current_exp_phase.get_name()} of type {current_exp_phase.get_phase_type()} starts running...") csv_dataset_inst = self.current_exp.get_csv_dataset() @@ -179,7 +181,8 @@ def run_current_experiment_phase(self): self.communication_stats() LOG_INFO(f"Phase of {current_exp_phase.get_name()} {current_exp_phase.get_phase_type()} completed") - + + self.next_expertiment_phase_exist = False def next_experiment_phase(self): @@ -189,8 +192,9 @@ def next_experiment_phase(self): current_exp_flow.current_exp_phase_index += 1 if not self.experiment_phase_is_valid(): LOG_WARNING("No more phases to run") - return False - return True + self.next_expertiment_phase_exist = False + else: + self.next_expertiment_phase_exist = True def communication_stats(self): assert self.experiment_phase_is_valid(), "No valid experiment phase" diff --git a/src_py/apiServer/experiment_flow_debug.py b/src_py/apiServer/experiment_flow_debug.py index 7909cfe9..72ff1540 100644 --- a/src_py/apiServer/experiment_flow_debug.py +++ b/src_py/apiServer/experiment_flow_debug.py @@ -29,8 +29,7 @@ def print_test(in_str : str): experiment_name = "test_exp" api_server_instance.initialization(experiment_name, dc_json , connmap_json, exp_flow_json) # start to debug api_server_instance.send_jsons_to_devices() - -next_expertiment_phase_exist = True + api_server_instance.run_current_experiment_phase() # blocking - deppended acks from mainserver stats = api_server_instance.get_experiment_flow(experiment_name).generate_stats() stats.get_communication_stats_workers() @@ -40,7 +39,7 @@ def print_test(in_str : str): stats.get_communication_stats_main_server() stats.get_loss_ts() stats.get_min_loss() -next_expertiment_phase_exist = api_server_instance.next_experiment_phase() +api_server_instance.next_experiment_phase() api_server_instance.run_current_experiment_phase() stats = api_server_instance.get_experiment_flow(experiment_name).generate_stats() confusion_matrix_source_dict, confusion_matrix_worker_dict = stats.get_confusion_matrices() From 72d42365e1c80cc777e72330685fc87f769444cc Mon Sep 17 00:00:00 2001 From: NoaShapira8 Date: Wed, 3 Jul 2024 15:39:28 +0000 Subject: [PATCH 2/4] [fix_phase_called_twice] Adopting the changes in flow_test file --- src_py/apiServer/experiment_flow_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src_py/apiServer/experiment_flow_test.py b/src_py/apiServer/experiment_flow_test.py index 20497551..5e9b4bef 100644 --- a/src_py/apiServer/experiment_flow_test.py +++ b/src_py/apiServer/experiment_flow_test.py @@ -54,8 +54,8 @@ def print_test(in_str : str , enable = True): api_server_instance.run_current_experiment_phase() # blocking until phase is completed stats_train = api_server_instance.get_experiment_flow(experiment_name).generate_stats() -next_expertiment_phase_exist = api_server_instance.next_experiment_phase() -assert next_expertiment_phase_exist, "No next experiment phase found" +api_server_instance.next_experiment_phase() +assert api_server_instance.next_expertiment_phase_exist, "No next experiment phase found" api_server_instance.run_current_experiment_phase() # blocking until phase is completed stats_predict = api_server_instance.get_experiment_flow(experiment_name).generate_stats() From 6d92799038748536531b955c5987ea4e0b72c192 Mon Sep 17 00:00:00 2001 From: NoaShapira8 Date: Sun, 14 Jul 2024 06:31:21 +0000 Subject: [PATCH 3/4] [fix_phase_called_twice] change to log warning that the program wouldn't fail --- src_py/apiServer/apiServer.py | 48 ++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/src_py/apiServer/apiServer.py b/src_py/apiServer/apiServer.py index 5dfac542..a84658ff 100644 --- a/src_py/apiServer/apiServer.py +++ b/src_py/apiServer/apiServer.py @@ -154,35 +154,37 @@ def send_data_to_sources(self, csv_dataset: CsvDataSet, experiment_phase: Experi LOG_INFO("Data is ready in sources") def run_current_experiment_phase(self): - assert self.next_expertiment_phase_exist, "experiment override is not supported!" # don't allow calling the same phase twice - current_exp_phase = self.current_exp.get_current_experiment_phase() - LOG_INFO(f"Experiment phase: {current_exp_phase.get_name()} of type {current_exp_phase.get_phase_type()} starts running...") - csv_dataset_inst = self.current_exp.get_csv_dataset() - events_sync_inst = self.current_exp.get_events_sync() - - send_jsons_event = self.apiserver_event_sync.get_event_status(EventSync.SEND_JSONS) - assert send_jsons_event == EventSync.DONE, "Jsons not sent to devices yet" + if not self.next_expertiment_phase_exist: # don't allow calling the same phase twice + LOG_WARNING("experiment override is not supported!") + else: + current_exp_phase = self.current_exp.get_current_experiment_phase() + LOG_INFO(f"Experiment phase: {current_exp_phase.get_name()} of type {current_exp_phase.get_phase_type()} starts running...") + csv_dataset_inst = self.current_exp.get_csv_dataset() + events_sync_inst = self.current_exp.get_events_sync() + + send_jsons_event = self.apiserver_event_sync.get_event_status(EventSync.SEND_JSONS) + assert send_jsons_event == EventSync.DONE, "Jsons not sent to devices yet" - self.send_data_to_sources(csv_dataset_inst, current_exp_phase, events_sync_inst) + self.send_data_to_sources(csv_dataset_inst, current_exp_phase, events_sync_inst) - events_sync_inst.set_event_wait(EventSync.UPDATE_PHASE) - self.transmitter.clients_set_phase(current_exp_phase.get_phase_type()) - events_sync_inst.sync_on_event(EventSync.UPDATE_PHASE) + events_sync_inst.set_event_wait(EventSync.UPDATE_PHASE) + self.transmitter.clients_set_phase(current_exp_phase.get_phase_type()) + events_sync_inst.sync_on_event(EventSync.UPDATE_PHASE) - events_sync_inst.set_event_wait(EventSync.START_CASTING) - self.transmitter.start_casting(current_exp_phase) # Source start sending data to workers - events_sync_inst.sync_on_event(EventSync.START_CASTING) + events_sync_inst.set_event_wait(EventSync.START_CASTING) + self.transmitter.start_casting(current_exp_phase) # Source start sending data to workers + events_sync_inst.sync_on_event(EventSync.START_CASTING) - LOG_INFO(f"Processing experiment phase data") - current_exp_phase.process_experiment_phase_data() - LOG_INFO(f"Processing experiment phase data completed") + LOG_INFO(f"Processing experiment phase data") + current_exp_phase.process_experiment_phase_data() + LOG_INFO(f"Processing experiment phase data completed") - LOG_INFO(f"Start generating communication statistics for {current_exp_phase.get_name()} of type {current_exp_phase.get_phase_type()}") - self.communication_stats() + LOG_INFO(f"Start generating communication statistics for {current_exp_phase.get_name()} of type {current_exp_phase.get_phase_type()}") + self.communication_stats() - LOG_INFO(f"Phase of {current_exp_phase.get_name()} {current_exp_phase.get_phase_type()} completed") - - self.next_expertiment_phase_exist = False + LOG_INFO(f"Phase of {current_exp_phase.get_name()} {current_exp_phase.get_phase_type()} completed") + + self.next_expertiment_phase_exist = False def next_experiment_phase(self): From b61747723b33a59336d1a7b25b23e45440f025f8 Mon Sep 17 00:00:00 2001 From: NoaShapira8 Date: Sun, 14 Jul 2024 09:56:28 +0000 Subject: [PATCH 4/4] [fix_phase_called_twice] --- src_py/apiServer/apiServer.py | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/src_py/apiServer/apiServer.py b/src_py/apiServer/apiServer.py index 07dfe4f0..a84658ff 100644 --- a/src_py/apiServer/apiServer.py +++ b/src_py/apiServer/apiServer.py @@ -154,7 +154,6 @@ def send_data_to_sources(self, csv_dataset: CsvDataSet, experiment_phase: Experi LOG_INFO("Data is ready in sources") def run_current_experiment_phase(self): -<<<<<<< HEAD if not self.next_expertiment_phase_exist: # don't allow calling the same phase twice LOG_WARNING("experiment override is not supported!") else: @@ -165,18 +164,6 @@ def run_current_experiment_phase(self): send_jsons_event = self.apiserver_event_sync.get_event_status(EventSync.SEND_JSONS) assert send_jsons_event == EventSync.DONE, "Jsons not sent to devices yet" -======= - assert self.next_expertiment_phase_exist, "experiment override is not supported!" # don't allow calling the same phase twice - current_exp_phase = self.current_exp.get_current_experiment_phase() - LOG_INFO(f"Experiment phase: {current_exp_phase.get_name()} of type {current_exp_phase.get_phase_type()} starts running...") - csv_dataset_inst = self.current_exp.get_csv_dataset() - events_sync_inst = self.current_exp.get_events_sync() - - send_jsons_event = self.apiserver_event_sync.get_event_status(EventSync.SEND_JSONS) - assert send_jsons_event == EventSync.DONE, "Jsons not sent to devices yet" - - self.send_data_to_sources(csv_dataset_inst, current_exp_phase, events_sync_inst) ->>>>>>> d2b6830b26775cd2874548f6d8f5b9a4d87d093d self.send_data_to_sources(csv_dataset_inst, current_exp_phase, events_sync_inst) @@ -192,18 +179,12 @@ def run_current_experiment_phase(self): current_exp_phase.process_experiment_phase_data() LOG_INFO(f"Processing experiment phase data completed") -<<<<<<< HEAD LOG_INFO(f"Start generating communication statistics for {current_exp_phase.get_name()} of type {current_exp_phase.get_phase_type()}") self.communication_stats() LOG_INFO(f"Phase of {current_exp_phase.get_name()} {current_exp_phase.get_phase_type()} completed") self.next_expertiment_phase_exist = False -======= - LOG_INFO(f"Phase of {current_exp_phase.get_name()} {current_exp_phase.get_phase_type()} completed") - - self.next_expertiment_phase_exist = False ->>>>>>> d2b6830b26775cd2874548f6d8f5b9a4d87d093d def next_experiment_phase(self):