From 8ceae299ff488a4506fb6e82e447bf9638d1b4c4 Mon Sep 17 00:00:00 2001 From: NoaShapira8 Date: Sat, 27 Jul 2024 17:58:23 +0000 Subject: [PATCH 1/3] [fix_missed_batches_RR] add support to RR and random on generate missed batches --- src_py/apiServer/networkComponents.py | 2 ++ src_py/apiServer/stats.py | 50 +++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/src_py/apiServer/networkComponents.py b/src_py/apiServer/networkComponents.py index 76377a5f..d0387794 100644 --- a/src_py/apiServer/networkComponents.py +++ b/src_py/apiServer/networkComponents.py @@ -39,6 +39,7 @@ def __init__(self, dc_json: dict): self.sourcesPolicies = [] self.sourceEpochs = {} self.routers = [] + self.sources_policy_dict = {} # Initializing maps @@ -87,6 +88,7 @@ def __init__(self, dc_json: dict): self.sourcesPolicies.append(source[GetFields.get_policy_field_name()]) self.sourceEpochs[source[GetFields.get_name_field_name()]] = source[GetFields.get_epochs_field_name()] self.map_name_to_type[source[GetFields.get_name_field_name()]] = TYPE_SOURCE + self.sources_policy_dict[source[GetFields.get_name_field_name()]] = source[GetFields.get_policy_field_name()] # Getting the names of all the routers: routersJsons = self.jsonData[GetFields.get_routers_field_name()] diff --git a/src_py/apiServer/stats.py b/src_py/apiServer/stats.py index fc65130e..b48b7e62 100644 --- a/src_py/apiServer/stats.py +++ b/src_py/apiServer/stats.py @@ -296,6 +296,56 @@ def missed_batches_key(phase_name, source_name, worker_name): #print(f"missed_batches_dict: {missed_batches_dict}") return missed_batches_dict + def get_missed_batches_new(self): + """ + Returns a list of missed batches in the experiment phase. + {(source_name, worker_name): [batch_id,...]} + """ + def missed_batches_key(phase_name, source_name, worker_name): + return f"phase:{phase_name},{source_name}->{worker_name}" + + phase_name = self.experiment_phase.get_name() + missed_batches_dict = {} + sources_pieces_list = self.experiment_phase.get_sources_pieces() + workers_model_db_list = self.nerl_model_db.get_workers_model_db_list() + for source_piece_inst in sources_pieces_list: + source_name = source_piece_inst.get_source_name() + source_policy = globe.components.sources_policy_dict[source_name] # 0 -> casting , 1 -> round robin, 2 -> random + + if source_policy == '0': # casting policy + for worker_db in workers_model_db_list: + worker_name = worker_db.get_worker_name() + total_batches_per_source = worker_db.get_total_batches_per_source(source_name) + for batch_id in range(total_batches_per_source): + batch_db = worker_db.get_batch(source_name, str(batch_id)) + if not batch_db: # if batch is missing + missed_batch_key_str = missed_batches_key(phase_name, source_name, worker_name) + if missed_batch_key_str not in missed_batches_dict: + missed_batches_dict[missed_batch_key_str] = [] + missed_batches_dict[missed_batch_key_str].append(batch_id) + elif source_policy == '1': # round robin policy + target_workers_string = source_piece_inst.get_target_workers() + target_workers_names = target_workers_string.split(',') + number_of_workers = len(target_workers_names) + batches_indexes = [i for i in range(source_piece_inst.get_num_of_batches())] + batch_worker_tuple = [(batch_index, target_workers_names[batch_index % number_of_workers]) for batch_index in batches_indexes] # (batch_index, worker_name_that_should_recive_the_batch) + worker_batches_dict = {worker_name: [] for worker_name in target_workers_names} # Create a dictionary to hold batches id for each worker + for batch_index, worker_name in batch_worker_tuple: + worker_batches_dict[worker_name].append(batch_index) + for worker_db in workers_model_db_list: + worker_name = worker_db.get_worker_name() + for batch_id in worker_batches_dict[worker_name]: + batch_db = worker_db.get_batch(source_name, str(batch_id)) + if not batch_db: + missed_batch_key_str = missed_batches_key(phase_name, source_name, worker_name) + if missed_batch_key_str not in missed_batches_dict: + missed_batches_dict[missed_batch_key_str] = [] + missed_batches_dict[missed_batch_key_str].append(batch_id) + elif source_policy == '2': # random policy + LOG_INFO(f"Source {source_name} policy is random, it's not posiblle check for missed batches") + break + return missed_batches_dict + def get_communication_stats_workers(self): # return dictionary of {worker : {communication_stats}} communication_stats_workers_dict = OrderedDict() From ef580de9519f51ade4dff211b4a347965a3660d7 Mon Sep 17 00:00:00 2001 From: NoaShapira8 Date: Sat, 27 Jul 2024 21:31:16 +0000 Subject: [PATCH 2/3] [fix_missed_batches_RR] fix bugs --- src_py/apiServer/stats.py | 66 ++++++++++++--------------------------- 1 file changed, 20 insertions(+), 46 deletions(-) diff --git a/src_py/apiServer/stats.py b/src_py/apiServer/stats.py index b48b7e62..47a5ed7d 100644 --- a/src_py/apiServer/stats.py +++ b/src_py/apiServer/stats.py @@ -275,57 +275,30 @@ def get_missed_batches(self): """ def missed_batches_key(phase_name, source_name, worker_name): return f"phase:{phase_name},{source_name}->{worker_name}" - - if self.phase == PHASE_PREDICTION_STR: - phase_name = self.experiment_phase.get_name() - missed_batches_dict = {} - sources_pieces_list = self.experiment_phase.get_sources_pieces() - workers_model_db_list = self.nerl_model_db.get_workers_model_db_list() - for source_piece_inst in sources_pieces_list: - source_name = source_piece_inst.get_source_name() - for worker_db in workers_model_db_list: - worker_name = worker_db.get_worker_name() - total_batches_per_source = worker_db.get_total_batches_per_source(source_name) - for batch_id in range(total_batches_per_source): - batch_db = worker_db.get_batch(source_name, str(batch_id)) - if not batch_db: # if batch is missing - missed_batch_key_str = missed_batches_key(phase_name, source_name, worker_name) - if missed_batch_key_str not in missed_batches_dict: - missed_batches_dict[missed_batch_key_str] = [] - missed_batches_dict[missed_batch_key_str].append(batch_id) - #print(f"missed_batches_dict: {missed_batches_dict}") - return missed_batches_dict - def get_missed_batches_new(self): - """ - Returns a list of missed batches in the experiment phase. - {(source_name, worker_name): [batch_id,...]} - """ - def missed_batches_key(phase_name, source_name, worker_name): - return f"phase:{phase_name},{source_name}->{worker_name}" - phase_name = self.experiment_phase.get_name() missed_batches_dict = {} sources_pieces_list = self.experiment_phase.get_sources_pieces() workers_model_db_list = self.nerl_model_db.get_workers_model_db_list() + print(f"worker name: {workers_model_db_list[0].get_worker_name()}, batch id: {workers_model_db_list[0].get_batches_dict().keys()}") + print(len(workers_model_db_list[0].get_batches_dict().keys())) for source_piece_inst in sources_pieces_list: source_name = source_piece_inst.get_source_name() source_policy = globe.components.sources_policy_dict[source_name] # 0 -> casting , 1 -> round robin, 2 -> random - + target_workers_string = source_piece_inst.get_target_workers() + target_workers_names = target_workers_string.split(',') if source_policy == '0': # casting policy for worker_db in workers_model_db_list: worker_name = worker_db.get_worker_name() - total_batches_per_source = worker_db.get_total_batches_per_source(source_name) - for batch_id in range(total_batches_per_source): - batch_db = worker_db.get_batch(source_name, str(batch_id)) - if not batch_db: # if batch is missing - missed_batch_key_str = missed_batches_key(phase_name, source_name, worker_name) - if missed_batch_key_str not in missed_batches_dict: - missed_batches_dict[missed_batch_key_str] = [] - missed_batches_dict[missed_batch_key_str].append(batch_id) + if worker_name in target_workers_names: # Check if the worker is in the target workers list of this source + for batch_id in range(source_piece_inst.get_num_of_batches()): + batch_db = worker_db.get_batch(source_name, str(batch_id)) + if not batch_db: # if batch is missing + missed_batch_key_str = missed_batches_key(phase_name, source_name, worker_name) + if missed_batch_key_str not in missed_batches_dict: + missed_batches_dict[missed_batch_key_str] = [] + missed_batches_dict[missed_batch_key_str].append(batch_id) elif source_policy == '1': # round robin policy - target_workers_string = source_piece_inst.get_target_workers() - target_workers_names = target_workers_string.split(',') number_of_workers = len(target_workers_names) batches_indexes = [i for i in range(source_piece_inst.get_num_of_batches())] batch_worker_tuple = [(batch_index, target_workers_names[batch_index % number_of_workers]) for batch_index in batches_indexes] # (batch_index, worker_name_that_should_recive_the_batch) @@ -334,13 +307,14 @@ def missed_batches_key(phase_name, source_name, worker_name): worker_batches_dict[worker_name].append(batch_index) for worker_db in workers_model_db_list: worker_name = worker_db.get_worker_name() - for batch_id in worker_batches_dict[worker_name]: - batch_db = worker_db.get_batch(source_name, str(batch_id)) - if not batch_db: - missed_batch_key_str = missed_batches_key(phase_name, source_name, worker_name) - if missed_batch_key_str not in missed_batches_dict: - missed_batches_dict[missed_batch_key_str] = [] - missed_batches_dict[missed_batch_key_str].append(batch_id) + if worker_name in target_workers_names: # Check if the worker is in the target workers list of this source + for batch_id in worker_batches_dict[worker_name]: + batch_db = worker_db.get_batch(source_name, str(batch_id)) + if not batch_db: + missed_batch_key_str = missed_batches_key(phase_name, source_name, worker_name) + if missed_batch_key_str not in missed_batches_dict: + missed_batches_dict[missed_batch_key_str] = [] + missed_batches_dict[missed_batch_key_str].append(batch_id) elif source_policy == '2': # random policy LOG_INFO(f"Source {source_name} policy is random, it's not posiblle check for missed batches") break From c6b4210241e7538fe725117e786710ca007a6d97 Mon Sep 17 00:00:00 2001 From: NoaShapira8 Date: Sat, 27 Jul 2024 21:37:45 +0000 Subject: [PATCH 3/3] remove prints --- src_py/apiServer/stats.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src_py/apiServer/stats.py b/src_py/apiServer/stats.py index 47a5ed7d..7c5252ce 100644 --- a/src_py/apiServer/stats.py +++ b/src_py/apiServer/stats.py @@ -280,8 +280,6 @@ def missed_batches_key(phase_name, source_name, worker_name): missed_batches_dict = {} sources_pieces_list = self.experiment_phase.get_sources_pieces() workers_model_db_list = self.nerl_model_db.get_workers_model_db_list() - print(f"worker name: {workers_model_db_list[0].get_worker_name()}, batch id: {workers_model_db_list[0].get_batches_dict().keys()}") - print(len(workers_model_db_list[0].get_batches_dict().keys())) for source_piece_inst in sources_pieces_list: source_name = source_piece_inst.get_source_name() source_policy = globe.components.sources_policy_dict[source_name] # 0 -> casting , 1 -> round robin, 2 -> random