From 23a87ed71ff6882f5a1217f2e3ca69d59e1ad784 Mon Sep 17 00:00:00 2001 From: David Date: Sat, 25 May 2024 01:05:40 +0300 Subject: [PATCH] [STATS] Send actual frequency to api server (#345) --- src_erl/NerlnetApp/src/Source/sourceStatem.erl | 3 +++ src_erl/NerlnetApp/src/Stats/stats.erl | 11 ++++++++--- src_py/apiServer/NerlComDB.py | 5 ++++- src_py/apiServer/experiment_flow_test.py | 5 +++++ src_py/apiServer/stats.py | 8 ++++++++ 5 files changed, 28 insertions(+), 4 deletions(-) diff --git a/src_erl/NerlnetApp/src/Source/sourceStatem.erl b/src_erl/NerlnetApp/src/Source/sourceStatem.erl index 99b492a0..d8972bdb 100644 --- a/src_erl/NerlnetApp/src/Source/sourceStatem.erl +++ b/src_erl/NerlnetApp/src/Source/sourceStatem.erl @@ -76,6 +76,7 @@ init({MyName, WorkersMap, NerlnetGraph, Policy, BatchSize, Frequency , Epochs, T ets:insert(EtsRef, {sample_size, none}), ets:insert(EtsRef, {workers_list, []}), ets:insert(EtsRef, {csv_name, ""}), % not in use + ets:insert(EtsRef, {stats_ets, EtsStatsRef}), {MyRouterHost,MyRouterPort} = nerl_tools:getShortPath(MyName,?MAIN_SERVER_ATOM, NerlnetGraph), ets:insert(EtsRef, {my_router,{MyRouterHost,MyRouterPort}}), @@ -402,4 +403,6 @@ transmitter(TimeInterval_ms, SourceEtsRef, SourcePid ,ClientWorkerPairs, Batches gen_statem:cast(SourcePid,{finishedCasting,BatchesSent}), ActualFrequency = 1/(TransmissionTimeTook_sec/BatchesSent), ?LOG_INFO("Source ~p Actual Frequency: ~p [B/Sec]",[MyName, ActualFrequency]), + StatsEtsRef = ets:lookup_element(SourceEtsRef, stats_ets, ?DATA_IDX), + stats:set_value(StatsEtsRef, actual_frequency, ActualFrequency), ets:delete(TransmitterEts). \ No newline at end of file diff --git a/src_erl/NerlnetApp/src/Stats/stats.erl b/src_erl/NerlnetApp/src/Stats/stats.erl index 081ca4bd..0b15ae83 100644 --- a/src_erl/NerlnetApp/src/Stats/stats.erl +++ b/src_erl/NerlnetApp/src/Stats/stats.erl @@ -10,6 +10,7 @@ -export([get_bytes_sent/1, increment_bytes_sent/2]). -export([get_bad_messages/1, increment_bad_messages/1]). -export([get_value/2, increment_by_value/3]). +-export([set_value/3]). -export([encode_ets_to_http_bin_str/1 , decode_http_bin_str_to_ets/1 , encode_workers_ets_to_http_bin_str/1]). -export([update_workers_ets/4, increment_workers_ets/4 , generate_workers_stats_ets/0]). @@ -63,8 +64,8 @@ decode_http_bin_str_to_ets(EncodedStr) -> ReturnedEts. -generate_stats_ets() -> %% clients , routers , mainserver... - StatsEts = ets:new(stats_ets , [set]), +generate_stats_ets() -> %% sources, clients , routers , mainserver... + StatsEts = ets:new(stats_ets , [set, public]), ets:insert(StatsEts, {messages_received , 0}), ets:insert(StatsEts, {messages_sent , 0}), ets:insert(StatsEts, {messages_dropped , 0}), @@ -73,7 +74,8 @@ generate_stats_ets() -> %% clients , routers , mainserver... ets:insert(StatsEts, {bad_messages , 0}), ets:insert(StatsEts, {batches_received , 0}), % related with client only ets:insert(StatsEts, {batches_dropped , 0}), % related with client only - ets:insert(StatsEts, {batches_sent , 0}), % related with source + ets:insert(StatsEts, {batches_sent , 0}), % related with source only + ets:insert(StatsEts, {actual_frequency, 0}), % related with source only StatsEts. generate_workers_stats_ets() -> %% workers.. @@ -107,6 +109,9 @@ increment_workers_ets(StatsEts, WorkerName, WorkerAttribute, Value) -> ets:update_counter(WorkerStatsEts, Key, Value). %% ---- Stats ETS Methods -----%% +set_value(StatsEts, Key, Value) -> + ets:update_element(StatsEts, Key, {?STATS_KEYVAL_VAL_IDX, Value}). + increment_by_value(StatsEts, Key, Value) -> ets:update_counter(StatsEts, Key, Value). diff --git a/src_py/apiServer/NerlComDB.py b/src_py/apiServer/NerlComDB.py index 33abd899..1989f574 100644 --- a/src_py/apiServer/NerlComDB.py +++ b/src_py/apiServer/NerlComDB.py @@ -203,6 +203,7 @@ class SourceComDB(EntityComDB): def __init__(self): super().__init__() self.batches_sent = 0 + self.actual_frequency = 0 def __add__(self, other): self.batches_sent += other.batches_sent @@ -217,6 +218,7 @@ def update_stats(self, input_dict): self.bytes_sent = input_dict["bytes_sent"] self.bad_messages = input_dict["bad_messages"] self.batches_sent = input_dict["batches_sent"] + self.actual_frequency = input_dict["actual_frequency"] def get_as_dict(self): return { @@ -226,7 +228,8 @@ def get_as_dict(self): "bytes_received": self.bytes_received, "bytes_sent": self.bytes_sent, "bad_messages": self.bad_messages, - "batches_sent": self.batches_sent + "batches_sent": self.batches_sent, + "actual_frequency": self.actual_frequency } class NerlComDB(): diff --git a/src_py/apiServer/experiment_flow_test.py b/src_py/apiServer/experiment_flow_test.py index 2f22ac6d..20497551 100644 --- a/src_py/apiServer/experiment_flow_test.py +++ b/src_py/apiServer/experiment_flow_test.py @@ -85,6 +85,9 @@ def print_test(in_str : str , enable = True): clients: {stats_train.get_communication_stats_clients()}\ routers: {stats_train.get_communication_stats_routers()}" +LOG_INFO("Actual Frequencies:") +print(f"{stats_train.get_actual_frequencies_of_sources()}") + LOG_INFO("Missed Batches training:") #LOG_INFO(stats_train.get_missed_batches()) @@ -95,6 +98,8 @@ def print_test(in_str : str , enable = True): clients: {stats_predict.get_communication_stats_clients()}\ routers: {stats_predict.get_communication_stats_routers()}" +LOG_INFO("Actual Frequencies:") +print(f"{stats_predict.get_actual_frequencies_of_sources()}") missed_batches = stats_predict.get_missed_batches() if missed_batches: diff --git a/src_py/apiServer/stats.py b/src_py/apiServer/stats.py index 24180a88..e7811b6c 100644 --- a/src_py/apiServer/stats.py +++ b/src_py/apiServer/stats.py @@ -311,6 +311,14 @@ def get_communication_stats_main_server(self): main_server_communication_stats = self.nerl_comm_db.get_main_server().get_as_dict() return main_server_communication_stats + def get_actual_frequencies_of_sources(self): + # return dictionary of {source : {actual_freq}} + actual_frequencies_of_sources_dict = OrderedDict() + sources_dict = self.nerl_comm_db.get_sources() + for source_name, source_db_dict in sources_dict.items(): + actual_frequencies_of_sources_dict[source_name] = source_db_dict.get_as_dict()["actual_frequency"] + return actual_frequencies_of_sources_dict + def get_model_performence_stats(self , confusion_matrix_worker_dict , show : bool = False , saveToFile : bool = False, printStats = False) -> dict: """ Returns a dictionary of {(worker, class): {Performence_Stat : VALUE}}} for each worker and class in the experiment.