Skip to content

Commit

Permalink
[SOURCE] Epochs is set to 1 when phase is prediction
Browse files Browse the repository at this point in the history
  • Loading branch information
leondavi committed Jun 8, 2024
1 parent 8e69b4c commit af5ee1b
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 9 deletions.
4 changes: 2 additions & 2 deletions src_erl/NerlnetApp/src/MainServer/initHandler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ init(Req0, [Main_genServer_Pid]) ->
%Bindings also can be accessed as once, giving a map of all bindings of Req0:
{_,Body,_} = cowboy_req:read_body(Req0, #{length => ?DATA_LEN}), %read up to X MB (default was 8MB)
Decoded_body = binary_to_list(Body),
[Index, TotalSources, SourceName, WorkersStr, NumOfBatches, NerlTensorType, Data] = string:split(Decoded_body, "#", all),
gen_server:cast(Main_genServer_Pid,{initCSV, Index, TotalSources, SourceName, WorkersStr, NumOfBatches, NerlTensorType, Data}),
[Index, TotalSources, SourceName, WorkersStr, Phase, NumOfBatches, NerlTensorType, Data] = string:split(Decoded_body, "#", all),
gen_server:cast(Main_genServer_Pid,{initCSV, Index, TotalSources, SourceName, WorkersStr, Phase, NumOfBatches, NerlTensorType, Data}),
%[Source|WorkersAndInput] = re:split(binary_to_list(Body), "#", [{return, list}]),
%{Workers,SourceData} = getWorkerInput(WorkersAndInput,[]),

Expand Down
5 changes: 3 additions & 2 deletions src_erl/NerlnetApp/src/MainServer/mainGenserver.erl
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ init({MyName,ClientsNames,BatchSize,WorkersMap,NerlnetGraph , DeviceName}) ->
{ok, #main_genserver_state{myName = MyNameStr , state=idle, total_sources=0, sources_data_ready_ctr = 0}}.


handle_cast({initCSV, _Index, TotalSources, SourceName, WorkersList, NumOfBatches, NerlTensorType, Data}, State = #main_genserver_state{state = idle, sourcesWaitingList = SourcesWaitingList, total_sources = TotalSourcesOld, sources_data_ready_ctr = SourcesDataReadyCtrOld}) ->
handle_cast({initCSV, _Index, TotalSources, SourceName, WorkersList, Phase, NumOfBatches, NerlTensorType, Data}, State = #main_genserver_state{state = idle, sourcesWaitingList = SourcesWaitingList, total_sources = TotalSourcesOld, sources_data_ready_ctr = SourcesDataReadyCtrOld}) ->
{RouterHost,RouterPort} = ets:lookup_element(get(main_server_ets), my_router, ?DATA_IDX),
ActionStr = atom_to_list(updateCSV),
{TotalSourcesInt, _Rest} = string:to_integer(TotalSources),
% MessageBody = WorkersList ++ "#" ++ NumOfBatches ++ "#" ++ NerlTensorType ++ "#" ++ Data,
WorkersListSeperated = string:split(WorkersList, ",", all),
MessageBody = {WorkersListSeperated, NumOfBatches, NerlTensorType, Data},
MessageBody = {WorkersListSeperated, Phase, NumOfBatches, NerlTensorType, Data},
nerl_tools:http_router_request(RouterHost,RouterPort, [SourceName], ActionStr, MessageBody), % update the source with its data
UpdatedSourceWaitingList = SourcesWaitingList++[list_to_atom(SourceName)],
{SourcesDataReadyCtr, NewTotalSources} =
Expand All @@ -98,6 +98,7 @@ handle_cast({initCSV, _Index, TotalSources, SourceName, WorkersList, NumOfBatche
end,
{noreply, State#main_genserver_state{sourcesWaitingList = UpdatedSourceWaitingList, total_sources = NewTotalSources, sources_data_ready_ctr = SourcesDataReadyCtr}};

% TODO Guy - I think this pattern is redundant - it is not relevant to the main server state
handle_cast({initCSV, _Index, _TotalSources, _SourceName ,_SourceData}, State) ->
?LOG_ERROR("initCSV is only applicalble when main server is in idle state!"),
{noreply, State#main_genserver_state{}};
Expand Down
4 changes: 2 additions & 2 deletions src_erl/NerlnetApp/src/Source/castingHandler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ init(Req0, [Action,Source_StateM_Pid]) ->
%% io:format("casting handler got Body:~p~n",[Body]),
case Action of
csv -> {_ , Body} = nerl_tools:read_all_data(Req0 , <<>>),
{WorkersList, NumOfBatches, NerlTensorType, Data} = binary_to_term(Body),
gen_statem:cast(Source_StateM_Pid, {batchList, WorkersList, list_to_integer(NumOfBatches), NerlTensorType , Data});
{WorkersList, Phase, NumOfBatches, NerlTensorType, Data} = binary_to_term(Body),
gen_statem:cast(Source_StateM_Pid, {batchList, WorkersList, list_to_atom(Phase), list_to_integer(NumOfBatches), NerlTensorType , Data});
startCasting -> {_,Body,_} = cowboy_req:read_body(Req0),
gen_statem:cast(Source_StateM_Pid, {startCasting,Body});
statistics -> gen_statem:cast(Source_StateM_Pid, {statistics});
Expand Down
17 changes: 14 additions & 3 deletions src_erl/NerlnetApp/src/Source/sourceStatem.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
-define(SENDING_FREQUENCY_OVERHEAD_FIX_FACTOR_PERC, 0.75).
-define(MICRO_TO_MILLI_FACTOR, 0.001).

-define(PHASE_TRAINING_ATOM, training).
-define(PHASE_PREDICTION_ATOM, prediction).

-record(source_statem_state, {ets_ref, batchesList = [], castingTo=[], myName, source_pid, transmitter_pid = none,csvName="", nerlTensorType}).

Expand Down Expand Up @@ -77,6 +79,7 @@ init({MyName, WorkersMap, NerlnetGraph, Policy, BatchSize, Frequency , Epochs, T
ets:insert(EtsRef, {workers_list, []}),
ets:insert(EtsRef, {csv_name, ""}), % not in use
ets:insert(EtsRef, {stats_ets, EtsStatsRef}),
ets:insert(EtsRef, {current_phase, none}),

{MyRouterHost,MyRouterPort} = nerl_tools:getShortPath(MyName,?MAIN_SERVER_ATOM, NerlnetGraph),
ets:insert(EtsRef, {my_router,{MyRouterHost,MyRouterPort}}),
Expand Down Expand Up @@ -112,14 +115,15 @@ state_name(_EventType, _EventContent, State = #source_statem_state{}) ->


%% This cast receive a list of samples to load to the records batchList
idle(cast, {batchList, WorkersList, NumOfBatches, NerlTensorType, Data}, State) ->
idle(cast, {batchList, WorkersList, Phase, NumOfBatches, NerlTensorType, Data}, State) ->
EtsRef = get(source_ets),
StatsEtsRef = get(source_stats_ets),
MyName = ets:lookup_element(EtsRef, my_name, ?DATA_IDX),
BatchSize = ets:lookup_element(EtsRef, batch_size, ?DATA_IDX),
{NerlTensorBatchesList, SampleSize} = parser:parseCSV(MyName, BatchSize, NerlTensorType, Data), % TODO this is slow and heavy policy! pre parse in ETS a possible solution
ets:update_element(EtsRef, workers_list, [{?DATA_IDX, WorkersList}]),
ets:update_element(EtsRef, num_of_batches, [{?DATA_IDX, NumOfBatches}]),
ets:update_element(EtsRef, current_phase, [{?DATA_IDX, Phase}]),
ets:insert(EtsRef, {nerlTensorType, NerlTensorType}),
stats:increment_messages_received(StatsEtsRef),
?LOG_NOTICE("Source ~p, workers are: ~p", [MyName, WorkersList]),
Expand All @@ -145,16 +149,16 @@ idle(cast, {startCasting,_Body}, State = #source_statem_state{batchesList = Batc
BatchSize = ets:lookup_element(EtsRef, batch_size, ?DATA_IDX),
SampleSize = ets:lookup_element(EtsRef, sample_size, ?DATA_IDX),
Epochs = ets:lookup_element(EtsRef, epochs, ?DATA_IDX),
BatchSize = ets:lookup_element(EtsRef, batch_size, ?DATA_IDX),
WorkersList = ets:lookup_element(EtsRef, workers_list, ?DATA_IDX),
Phase = ets:lookup_element(EtsRef, current_phase, ?DATA_IDX),

%% UserLimitNumberOfBatchesToSendInt = list_to_integer(UserLimitNumberOfBatchesToSend),
%% BatchesToSend = min(length(BatchesList), UserLimitNumberOfBatchesToSendInt),
BatchesToSend = length(BatchesList),
BatchesListFinal = lists:sublist(BatchesList, BatchesToSend), % Batches list with respect of constraint UserLimitNumberOfBatchesToSendInt

% TODO consider add offset value from API
?LOG_NOTICE("~p - starts casting to workers: ~p",[MyName, WorkersList]),
?LOG_NOTICE("~p - starts casting of phase ~p to workers: ~p",[MyName, Phase, WorkersList]),
?LOG_NOTICE("Frequency: ~pHz [Batches/Second]",[Frequency]),
?LOG_NOTICE("Batch size: ~p", [BatchSize]),
?LOG_NOTICE("Sample size = ~p",[SampleSize]),
Expand Down Expand Up @@ -283,6 +287,13 @@ spawnTransmitter(SourceEtsRef, WorkersListOfNames, BatchesListToSend)->
TimeInterval_ms = ets:lookup_element(SourceEtsRef, time_interval_ms, ?DATA_IDX), % frequency to time interval duration in milliseconds between each send
ClientWorkerPairs = nerl_tools:get_client_worker_pairs(WorkersListOfNames,WorkersMap,[]),
Epochs = ets:lookup_element(SourceEtsRef, epochs, ?DATA_IDX),
Phase = ets:lookup_element(SourceEtsRef, current_phase, ?DATA_IDX),
MyName = ets:lookup_element(SourceEtsRef, my_name, ?DATA_IDX),
case Phase of
?PHASE_TRAINING_ATOM -> pass;
?PHASE_PREDICTION_ATOM -> Epochs = 1; % In prediction phase, we send only a single epoch always!
_ -> ?LOG_ERROR("Source ~p has an unknown phase: ~p",[MyName, Phase])
end,
SourcePid = self(),
TimeIntervalWithOverheadFactor = TimeInterval_ms * ?SENDING_FREQUENCY_OVERHEAD_FIX_FACTOR_PERC,
spawn_link(?MODULE,transmitter,[TimeIntervalWithOverheadFactor,SourceEtsRef, SourcePid ,Epochs, ClientWorkerPairs, BatchesListToSend, Method]).
Expand Down

0 comments on commit af5ee1b

Please sign in to comment.