From 96d2b558bc28bf9a74c77e8cc423cb6a65854285 Mon Sep 17 00:00:00 2001 From: GuyPerets106 Date: Wed, 10 Jan 2024 22:46:20 +0000 Subject: [PATCH] [NerlPlanner Integration] ETS Fixes --- NerlnetMonitor.sh | 12 + src_erl/NerlMonitor/.gitignore | 20 ++ src_erl/NerlMonitor/README.md | 41 +++ src_erl/NerlMonitor/rebar.config | 9 + src_erl/NerlMonitor/src/MonitorGUI.py | 305 ++++++++++++++++++ .../src/handlers/nerlMonitor_handler.erl | 22 ++ src_erl/NerlMonitor/src/nerlMonitor.app.src | 15 + src_erl/NerlMonitor/src/nerlMonitor_app.erl | 91 ++++++ src_erl/NerlMonitor/src/nerlMonitor_sup.erl | 35 ++ src_erl/NerlMonitor/src/requirements.txt | 4 + .../src/Bridge/onnWorkers/workerGeneric.erl | 3 +- .../NerlnetApp/src/Client/clientStatem.erl | 272 ++++++++-------- .../src/Client/clientWorkersFunctions.erl | 16 +- .../src/MainServer/mainGenserver.erl | 40 ++- .../NerlnetApp/src/Router/routerGenserver.erl | 33 +- .../NerlnetApp/src/Router/routingHandler.erl | 3 +- .../NerlnetApp/src/Source/sourceStatem.erl | 57 ++-- src_erl/NerlnetApp/src/Stats/stats.erl | 60 +++- src_erl/NerlnetApp/src/Stats/stats.hrl | 6 +- src_erl/NerlnetApp/src/nerlnetApp_app.erl | 3 +- 20 files changed, 850 insertions(+), 197 deletions(-) create mode 100755 NerlnetMonitor.sh create mode 100644 src_erl/NerlMonitor/.gitignore create mode 100644 src_erl/NerlMonitor/README.md create mode 100644 src_erl/NerlMonitor/rebar.config create mode 100644 src_erl/NerlMonitor/src/MonitorGUI.py create mode 100644 src_erl/NerlMonitor/src/handlers/nerlMonitor_handler.erl create mode 100644 src_erl/NerlMonitor/src/nerlMonitor.app.src create mode 100644 src_erl/NerlMonitor/src/nerlMonitor_app.erl create mode 100644 src_erl/NerlMonitor/src/nerlMonitor_sup.erl create mode 100644 src_erl/NerlMonitor/src/requirements.txt diff --git a/NerlnetMonitor.sh b/NerlnetMonitor.sh new file mode 100755 index 000000000..a95ee9d26 --- /dev/null +++ b/NerlnetMonitor.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +MONITOR_PATH="src_erl/NerlMonitor" +GUI_PATH="src_erl/NerlMonitor/src" + +echo "NerlnetMonitor Activated" + + +cd $MONITOR_PATH +rebar3 shell --name erl@127.0.0.1 --setcookie COOKIE + +cd ../../ diff --git a/src_erl/NerlMonitor/.gitignore b/src_erl/NerlMonitor/.gitignore new file mode 100644 index 000000000..df53f7d92 --- /dev/null +++ b/src_erl/NerlMonitor/.gitignore @@ -0,0 +1,20 @@ +.rebar3 +_build +_checkouts +_vendor +.eunit +*.o +*.beam +*.plt +*.swp +*.swo +.erlang.cookie +ebin +log +erl_crash.dump +.rebar +logs +.idea +*.iml +rebar3.crashdump +*~ diff --git a/src_erl/NerlMonitor/README.md b/src_erl/NerlMonitor/README.md new file mode 100644 index 000000000..bb7f99289 --- /dev/null +++ b/src_erl/NerlMonitor/README.md @@ -0,0 +1,41 @@ +NerlMonitor +===== +NerlMonitor is an external tool that helps NErlNet users get more knowledge on the experiment flow and also gain the ability to terminate workers mid-experiment. This can help in monitoring your model behavior for different kinds of failures. The app is also used to gain various statistics. + + +# Dependencies +`pip install` these libraries (In addition to the src_py/requirements.txt file): +- NetowrkX +- PySimpleGUI +- PyGraphviz +- Nest-Asyncio + +Also, install **Pyrlang** and **Term** libraries for Python-Erlang communication (follow their instructions **carefully**): + +Pyrlang - https://github.com/Pyrlang/Pyrlang + +Term - https://github.com/Pyrlang/Term + +# Run The App +Run `./NerlnetMonitor.sh` script from a different shell (make sure you're using the same Python virtual environment where you installed all dependencies) + +# Demo +Youtube Video Demo: https://youtu.be/X5RHLUTqBWk + +https://github.com/leondavi/NErlNet/assets/79912473/4e69ad09-3a07-436e-9741-84a64baa4e47 + +When running the app: +1. Start up screen: + +SCR-20230815-lews + +2. Main Server is up: + +SCR-20230815-lghc + +3. Worker termination: + +SCR-20230815-lghc + + + diff --git a/src_erl/NerlMonitor/rebar.config b/src_erl/NerlMonitor/rebar.config new file mode 100644 index 000000000..cc4df178a --- /dev/null +++ b/src_erl/NerlMonitor/rebar.config @@ -0,0 +1,9 @@ +{erl_opts, [debug_info]}. +{deps, [ + {cowboy, {git, "https://github.com/ninenines/cowboy.git" , {tag,"2.9.0"}}} +]}. + +{shell, [ + % {config, "config/sys.config"}, + {apps, [nerlMonitor]} +]}. diff --git a/src_erl/NerlMonitor/src/MonitorGUI.py b/src_erl/NerlMonitor/src/MonitorGUI.py new file mode 100644 index 000000000..27cde3ab8 --- /dev/null +++ b/src_erl/NerlMonitor/src/MonitorGUI.py @@ -0,0 +1,305 @@ +from term import Atom +from pyrlang.node import Node +from pyrlang.process import Process +import PySimpleGUI as sg +import multiprocessing +from time import sleep +import networkx as nx +import matplotlib.pyplot as plt +from datetime import datetime +import os +import math +import asyncio +import nest_asyncio +nest_asyncio.apply() + + +class MyProcess(Process): + def __init__(self , msg_queue) -> None: + Process.__init__(self) + self.get_node().register_name(self, Atom('PyrlangProcess')) + self.msg_queue = msg_queue + + + def handle_one_inbox_message(self, msg): + print(f'From ErlProcess: {msg}') + if msg[0] == Atom('send'): + print(f'From ErlProcess: {msg[1]}') + else: + self.msg_queue.put_nowait(msg) + if not self.msg_queue.empty(): + print(f'Queue is not Empty: {msg} added.') + + + + +def draw_gradient(canvas, start_color, end_color): + for y in range(0, 200): # Adjust the range to your desired height + r = start_color[0] + (end_color[0] - start_color[0]) * y / 200 + g = start_color[1] + (end_color[1] - start_color[1]) * y / 200 + b = start_color[2] + (end_color[2] - start_color[2]) * y / 200 + color = f'#{int(r):02x}{int(g):02x}{int(b):02x}' + canvas.TKCanvas.create_line(0, y, 200, y, fill=color) + + +Msg_log = [] + +DataColumn = [ + [sg.Frame(title="Event Log:" , + layout=[[sg.Multiline('', size=(140, 60), key='-LOG-', autoscroll=True , font=('SFPro' , 12) , no_scrollbar=True)]], + background_color=('#A90433') , font=('SFPro' , 20) , size=(500,325) , title_color='White' , element_justification='right') + ] , + [sg.Frame(title="Statistics:" , + layout=[[sg.Multiline('', size=(140, 60), key='-STATS-', autoscroll=True , font=('SFPro' , 12) , no_scrollbar=True)]], + background_color=('#A90433') , font=('SFPro' , 20) , size=(500,325) , title_color='White' , element_justification='right') + ] + ] + +GraphColumn = [ + [ sg.Text("Waiting For\n NerlNet Graph..." , key='-PHOLD-', text_color='White' , font=('SFPro' , 12) , size=(70,5) , background_color='#A90433' , justification='center' , pad=(0,0)) , + sg.Image(key='-IMAGE-' , visible=False) + ], + [ + sg.Text("Enter the name of the worker you wish to terminate:" ,key='-INTEXT-', size=(42,1) ,text_color='white' , font=('SFPro' , 12) , background_color='#A90433' , justification='left' , pad=(0,0) , visible=False) , + sg.Input('',key='-INPUT-' , visible=False , justification='left' , size=(20,1) , font=('SFPro' , 12) , background_color='white' , text_color='black' , pad=(0,0) , enable_events=True), + sg.Button(button_text="Terminate" , button_color=('#A90433' , '#FFFFFF') , font=('SFPro' , 12) , size=(10,1) , pad=(0,0) , visible=False, key='-TERM-', enable_events=True) + ] + ] + +layout = [ + [ + sg.Text("NerlNet Monitor" , key='-TEXT-' , size=(30,1) ,text_color='White' , font=('SFPro' , 20) , background_color='#A90433' , justification='center' , pad=(0,0)) + ] , + [ sg.Column(DataColumn , background_color='#A90433') , + sg.VSeperator() , + sg.Column(GraphColumn , background_color='#A90433') + ] , + [ + sg.Button(button_text="Close" , button_color=('#A90433' , '#FFFFFF') , font=('SFPro' , 12) , size=(5,2)), + sg.Button(button_text="Clear Log" , button_color=('#A90433' , '#FFFFFF') , font=('SFPro' , 12) , size=(5,2)) + ] + + ] + +MainWindow = sg.Window("NErlNet" , layout , margins=(5,5) , size=(1400,800) , background_color='#A90433' , finalize=True , resizable=True , element_justification='c' , icon='../../../NerlnetLogo.ico') + +def RemoteRecv(): + return Atom('erl@127.0.0.1') , Atom("recvPyrlang") + +def formatted_time(): + return f'[{datetime.now().day}/{datetime.now().month}/{datetime.now().year}|{datetime.now().hour}:{datetime.now().minute}:{datetime.now().second}]' + +# def SendMsg(Msg): +# SendNode = Node(node_name='pysend@127.0.0.1' , cookie='COOKIE') +# SendProc = MyProcess() +# event_loop = SendNode.get_loop() + +# print(SendNode.where_is(Atom('recvPyrlang'))) + +# def task(): +# SendNode.send_nowait(sender = SendProc.pid_ , receiver = RemoteRecv() , message = (Atom('send'),Atom(Msg))) +# SendNode.destroy() + +# event_loop.call_soon(task) + +# SendNode.run() + +async def GUI(msg_queue): + print("GUI task started...") + PyNode , CommProc = await msg_queue.get() + print("Got Message from queue") + print(msg_queue.empty()) + print(f"Got PyNode and CommProc from Queue.") + StatsInfo = {"workers": {} , "clients": {}} + while True: + await asyncio.sleep(.01) + event , values = MainWindow.read(timeout=100) + existing_text = values['-LOG-'] + updated_text = '' + if event == "Close" or event == sg.WIN_CLOSED: + PyNode.send_nowait(sender = CommProc.pid_ , receiver = RemoteRecv() , message = (Atom('close'))) + await asyncio.sleep(.2) + os.kill(os.getpid() , 9) + print("GUI Closed.") + break + elif event == "Clear Log": + ShowStats(StatsInfo) + MainWindow['-LOG-'].update('') + elif event == "-TERM-": + Workers = [Graph.nodes[node]['label'] for node in Graph.nodes() if Graph.nodes[node]['label'][0] == 'w' and node_colors[node] != 'gray'] + if values['-INPUT-'] not in Workers: + updated_text = f'{existing_text}\n{formatted_time()}: Invalid Worker Name {values["-INPUT-"]} , Available Workers: {Workers}.' + MainWindow['-LOG-'].update(updated_text) + else: + Workers.remove(values['-INPUT-']) + node_colors[values['-INPUT-']] = 'gray' + nx.set_node_attributes(Graph, node_colors, 'color') + colors = nx.get_node_attributes(Graph, 'color').values() + pos = nx.nx_agraph.graphviz_layout(Graph, prog='dot') + plt.figure(figsize=(8,6)) + nx.draw_networkx(Graph, pos, with_labels=True, node_color=colors , node_size=200, font_size=8, font_color='white' , edge_color='black' , width=1.5) + plt.savefig('NerlNetGraph.png' ,bbox_inches='tight' , dpi=125) + plt.close() + MainWindow['-IMAGE-'].update(filename='NerlNetGraph.png' , visible=True , size=(800,600)) + + updated_text = f'{existing_text}\n{formatted_time()}: Sending termination message for {values["-INPUT-"]} to Main Server.' + PyNode.send_nowait(sender = CommProc.pid_ , receiver = RemoteRecv() , message = (Atom('terminate'),Atom(f'{values["-INPUT-"]}'))) + + + MainWindow['-LOG-'].update(updated_text) + MainWindow['-INPUT-'].update('') + if not msg_queue.empty(): + msg = msg_queue.get_nowait() + if msg[0] == 'graph': + Graph , node_colors = Show_Nerlnet_Graph(msg[1]) + MainWindow['-PHOLD-'].update(visible=False) + MainWindow['-IMAGE-'].update(filename='NerlNetGraph.png' , visible=True , size=(800,600)) + MainWindow['-LOG-'].update(f'{formatted_time()}: NerlNet Graph Received.') + MainWindow['-INTEXT-'].update(visible=True) + MainWindow['-INPUT-'].update(visible=True) + MainWindow['-TERM-'].update(visible=True) + elif msg[0] == 'update': + ClientName , WorkerName = msg[1].split('-') + + node_colors[WorkerName] = 'gray' + #node_colors[ClientName] = 'gray' + nx.set_node_attributes(Graph, node_colors, 'color') + colors = nx.get_node_attributes(Graph, 'color').values() + + pos = nx.nx_agraph.graphviz_layout(Graph, prog='dot') + angle = 100 + #rotated_pos = {node: (x*math.cos(angle) -y*math.sin(angle), x*math.sin(angle) + y*math.cos(angle)) for node, (x, y) in pos.items()} + + plt.figure(figsize=(8,6)) + nx.draw_networkx(Graph, pos, with_labels=True, node_color=colors , node_size=200, font_size=8, font_color='white' , edge_color='black' , width=1.5) + plt.savefig('NerlNetGraph.png' ,bbox_inches='tight' , dpi=125) + plt.close() + MainWindow['-IMAGE-'].update(filename='NerlNetGraph.png' , visible=True , size=(800,600)) + if existing_text == '': + updated_text = f'{formatted_time()}: Worker {WorkerName} of Client {ClientName} is down.' + else: + Workers = [Graph.nodes[node]['label'] for node in Graph.nodes() if Graph.nodes[node]['label'][0] == 'w' and node_colors[node] != 'gray'] + updated_text = f'{existing_text}\n{formatted_time()}: Worker {WorkerName} of Client {ClientName} is down , Available workers: {Workers}' + MainWindow['-LOG-'].update(updated_text) + + elif msg[0] == 'stats': + try: + Data = msg[1] + for items in str(Data).split('|'): + Entity, val = items.split(':') + if '=' in val: + for EntityStat in val.split(','): + Stat, Result = EntityStat.split('=') + if "Train" in Stat: + StatsInfo["workers"][Stat] = Result + else: + StatsInfo["clients"][Stat] = Result + else: + StatsInfo[Entity] = val # Messages for entities other than clients/workers + CurrentStats = StatsInfo.copy() # copy the stats to a new variable + StatsText = ShowStats(CurrentStats) + existing_stats = values['-STATS-'] + if existing_stats != '': + MainWindow['-STATS-'].update(f'{existing_stats}\n{StatsText}') + else: + MainWindow['-STATS-'].update(StatsText) + existing_text = values['-LOG-'] + MainWindow['-LOG-'].update(f'{existing_text}\n{formatted_time()}: Statistics Received.') + except Exception as err: + MainWindow['-LOG-'].update(f"Error in Stats {err} , Got {StatsInfo}") + + + elif values['-LOG-'] != '': + existing_text = values['-LOG-'] + updated_text = f'{existing_text}\n{formatted_time()}: {msg}' + else: + updated_text = f'{formatted_time()}: {msg}' + if updated_text != '': + MainWindow['-LOG-'].update(updated_text) + + + + MainWindow.close() + + +def ShowStats(CurrentStats): + MainWindow['-LOG-'].update(f'{formatted_time()}: Printing Statistics...') + StatsText = '' + for key in CurrentStats: + if key == 'workers': + StatsText += f'Workers:\n' + for stat in CurrentStats[key]: + if "Time" in stat: + StatsText += f'\t{stat.replace("_Train_" , " Working ")}: {CurrentStats[key][stat]} seconds\n' + else: + StatsText += f'\t{stat.replace("_" , " ")}: {CurrentStats[key][stat]}\n' + elif key == 'clients': + StatsText += f'Clients:\n' + for stat in CurrentStats[key]: + if "info" in stat: + StatsText += f'\t{stat.replace("_info_" , " Info ")}: {CurrentStats[key][stat]} bytes\n' + else: + StatsText += f'\t{stat.replace("_Msg_" , " Message ")}: {CurrentStats[key][stat]}\n' + elif key == 'Dead workers': + StatsText += f'Dead Workers are:{CurrentStats[key]}\n' + else: + StatsText += f'{key} Message Count: {CurrentStats[key]}\n' + return StatsText + + +def Show_Nerlnet_Graph(NerlGraph): + # Graph in string format: "Entity1Name,Entity1IP,Entity1Port#Entity2Name,Entity2IP,Entity2Port#Entity1Name-Entity2Name,Entity2Name-Entity1Name#Worker1-Client1#Worker2-Client2" etc. + # Workers in string format: "Worker1-Client1,Worker2-Client1,Worker3-Client2" etc. + # Node is defined by a triplet 'Name,IP,Port' seperated by '#' + # Edge is defined by a string 'Entity1-Entity2' seperated by ',' + Nodes = NerlGraph.split('#')[0:-1] + Edges = NerlGraph.split('#')[-1].split(',')[0:-1] + Workers = NerlGraph.split('#')[-1].split(',')[-1].split('!')[0:-1] + WorkersNames = [Worker.split('-')[0] for Worker in Workers ] + Edges += Workers + EdgesSeperated = [(Edge.split('-')[0],Edge.split('-')[1]) for Edge in Edges if len(Edges) > 1] # ? What if no edges? + NodesNames = [NodeTriplet.split(',')[0] for NodeTriplet in Nodes] + NodesNames += WorkersNames + + NodeWithLabels = [(NodeName , {'label' : NodeName}) for NodeName in NodesNames] + graph = nx.Graph() + graph.add_nodes_from(NodeWithLabels) + graph.add_edges_from(EdgesSeperated) + + my_labels = {'mainServer': 'mS' , 'apiServer': 'aS'} + + nx.relabel_nodes(graph, my_labels , copy=False) + default_colors = {node:'#A90433' for node in graph.nodes()} + node_colors = {node:default_colors[node] for node in graph.nodes()} + nx.set_node_attributes(graph, node_colors, 'color') + colors = nx.get_node_attributes(graph, 'color').values() + + pos = nx.nx_agraph.graphviz_layout(graph, prog='dot') + angle = 100 + #rotated_pos = {node: (x*math.cos(angle) -y*math.sin(angle), x*math.sin(angle) + y*math.cos(angle)) for node, (x, y) in pos.items()} + plt.figure(figsize=(8,6)) + nx.draw_networkx(graph, pos, with_labels=True, node_color=colors , node_size=200, font_size=8, font_color='white' , edge_color='black' , width=1.5) + plt.savefig('NerlNetGraph.png' ,bbox_inches='tight' , dpi=125) + plt.close() + return graph , node_colors + + + +async def Pyrlang(msg_queue): + print("Pyrlang task started...") + PyNode = Node(node_name="py@127.0.0.1" , cookie="COOKIE") + CommProc = MyProcess(msg_queue=msg_queue) + msg_queue.put_nowait((PyNode , CommProc)) + print("Pyrlang task finished.") + PyNode.run() + +async def main_func(): + msg_queue = asyncio.Queue() + await asyncio.gather(GUI(msg_queue) , Pyrlang(msg_queue)) + +if __name__ == "__main__": + asyncio.run(main_func()) + + + + diff --git a/src_erl/NerlMonitor/src/handlers/nerlMonitor_handler.erl b/src_erl/NerlMonitor/src/handlers/nerlMonitor_handler.erl new file mode 100644 index 000000000..59f19934d --- /dev/null +++ b/src_erl/NerlMonitor/src/handlers/nerlMonitor_handler.erl @@ -0,0 +1,22 @@ +-module(nerlMonitor_handler). + +-export([init/2]). + +-define(GUI , {'PyrlangProcess' , 'py@127.0.0.1'}). + +init(Req0, [Msg]) -> + {_,Body,_} = cowboy_req:read_body(Req0), + Data = binary_to_list(Body), + case Msg of + utilInfo -> ?GUI ! {update ,Data}; + stats -> ?GUI ! {stats ,Data}; + _ -> + ok % got unknown messge, ignore. + end, + + Req = cowboy_req:reply(200, + #{<<"content-type">> => <<"text/plain">>}, + <<"Got that">>, + Req0), + {ok, Req, Msg}. + diff --git a/src_erl/NerlMonitor/src/nerlMonitor.app.src b/src_erl/NerlMonitor/src/nerlMonitor.app.src new file mode 100644 index 000000000..3625d754d --- /dev/null +++ b/src_erl/NerlMonitor/src/nerlMonitor.app.src @@ -0,0 +1,15 @@ +{application, nerlMonitor, + [{description, "An OTP application"}, + {vsn, "0.1.0"}, + {registered, []}, + {mod, {nerlMonitor_app, []}}, + {applications, + [kernel, + stdlib + ]}, + {env,[]}, + {modules, []}, + + {licenses, ["Apache-2.0"]}, + {links, []} + ]}. diff --git a/src_erl/NerlMonitor/src/nerlMonitor_app.erl b/src_erl/NerlMonitor/src/nerlMonitor_app.erl new file mode 100644 index 000000000..6c2f54e3b --- /dev/null +++ b/src_erl/NerlMonitor/src/nerlMonitor_app.erl @@ -0,0 +1,91 @@ +%%%------------------------------------------------------------------- +%% @doc nerlGUI public API +%% @end +%%%------------------------------------------------------------------- + +-module(nerlMonitor_app). + +-behaviour(application). + +-include("../../Communication_Layer/http_Nerlserver/src/nerl_tools.hrl"). + +-export([start/2, stop/1 , link_GUI/0]). + +-define(UTILNAME,nerlMonitor). +-define(IP , "192.168.64.7"). +-define(PORT, 8096). +-define(MSADDRES,"192.168.64.7:8080" ). +-define(GUI , {'PyrlangProcess' , 'py@127.0.0.1'}). % Erlang node should be long name to communicate with pyrlang node + +start(_StartType, _StartArgs) -> + application:start(sasl), + application:start(ranch), + application:start(inets), + + Dispatch = cowboy_router:compile([ + {'_', [ + {"/utilInfo",nerlMonitor_handler, [utilInfo]}, + {"/stats" , nerlMonitor_handler , [stats]} + + ]} + ]), + {ok, _} = cowboy:start_clear(?UTILNAME,[{port, ?PORT}],#{env => #{dispatch => Dispatch}}), + io:format("nerlMonitor started , opening GUI...~n"), + erlang:register(recvPyrlang , self()), + _GUI_PID = spawn_link(?MODULE , link_GUI , []) , %% PyrlangNode: ('PyralngProcess' , 'py@127.0.0.1' , 'COOKIE') , sending message by: 'GUI ! HELLO.' + URL = "http://" ++ ?MSADDRES ++ "/toolConnectionReq", + mainServerPing(URL,term_to_binary([?UTILNAME , ?IP , integer_to_list(?PORT)])), %% TODO How to "import" nerl_tools + nerlMonitor_sup:start_link(). + + + +%ping main server in 0.5 sec intervals with connection request. will stop when got valid response. +mainServerPing(URL,Body)-> + io:format("pinging main server...~n"), + Response = httpc:request(post,{URL, [],"application/x-www-form-urlencoded",Body}, [], []), + case Response of + {error,_}-> + timer:sleep(1000), + receive + close -> + io:format("Quitting NerlMonitor...~n"), + {ok , AppName} = application:get_application(), + stop(AppName) + after 0 -> + mainServerPing(URL,Body) + end; + {ok,{_ResCode, _Headers, Data}}-> + io:format("Got NerlGraph , Sending to GUI...~n" , []), + ?GUI ! {graph , Data}, + recvLoop(); + {ok , _} -> + io:format("Got unknown response from main server~n") + end. + + +recvLoop()-> %% MainServer replies with Nerlnet-Graph + receive + {terminate , WorkerName} -> + io:format("Got termination message for Worker ~p from GUI~n" , [WorkerName]), + URL = "http://" ++ ?MSADDRES ++ "/worker_kill", + Body = term_to_binary(WorkerName), + httpc:request(post,{URL, [],"application/x-www-form-urlencoded",Body}, [], []), + recvLoop(); + close -> + io:format("Quitting NerlMonitor...~n"), + {ok , AppName} = application:get_application(), + stop(AppName); + Msg -> + io:format("Got unknown message from GUI: ~p~n", [Msg]), + recvLoop() + end. + + +stop(_State) -> + ok. + +link_GUI() -> + os:cmd('python3 src/MonitorGUI.py'), + io:format("GUI Closed~n"). + + diff --git a/src_erl/NerlMonitor/src/nerlMonitor_sup.erl b/src_erl/NerlMonitor/src/nerlMonitor_sup.erl new file mode 100644 index 000000000..02225cc6a --- /dev/null +++ b/src_erl/NerlMonitor/src/nerlMonitor_sup.erl @@ -0,0 +1,35 @@ +%%%------------------------------------------------------------------- +%% @doc NerlMonitor top level supervisor. +%% @end +%%%------------------------------------------------------------------- + +-module(nerlMonitor_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%% sup_flags() = #{strategy => strategy(), % optional +%% intensity => non_neg_integer(), % optional +%% period => pos_integer()} % optional +%% child_spec() = #{id => child_id(), % mandatory +%% start => mfargs(), % mandatory +%% restart => restart(), % optional +%% shutdown => shutdown(), % optional +%% type => worker(), % optional +%% modules => modules()} % optional +init([]) -> + SupFlags = #{strategy => one_for_all, + intensity => 0, + period => 1}, + ChildSpecs = [], + {ok, {SupFlags, ChildSpecs}}. + +%% internal functions diff --git a/src_erl/NerlMonitor/src/requirements.txt b/src_erl/NerlMonitor/src/requirements.txt new file mode 100644 index 000000000..b4bc44648 --- /dev/null +++ b/src_erl/NerlMonitor/src/requirements.txt @@ -0,0 +1,4 @@ +matplotlib==3.7.1 +nest-asyncio==1.5.7 +networkx==3.1 +PySimpleGUI==4.60.5 diff --git a/src_erl/NerlnetApp/src/Bridge/onnWorkers/workerGeneric.erl b/src_erl/NerlnetApp/src/Bridge/onnWorkers/workerGeneric.erl index f5a3e3eee..f5d48c0e5 100644 --- a/src_erl/NerlnetApp/src/Bridge/onnWorkers/workerGeneric.erl +++ b/src_erl/NerlnetApp/src/Bridge/onnWorkers/workerGeneric.erl @@ -45,13 +45,14 @@ start_link(ARGS) -> %% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or %% gen_statem:start_link/[3,4], this function is called by the new process to initialize. %% distributedBehaviorFunc is the special behavior of the worker regrading the distributed system e.g. federated client/server -init({WorkerName , WorkerArgs , DistributedBehaviorFunc , DistributedWorkerData , ClientPid}) -> +init({WorkerName , WorkerArgs , DistributedBehaviorFunc , DistributedWorkerData , ClientPid , WorkerStatsEts}) -> nerl_tools:setup_logger(?MODULE), {ModelID , ModelType , LayersSizes, LayersTypes, LayersFunctionalityCodes, LearningRate , Epochs, OptimizerType, OptimizerArgs , LossMethod , DistributedSystemType , DistributedSystemArgs} = WorkerArgs, GenWorkerEts = ets:new(generic_worker,[set]), put(generic_worker_ets, GenWorkerEts), put(client_pid, ClientPid), + put(worker_stats_ets , WorkerStatsEts), ets:insert(GenWorkerEts,{worker_name, WorkerName}), ets:insert(GenWorkerEts,{model_id, ModelID}), ets:insert(GenWorkerEts,{model_type, ModelType}), diff --git a/src_erl/NerlnetApp/src/Client/clientStatem.erl b/src_erl/NerlnetApp/src/Client/clientStatem.erl index 5523bdff5..f9a318a25 100644 --- a/src_erl/NerlnetApp/src/Client/clientStatem.erl +++ b/src_erl/NerlnetApp/src/Client/clientStatem.erl @@ -9,7 +9,7 @@ -module(clientStatem). -author("kapelnik"). -include("../nerl_tools.hrl"). - +-include("../Stats/stats.hrl"). -behaviour(gen_statem). %% API @@ -23,11 +23,9 @@ -import(clientWorkersFunctions,[createWorkers/2]). -import(nerlNIF,[validate_nerltensor_erl/1]). + -define(ETS_KV_VAL_IDX, 2). % key value pairs --> value index is 2 --define(WORKER_PID_IDX, 2). --define(WORKER_TIMING_IDX, 4). --define(WORKER_TRAIN_MISSED_IDX, 5). --define(WORKER_PRED_MISSED_IDX, 6). +-define(WORKER_PID_IDX, 1). -define(SERVER, ?MODULE). %% client ETS table: {WorkerName, WorkerPid, WorkerArgs, TimingTuple} @@ -73,14 +71,14 @@ init({MyName,NerlnetGraph, ClientWorkers , WorkerShaMap , WorkerToClientMap , Sh inets:start(), io:format("Client ~p is connected to: ~p~n",[MyName, [digraph:vertex(NerlnetGraph,Vertex) || Vertex <- digraph:out_neighbours(NerlnetGraph,MyName)]]), % nerl_tools:start_connection([digraph:vertex(NerlnetGraph,Vertex) || Vertex <- digraph:out_neighbours(NerlnetGraph,MyName)]), - EtsRef = ets:new(client_data, [set]), - EtsStatsRef = stats:generate_stats_ets(), - + EtsRef = ets:new(client_data, [set]), %% client_data is responsible for functional attributes + EtsStats = ets:new(ets_stats, [set]), %% ets_stats is responsible for holding all the ets stats (client + workers) + ClientStatsEts = stats:generate_stats_ets(), %% client stats ets inside ets_stats + ets:insert(EtsStats, {MyName, ClientStatsEts}), + put(ets_stats, EtsStats), ets:insert(EtsRef, {workerToClient, WorkerToClientMap}), ets:insert(EtsRef, {workersNames, ClientWorkers}), ets:insert(EtsRef, {nerlnetGraph, NerlnetGraph}), - ets:insert(EtsRef, {msgCounter, 1}), - ets:insert(EtsRef, {infoIn, 0}), ets:insert(EtsRef, {myName, MyName}), MyWorkersToShaMap = maps:filter(fun(Worker , _SHA) -> lists:member(Worker , ClientWorkers) end , WorkerShaMap), io:format("client ~p workers to sha map: ~p~n",[MyName, MyWorkersToShaMap]), @@ -90,7 +88,7 @@ init({MyName,NerlnetGraph, ClientWorkers , WorkerShaMap , WorkerToClientMap , Sh ets:insert(EtsRef, {my_router,{MyRouterHost,MyRouterPort}}), io:format("*****************HERE ~p*****************~n",[MyName]), - clientWorkersFunctions:create_workers(MyName , EtsRef , ShaToModelArgsMap), + clientWorkersFunctions:create_workers(MyName , EtsRef , ShaToModelArgsMap , EtsStats), io:format("*****************HERE AFTER CREATE WORKERS ~p*****************~n",[MyName]), %% send pre_idle signal to workers WorkersNames = clientWorkersFunctions:get_workers_names(EtsRef), @@ -100,9 +98,9 @@ init({MyName,NerlnetGraph, ClientWorkers , WorkerShaMap , WorkerToClientMap , Sh % update dictionary put(nerlnetGraph, NerlnetGraph), - put(client_ets, EtsRef), - put(stats_ets, EtsStatsRef), - + put(client_data, EtsRef), + put(ets_stats, EtsStats), + put(client_stats_ets , ClientStatsEts), {ok, idle, #client_statem_state{myName= MyName, etsRef = EtsRef}}. @@ -117,27 +115,31 @@ callback_mode() -> state_functions. format_status(_Opt, [_PDict, _StateName, _State]) -> Status = some_term, Status. %% ==============STATES================= -waitforWorkers(cast, In = {stateChange,WorkerName}, State = #client_statem_state{myName = MyName,waitforWorkers = WaitforWorkers,nextState = NextState, etsRef = EtsRef}) -> +waitforWorkers(cast, In = {stateChange,WorkerName}, State = #client_statem_state{myName = MyName,waitforWorkers = WaitforWorkers,nextState = NextState, etsRef = _EtsRef}) -> NewWaitforWorkers = WaitforWorkers--[WorkerName], - ets:update_counter(EtsRef, msgCounter, 1), % last is increment value - ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), + ClientStatsEts = get(client_stats_ets), + stats:increment_messages_received(ClientStatsEts), + stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(In)), case NewWaitforWorkers of % TODO Guy here we need to check for keep alive with workers [] -> send_client_is_ready(MyName), % when all workers done their work + stats:increment_messages_sent(ClientStatsEts), {next_state, NextState, State#client_statem_state{waitforWorkers = []}}; _-> {next_state, waitforWorkers, State#client_statem_state{waitforWorkers = NewWaitforWorkers}} end; waitforWorkers(cast, In = {NewState}, State = #client_statem_state{myName = _MyName, etsRef = EtsRef}) -> - ets:update_counter(EtsRef, msgCounter, 1), - ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), + ClientStatsEts = get(client_stats_ets), + stats:increment_messages_received(ClientStatsEts), + stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(In)), % ?LOG_INFO("~p in waiting going to state ~p~n",[MyName, State]), Workers = ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX), - cast_message_to_workers(EtsRef, {NewState}), + cast_message_to_workers(EtsRef, {NewState}), %% This function increments the number of sent messages in stats ets {next_state, waitforWorkers, State#client_statem_state{nextState = NewState, waitforWorkers = Workers}}; -waitforWorkers(cast, EventContent, State = #client_statem_state{etsRef = EtsRef}) -> - ets:update_counter(EtsRef, msgCounter, 1), - ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(EventContent)), +waitforWorkers(cast, EventContent, State) -> + ClientStatsEts = get(client_stats_ets), + stats:increment_messages_received(ClientStatsEts), + stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(EventContent)), ?LOG_WARNING("client waitforWorkers ignored!!!: ~p ~n",[EventContent]), {next_state, waitforWorkers, State}. @@ -145,61 +147,79 @@ waitforWorkers(cast, EventContent, State = #client_statem_state{etsRef = EtsRef} %% initiating workers when they include federated workers. init stage == handshake between federated worker client and server %% TODO: make custom_worker_message in all states to send messages from workers to entities (not just client) idle(cast, In = {custom_worker_message, {From, To}}, State = #client_statem_state{etsRef = EtsRef}) -> - ets:update_counter(EtsRef, msgCounter, 1), - ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), + ClientStatsEts = get(client_stats_ets), + stats:increment_messages_received(ClientStatsEts), + stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(In)), WorkerOfThisClient = ets:member(EtsRef, To), if WorkerOfThisClient -> TargetWorkerPID = ets:lookup_element(EtsRef, To, ?WORKER_PID_IDX), - gen_statem:cast(TargetWorkerPID,{post_idle,From}); + gen_statem:cast(TargetWorkerPID,{post_idle,From}), + stats:increment_messages_sent(ClientStatsEts); true -> %% send to FedServer that worker From is connecting to it DestClient = maps:get(To, ets:lookup_element(EtsRef, workerToClient, ?ETS_KV_VAL_IDX)), - MessageBody = term_to_binary({DestClient, custom_worker_message, {From, To}}), + MessageBody = {DestClient, custom_worker_message, {From, To}}, {RouterHost,RouterPort} = ets:lookup_element(EtsRef, my_router, ?DATA_IDX), - nerltools:http_router_request(RouterHost, RouterPort, [DestClient], atom_to_list(custom_worker_message), MessageBody) + nerltools:http_router_request(RouterHost, RouterPort, [DestClient], atom_to_list(custom_worker_message), term_to_binary(MessageBody)), + stats:increment_messages_sent(ClientStatsEts), + stats:increment_bytes_sent(ClientStatsEts , nerl_tools:calculate_size(MessageBody)) end, {keep_state, State}; -idle(cast, In = {statistics}, State = #client_statem_state{ myName = _MyName, etsRef = EtsRef}) -> - sendStatistics(EtsRef), - ets:update_counter(EtsRef, msgCounter, 1), % last param is increment value - ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), +idle(cast, _In = {statistics}, State = #client_statem_state{ myName = MyName, etsRef = EtsRef}) -> + EtsStats = get(ets_stats), + ClientStatsEts = get(client_stats_ets), + ClientStatsEncStr = stats:encode_ets_to_http_bin_str(ClientStatsEts), + ClientStatsToSend = atom_to_list(MyName) ++ ?API_SERVER_WITHIN_ENTITY_SEPERATOR ++ ClientStatsEncStr ++ ?API_SERVER_ENTITY_SEPERATOR, + stats:increment_messages_received(ClientStatsEts), + ListStatsEts = ets:tab2list(EtsStats) -- [{MyName , ClientStatsEts}], + WorkersStatsEncStr = create_encoded_stats_str(ListStatsEts), + StatsBody = {MyName , ClientStatsToSend ++ WorkersStatsEncStr}, + {RouterHost,RouterPort} = ets:lookup_element(EtsRef, my_router, ?DATA_IDX), + nerltools:http_router_request(RouterHost, RouterPort, [?MAIN_SERVER_ATOM], atom_to_list(statistics), StatsBody), + stats:increment_messages_sent(ClientStatsEts), {next_state, idle, State}; idle(cast, In = {training}, State = #client_statem_state{etsRef = EtsRef}) -> - ets:update_counter(EtsRef, msgCounter, 1), - ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), - MessageToCast = {training}, + ClientStatsEts = get(client_stats_ets), + stats:increment_messages_received(ClientStatsEts), + stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(In)), MessageToCast = {training}, cast_message_to_workers(EtsRef, MessageToCast), {next_state, waitforWorkers, State#client_statem_state{waitforWorkers= ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX), nextState = training}}; idle(cast, In = {predict}, State = #client_statem_state{etsRef = EtsRef}) -> io:format("client going to state predict~n",[]), - ets:update_counter(EtsRef, msgCounter, 1), - ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), + ClientStatsEts = get(client_stats_ets), + stats:increment_messages_received(ClientStatsEts), + stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(In)), MessageToCast = {predict}, cast_message_to_workers(EtsRef, MessageToCast), {next_state, waitforWorkers, State#client_statem_state{waitforWorkers= ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX),nextState = predict}}; -idle(cast, EventContent, State = #client_statem_state{etsRef = EtsRef}) -> - ets:update_counter(EtsRef, msgCounter, 1), - ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(EventContent)), - io:format("client idle ignored!!!: ~p ~n",[EventContent]), +idle(cast, EventContent, State = #client_statem_state{etsRef = EtsRef , myName = MyName}) -> + ClientStatsEts = get(client_stats_ets), + stats:increment_bad_messages(ClientStatsEts), + ?LOG_WARNING("~p Unrecognized Message!!!: ~p",[MyName , EventContent]), {next_state, training, State#client_statem_state{etsRef = EtsRef}}. %% passing Data from worker to worker e.g. (FedClient to FedServer) training(cast, MessageIn = {update, {From, To, Data}}, State = #client_statem_state{etsRef = EtsRef}) -> - ets:update_counter(EtsRef, msgCounter, 1), - ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(MessageIn)), + ClientStatsEts = get(client_stats_ets), + stats:increment_messages_received(ClientStatsEts), + stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(MessageIn)), WorkerOfThisClient = ets:member(EtsRef, To), if WorkerOfThisClient -> TargetWorkerPID = ets:lookup_element(EtsRef, To, ?WORKER_PID_IDX), - gen_statem:cast(TargetWorkerPID,{update,From,To, Data}); + gen_statem:cast(TargetWorkerPID,{update,From,To, Data}), + stats:increment_messages_sent(ClientStatsEts), + stats:increment_bytes_sent(ClientStatsEts , nerl_tools:calculate_size(Data)); true -> DestClient = maps:get(To, ets:lookup_element(EtsRef, workerToClient, ?ETS_KV_VAL_IDX)), MessageBody = term_to_binary({DestClient, update, {From, To, Data}}), {RouterHost,RouterPort} = ets:lookup_element(EtsRef, my_router, ?DATA_IDX), - nerltools:http_router_request(RouterHost, RouterPort, [DestClient], atom_to_list(pass), MessageBody) + nerltools:http_router_request(RouterHost, RouterPort, [DestClient], atom_to_list(pass), MessageBody), + stats:increment_messages_sent(ClientStatsEts), + stats:increment_bytes_sent(ClientStatsEts , nerl_tools:calculate_size(MessageBody)) end, {keep_state, State}; @@ -208,78 +228,79 @@ training(cast, MessageIn = {update, {From, To, Data}}, State = #client_statem_st %% TODO fix variables names to make it more generic %% federated server sends AvgWeights to workers training(cast, InMessage = {custom_worker_message, WorkersList, WeightsTensor}, State = #client_statem_state{etsRef = EtsRef}) -> - ets:update_counter(EtsRef, msgCounter, 1), - ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(InMessage)), + ClientStatsEts = get(client_stats_ets), + stats:increment_messages_received(ClientStatsEts), + stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(InMessage)), Func = fun(WorkerName) -> DestClient = maps:get(WorkerName, ets:lookup_element(EtsRef, workerToClient, ?ETS_KV_VAL_IDX)), MessageBody = term_to_binary({DestClient, update, {_FedServer = "server", WorkerName, WeightsTensor}}), % TODO - fix client should not be aware of the data of custom worker message {RouterHost,RouterPort} = ets:lookup_element(EtsRef, my_router, ?DATA_IDX), - nerltools:http_router_request(RouterHost, RouterPort, [DestClient], atom_to_list(custom_worker_message), MessageBody) + nerltools:http_router_request(RouterHost, RouterPort, [DestClient], atom_to_list(custom_worker_message), MessageBody), + stats:increment_messages_sent(ClientStatsEts), + stats:increment_bytes_sent(ClientStatsEts , nerl_tools:calculate_size(MessageBody)) end, lists:foreach(Func, WorkersList), % can be optimized with broadcast instead of unicast {keep_state, State}; % TODO Validate this state - sample and empty list -training(cast, In = {sample,[]}, State = #client_statem_state{etsRef = EtsRef}) -> - ets:update_counter(EtsRef, msgCounter, 1), - ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), +training(cast, _In = {sample,[]}, State = #client_statem_state{etsRef = EtsRef}) -> + ClientStatsEts = get(client_stats_ets), + stats:increment_bad_messages(ClientStatsEts), ?LOG_ERROR("client got empty Vector",[]), {next_state, training, State#client_statem_state{etsRef = EtsRef}}; training(cast, In = {sample,Body}, State = #client_statem_state{etsRef = EtsRef}) -> - ets:update_counter(EtsRef, msgCounter, 1), - ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), + ClientStatsEts = get(client_stats_ets), + stats:increment_messages_received(ClientStatsEts), + stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(In)), {ClientName, WorkerNameStr, _CSVName, BatchID, BatchOfSamples} = binary_to_term(Body), WorkerName = list_to_atom(WorkerNameStr), WorkerOfThisClient = ets:member(EtsRef, WorkerName), if WorkerOfThisClient -> WorkerPid = ets:lookup_element(EtsRef, WorkerName, ?WORKER_PID_IDX), - TimingTuple = ets:lookup_element(EtsRef, WorkerName, ?WORKER_TIMING_IDX), %TODO timing should be in statistics of worker - {_LastBatchReceivedTime,TotalBatches,TotalTime} = TimingTuple, - Start = os:timestamp(), - NewTimingTuple = {Start,TotalBatches+1,TotalTime}, - ets:update_element(EtsRef, WorkerName,[{?WORKER_PID_IDX, WorkerPid},{?WORKER_TIMING_IDX,NewTimingTuple}]), - gen_statem:cast(WorkerPid, {sample, BatchID ,BatchOfSamples}); + gen_statem:cast(WorkerPid, {sample, BatchID ,BatchOfSamples}), + stats:increment_messages_sent(ClientStatsEts), + stats:increment_bytes_sent(ClientStatsEts , nerl_tools:calculate_size(BatchOfSamples)); true -> ?LOG_ERROR("Given worker ~p isn't found in client ~p",[WorkerName, ClientName]) end, {next_state, training, State#client_statem_state{etsRef = EtsRef}}; training(cast, In = {idle}, State = #client_statem_state{myName = _MyName, etsRef = EtsRef}) -> - ets:update_counter(EtsRef, msgCounter, 1), - ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), + ClientStatsEts = get(client_stats_ets), + stats:increment_messages_received(ClientStatsEts), + stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(In)), MessageToCast = {idle}, cast_message_to_workers(EtsRef, MessageToCast), Workers = ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX), ?LOG_INFO("setting workers at idle: ~p~n",[ets:lookup_element(EtsRef, workersNames, ?DATA_IDX)]), {next_state, waitforWorkers, State#client_statem_state{etsRef = EtsRef, waitforWorkers = Workers}}; -training(cast, In = {predict}, State = #client_statem_state{myName = MyName, etsRef = EtsRef}) -> - ets:update_counter(EtsRef, msgCounter, 1), - ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), - io:format("~p going to state predict~n",[MyName]), - MessageToCast = {predict}, - cast_message_to_workers(EtsRef,MessageToCast), - Workers = ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX), - {next_state, waitforWorkers, State#client_statem_state{nextState = predict, waitforWorkers = Workers, etsRef = EtsRef}}; - +training(cast, _In = {predict}, State = #client_statem_state{myName = MyName, etsRef = EtsRef}) -> + ?LOG_ERROR("Wrong request , client ~p can't go from training to predict directly", [MyName]), + {next_state, training, State#client_statem_state{etsRef = EtsRef}}; % training get path to main server training(cast, In = {loss,WorkerName,nan,_Time_NIF}, State) -> EtsRef = get(client_ets), - ets:update_counter(EtsRef, msgCounter, 1), - ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), + ClientStatsEts = get(client_stats_ets), + stats:increment_messages_received(ClientStatsEts), + stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(In)), {RouterHost,RouterPort} = ets:lookup_element(EtsRef, my_router, ?DATA_IDX), MessageBody = term_to_binary({WorkerName,"nan"}), nerltools:http_router_request(RouterHost, RouterPort, [?MAIN_SERVER_ATOM], atom_to_list(lossFunction), MessageBody), + stats:increment_messages_sent(ClientStatsEts), + stats:increment_bytes_sent(ClientStatsEts , nerl_tools:calculate_size(MessageBody)), {next_state, training, State#client_statem_state{etsRef = EtsRef}}; training(cast, In = {loss,WorkerName,LossFunction,_Time_NIF}, State = #client_statem_state{myName = MyName,etsRef = EtsRef}) -> - ets:update_counter(EtsRef, msgCounter, 1), - ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), - updateTimingMap(EtsRef, WorkerName), + ClientStatsEts = get(client_stats_ets), + stats:increment_messages_received(ClientStatsEts), + stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(In)), {RouterHost,RouterPort} = ets:lookup_element(EtsRef, my_router, ?DATA_IDX), MessageBody = term_to_binary({WorkerName,LossFunction}), nerltools:http_router_request(RouterHost, RouterPort, [?MAIN_SERVER_ATOM], atom_to_list(lossFunction), MessageBody), + stats:increment_messages_sent(ClientStatsEts), + stats:increment_bytes_sent(ClientStatsEts , nerl_tools:calculate_size(MessageBody)), {next_state, training, State#client_statem_state{myName = MyName,etsRef = EtsRef}}; training(cast, EventContent, State = #client_statem_state{etsRef = EtsRef, myName = MyName}) -> @@ -288,58 +309,57 @@ training(cast, EventContent, State = #client_statem_state{etsRef = EtsRef, myNam {next_state, training, State#client_statem_state{etsRef = EtsRef}}. predict(cast, In = {sample,Body}, State = #client_statem_state{etsRef = EtsRef}) -> - ets:update_counter(EtsRef, msgCounter, 1), - ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), + ClientStatsEts = get(client_stats_ets), + stats:increment_messages_received(ClientStatsEts), + stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(In)), {ClientName, WorkerNameStr, CSVName, BatchNumber, BatchOfSamples} = binary_to_term(Body), WorkerName = list_to_atom(WorkerNameStr), - Start = os:timestamp(), WorkerOfThisClient = ets:member(EtsRef, WorkerName), if - WorkerOfThisClient -> - TimingTuple = ets:lookup_element(EtsRef, WorkerName, ?WORKER_TIMING_IDX), %todo refactor timing map - {_LastBatchReceivedTime,TotalBatches,TotalTime} = TimingTuple, - NewTimingTuple = {Start,TotalBatches+1,TotalTime}, - ets:update_element(EtsRef, WorkerName,[{?WORKER_TIMING_IDX,NewTimingTuple}]); + WorkerOfThisClient -> + WorkerPid = ets:lookup_element(EtsRef, WorkerName, ?WORKER_PID_IDX), + gen_statem:cast(WorkerPid, {sample, CSVName, BatchNumber, BatchOfSamples}), + stats:increment_messages_sent(ClientStatsEts), + stats:increment_bytes_sent(ClientStatsEts , nerl_tools:calculate_size(BatchOfSamples)); true -> ?LOG_ERROR("Given worker ~p isn't found in client ~p",[WorkerName, ClientName]) end, - - WorkerPid = ets:lookup_element(EtsRef, WorkerName, ?WORKER_PID_IDX), - gen_statem:cast(WorkerPid, {sample, CSVName, BatchNumber, BatchOfSamples}), {next_state, predict, State#client_statem_state{etsRef = EtsRef}}; -%% TODO: add nif timing statistics -predict(cast, In = {predictRes,WorkerName,InputName,ResultID,PredictNerlTensor, Type, _TimeTook}, State = #client_statem_state{myName = MyName, etsRef = EtsRef}) -> - ets:update_counter(EtsRef, msgCounter, 1), - ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), - updateTimingMap(EtsRef, WorkerName), +predict(cast, In = {predictRes,WorkerName,InputName,ResultID,PredictNerlTensor, Type}, State = #client_statem_state{myName = _MyName, etsRef = EtsRef}) -> + ClientStatsEts = get(client_stats_ets), + stats:increment_messages_received(ClientStatsEts), + stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(In)), {RouterHost,RouterPort} = ets:lookup_element(EtsRef, my_router, ?DATA_IDX), MessageBody = term_to_binary({atom_to_list(WorkerName), InputName, ResultID, {PredictNerlTensor, Type}}), nerltools:http_router_request(RouterHost, RouterPort, [?MAIN_SERVER_ATOM], atom_to_list(predictRes), MessageBody), + stats:increment_messages_sent(ClientStatsEts), + stats:increment_bytes_sent(ClientStatsEts , nerl_tools:calculate_size(MessageBody)), {next_state, predict, State#client_statem_state{etsRef = EtsRef}}; % TODO from predict directly to training?!?!? -predict(cast, In = {training}, State = #client_statem_state{etsRef = EtsRef}) -> - ets:update_counter(EtsRef, msgCounter, 1), - ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), - MsgToCast = {training}, - cast_message_to_workers(EtsRef, MsgToCast), - Workers = ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX), - {next_state, waitforWorkers, State#client_statem_state{nextState = training, etsRef = EtsRef, waitforWorkers = Workers}}; - - -predict(cast, In = {idle}, State = #client_statem_state{etsRef = EtsRef}) -> +predict(cast,_In = {training}, State = #client_statem_state{myName = MyName}) -> + ClientStatsEts = get(client_stats_ets), + stats:increment_bad_messages(ClientStatsEts), + ?LOG_ERROR("client ~p got training request in predict state",[MyName]), + {next_state, predict, State#client_statem_state{nextState = predict}}; + +%% The source sends message to main server that it has finished +%% The main server updates its' clients to move to state 'idle' +predict(cast, In = {idle}, State = #client_statem_state{etsRef = EtsRef , myName = MyName}) -> MsgToCast = {idle}, - ets:update_counter(EtsRef, msgCounter, 1), - ets:update_counter(EtsRef, infoIn, nerl_tools:calculate_size(In)), + ClientStatsEts = get(client_stats_ets), + stats:increment_messages_received(ClientStatsEts), + stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(In)), cast_message_to_workers(EtsRef, MsgToCast), - ?LOG_INFO("client going to state idle"), + ?LOG_INFO("client ~p going to state idle" , [MyName]), Workers = ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX), {next_state, waitforWorkers, State#client_statem_state{nextState = idle, waitforWorkers = Workers, etsRef = EtsRef}}; predict(cast, EventContent, State = #client_statem_state{etsRef = EtsRef}) -> - ets:update_counter(EtsRef, msgCounter, 1), - ets:update_counter(EtsRef, infoIn, erts_debug:flat_size(EventContent)), + ClientStatsEts = get(client_stats_ets), + stats:increment_bad_messages(ClientStatsEts), + stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(EventContent)), ?LOG_WARNING("client predict ignored: ~p ~n",[EventContent]), {next_state, predict, State#client_statem_state{etsRef = EtsRef}}. @@ -376,37 +396,19 @@ send_client_is_ready(MyName) -> %% send an ACK to mainserver that the client is ready nerltools:http_router_request(RouterHost, RouterPort, [?MAIN_SERVER_ATOM], atom_to_list(clientReady), MyName). -% calculates the avarage training time -updateTimingMap(EtsRef, WorkerName) when is_atom(WorkerName) -> - {Start,TotalBatches,TotalTime} = ets:lookup_element(EtsRef, WorkerName, ?WORKER_TIMING_IDX), % retrieving old value - Finish = os:timestamp(), - TotalTrainingTime = (timer:now_diff(Finish, Start) / 1000), - NewTimingTuple = {Start,TotalBatches,TotalTrainingTime+TotalTime}, - ets:update_element(EtsRef, WorkerName,[{?WORKER_TIMING_IDX,NewTimingTuple}]). %% TODO update WorkerStatsETS - -%% statistics format: clientName:workerName=avgTime,... -%% adding client c1=MsgNum,w1=... -sendStatistics(EtsRef)-> - %TODO Guy - this should be replaced by the new stats module statistics - Workers = ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX), - TimingMap = [{WorkerKey,ets:lookup_element(EtsRef, WorkerKey, ?WORKER_TIMING_IDX)} || WorkerKey <- Workers], - MissedCounts = [{WorkerKey,ets:lookup_element(EtsRef, WorkerKey, ?WORKER_TRAIN_MISSED_IDX)} || WorkerKey <- Workers], - Counter = ets:lookup_element(EtsRef, msgCounter, ?ETS_KV_VAL_IDX), - InfoSize = ets:lookup_element(EtsRef, infoIn, ?ETS_KV_VAL_IDX), - MyName = ets:lookup_element(EtsRef, myName, ?ETS_KV_VAL_IDX), - - TimingStats = lists:flatten([atom_to_list(WorkerName)++"_Train_Avg_Time="++float_to_list(TotalTime/TotalBatches,[{decimals, 3}])++","||{WorkerName,{_LastTime,TotalBatches,TotalTime}}<-TimingMap]), - MissingStats = lists:flatten([atom_to_list(WorkerName)++"_Train_Miss="++integer_to_list(MissCount)++","||{WorkerName,MissCount}<-MissedCounts]), - MyStats = atom_to_list(MyName)++"_Msg_Count="++integer_to_list(Counter)++","++atom_to_list(MyName)++"_info_Size="++integer_to_list(InfoSize)++",", - - {RouterHost,RouterPort} = ets:lookup_element(EtsRef, my_router, ?DATA_IDX), - MessageBody = list_to_binary(atom_to_list(MyName)++":"++MyStats++MissingStats++lists:droplast(TimingStats)), % TODO Guy - use encode from stats module - nerltools:http_router_request(RouterHost, RouterPort, [?MAIN_SERVER_ATOM], atom_to_list(statistics), MessageBody). - cast_message_to_workers(EtsRef, Msg) -> + ClientStatsEts = get(client_stats_ets), Workers = ets:lookup_element(EtsRef, workersNames, ?ETS_KV_VAL_IDX), Func = fun(WorkerKey) -> WorkerPid = ets:lookup_element(EtsRef, WorkerKey, ?WORKER_PID_IDX), - gen_statem:cast(WorkerPid, Msg) + gen_statem:cast(WorkerPid, Msg), + stats:increment_messages_sent(ClientStatsEts) end, - lists:foreach(Func, Workers). \ No newline at end of file + lists:foreach(Func, Workers). + +create_encoded_stats_str(ListStatsEts) -> + Func = fun({WorkerName , StatsEts}) -> + WorkerEncStatsStr = stats:encode_ets_to_http_bin_str(StatsEts), + WorkerName ++ ?API_SERVER_WITHIN_ENTITY_SEPERATOR ++ WorkerEncStatsStr ++ ?API_SERVER_ENTITY_SEPERATOR + end, + lists:flatten(lists:map(Func , ListStatsEts)). \ No newline at end of file diff --git a/src_erl/NerlnetApp/src/Client/clientWorkersFunctions.erl b/src_erl/NerlnetApp/src/Client/clientWorkersFunctions.erl index e4eb0f8c1..02a1f7e28 100644 --- a/src_erl/NerlnetApp/src/Client/clientWorkersFunctions.erl +++ b/src_erl/NerlnetApp/src/Client/clientWorkersFunctions.erl @@ -5,7 +5,7 @@ -include("../nerl_tools.hrl"). -include("../worker_definitions_ag.hrl"). --export([create_workers/3]). +-export([create_workers/4]). -export([get_worker_pid/2 , get_worker_stats_ets/2 , get_workers_names/1]). get_distributed_worker_behavior(DistributedSystemType , WorkerName , DistributedSystemArgs , DistributedSystemToken) -> @@ -26,7 +26,7 @@ case DistributedSystemType of %% Create workers for clients %% %% ets tree structure: ClientETS -> WorkersETS -> {WorkerName, {WorkerStatsETS , WorkerPid, WorkerArgs}} -create_workers(ClientName, ClientEtsRef , ShaToModelArgsMap) -> +create_workers(ClientName, ClientEtsRef , ShaToModelArgsMap , EtsStats) -> CLIENT_WORKES_MAPS_TUPLE_IDX = 2, ClientsMap = maps:from_list(ets:lookup_element(nerlnet_data, deviceClients, ?DATA_IDX)), % This is the format of hostClients {Name,{Port,ClientWorkers,ClientWorkersMaps}} io:format("ClientWorkersMaps: ~p~n", [ClientsMap]), @@ -38,7 +38,7 @@ create_workers(ClientName, ClientEtsRef , ShaToModelArgsMap) -> Func = fun(WorkerName) -> ModelID = erlang:unique_integer([positive]), - WorkerStatsETS = ets:new(worker_stats_ets,[set]), + WorkerStatsETS = stats:generate_workers_stats_ets(), {ok , SHA} = maps:find(WorkerName , ets:lookup_element(ClientEtsRef, workers_to_sha_map, ?DATA_IDX)), {ModelType, LayersSizes, LayersTypes, LayersFunctions, LossMethod, LearningRate, Epochs, Optimizer, OptimizerArgs, _InfraType, DistributedSystemType, @@ -53,9 +53,10 @@ create_workers(ClientName, ClientEtsRef , ShaToModelArgsMap) -> WorkerArgs = {ModelID , ModelType , LayersSizes, LayersTypes, LayersFunctions, LearningRate , Epochs, Optimizer, OptimizerArgs , LossMethod , DistributedSystemType , DistributedSystemArgs}, io:format("WorkerArgs: ~p~n", [WorkerArgs]), - WorkerPid = workerGeneric:start_link({WorkerName , WorkerArgs , DistributedBehaviorFunc , DistributedWorkerData , _ClientPid = self()}), + WorkerPid = workerGeneric:start_link({WorkerName , WorkerArgs , DistributedBehaviorFunc , DistributedWorkerData , _ClientPid = self() , WorkerStatsETS}), - ets:insert(WorkersETS, {WorkerName, {WorkerStatsETS , WorkerPid, WorkerArgs}}), + ets:insert(WorkersETS, {WorkerName, {WorkerPid, WorkerArgs}}), + ets:insert(EtsStats, {WorkerName, WorkerStatsETS}), WorkerName end, @@ -72,12 +73,13 @@ get_worker_stats_ets(ClientEtsRef , WorkerName) -> get_worker_pid(ClientEtsRef , WorkerName) -> WorkersETS = ets:lookup_element(ClientEtsRef, workers_ets, ?DATA_IDX), - {_WorkerStatsETS , WorkerPid , _WorkerArgs} = ets:lookup_element(WorkersETS, WorkerName, ?DATA_IDX), + {WorkerPid , _WorkerArgs} = ets:lookup_element(WorkersETS, WorkerName, ?DATA_IDX), WorkerPid. get_workers_names(ClientEtsRef) -> WorkersETS = ets:lookup_element(ClientEtsRef, workers_ets, ?DATA_IDX), - WorkersNames = [WorkerName || {WorkerName, _Val} <- ets:tab2list(WorkersETS)]. + _WorkersNames = [WorkerName || {WorkerName, _Val} <- ets:tab2list(WorkersETS)]. + diff --git a/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl b/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl index a74daeb64..67f97e2e5 100644 --- a/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl +++ b/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl @@ -150,28 +150,34 @@ handle_cast({statistics,Body}, State = #main_genserver_state{myName = MyName}) - %% TODO - Guy here you should get the the encoded statistics from entities and decode it use it the function you should implement %% statistics arrived from Entity {From, StatsEtsEncStr} = Body, - EntityName = binary_to_atom(From), - EntityStatsEts = get_entity_stats_ets(EntityName), - stats:decode_http_bin_str_to_ets(StatsEtsEncStr, EntityStatsEts, overwrite), %TODO Guy + EntityName = binary_to_atom(From), %TODO Guy + set_entity_stats_ets_str(EntityName, StatsEtsEncStr), % TODO increase counter_received_stats ets by 1 - + ets:update_counter(StatsEts, counter_received_stats, 1), % [From|[NewCounter]] = re:split(binary_to_list(Body), ":", [{return, list}]), % NewStatisticsMap = maps:put(From,NewCounter,StatisticsMap), % NewState = State#main_genserver_state{msgCounter = MsgCounter+1,statisticsMap = NewStatisticsMap,statisticsCounter = StatisticsCounter-1}, ReceivedCounterStatsValue = ets:lookup_element(get(main_server_ets), counter_received_stats, ?DATA_IDX), - TotalNumOfEntities = length(ets:lookup_element(get(main_server_ets), entities_names_list, ?DATA_IDX)), % without MainServer! + EntitiesNamesList = ets:lookup_element(get(main_server_ets), entities_names_list, ?DATA_IDX), + TotalNumOfEntities = length(EntitiesNamesList), % without MainServer! if ReceivedCounterStatsValue == TotalNumOfEntities -> %% got stats from all entities - % TODO Guy Here we send all stats to Api Server - We need to define the new scheme and Noa and Ohad should implement it - % Statistics = maps:to_list(NewStatisticsMap), - % S = mapToString(Statistics,[]) , - % ?LOG_NOTICE("Sending stats: ~p~n",[S]), - % {RouterHost,RouterPort} = nerl_tools:getShortPath(MyName,?API_SERVER_ATOM,NerlnetGraph), - % nerl_tools:http_request(RouterHost,RouterPort,"statistics", S ++ "|mainServer:" ++integer_to_list(MsgCounter)); - todo; - true -> wait_for_more_stats end + Func = fun(Entity) -> + EntityStatsEncStr = get_entity_stats_ets_str(EntityName), + Entity ++ ?API_SERVER_WITHIN_ENTITY_SEPERATOR ++ EntityStatsEncStr ++ ?API_SERVER_ENTITY_SEPERATOR + end, + MainServerEncStatsEts = stats:encode_ets_to_http_bin_str(get_entity_stats_ets_str(?MAIN_SERVER_ATOM)), + MainServerStr = atom_to_list(?MAIN_SERVER_ATOM) ++ ?API_SERVER_WITHIN_ENTITY_SEPERATOR ++ MainServerEncStatsEts ++ ?API_SERVER_ENTITY_SEPERATOR, + StatsToSend = lists:flatten([Func(Entity) || Entity <- EntitiesNamesList] ++ MainServerStr), % add main server to the list + {RouterHost,RouterPort} = ets:lookup_element(get(main_server_ets), my_router, ?DATA_IDX), + ActionStr = atom_to_list(statistics), + nerltools:http_router_request(RouterHost,RouterPort, [?API_SERVER_ATOM], ActionStr, list_to_binary(StatsToSend)), % update the source with its data + ets:update_element(StatsEts, counter_received_stats, {?STATS_KEYVAL_VAL_IDX, 0}); + + true -> wait_for_more_stats + end end, {noreply, State#main_genserver_state{}}; @@ -352,6 +358,14 @@ get_entity_stats_ets(EntityName) -> MainServerEtsStats = get(etsStats), ets:lookup_element(MainServerEtsStats, EntityName , ?DATA_IDX). +get_entity_stats_ets_str(EntityName) -> + MainServerEtsStats = get(etsStats), + ets:lookup_element(MainServerEtsStats, EntityName , ?DATA_IDX). + +set_entity_stats_ets_str(EntityName , StatsEncStr) -> + MainServerEtsStats = get(etsStats), + ets:insert(MainServerEtsStats, {EntityName, StatsEncStr}). + sources_start_casting([],_NumOfSampleToSend)->done; sources_start_casting([SourceName|SourceNames],NumOfSamplesToSend) -> diff --git a/src_erl/NerlnetApp/src/Router/routerGenserver.erl b/src_erl/NerlnetApp/src/Router/routerGenserver.erl index 3759995f6..0bc68b1f3 100644 --- a/src_erl/NerlnetApp/src/Router/routerGenserver.erl +++ b/src_erl/NerlnetApp/src/Router/routerGenserver.erl @@ -45,14 +45,30 @@ init({MyName , _Policy , NerlnetGraph}) -> %% TODO : Add policy to router inets:start(), ?LOG_NOTICE("Router ~p is connected to: ~p~n",[MyName, [digraph:vertex(NerlnetGraph,Vertex) || Vertex <- digraph:out_neighbours(NerlnetGraph,MyName)]]), put(nerlnetGraph, NerlnetGraph), + put(myName, MyName), RoutingTableEtsRef = ets:new(routing_table, [set]), + RouterStatsEts = stats:generate_stats_ets(), + put(router_stats_ets, RouterStatsEts), EntitiesList=digraph:vertices(NerlnetGraph), nerl_tools:make_routing_table(RoutingTableEtsRef,EntitiesList--[?API_SERVER_ATOM,MyName],MyName,NerlnetGraph), {ok, #router_genserver_state{msgCounter = 1, myName = MyName, etsRef=RoutingTableEtsRef}}. +handle_cast({statistics , _Body} , State=#router_genserver_state{etsRef = Routing_table}) -> -handle_cast({unicast,{Dest,Body}}, State = #router_genserver_state{msgCounter = MsgCounter,etsRef=Routing_table }) -> + RouterStatsEts = get(router_stats_ets), + stats:increment_messages_received(RouterStatsEts), + + MyName = get(myName), + StatsEtsStr = stats:encode_ets_to_http_bin_str(RouterStatsEts), + StatisticsBody = {term_to_binary(MyName) , list_to_binary(StatsEtsStr)}, % old data + [{_Dest,{_Name , RouterHost , RouterPort}}] = ets:lookup(Routing_table , ?MAIN_SERVER_ATOM), + nerltools:http_router_request(RouterHost, RouterPort, [?MAIN_SERVER_ATOM], atom_to_list(statistics), StatisticsBody), + stats:increment_messages_sent(RouterStatsEts), + {noreply , State}; +handle_cast({unicast,{Dest,Body}}, State = #router_genserver_state{msgCounter = MsgCounter,etsRef=Routing_table }) -> + RouterStatsEts = get(router_stats_ets), + stats:increment_messages_received(RouterStatsEts), [{Dest,{Name,Host,Port}}]=ets:lookup(Routing_table,Dest), case Dest of Name-> @@ -64,9 +80,12 @@ handle_cast({unicast,{Dest,Body}}, State = #router_genserver_state{msgCounter = Data={Dest,Body} end, nerl_tools:http_request(Host, Port,Action, term_to_binary(Data)), + stats:increment_messages_sent(RouterStatsEts), {noreply, State#router_genserver_state{msgCounter = MsgCounter+1,etsRef=Routing_table }}; -handle_cast({broadcast,{DestList,Body}}, State = #router_genserver_state{msgCounter = MsgCounter,etsRef=Routing_table }) -> +handle_cast({broadcast,{DestList,Body}}, State = #router_genserver_state{etsRef=Routing_table }) -> + RouterStatsEts = get(router_stats_ets), + stats:increment_messages_received(RouterStatsEts), MapFunc=fun(Dest,Acc)-> %make a map when keys are addreses to send a message to, and values are lists of destination of the message that go throu key addres [{Dest,{Name,Host,Port}}]=ets:lookup(Routing_table,Dest), @@ -100,15 +119,17 @@ handle_cast({broadcast,{DestList,Body}}, State = #router_genserver_state{msgCoun Action="broadcast", Data={DestEntityList,Body} end, - nerl_tools:http_request(Host, Port,Action, term_to_binary(Data)) + nerl_tools:http_request(Host, Port,Action, term_to_binary(Data)), + stats:increment_messages_sent(RouterStatsEts) end, maps:foreach(SendFunc,NextHopMap), - {noreply, State#router_genserver_state{msgCounter = MsgCounter+1,etsRef=Routing_table }}; + {noreply, State#router_genserver_state{etsRef=Routing_table }}; -handle_cast(_Request, State = #router_genserver_state{msgCounter = MsgCounter }) -> +handle_cast(_Request, State) -> ?LOG_ERROR("Unrecognized handle cast message! only unicast/broadcast types are allowed"), - {noreply, State#router_genserver_state{msgCounter = MsgCounter+1}}. + stats:increment_bad_messages(get(router_stats_ets)), + {noreply, State}. diff --git a/src_erl/NerlnetApp/src/Router/routingHandler.erl b/src_erl/NerlnetApp/src/Router/routingHandler.erl index b09dc1ec2..d2d0c2159 100644 --- a/src_erl/NerlnetApp/src/Router/routingHandler.erl +++ b/src_erl/NerlnetApp/src/Router/routingHandler.erl @@ -25,7 +25,8 @@ init(Req0, State) -> % io:format("router got action ~p body:~p~n",[Action,Body]), case Action of unicast ->gen_server:cast(Router_genserver_Pid, {unicast,binary_to_term(Body)}); - broadcast ->gen_server:cast(Router_genserver_Pid, {broadcast,binary_to_term(Body)}) + broadcast ->gen_server:cast(Router_genserver_Pid, {broadcast,binary_to_term(Body)}); + statistics -> gen_server:cast(Router_genserver_Pid, {statistics,binary_to_term(Body)}) end, Reply = io_lib:format(" ", []), %% Reply = io_lib:format("Body Received: ~p, Decoded Body = ~p ~n Client_StateM_Pid:~p, Handler's Pid: ~p~n ", [Body,Decoded_body, Router_genserver_Pid,self()]), diff --git a/src_erl/NerlnetApp/src/Source/sourceStatem.erl b/src_erl/NerlnetApp/src/Source/sourceStatem.erl index 8b66652be..f436c725b 100644 --- a/src_erl/NerlnetApp/src/Source/sourceStatem.erl +++ b/src_erl/NerlnetApp/src/Source/sourceStatem.erl @@ -81,7 +81,7 @@ init({MyName, WorkersMap, NerlnetGraph, Policy, BatchSize, Frequency , Epochs, T % Updating dictionary put(nerlnetGraph, NerlnetGraph), put(source_ets, EtsRef), - put(stats_ets, EtsStatsRef), + put(source_stats_ets, EtsStatsRef), {ok, idle, #source_statem_state{ets_ref = EtsRef, castingTo = []}}. %% @private @@ -111,12 +111,13 @@ state_name(_EventType, _EventContent, State = #source_statem_state{}) -> %% This cast receive a list of samples to load to the records batchList idle(cast, {batchList,WorkersList,Epochs, CSVData}, State) -> EtsRef = get(source_ets), + StatsEtsRef = get(source_stats_ets), MyName = ets:lookup_element(EtsRef, my_name, ?DATA_IDX), BatchSize = ets:lookup_element(EtsRef, batch_size, ?DATA_IDX), {NerlTensorBatchesList, NerlTensorType, SampleSize} = parser:parseCSV(MyName,BatchSize,CSVData), % TODO this is slow and heavy policy! pre parse in ETS a possible solution ets:update_element(EtsRef, workers_list, [{?DATA_IDX, WorkersList}]), ets:update_element(EtsRef, epochs, [{?DATA_IDX, Epochs}]), - ets:update_counter(EtsRef, total_messages_ctr, 1), % last is increment value + stats:increment_messages_received(StatsEtsRef), ?LOG_NOTICE("Source ~p, workers are: ~p", [MyName, WorkersList]), ?LOG_NOTICE("Source ~p, sample size: ~p", [MyName, SampleSize]), ets:update_element(EtsRef, sample_size, [{?DATA_IDX, SampleSize}]), @@ -125,6 +126,7 @@ idle(cast, {batchList,WorkersList,Epochs, CSVData}, State) -> %% send an ACK to mainserver that the CSV file is ready {RouterHost,RouterPort} = ets:lookup_element(EtsRef, my_router, ?DATA_IDX), nerltools:http_router_request(RouterHost, RouterPort, [?MAIN_SERVER_ATOM], atom_to_list(csvReady), MyName), + stats:increment_messages_sent(StatsEtsRef), {next_state, idle, State#source_statem_state{batchesList = NerlTensorBatchesList, nerlTensorType = NerlTensorType}}; @@ -132,8 +134,8 @@ idle(cast, {batchList,WorkersList,Epochs, CSVData}, State) -> idle(cast, {startCasting,Body}, State = #source_statem_state{batchesList = BatchesList}) -> EtsRef = get(source_ets), [_Source,UserLimitNumberOfBatchesToSend] = re:split(binary_to_list(Body), ",", [{return, list}]), - - ets:update_counter(EtsRef, total_messages_ctr, 1), % last is increment value + StatsEtsRef = get(source_stats_ets), + stats:increment_messages_received(StatsEtsRef), MyName = ets:lookup_element(EtsRef, my_name, ?DATA_IDX), Frequency = ets:lookup_element(EtsRef, frequency, ?DATA_IDX), @@ -161,74 +163,81 @@ idle(cast, {startCasting,Body}, State = #source_statem_state{batchesList = Batch idle(cast, {startCasting}, State) -> EtsRef = get(source_ets), - ets:update_counter(EtsRef, total_messages_ctr, 1), % last is increment value % io:format("im not suppose to be here"), + StatsEtsRef = get(source_stats_ets), + stats:increment_messages_received(StatsEtsRef), + MyName = ets:lookup_element(EtsRef, my_name, ?DATA_IDX), ?LOG_WARNING("Source ~p receives message during casting!",[MyName]), {next_state, castingData, State}; idle(cast, {stopCasting}, State) -> - EtsRef = get(source_ets), - ets:update_counter(EtsRef, total_messages_ctr, 1), % last is increment value + StatsEtsRef = get(source_stats_ets), + stats:increment_messages_received(StatsEtsRef), {next_state, idle, State}; idle(cast, {statistics}, State) -> EtsRef = get(source_ets), - _StatsEts = get(stats_ets), % TODO use this stats - MyName = ets:lookup_element(EtsRef, my_name, ?DATA_IDX), - ets:update_counter(EtsRef, total_messages_ctr, 1), % last is increment value - TotalMessagesCounter = ets:lookup_element(EtsRef, total_messages_ctr, ?DATA_IDX), + StatsEtsRef = get(source_stats_ets), + stats:increment_messages_received(StatsEtsRef), - % TODO Guy - new statistics stats.erl - % encode stats ets - StatisticsBody = list_to_binary(atom_to_list(MyName)++":"++integer_to_list(TotalMessagesCounter)), % old data + MyName = ets:lookup_element(EtsRef, my_name, ?DATA_IDX), + StatsEtsStr = stats:encode_ets_to_http_bin_str(StatsEtsRef), + StatisticsBody = {term_to_binary(MyName) , list_to_binary(StatsEtsStr)}, % old data {RouterHost,RouterPort} = ets:lookup_element(EtsRef, my_router, ?DATA_IDX), nerltools:http_router_request(RouterHost, RouterPort, [?MAIN_SERVER_ATOM], atom_to_list(statistics), StatisticsBody), + stats:increment_messages_sent(StatsEtsRef), {next_state, idle, State#source_statem_state{}}; idle(cast, _EventContent, State) -> EtsRef = get(source_ets), - ets:update_counter(EtsRef, total_messages_ctr, 1), % last is increment value + StatsEtsRef = get(source_stats_ets), + stats:increment_messages_received(StatsEtsRef), MyName = ets:lookup_element(EtsRef, my_name, ?DATA_IDX), ?LOG_WARNING("Source ~p receives an unexpected cast event in idle state!",[MyName]), {next_state, idle, State#source_statem_state{}}. %%waiting for ether data list of sample been sent to finish OR stop message from main server. castingData(cast, {stopCasting}, State = #source_statem_state{transmitter_pid = TransmitterPID}) -> - EtsRef = get(source_ets), - ets:update_counter(EtsRef, total_messages_ctr, 1), % last is increment value + StatsEtsRef = get(source_stats_ets), + stats:increment_messages_received(StatsEtsRef), ?LOG_ERROR("Unsupported yet"), TransmitterPID ! {stopCasting}, % TODO - kill transmitter on stop casting {next_state, idle, State#source_statem_state{transmitter_pid = none}}; castingData(cast, {startCasting}, State) -> EtsRef = get(source_ets), - ets:update_counter(EtsRef, total_messages_ctr, 1), % last is increment value + StatsEtsRef = get(source_stats_ets), + stats:increment_messages_received(StatsEtsRef), MyName = ets:lookup_element(EtsRef, my_name, ?DATA_IDX), ?LOG_WARNING("~p is already casting, but received a startCasting message",[MyName]), {next_state, castingData, State}; castingData(cast, {leftOvers,_Tail}, State) -> EtsRef = get(source_ets), + StatsEtsRef = get(source_stats_ets), + stats:increment_messages_received(StatsEtsRef), MyName = ets:lookup_element(EtsRef, my_name, ?DATA_IDX), - ets:update_counter(EtsRef, total_messages_ctr, 1), % last is increment value ?LOG_ERROR("Source ~p got leftOvers unhandled case of castingData state - Currently Deprecated!",[MyName]), {next_state, idle, State}; castingData(cast, {finishedCasting, BatchesSent}, State) -> EtsRef = get(source_ets), - %% source finished casting %% - ets:update_counter(EtsRef, total_messages_ctr, BatchesSent), % last is increment value - ets:update_counter(EtsRef, batches_sent_ctr, BatchesSent), - MyName = ets:lookup_element(EtsRef, my_name, ?DATA_IDX), + StatsEtsRef = get(source_stats_ets), + stats:increment_messages_received(StatsEtsRef), + %% source finished casting %% + stats:increment_by_value(StatsEtsRef, batches_sent, BatchesSent), + MyName = ets:lookup_element(EtsRef, my_name, ?DATA_IDX), {RouterHost,RouterPort} = ets:lookup_element(EtsRef, my_router, ?DATA_IDX), %% send an ACK to mainserver that the CSV file is ready nerltools:http_router_request(RouterHost, RouterPort, [?MAIN_SERVER_ATOM], atom_to_list(sourceDone), MyName), + stats:increment_messages_sent(StatsEtsRef), {next_state, idle, State#source_statem_state{transmitter_pid = none}}; castingData(cast, _EventContent, State = #source_statem_state{ets_ref = EtsRef}) -> - ets:update_counter(EtsRef, total_messages_ctr, 1), % last is increment value + StatsEtsRef = get(source_stats_ets), + stats:increment_bad_messages(StatsEtsRef), MyName = ets:lookup_element(EtsRef, my_name, ?DATA_IDX), ?LOG_WARNING("Source ~p Received cast event during castingData state",[MyName]), {next_state, castingData, State#source_statem_state{}}. diff --git a/src_erl/NerlnetApp/src/Stats/stats.erl b/src_erl/NerlnetApp/src/Stats/stats.erl index 7385d33b0..0b7ae0bbe 100644 --- a/src_erl/NerlnetApp/src/Stats/stats.erl +++ b/src_erl/NerlnetApp/src/Stats/stats.erl @@ -10,15 +10,48 @@ -export([get_bytes_sent/1, increment_bytes_sent/2]). -export([get_bad_messages/1, increment_bad_messages/1]). -export([get_value/2, increment_by_value/3]). --export([encode_ets_to_http_bin_str/1, decode_http_bin_str_to_ets/3]). --export([update_workers_ets/4, increment_workers_ets/4]). - -encode_ets_to_http_bin_str(_StatsEts) -> ok. %% Takes value from ets and converts it to "SEPERATOR" string. -decode_http_bin_str_to_ets(_HTTPBinStr , _StatsEts , _OptsAtom) -> ok. %% Takes "SEPERATOR" string and updates ets value. OptsAtom = increment/overwrite. +-export([encode_ets_to_http_bin_str/1 , decode_http_bin_str_to_ets/1]). +-export([update_workers_ets/4, increment_workers_ets/4 , generate_workers_stats_ets/0]). + +get_numeric_type(Value) -> + case Value of + _ when is_integer(Value) -> int; + _ when is_float(Value) -> float; + _ -> throw("Value is not numeric") + end. + +encode_ets_to_http_bin_str(StatsEts) -> + %% Takes value from ets and converts it to "SEPERATORSEPERATOR" string. + StatsList = ets:tab2list(StatsEts), + Func = fun({Key , Value}) -> + Type = get_numeric_type(Value), + KeyStr = lists:flatten(io_lib:format("~p" , [Key])), + ValueStr = lists:flatten(io_lib:format("~p" , [Value])), + TypeStr = lists:flatten(io_lib:format("~p" , [Type])), + KeyStr ++ ?SEPERATOR_WITHIN_TRIPLET ++ ValueStr ++ ?SEPERATOR_WITHIN_TRIPLET ++ TypeStr ++ ?SEPERATOR_TRIPLETS + end, + lists:flatten(lists:map(Func , StatsList)). + +decode_http_bin_str_to_ets(EncodedStr) -> + ReturnedEts = ets:new(ets_to_merge , [set]), + KeyValTypeTokens = string:tokens(EncodedStr , ?SEPERATOR_TRIPLETS), + io:format("KeyValTypeTokens: ~p~n" , [KeyValTypeTokens]), + Func = fun(Triplet) -> + [Key , ValueStr , Type] = string:tokens(Triplet , ?SEPERATOR_WITHIN_TRIPLET), + TypeAtom = list_to_atom(Type), + Value = case TypeAtom of + int -> list_to_integer(ValueStr); + float -> list_to_float(ValueStr); + _ -> throw("Type is not numeric") + end, + ets:insert(ReturnedEts , {Key , Value}) + end, + lists:foreach(Func , KeyValTypeTokens), + ReturnedEts. + generate_stats_ets() -> StatsEts = ets:new(stats_ets , [set]), - WorkersStatsEts = ets:new(stats_ets , [set]), ets:insert(StatsEts, {message_received , 0}), ets:insert(StatsEts, {message_sent , 0}), ets:insert(StatsEts, {message_dropped , 0}), @@ -28,14 +61,25 @@ generate_stats_ets() -> ets:insert(StatsEts, {batches_received , 0}), % related with client only ets:insert(StatsEts, {batches_dropped , 0}), % related with client only ets:insert(StatsEts, {batches_sent , 0}), % related with source - ets:insert(StatsEts, {workers_ets, WorkersStatsEts}), % TODO Guy pay attention to this when you implement encode/decode methods StatsEts. +generate_workers_stats_ets() -> + WorkersStatsEts = ets:new(workers_ets , [set, public]), + ets:insert(WorkersStatsEts, {bytes_received , 0}), + ets:insert(WorkersStatsEts, {bytes_sent , 0}), + ets:insert(WorkersStatsEts, {bad_messages , 0}), + ets:insert(WorkersStatsEts, {batches_received , 0}), % related with client only + ets:insert(WorkersStatsEts, {batches_dropped , 0}), % related with client only + ets:insert(WorkersStatsEts, {batches_sent , 0}), % related with source + ets:insert(WorkersStatsEts, {average_time_training , 0}), + ets:insert(WorkersStatsEts, {average_time_prediction , 0}), + WorkersStatsEts. + %% ---- Workers Stats ETS Methods ----%% update_workers_ets(StatsEts, WorkerName, WorkerAttribute, Value) -> WorkerStatsEts = ets:lookup_element(StatsEts, workers_ets, ?STATS_KEYVAL_VAL_IDX), Key = {WorkerName, WorkerAttribute}, - ets:insert(WorkerStatsEts, Key, Value). + ets:insert(WorkerStatsEts, {Key, Value}). increment_workers_ets(StatsEts, WorkerName, WorkerAttribute, Value) -> WorkerStatsEts = ets:lookup_element(StatsEts, workers_ets, ?STATS_KEYVAL_VAL_IDX), diff --git a/src_erl/NerlnetApp/src/Stats/stats.hrl b/src_erl/NerlnetApp/src/Stats/stats.hrl index 20544346d..4688a4d60 100644 --- a/src_erl/NerlnetApp/src/Stats/stats.hrl +++ b/src_erl/NerlnetApp/src/Stats/stats.hrl @@ -11,4 +11,8 @@ -define(STATS_KEYVAL_KEY_IDX, 1). -define(STATS_KEYVAL_VAL_IDX, 2). --define(SEPERATOR , "@:@"). \ No newline at end of file +-define(SEPERATOR_TRIPLETS , "#"). +-define(SEPERATOR_WITHIN_TRIPLET , ":"). + +-define(API_SERVER_ENTITY_SEPERATOR , "|"). +-define(API_SERVER_WITHIN_ENTITY_SEPERATOR , "&"). \ No newline at end of file diff --git a/src_erl/NerlnetApp/src/nerlnetApp_app.erl b/src_erl/NerlnetApp/src/nerlnetApp_app.erl index ae8f2fbf7..ef869cf2e 100644 --- a/src_erl/NerlnetApp/src/nerlnetApp_app.erl +++ b/src_erl/NerlnetApp/src/nerlnetApp_app.erl @@ -259,7 +259,8 @@ createRouters(MapOfRouters, HostName) -> RouterDispatch = cowboy_router:compile([ {'_', [ {"/unicast",routingHandler, [unicast,RouterGenServerPid]}, - {"/broadcast",routingHandler, [broadcast,RouterGenServerPid]} + {"/broadcast",routingHandler, [broadcast,RouterGenServerPid]}, + {"/statistics",routingHandler, [statistics,RouterGenServerPid]} ]} ]), %% cowboy:start_clear(Name, TransOpts, ProtoOpts) - an http_listener