diff --git a/.gitignore b/.gitignore
index 64abdb48..393cae9b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -37,4 +37,5 @@ examples/.ipynb_checkpoints/simple_run-checkpoint.ipynb
/Results
arch.json
conn.json
-/inputDataDir
\ No newline at end of file
+/inputDataDir
+/NerlNetGraph.png
diff --git a/NerlnetMonitor.sh b/NerlnetMonitor.sh
new file mode 100755
index 00000000..a95ee9d2
--- /dev/null
+++ b/NerlnetMonitor.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+
+MONITOR_PATH="src_erl/NerlMonitor"
+GUI_PATH="src_erl/NerlMonitor/src"
+
+echo "NerlnetMonitor Activated"
+
+
+cd "$MONITOR_PATH"
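+# the node name and cookie below must match what src/MonitorGUI.py expects (it targets 'recvPyrlang' on erl@127.0.0.1 with cookie COOKIE)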
+rebar3 shell --name erl@127.0.0.1 --setcookie COOKIE
+
+cd ../../
diff --git a/README.md b/README.md
index 970f588e..17ec3b35 100644
--- a/README.md
+++ b/README.md
@@ -76,4 +76,4 @@ Minimum Python version: 3.8
4. Run Jupyter notebook with ```jupyter-notebook``` and create a new notebook in the created dir from step 3.
5. Follow the example: https://github.com/leondavi/NErlNet/blob/master/examples/example_run.ipynb
-Contact Email: nerlnet@outlook.com
+Contact Email: leondavi@post.bgu.ac.il
diff --git a/inputJsonFiles/Architecture/arch_1PCSIM6WorkerSynth.json b/inputJsonFiles/Architecture/arch_1PCSIM6WorkerSynth.json
index 8d1bb185..58821811 100755
--- a/inputJsonFiles/Architecture/arch_1PCSIM6WorkerSynth.json
+++ b/inputJsonFiles/Architecture/arch_1PCSIM6WorkerSynth.json
@@ -6,13 +6,13 @@
},
"devices": [
{
- "host": "192.168.0.108",
+ "host": "192.168.64.7",
"entities": "mainServer,c1,c2,c3,c4,c5,c6,s1,r1,r2,r3,r4,r5,r6,apiServer"
}
],
"apiServer":
{
- "host": "192.168.0.108",
+ "host": "192.168.64.7",
"port": "8095",
"args": ""
}
@@ -26,7 +26,7 @@
,
"mainServer":
{
- "host": "192.168.0.108",
+ "host": "192.168.64.7",
"port": "8080",
"args": ""
}
diff --git a/inputJsonFiles/experimentsFlow/exp_6Workers1SourceMNist.json b/inputJsonFiles/experimentsFlow/exp_6Workers1SourceMNist.json
index 75b46199..3c15b57e 100644
--- a/inputJsonFiles/experimentsFlow/exp_6Workers1SourceMNist.json
+++ b/inputJsonFiles/experimentsFlow/exp_6Workers1SourceMNist.json
@@ -1,7 +1,7 @@
{
"Features": 784,
"Labels": ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"],
- "CSV path": "mnist",
+ "CSV path": "mnist-o",
"Batches per source":
{
"Training": 10000,
diff --git a/src_cpp/opennn b/src_cpp/opennn
index 4d6b9484..1de26c61 160000
--- a/src_cpp/opennn
+++ b/src_cpp/opennn
@@ -1 +1 @@
-Subproject commit 4d6b94848d5dd5356084a54f1b1be58b7cdc1a21
+Subproject commit 1de26c611fda2fe6cf026a0e4b329c7509a68042
diff --git a/src_erl/Communication_Layer/http_Nerlserver/src/Client/clientStateHandler.erl b/src_erl/Communication_Layer/http_Nerlserver/src/Client/clientStateHandler.erl
index e1945527..a9601a68 100644
--- a/src_erl/Communication_Layer/http_Nerlserver/src/Client/clientStateHandler.erl
+++ b/src_erl/Communication_Layer/http_Nerlserver/src/Client/clientStateHandler.erl
@@ -30,7 +30,8 @@ init(Req0, [Action,Client_StateM_Pid]) ->
idle -> gen_statem:cast(Client_StateM_Pid,{idle});
training -> gen_statem:cast(Client_StateM_Pid,{training});
predict -> gen_statem:cast(Client_StateM_Pid,{predict});
- statistics -> gen_statem:cast(Client_StateM_Pid,{statistics})
+ statistics -> gen_statem:cast(Client_StateM_Pid,{statistics});
+ worker_kill -> gen_statem:cast(Client_StateM_Pid,{worker_kill , Body})
end,
%% reply ACKnowledge to main server for initiating, later send finished initiating http_request from client_stateM
diff --git a/src_erl/Communication_Layer/http_Nerlserver/src/Client/clientStatem.erl b/src_erl/Communication_Layer/http_Nerlserver/src/Client/clientStatem.erl
index 52459b6b..a1cb1674 100644
--- a/src_erl/Communication_Layer/http_Nerlserver/src/Client/clientStatem.erl
+++ b/src_erl/Communication_Layer/http_Nerlserver/src/Client/clientStatem.erl
@@ -20,7 +20,7 @@
code_change/4, callback_mode/0, idle/3, training/3,waitforWorkers/3]).
--import(nerlNIF,[validate_nerltensor_erl/1]).
+-import(nerlNIF,[validate_nerltensor_erl/1 , get_active_models_ids_list/0]).
-define(ETS_KV_VAL_IDX, 2). % key value pairs --> value index is 2
-define(WORKER_PID_IDX, 2).
@@ -78,7 +78,7 @@ init({MyName,NerlnetGraph, WorkersToClientsMap}) ->
ets:insert(EtsRef, {msgCounter, 1}),
ets:insert(EtsRef, {infoIn, 0}),
ets:insert(EtsRef, {myName, MyName}),
-
+ ets:insert(EtsRef , {deadWorkers , []}),
createWorkers(MyName,EtsRef),
%% send pre_idle signal to workers
@@ -99,8 +99,7 @@ format_status(_Opt, [_PDict, _StateName, _State]) -> Status = some_term, Status.
%% ==============STATES=================
waitforWorkers(cast, In = {stateChange,WorkerName,MissedBatchesCount}, State = #client_statem_state{myName = MyName,waitforWorkers = WaitforWorkers,nextState = NextState, etsRef = EtsRef}) ->
- NewWaitforWorkers = WaitforWorkers--[WorkerName],
- % io:format("remaining workers = ~p~n",[NewWaitforWorkers]),
+ NewWaitforWorkers = WaitforWorkers -- [WorkerName],
ets:update_counter(EtsRef, msgCounter, 1), % last is increment value
ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)),
ets:update_element(EtsRef, WorkerName,[{?WORKER_TRAIN_MISSED_IDX,MissedBatchesCount}]), %% update missed batches count
@@ -114,16 +113,43 @@ waitforWorkers(cast, In = {stateChange,WorkerName,MissedBatchesCount}, State = #
waitforWorkers(cast, In = {NewState}, State = #client_statem_state{myName = MyName, etsRef = EtsRef}) ->
ets:update_counter(EtsRef, msgCounter, 1),
ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)),
- % ?LOG_INFO("~p in waiting going to state ~p~n",[MyName, State]),
+ %% ?LOG_INFO("~p in waiting going to state ~p~n",[MyName, State]),
Workers = ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX),
cast_message_to_workers(EtsRef, {NewState}),
{next_state, waitforWorkers, State#client_statem_state{nextState = NewState, waitforWorkers = Workers}};
+waitforWorkers(cast, In ={worker_kill , Body}, State = #client_statem_state{waitforWorkers = WaitforWorkers,etsRef = EtsRef}) ->
+    %got a kill command for a worker: stop it and take it off the waitforWorkers list
+ ets:update_counter(EtsRef, msgCounter, 1),
+ ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)),
+ {_, WorkerName} = binary_to_term(Body),
+ Pid = ets:lookup_element(EtsRef, WorkerName, ?WORKER_PID_IDX),
+ gen_statem:stop(Pid, shutdown , infinity),
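+    %% stopping the worker also fires the 'DOWN' message from start_monitor (handled in the info clauses below), which reports the dead worker via delete_worker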
+ NewWaitforWorkers = WaitforWorkers -- [WorkerName],
+ {next_state, waitforWorkers, State#client_statem_state{waitforWorkers = NewWaitforWorkers,etsRef = EtsRef}};
+
waitforWorkers(cast, EventContent, State = #client_statem_state{etsRef = EtsRef}) ->
ets:update_counter(EtsRef, msgCounter, 1),
ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(EventContent)),
- ?LOG_WARNING("client waitforWorkers ignored!!!: ~p ~n",[EventContent]),
- {next_state, waitforWorkers, State}.
+ ?LOG_WARNING("client waitforWorkers ignored!!!: ~p~n",[EventContent]),
+ {next_state, waitforWorkers, State};
+
+waitforWorkers(info, EventContent, State = #client_statem_state{myName = MyName,waitforWorkers = WaitforWorkers ,etsRef = EtsRef}) ->
+ ets:update_counter(EtsRef, msgCounter, 1),
+ ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(EventContent)),
+ case EventContent of
+ {'DOWN',_Ref,process,Pid,_Reason}-> %worker down
+ [[WorkerName]] = ets:match(EtsRef,{'$1',Pid,'_','_','_'}),
+ NewWaitforWorkers = WaitforWorkers--[WorkerName],
+ %report worker down to main server
+ delete_worker(EtsRef,MyName,WorkerName),
+            {next_state, waitforWorkers, State#client_statem_state{waitforWorkers = NewWaitforWorkers,etsRef = EtsRef}}; %delete the worker from ETS so the client will not wait for it in the future
+
+ _Any-> ?LOG_INFO("client ~p got unexpected info: ~p~n",[MyName, EventContent]),
+            %received an unexpected message; log it / report to the main server for debugging
+ {next_state, waitforWorkers, State#client_statem_state{etsRef = EtsRef}}
+ end.
+
%% initiating workers when they include federated workers. init stage == handshake between federated worker client and server
@@ -149,18 +175,27 @@ idle(cast, In = {custom_worker_message, {From, To}}, State = #client_statem_stat
{keep_state, State};
idle(cast, In = {statistics}, State = #client_statem_state{ myName = MyName, etsRef = EtsRef}) ->
- sendStatistics(EtsRef),
ets:update_counter(EtsRef, msgCounter, 1), % last param is increment value
ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)),
+ sendStatistics(EtsRef),
{next_state, idle, State};
idle(cast, In = {training}, State = #client_statem_state{etsRef = EtsRef}) ->
+ io:format("client going to state training~n",[]),
ets:update_counter(EtsRef, msgCounter, 1),
ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)),
MessageToCast = {training},
cast_message_to_workers(EtsRef, MessageToCast),
{next_state, waitforWorkers, State#client_statem_state{waitforWorkers= ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX), nextState = training}};
+idle(cast, In={worker_kill , Body}, State = #client_statem_state{etsRef = EtsRef}) ->
+ ets:update_counter(EtsRef, msgCounter, 1),
+ ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)),
+ {_, WorkerName} = binary_to_term(Body),
+ Pid = ets:lookup_element(EtsRef, WorkerName, ?WORKER_PID_IDX),
+ gen_statem:stop(Pid, shutdown , infinity),
+ {next_state, idle, State#client_statem_state{etsRef = EtsRef}};
+
idle(cast, In = {predict}, State = #client_statem_state{etsRef = EtsRef}) ->
io:format("client going to state predict~n",[]),
ets:update_counter(EtsRef, msgCounter, 1),
@@ -173,7 +208,22 @@ idle(cast, EventContent, State = #client_statem_state{etsRef = EtsRef}) ->
ets:update_counter(EtsRef, msgCounter, 1),
ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(EventContent)),
io:format("client idle ignored!!!: ~p ~n",[EventContent]),
- {next_state, training, State#client_statem_state{etsRef = EtsRef}}.
+ {next_state, training, State#client_statem_state{etsRef = EtsRef}};
+
+
+idle(info, EventContent, State = #client_statem_state{myName = MyName,etsRef = EtsRef}) ->
+ ets:update_counter(EtsRef, msgCounter, 1),
+ ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(EventContent)),
+ case EventContent of
+ {'DOWN',_Ref,process,Pid,_Reason}-> %worker down
+ [[WorkerName]] = ets:match(EtsRef,{'$1',Pid,'_','_','_'}),
+ %report worker down to main server
+            delete_worker(EtsRef,MyName,WorkerName); %delete the worker from ETS so the client will not wait for it in the future
+
+ _Any-> ?LOG_INFO("client ~p got unexpected info: ~p~n",[MyName, EventContent])
+            %received an unexpected message; log it / report to the main server for debugging
+ end,
+ {next_state, idle, State#client_statem_state{etsRef = EtsRef}}.
%% passing vector from FedClient to FedServer
training(cast, In = {update, {From, To, Data}}, State = #client_statem_state{etsRef = EtsRef, myName = MyName}) ->
@@ -237,7 +287,8 @@ training(cast, In = {sample,Body}, State = #client_statem_state{etsRef = EtsRef}
NewTimingTuple = {Start,TotalBatches+1,TotalTime},
ets:update_element(EtsRef, WorkerName,[{?WORKER_PID_IDX, WorkerPid},{?WORKER_TIMING_IDX,NewTimingTuple}]),
gen_statem:cast(WorkerPid, {sample, BatchOfSamples});
- true -> ?LOG_ERROR("Given worker ~p isn't found in client ~p",[WorkerName, ClientName]) end,
+ true -> ok %%'?'LOG_ERROR("Given worker ~p isn't found in client ~p",[WorkerName, ClientName])
+ end,
{next_state, training, State#client_statem_state{etsRef = EtsRef}};
training(cast, In = {idle}, State = #client_statem_state{myName = MyName, etsRef = EtsRef}) ->
@@ -278,10 +329,47 @@ training(cast, In = {loss,WorkerName,LossFunction,_Time_NIF}, State = #client_st
nerl_tools:http_request(RouterHost,RouterPort,"lossFunction", term_to_binary({WorkerName,LossFunction})),
{next_state, training, State#client_statem_state{myName = MyName,etsRef = EtsRef}};
+
+training(cast, In={worker_kill , Body}, State = #client_statem_state{etsRef = EtsRef}) ->
+ ets:update_counter(EtsRef, msgCounter, 1),
+ ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)),
+ {_, WorkerName} = binary_to_term(Body),
+ Pid = ets:lookup_element(EtsRef, WorkerName, ?WORKER_PID_IDX),
+ gen_statem:stop(Pid, shutdown , infinity),
+ {next_state, training, State#client_statem_state{etsRef = EtsRef}};
+
+% training(cast, In = {statistics}, State = #client_statem_state{ myName = MyName, etsRef = EtsRef}) ->
+% io:format("client ~p got statistics ~n",[MyName]),
+% ets:update_counter(EtsRef, msgCounter, 1), % last param is increment value
+% ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)),
+% sendStatistics(EtsRef),
+% {next_state, training , State};
+
training(cast, EventContent, State = #client_statem_state{etsRef = EtsRef, myName = MyName}) ->
- ?LOG_WARNING("client ~p training ignored!!!: ~p ~n!!!",[MyName, EventContent]),
+ case EventContent of % ! Not the best way to handle this
+ {statistics} ->
+ ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(EventContent)),
+ {next_state, training, State#client_statem_state{etsRef = EtsRef}};
+ _ ->
+ ?LOG_WARNING("client ~p training ignored!!!: ~p ~n!!!",[MyName, EventContent]),
+ ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(EventContent)),
+ {next_state, training, State#client_statem_state{etsRef = EtsRef}}
+ end;
+
+
+training(info, EventContent, State = #client_statem_state{myName = MyName,etsRef = EtsRef}) ->
+ ets:update_counter(EtsRef, msgCounter, 1),
ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(EventContent)),
- {next_state, training, State#client_statem_state{etsRef = EtsRef}}.
+ case EventContent of
+ {'DOWN' , _Ref , process , Pid , _Reason}-> %worker down
+ [[WorkerName]] = ets:match(EtsRef,{'$1',Pid,'_','_','_'}),
+ %report worker down to main server
+            delete_worker(EtsRef,MyName,WorkerName); %delete the worker from ETS so the client will not wait for it in the future
+
+ _Any-> ?LOG_INFO("client ~p got unexpected info: ~p~n",[MyName, EventContent])
+            %received an unexpected message; log it / report to the main server for debugging
+ end,
+ {next_state, training, State#client_statem_state{etsRef = EtsRef}}.
predict(cast, In = {sample,Body}, State = #client_statem_state{etsRef = EtsRef}) ->
%% Body: ClientName#WorkerName#CSVName#BatchNumber#BatchOfSamples
@@ -289,7 +377,6 @@ predict(cast, In = {sample,Body}, State = #client_statem_state{etsRef = EtsRef})
ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)),
{ClientName, WorkerNameStr, CSVName, BatchNumber, BatchOfSamples} = binary_to_term(Body),
WorkerName = list_to_atom(WorkerNameStr),
-
Start = os:timestamp(),
WorkerOfThisClient = ets:member(EtsRef, WorkerName),
if
@@ -297,12 +384,11 @@ predict(cast, In = {sample,Body}, State = #client_statem_state{etsRef = EtsRef})
TimingTuple = ets:lookup_element(EtsRef, WorkerName, ?WORKER_TIMING_IDX), %todo refactor timing map
{_LastBatchReceivedTime,TotalBatches,TotalTime} = TimingTuple,
NewTimingTuple = {Start,TotalBatches+1,TotalTime},
- ets:update_element(EtsRef, WorkerName,[{?WORKER_TIMING_IDX,NewTimingTuple}]);
- true -> ?LOG_ERROR("Given worker ~p isn't found in client ~p",[WorkerName, ClientName])
+ ets:update_element(EtsRef, WorkerName,[{?WORKER_TIMING_IDX,NewTimingTuple}]) ,
+ WorkerPid = ets:lookup_element(EtsRef, WorkerName, ?WORKER_PID_IDX),
+ gen_statem:cast(WorkerPid, {sample, CSVName, BatchNumber, BatchOfSamples});
+ true -> ok %%'?'LOG_ERROR("Given worker ~p isn't found in client ~p",[WorkerName, ClientName])
end,
-
- WorkerPid = ets:lookup_element(EtsRef, WorkerName, ?WORKER_PID_IDX),
- gen_statem:cast(WorkerPid, {sample, CSVName, BatchNumber, BatchOfSamples}),
{next_state, predict, State#client_statem_state{etsRef = EtsRef}};
%% TODO: add nif timing statistics
@@ -316,6 +402,14 @@ predict(cast, In = {predictRes,WorkerName,InputName,ResultID,PredictNerlTensor,
nerl_tools:http_request(RouterHost,RouterPort,"predictRes", term_to_binary({atom_to_list(WorkerName), InputName, ResultID, {PredictNerlTensor, Type}})),
{next_state, predict, State#client_statem_state{etsRef = EtsRef}};
+predict(cast, In={worker_kill , Body}, State = #client_statem_state{etsRef = EtsRef}) ->
+ ets:update_counter(EtsRef, msgCounter, 1),
+ ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)),
+ {_, WorkerName} = binary_to_term(Body),
+ Pid = ets:lookup_element(EtsRef, WorkerName, ?WORKER_PID_IDX),
+ gen_statem:stop(Pid, shutdown , infinity),
+ {next_state, predict, State#client_statem_state{etsRef = EtsRef}};
+
% TODO from predict directly to training?!?!?
predict(cast, In = {training}, State = #client_statem_state{etsRef = EtsRef}) ->
ets:update_counter(EtsRef, msgCounter, 1),
@@ -335,11 +429,31 @@ predict(cast, In = {idle}, State = #client_statem_state{etsRef = EtsRef}) ->
Workers = ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX),
{next_state, waitforWorkers, State#client_statem_state{nextState = idle, waitforWorkers = Workers, etsRef = EtsRef}};
+predict(cast, In = {statistics}, State = #client_statem_state{ myName = MyName, etsRef = EtsRef}) ->
+ ets:update_counter(EtsRef, msgCounter, 1), % last param is increment value
+ ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)),
+ sendStatistics(EtsRef),
+ {next_state, predict, State};
+
predict(cast, EventContent, State = #client_statem_state{etsRef = EtsRef}) ->
ets:update_counter(EtsRef, msgCounter, 1),
ets:update_counter(EtsRef, infoIn, erts_debug:flat_size(EventContent)),
?LOG_WARNING("client predict ignored: ~p ~n",[EventContent]),
- {next_state, predict, State#client_statem_state{etsRef = EtsRef}}.
+ {next_state, predict, State#client_statem_state{etsRef = EtsRef}};
+
+predict(info, EventContent, State = #client_statem_state{myName = MyName,etsRef = EtsRef}) ->
+ ets:update_counter(EtsRef, msgCounter, 1),
+ ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(EventContent)),
+ case EventContent of
+ {'DOWN',_Ref,process,Pid,_Reason}-> %worker down
+ [[WorkerName]] = ets:match(EtsRef,{'$1',Pid,'_','_','_'}),
+ %report worker down to main server
+            delete_worker(EtsRef,MyName,WorkerName); %delete the worker from ETS so the client will not wait for it in the future
+
+ _Any-> ?LOG_INFO("client ~p got unexpected info: ~p~n",[MyName, EventContent])
+            %received an unexpected message; log it / report to the main server for debugging
+ end,
+ {next_state, predict, State#client_statem_state{etsRef = EtsRef}}.
%% @private
@@ -423,7 +537,7 @@ createWorkers(ClientName, EtsRef) ->
WorkerArgs = {WorkerName,ModelId,ModelType,ScalingMethod, LayerTypesList, LayersSizes,
LayersActivationFunctions, Optimizer, LossMethod, LearningRate, self(), CustomFunc, WorkerData},
- WorkerPid = workerGeneric:start_link(WorkerArgs),
+ WorkerPid = workerGeneric:start_monitor(WorkerArgs),
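+    %% start_monitor (rather than start_link) lets the client receive a 'DOWN' info message when a worker dies, instead of crashing along with it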
ets:insert(EtsRef, {WorkerName, WorkerPid, WorkerArgs, {0,0,0.0}, 0}),
WorkerName
@@ -444,24 +558,46 @@ updateTimingMap(EtsRef, WorkerName) when is_atom(WorkerName) ->
%% adding client c1=MsgNum,w1=...
sendStatistics(EtsRef)->
NerlnetGraph = ets:lookup_element(EtsRef, nerlnetGraph, ?ETS_KV_VAL_IDX),
- Workers = ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX),
+ [{deadWorkers , DeadWorkers}] = ets:lookup(EtsRef, deadWorkers),
+ Workers = ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX) ++ DeadWorkers,
TimingMap = [{WorkerKey,ets:lookup_element(EtsRef, WorkerKey, ?WORKER_TIMING_IDX)} || WorkerKey <- Workers],
MissedCounts = [{WorkerKey,ets:lookup_element(EtsRef, WorkerKey, ?WORKER_TRAIN_MISSED_IDX)} || WorkerKey <- Workers],
Counter = ets:lookup_element(EtsRef, msgCounter, ?ETS_KV_VAL_IDX),
InfoSize = ets:lookup_element(EtsRef, infoIn, ?ETS_KV_VAL_IDX),
MyName = ets:lookup_element(EtsRef, myName, ?ETS_KV_VAL_IDX),
-
+ % io:format("Timing Map is ~p~n",[TimingMap]),
TimingStats = lists:flatten([atom_to_list(WorkerName)++"_Train_Avg_Time="++float_to_list(TotalTime/TotalBatches,[{decimals, 3}])++","||{WorkerName,{_LastTime,TotalBatches,TotalTime}}<-TimingMap]),
MissingStats = lists:flatten([atom_to_list(WorkerName)++"_Train_Miss="++integer_to_list(MissCount)++","||{WorkerName,MissCount}<-MissedCounts]),
MyStats = atom_to_list(MyName)++"_Msg_Count="++integer_to_list(Counter)++","++atom_to_list(MyName)++"_info_Size="++integer_to_list(InfoSize)++",",
-
+ MyDeadWorkers=lists:flatten([" "++atom_to_list(WorkerName)||WorkerName<-DeadWorkers]),
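+    %% resulting wire format: "<client>:<k=v>,<k=v>,...#<dead workers>"; mainGenserver splits on "#" to pick up the dead-worker list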
{RouterHost,RouterPort} = nerl_tools:getShortPath(MyName, ?MAIN_SERVER_ATOM, NerlnetGraph),
- nerl_tools:http_request(RouterHost,RouterPort,"statistics", list_to_binary(atom_to_list(MyName)++":"++MyStats++MissingStats++lists:droplast(TimingStats))).
+ nerl_tools:http_request(RouterHost,RouterPort,"statistics", list_to_binary(atom_to_list(MyName)++":"++MyStats++MissingStats++lists:droplast(TimingStats)++"#"++MyDeadWorkers)).
cast_message_to_workers(EtsRef, Msg) ->
Workers = ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX),
+ case Workers of
+ [] -> io:format("No workers to send message to~n"),
+ gen_statem:cast(self(), {stateChange , [] , 0});
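+                %% the fake stateChange above keeps the state machine from waiting forever in waitforWorkers when no live workers are left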
+ _ -> ok
+ end,
Func = fun(WorkerKey) ->
WorkerPid = ets:lookup_element(EtsRef, WorkerKey, ?WORKER_PID_IDX),
gen_statem:cast(WorkerPid, Msg)
end,
- lists:foreach(Func, Workers).
\ No newline at end of file
+ lists:foreach(Func, Workers).
+
+
+delete_worker(EtsRef,MyName,WorkerName)->
+    %called when the client detects that a worker has stopped: deletes it from workersNames and moves it to deadWorkers, so the client does not wait for responses from it in the future.
+ AliveList = ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX),
+ DeadList = ets:lookup_element(EtsRef, deadWorkers, ?ETS_KV_VAL_IDX),
+ ets:delete(EtsRef,workersNames),
+ ets:delete(EtsRef,deadWorkers),
+ ets:insert(EtsRef, {workersNames, AliveList -- [WorkerName]}),
+ ets:insert(EtsRef, {deadWorkers, DeadList ++ [WorkerName]}),
+ NerlGraph = ets:lookup_element(EtsRef, nerlnetGraph, ?ETS_KV_VAL_IDX),
+ {Host , Port} = nerl_tools:getShortPath(MyName, ?MAIN_SERVER_ATOM, NerlGraph),
+ nerl_tools:http_request(Host,Port,"worker_down",list_to_binary(atom_to_list(MyName) ++ "-" ++ atom_to_list(WorkerName))),
+ ?LOG_WARNING("Worker ~p is down, deleting it from client ~p~n",[WorkerName,MyName]),
+ %% TODO Verify NIF active_model_id list
+ EtsRef.
\ No newline at end of file
diff --git a/src_erl/Communication_Layer/http_Nerlserver/src/MainServer/actionHandler.erl b/src_erl/Communication_Layer/http_Nerlserver/src/MainServer/actionHandler.erl
index adf2b06f..9adb9bb9 100644
--- a/src_erl/Communication_Layer/http_Nerlserver/src/MainServer/actionHandler.erl
+++ b/src_erl/Communication_Layer/http_Nerlserver/src/MainServer/actionHandler.erl
@@ -25,7 +25,9 @@ init(Req0, [Action, Main_genserver_Pid]) ->
predictRes -> gen_statem:cast(Main_genserver_Pid, {predictRes,Body});
statistics -> gen_statem:cast(Main_genserver_Pid, {statistics,Body});
startCasting -> gen_statem:cast(Main_genserver_Pid, {startCasting,Body});
- stopCasting -> gen_statem:cast(Main_genserver_Pid, {stopCasting,Body})
+ stopCasting -> gen_statem:cast(Main_genserver_Pid, {stopCasting,Body});
+ %monitor
+ worker_down -> gen_statem:cast(Main_genserver_Pid, {worker_down,Body})
end,
Reply = io_lib:format("Body Received: ~p ~n ", [Body]),
Req = cowboy_req:reply(200,
diff --git a/src_erl/Communication_Layer/http_Nerlserver/src/MainServer/mainGenserver.erl b/src_erl/Communication_Layer/http_Nerlserver/src/MainServer/mainGenserver.erl
index 09e84c24..7802900f 100644
--- a/src_erl/Communication_Layer/http_Nerlserver/src/MainServer/mainGenserver.erl
+++ b/src_erl/Communication_Layer/http_Nerlserver/src/MainServer/mainGenserver.erl
@@ -22,7 +22,7 @@
-define(SERVER, ?MODULE).
--record(main_genserver_state, {statisticsCounter = 0, myName, state, workersMap, clients, nerlnetGraph, sourcesCastingList = [], sourcesWaitingList = [], clientsWaitingList = [], statisticsMap, msgCounter = 0, batchSize}).
+-record(main_genserver_state, {statisticsCounter = 0, myName, state, workersMap, clients, nerlnetGraph, sourcesCastingList = [], sourcesWaitingList = [], clientsWaitingList = [], statisticsMap, msgCounter = 0, batchSize , etsRef}).
%%%===============================================================
%%% API
@@ -56,10 +56,10 @@ init({MyName,Clients,BatchSize,WorkersMap,NerlnetGraph}) ->
?LOG_NOTICE("Main Server is connected to: ~p~n",[ConnectedEntities]),
put(nerlnetGraph, NerlnetGraph),
% nerl_tools:start_connection([digraph:vertex(NerlnetGraph,Vertex) || Vertex <- digraph:out_neighbours(NerlnetGraph,MyName)]),
-
+    EtsRef = ets:new(main_server_data , [set]) , %% ! At some point the whole record will be replaced by the ETS table
NewStatisticsMap = getNewStatisticsMap([digraph:vertex(NerlnetGraph,Vertex) || Vertex <- digraph:vertices(NerlnetGraph)--?LIST_OF_SPECIAL_SERVERS]),
% io:format("New StatisticsMap = ~p~n",[NewStatisticsMap]),
- {ok, #main_genserver_state{myName = MyNameStr, workersMap = WorkersMap, batchSize = BatchSize, state=idle, clients = Clients, nerlnetGraph = NerlnetGraph, msgCounter = 1,statisticsMap = NewStatisticsMap}}.
+ {ok, #main_genserver_state{myName = MyNameStr, workersMap = WorkersMap, batchSize = BatchSize, state=idle, clients = Clients, nerlnetGraph = NerlnetGraph, msgCounter = 1,statisticsMap = NewStatisticsMap , etsRef = EtsRef}}.
%% @private
%% @doc Handling call messages
@@ -76,10 +76,11 @@ init({MyName,Clients,BatchSize,WorkersMap,NerlnetGraph}) ->
handle_call(getGraph, _From, State) ->
NerlGraph = State#main_genserver_state.nerlnetGraph,
FullNodes = [digraph:vertex(NerlGraph,Vertex) || Vertex <- digraph:vertices(NerlGraph)],
- %io:format("Full graph is: ~p~n", [FullNodes]),
- NodesList = [Entity++","++IP++","++integer_to_list(Port)++"#"||{Entity, {IP, Port}} <- FullNodes],
+ %%io:format("Full graph is: ~p~n", [FullNodes]),
+ NodesList = [atom_to_list(Entity)++","++binary_to_list(IP)++","++integer_to_list(Port)++"#"||{Entity, {IP, Port}} <- FullNodes],
+ %%io:format("graph nodes are: ~p~n", [NodesList]),
EdgesList = [digraph:edge(NerlGraph,Edge) || Edge <- digraph:edges(NerlGraph)],
- %io:format("graph edges are: ~p~n", [EdgesList]),
+ %%io:format("graph edges are: ~p~n", [EdgesList]),
Nodes = nodeString(NodesList),
Edges = edgeString(EdgesList),
@@ -100,11 +101,11 @@ nodeString([Node |NodeList]) -> nodeString(NodeList, Node).
nodeString([], Str) -> Str;
nodeString([Node |NodeList], Str)-> nodeString(NodeList, Node++Str).
-edgeString([Edge |EdgesList])-> {_ID, V1, V2, _Label} = Edge, edgeString(EdgesList, V1++"-"++V2).
+edgeString([Edge |EdgesList])-> {_ID, V1, V2, _Label} = Edge, edgeString(EdgesList, atom_to_list(V1)++"-"++atom_to_list(V2)).
edgeString([], Str)-> Str;
edgeString([Edge |EdgesList], Str)->
{_ID, V1, V2, _Label} = Edge,
- edgeString(EdgesList, V1++"-"++V2++","++Str).
+ edgeString(EdgesList, atom_to_list(V1)++"-"++atom_to_list(V2)++","++Str).
%% @private
%% @doc Handling cast messages
@@ -113,6 +114,11 @@ edgeString([Edge |EdgesList], Str)->
{noreply, NewState :: #main_genserver_state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #main_genserver_state{}}).
+handle_cast({saveUtility , Body} , State = #main_genserver_state{etsRef = EtsRef , msgCounter = MsgCounter}) ->
+    %gets called after a tool sends a connection request; saves its address in the ETS table.
+ ets:insert(EtsRef , Body),
+ {noreply, State#main_genserver_state{msgCounter = MsgCounter+1}};
+
handle_cast({initCSV, Source,SourceData}, State = #main_genserver_state{state = idle, myName = MyName, sourcesWaitingList = SourcesWaitingList,nerlnetGraph = NerlnetGraph,msgCounter = MsgCounter}) ->
%% send router http request, to rout this message to all sensors
@@ -158,33 +164,60 @@ handle_cast({clientsIdle}, State = #main_genserver_state{state = idle, myName =
{noreply, State#main_genserver_state{clientsWaitingList = ListOfClients,msgCounter = MsgCounter+1}};
%%%get Statistics from all Entities in the network
-handle_cast({statistics,Body}, State = #main_genserver_state{myName = MyName, statisticsCounter = StatisticsCounter, nerlnetGraph = NerlnetGraph,statisticsMap = StatisticsMap,msgCounter = MsgCounter}) ->
+handle_cast({statistics,Body}, State = #main_genserver_state{myName = MyName, statisticsCounter = StatisticsCounter, nerlnetGraph = NerlnetGraph,statisticsMap = StatisticsMap,msgCounter = MsgCounter , etsRef = EtsRef , state = CurrState}) ->
if Body == <<"getStatistics">> -> %% initial message from APIServer, get stats from entities
-
NewStatisticsMap = getNewStatisticsMap([digraph:vertex(NerlnetGraph,Vertex) || Vertex <- digraph:vertices(NerlnetGraph)--?LIST_OF_SPECIAL_SERVERS]),
- [findroutAndsendStatistics(MyName, Name)||{Name,_Counter}<-maps:to_list(StatisticsMap)],
- NewState = State#main_genserver_state{msgCounter = MsgCounter+1,statisticsMap = NewStatisticsMap, statisticsCounter = length(maps:to_list(StatisticsMap))};
-
+ [findroutAndsendStatistics(MyName, Name)||{Name,_Counter}<-maps:to_list(NewStatisticsMap)],
+ NewState = State#main_genserver_state{msgCounter = MsgCounter+1,statisticsMap = NewStatisticsMap, statisticsCounter = length(maps:to_list(NewStatisticsMap))};
Body == <<>> -> io:format("in Statistcs, State has: StatsCount=~p, MsgCount=~p~n", [StatisticsCounter, MsgCounter]), NewState = State;
true ->
%% statistics arrived from Entity
- [From|[NewCounter]] = re:split(binary_to_list(Body), ":", [{return, list}]),
-
- NewStatisticsMap = maps:put(From,NewCounter,StatisticsMap),
+ CheckForDead=re:split(binary_to_list(Body), "#", [{return, list}]),
+ case CheckForDead of
+ [_]->
+ Data=CheckForDead,
+ UpdateStatMap=StatisticsMap; %entity is not a client
+ [Data,[]]->
+ UpdateStatMap=StatisticsMap;%entity is client without dead workers
+ [Data,DeadWorkers]-> %client with dead workers
+ case maps:is_key("Dead workers",StatisticsMap) of
+ true->
+ MapWithKey=StatisticsMap; %key was already put in
+ false->
+ MapWithKey=maps:put("Dead workers","",StatisticsMap)
+ end,
+ UpdateStatMap=maps:put("Dead workers",maps:get("Dead workers",MapWithKey)++DeadWorkers++" ",MapWithKey)
+ end,
+ [From|[NewCounter]] = re:split(Data, ":", [{return, list}]),
+
+ NewStatisticsMap = maps:put(From,NewCounter,UpdateStatMap),
NewState = State#main_genserver_state{msgCounter = MsgCounter+1,statisticsMap = NewStatisticsMap,statisticsCounter = StatisticsCounter-1},
if StatisticsCounter == 1 -> %% got stats from all entities
Statistics = maps:to_list(NewStatisticsMap),
S = mapToString(Statistics,[]) ,
- ?LOG_NOTICE("Sending stats: ~p~n",[S]),
- {RouterHost,RouterPort} = nerl_tools:getShortPath(MyName,?API_SERVER_ATOM,NerlnetGraph),
- nerl_tools:http_request(RouterHost,RouterPort,"statistics", S ++ "|mainServer:" ++integer_to_list(MsgCounter));
-
- %% wait for more stats
- true -> pass end
- end,
+ %?LOG_NOTICE("Sending stats: ~p~n",[S]),
+                case CurrState of %statistics is requested in idle state by the API server at the end of the experiment, or by NerlMonitor in casting state at the start of predict
+ idle ->
+ case ets:member(EtsRef,nerlMonitor) of
+ true->
+ [{nerlMonitor , IP , Port}] = ets:lookup(EtsRef , nerlMonitor),
+ nerl_tools:http_request(IP,list_to_integer(Port),"stats", S ++ "|mainServer:" ++integer_to_list(MsgCounter));
+ false -> ok
+ end,
+ {RouterHost,RouterPort} = nerl_tools:getShortPath(MyName,?API_SERVER_ATOM,NerlnetGraph),
+ nerl_tools:http_request(RouterHost,RouterPort,"statistics", S ++ "|mainServer:" ++integer_to_list(MsgCounter));
+ casting ->
+ [{_ , MonitorIP , MonitorPort}] = ets:lookup(EtsRef, nerlMonitor),
+ nerl_tools:http_request(MonitorIP,list_to_integer(MonitorPort),"stats", S ++ "|mainServer:" ++integer_to_list(MsgCounter))
+ end;
+
+ %% wait for more stats
+ true -> pass
+ end
+ end,
{noreply, NewState};
%%handle_cast({startPredicting}, State = #main_genserver_state{clients = ListOfClients, nerlnetGraph = NerlnetGraph}) ->
@@ -223,10 +256,17 @@ handle_cast({sourceAck,Body}, State = #main_genserver_state{sourcesWaitingList =
{noreply, State#main_genserver_state{sourcesWaitingList = NewWaitingList,msgCounter = MsgCounter+1}};
-handle_cast({clientAck,Body}, State = #main_genserver_state{clientsWaitingList = WaitingList,msgCounter = MsgCounter}) ->
- NewWaitingList = WaitingList--[list_to_atom(binary_to_list(Body))],
- % io:format("new Waiting List: ~p ~n",[NewWaitingList]),
- if length(NewWaitingList) == 0 -> ack();
+handle_cast({clientAck,Body}, State = #main_genserver_state{clientsWaitingList = WaitingList,msgCounter = MsgCounter , etsRef = EtsRef}) ->
+ ClientName = list_to_atom(binary_to_list(Body)),
+ NewWaitingList = WaitingList--[ClientName],
+ if length(NewWaitingList) == 0 ->
+ case ets:member(EtsRef , nerlMonitor) of %if NerlMonitor is up, initiate mid experiment statistics
+ true ->
+ ?LOG_INFO(?LOG_HEADER ++ "Sending statistics request"),
+ gen_server:cast(self(),{statistics , list_to_binary("getStatistics")});
+ _ -> ok
+ end,
+ ack();
true-> ok end,
{noreply, State#main_genserver_state{clientsWaitingList = NewWaitingList, msgCounter = MsgCounter+1}};
@@ -310,6 +350,26 @@ handle_cast({predictRes,Body}, State = #main_genserver_state{batchSize = BatchSi
end,
{noreply, State#main_genserver_state{msgCounter = MsgCounter+1}};
+handle_cast({worker_down,Body}, State = #main_genserver_state{msgCounter = MsgCounter,etsRef = EtsRef}) ->
+ % some worker terminated, print to log and notify NerlMonitor if it is up
+ case ets:member(EtsRef,nerlMonitor) of
+ true->
+ [{nerlMonitor , IP , Port}] = ets:lookup(EtsRef , nerlMonitor),
+ URL = "http://" ++ IP ++ ":" ++ Port ++ "/utilInfo",
+ httpc:request(post,{URL, [],"application/x-www-form-urlencoded",Body}, [], []);
+ false -> ok
+ end,
+    ?LOG_WARNING(?LOG_HEADER++"Worker down, ~p disconnected~n",[binary_to_list(Body)]),
+ {noreply, State#main_genserver_state{msgCounter = MsgCounter+1,etsRef=EtsRef}};
+
+handle_cast({worker_kill , WorkerName} , State = #main_genserver_state{workersMap = WorkersMap , msgCounter = MsgCounter}) ->
+ %got a user kill command for a worker from NerlMonitor
+ ?LOG_WARNING(?LOG_HEADER++"Killing worker ~p ~n",[WorkerName]),
+ WorkerNameAtom=binary_to_term(WorkerName),
+ ClientName = maps:get(WorkerNameAtom,WorkersMap),
+ Body = term_to_binary({ClientName, WorkerNameAtom}),
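+    %% routers forward this body as-is; the owning client decodes {_ , WorkerName} and stops the worker's process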
+ nerl_tools:sendHTTP(?MAIN_SERVER_ATOM , ClientName , atom_to_list(worker_kill) , Body),
+ {noreply, State#main_genserver_state{workersMap = WorkersMap , msgCounter = MsgCounter+1}};
handle_cast(Request, State = #main_genserver_state{}) ->
io:format("main server cast ignored: ~p~n",[Request]),
@@ -377,9 +437,9 @@ getNewStatisticsMap([{Name,{_Host, _Port}}|Tail],StatisticsMap) ->
getNewStatisticsMap(Tail,maps:put(atom_to_list(Name), 0, StatisticsMap)).
-startCasting([],_NumOfSampleToSend,_MyName, _NerlnetGraph)->done;
+startCasting([],_NumOfSampleToSend,_MyName, _NerlnetGraph) -> done;
startCasting([SourceName|SourceNames],NumOfSampleToSend, MyName, NerlnetGraph)->
- ?LOG_NOTICE("~p sending start casting command to: ~p",[MyName, SourceName]),
+ ?LOG_NOTICE("~p sending start casting command to: ~p~n",[MyName, SourceName]),
{RouterHost,RouterPort} = nerl_tools:getShortPath(MyName,SourceName,NerlnetGraph),
nerl_tools:http_request(RouterHost,RouterPort,"startCasting", SourceName++[","]++NumOfSampleToSend),
startCasting(SourceNames,NumOfSampleToSend, MyName, NerlnetGraph).
diff --git a/src_erl/Communication_Layer/http_Nerlserver/src/Router/routerGenserver.erl b/src_erl/Communication_Layer/http_Nerlserver/src/Router/routerGenserver.erl
index e1da9edf..43ae44ba 100644
--- a/src_erl/Communication_Layer/http_Nerlserver/src/Router/routerGenserver.erl
+++ b/src_erl/Communication_Layer/http_Nerlserver/src/Router/routerGenserver.erl
@@ -176,6 +176,18 @@ handle_cast({getStats,_Body}, State = #router_genserver_state{myName = MyName,
nerl_tools:http_request(Host,Port,"routerStats",Mes),
{noreply, State#router_genserver_state{msgCounter = MsgCounter+1}};
+handle_cast({worker_kill , Body} , State = #router_genserver_state{msgCounter = MsgCounter, myName = MyName}) ->
+ io:format("Body is ~p~n",[binary_to_term(Body)]),
+ {ClientName , _} = binary_to_term(Body),
+ nerl_tools:sendHTTP(MyName, ClientName, "worker_kill", Body),
+ {noreply, State#router_genserver_state{msgCounter = MsgCounter+1}};
+
+
+%monitor
+handle_cast({worker_down,Body}, State = #router_genserver_state{myName = MyName, msgCounter = MsgCounter, nerlnetGraph = NerlnetGraph}) ->
+ nerl_tools:sendHTTP(MyName, ?MAIN_SERVER_ATOM, "worker_down", Body),
+ {noreply, State#router_genserver_state{msgCounter = MsgCounter+1}};
+
handle_cast(_Request, State = #router_genserver_state{msgCounter = MsgCounter }) ->
{noreply, State#router_genserver_state{msgCounter = MsgCounter+1}}.
diff --git a/src_erl/Communication_Layer/http_Nerlserver/src/Router/routingHandler.erl b/src_erl/Communication_Layer/http_Nerlserver/src/Router/routingHandler.erl
index 4dd51b29..c391e481 100644
--- a/src_erl/Communication_Layer/http_Nerlserver/src/Router/routingHandler.erl
+++ b/src_erl/Communication_Layer/http_Nerlserver/src/Router/routingHandler.erl
@@ -74,7 +74,10 @@ init(Req0, State) ->
% gen_server:cast(Router_genserver_Pid, {federatedWeights,Body});
%%%%%%%%%%%%%%GUI actions
- getStats -> gen_server:cast(Router_genserver_Pid, {getStats,Body})
+ getStats -> gen_server:cast(Router_genserver_Pid, {getStats,Body});
+ %monitor
+ worker_down -> gen_server:cast(Router_genserver_Pid, {worker_down,Body});
+ worker_kill -> gen_server:cast(Router_genserver_Pid, {worker_kill,Body})
end,
Reply = io_lib:format(" ", []),
diff --git a/src_erl/Communication_Layer/http_Nerlserver/src/nerlNetServer_app.erl b/src_erl/Communication_Layer/http_Nerlserver/src/nerlNetServer_app.erl
index 70e9454f..4d20239a 100644
--- a/src_erl/Communication_Layer/http_Nerlserver/src/nerlNetServer_app.erl
+++ b/src_erl/Communication_Layer/http_Nerlserver/src/nerlNetServer_app.erl
@@ -187,7 +187,8 @@ createClientsAndWorkers() ->
{"/clientTraining",clientStateHandler, [training,ClientStatemPid]},
{"/clientIdle",clientStateHandler, [idle,ClientStatemPid]},
{"/clientPredict",clientStateHandler, [predict,ClientStatemPid]},
- {"/weightsVector",clientStateHandler, [vector,ClientStatemPid]}
+ {"/weightsVector",clientStateHandler, [vector,ClientStatemPid]},
+ {"/worker_kill" , clientStateHandler , [worker_kill , ClientStatemPid]}
]}
]),
init_cowboy_start_clear(Client, {HostName, Port},NerlClientDispatch)
@@ -265,7 +266,10 @@ createRouters(MapOfRouters, HostName) ->
{"/federatedWeights",routingHandler, [federatedWeights,RouterGenServerPid]},
%%GUI actions
- {"/getStats",routingHandler, [getStats,RouterGenServerPid]}
+ {"/getStats",routingHandler, [getStats,RouterGenServerPid]},
+ %monitor actions
+ {"/worker_down",routingHandler, [worker_down,RouterGenServerPid]},
+ {"/worker_kill" , routingHandler , [worker_kill , RouterGenServerPid]}
]}
]),
%% cowboy:start_clear(Name, TransOpts, ProtoOpts) - an http_listener
@@ -304,7 +308,9 @@ createMainServer(true,BatchSize,HostName) ->
%GUI actions
{"/getGraph",[],guiHandler, [getGraph, MainGenServerPid]},
{"/getStats",[],guiHandler, [getStats, MainGenServerPid]},
-
+ {"/toolConnectionReq" , [] , utilities_handler , [MainGenServerPid]} , %% Added with NerlMonitor Project
+ {"/worker_kill" , [] , utilities_handler , [worker_kill , MainGenServerPid]},
+ {"/worker_down",actionHandler, [worker_down,MainGenServerPid]},
{"/[...]", [],noMatchingRouteHandler, [MainGenServerPid]}
]}
]),
diff --git a/src_erl/Communication_Layer/http_Nerlserver/src/utilities_handler.erl b/src_erl/Communication_Layer/http_Nerlserver/src/utilities_handler.erl
new file mode 100644
index 00000000..fb1bf309
--- /dev/null
+++ b/src_erl/Communication_Layer/http_Nerlserver/src/utilities_handler.erl
@@ -0,0 +1,39 @@
+-module(utilities_handler).
+
+-define(DATA_IDX , 2).
+
+-export([init/2]).
+
+init(Req0 , [MainServerPid]) ->
+ %handler for a tool connection req
+ {_,Body,_} = cowboy_req:read_body(Req0),
+ [UtilityName , IP , Port] = binary_to_term(Body),
+ case UtilityName of
+ nerlMonitor ->
+ Graph = gen_server:call(MainServerPid , getGraph),
+ WorkersMap = ets:lookup_element(nerlnet_data , workers , ?DATA_IDX),
+ WorkersClients = maps:to_list(WorkersMap),
+ Workers = lists:flatten([atom_to_list(X)++"-"++atom_to_list(Y)++"!" || {X , Y} <- WorkersClients]),
+ Reply = Graph ++ "," ++ Workers
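+            %% Reply is the graph string ("Name,IP,Port#...#A-B,C-D,...") with ",w1-c1!w2-c2!..." appended; MonitorGUI.py splits it on '#', ',' and '!' to rebuild the graph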
+ end,
+ gen_server:cast(MainServerPid , {saveUtility , {UtilityName , IP , Port}}),
+ Req = cowboy_req:reply(200,
+ #{<<"content-type">> => <<"text/plain">>},
+ Reply,
+ Req0),
+ {ok, Req, MainServerPid};
+
+init(Req0 , [Action , MainServerPid]) ->
+ %handler for a tool's requested action (not connection)
+ {_,Body,_} = cowboy_req:read_body(Req0),
+ case Action of
+ worker_kill ->
+ gen_statem:cast(MainServerPid , {worker_kill , Body});
+ _ -> ok
+ end,
+ Reply = io_lib:format("ACK", []),
+ Req = cowboy_req:reply(200,
+ #{<<"content-type">> => <<"text/plain">>},
+ Reply,
+ Req0),
+ {ok, Req, MainServerPid}.
diff --git a/src_erl/NerlGUI/src/graphScreen.erl b/src_erl/NerlGUI/src/graphScreen.erl
index 6254dd8f..97ef9cb2 100644
--- a/src_erl/NerlGUI/src/graphScreen.erl
+++ b/src_erl/NerlGUI/src/graphScreen.erl
@@ -67,13 +67,13 @@ init([Parent, Gen])->
io:format("got body: ~p~n", [Body]),
DevicesInfo = string:split(Body, "#", all),
- Devices = [string:split(DeviceInfo, ",", all) || DeviceInfo <- DevicesInfo, DevicesInfo /=[[]]],
+ Devices = [string:split(DeviceInfo, ",", all) || DeviceInfo <- DevicesInfo, DevicesInfo /= [[]]],
Edges = lists:droplast(lists:last(Devices)),
io:format("got graph: ~p~n", [Devices]),
DeviceList = lists:droplast(Devices),
- {FileName, G} = gui_tools:makeGraphIMG(DeviceList, Edges),
+ {FileName, G} = gui_tools:makeGraphIMG(DeviceList , Edges),
mainScreen:updateGraph(Gen, gui_tools:serialize(G)),
mainScreen:addInfo(Gen, "updated graph"),
diff --git a/src_erl/NerlMonitor/.gitignore b/src_erl/NerlMonitor/.gitignore
new file mode 100644
index 00000000..df53f7d9
--- /dev/null
+++ b/src_erl/NerlMonitor/.gitignore
@@ -0,0 +1,20 @@
+.rebar3
+_build
+_checkouts
+_vendor
+.eunit
+*.o
+*.beam
+*.plt
+*.swp
+*.swo
+.erlang.cookie
+ebin
+log
+erl_crash.dump
+.rebar
+logs
+.idea
+*.iml
+rebar3.crashdump
+*~
diff --git a/src_erl/NerlMonitor/README.md b/src_erl/NerlMonitor/README.md
new file mode 100644
index 00000000..bb7f9928
--- /dev/null
+++ b/src_erl/NerlMonitor/README.md
@@ -0,0 +1,41 @@
+NerlMonitor
+=====
+NerlMonitor is an external tool that gives NErlNet users more insight into the experiment flow, along with the ability to terminate workers mid-experiment. This helps in monitoring model behavior under different kinds of failures. The app also collects various statistics.
+
+
+# Dependencies
+`pip install` these libraries (In addition to the src_py/requirements.txt file):
+- NetworkX
+- PySimpleGUI
+- PyGraphviz
+- Nest-Asyncio
+
+Also, install **Pyrlang** and **Term** libraries for Python-Erlang communication (follow their instructions **carefully**):
+
+Pyrlang - https://github.com/Pyrlang/Pyrlang
+
+Term - https://github.com/Pyrlang/Term
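+
+For example (a minimal sketch, assuming the same Python virtual environment is active; package names are as published on PyPI, and PyGraphviz also needs the system Graphviz libraries):
+
+```bash
+pip install networkx PySimpleGUI pygraphviz nest-asyncio
+```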
+
+# Run The App
+Run the `./NerlnetMonitor.sh` script from a separate shell (make sure the same Python virtual environment where you installed all the dependencies is active)
+
+# Demo
+YouTube video demo: https://youtu.be/X5RHLUTqBWk
+
+https://github.com/leondavi/NErlNet/assets/79912473/4e69ad09-3a07-436e-9741-84a64baa4e47
+
+When running the app:
+1. Start up screen:
+
+
+
+2. Main Server is up:
+
+
+
+3. Worker termination:
+
+
+
+
+
diff --git a/src_erl/NerlMonitor/rebar.config b/src_erl/NerlMonitor/rebar.config
new file mode 100644
index 00000000..cc4df178
--- /dev/null
+++ b/src_erl/NerlMonitor/rebar.config
@@ -0,0 +1,9 @@
+{erl_opts, [debug_info]}.
+{deps, [
+ {cowboy, {git, "https://github.com/ninenines/cowboy.git" , {tag,"2.9.0"}}}
+]}.
+
+{shell, [
+ % {config, "config/sys.config"},
+ {apps, [nerlMonitor]}
+]}.
diff --git a/src_erl/NerlMonitor/src/MonitorGUI.py b/src_erl/NerlMonitor/src/MonitorGUI.py
new file mode 100644
index 00000000..27cde3ab
--- /dev/null
+++ b/src_erl/NerlMonitor/src/MonitorGUI.py
@@ -0,0 +1,305 @@
+from term import Atom
+from pyrlang.node import Node
+from pyrlang.process import Process
+import PySimpleGUI as sg
+import multiprocessing
+from time import sleep
+import networkx as nx
+import matplotlib.pyplot as plt
+from datetime import datetime
+import os
+import math
+import asyncio
+import nest_asyncio
+nest_asyncio.apply()
+
+
+class MyProcess(Process):
+ def __init__(self , msg_queue) -> None:
+ Process.__init__(self)
+ self.get_node().register_name(self, Atom('PyrlangProcess'))
+ self.msg_queue = msg_queue
+
+
+ def handle_one_inbox_message(self, msg):
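+        # messages from the Erlang side arrive here: {graph, Data} once the main server answers the connection request,
+        # {update, "client-worker"} when a worker goes down, and {stats, Data} with statistics strings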
+ print(f'From ErlProcess: {msg}')
+ if msg[0] == Atom('send'):
+ print(f'From ErlProcess: {msg[1]}')
+ else:
+ self.msg_queue.put_nowait(msg)
+ if not self.msg_queue.empty():
+ print(f'Queue is not Empty: {msg} added.')
+
+
+
+
+def draw_gradient(canvas, start_color, end_color):
+ for y in range(0, 200): # Adjust the range to your desired height
+ r = start_color[0] + (end_color[0] - start_color[0]) * y / 200
+ g = start_color[1] + (end_color[1] - start_color[1]) * y / 200
+ b = start_color[2] + (end_color[2] - start_color[2]) * y / 200
+ color = f'#{int(r):02x}{int(g):02x}{int(b):02x}'
+ canvas.TKCanvas.create_line(0, y, 200, y, fill=color)
+
+
+Msg_log = []
+
+DataColumn = [
+ [sg.Frame(title="Event Log:" ,
+ layout=[[sg.Multiline('', size=(140, 60), key='-LOG-', autoscroll=True , font=('SFPro' , 12) , no_scrollbar=True)]],
+ background_color=('#A90433') , font=('SFPro' , 20) , size=(500,325) , title_color='White' , element_justification='right')
+ ] ,
+ [sg.Frame(title="Statistics:" ,
+ layout=[[sg.Multiline('', size=(140, 60), key='-STATS-', autoscroll=True , font=('SFPro' , 12) , no_scrollbar=True)]],
+ background_color=('#A90433') , font=('SFPro' , 20) , size=(500,325) , title_color='White' , element_justification='right')
+ ]
+ ]
+
+GraphColumn = [
+ [ sg.Text("Waiting For\n NerlNet Graph..." , key='-PHOLD-', text_color='White' , font=('SFPro' , 12) , size=(70,5) , background_color='#A90433' , justification='center' , pad=(0,0)) ,
+ sg.Image(key='-IMAGE-' , visible=False)
+ ],
+ [
+ sg.Text("Enter the name of the worker you wish to terminate:" ,key='-INTEXT-', size=(42,1) ,text_color='white' , font=('SFPro' , 12) , background_color='#A90433' , justification='left' , pad=(0,0) , visible=False) ,
+ sg.Input('',key='-INPUT-' , visible=False , justification='left' , size=(20,1) , font=('SFPro' , 12) , background_color='white' , text_color='black' , pad=(0,0) , enable_events=True),
+ sg.Button(button_text="Terminate" , button_color=('#A90433' , '#FFFFFF') , font=('SFPro' , 12) , size=(10,1) , pad=(0,0) , visible=False, key='-TERM-', enable_events=True)
+ ]
+ ]
+
+layout = [
+ [
+ sg.Text("NerlNet Monitor" , key='-TEXT-' , size=(30,1) ,text_color='White' , font=('SFPro' , 20) , background_color='#A90433' , justification='center' , pad=(0,0))
+ ] ,
+ [ sg.Column(DataColumn , background_color='#A90433') ,
+ sg.VSeperator() ,
+ sg.Column(GraphColumn , background_color='#A90433')
+ ] ,
+ [
+ sg.Button(button_text="Close" , button_color=('#A90433' , '#FFFFFF') , font=('SFPro' , 12) , size=(5,2)),
+ sg.Button(button_text="Clear Log" , button_color=('#A90433' , '#FFFFFF') , font=('SFPro' , 12) , size=(5,2))
+ ]
+
+ ]
+
+MainWindow = sg.Window("NErlNet" , layout , margins=(5,5) , size=(1400,800) , background_color='#A90433' , finalize=True , resizable=True , element_justification='c' , icon='../../../NerlnetLogo.ico')
+
+def RemoteRecv():
+ return Atom('erl@127.0.0.1') , Atom("recvPyrlang")
+
+def formatted_time():
+ return f'[{datetime.now().day}/{datetime.now().month}/{datetime.now().year}|{datetime.now().hour}:{datetime.now().minute}:{datetime.now().second}]'
+
+# def SendMsg(Msg):
+# SendNode = Node(node_name='pysend@127.0.0.1' , cookie='COOKIE')
+# SendProc = MyProcess()
+# event_loop = SendNode.get_loop()
+
+# print(SendNode.where_is(Atom('recvPyrlang')))
+
+# def task():
+# SendNode.send_nowait(sender = SendProc.pid_ , receiver = RemoteRecv() , message = (Atom('send'),Atom(Msg)))
+# SendNode.destroy()
+
+# event_loop.call_soon(task)
+
+# SendNode.run()
+
+async def GUI(msg_queue):
+ print("GUI task started...")
+ PyNode , CommProc = await msg_queue.get()
+ print("Got Message from queue")
+ print(msg_queue.empty())
+ print(f"Got PyNode and CommProc from Queue.")
+ StatsInfo = {"workers": {} , "clients": {}}
+ while True:
+ await asyncio.sleep(.01)
+ event , values = MainWindow.read(timeout=100)
+ existing_text = values['-LOG-']
+ updated_text = ''
+ if event == "Close" or event == sg.WIN_CLOSED:
+ PyNode.send_nowait(sender = CommProc.pid_ , receiver = RemoteRecv() , message = (Atom('close')))
+ await asyncio.sleep(.2)
+ os.kill(os.getpid() , 9)
+ print("GUI Closed.")
+ break
+ elif event == "Clear Log":
+ ShowStats(StatsInfo)
+ MainWindow['-LOG-'].update('')
+ elif event == "-TERM-":
+ Workers = [Graph.nodes[node]['label'] for node in Graph.nodes() if Graph.nodes[node]['label'][0] == 'w' and node_colors[node] != 'gray']
+ if values['-INPUT-'] not in Workers:
+ updated_text = f'{existing_text}\n{formatted_time()}: Invalid Worker Name {values["-INPUT-"]} , Available Workers: {Workers}.'
+ MainWindow['-LOG-'].update(updated_text)
+ else:
+ Workers.remove(values['-INPUT-'])
+ node_colors[values['-INPUT-']] = 'gray'
+ nx.set_node_attributes(Graph, node_colors, 'color')
+ colors = nx.get_node_attributes(Graph, 'color').values()
+ pos = nx.nx_agraph.graphviz_layout(Graph, prog='dot')
+ plt.figure(figsize=(8,6))
+ nx.draw_networkx(Graph, pos, with_labels=True, node_color=colors , node_size=200, font_size=8, font_color='white' , edge_color='black' , width=1.5)
+ plt.savefig('NerlNetGraph.png' ,bbox_inches='tight' , dpi=125)
+ plt.close()
+ MainWindow['-IMAGE-'].update(filename='NerlNetGraph.png' , visible=True , size=(800,600))
+
+ updated_text = f'{existing_text}\n{formatted_time()}: Sending termination message for {values["-INPUT-"]} to Main Server.'
+ PyNode.send_nowait(sender = CommProc.pid_ , receiver = RemoteRecv() , message = (Atom('terminate'),Atom(f'{values["-INPUT-"]}')))
+
+
+ MainWindow['-LOG-'].update(updated_text)
+ MainWindow['-INPUT-'].update('')
+ if not msg_queue.empty():
+ msg = msg_queue.get_nowait()
+ if msg[0] == 'graph':
+ Graph , node_colors = Show_Nerlnet_Graph(msg[1])
+ MainWindow['-PHOLD-'].update(visible=False)
+ MainWindow['-IMAGE-'].update(filename='NerlNetGraph.png' , visible=True , size=(800,600))
+ MainWindow['-LOG-'].update(f'{formatted_time()}: NerlNet Graph Received.')
+ MainWindow['-INTEXT-'].update(visible=True)
+ MainWindow['-INPUT-'].update(visible=True)
+ MainWindow['-TERM-'].update(visible=True)
+ elif msg[0] == 'update':
+ ClientName , WorkerName = msg[1].split('-')
+
+ node_colors[WorkerName] = 'gray'
+ #node_colors[ClientName] = 'gray'
+ nx.set_node_attributes(Graph, node_colors, 'color')
+ colors = nx.get_node_attributes(Graph, 'color').values()
+
+ pos = nx.nx_agraph.graphviz_layout(Graph, prog='dot')
+ angle = 100
+ #rotated_pos = {node: (x*math.cos(angle) -y*math.sin(angle), x*math.sin(angle) + y*math.cos(angle)) for node, (x, y) in pos.items()}
+
+ plt.figure(figsize=(8,6))
+ nx.draw_networkx(Graph, pos, with_labels=True, node_color=colors , node_size=200, font_size=8, font_color='white' , edge_color='black' , width=1.5)
+ plt.savefig('NerlNetGraph.png' ,bbox_inches='tight' , dpi=125)
+ plt.close()
+ MainWindow['-IMAGE-'].update(filename='NerlNetGraph.png' , visible=True , size=(800,600))
+ if existing_text == '':
+ updated_text = f'{formatted_time()}: Worker {WorkerName} of Client {ClientName} is down.'
+ else:
+ Workers = [Graph.nodes[node]['label'] for node in Graph.nodes() if Graph.nodes[node]['label'][0] == 'w' and node_colors[node] != 'gray']
+ updated_text = f'{existing_text}\n{formatted_time()}: Worker {WorkerName} of Client {ClientName} is down , Available workers: {Workers}'
+ MainWindow['-LOG-'].update(updated_text)
+
+ elif msg[0] == 'stats':
+ try:
+ Data = msg[1]
+ for items in str(Data).split('|'):
+ Entity, val = items.split(':')
+ if '=' in val:
+ for EntityStat in val.split(','):
+ Stat, Result = EntityStat.split('=')
+ if "Train" in Stat:
+ StatsInfo["workers"][Stat] = Result
+ else:
+ StatsInfo["clients"][Stat] = Result
+ else:
+ StatsInfo[Entity] = val # Messages for entities other than clients/workers
+ CurrentStats = StatsInfo.copy() # copy the stats to a new variable
+ StatsText = ShowStats(CurrentStats)
+ existing_stats = values['-STATS-']
+ if existing_stats != '':
+ MainWindow['-STATS-'].update(f'{existing_stats}\n{StatsText}')
+ else:
+ MainWindow['-STATS-'].update(StatsText)
+ existing_text = values['-LOG-']
+ MainWindow['-LOG-'].update(f'{existing_text}\n{formatted_time()}: Statistics Received.')
+ except Exception as err:
+ MainWindow['-LOG-'].update(f"Error in Stats {err} , Got {StatsInfo}")
+
+
+ elif values['-LOG-'] != '':
+ existing_text = values['-LOG-']
+ updated_text = f'{existing_text}\n{formatted_time()}: {msg}'
+ else:
+ updated_text = f'{formatted_time()}: {msg}'
+ if updated_text != '':
+ MainWindow['-LOG-'].update(updated_text)
+
+
+
+ MainWindow.close()
+
+
+def ShowStats(CurrentStats):
+ MainWindow['-LOG-'].update(f'{formatted_time()}: Printing Statistics...')
+ StatsText = ''
+ for key in CurrentStats:
+ if key == 'workers':
+ StatsText += f'Workers:\n'
+ for stat in CurrentStats[key]:
+ if "Time" in stat:
+ StatsText += f'\t{stat.replace("_Train_" , " Working ")}: {CurrentStats[key][stat]} seconds\n'
+ else:
+ StatsText += f'\t{stat.replace("_" , " ")}: {CurrentStats[key][stat]}\n'
+ elif key == 'clients':
+ StatsText += f'Clients:\n'
+ for stat in CurrentStats[key]:
+ if "info" in stat:
+ StatsText += f'\t{stat.replace("_info_" , " Info ")}: {CurrentStats[key][stat]} bytes\n'
+ else:
+ StatsText += f'\t{stat.replace("_Msg_" , " Message ")}: {CurrentStats[key][stat]}\n'
+ elif key == 'Dead workers':
+ StatsText += f'Dead Workers are:{CurrentStats[key]}\n'
+ else:
+ StatsText += f'{key} Message Count: {CurrentStats[key]}\n'
+ return StatsText
+
+
+def Show_Nerlnet_Graph(NerlGraph):
+ # Graph in string format: "Entity1Name,Entity1IP,Entity1Port#Entity2Name,Entity2IP,Entity2Port#Entity1Name-Entity2Name,Entity2Name-Entity1Name#Worker1-Client1#Worker2-Client2" etc.
+ # Workers in string format: "Worker1-Client1,Worker2-Client1,Worker3-Client2" etc.
+    # Nodes are 'Name,IP,Port' triplets separated by '#'
+    # Edges are 'Entity1-Entity2' strings separated by ','
+ Nodes = NerlGraph.split('#')[0:-1]
+ Edges = NerlGraph.split('#')[-1].split(',')[0:-1]
+ Workers = NerlGraph.split('#')[-1].split(',')[-1].split('!')[0:-1]
+ WorkersNames = [Worker.split('-')[0] for Worker in Workers ]
+ Edges += Workers
+ EdgesSeperated = [(Edge.split('-')[0],Edge.split('-')[1]) for Edge in Edges if len(Edges) > 1] # ? What if no edges?
+ NodesNames = [NodeTriplet.split(',')[0] for NodeTriplet in Nodes]
+ NodesNames += WorkersNames
+
+ NodeWithLabels = [(NodeName , {'label' : NodeName}) for NodeName in NodesNames]
+ graph = nx.Graph()
+ graph.add_nodes_from(NodeWithLabels)
+ graph.add_edges_from(EdgesSeperated)
+
+ my_labels = {'mainServer': 'mS' , 'apiServer': 'aS'}
+
+ nx.relabel_nodes(graph, my_labels , copy=False)
+ default_colors = {node:'#A90433' for node in graph.nodes()}
+ node_colors = {node:default_colors[node] for node in graph.nodes()}
+ nx.set_node_attributes(graph, node_colors, 'color')
+ colors = nx.get_node_attributes(graph, 'color').values()
+
+ pos = nx.nx_agraph.graphviz_layout(graph, prog='dot')
+ angle = 100
+ #rotated_pos = {node: (x*math.cos(angle) -y*math.sin(angle), x*math.sin(angle) + y*math.cos(angle)) for node, (x, y) in pos.items()}
+ plt.figure(figsize=(8,6))
+ nx.draw_networkx(graph, pos, with_labels=True, node_color=colors , node_size=200, font_size=8, font_color='white' , edge_color='black' , width=1.5)
+ plt.savefig('NerlNetGraph.png' ,bbox_inches='tight' , dpi=125)
+ plt.close()
+ return graph , node_colors
+
+
+
+async def Pyrlang(msg_queue):
+ print("Pyrlang task started...")
+ PyNode = Node(node_name="py@127.0.0.1" , cookie="COOKIE")
+ CommProc = MyProcess(msg_queue=msg_queue)
+ msg_queue.put_nowait((PyNode , CommProc))
+ print("Pyrlang task finished.")
+ PyNode.run()
+
+async def main_func():
+ msg_queue = asyncio.Queue()
+ await asyncio.gather(GUI(msg_queue) , Pyrlang(msg_queue))
+
+if __name__ == "__main__":
+ asyncio.run(main_func())
+
+
+
+
diff --git a/src_erl/NerlMonitor/src/handlers/nerlMonitor_handler.erl b/src_erl/NerlMonitor/src/handlers/nerlMonitor_handler.erl
new file mode 100644
index 00000000..59f19934
--- /dev/null
+++ b/src_erl/NerlMonitor/src/handlers/nerlMonitor_handler.erl
@@ -0,0 +1,22 @@
+-module(nerlMonitor_handler).
+
+-export([init/2]).
+
+-define(GUI , {'PyrlangProcess' , 'py@127.0.0.1'}).
+
+init(Req0, [Msg]) ->
+ {_,Body,_} = cowboy_req:read_body(Req0),
+ Data = binary_to_list(Body),
+ case Msg of
+ utilInfo -> ?GUI ! {update ,Data};
+ stats -> ?GUI ! {stats ,Data};
+ _ ->
+            ok % got an unknown message, ignore it.
+ end,
+
+ Req = cowboy_req:reply(200,
+ #{<<"content-type">> => <<"text/plain">>},
+ <<"Got that">>,
+ Req0),
+ {ok, Req, Msg}.
+
diff --git a/src_erl/NerlMonitor/src/nerlMonitor.app.src b/src_erl/NerlMonitor/src/nerlMonitor.app.src
new file mode 100644
index 00000000..3625d754
--- /dev/null
+++ b/src_erl/NerlMonitor/src/nerlMonitor.app.src
@@ -0,0 +1,15 @@
+{application, nerlMonitor,
+ [{description, "An OTP application"},
+ {vsn, "0.1.0"},
+ {registered, []},
+ {mod, {nerlMonitor_app, []}},
+ {applications,
+ [kernel,
+ stdlib
+ ]},
+ {env,[]},
+ {modules, []},
+
+ {licenses, ["Apache-2.0"]},
+ {links, []}
+ ]}.
diff --git a/src_erl/NerlMonitor/src/nerlMonitor_app.erl b/src_erl/NerlMonitor/src/nerlMonitor_app.erl
new file mode 100644
index 00000000..6c2f54e3
--- /dev/null
+++ b/src_erl/NerlMonitor/src/nerlMonitor_app.erl
@@ -0,0 +1,91 @@
+%%%-------------------------------------------------------------------
+%% @doc nerlMonitor public API
+%% @end
+%%%-------------------------------------------------------------------
+
+-module(nerlMonitor_app).
+
+-behaviour(application).
+
+-include("../../Communication_Layer/http_Nerlserver/src/nerl_tools.hrl").
+
+-export([start/2, stop/1 , link_GUI/0]).
+
+-define(UTILNAME,nerlMonitor).
+-define(IP , "192.168.64.7").
+-define(PORT, 8096).
+-define(MSADDRES,"192.168.64.7:8080" ).
+-define(GUI , {'PyrlangProcess' , 'py@127.0.0.1'}). % Erlang node should be long name to communicate with pyrlang node
+
+start(_StartType, _StartArgs) ->
+ application:start(sasl),
+ application:start(ranch),
+ application:start(inets),
+
+ Dispatch = cowboy_router:compile([
+ {'_', [
+ {"/utilInfo",nerlMonitor_handler, [utilInfo]},
+ {"/stats" , nerlMonitor_handler , [stats]}
+
+ ]}
+ ]),
+ {ok, _} = cowboy:start_clear(?UTILNAME,[{port, ?PORT}],#{env => #{dispatch => Dispatch}}),
+ io:format("nerlMonitor started , opening GUI...~n"),
+ erlang:register(recvPyrlang , self()),
+ _GUI_PID = spawn_link(?MODULE , link_GUI , []) , %% Pyrlang node: ('PyrlangProcess' , 'py@127.0.0.1' , cookie 'COOKIE'); send it a message with ?GUI ! Msg
+ URL = "http://" ++ ?MSADDRES ++ "/toolConnectionReq",
+ mainServerPing(URL,term_to_binary([?UTILNAME , ?IP , integer_to_list(?PORT)])), %% TODO How to "import" nerl_tools
+ nerlMonitor_sup:start_link().
+
+
+
+%ping the main server with a connection request every 1 second (see timer:sleep(1000) below); stops once a valid response or a 'close' message arrives.
+mainServerPing(URL,Body)->
+ io:format("pinging main server...~n"),
+ Response = httpc:request(post,{URL, [],"application/x-www-form-urlencoded",Body}, [], []),
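+ %% with default options httpc returns {ok, {{HttpVersion, StatusCode, Reason}, Headers, Body}} or {error, Reason}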
+ case Response of
+ {error,_}->
+ timer:sleep(1000),
+ receive
+ close ->
+ io:format("Quitting NerlMonitor...~n"),
+ {ok , AppName} = application:get_application(),
+ stop(AppName)
+ after 0 ->
+ mainServerPing(URL,Body)
+ end;
+ {ok,{_ResCode, _Headers, Data}}->
+ io:format("Got NerlGraph , Sending to GUI...~n" , []),
+ ?GUI ! {graph , Data},
+ recvLoop();
+ {ok , _} ->
+ io:format("Got unknown response from main server~n")
+ end.
+
+
+recvLoop()-> %% main loop after the NerlGraph was forwarded to the GUI: handle GUI requests (worker termination / close)
+ receive
+ {terminate , WorkerName} ->
+ io:format("Got termination message for Worker ~p from GUI~n" , [WorkerName]),
+ URL = "http://" ++ ?MSADDRES ++ "/worker_kill",
+ Body = term_to_binary(WorkerName),
+ httpc:request(post,{URL, [],"application/x-www-form-urlencoded",Body}, [], []),
+ recvLoop();
+ close ->
+ io:format("Quitting NerlMonitor...~n"),
+ {ok , AppName} = application:get_application(),
+ stop(AppName);
+ Msg ->
+ io:format("Got unknown message from GUI: ~p~n", [Msg]),
+ recvLoop()
+ end.
+
+
+stop(_State) ->
+ ok.
+
+link_GUI() ->
+ os:cmd('python3 src/MonitorGUI.py'),
+ io:format("GUI Closed~n").
+
+
diff --git a/src_erl/NerlMonitor/src/nerlMonitor_sup.erl b/src_erl/NerlMonitor/src/nerlMonitor_sup.erl
new file mode 100644
index 00000000..02225cc6
--- /dev/null
+++ b/src_erl/NerlMonitor/src/nerlMonitor_sup.erl
@@ -0,0 +1,35 @@
+%%%-------------------------------------------------------------------
+%% @doc NerlMonitor top level supervisor.
+%% @end
+%%%-------------------------------------------------------------------
+
+-module(nerlMonitor_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0]).
+
+-export([init/1]).
+
+-define(SERVER, ?MODULE).
+
+start_link() ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+
+%% sup_flags() = #{strategy => strategy(), % optional
+%% intensity => non_neg_integer(), % optional
+%% period => pos_integer()} % optional
+%% child_spec() = #{id => child_id(), % mandatory
+%% start => mfargs(), % mandatory
+%% restart => restart(), % optional
+%% shutdown => shutdown(), % optional
+%% type => worker(), % optional
+%% modules => modules()} % optional
+init([]) ->
+ SupFlags = #{strategy => one_for_all,
+ intensity => 0,
+ period => 1},
+ ChildSpecs = [],
+ {ok, {SupFlags, ChildSpecs}}.
+
+%% internal functions
diff --git a/src_erl/NerlMonitor/src/requirements.txt b/src_erl/NerlMonitor/src/requirements.txt
new file mode 100644
index 00000000..b4bc4464
--- /dev/null
+++ b/src_erl/NerlMonitor/src/requirements.txt
@@ -0,0 +1,4 @@
+matplotlib==3.7.1
+nest-asyncio==1.5.7
+networkx==3.1
+PySimpleGUI==4.60.5
diff --git a/src_erl/erlBridge/workers/workerGeneric.erl b/src_erl/erlBridge/workers/workerGeneric.erl
index fb859377..dfb7cbaa 100644
--- a/src_erl/erlBridge/workers/workerGeneric.erl
+++ b/src_erl/erlBridge/workers/workerGeneric.erl
@@ -16,7 +16,7 @@
-behaviour(gen_statem).
%% API
--export([start_link/1]).
+-export([start_link/1,start_monitor/1]).
%% gen_statem callbacks
-export([init/1, format_status/2, state_name/3, handle_event/4, terminate/3,
code_change/4, callback_mode/0]).
@@ -37,6 +37,11 @@ start_link(ARGS) ->
{ok,Pid} = gen_statem:start_link(?MODULE, ARGS, []),
Pid.
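+%% Like start_link/1, but the caller also monitors the spawned worker and receives a 'DOWN' message if it terminates.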
+start_monitor(ARGS) ->
+ %{ok,Pid} = gen_statem:start_link({local, element(1, ARGS)}, ?MODULE, ARGS, []), %% name this machine by unique name
+ {ok,{Pid,_Ref}} = gen_statem:start_monitor(?MODULE, ARGS, []),
+ Pid.
+
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================
@@ -104,7 +109,7 @@ handle_event(_EventType, _EventContent, _StateName, State = #workerGeneric_state
%% necessary cleaning up. When it returns, the gen_statem terminates with
%% Reason. The return value is ignored.
terminate(_Reason, _StateName, _State) ->
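+ %% clean up: destroy this worker's NIF model (model_id is stored in the worker's generic ETS) before the statem exits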
- ok.
+ nerlNIF:destroy_nif(ets:lookup_element(get(generic_worker_ets) , model_id , ?DATA_IDX)).
%% @private
%% @doc Convert process state when code is changed
diff --git a/src_py/apiServer/apiServer.py b/src_py/apiServer/apiServer.py
index ac6a4cad..2f7bb31f 100644
--- a/src_py/apiServer/apiServer.py
+++ b/src_py/apiServer/apiServer.py
@@ -413,11 +413,14 @@ def accuracy_matrix(self, expNum):
# print(f"worker {worker}, has {len(workerNeuronRes[worker][TRUE_LABLE_IND])} labels, with {len(workerNeuronRes[worker][TRUE_LABLE_IND][j])} samples")
# print(f"confusion {worker}:{j}, has is of {workerNeuronRes[worker][TRUE_LABLE_IND][j]}, {workerNeuronRes[worker][PRED_LABLE_IND][j]}")
confMatList[worker][j] = confusion_matrix(workerNeuronRes[worker][globe.TRUE_LABLE_IND][j], workerNeuronRes[worker][globe.PRED_LABLE_IND][j])
- # print(confMatList[worker][j])
disp = ConfusionMatrixDisplay(confMatList[worker][j], display_labels=["X", labelNames[j]])
+ if confMatList[worker][j].shape == (0,0): # ! Worker is down
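+ # dead worker: plot an all-zero 2x2 matrix and substitute dummy labels (true=[0,0], pred=[1,1]) so the reported accuracy is 0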
+ disp = ConfusionMatrixDisplay(np.array([[0,0],[0,0]]), display_labels=["X", labelNames[j]])
+ workerNeuronRes[worker][globe.TRUE_LABLE_IND][j] = [0,0]
+ workerNeuronRes[worker][globe.PRED_LABLE_IND][j] = [1,1]
disp.plot(ax=axes[i, j], colorbar=False)
disp.ax_.set_title(f'{worker}, class #{j}\nAccuracy={round(accuracy_score(workerNeuronRes[worker][globe.TRUE_LABLE_IND][j], workerNeuronRes[worker][globe.PRED_LABLE_IND][j]), 3)}')
- if i < len(workersList) - 1:
+ if i < len(workersList) - 1: # ? Ask David why this is needed
disp.ax_.set_xlabel('') #remove "predicted label"
if j != 0:
disp.ax_.set_ylabel('') #remove "true label"
@@ -437,6 +440,8 @@ def accuracy_matrix(self, expNum):
statFile = open(statFileName, "a")
for worker in confMatList:
+ if confMatList[worker][0].shape == (0,0): # ! Worker is down
+ continue
for j, label in enumerate(confMatList[worker]):
# Calculate the accuracy and other stats:
tn, fp, fn, tp = label.ravel()