From af5ee1b288ee4084db29c49fb4b61ec4cf1081ac Mon Sep 17 00:00:00 2001 From: leondavi Date: Sat, 8 Jun 2024 11:37:15 +0300 Subject: [PATCH] [SOURCE] Epochs is set to 1 when phase is prediction --- .../NerlnetApp/src/MainServer/initHandler.erl | 4 ++-- .../NerlnetApp/src/MainServer/mainGenserver.erl | 5 +++-- .../NerlnetApp/src/Source/castingHandler.erl | 4 ++-- src_erl/NerlnetApp/src/Source/sourceStatem.erl | 17 ++++++++++++++--- 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/src_erl/NerlnetApp/src/MainServer/initHandler.erl b/src_erl/NerlnetApp/src/MainServer/initHandler.erl index 46af9700..e1a44990 100644 --- a/src_erl/NerlnetApp/src/MainServer/initHandler.erl +++ b/src_erl/NerlnetApp/src/MainServer/initHandler.erl @@ -22,8 +22,8 @@ init(Req0, [Main_genServer_Pid]) -> %Bindings also can be accessed as once, giving a map of all bindings of Req0: {_,Body,_} = cowboy_req:read_body(Req0, #{length => ?DATA_LEN}), %read up to X MB (default was 8MB) Decoded_body = binary_to_list(Body), - [Index, TotalSources, SourceName, WorkersStr, NumOfBatches, NerlTensorType, Data] = string:split(Decoded_body, "#", all), - gen_server:cast(Main_genServer_Pid,{initCSV, Index, TotalSources, SourceName, WorkersStr, NumOfBatches, NerlTensorType, Data}), + [Index, TotalSources, SourceName, WorkersStr, Phase, NumOfBatches, NerlTensorType, Data] = string:split(Decoded_body, "#", all), + gen_server:cast(Main_genServer_Pid,{initCSV, Index, TotalSources, SourceName, WorkersStr, Phase, NumOfBatches, NerlTensorType, Data}), %[Source|WorkersAndInput] = re:split(binary_to_list(Body), "#", [{return, list}]), %{Workers,SourceData} = getWorkerInput(WorkersAndInput,[]), diff --git a/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl b/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl index 6c61df6e..07659667 100644 --- a/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl +++ b/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl @@ -82,13 +82,13 @@ init({MyName,ClientsNames,BatchSize,WorkersMap,NerlnetGraph , DeviceName}) -> {ok, #main_genserver_state{myName = MyNameStr , state=idle, total_sources=0, sources_data_ready_ctr = 0}}. -handle_cast({initCSV, _Index, TotalSources, SourceName, WorkersList, NumOfBatches, NerlTensorType, Data}, State = #main_genserver_state{state = idle, sourcesWaitingList = SourcesWaitingList, total_sources = TotalSourcesOld, sources_data_ready_ctr = SourcesDataReadyCtrOld}) -> +handle_cast({initCSV, _Index, TotalSources, SourceName, WorkersList, Phase, NumOfBatches, NerlTensorType, Data}, State = #main_genserver_state{state = idle, sourcesWaitingList = SourcesWaitingList, total_sources = TotalSourcesOld, sources_data_ready_ctr = SourcesDataReadyCtrOld}) -> {RouterHost,RouterPort} = ets:lookup_element(get(main_server_ets), my_router, ?DATA_IDX), ActionStr = atom_to_list(updateCSV), {TotalSourcesInt, _Rest} = string:to_integer(TotalSources), % MessageBody = WorkersList ++ "#" ++ NumOfBatches ++ "#" ++ NerlTensorType ++ "#" ++ Data, WorkersListSeperated = string:split(WorkersList, ",", all), - MessageBody = {WorkersListSeperated, NumOfBatches, NerlTensorType, Data}, + MessageBody = {WorkersListSeperated, Phase, NumOfBatches, NerlTensorType, Data}, nerl_tools:http_router_request(RouterHost,RouterPort, [SourceName], ActionStr, MessageBody), % update the source with its data UpdatedSourceWaitingList = SourcesWaitingList++[list_to_atom(SourceName)], {SourcesDataReadyCtr, NewTotalSources} = @@ -98,6 +98,7 @@ handle_cast({initCSV, _Index, TotalSources, SourceName, WorkersList, NumOfBatche end, {noreply, State#main_genserver_state{sourcesWaitingList = UpdatedSourceWaitingList, total_sources = NewTotalSources, sources_data_ready_ctr = SourcesDataReadyCtr}}; +% TODO Guy - I think this pattern is redundant - it is not relevant to the main server state handle_cast({initCSV, _Index, _TotalSources, _SourceName ,_SourceData}, State) -> ?LOG_ERROR("initCSV is only applicalble when main server is in idle state!"), {noreply, State#main_genserver_state{}}; diff --git a/src_erl/NerlnetApp/src/Source/castingHandler.erl b/src_erl/NerlnetApp/src/Source/castingHandler.erl index 97d62e31..20f83931 100644 --- a/src_erl/NerlnetApp/src/Source/castingHandler.erl +++ b/src_erl/NerlnetApp/src/Source/castingHandler.erl @@ -18,8 +18,8 @@ init(Req0, [Action,Source_StateM_Pid]) -> %% io:format("casting handler got Body:~p~n",[Body]), case Action of csv -> {_ , Body} = nerl_tools:read_all_data(Req0 , <<>>), - {WorkersList, NumOfBatches, NerlTensorType, Data} = binary_to_term(Body), - gen_statem:cast(Source_StateM_Pid, {batchList, WorkersList, list_to_integer(NumOfBatches), NerlTensorType , Data}); + {WorkersList, Phase, NumOfBatches, NerlTensorType, Data} = binary_to_term(Body), + gen_statem:cast(Source_StateM_Pid, {batchList, WorkersList, list_to_atom(Phase), list_to_integer(NumOfBatches), NerlTensorType , Data}); startCasting -> {_,Body,_} = cowboy_req:read_body(Req0), gen_statem:cast(Source_StateM_Pid, {startCasting,Body}); statistics -> gen_statem:cast(Source_StateM_Pid, {statistics}); diff --git a/src_erl/NerlnetApp/src/Source/sourceStatem.erl b/src_erl/NerlnetApp/src/Source/sourceStatem.erl index 4ceb3942..7739244f 100644 --- a/src_erl/NerlnetApp/src/Source/sourceStatem.erl +++ b/src_erl/NerlnetApp/src/Source/sourceStatem.erl @@ -25,6 +25,8 @@ -define(SENDING_FREQUENCY_OVERHEAD_FIX_FACTOR_PERC, 0.75). -define(MICRO_TO_MILLI_FACTOR, 0.001). +-define(PHASE_TRAINING_ATOM, training). +-define(PHASE_PREDICTION_ATOM, prediction). -record(source_statem_state, {ets_ref, batchesList = [], castingTo=[], myName, source_pid, transmitter_pid = none,csvName="", nerlTensorType}). @@ -77,6 +79,7 @@ init({MyName, WorkersMap, NerlnetGraph, Policy, BatchSize, Frequency , Epochs, T ets:insert(EtsRef, {workers_list, []}), ets:insert(EtsRef, {csv_name, ""}), % not in use ets:insert(EtsRef, {stats_ets, EtsStatsRef}), + ets:insert(EtsRef, {current_phase, none}), {MyRouterHost,MyRouterPort} = nerl_tools:getShortPath(MyName,?MAIN_SERVER_ATOM, NerlnetGraph), ets:insert(EtsRef, {my_router,{MyRouterHost,MyRouterPort}}), @@ -112,7 +115,7 @@ state_name(_EventType, _EventContent, State = #source_statem_state{}) -> %% This cast receive a list of samples to load to the records batchList -idle(cast, {batchList, WorkersList, NumOfBatches, NerlTensorType, Data}, State) -> +idle(cast, {batchList, WorkersList, Phase, NumOfBatches, NerlTensorType, Data}, State) -> EtsRef = get(source_ets), StatsEtsRef = get(source_stats_ets), MyName = ets:lookup_element(EtsRef, my_name, ?DATA_IDX), @@ -120,6 +123,7 @@ idle(cast, {batchList, WorkersList, NumOfBatches, NerlTensorType, Data}, State) {NerlTensorBatchesList, SampleSize} = parser:parseCSV(MyName, BatchSize, NerlTensorType, Data), % TODO this is slow and heavy policy! pre parse in ETS a possible solution ets:update_element(EtsRef, workers_list, [{?DATA_IDX, WorkersList}]), ets:update_element(EtsRef, num_of_batches, [{?DATA_IDX, NumOfBatches}]), + ets:update_element(EtsRef, current_phase, [{?DATA_IDX, Phase}]), ets:insert(EtsRef, {nerlTensorType, NerlTensorType}), stats:increment_messages_received(StatsEtsRef), ?LOG_NOTICE("Source ~p, workers are: ~p", [MyName, WorkersList]), @@ -145,8 +149,8 @@ idle(cast, {startCasting,_Body}, State = #source_statem_state{batchesList = Batc BatchSize = ets:lookup_element(EtsRef, batch_size, ?DATA_IDX), SampleSize = ets:lookup_element(EtsRef, sample_size, ?DATA_IDX), Epochs = ets:lookup_element(EtsRef, epochs, ?DATA_IDX), - BatchSize = ets:lookup_element(EtsRef, batch_size, ?DATA_IDX), WorkersList = ets:lookup_element(EtsRef, workers_list, ?DATA_IDX), + Phase = ets:lookup_element(EtsRef, current_phase, ?DATA_IDX), %% UserLimitNumberOfBatchesToSendInt = list_to_integer(UserLimitNumberOfBatchesToSend), %% BatchesToSend = min(length(BatchesList), UserLimitNumberOfBatchesToSendInt), @@ -154,7 +158,7 @@ idle(cast, {startCasting,_Body}, State = #source_statem_state{batchesList = Batc BatchesListFinal = lists:sublist(BatchesList, BatchesToSend), % Batches list with respect of constraint UserLimitNumberOfBatchesToSendInt % TODO consider add offset value from API - ?LOG_NOTICE("~p - starts casting to workers: ~p",[MyName, WorkersList]), + ?LOG_NOTICE("~p - starts casting of phase ~p to workers: ~p",[MyName, Phase, WorkersList]), ?LOG_NOTICE("Frequency: ~pHz [Batches/Second]",[Frequency]), ?LOG_NOTICE("Batch size: ~p", [BatchSize]), ?LOG_NOTICE("Sample size = ~p",[SampleSize]), @@ -283,6 +287,13 @@ spawnTransmitter(SourceEtsRef, WorkersListOfNames, BatchesListToSend)-> TimeInterval_ms = ets:lookup_element(SourceEtsRef, time_interval_ms, ?DATA_IDX), % frequency to time interval duration in milliseconds between each send ClientWorkerPairs = nerl_tools:get_client_worker_pairs(WorkersListOfNames,WorkersMap,[]), Epochs = ets:lookup_element(SourceEtsRef, epochs, ?DATA_IDX), + Phase = ets:lookup_element(SourceEtsRef, current_phase, ?DATA_IDX), + MyName = ets:lookup_element(SourceEtsRef, my_name, ?DATA_IDX), + case Phase of + ?PHASE_TRAINING_ATOM -> pass; + ?PHASE_PREDICTION_ATOM -> Epochs = 1; % In prediction phase, we send only a single epoch always! + _ -> ?LOG_ERROR("Source ~p has an unknown phase: ~p",[MyName, Phase]) + end, SourcePid = self(), TimeIntervalWithOverheadFactor = TimeInterval_ms * ?SENDING_FREQUENCY_OVERHEAD_FIX_FACTOR_PERC, spawn_link(?MODULE,transmitter,[TimeIntervalWithOverheadFactor,SourceEtsRef, SourcePid ,Epochs, ClientWorkerPairs, BatchesListToSend, Method]).