diff --git a/src_cpp/opennnBridge/openNNnif.cpp b/src_cpp/opennnBridge/openNNnif.cpp index f6380aaeb..2c1de492d 100644 --- a/src_cpp/opennnBridge/openNNnif.cpp +++ b/src_cpp/opennnBridge/openNNnif.cpp @@ -102,7 +102,8 @@ void* PredictFun(void* arg) // Stop the timer and calculate the time took for training high_resolution_clock::time_point stop = high_resolution_clock::now(); auto duration = duration_cast(stop - PredictNNptr->start_time); - nifpp::TERM predict_time = nifpp::make(env, duration.count()); + + ERL_NIF_TERM predict_time = enif_make_double(env, duration.count()); nifpp::str_atom nerlnif_atom_str(NERLNIF_ATOM_STR); nifpp::TERM nerlnif_atom = nifpp::make(env , nerlnif_atom_str); ERL_NIF_TERM predict_res_and_time = enif_make_tuple(env, 4 , nerlnif_atom , prediction , nifpp::make(env, PredictNNptr->return_tensor_type) , predict_time); diff --git a/src_erl/NerlnetApp/src/Bridge/nerlNIF.erl b/src_erl/NerlnetApp/src/Bridge/nerlNIF.erl index c78661de3..fee98c830 100644 --- a/src_erl/NerlnetApp/src/Bridge/nerlNIF.erl +++ b/src_erl/NerlnetApp/src/Bridge/nerlNIF.erl @@ -53,8 +53,7 @@ call_to_train(ModelID, {DataTensor, Type}, WorkerPid , BatchID , SourceName)-> {nerlnif, nan, TrainTime} -> gen_statem:cast(WorkerPid,{loss, nan , TrainTime , BatchID , SourceName}); %TODO Guy - Please the behavior when this case happens {nerlnif , LossTensor, LossTensorType , TrainTime}-> - {ErlTensor, ErlTensorType} = nerltensor_conversion({LossTensor, LossTensorType}, erl_float), % TODO Guy - Please do the conversion in main server - gen_statem:cast(WorkerPid,{loss, {ErlTensor, ErlTensorType} , TrainTime , BatchID , SourceName}) + gen_statem:cast(WorkerPid,{loss, {LossTensor, LossTensorType} , TrainTime , BatchID , SourceName}) after ?TRAIN_TIMEOUT -> %TODO inspect this timeout ?LOG_ERROR("Worker train timeout reached! 
bid:~p s:~p",[BatchID , SourceName]), gen_statem:cast(WorkerPid,{loss, timeout , SourceName}) %% TODO Guy Define train timeout state @@ -64,11 +63,11 @@ call_to_predict(ModelID, {BatchTensor, Type}, WorkerPid, BatchID , SourceName)-> ok = predict_nif(ModelID, BatchTensor, Type), receive - {nerlnif , PredNerlTensor, NewType, TimeTook}-> %% nerlnif atom means a message from the nif implementation + {nerlnif , PredNerlTensor, PredNerlTensorType, TimeNif}-> %% nerlnif atom means a message from the nif implementation % io:format("pred_nif done~n"), % {PredTen, _NewType} = nerltensor_conversion({PredNerlTensor, NewType}, erl_float), % io:format("Pred returned: ~p~n", [PredNerlTensor]), - gen_statem:cast(WorkerPid,{predictRes,PredNerlTensor, NewType, TimeTook, BatchID , SourceName}); + gen_statem:cast(WorkerPid,{predictRes,PredNerlTensor, PredNerlTensorType, TimeNif, BatchID , SourceName}); Error -> ?LOG_ERROR("received wrong prediction_nif format: ~p" ,[Error]), throw("received wrong prediction_nif format") diff --git a/src_erl/NerlnetApp/src/Bridge/onnWorkers/workerGeneric.erl b/src_erl/NerlnetApp/src/Bridge/onnWorkers/workerGeneric.erl index 9b7a5c492..45a2238ed 100644 --- a/src_erl/NerlnetApp/src/Bridge/onnWorkers/workerGeneric.erl +++ b/src_erl/NerlnetApp/src/Bridge/onnWorkers/workerGeneric.erl @@ -161,24 +161,23 @@ idle(cast, _Param, State) -> %% Waiting for receiving results or loss function %% Got nan or inf from loss function - Error, loss function too big for double -wait(cast, {loss , nan , TimeNIF , BatchID , SourceName}, State = #workerGeneric_state{myName = MyName, nextState = NextState}) -> +wait(cast, {loss, nan , TrainTime , BatchID , SourceName}, State = #workerGeneric_state{myName = MyName, nextState = NextState}) -> stats:increment_by_value(get(worker_stats_ets), nan_loss_count, 1), - gen_statem:cast(get(client_pid),{loss, MyName , SourceName ,nan , TimeNIF ,BatchID}), + gen_statem:cast(get(client_pid),{loss, MyName , SourceName ,nan , TrainTime 
,BatchID}), {next_state, NextState, State}; -wait(cast, {loss, LossTensor , TimeNIF , BatchID , SourceName}, State = #workerGeneric_state{myName = MyName, nextState = NextState, modelID=_ModelID, distributedBehaviorFunc = DistributedBehaviorFunc, distributedWorkerData = DistributedWorkerData}) -> - % {[_ , _ , _ , LossValue] , _} = LossTensor, - % io:format("Got Loss Value ~p~n",[LossValue]), + +wait(cast, {loss, {LossTensor, LossTensorType} , TrainTime , BatchID , SourceName}, State = #workerGeneric_state{myName = MyName, nextState = NextState, modelID=_ModelID, distributedBehaviorFunc = DistributedBehaviorFunc, distributedWorkerData = DistributedWorkerData}) -> BatchTimeStamp = erlang:system_time(nanosecond), - gen_statem:cast(get(client_pid),{loss, MyName, SourceName ,LossTensor , TimeNIF , BatchID , BatchTimeStamp}), %% TODO Add Time and Time_NIF to the cast + gen_statem:cast(get(client_pid),{loss, MyName, SourceName ,{LossTensor, LossTensorType} , TrainTime , BatchID , BatchTimeStamp}), ToUpdate = DistributedBehaviorFunc(post_train, {get(generic_worker_ets),DistributedWorkerData}), if ToUpdate -> {next_state, update, State#workerGeneric_state{nextState=NextState}}; true -> {next_state, NextState, State} end; -wait(cast, {predictRes,PredNerlTensor, Type, TimeNIF, BatchID , SourceName}, State = #workerGeneric_state{myName = MyName, nextState = NextState, distributedBehaviorFunc = DistributedBehaviorFunc, distributedWorkerData = DistributedWorkerData}) -> +wait(cast, {predictRes, PredNerlTensor, PredNerlTensorType, TimeNif, BatchID , SourceName}, State = #workerGeneric_state{myName = MyName, nextState = NextState, distributedBehaviorFunc = DistributedBehaviorFunc, distributedWorkerData = DistributedWorkerData}) -> BatchTimeStamp = erlang:system_time(nanosecond), - gen_statem:cast(get(client_pid),{predictRes,MyName,SourceName, {PredNerlTensor, Type}, TimeNIF , BatchID , BatchTimeStamp}), + gen_statem:cast(get(client_pid),{predictRes,MyName, SourceName, 
{PredNerlTensor, PredNerlTensorType}, TimeNif , BatchID , BatchTimeStamp}), Update = DistributedBehaviorFunc(post_predict, {get(generic_worker_ets),DistributedWorkerData}), if Update -> {next_state, update, State#workerGeneric_state{nextState=NextState}}; diff --git a/src_erl/NerlnetApp/src/Client/clientStatem.erl b/src_erl/NerlnetApp/src/Client/clientStatem.erl index aa43c7456..3b3b03349 100644 --- a/src_erl/NerlnetApp/src/Client/clientStatem.erl +++ b/src_erl/NerlnetApp/src/Client/clientStatem.erl @@ -279,7 +279,7 @@ training(cast, _In = {predict}, State = #client_statem_state{myName = MyName, et ?LOG_ERROR("Wrong request , client ~p can't go from training to predict directly", [MyName]), {next_state, training, State#client_statem_state{etsRef = EtsRef}}; -training(cast, In = {loss , WorkerName , SourceName , LossTensor , TimeNIF , BatchID , BatchTS}, State = #client_statem_state{myName = MyName,etsRef = EtsRef}) -> +training(cast, In = {loss, WorkerName ,SourceName ,LossTensor ,TimeNIF ,BatchID ,BatchTS}, State = #client_statem_state{myName = MyName,etsRef = EtsRef}) -> ClientStatsEts = get(client_stats_ets), stats:increment_messages_received(ClientStatsEts), stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(In)), @@ -315,14 +315,15 @@ predict(cast, In = {sample,Body}, State = #client_statem_state{etsRef = EtsRef}) end, {next_state, predict, State#client_statem_state{etsRef = EtsRef}}; -predict(cast, In = {predictRes,WorkerName, SourceName ,{PredictNerlTensor, Type} , TimeTook , BatchID , BatchTS}, State = #client_statem_state{myName = _MyName, etsRef = EtsRef}) -> +predict(cast, In = {predictRes,WorkerName, SourceName ,{PredictNerlTensor, NetlTensorType} , TimeTook , BatchID , BatchTS}, State = #client_statem_state{myName = _MyName, etsRef = EtsRef}) -> ClientStatsEts = get(client_stats_ets), stats:increment_messages_received(ClientStatsEts), stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(In)), - + 
{RouterHost,RouterPort} = ets:lookup_element(EtsRef, my_router, ?DATA_IDX), - MessageBody = {atom_to_list(WorkerName), SourceName, BatchID, {PredictNerlTensor , Type} , TimeTook , BatchTS}, %% SHOULD INCLUDE TYPE? + MessageBody = {WorkerName, SourceName, {PredictNerlTensor , NetlTensorType}, TimeTook, BatchID, BatchTS}, %% SHOULD INCLUDE TYPE? nerl_tools:http_router_request(RouterHost, RouterPort, [?MAIN_SERVER_ATOM], atom_to_list(predictRes), MessageBody), + stats:increment_messages_sent(ClientStatsEts), stats:increment_bytes_sent(ClientStatsEts , nerl_tools:calculate_size(MessageBody)), {next_state, predict, State#client_statem_state{etsRef = EtsRef}}; diff --git a/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl b/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl index 131a0c221..2e8a4941a 100644 --- a/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl +++ b/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl @@ -262,8 +262,8 @@ handle_cast({clientAck,Body}, State = #main_genserver_state{clientsWaitingList = ClientName = binary_to_term(Body), NewWaitingList = WaitingList--[ClientName], % waitingList is initialized in clientsTraining or clientsPredict handl cast calls if length(NewWaitingList) == 0 -> - ResultsToSendStr = generate_phase_result_data_to_send_from_ets_as_str(), - NothingToSend = string:is_empty(ResultsToSendStr), + PhaseResultsDataMap = generate_phase_result_data_map(), + NothingToSend = string:is_empty(PhaseResultsDataMap), if NothingToSend -> pass; true -> Action = case get(active_phase) of @@ -271,7 +271,7 @@ handle_cast({clientAck,Body}, State = #main_genserver_state{clientsWaitingList = prediction -> predRes end, {RouterHost,RouterPort} = ets:lookup_element(get(main_server_ets), my_router, ?DATA_IDX), % get main_server's router - nerl_tools:http_router_request(RouterHost, RouterPort, [?API_SERVER_ATOM], atom_to_list(Action), ResultsToSendStr), + nerl_tools:http_router_request(RouterHost, RouterPort, [?API_SERVER_ATOM], 
atom_to_list(Action), {json, PhaseResultsDataMap}), stats:increment_messages_sent(StatsEts), clean_phase_result_data_to_send_ets() % getting ready for next phase after data was sent to APIServer end, @@ -324,11 +324,12 @@ handle_cast({lossFunction,Body}, State = #main_genserver_state{myName = MyName}) stats:increment_messages_received(StatsEts), try case binary_to_term(Body) of - {WorkerName , SourceName , {LossTensor , _Type} , TimeNIF , BatchID , BatchTS} -> - ToSend = ?PHASE_RES_DATA_SEPARATOR ++ atom_to_list(WorkerName) ++ ?PHASE_RES_WORKER_NAME_SEPERATOR ++ atom_to_list(SourceName) ++ - ?PHASE_RES_VALUES_SEPERATOR ++ nerl_tools:string_format("~p",[LossTensor]) ++ ?PHASE_RES_VALUES_SEPERATOR ++ float_to_list(TimeNIF) ++ - ?PHASE_RES_VALUES_SEPERATOR ++ integer_to_list(BatchID) ++ ?PHASE_RES_VALUES_SEPERATOR ++ integer_to_list(BatchTS) ++ ?PHASE_RES_DATA_SEPARATOR, - store_phase_result_data_to_send_ets({WorkerName, BatchID , BatchTS}, ToSend); + {WorkerName , SourceName , {LossNerlTensor , LossNerlTensorType} , TimeNIF , BatchID , BatchTS} -> + Key = atom_to_list(WorkerName) ++ ?PHASE_RES_VALUES_IN_KEY_SEPARATOR ++ atom_to_list(SourceName) ++ + ?PHASE_RES_VALUES_IN_KEY_SEPARATOR ++ integer_to_list(BatchID) ++ ?PHASE_RES_VALUES_IN_KEY_SEPARATOR ++ + integer_to_list(BatchTS) ++ ?PHASE_RES_VALUES_IN_KEY_SEPARATOR ++ float_to_list(TimeNIF) ++ ?PHASE_RES_VALUES_IN_KEY_SEPARATOR ++ + atom_to_list(LossNerlTensorType), + store_phase_result_data_to_send_ets(Key, binary_to_list(LossNerlTensor)); _ELSE -> ?LOG_ERROR("~p Wrong loss function pattern received from client and its worker ~p", [MyName, Body]) end @@ -344,18 +345,12 @@ handle_cast({predictRes,Body}, State) -> _BatchSize = ets:lookup_element(get(main_server_ets), batch_size, ?DATA_IDX), stats:increment_messages_received(StatsEts), try - {WorkerName, SourceName, BatchID, {NerlTensor, Type}, TimeNIF , BatchTS} = binary_to_term(Body), %% TODO: add convention with client - %io:format("WorkerName: ~p, InputName: ~p, 
BatchID: ~p, Type: ~p~n",[WorkerName, InputName, BatchID, Type]), - {DecodedNerlTensor, _Type} = - if - (NerlTensor==<<>>) -> ?LOG_ERROR(?LOG_HEADER++"Got empty tensor"), empty_nerltensor_err; - true -> nerlNIF:nerltensor_conversion({NerlTensor, Type}, nerlNIF:erl_type_conversion(Type)) % converting nerltensor from binary to erlang type using NerlNIF - end, - ToSend = ?PHASE_RES_DATA_SEPARATOR ++ WorkerName ++ ?PHASE_RES_WORKER_NAME_SEPERATOR ++ atom_to_list(SourceName) ++ - ?PHASE_RES_VALUES_SEPERATOR ++ nerl_tools:string_format("~p",[DecodedNerlTensor]) ++ ?PHASE_RES_VALUES_SEPERATOR ++ - integer_to_list(TimeNIF) ++ ?PHASE_RES_VALUES_SEPERATOR ++ integer_to_list(BatchID) ++ ?PHASE_RES_VALUES_SEPERATOR ++ - integer_to_list(BatchTS) ++ ?PHASE_RES_DATA_SEPARATOR, - store_phase_result_data_to_send_ets({WorkerName, BatchID , BatchTS}, ToSend) + {WorkerName, SourceName, {NerlTensor, NerlTensorType}, TimeNIF , BatchID, BatchTS} = binary_to_term(Body), + Key = atom_to_list(WorkerName) ++ ?PHASE_RES_VALUES_IN_KEY_SEPARATOR ++ atom_to_list(SourceName) ++ + ?PHASE_RES_VALUES_IN_KEY_SEPARATOR ++ integer_to_list(BatchID) ++ ?PHASE_RES_VALUES_IN_KEY_SEPARATOR ++ + integer_to_list(BatchTS) ++ ?PHASE_RES_VALUES_IN_KEY_SEPARATOR ++ float_to_list(TimeNIF) ++ ?PHASE_RES_VALUES_IN_KEY_SEPARATOR ++ + atom_to_list(NerlTensorType), + store_phase_result_data_to_send_ets(Key, binary_to_list(NerlTensor)) catch Err:E -> ?LOG_ERROR(?LOG_HEADER++"Error receiving predict result ~p",[{Err,E}]) end, @@ -471,19 +466,14 @@ retransmission_to_apiserver(HttpRouterRequestFunc, Trials) -> end. -store_phase_result_data_to_send_ets({WorkerName, BatchID , BatchTS}, DataToSendStr) -> - Key = {WorkerName, BatchID , BatchTS}, - ets:insert(get(phase_res_data_ets),{Key, DataToSendStr}). - +store_phase_result_data_to_send_ets(Key, NerlTensorData) -> + KeyBin = list_to_binary(Key), + ets:insert(get(phase_res_data_ets),{KeyBin, NerlTensorData}). 
-generate_phase_result_data_string_from_list([], _ResString) -> _ResString; -generate_phase_result_data_string_from_list(ListOfData, ResString) -> - NewResString = ResString++element(?DATA_IDX,hd(ListOfData)), - generate_phase_result_data_string_from_list(tl(ListOfData), NewResString). -generate_phase_result_data_to_send_from_ets_as_str() -> +generate_phase_result_data_map() -> ListOfData = ets:tab2list(get(phase_res_data_ets)), - generate_phase_result_data_string_from_list(ListOfData, ""). % String to send is retruned + ListOfData. clean_phase_result_data_to_send_ets() -> ets:delete_all_objects(get(phase_res_data_ets)). \ No newline at end of file diff --git a/src_erl/NerlnetApp/src/MainServer/mainServerDefs.hrl b/src_erl/NerlnetApp/src/MainServer/mainServerDefs.hrl index 840caf07d..f17673d06 100644 --- a/src_erl/NerlnetApp/src/MainServer/mainServerDefs.hrl +++ b/src_erl/NerlnetApp/src/MainServer/mainServerDefs.hrl @@ -1,5 +1,6 @@ -define(API_SERVER_ACTION_ACK, "ackPy"). +-define(PHASE_RES_VALUES_IN_KEY_SEPARATOR, "#"). -define(PHASE_RES_WORKER_NAME_SEPERATOR, "#"). -define(PHASE_RES_VALUES_SEPERATOR, "|"). -define(PHASE_RES_DATA_SEPARATOR, "?"). \ No newline at end of file diff --git a/src_erl/NerlnetApp/src/nerl_tools.erl b/src_erl/NerlnetApp/src/nerl_tools.erl index f301f11a6..ba6c33e5f 100644 --- a/src_erl/NerlnetApp/src/nerl_tools.erl +++ b/src_erl/NerlnetApp/src/nerl_tools.erl @@ -31,14 +31,22 @@ http_router_request(RouterHost, RouterPort, DestinationsList, ActionStr, Body) - end. +http_request(Host, Port, Path, {json, Body}) -> + io:format("Sending Json to ~p:~p~n",[Host,Port]), + JsonContentType = ?HTTP_CONTENT_TYPE_JSON, + Json = jsx:encode(Body), + http_request(Host, Port,Path, JsonContentType, Json); +http_request(Host, Port,Path, Body) -> + DefaultContentType = ?HTTP_CONTENT_TYPE_FORM_URLENCODED, + http_request(Host, Port,Path, DefaultContentType, Body). 
%% send message between entities -http_request(Host, Port,Path, Body) when is_atom(Body) -> http_request(Host, Port,Path, atom_to_list(Body)); -http_request(Host, Port,Path, Body) when is_binary(Host) -> http_request(binary_to_list(Host), Port,Path, Body); -http_request(Host, Port,Path, Body)-> - URL = "http://" ++ Host ++ ":"++integer_to_list(Port) ++ "/" ++ Path, +http_request(Host, Port, Path, ContentType, Body) when is_atom(Body) -> http_request(Host, Port,Path, ContentType, atom_to_list(Body)); +http_request(Host, Port, Path, ContentType, Body) when is_binary(Host) -> http_request(binary_to_list(Host), Port,Path, ContentType, Body); +http_request(Host, Port, Path, ContentType, Body)-> + URL = "http://" ++ Host ++ ":"++integer_to_list(Port) ++ "/" ++ Path, % Path is the action httpc:set_options([{proxy, {{Host, Port},[Host]}}]), - httpc:request(post,{URL, [],"application/x-www-form-urlencoded",Body}, [], []). + httpc:request(post,{URL, [], ContentType, Body}, [], []). get_client_worker_pairs([],_WorkersMap,Ret)-> Ret; get_client_worker_pairs([WorkerName|WorkersNames],WorkersMap,Ret)-> diff --git a/src_erl/NerlnetApp/src/nerl_tools.hrl b/src_erl/NerlnetApp/src/nerl_tools.hrl index 70227a7c7..7324fa6ef 100644 --- a/src_erl/NerlnetApp/src/nerl_tools.hrl +++ b/src_erl/NerlnetApp/src/nerl_tools.hrl @@ -13,6 +13,11 @@ -define(VALIDATION_OF_TRANSMISSION_WITH_API_SERVER_INTERVAL_MS, 100). % how much between each resend %% ETS definitions +%% HTTP Content type definitions +-define(HTTP_CONTENT_TYPE_MULTI_PART_FORM_DATA, "multipart/form-data"). +-define(HTTP_CONTENT_TYPE_JSON, "application/json"). +-define(HTTP_CONTENT_TYPE_FORM_URLENCODED, "application/x-www-form-urlencoded"). + % 2 elements ETS: -define(KEY_IDX, 1). -define(DATA_IDX, 2). 
diff --git a/src_py/apiServer/decoderHttpMainServer.py b/src_py/apiServer/decoderHttpMainServer.py index 71ddf356b..584bc9e26 100644 --- a/src_py/apiServer/decoderHttpMainServer.py +++ b/src_py/apiServer/decoderHttpMainServer.py @@ -58,6 +58,50 @@ def decode_main_server_ets_str(string_to_convert: str): return result_dict +def parse_key_string(key_string: str) -> tuple: + WORKER_NAME_IDX = 0 + SOURCE_NAME_IDX = 1 + BATCH_ID_IDX = 2 + BATCH_TS_IDX = 3 + DURATION_IDX = 4 # TimeNIF + NERLTENSOR_TYPE_IDX = 5 + + definitions_list = key_string.split(SEP_ENTITY_HASH_STATS) + worker_name = definitions_list[WORKER_NAME_IDX] + source_name = definitions_list[SOURCE_NAME_IDX] + batch_id = definitions_list[BATCH_ID_IDX] + batch_ts = definitions_list[BATCH_TS_IDX] + duration = definitions_list[DURATION_IDX] + nerltensor_type = definitions_list[NERLTENSOR_TYPE_IDX] + + return worker_name, source_name, batch_id, batch_ts, duration, nerltensor_type + + +def decode_phase_result_data_json_from_main_server(input_json_dict : dict) -> list: + decoded_data = [] + DIMS_LENGTH = 3 + for key_string, nerltensor in input_json_dict.items(): + worker_name, source_name, batch_id, batch_ts, duration, nerltensor_type = parse_key_string(key_string) + duration = int(float(duration)) # from here duration is int in micro seconds + + # nerltensor to numpy tensor conversion + np_tensor = None + nerltensor_as_bytes = bytes(nerltensor) + if nerltensor_type == 'float': + np_tensor = np.frombuffer(nerltensor_as_bytes, dtype=np.float32) + elif nerltensor_type == 'int': + np_tensor = np.frombuffer(nerltensor_as_bytes, dtype=np.int32) + elif nerltensor_type == 'double': + np_tensor = np.frombuffer(nerltensor_as_bytes, dtype=np.float64) + + dims = np_tensor[:DIMS_LENGTH].astype(int) + np_tensor = np_tensor[DIMS_LENGTH:] + np_tensor = np_tensor.reshape(dims) # reshaped + + decoded_data.append((worker_name, source_name, duration, batch_id, batch_ts, np_tensor)) + return decoded_data + + def 
decode_main_server_str_train(string_to_convert: str) -> tuple: #change to tuple of 5 elements worker_name = string_to_convert.split(SEP_ENTITY_HASH_STATS)[0] train_result = string_to_convert.split(SEP_ENTITY_HASH_STATS)[1] diff --git a/src_py/apiServer/decoderHttpMainServerDefs.py b/src_py/apiServer/decoderHttpMainServerDefs.py index c0d29d640..ae766b557 100644 --- a/src_py/apiServer/decoderHttpMainServerDefs.py +++ b/src_py/apiServer/decoderHttpMainServerDefs.py @@ -1,3 +1,6 @@ + +PHASE_RES_VALUES_IN_KEY_SEPARATOR = '#' # Must be identical to ?PHASE_RES_VALUES_IN_KEY_SEPARATOR in mainServerDefs.hrl + SEP_ENTITY_AND_STATS = '&' SEP_ENTITY_OR_STATS = '|' SEP_ENTITY_HASH_STATS = '#' diff --git a/src_py/apiServer/experiment_flow_debug.py b/src_py/apiServer/experiment_flow_debug.py index a749a47e4..cbcd72f16 100644 --- a/src_py/apiServer/experiment_flow_debug.py +++ b/src_py/apiServer/experiment_flow_debug.py @@ -17,9 +17,9 @@ def print_test(in_str : str): api_server_instance = ApiServer() #api_server_instance.help() api_server_instance.showJsons() -dc_idx = 0 -conn_idx = 19 -exp_idx = 18 +dc_idx = 1 +conn_idx = 20 +exp_idx = 19 api_server_instance.setJsons(dc_idx, conn_idx, exp_idx) dc_json , connmap_json, exp_flow_json = api_server_instance.getUserJsons() diff --git a/src_py/apiServer/experiment_phase.py b/src_py/apiServer/experiment_phase.py index ba3f5c77f..594f852bb 100644 --- a/src_py/apiServer/experiment_phase.py +++ b/src_py/apiServer/experiment_phase.py @@ -24,15 +24,13 @@ def clean_raw_data_buffer(self): self.raw_data_buffer = [] def process_experiment_phase_data(self): - for resData in self.raw_data_buffer: - if self.phase_type == PHASE_TRAINING_STR: - source_name, tensor_data, duration, batch_id, worker_name, batch_timestamp = decode_main_server_str_train(resData) - client_name = self.network_componenets.get_client_name_by_worker_name(worker_name) - self.nerl_model_db.get_client(client_name).get_worker(worker_name).create_batch(batch_id, source_name, 
tensor_data, duration, batch_timestamp) - elif self.phase_type == PHASE_PREDICTION_STR: - worker_name, source_name, tensor_data, duration, batch_id, batch_timestamp = decode_main_server_str_predict(resData) - client_name = self.network_componenets.get_client_name_by_worker_name(worker_name) - self.nerl_model_db.get_client(client_name).get_worker(worker_name).create_batch(batch_id, source_name, tensor_data, duration, batch_timestamp) + assert len(self.raw_data_buffer) == 1, "Expecting only one raw_data in buffer of a single phase" + list_of_decoded_data = decode_phase_result_data_json_from_main_server(self.raw_data_buffer[0]) + for decoded_data in list_of_decoded_data: + worker_name, source_name, duration, batch_id, batch_ts, np_tensor = decoded_data + client_name = self.network_componenets.get_client_name_by_worker_name(worker_name) + self.nerl_model_db.get_client(client_name).get_worker(worker_name).create_batch(batch_id, source_name, np_tensor, duration, batch_ts) + self.clean_raw_data_buffer() def get_phase_type(self): diff --git a/src_py/apiServer/receiver.py b/src_py/apiServer/receiver.py index 529626586..baa27ab1a 100644 --- a/src_py/apiServer/receiver.py +++ b/src_py/apiServer/receiver.py @@ -73,29 +73,19 @@ def post(self): class trainRes(Resource): def post(self): - resData = request.get_data().decode('utf-8') - #print(f"Got {resData} from MainServer") # Todo remove print - #print(f"Received training result {resData}") # Todo remove print + resDataDict = request.get_json() current_experiment_phase = globe.experiment_focused_on.get_current_experiment_phase() raw_data_buffer = current_experiment_phase.get_raw_data_buffer() - - entities_raw_data_list = split_results_to_entities_chunks(resData) - raw_data_buffer += entities_raw_data_list - + raw_data_buffer.append(resDataDict) return "OK", 200 #http_request(RouterHost,RouterPort,"predictRes",ListOfResults++"#"++BatchID++"#"++CSVName++"#"++BatchSize) class predictRes(Resource): def post(self): - # Result
preprocessing: - # Receiving from Erlang: Result++"#"++integer_to_list(BatchID)++"#"++CSVName++"#"++integer_to_list(BatchSize) - resData = request.get_data().decode('utf-8') + resDataDict = request.get_json() current_experiment_phase = globe.experiment_focused_on.get_current_experiment_phase() raw_data_buffer = current_experiment_phase.get_raw_data_buffer() - - entities_raw_data_list = split_results_to_entities_chunks(resData) - raw_data_buffer += entities_raw_data_list - + raw_data_buffer.append(resDataDict) return "OK", 200 class statistics(Resource):