diff --git a/.gitignore b/.gitignore index 64abdb48..393cae9b 100644 --- a/.gitignore +++ b/.gitignore @@ -37,4 +37,5 @@ examples/.ipynb_checkpoints/simple_run-checkpoint.ipynb /Results arch.json conn.json -/inputDataDir \ No newline at end of file +/inputDataDir +/NerlNetGraph.png diff --git a/NerlnetMonitor.sh b/NerlnetMonitor.sh new file mode 100755 index 00000000..a95ee9d2 --- /dev/null +++ b/NerlnetMonitor.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +MONITOR_PATH="src_erl/NerlMonitor" +GUI_PATH="src_erl/NerlMonitor/src" + +echo "NerlnetMonitor Activated" + + +cd $MONITOR_PATH +rebar3 shell --name erl@127.0.0.1 --setcookie COOKIE + +cd ../../ diff --git a/README.md b/README.md index 970f588e..17ec3b35 100644 --- a/README.md +++ b/README.md @@ -76,4 +76,4 @@ Minimum Python version: 3.8 4. Run Jupyter notebook with ```jupyter-notebook``` and create a new notebook in the created dir from step 3. 5. Follow the example: https://github.com/leondavi/NErlNet/blob/master/examples/example_run.ipynb -Contact Email: nerlnet@outlook.com +Contact Email: leondavi@post.bgu.ac.il diff --git a/inputJsonFiles/Architecture/arch_1PCSIM6WorkerSynth.json b/inputJsonFiles/Architecture/arch_1PCSIM6WorkerSynth.json index 8d1bb185..58821811 100755 --- a/inputJsonFiles/Architecture/arch_1PCSIM6WorkerSynth.json +++ b/inputJsonFiles/Architecture/arch_1PCSIM6WorkerSynth.json @@ -6,13 +6,13 @@ }, "devices": [ { - "host": "192.168.0.108", + "host": "192.168.64.7", "entities": "mainServer,c1,c2,c3,c4,c5,c6,s1,r1,r2,r3,r4,r5,r6,apiServer" } ], "apiServer": { - "host": "192.168.0.108", + "host": "192.168.64.7", "port": "8095", "args": "" } @@ -26,7 +26,7 @@ , "mainServer": { - "host": "192.168.0.108", + "host": "192.168.64.7", "port": "8080", "args": "" } diff --git a/inputJsonFiles/experimentsFlow/exp_6Workers1SourceMNist.json b/inputJsonFiles/experimentsFlow/exp_6Workers1SourceMNist.json index 75b46199..3c15b57e 100644 --- a/inputJsonFiles/experimentsFlow/exp_6Workers1SourceMNist.json +++ b/inputJsonFiles/experimentsFlow/exp_6Workers1SourceMNist.json @@ -1,7 +1,7 @@ { "Features": 784, "Labels": ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"], - "CSV path": "mnist", + "CSV path": "mnist-o", "Batches per source": { "Training": 10000, diff --git a/src_cpp/opennn b/src_cpp/opennn index 4d6b9484..1de26c61 160000 --- a/src_cpp/opennn +++ b/src_cpp/opennn @@ -1 +1 @@ -Subproject commit 4d6b94848d5dd5356084a54f1b1be58b7cdc1a21 +Subproject commit 1de26c611fda2fe6cf026a0e4b329c7509a68042 diff --git a/src_erl/Communication_Layer/http_Nerlserver/src/Client/clientStateHandler.erl b/src_erl/Communication_Layer/http_Nerlserver/src/Client/clientStateHandler.erl index e1945527..a9601a68 100644 --- a/src_erl/Communication_Layer/http_Nerlserver/src/Client/clientStateHandler.erl +++ b/src_erl/Communication_Layer/http_Nerlserver/src/Client/clientStateHandler.erl @@ -30,7 +30,8 @@ init(Req0, [Action,Client_StateM_Pid]) -> idle -> gen_statem:cast(Client_StateM_Pid,{idle}); training -> gen_statem:cast(Client_StateM_Pid,{training}); predict -> gen_statem:cast(Client_StateM_Pid,{predict}); - statistics -> gen_statem:cast(Client_StateM_Pid,{statistics}) + statistics -> gen_statem:cast(Client_StateM_Pid,{statistics}); + worker_kill -> gen_statem:cast(Client_StateM_Pid,{worker_kill , Body}) end, %% reply ACKnowledge to main server for initiating, later send finished initiating http_request from client_stateM diff --git a/src_erl/Communication_Layer/http_Nerlserver/src/Client/clientStatem.erl 
b/src_erl/Communication_Layer/http_Nerlserver/src/Client/clientStatem.erl index 52459b6b..a1cb1674 100644 --- a/src_erl/Communication_Layer/http_Nerlserver/src/Client/clientStatem.erl +++ b/src_erl/Communication_Layer/http_Nerlserver/src/Client/clientStatem.erl @@ -20,7 +20,7 @@ code_change/4, callback_mode/0, idle/3, training/3,waitforWorkers/3]). --import(nerlNIF,[validate_nerltensor_erl/1]). +-import(nerlNIF,[validate_nerltensor_erl/1 , get_active_models_ids_list/0]). -define(ETS_KV_VAL_IDX, 2). % key value pairs --> value index is 2 -define(WORKER_PID_IDX, 2). @@ -78,7 +78,7 @@ init({MyName,NerlnetGraph, WorkersToClientsMap}) -> ets:insert(EtsRef, {msgCounter, 1}), ets:insert(EtsRef, {infoIn, 0}), ets:insert(EtsRef, {myName, MyName}), - + ets:insert(EtsRef , {deadWorkers , []}), createWorkers(MyName,EtsRef), %% send pre_idle signal to workers @@ -99,8 +99,7 @@ format_status(_Opt, [_PDict, _StateName, _State]) -> Status = some_term, Status. %% ==============STATES================= waitforWorkers(cast, In = {stateChange,WorkerName,MissedBatchesCount}, State = #client_statem_state{myName = MyName,waitforWorkers = WaitforWorkers,nextState = NextState, etsRef = EtsRef}) -> - NewWaitforWorkers = WaitforWorkers--[WorkerName], - % io:format("remaining workers = ~p~n",[NewWaitforWorkers]), + NewWaitforWorkers = WaitforWorkers -- [WorkerName], ets:update_counter(EtsRef, msgCounter, 1), % last is increment value ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), ets:update_element(EtsRef, WorkerName,[{?WORKER_TRAIN_MISSED_IDX,MissedBatchesCount}]), %% update missed batches count @@ -114,16 +113,43 @@ waitforWorkers(cast, In = {stateChange,WorkerName,MissedBatchesCount}, State = # waitforWorkers(cast, In = {NewState}, State = #client_statem_state{myName = MyName, etsRef = EtsRef}) -> ets:update_counter(EtsRef, msgCounter, 1), ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), - % ?LOG_INFO("~p in waiting going to state ~p~n",[MyName, State]), + %% ?LOG_INFO("~p in waiting going to state ~p~n",[MyName, State]), Workers = ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX), cast_message_to_workers(EtsRef, {NewState}), {next_state, waitforWorkers, State#client_statem_state{nextState = NewState, waitforWorkers = Workers}}; +waitforWorkers(cast, In ={worker_kill , Body}, State = #client_statem_state{waitforWorkers = WaitforWorkers,etsRef = EtsRef}) -> + %got kill command for a worker, kiil and take off waitforWorkers list + ets:update_counter(EtsRef, msgCounter, 1), + ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), + {_, WorkerName} = binary_to_term(Body), + Pid = ets:lookup_element(EtsRef, WorkerName, ?WORKER_PID_IDX), + gen_statem:stop(Pid, shutdown , infinity), + NewWaitforWorkers = WaitforWorkers -- [WorkerName], + {next_state, waitforWorkers, State#client_statem_state{waitforWorkers = NewWaitforWorkers,etsRef = EtsRef}}; + waitforWorkers(cast, EventContent, State = #client_statem_state{etsRef = EtsRef}) -> ets:update_counter(EtsRef, msgCounter, 1), ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(EventContent)), - ?LOG_WARNING("client waitforWorkers ignored!!!: ~p ~n",[EventContent]), - {next_state, waitforWorkers, State}. 
+ ?LOG_WARNING("client waitforWorkers ignored!!!: ~p~n",[EventContent]), + {next_state, waitforWorkers, State}; + +waitforWorkers(info, EventContent, State = #client_statem_state{myName = MyName,waitforWorkers = WaitforWorkers ,etsRef = EtsRef}) -> + ets:update_counter(EtsRef, msgCounter, 1), + ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(EventContent)), + case EventContent of + {'DOWN',_Ref,process,Pid,_Reason}-> %worker down + [[WorkerName]] = ets:match(EtsRef,{'$1',Pid,'_','_','_'}), + NewWaitforWorkers = WaitforWorkers--[WorkerName], + %report worker down to main server + delete_worker(EtsRef,MyName,WorkerName), + {next_state, waitforWorkers, State#client_statem_state{waitforWorkers = NewWaitforWorkers,etsRef = EtsRef}}; %delete worker from ets so client will not wait for it in the future + + _Any-> ?LOG_INFO("client ~p got unexpected info: ~p~n",[MyName, EventContent]), + %recevied unexpected messege, print to log/tell main server for debuging + {next_state, waitforWorkers, State#client_statem_state{etsRef = EtsRef}} + end. + %% initiating workers when they include federated workers. init stage == handshake between federated worker client and server @@ -149,18 +175,27 @@ idle(cast, In = {custom_worker_message, {From, To}}, State = #client_statem_stat {keep_state, State}; idle(cast, In = {statistics}, State = #client_statem_state{ myName = MyName, etsRef = EtsRef}) -> - sendStatistics(EtsRef), ets:update_counter(EtsRef, msgCounter, 1), % last param is increment value ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), + sendStatistics(EtsRef), {next_state, idle, State}; idle(cast, In = {training}, State = #client_statem_state{etsRef = EtsRef}) -> + io:format("client going to state training~n",[]), ets:update_counter(EtsRef, msgCounter, 1), ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), MessageToCast = {training}, cast_message_to_workers(EtsRef, MessageToCast), {next_state, waitforWorkers, State#client_statem_state{waitforWorkers= ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX), nextState = training}}; +idle(cast, In={worker_kill , Body}, State = #client_statem_state{etsRef = EtsRef}) -> + ets:update_counter(EtsRef, msgCounter, 1), + ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), + {_, WorkerName} = binary_to_term(Body), + Pid = ets:lookup_element(EtsRef, WorkerName, ?WORKER_PID_IDX), + gen_statem:stop(Pid, shutdown , infinity), + {next_state, idle, State#client_statem_state{etsRef = EtsRef}}; + idle(cast, In = {predict}, State = #client_statem_state{etsRef = EtsRef}) -> io:format("client going to state predict~n",[]), ets:update_counter(EtsRef, msgCounter, 1), @@ -173,7 +208,22 @@ idle(cast, EventContent, State = #client_statem_state{etsRef = EtsRef}) -> ets:update_counter(EtsRef, msgCounter, 1), ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(EventContent)), io:format("client idle ignored!!!: ~p ~n",[EventContent]), - {next_state, training, State#client_statem_state{etsRef = EtsRef}}. 
+ {next_state, training, State#client_statem_state{etsRef = EtsRef}}; + + +idle(info, EventContent, State = #client_statem_state{myName = MyName,etsRef = EtsRef}) -> + ets:update_counter(EtsRef, msgCounter, 1), + ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(EventContent)), + case EventContent of + {'DOWN',_Ref,process,Pid,_Reason}-> %worker down + [[WorkerName]] = ets:match(EtsRef,{'$1',Pid,'_','_','_'}), + %report worker down to main server + delete_worker(EtsRef,MyName,WorkerName); %delete worker from ets so client will not wait for it in the future + + _Any-> ?LOG_INFO("client ~p got unexpected info: ~p~n",[MyName, EventContent]) + %recevied unexpected messege, print to log/tell main server for debuging + end, + {next_state, idle, State#client_statem_state{etsRef = EtsRef}}. %% passing vector from FedClient to FedServer training(cast, In = {update, {From, To, Data}}, State = #client_statem_state{etsRef = EtsRef, myName = MyName}) -> @@ -237,7 +287,8 @@ training(cast, In = {sample,Body}, State = #client_statem_state{etsRef = EtsRef} NewTimingTuple = {Start,TotalBatches+1,TotalTime}, ets:update_element(EtsRef, WorkerName,[{?WORKER_PID_IDX, WorkerPid},{?WORKER_TIMING_IDX,NewTimingTuple}]), gen_statem:cast(WorkerPid, {sample, BatchOfSamples}); - true -> ?LOG_ERROR("Given worker ~p isn't found in client ~p",[WorkerName, ClientName]) end, + true -> ok %%'?'LOG_ERROR("Given worker ~p isn't found in client ~p",[WorkerName, ClientName]) + end, {next_state, training, State#client_statem_state{etsRef = EtsRef}}; training(cast, In = {idle}, State = #client_statem_state{myName = MyName, etsRef = EtsRef}) -> @@ -278,10 +329,47 @@ training(cast, In = {loss,WorkerName,LossFunction,_Time_NIF}, State = #client_st nerl_tools:http_request(RouterHost,RouterPort,"lossFunction", term_to_binary({WorkerName,LossFunction})), {next_state, training, State#client_statem_state{myName = MyName,etsRef = EtsRef}}; + +training(cast, In={worker_kill , Body}, State = #client_statem_state{etsRef = EtsRef}) -> + ets:update_counter(EtsRef, msgCounter, 1), + ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), + {_, WorkerName} = binary_to_term(Body), + Pid = ets:lookup_element(EtsRef, WorkerName, ?WORKER_PID_IDX), + gen_statem:stop(Pid, shutdown , infinity), + {next_state, training, State#client_statem_state{etsRef = EtsRef}}; + +% training(cast, In = {statistics}, State = #client_statem_state{ myName = MyName, etsRef = EtsRef}) -> +% io:format("client ~p got statistics ~n",[MyName]), +% ets:update_counter(EtsRef, msgCounter, 1), % last param is increment value +% ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), +% sendStatistics(EtsRef), +% {next_state, training , State}; + training(cast, EventContent, State = #client_statem_state{etsRef = EtsRef, myName = MyName}) -> - ?LOG_WARNING("client ~p training ignored!!!: ~p ~n!!!",[MyName, EventContent]), + case EventContent of % ! 
Not the best way to handle this + {statistics} -> + ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(EventContent)), + {next_state, training, State#client_statem_state{etsRef = EtsRef}}; + _ -> + ?LOG_WARNING("client ~p training ignored!!!: ~p ~n!!!",[MyName, EventContent]), + ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(EventContent)), + {next_state, training, State#client_statem_state{etsRef = EtsRef}} + end; + + +training(info, EventContent, State = #client_statem_state{myName = MyName,etsRef = EtsRef}) -> + ets:update_counter(EtsRef, msgCounter, 1), ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(EventContent)), - {next_state, training, State#client_statem_state{etsRef = EtsRef}}. + case EventContent of + {'DOWN' , _Ref , process , Pid , _Reason}-> %worker down + [[WorkerName]] = ets:match(EtsRef,{'$1',Pid,'_','_','_'}), + %report worker down to main server + delete_worker(EtsRef,MyName,WorkerName); %delete worker from ets so client will not wait for it in the future + + _Any-> ?LOG_INFO("client ~p got unexpected info: ~p~n",[MyName, EventContent]) + %recevied unexpected messege, print to log/tell main server for debuging + end, + {next_state, training, State#client_statem_state{etsRef = EtsRef}}. predict(cast, In = {sample,Body}, State = #client_statem_state{etsRef = EtsRef}) -> %% Body: ClientName#WorkerName#CSVName#BatchNumber#BatchOfSamples @@ -289,7 +377,6 @@ predict(cast, In = {sample,Body}, State = #client_statem_state{etsRef = EtsRef}) ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), {ClientName, WorkerNameStr, CSVName, BatchNumber, BatchOfSamples} = binary_to_term(Body), WorkerName = list_to_atom(WorkerNameStr), - Start = os:timestamp(), WorkerOfThisClient = ets:member(EtsRef, WorkerName), if @@ -297,12 +384,11 @@ predict(cast, In = {sample,Body}, State = #client_statem_state{etsRef = EtsRef}) TimingTuple = ets:lookup_element(EtsRef, WorkerName, ?WORKER_TIMING_IDX), %todo refactor timing map {_LastBatchReceivedTime,TotalBatches,TotalTime} = TimingTuple, NewTimingTuple = {Start,TotalBatches+1,TotalTime}, - ets:update_element(EtsRef, WorkerName,[{?WORKER_TIMING_IDX,NewTimingTuple}]); - true -> ?LOG_ERROR("Given worker ~p isn't found in client ~p",[WorkerName, ClientName]) + ets:update_element(EtsRef, WorkerName,[{?WORKER_TIMING_IDX,NewTimingTuple}]) , + WorkerPid = ets:lookup_element(EtsRef, WorkerName, ?WORKER_PID_IDX), + gen_statem:cast(WorkerPid, {sample, CSVName, BatchNumber, BatchOfSamples}); + true -> ok %%'?'LOG_ERROR("Given worker ~p isn't found in client ~p",[WorkerName, ClientName]) end, - - WorkerPid = ets:lookup_element(EtsRef, WorkerName, ?WORKER_PID_IDX), - gen_statem:cast(WorkerPid, {sample, CSVName, BatchNumber, BatchOfSamples}), {next_state, predict, State#client_statem_state{etsRef = EtsRef}}; %% TODO: add nif timing statistics @@ -316,6 +402,14 @@ predict(cast, In = {predictRes,WorkerName,InputName,ResultID,PredictNerlTensor, nerl_tools:http_request(RouterHost,RouterPort,"predictRes", term_to_binary({atom_to_list(WorkerName), InputName, ResultID, {PredictNerlTensor, Type}})), {next_state, predict, State#client_statem_state{etsRef = EtsRef}}; +predict(cast, In={worker_kill , Body}, State = #client_statem_state{etsRef = EtsRef}) -> + ets:update_counter(EtsRef, msgCounter, 1), + ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), + {_, WorkerName} = binary_to_term(Body), + Pid = ets:lookup_element(EtsRef, WorkerName, ?WORKER_PID_IDX), + gen_statem:stop(Pid, shutdown , infinity), + 
{next_state, predict, State#client_statem_state{etsRef = EtsRef}}; + % TODO from predict directly to training?!?!? predict(cast, In = {training}, State = #client_statem_state{etsRef = EtsRef}) -> ets:update_counter(EtsRef, msgCounter, 1), @@ -335,11 +429,31 @@ predict(cast, In = {idle}, State = #client_statem_state{etsRef = EtsRef}) -> Workers = ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX), {next_state, waitforWorkers, State#client_statem_state{nextState = idle, waitforWorkers = Workers, etsRef = EtsRef}}; +predict(cast, In = {statistics}, State = #client_statem_state{ myName = MyName, etsRef = EtsRef}) -> + ets:update_counter(EtsRef, msgCounter, 1), % last param is increment value + ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), + sendStatistics(EtsRef), + {next_state, predict, State}; + predict(cast, EventContent, State = #client_statem_state{etsRef = EtsRef}) -> ets:update_counter(EtsRef, msgCounter, 1), ets:update_counter(EtsRef, infoIn, erts_debug:flat_size(EventContent)), ?LOG_WARNING("client predict ignored: ~p ~n",[EventContent]), - {next_state, predict, State#client_statem_state{etsRef = EtsRef}}. + {next_state, predict, State#client_statem_state{etsRef = EtsRef}}; + +predict(info, EventContent, State = #client_statem_state{myName = MyName,etsRef = EtsRef}) -> + ets:update_counter(EtsRef, msgCounter, 1), + ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(EventContent)), + case EventContent of + {'DOWN',_Ref,process,Pid,_Reason}-> %worker down + [[WorkerName]] = ets:match(EtsRef,{'$1',Pid,'_','_','_'}), + %report worker down to main server + delete_worker(EtsRef,MyName,WorkerName); %delete worker from ets so client will not wait for it in the future + + _Any-> ?LOG_INFO("client ~p got unexpected info: ~p~n",[MyName, EventContent]) + %recevied unexpected messege, print to log/tell main server for debuging + end, + {next_state, predict, State#client_statem_state{etsRef = EtsRef}}. %% @private @@ -423,7 +537,7 @@ createWorkers(ClientName, EtsRef) -> WorkerArgs = {WorkerName,ModelId,ModelType,ScalingMethod, LayerTypesList, LayersSizes, LayersActivationFunctions, Optimizer, LossMethod, LearningRate, self(), CustomFunc, WorkerData}, - WorkerPid = workerGeneric:start_link(WorkerArgs), + WorkerPid = workerGeneric:start_monitor(WorkerArgs), ets:insert(EtsRef, {WorkerName, WorkerPid, WorkerArgs, {0,0,0.0}, 0}), WorkerName @@ -444,24 +558,46 @@ updateTimingMap(EtsRef, WorkerName) when is_atom(WorkerName) -> %% adding client c1=MsgNum,w1=... 
sendStatistics(EtsRef)-> NerlnetGraph = ets:lookup_element(EtsRef, nerlnetGraph, ?ETS_KV_VAL_IDX), - Workers = ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX), + [{deadWorkers , DeadWorkers}] = ets:lookup(EtsRef, deadWorkers), + Workers = ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX) ++ DeadWorkers, TimingMap = [{WorkerKey,ets:lookup_element(EtsRef, WorkerKey, ?WORKER_TIMING_IDX)} || WorkerKey <- Workers], MissedCounts = [{WorkerKey,ets:lookup_element(EtsRef, WorkerKey, ?WORKER_TRAIN_MISSED_IDX)} || WorkerKey <- Workers], Counter = ets:lookup_element(EtsRef, msgCounter, ?ETS_KV_VAL_IDX), InfoSize = ets:lookup_element(EtsRef, infoIn, ?ETS_KV_VAL_IDX), MyName = ets:lookup_element(EtsRef, myName, ?ETS_KV_VAL_IDX), - + % io:format("Timing Map is ~p~n",[TimingMap]), TimingStats = lists:flatten([atom_to_list(WorkerName)++"_Train_Avg_Time="++float_to_list(TotalTime/TotalBatches,[{decimals, 3}])++","||{WorkerName,{_LastTime,TotalBatches,TotalTime}}<-TimingMap]), MissingStats = lists:flatten([atom_to_list(WorkerName)++"_Train_Miss="++integer_to_list(MissCount)++","||{WorkerName,MissCount}<-MissedCounts]), MyStats = atom_to_list(MyName)++"_Msg_Count="++integer_to_list(Counter)++","++atom_to_list(MyName)++"_info_Size="++integer_to_list(InfoSize)++",", - + MyDeadWorkers=lists:flatten([" "++atom_to_list(WorkerName)||WorkerName<-DeadWorkers]), {RouterHost,RouterPort} = nerl_tools:getShortPath(MyName, ?MAIN_SERVER_ATOM, NerlnetGraph), - nerl_tools:http_request(RouterHost,RouterPort,"statistics", list_to_binary(atom_to_list(MyName)++":"++MyStats++MissingStats++lists:droplast(TimingStats))). + nerl_tools:http_request(RouterHost,RouterPort,"statistics", list_to_binary(atom_to_list(MyName)++":"++MyStats++MissingStats++lists:droplast(TimingStats)++"#"++MyDeadWorkers)). cast_message_to_workers(EtsRef, Msg) -> Workers = ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX), + case Workers of + [] -> io:format("No workers to send message to~n"), + gen_statem:cast(self(), {stateChange , [] , 0}); + _ -> ok + end, Func = fun(WorkerKey) -> WorkerPid = ets:lookup_element(EtsRef, WorkerKey, ?WORKER_PID_IDX), gen_statem:cast(WorkerPid, Msg) end, - lists:foreach(Func, Workers). \ No newline at end of file + lists:foreach(Func, Workers). + + +delete_worker(EtsRef,MyName,WorkerName)-> + %activated if client detected that a worker stopped, will delete it from workersNames and move to deadWorkers so as not to wait for responsed from it in the future. + AliveList = ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX), + DeadList = ets:lookup_element(EtsRef, deadWorkers, ?ETS_KV_VAL_IDX), + ets:delete(EtsRef,workersNames), + ets:delete(EtsRef,deadWorkers), + ets:insert(EtsRef, {workersNames, AliveList -- [WorkerName]}), + ets:insert(EtsRef, {deadWorkers, DeadList ++ [WorkerName]}), + NerlGraph = ets:lookup_element(EtsRef, nerlnetGraph, ?ETS_KV_VAL_IDX), + {Host , Port} = nerl_tools:getShortPath(MyName, ?MAIN_SERVER_ATOM, NerlGraph), + nerl_tools:http_request(Host,Port,"worker_down",list_to_binary(atom_to_list(MyName) ++ "-" ++ atom_to_list(WorkerName))), + ?LOG_WARNING("Worker ~p is down, deleting it from client ~p~n",[WorkerName,MyName]), + %% TODO Verify NIF active_model_id list + EtsRef. 
\ No newline at end of file diff --git a/src_erl/Communication_Layer/http_Nerlserver/src/MainServer/actionHandler.erl b/src_erl/Communication_Layer/http_Nerlserver/src/MainServer/actionHandler.erl index adf2b06f..9adb9bb9 100644 --- a/src_erl/Communication_Layer/http_Nerlserver/src/MainServer/actionHandler.erl +++ b/src_erl/Communication_Layer/http_Nerlserver/src/MainServer/actionHandler.erl @@ -25,7 +25,9 @@ init(Req0, [Action, Main_genserver_Pid]) -> predictRes -> gen_statem:cast(Main_genserver_Pid, {predictRes,Body}); statistics -> gen_statem:cast(Main_genserver_Pid, {statistics,Body}); startCasting -> gen_statem:cast(Main_genserver_Pid, {startCasting,Body}); - stopCasting -> gen_statem:cast(Main_genserver_Pid, {stopCasting,Body}) + stopCasting -> gen_statem:cast(Main_genserver_Pid, {stopCasting,Body}); + %monitor + worker_down -> gen_statem:cast(Main_genserver_Pid, {worker_down,Body}) end, Reply = io_lib:format("Body Received: ~p ~n ", [Body]), Req = cowboy_req:reply(200, diff --git a/src_erl/Communication_Layer/http_Nerlserver/src/MainServer/mainGenserver.erl b/src_erl/Communication_Layer/http_Nerlserver/src/MainServer/mainGenserver.erl index 09e84c24..7802900f 100644 --- a/src_erl/Communication_Layer/http_Nerlserver/src/MainServer/mainGenserver.erl +++ b/src_erl/Communication_Layer/http_Nerlserver/src/MainServer/mainGenserver.erl @@ -22,7 +22,7 @@ -define(SERVER, ?MODULE). --record(main_genserver_state, {statisticsCounter = 0, myName, state, workersMap, clients, nerlnetGraph, sourcesCastingList = [], sourcesWaitingList = [], clientsWaitingList = [], statisticsMap, msgCounter = 0, batchSize}). +-record(main_genserver_state, {statisticsCounter = 0, myName, state, workersMap, clients, nerlnetGraph, sourcesCastingList = [], sourcesWaitingList = [], clientsWaitingList = [], statisticsMap, msgCounter = 0, batchSize , etsRef}). %%%=============================================================== %%% API @@ -56,10 +56,10 @@ init({MyName,Clients,BatchSize,WorkersMap,NerlnetGraph}) -> ?LOG_NOTICE("Main Server is connected to: ~p~n",[ConnectedEntities]), put(nerlnetGraph, NerlnetGraph), % nerl_tools:start_connection([digraph:vertex(NerlnetGraph,Vertex) || Vertex <- digraph:out_neighbours(NerlnetGraph,MyName)]), - + EtsRef = ets:new(main_server_data , [set]) , %% ! In some point the whole record will replaced by ets NewStatisticsMap = getNewStatisticsMap([digraph:vertex(NerlnetGraph,Vertex) || Vertex <- digraph:vertices(NerlnetGraph)--?LIST_OF_SPECIAL_SERVERS]), % io:format("New StatisticsMap = ~p~n",[NewStatisticsMap]), - {ok, #main_genserver_state{myName = MyNameStr, workersMap = WorkersMap, batchSize = BatchSize, state=idle, clients = Clients, nerlnetGraph = NerlnetGraph, msgCounter = 1,statisticsMap = NewStatisticsMap}}. + {ok, #main_genserver_state{myName = MyNameStr, workersMap = WorkersMap, batchSize = BatchSize, state=idle, clients = Clients, nerlnetGraph = NerlnetGraph, msgCounter = 1,statisticsMap = NewStatisticsMap , etsRef = EtsRef}}. 
%% @private %% @doc Handling call messages @@ -76,10 +76,11 @@ init({MyName,Clients,BatchSize,WorkersMap,NerlnetGraph}) -> handle_call(getGraph, _From, State) -> NerlGraph = State#main_genserver_state.nerlnetGraph, FullNodes = [digraph:vertex(NerlGraph,Vertex) || Vertex <- digraph:vertices(NerlGraph)], - %io:format("Full graph is: ~p~n", [FullNodes]), - NodesList = [Entity++","++IP++","++integer_to_list(Port)++"#"||{Entity, {IP, Port}} <- FullNodes], + %%io:format("Full graph is: ~p~n", [FullNodes]), + NodesList = [atom_to_list(Entity)++","++binary_to_list(IP)++","++integer_to_list(Port)++"#"||{Entity, {IP, Port}} <- FullNodes], + %%io:format("graph nodes are: ~p~n", [NodesList]), EdgesList = [digraph:edge(NerlGraph,Edge) || Edge <- digraph:edges(NerlGraph)], - %io:format("graph edges are: ~p~n", [EdgesList]), + %%io:format("graph edges are: ~p~n", [EdgesList]), Nodes = nodeString(NodesList), Edges = edgeString(EdgesList), @@ -100,11 +101,11 @@ nodeString([Node |NodeList]) -> nodeString(NodeList, Node). nodeString([], Str) -> Str; nodeString([Node |NodeList], Str)-> nodeString(NodeList, Node++Str). -edgeString([Edge |EdgesList])-> {_ID, V1, V2, _Label} = Edge, edgeString(EdgesList, V1++"-"++V2). +edgeString([Edge |EdgesList])-> {_ID, V1, V2, _Label} = Edge, edgeString(EdgesList, atom_to_list(V1)++"-"++atom_to_list(V2)). edgeString([], Str)-> Str; edgeString([Edge |EdgesList], Str)-> {_ID, V1, V2, _Label} = Edge, - edgeString(EdgesList, V1++"-"++V2++","++Str). + edgeString(EdgesList, atom_to_list(V1)++"-"++atom_to_list(V2)++","++Str). %% @private %% @doc Handling cast messages @@ -113,6 +114,11 @@ edgeString([Edge |EdgesList], Str)-> {noreply, NewState :: #main_genserver_state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #main_genserver_state{}}). +handle_cast({saveUtility , Body} , State = #main_genserver_state{etsRef = EtsRef , msgCounter = MsgCounter}) -> + %gets called after a tool tried to connect, saves its addres in the ets. 
+ ets:insert(EtsRef , Body), + {noreply, State#main_genserver_state{msgCounter = MsgCounter+1}}; + handle_cast({initCSV, Source,SourceData}, State = #main_genserver_state{state = idle, myName = MyName, sourcesWaitingList = SourcesWaitingList,nerlnetGraph = NerlnetGraph,msgCounter = MsgCounter}) -> %% send router http request, to rout this message to all sensors @@ -158,33 +164,60 @@ handle_cast({clientsIdle}, State = #main_genserver_state{state = idle, myName = {noreply, State#main_genserver_state{clientsWaitingList = ListOfClients,msgCounter = MsgCounter+1}}; %%%get Statistics from all Entities in the network -handle_cast({statistics,Body}, State = #main_genserver_state{myName = MyName, statisticsCounter = StatisticsCounter, nerlnetGraph = NerlnetGraph,statisticsMap = StatisticsMap,msgCounter = MsgCounter}) -> +handle_cast({statistics,Body}, State = #main_genserver_state{myName = MyName, statisticsCounter = StatisticsCounter, nerlnetGraph = NerlnetGraph,statisticsMap = StatisticsMap,msgCounter = MsgCounter , etsRef = EtsRef , state = CurrState}) -> if Body == <<"getStatistics">> -> %% initial message from APIServer, get stats from entities - NewStatisticsMap = getNewStatisticsMap([digraph:vertex(NerlnetGraph,Vertex) || Vertex <- digraph:vertices(NerlnetGraph)--?LIST_OF_SPECIAL_SERVERS]), - [findroutAndsendStatistics(MyName, Name)||{Name,_Counter}<-maps:to_list(StatisticsMap)], - NewState = State#main_genserver_state{msgCounter = MsgCounter+1,statisticsMap = NewStatisticsMap, statisticsCounter = length(maps:to_list(StatisticsMap))}; - + [findroutAndsendStatistics(MyName, Name)||{Name,_Counter}<-maps:to_list(NewStatisticsMap)], + NewState = State#main_genserver_state{msgCounter = MsgCounter+1,statisticsMap = NewStatisticsMap, statisticsCounter = length(maps:to_list(NewStatisticsMap))}; Body == <<>> -> io:format("in Statistcs, State has: StatsCount=~p, MsgCount=~p~n", [StatisticsCounter, MsgCounter]), NewState = State; true -> %% statistics arrived from Entity - [From|[NewCounter]] = re:split(binary_to_list(Body), ":", [{return, list}]), - - NewStatisticsMap = maps:put(From,NewCounter,StatisticsMap), + CheckForDead=re:split(binary_to_list(Body), "#", [{return, list}]), + case CheckForDead of + [_]-> + Data=CheckForDead, + UpdateStatMap=StatisticsMap; %entity is not a client + [Data,[]]-> + UpdateStatMap=StatisticsMap;%entity is client without dead workers + [Data,DeadWorkers]-> %client with dead workers + case maps:is_key("Dead workers",StatisticsMap) of + true-> + MapWithKey=StatisticsMap; %key was already put in + false-> + MapWithKey=maps:put("Dead workers","",StatisticsMap) + end, + UpdateStatMap=maps:put("Dead workers",maps:get("Dead workers",MapWithKey)++DeadWorkers++" ",MapWithKey) + end, + [From|[NewCounter]] = re:split(Data, ":", [{return, list}]), + + NewStatisticsMap = maps:put(From,NewCounter,UpdateStatMap), NewState = State#main_genserver_state{msgCounter = MsgCounter+1,statisticsMap = NewStatisticsMap,statisticsCounter = StatisticsCounter-1}, if StatisticsCounter == 1 -> %% got stats from all entities Statistics = maps:to_list(NewStatisticsMap), S = mapToString(Statistics,[]) , - ?LOG_NOTICE("Sending stats: ~p~n",[S]), - {RouterHost,RouterPort} = nerl_tools:getShortPath(MyName,?API_SERVER_ATOM,NerlnetGraph), - nerl_tools:http_request(RouterHost,RouterPort,"statistics", S ++ "|mainServer:" ++integer_to_list(MsgCounter)); - - %% wait for more stats - true -> pass end - end, + %?LOG_NOTICE("Sending stats: ~p~n",[S]), + case CurrState of %statistics gets called in idle state by api 
at the end of the experiment or nerlMonitorin casting at the start of predict + idle -> + case ets:member(EtsRef,nerlMonitor) of + true-> + [{nerlMonitor , IP , Port}] = ets:lookup(EtsRef , nerlMonitor), + nerl_tools:http_request(IP,list_to_integer(Port),"stats", S ++ "|mainServer:" ++integer_to_list(MsgCounter)); + false -> ok + end, + {RouterHost,RouterPort} = nerl_tools:getShortPath(MyName,?API_SERVER_ATOM,NerlnetGraph), + nerl_tools:http_request(RouterHost,RouterPort,"statistics", S ++ "|mainServer:" ++integer_to_list(MsgCounter)); + casting -> + [{_ , MonitorIP , MonitorPort}] = ets:lookup(EtsRef, nerlMonitor), + nerl_tools:http_request(MonitorIP,list_to_integer(MonitorPort),"stats", S ++ "|mainServer:" ++integer_to_list(MsgCounter)) + end; + + %% wait for more stats + true -> pass + end + end, {noreply, NewState}; %%handle_cast({startPredicting}, State = #main_genserver_state{clients = ListOfClients, nerlnetGraph = NerlnetGraph}) -> @@ -223,10 +256,17 @@ handle_cast({sourceAck,Body}, State = #main_genserver_state{sourcesWaitingList = {noreply, State#main_genserver_state{sourcesWaitingList = NewWaitingList,msgCounter = MsgCounter+1}}; -handle_cast({clientAck,Body}, State = #main_genserver_state{clientsWaitingList = WaitingList,msgCounter = MsgCounter}) -> - NewWaitingList = WaitingList--[list_to_atom(binary_to_list(Body))], - % io:format("new Waiting List: ~p ~n",[NewWaitingList]), - if length(NewWaitingList) == 0 -> ack(); +handle_cast({clientAck,Body}, State = #main_genserver_state{clientsWaitingList = WaitingList,msgCounter = MsgCounter , etsRef = EtsRef}) -> + ClientName = list_to_atom(binary_to_list(Body)), + NewWaitingList = WaitingList--[ClientName], + if length(NewWaitingList) == 0 -> + case ets:member(EtsRef , nerlMonitor) of %if NerlMonitor is up, initiate mid experiment statistics + true -> + ?LOG_INFO(?LOG_HEADER ++ "Sending statistics request"), + gen_server:cast(self(),{statistics , list_to_binary("getStatistics")}); + _ -> ok + end, + ack(); true-> ok end, {noreply, State#main_genserver_state{clientsWaitingList = NewWaitingList, msgCounter = MsgCounter+1}}; @@ -310,6 +350,26 @@ handle_cast({predictRes,Body}, State = #main_genserver_state{batchSize = BatchSi end, {noreply, State#main_genserver_state{msgCounter = MsgCounter+1}}; +handle_cast({worker_down,Body}, State = #main_genserver_state{msgCounter = MsgCounter,etsRef = EtsRef}) -> + % some worker terminated, print to log and notify NerlMonitor if it is up + case ets:member(EtsRef,nerlMonitor) of + true-> + [{nerlMonitor , IP , Port}] = ets:lookup(EtsRef , nerlMonitor), + URL = "http://" ++ IP ++ ":" ++ Port ++ "/utilInfo", + httpc:request(post,{URL, [],"application/x-www-form-urlencoded",Body}, [], []); + false -> ok + end, + ?LOG_WARNING(?LOG_HEADER++"Worker down , ~p disconneted~n",[binary_to_list(Body)]), + {noreply, State#main_genserver_state{msgCounter = MsgCounter+1,etsRef=EtsRef}}; + +handle_cast({worker_kill , WorkerName} , State = #main_genserver_state{workersMap = WorkersMap , msgCounter = MsgCounter}) -> + %got a user kill command for a worker from NerlMonitor + ?LOG_WARNING(?LOG_HEADER++"Killing worker ~p ~n",[WorkerName]), + WorkerNameAtom=binary_to_term(WorkerName), + ClientName = maps:get(WorkerNameAtom,WorkersMap), + Body = term_to_binary({ClientName, WorkerNameAtom}), + nerl_tools:sendHTTP(?MAIN_SERVER_ATOM , ClientName , atom_to_list(worker_kill) , Body), + {noreply, State#main_genserver_state{workersMap = WorkersMap , msgCounter = MsgCounter+1}}; handle_cast(Request, State = #main_genserver_state{}) 
-> io:format("main server cast ignored: ~p~n",[Request]), @@ -377,9 +437,9 @@ getNewStatisticsMap([{Name,{_Host, _Port}}|Tail],StatisticsMap) -> getNewStatisticsMap(Tail,maps:put(atom_to_list(Name), 0, StatisticsMap)). -startCasting([],_NumOfSampleToSend,_MyName, _NerlnetGraph)->done; +startCasting([],_NumOfSampleToSend,_MyName, _NerlnetGraph) -> done; startCasting([SourceName|SourceNames],NumOfSampleToSend, MyName, NerlnetGraph)-> - ?LOG_NOTICE("~p sending start casting command to: ~p",[MyName, SourceName]), + ?LOG_NOTICE("~p sending start casting command to: ~p~n",[MyName, SourceName]), {RouterHost,RouterPort} = nerl_tools:getShortPath(MyName,SourceName,NerlnetGraph), nerl_tools:http_request(RouterHost,RouterPort,"startCasting", SourceName++[","]++NumOfSampleToSend), startCasting(SourceNames,NumOfSampleToSend, MyName, NerlnetGraph). diff --git a/src_erl/Communication_Layer/http_Nerlserver/src/Router/routerGenserver.erl b/src_erl/Communication_Layer/http_Nerlserver/src/Router/routerGenserver.erl index e1da9edf..43ae44ba 100644 --- a/src_erl/Communication_Layer/http_Nerlserver/src/Router/routerGenserver.erl +++ b/src_erl/Communication_Layer/http_Nerlserver/src/Router/routerGenserver.erl @@ -176,6 +176,18 @@ handle_cast({getStats,_Body}, State = #router_genserver_state{myName = MyName, nerl_tools:http_request(Host,Port,"routerStats",Mes), {noreply, State#router_genserver_state{msgCounter = MsgCounter+1}}; +handle_cast({worker_kill , Body} , State = #router_genserver_state{msgCounter = MsgCounter, myName = MyName}) -> + io:format("Body is ~p~n",[binary_to_term(Body)]), + {ClientName , _} = binary_to_term(Body), + nerl_tools:sendHTTP(MyName, ClientName, "worker_kill", Body), + {noreply, State#router_genserver_state{msgCounter = MsgCounter+1}}; + + +%monitor +handle_cast({worker_down,Body}, State = #router_genserver_state{myName = MyName, msgCounter = MsgCounter, nerlnetGraph = NerlnetGraph}) -> + nerl_tools:sendHTTP(MyName, ?MAIN_SERVER_ATOM, "worker_down", Body), + {noreply, State#router_genserver_state{msgCounter = MsgCounter+1}}; + handle_cast(_Request, State = #router_genserver_state{msgCounter = MsgCounter }) -> {noreply, State#router_genserver_state{msgCounter = MsgCounter+1}}. 
diff --git a/src_erl/Communication_Layer/http_Nerlserver/src/Router/routingHandler.erl b/src_erl/Communication_Layer/http_Nerlserver/src/Router/routingHandler.erl index 4dd51b29..c391e481 100644 --- a/src_erl/Communication_Layer/http_Nerlserver/src/Router/routingHandler.erl +++ b/src_erl/Communication_Layer/http_Nerlserver/src/Router/routingHandler.erl @@ -74,7 +74,10 @@ init(Req0, State) -> % gen_server:cast(Router_genserver_Pid, {federatedWeights,Body}); %%%%%%%%%%%%%%GUI actions - getStats -> gen_server:cast(Router_genserver_Pid, {getStats,Body}) + getStats -> gen_server:cast(Router_genserver_Pid, {getStats,Body}); + %monitor + worker_down -> gen_server:cast(Router_genserver_Pid, {worker_down,Body}); + worker_kill -> gen_server:cast(Router_genserver_Pid, {worker_kill,Body}) end, Reply = io_lib:format(" ", []), diff --git a/src_erl/Communication_Layer/http_Nerlserver/src/nerlNetServer_app.erl b/src_erl/Communication_Layer/http_Nerlserver/src/nerlNetServer_app.erl index 70e9454f..4d20239a 100644 --- a/src_erl/Communication_Layer/http_Nerlserver/src/nerlNetServer_app.erl +++ b/src_erl/Communication_Layer/http_Nerlserver/src/nerlNetServer_app.erl @@ -187,7 +187,8 @@ createClientsAndWorkers() -> {"/clientTraining",clientStateHandler, [training,ClientStatemPid]}, {"/clientIdle",clientStateHandler, [idle,ClientStatemPid]}, {"/clientPredict",clientStateHandler, [predict,ClientStatemPid]}, - {"/weightsVector",clientStateHandler, [vector,ClientStatemPid]} + {"/weightsVector",clientStateHandler, [vector,ClientStatemPid]}, + {"/worker_kill" , clientStateHandler , [worker_kill , ClientStatemPid]} ]} ]), init_cowboy_start_clear(Client, {HostName, Port},NerlClientDispatch) @@ -265,7 +266,10 @@ createRouters(MapOfRouters, HostName) -> {"/federatedWeights",routingHandler, [federatedWeights,RouterGenServerPid]}, %%GUI actions - {"/getStats",routingHandler, [getStats,RouterGenServerPid]} + {"/getStats",routingHandler, [getStats,RouterGenServerPid]}, + %monitor actions + {"/worker_down",routingHandler, [worker_down,RouterGenServerPid]}, + {"/worker_kill" , routingHandler , [worker_kill , RouterGenServerPid]} ]} ]), %% cowboy:start_clear(Name, TransOpts, ProtoOpts) - an http_listener @@ -304,7 +308,9 @@ createMainServer(true,BatchSize,HostName) -> %GUI actions {"/getGraph",[],guiHandler, [getGraph, MainGenServerPid]}, {"/getStats",[],guiHandler, [getStats, MainGenServerPid]}, - + {"/toolConnectionReq" , [] , utilities_handler , [MainGenServerPid]} , %% Added with NerlMonitor Project + {"/worker_kill" , [] , utilities_handler , [worker_kill , MainGenServerPid]}, + {"/worker_down",actionHandler, [worker_down,MainGenServerPid]}, {"/[...]", [],noMatchingRouteHandler, [MainGenServerPid]} ]} ]), diff --git a/src_erl/Communication_Layer/http_Nerlserver/src/utilities_handler.erl b/src_erl/Communication_Layer/http_Nerlserver/src/utilities_handler.erl new file mode 100644 index 00000000..fb1bf309 --- /dev/null +++ b/src_erl/Communication_Layer/http_Nerlserver/src/utilities_handler.erl @@ -0,0 +1,39 @@ +-module(utilities_handler). + +-define(DATA_IDX , 2). + +-export([init/2]). + +init(Req0 , [MainServerPid]) -> + %handler for a tool connection req + {_,Body,_} = cowboy_req:read_body(Req0), + [UtilityName , IP , Port] = binary_to_term(Body), + case UtilityName of + nerlMonitor -> + Graph = gen_server:call(MainServerPid , getGraph), + WorkersMap = ets:lookup_element(nerlnet_data , workers , ?DATA_IDX), + WorkersClients = maps:to_list(WorkersMap), + Workers = lists:flatten([atom_to_list(X)++"-"++atom_to_list(Y)++"!" 
|| {X , Y} <- WorkersClients]), + Reply = Graph ++ "," ++ Workers + end, + gen_server:cast(MainServerPid , {saveUtility , {UtilityName , IP , Port}}), + Req = cowboy_req:reply(200, + #{<<"content-type">> => <<"text/plain">>}, + Reply, + Req0), + {ok, Req, MainServerPid}; + +init(Req0 , [Action , MainServerPid]) -> + %handler for a tool's requested action (not connection) + {_,Body,_} = cowboy_req:read_body(Req0), + case Action of + worker_kill -> + gen_statem:cast(MainServerPid , {worker_kill , Body}); + _ -> ok + end, + Reply = io_lib:format("ACK", []), + Req = cowboy_req:reply(200, + #{<<"content-type">> => <<"text/plain">>}, + Reply, + Req0), + {ok, Req, MainServerPid}. diff --git a/src_erl/NerlGUI/src/graphScreen.erl b/src_erl/NerlGUI/src/graphScreen.erl index 6254dd8f..97ef9cb2 100644 --- a/src_erl/NerlGUI/src/graphScreen.erl +++ b/src_erl/NerlGUI/src/graphScreen.erl @@ -67,13 +67,13 @@ init([Parent, Gen])-> io:format("got body: ~p~n", [Body]), DevicesInfo = string:split(Body, "#", all), - Devices = [string:split(DeviceInfo, ",", all) || DeviceInfo <- DevicesInfo, DevicesInfo /=[[]]], + Devices = [string:split(DeviceInfo, ",", all) || DeviceInfo <- DevicesInfo, DevicesInfo /= [[]]], Edges = lists:droplast(lists:last(Devices)), io:format("got graph: ~p~n", [Devices]), DeviceList = lists:droplast(Devices), - {FileName, G} = gui_tools:makeGraphIMG(DeviceList, Edges), + {FileName, G} = gui_tools:makeGraphIMG(DeviceList , Edges), mainScreen:updateGraph(Gen, gui_tools:serialize(G)), mainScreen:addInfo(Gen, "updated graph"), diff --git a/src_erl/NerlMonitor/.gitignore b/src_erl/NerlMonitor/.gitignore new file mode 100644 index 00000000..df53f7d9 --- /dev/null +++ b/src_erl/NerlMonitor/.gitignore @@ -0,0 +1,20 @@ +.rebar3 +_build +_checkouts +_vendor +.eunit +*.o +*.beam +*.plt +*.swp +*.swo +.erlang.cookie +ebin +log +erl_crash.dump +.rebar +logs +.idea +*.iml +rebar3.crashdump +*~ diff --git a/src_erl/NerlMonitor/README.md b/src_erl/NerlMonitor/README.md new file mode 100644 index 00000000..bb7f9928 --- /dev/null +++ b/src_erl/NerlMonitor/README.md @@ -0,0 +1,41 @@ +NerlMonitor +===== +NerlMonitor is an external tool that helps NErlNet users get more knowledge on the experiment flow and also gain the ability to terminate workers mid-experiment. This can help in monitoring your model behavior for different kinds of failures. The app is also used to gain various statistics. + + +# Dependencies +`pip install` these libraries (In addition to the src_py/requirements.txt file): +- NetowrkX +- PySimpleGUI +- PyGraphviz +- Nest-Asyncio + +Also, install **Pyrlang** and **Term** libraries for Python-Erlang communication (follow their instructions **carefully**): + +Pyrlang - https://github.com/Pyrlang/Pyrlang + +Term - https://github.com/Pyrlang/Term + +# Run The App +Run `./NerlnetMonitor.sh` script from a different shell (make sure you're using the same Python virtual environment where you installed all dependencies) + +# Demo +Youtube Video Demo: https://youtu.be/X5RHLUTqBWk + +https://github.com/leondavi/NErlNet/assets/79912473/4e69ad09-3a07-436e-9741-84a64baa4e47 + +When running the app: +1. Start up screen: + +SCR-20230815-lews + +2. Main Server is up: + +SCR-20230815-lghc + +3. Worker termination: + +SCR-20230815-lghc + + + diff --git a/src_erl/NerlMonitor/rebar.config b/src_erl/NerlMonitor/rebar.config new file mode 100644 index 00000000..cc4df178 --- /dev/null +++ b/src_erl/NerlMonitor/rebar.config @@ -0,0 +1,9 @@ +{erl_opts, [debug_info]}. 
+{deps, [ + {cowboy, {git, "https://github.com/ninenines/cowboy.git" , {tag,"2.9.0"}}} +]}. + +{shell, [ + % {config, "config/sys.config"}, + {apps, [nerlMonitor]} +]}. diff --git a/src_erl/NerlMonitor/src/MonitorGUI.py b/src_erl/NerlMonitor/src/MonitorGUI.py new file mode 100644 index 00000000..27cde3ab --- /dev/null +++ b/src_erl/NerlMonitor/src/MonitorGUI.py @@ -0,0 +1,305 @@ +from term import Atom +from pyrlang.node import Node +from pyrlang.process import Process +import PySimpleGUI as sg +import multiprocessing +from time import sleep +import networkx as nx +import matplotlib.pyplot as plt +from datetime import datetime +import os +import math +import asyncio +import nest_asyncio +nest_asyncio.apply() + + +class MyProcess(Process): + def __init__(self , msg_queue) -> None: + Process.__init__(self) + self.get_node().register_name(self, Atom('PyrlangProcess')) + self.msg_queue = msg_queue + + + def handle_one_inbox_message(self, msg): + print(f'From ErlProcess: {msg}') + if msg[0] == Atom('send'): + print(f'From ErlProcess: {msg[1]}') + else: + self.msg_queue.put_nowait(msg) + if not self.msg_queue.empty(): + print(f'Queue is not Empty: {msg} added.') + + + + +def draw_gradient(canvas, start_color, end_color): + for y in range(0, 200): # Adjust the range to your desired height + r = start_color[0] + (end_color[0] - start_color[0]) * y / 200 + g = start_color[1] + (end_color[1] - start_color[1]) * y / 200 + b = start_color[2] + (end_color[2] - start_color[2]) * y / 200 + color = f'#{int(r):02x}{int(g):02x}{int(b):02x}' + canvas.TKCanvas.create_line(0, y, 200, y, fill=color) + + +Msg_log = [] + +DataColumn = [ + [sg.Frame(title="Event Log:" , + layout=[[sg.Multiline('', size=(140, 60), key='-LOG-', autoscroll=True , font=('SFPro' , 12) , no_scrollbar=True)]], + background_color=('#A90433') , font=('SFPro' , 20) , size=(500,325) , title_color='White' , element_justification='right') + ] , + [sg.Frame(title="Statistics:" , + layout=[[sg.Multiline('', size=(140, 60), key='-STATS-', autoscroll=True , font=('SFPro' , 12) , no_scrollbar=True)]], + background_color=('#A90433') , font=('SFPro' , 20) , size=(500,325) , title_color='White' , element_justification='right') + ] + ] + +GraphColumn = [ + [ sg.Text("Waiting For\n NerlNet Graph..." 
, key='-PHOLD-', text_color='White' , font=('SFPro' , 12) , size=(70,5) , background_color='#A90433' , justification='center' , pad=(0,0)) , + sg.Image(key='-IMAGE-' , visible=False) + ], + [ + sg.Text("Enter the name of the worker you wish to terminate:" ,key='-INTEXT-', size=(42,1) ,text_color='white' , font=('SFPro' , 12) , background_color='#A90433' , justification='left' , pad=(0,0) , visible=False) , + sg.Input('',key='-INPUT-' , visible=False , justification='left' , size=(20,1) , font=('SFPro' , 12) , background_color='white' , text_color='black' , pad=(0,0) , enable_events=True), + sg.Button(button_text="Terminate" , button_color=('#A90433' , '#FFFFFF') , font=('SFPro' , 12) , size=(10,1) , pad=(0,0) , visible=False, key='-TERM-', enable_events=True) + ] + ] + +layout = [ + [ + sg.Text("NerlNet Monitor" , key='-TEXT-' , size=(30,1) ,text_color='White' , font=('SFPro' , 20) , background_color='#A90433' , justification='center' , pad=(0,0)) + ] , + [ sg.Column(DataColumn , background_color='#A90433') , + sg.VSeperator() , + sg.Column(GraphColumn , background_color='#A90433') + ] , + [ + sg.Button(button_text="Close" , button_color=('#A90433' , '#FFFFFF') , font=('SFPro' , 12) , size=(5,2)), + sg.Button(button_text="Clear Log" , button_color=('#A90433' , '#FFFFFF') , font=('SFPro' , 12) , size=(5,2)) + ] + + ] + +MainWindow = sg.Window("NErlNet" , layout , margins=(5,5) , size=(1400,800) , background_color='#A90433' , finalize=True , resizable=True , element_justification='c' , icon='../../../NerlnetLogo.ico') + +def RemoteRecv(): + return Atom('erl@127.0.0.1') , Atom("recvPyrlang") + +def formatted_time(): + return f'[{datetime.now().day}/{datetime.now().month}/{datetime.now().year}|{datetime.now().hour}:{datetime.now().minute}:{datetime.now().second}]' + +# def SendMsg(Msg): +# SendNode = Node(node_name='pysend@127.0.0.1' , cookie='COOKIE') +# SendProc = MyProcess() +# event_loop = SendNode.get_loop() + +# print(SendNode.where_is(Atom('recvPyrlang'))) + +# def task(): +# SendNode.send_nowait(sender = SendProc.pid_ , receiver = RemoteRecv() , message = (Atom('send'),Atom(Msg))) +# SendNode.destroy() + +# event_loop.call_soon(task) + +# SendNode.run() + +async def GUI(msg_queue): + print("GUI task started...") + PyNode , CommProc = await msg_queue.get() + print("Got Message from queue") + print(msg_queue.empty()) + print(f"Got PyNode and CommProc from Queue.") + StatsInfo = {"workers": {} , "clients": {}} + while True: + await asyncio.sleep(.01) + event , values = MainWindow.read(timeout=100) + existing_text = values['-LOG-'] + updated_text = '' + if event == "Close" or event == sg.WIN_CLOSED: + PyNode.send_nowait(sender = CommProc.pid_ , receiver = RemoteRecv() , message = (Atom('close'))) + await asyncio.sleep(.2) + os.kill(os.getpid() , 9) + print("GUI Closed.") + break + elif event == "Clear Log": + ShowStats(StatsInfo) + MainWindow['-LOG-'].update('') + elif event == "-TERM-": + Workers = [Graph.nodes[node]['label'] for node in Graph.nodes() if Graph.nodes[node]['label'][0] == 'w' and node_colors[node] != 'gray'] + if values['-INPUT-'] not in Workers: + updated_text = f'{existing_text}\n{formatted_time()}: Invalid Worker Name {values["-INPUT-"]} , Available Workers: {Workers}.' 
+ MainWindow['-LOG-'].update(updated_text) + else: + Workers.remove(values['-INPUT-']) + node_colors[values['-INPUT-']] = 'gray' + nx.set_node_attributes(Graph, node_colors, 'color') + colors = nx.get_node_attributes(Graph, 'color').values() + pos = nx.nx_agraph.graphviz_layout(Graph, prog='dot') + plt.figure(figsize=(8,6)) + nx.draw_networkx(Graph, pos, with_labels=True, node_color=colors , node_size=200, font_size=8, font_color='white' , edge_color='black' , width=1.5) + plt.savefig('NerlNetGraph.png' ,bbox_inches='tight' , dpi=125) + plt.close() + MainWindow['-IMAGE-'].update(filename='NerlNetGraph.png' , visible=True , size=(800,600)) + + updated_text = f'{existing_text}\n{formatted_time()}: Sending termination message for {values["-INPUT-"]} to Main Server.' + PyNode.send_nowait(sender = CommProc.pid_ , receiver = RemoteRecv() , message = (Atom('terminate'),Atom(f'{values["-INPUT-"]}'))) + + + MainWindow['-LOG-'].update(updated_text) + MainWindow['-INPUT-'].update('') + if not msg_queue.empty(): + msg = msg_queue.get_nowait() + if msg[0] == 'graph': + Graph , node_colors = Show_Nerlnet_Graph(msg[1]) + MainWindow['-PHOLD-'].update(visible=False) + MainWindow['-IMAGE-'].update(filename='NerlNetGraph.png' , visible=True , size=(800,600)) + MainWindow['-LOG-'].update(f'{formatted_time()}: NerlNet Graph Received.') + MainWindow['-INTEXT-'].update(visible=True) + MainWindow['-INPUT-'].update(visible=True) + MainWindow['-TERM-'].update(visible=True) + elif msg[0] == 'update': + ClientName , WorkerName = msg[1].split('-') + + node_colors[WorkerName] = 'gray' + #node_colors[ClientName] = 'gray' + nx.set_node_attributes(Graph, node_colors, 'color') + colors = nx.get_node_attributes(Graph, 'color').values() + + pos = nx.nx_agraph.graphviz_layout(Graph, prog='dot') + angle = 100 + #rotated_pos = {node: (x*math.cos(angle) -y*math.sin(angle), x*math.sin(angle) + y*math.cos(angle)) for node, (x, y) in pos.items()} + + plt.figure(figsize=(8,6)) + nx.draw_networkx(Graph, pos, with_labels=True, node_color=colors , node_size=200, font_size=8, font_color='white' , edge_color='black' , width=1.5) + plt.savefig('NerlNetGraph.png' ,bbox_inches='tight' , dpi=125) + plt.close() + MainWindow['-IMAGE-'].update(filename='NerlNetGraph.png' , visible=True , size=(800,600)) + if existing_text == '': + updated_text = f'{formatted_time()}: Worker {WorkerName} of Client {ClientName} is down.' 
+ else: + Workers = [Graph.nodes[node]['label'] for node in Graph.nodes() if Graph.nodes[node]['label'][0] == 'w' and node_colors[node] != 'gray'] + updated_text = f'{existing_text}\n{formatted_time()}: Worker {WorkerName} of Client {ClientName} is down , Available workers: {Workers}' + MainWindow['-LOG-'].update(updated_text) + + elif msg[0] == 'stats': + try: + Data = msg[1] + for items in str(Data).split('|'): + Entity, val = items.split(':') + if '=' in val: + for EntityStat in val.split(','): + Stat, Result = EntityStat.split('=') + if "Train" in Stat: + StatsInfo["workers"][Stat] = Result + else: + StatsInfo["clients"][Stat] = Result + else: + StatsInfo[Entity] = val # Messages for entities other than clients/workers + CurrentStats = StatsInfo.copy() # copy the stats to a new variable + StatsText = ShowStats(CurrentStats) + existing_stats = values['-STATS-'] + if existing_stats != '': + MainWindow['-STATS-'].update(f'{existing_stats}\n{StatsText}') + else: + MainWindow['-STATS-'].update(StatsText) + existing_text = values['-LOG-'] + MainWindow['-LOG-'].update(f'{existing_text}\n{formatted_time()}: Statistics Received.') + except Exception as err: + MainWindow['-LOG-'].update(f"Error in Stats {err} , Got {StatsInfo}") + + + elif values['-LOG-'] != '': + existing_text = values['-LOG-'] + updated_text = f'{existing_text}\n{formatted_time()}: {msg}' + else: + updated_text = f'{formatted_time()}: {msg}' + if updated_text != '': + MainWindow['-LOG-'].update(updated_text) + + + + MainWindow.close() + + +def ShowStats(CurrentStats): + MainWindow['-LOG-'].update(f'{formatted_time()}: Printing Statistics...') + StatsText = '' + for key in CurrentStats: + if key == 'workers': + StatsText += f'Workers:\n' + for stat in CurrentStats[key]: + if "Time" in stat: + StatsText += f'\t{stat.replace("_Train_" , " Working ")}: {CurrentStats[key][stat]} seconds\n' + else: + StatsText += f'\t{stat.replace("_" , " ")}: {CurrentStats[key][stat]}\n' + elif key == 'clients': + StatsText += f'Clients:\n' + for stat in CurrentStats[key]: + if "info" in stat: + StatsText += f'\t{stat.replace("_info_" , " Info ")}: {CurrentStats[key][stat]} bytes\n' + else: + StatsText += f'\t{stat.replace("_Msg_" , " Message ")}: {CurrentStats[key][stat]}\n' + elif key == 'Dead workers': + StatsText += f'Dead Workers are:{CurrentStats[key]}\n' + else: + StatsText += f'{key} Message Count: {CurrentStats[key]}\n' + return StatsText + + +def Show_Nerlnet_Graph(NerlGraph): + # Graph in string format: "Entity1Name,Entity1IP,Entity1Port#Entity2Name,Entity2IP,Entity2Port#Entity1Name-Entity2Name,Entity2Name-Entity1Name#Worker1-Client1#Worker2-Client2" etc. + # Workers in string format: "Worker1-Client1,Worker2-Client1,Worker3-Client2" etc. + # Node is defined by a triplet 'Name,IP,Port' seperated by '#' + # Edge is defined by a string 'Entity1-Entity2' seperated by ',' + Nodes = NerlGraph.split('#')[0:-1] + Edges = NerlGraph.split('#')[-1].split(',')[0:-1] + Workers = NerlGraph.split('#')[-1].split(',')[-1].split('!')[0:-1] + WorkersNames = [Worker.split('-')[0] for Worker in Workers ] + Edges += Workers + EdgesSeperated = [(Edge.split('-')[0],Edge.split('-')[1]) for Edge in Edges if len(Edges) > 1] # ? What if no edges? 
+ NodesNames = [NodeTriplet.split(',')[0] for NodeTriplet in Nodes] + NodesNames += WorkersNames + + NodeWithLabels = [(NodeName , {'label' : NodeName}) for NodeName in NodesNames] + graph = nx.Graph() + graph.add_nodes_from(NodeWithLabels) + graph.add_edges_from(EdgesSeperated) + + my_labels = {'mainServer': 'mS' , 'apiServer': 'aS'} + + nx.relabel_nodes(graph, my_labels , copy=False) + default_colors = {node:'#A90433' for node in graph.nodes()} + node_colors = {node:default_colors[node] for node in graph.nodes()} + nx.set_node_attributes(graph, node_colors, 'color') + colors = nx.get_node_attributes(graph, 'color').values() + + pos = nx.nx_agraph.graphviz_layout(graph, prog='dot') + angle = 100 + #rotated_pos = {node: (x*math.cos(angle) -y*math.sin(angle), x*math.sin(angle) + y*math.cos(angle)) for node, (x, y) in pos.items()} + plt.figure(figsize=(8,6)) + nx.draw_networkx(graph, pos, with_labels=True, node_color=colors , node_size=200, font_size=8, font_color='white' , edge_color='black' , width=1.5) + plt.savefig('NerlNetGraph.png' ,bbox_inches='tight' , dpi=125) + plt.close() + return graph , node_colors + + + +async def Pyrlang(msg_queue): + print("Pyrlang task started...") + PyNode = Node(node_name="py@127.0.0.1" , cookie="COOKIE") + CommProc = MyProcess(msg_queue=msg_queue) + msg_queue.put_nowait((PyNode , CommProc)) + print("Pyrlang task finished.") + PyNode.run() + +async def main_func(): + msg_queue = asyncio.Queue() + await asyncio.gather(GUI(msg_queue) , Pyrlang(msg_queue)) + +if __name__ == "__main__": + asyncio.run(main_func()) + + + + diff --git a/src_erl/NerlMonitor/src/handlers/nerlMonitor_handler.erl b/src_erl/NerlMonitor/src/handlers/nerlMonitor_handler.erl new file mode 100644 index 00000000..59f19934 --- /dev/null +++ b/src_erl/NerlMonitor/src/handlers/nerlMonitor_handler.erl @@ -0,0 +1,22 @@ +-module(nerlMonitor_handler). + +-export([init/2]). + +-define(GUI , {'PyrlangProcess' , 'py@127.0.0.1'}). + +init(Req0, [Msg]) -> + {_,Body,_} = cowboy_req:read_body(Req0), + Data = binary_to_list(Body), + case Msg of + utilInfo -> ?GUI ! {update ,Data}; + stats -> ?GUI ! {stats ,Data}; + _ -> + ok % got unknown messge, ignore. + end, + + Req = cowboy_req:reply(200, + #{<<"content-type">> => <<"text/plain">>}, + <<"Got that">>, + Req0), + {ok, Req, Msg}. + diff --git a/src_erl/NerlMonitor/src/nerlMonitor.app.src b/src_erl/NerlMonitor/src/nerlMonitor.app.src new file mode 100644 index 00000000..3625d754 --- /dev/null +++ b/src_erl/NerlMonitor/src/nerlMonitor.app.src @@ -0,0 +1,15 @@ +{application, nerlMonitor, + [{description, "An OTP application"}, + {vsn, "0.1.0"}, + {registered, []}, + {mod, {nerlMonitor_app, []}}, + {applications, + [kernel, + stdlib + ]}, + {env,[]}, + {modules, []}, + + {licenses, ["Apache-2.0"]}, + {links, []} + ]}. diff --git a/src_erl/NerlMonitor/src/nerlMonitor_app.erl b/src_erl/NerlMonitor/src/nerlMonitor_app.erl new file mode 100644 index 00000000..6c2f54e3 --- /dev/null +++ b/src_erl/NerlMonitor/src/nerlMonitor_app.erl @@ -0,0 +1,91 @@ +%%%------------------------------------------------------------------- +%% @doc nerlGUI public API +%% @end +%%%------------------------------------------------------------------- + +-module(nerlMonitor_app). + +-behaviour(application). + +-include("../../Communication_Layer/http_Nerlserver/src/nerl_tools.hrl"). + +-export([start/2, stop/1 , link_GUI/0]). + +-define(UTILNAME,nerlMonitor). +-define(IP , "192.168.64.7"). +-define(PORT, 8096). +-define(MSADDRES,"192.168.64.7:8080" ). 
+-define(GUI , {'PyrlangProcess' , 'py@127.0.0.1'}). % Erlang node should be long name to communicate with pyrlang node + +start(_StartType, _StartArgs) -> + application:start(sasl), + application:start(ranch), + application:start(inets), + + Dispatch = cowboy_router:compile([ + {'_', [ + {"/utilInfo",nerlMonitor_handler, [utilInfo]}, + {"/stats" , nerlMonitor_handler , [stats]} + + ]} + ]), + {ok, _} = cowboy:start_clear(?UTILNAME,[{port, ?PORT}],#{env => #{dispatch => Dispatch}}), + io:format("nerlMonitor started , opening GUI...~n"), + erlang:register(recvPyrlang , self()), + _GUI_PID = spawn_link(?MODULE , link_GUI , []) , %% PyrlangNode: ('PyralngProcess' , 'py@127.0.0.1' , 'COOKIE') , sending message by: 'GUI ! HELLO.' + URL = "http://" ++ ?MSADDRES ++ "/toolConnectionReq", + mainServerPing(URL,term_to_binary([?UTILNAME , ?IP , integer_to_list(?PORT)])), %% TODO How to "import" nerl_tools + nerlMonitor_sup:start_link(). + + + +%ping main server in 0.5 sec intervals with connection request. will stop when got valid response. +mainServerPing(URL,Body)-> + io:format("pinging main server...~n"), + Response = httpc:request(post,{URL, [],"application/x-www-form-urlencoded",Body}, [], []), + case Response of + {error,_}-> + timer:sleep(1000), + receive + close -> + io:format("Quitting NerlMonitor...~n"), + {ok , AppName} = application:get_application(), + stop(AppName) + after 0 -> + mainServerPing(URL,Body) + end; + {ok,{_ResCode, _Headers, Data}}-> + io:format("Got NerlGraph , Sending to GUI...~n" , []), + ?GUI ! {graph , Data}, + recvLoop(); + {ok , _} -> + io:format("Got unknown response from main server~n") + end. + + +recvLoop()-> %% MainServer replies with Nerlnet-Graph + receive + {terminate , WorkerName} -> + io:format("Got termination message for Worker ~p from GUI~n" , [WorkerName]), + URL = "http://" ++ ?MSADDRES ++ "/worker_kill", + Body = term_to_binary(WorkerName), + httpc:request(post,{URL, [],"application/x-www-form-urlencoded",Body}, [], []), + recvLoop(); + close -> + io:format("Quitting NerlMonitor...~n"), + {ok , AppName} = application:get_application(), + stop(AppName); + Msg -> + io:format("Got unknown message from GUI: ~p~n", [Msg]), + recvLoop() + end. + + +stop(_State) -> + ok. + +link_GUI() -> + os:cmd('python3 src/MonitorGUI.py'), + io:format("GUI Closed~n"). + + diff --git a/src_erl/NerlMonitor/src/nerlMonitor_sup.erl b/src_erl/NerlMonitor/src/nerlMonitor_sup.erl new file mode 100644 index 00000000..02225cc6 --- /dev/null +++ b/src_erl/NerlMonitor/src/nerlMonitor_sup.erl @@ -0,0 +1,35 @@ +%%%------------------------------------------------------------------- +%% @doc NerlMonitor top level supervisor. +%% @end +%%%------------------------------------------------------------------- + +-module(nerlMonitor_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%% sup_flags() = #{strategy => strategy(), % optional +%% intensity => non_neg_integer(), % optional +%% period => pos_integer()} % optional +%% child_spec() = #{id => child_id(), % mandatory +%% start => mfargs(), % mandatory +%% restart => restart(), % optional +%% shutdown => shutdown(), % optional +%% type => worker(), % optional +%% modules => modules()} % optional +init([]) -> + SupFlags = #{strategy => one_for_all, + intensity => 0, + period => 1}, + ChildSpecs = [], + {ok, {SupFlags, ChildSpecs}}. 
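+%% Note on the restart policy above: ChildSpecs is empty for now (the Cowboy listener is
+%% started directly from nerlMonitor_app:start/2), and with strategy one_for_all at
+%% intensity 0 over a 1-second period, any crash of a future child would take the whole
+%% supervision tree down rather than restart it.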
+ +%% internal functions diff --git a/src_erl/NerlMonitor/src/requirements.txt b/src_erl/NerlMonitor/src/requirements.txt new file mode 100644 index 00000000..b4bc4464 --- /dev/null +++ b/src_erl/NerlMonitor/src/requirements.txt @@ -0,0 +1,4 @@ +matplotlib==3.7.1 +nest-asyncio==1.5.7 +networkx==3.1 +PySimpleGUI==4.60.5 diff --git a/src_erl/erlBridge/workers/workerGeneric.erl b/src_erl/erlBridge/workers/workerGeneric.erl index fb859377..dfb7cbaa 100644 --- a/src_erl/erlBridge/workers/workerGeneric.erl +++ b/src_erl/erlBridge/workers/workerGeneric.erl @@ -16,7 +16,7 @@ -behaviour(gen_statem). %% API --export([start_link/1]). +-export([start_link/1,start_monitor/1]). %% gen_statem callbacks -export([init/1, format_status/2, state_name/3, handle_event/4, terminate/3, code_change/4, callback_mode/0]). @@ -37,6 +37,11 @@ start_link(ARGS) -> {ok,Pid} = gen_statem:start_link(?MODULE, ARGS, []), Pid. +start_monitor(ARGS) -> + %{ok,Pid} = gen_statem:start_link({local, element(1, ARGS)}, ?MODULE, ARGS, []), %% name this machine by unique name + {ok,{Pid,_Ref}} = gen_statem:start_monitor(?MODULE, ARGS, []), + Pid. + %%%=================================================================== %%% gen_statem callbacks %%%=================================================================== @@ -104,7 +109,7 @@ handle_event(_EventType, _EventContent, _StateName, State = #workerGeneric_state %% necessary cleaning up. When it returns, the gen_statem terminates with %% Reason. The return value is ignored. terminate(_Reason, _StateName, _State) -> - ok. + nerlNIF:destroy_nif(ets:lookup_element(get(generic_worker_ets) , model_id , ?DATA_IDX)). %% @private %% @doc Convert process state when code is changed diff --git a/src_py/apiServer/apiServer.py b/src_py/apiServer/apiServer.py index ac6a4cad..2f7bb31f 100644 --- a/src_py/apiServer/apiServer.py +++ b/src_py/apiServer/apiServer.py @@ -413,11 +413,14 @@ def accuracy_matrix(self, expNum): # print(f"worker {worker}, has {len(workerNeuronRes[worker][TRUE_LABLE_IND])} labels, with {len(workerNeuronRes[worker][TRUE_LABLE_IND][j])} samples") # print(f"confusion {worker}:{j}, has is of {workerNeuronRes[worker][TRUE_LABLE_IND][j]}, {workerNeuronRes[worker][PRED_LABLE_IND][j]}") confMatList[worker][j] = confusion_matrix(workerNeuronRes[worker][globe.TRUE_LABLE_IND][j], workerNeuronRes[worker][globe.PRED_LABLE_IND][j]) - # print(confMatList[worker][j]) disp = ConfusionMatrixDisplay(confMatList[worker][j], display_labels=["X", labelNames[j]]) + if confMatList[worker][j].shape == (0,0): # ! Worker is down + disp = ConfusionMatrixDisplay(np.array([[0,0],[0,0]]), display_labels=["X", labelNames[j]]) + workerNeuronRes[worker][globe.TRUE_LABLE_IND][j] = [0,0] + workerNeuronRes[worker][globe.PRED_LABLE_IND][j] = [1,1] disp.plot(ax=axes[i, j], colorbar=False) disp.ax_.set_title(f'{worker}, class #{j}\nAccuracy={round(accuracy_score(workerNeuronRes[worker][globe.TRUE_LABLE_IND][j], workerNeuronRes[worker][globe.PRED_LABLE_IND][j]), 3)}') - if i < len(workersList) - 1: + if i < len(workersList) - 1: # ? Ask David why this is needed disp.ax_.set_xlabel('') #remove "predicted label" if j != 0: disp.ax_.set_ylabel('') #remove "true label" @@ -437,6 +440,8 @@ def accuracy_matrix(self, expNum): statFile = open(statFileName, "a") for worker in confMatList: + if confMatList[worker][0].shape == (0,0): # ! Worker is down + continue for j, label in enumerate(confMatList[worker]): # Calculate the accuracy and other stats: tn, fp, fn, tp = label.ravel()
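+                    # Per-class unpacking assumes sklearn's binary layout [[tn, fp], [fn, tp]]
+                    # (negative label sorted first), flattened row-major by ravel(); e.g. with
+                    # hypothetical labels: confusion_matrix([0,1,1,0], [0,1,0,0]).ravel() -> (2, 0, 1, 1).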