diff --git a/CMakeLists.txt b/CMakeLists.txt index 41bf7a24..40e289d6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,6 +16,7 @@ set(CMAKE_CXX_FLAGS_RELEASE "-O2") # Options option(NERLWOLF "Use Wolfram Engine workers extension" OFF) +option(NERLTORCH "use libtorch installed to /usr/local/lib/libtorch" OFF) option(USE_OpenMP "Use-OpenMP" ON) #add_compile_definitions(EIGEN_MAX_ALIGN_BYTES=8) #Open this line for RASPI @@ -42,3 +43,8 @@ if (NERLWOLF) add_library(nerlnet_wolf SHARED $) target_link_libraries(nerlnet_wolf PUBLIC wolframBridge) endif() + +if (NERLTORCH) + add_library(nerlnet_torch SHARED $) + target_link_libraries(nerlnet_torch PUBLIC torchBridge) +endif() \ No newline at end of file diff --git a/NerlnetBuild.sh b/NerlnetBuild.sh index b41565d7..a03a88d0 100755 --- a/NerlnetBuild.sh +++ b/NerlnetBuild.sh @@ -11,18 +11,25 @@ INPUT_DATA_DIR="inputDataDir" Branch="master" JobsNum=4 NerlWolf=OFF +NerlTorch=OFF help() { echo "-------------------------------------" && echo "Nerlnet Build" && echo "-------------------------------------" echo "Usage:" echo "--p or --pull Warning! this uses checkout -f! and branch name checkout to branch $Branch and pull the latest" - echo "--w or --wolf wolfram engine workers extension (nerlwolf)" + echo "--w or --wolf wolfram engine workers infra (nerlwolf)" + echo "--t or --torch torch workers infra (nerltorch)" echo "--j or --jobs number of jobs to cmake build" echo "--c or --clean remove build directory" exit 2 } +print() +{ + echo "$NERLNET_BUILD_PREFIX $1" +} + gitOperations() { echo "$NERLNET_PREFIX Warning! 
git checkout -f is about to be executed" @@ -67,6 +74,8 @@ print_help() printf '\t%s\n' "-j, --jobs: number of jobs (default: '4')" printf '\t%s\n' "-p, --pull: pull from branch (default: '4')" printf '\t%s\n' "-w, --wolf: wolfram engine extension build (default: 'off')" + printf '\t%s\n' "-t, --torch: torch engine extension build (default: 'off')" + printf '\t%s\n' "-c, --clean: clean build directory (default: 'off')" } @@ -110,6 +119,17 @@ parse_commandline() -w*) NerlWolf="${_key##-j}" ;; + -t|--torch) + test $# -lt 2 && die "Missing value for the optional argument '$_key'." 1 + NerlTorch="$2" + shift + ;; + --torch=*) + NerlTorch="${_key##--torch=}" + ;; + -t*) + NerlTorch="${_key##-t}" + ;; -j|--jobs) test $# -lt 2 && die "Missing value for the optional argument '$_key'." 1 JobsNum="$2" @@ -157,6 +177,11 @@ else sed -i "s/^.*\(${OPTION}.*$\)/#\1/" CMakeLists.txt fi +if [[ ! $NerlTorch =~ OFF ]]; then + print "NerlTorch is enabled" + print "Installation directory points to /usr/local/lib/libtorch" +fi + if command -v python3 >/dev/null 2>&1; then echo "$NERLNET_BUILD_PREFIX Python 3 is installed" # Generate auto-generated files @@ -187,7 +212,7 @@ fi echo "$NERLNET_BUILD_PREFIX Building Nerlnet Library" echo "$NERLNET_BUILD_PREFIX Cmake command of Nerlnet NIFPP" set -e -cmake -S . -B build/release -DNERLWOLF=$NerlWolf -DCMAKE_BUILD_TYPE=RELEASE +cmake -S . 
-B build/release -DNERLWOLF=$NerlWolf -DNERLTORCH=$NerlTorch -DCMAKE_BUILD_TYPE=RELEASE cd build/release echo "$NERLNET_BUILD_PREFIX Script CWD: $PWD" echo "$NERLNET_BUILD_PREFIX Build Nerlnet" diff --git a/inputJsonsFiles/DistributedConfig/dc_FedTorchTest_5d_2s_2r_4c_4w.json b/inputJsonsFiles/DistributedConfig/dc_FedTorchTest_5d_2s_2r_4c_4w.json new file mode 100644 index 00000000..f15e9f08 --- /dev/null +++ b/inputJsonsFiles/DistributedConfig/dc_FedTorchTest_5d_2s_2r_4c_4w.json @@ -0,0 +1,179 @@ +{ + "nerlnetSettings": { + "frequency": "300", + "batchSize": "100" + }, + "mainServer": { + "port": "8900", + "args": "" + }, + "apiServer": { + "port": "8901", + "args": "" + }, + "devices": [ + { + "name": "C0VM0", + "ipv4": "10.0.0.5", + "entities": "apiServer,mainServer" + }, + { + "name": "Minion2", + "ipv4": "10.0.0.19", + "entities": "c1,r1,s1" + }, + { + "name": "Minion3", + "ipv4": "10.0.0.20", + "entities": "c2,r2,s2" + }, + { + "name": "Minion7", + "ipv4": "10.0.0.24", + "entities": "c3" + }, + { + "name": "Minion8", + "ipv4": "10.0.0.25", + "entities": "c4" + } + ], + "routers": [ + { + "name": "r1", + "port": "8086", + "policy": "0" + }, + { + "name": "r2", + "port": "8087", + "policy": "0" + } + ], + "sources": [ + { + "name": "s1", + "port": "8085", + "frequency": "200", + "policy": "0", + "epochs": "1", + "type": "0" + }, + { + "name": "s2", + "port": "8085", + "frequency": "200", + "policy": "0", + "epochs": "1", + "type": "0" + } + ], + "clients": [ + { + "name": "c1", + "port": "8083", + "workers": "w1" + }, + { + "name": "c2", + "port": "8083", + "workers": "w2" + }, + { + "name": "c3", + "port": "8083", + "workers": "w3" + }, + { + "name": "c4", + "port": "8083", + "workers": "w4" + } + ], + "workers": [ + { + "name": "w1", + "model_sha": "0771693392e898393c9b2b8235497537b5fbed1fd0c9a5a7ec6aab665d2c1896" + }, + { + "name": "w2", + "model_sha": "0771693392e898393c9b2b8235497537b5fbed1fd0c9a5a7ec6aab665d2c1896" + }, + { + "name": "w3", + "model_sha": 
"0771693392e898393c9b2b8235497537b5fbed1fd0c9a5a7ec6aab665d2c1896" + }, + { + "name": "w4", + "model_sha": "c081daf49b8332585243b68cb828ebc9b947528601a6852688cea0312b3e3914" + } + ], + "model_sha": { + "0771693392e898393c9b2b8235497537b5fbed1fd0c9a5a7ec6aab665d2c1896": { + "modelType": "0", + "_doc_modelType": " nn:0 | approximation:1 | classification:2 | forecasting:3 | image_classification:4 | text_classification:5 | text_generation:6 | auto_association:7 | autoencoder:8 | ae_classifier:9 |", + "modelArgs": "", + "_doc_modelArgs": "Extra arguments to model", + "layersSizes": "5,6,6,4,3", + "_doc_layersSizes": "List of postive integers [L0, L1, ..., LN]", + "layerTypesList": "1,3,3,3,3", + "_doc_LayerTypes": " Default:0 | Scaling:1 | Conv:2 | Perceptron:3 | Pooling:4 | Probabilistic:5 | LSTM:6 | Reccurrent:7 | Unscaling:8 | Flatten:9 | Bounding:10 |", + "layers_functions": "1,8,8,8,11", + "_doc_layers_functions_activation": " Threshold:1 | Sign:2 | Logistic:3 | Tanh:4 | Linear:5 | ReLU:6 | eLU:7 | SeLU:8 | Soft-plus:9 | Soft-sign:10 | Hard-sigmoid:11 |", + "_doc_layer_functions_pooling": " none:1 | Max:2 | Avg:3 |", + "_doc_layer_functions_probabilistic": " Binary:1 | Logistic:2 | Competitive:3 | Softmax:4 |", + "_doc_layer_functions_scaler": " none:1 | MinMax:2 | MeanStd:3 | STD:4 | Log:5 |", + "lossMethod": "2", + "_doc_lossMethod": " SSE:1 | MSE:2 | NSE:3 | MinkowskiE:4 | WSE:5 | CEE:6 |", + "lr": "0.001", + "_doc_lr": "Positve float", + "epochs": "1", + "_doc_epochs": "Positve Integer", + "optimizer": "5", + "_doc_optimizer": " GD:0 | CGD:1 | SGD:2 | QuasiNeuton:3 | LVM:4 | ADAM:5 |", + "optimizerArgs": "none", + "_doc_optimizerArgs": "String", + "infraType": "0", + "_doc_infraType": " opennn:0 | wolfengine:1 |", + "distributedSystemType": "1", + "_doc_distributedSystemType": " none:0 | fedClientAvg:1 | fedServerAvg:2 |", + "distributedSystemArgs": "SyncMaxCount=50", + "_doc_distributedSystemArgs": "String", + "distributedSystemToken": "9922u", + 
"_doc_distributedSystemToken": "Token that associates distributed group of workers and parameter-server" + }, + "c081daf49b8332585243b68cb828ebc9b947528601a6852688cea0312b3e3914": { + "modelType": "0", + "_doc_modelType": " nn:0 | approximation:1 | classification:2 | forecasting:3 | image_classification:4 | text_classification:5 | text_generation:6 | auto_association:7 | autoencoder:8 | ae_classifier:9 |", + "modelArgs": "", + "_doc_modelArgs": "Extra arguments to model", + "layersSizes": "5,6,6,4,3", + "_doc_layersSizes": "List of postive integers [L0, L1, ..., LN]", + "layerTypesList": "1,3,3,3,3", + "_doc_LayerTypes": " Default:0 | Scaling:1 | Conv:2 | Perceptron:3 | Pooling:4 | Probabilistic:5 | LSTM:6 | Reccurrent:7 | Unscaling:8 | Flatten:9 | Bounding:10 |", + "layers_functions": "1,8,8,8,11", + "_doc_layers_functions_activation": " Threshold:1 | Sign:2 | Logistic:3 | Tanh:4 | Linear:5 | ReLU:6 | eLU:7 | SeLU:8 | Soft-plus:9 | Soft-sign:10 | Hard-sigmoid:11 |", + "_doc_layer_functions_pooling": " none:1 | Max:2 | Avg:3 |", + "_doc_layer_functions_probabilistic": " Binary:1 | Logistic:2 | Competitive:3 | Softmax:4 |", + "_doc_layer_functions_scaler": " none:1 | MinMax:2 | MeanStd:3 | STD:4 | Log:5 |", + "lossMethod": "2", + "_doc_lossMethod": " SSE:1 | MSE:2 | NSE:3 | MinkowskiE:4 | WSE:5 | CEE:6 |", + "lr": "0.001", + "_doc_lr": "Positve float", + "epochs": "1", + "_doc_epochs": "Positve Integer", + "optimizer": "5", + "_doc_optimizer": " GD:0 | CGD:1 | SGD:2 | QuasiNeuton:3 | LVM:4 | ADAM:5 |", + "optimizerArgs": "none", + "_doc_optimizerArgs": "String", + "infraType": "0", + "_doc_infraType": " opennn:0 | wolfengine:1 |", + "distributedSystemType": "2", + "_doc_distributedSystemType": " none:0 | fedClientAvg:1 | fedServerAvg:2 |", + "distributedSystemArgs": "SyncMaxCount=50", + "_doc_distributedSystemArgs": "String", + "distributedSystemToken": "9922u", + "_doc_distributedSystemToken": "Token that associates distributed group of workers and 
parameter-server" + } + } +} \ No newline at end of file diff --git a/inputJsonsFiles/experimentsFlow/exp_FedTorchTest_5d_2s_2r_4c_4w.json b/inputJsonsFiles/experimentsFlow/exp_FedTorchTest_5d_2s_2r_4c_4w.json new file mode 100644 index 00000000..1fa9072b --- /dev/null +++ b/inputJsonsFiles/experimentsFlow/exp_FedTorchTest_5d_2s_2r_4c_4w.json @@ -0,0 +1,54 @@ +{ + "experimentName": "synthetic_3_gausians", + "experimentType": "classification", + "batchSize": 100, + "csvFilePath": "/tmp/nerlnet/data/NerlnetData-master/nerlnet/synthetic_norm/synthetic_full.csv", + "numOfFeatures": "5", + "numOfLabels": "3", + "headersNames": "Norm(0:1),Norm(4:1),Norm(10:3)", + "Phases": + [ + { + "phaseName": "training_phase", + "phaseType": "training", + "sourcePieces": + [ + { + "sourceName": "s1", + "startingSample": "0", + "numOfBatches": "500", + "workers": "w1,w2", + "nerltensorType": "float" + }, + { + "sourceName": "s2", + "startingSample": "0", + "numOfBatches": "500", + "workers": "w3", + "nerltensorType": "float" + } + ] + }, + { + "phaseName": "prediction_phase", + "phaseType": "prediction", + "sourcePieces": + [ + { + "sourceName": "s1", + "startingSample": "50000", + "numOfBatches": "500", + "workers": "w1,w2", + "nerltensorType": "float" + }, + { + "sourceName": "s2", + "startingSample": "50000", + "numOfBatches": "500", + "workers": "w3", + "nerltensorType": "float" + } + ] + } +] +} \ No newline at end of file diff --git a/src_cpp/CMakeLists.txt b/src_cpp/CMakeLists.txt index 26aea536..87c179ce 100644 --- a/src_cpp/CMakeLists.txt +++ b/src_cpp/CMakeLists.txt @@ -14,4 +14,9 @@ add_subdirectory(source) if(NERLWOLF) message("[NERLNET] Wolfram Engine nif extension is enabled") add_subdirectory(wolframBridge) +endif() + +if(NERLTORCH) + message("[NERLNET] Libtorch nif extension is enabled") + add_subdirectory(torchBridge) endif() \ No newline at end of file diff --git a/src_cpp/torchBridge/CMakeLists.txt b/src_cpp/torchBridge/CMakeLists.txt new file mode 100644 index 
00000000..d06290b8 --- /dev/null +++ b/src_cpp/torchBridge/CMakeLists.txt @@ -0,0 +1,46 @@ +#**************************************************** +# Authors: David Leon +# 29/10/2021 +# +# @copyright Copyright (c) 2021 Nerlnet +# *****************************************************/ + +project(torchBridge) + +set(NIFPP_PATH "${CMAKE_CURRENT_SOURCE_DIR}/../nifpp/") +set(SIMPLE_LOGGER_PATH "${CMAKE_CURRENT_SOURCE_DIR}/../simple-cpp-logger/include") +set(COMMON_PATH "${CMAKE_CURRENT_SOURCE_DIR}/../common") +set(Torch_DIR "/usr/local/lib/libtorch/share/cmake/Torch") + +find_package(Torch REQUIRED "TorchConfig.cmake" CONFIG ) + +set(CMAKE_VERBOSE_MAKEFILE ON) +set(CMAKE_CXX_FLAGS "-fpic") +set(ERL_NIF_DEFAULT_LOCATION "/usr/local/lib/erlang/usr/include") + +# cpp Simple logger options +add_definitions( -D LOGGER_MAX_LOG_LEVEL_PRINTED=6 ) +add_definitions( -D LOGGER_PREFIX_LEVEL=2 ) +add_definitions( -D LOGGER_ENABLE_COLORS=1 ) +add_definitions( -D LOGGER_ENABLE_COLORS_ON_USER_HEADER=0 ) + +set(SRC_CODE + "nerltensorTorchDefs.h" + "nifppNerlTensorTorch.h" + "torchNIF.h" + "torchNIF.cpp" + "NerlWorkerTorch.h" + "NerlWorkerTorch.cpp" + "NerlWorkerTorchNIF.h" + ) + +add_library(${PROJECT_NAME} SHARED ${SRC_CODE}) + +target_link_libraries(${PROJECT_NAME} "${TORCH_LIBRARIES}" common) + +# Include NIF, OpenNN and Simple Cpp Logger +target_include_directories(${PROJECT_NAME} PUBLIC + ${COMMON_PATH} + ${NIFPP_PATH} + ${SIMPLE_LOGGER_PATH} + ${ERL_NIF_DEFAULT_LOCATION}) \ No newline at end of file diff --git a/src_cpp/torchBridge/NerlWorkerTorch.cpp b/src_cpp/torchBridge/NerlWorkerTorch.cpp new file mode 100644 index 00000000..d179b93d --- /dev/null +++ b/src_cpp/torchBridge/NerlWorkerTorch.cpp @@ -0,0 +1 @@ +#include "NerlWorkerTorch.h" \ No newline at end of file diff --git a/src_cpp/torchBridge/NerlWorkerTorch.h b/src_cpp/torchBridge/NerlWorkerTorch.h new file mode 100644 index 00000000..c6ccd096 --- /dev/null +++ b/src_cpp/torchBridge/NerlWorkerTorch.h @@ -0,0 +1,19 @@ 
+#pragma once + +#include +#include + +#include "../common/nerlWorker.h" +#include "worker_definitions_ag.h" +#include "nifppNerlTensorTorch.h" + + +namespace nerlnet +{ + +class NerlWorkerTorch : public NerlWorker +{ + +}; + +} // namespace nerlnet \ No newline at end of file diff --git a/src_cpp/torchBridge/NerlWorkerTorchNIF.h b/src_cpp/torchBridge/NerlWorkerTorchNIF.h new file mode 100644 index 00000000..3f59c932 --- /dev/null +++ b/src_cpp/torchBridge/NerlWorkerTorchNIF.h @@ -0,0 +1,2 @@ +#pragma once + diff --git a/src_cpp/torchBridge/Readme.md b/src_cpp/torchBridge/Readme.md new file mode 100644 index 00000000..17a17fc6 --- /dev/null +++ b/src_cpp/torchBridge/Readme.md @@ -0,0 +1,13 @@ +## Torch Bridge + +(Unsupported yet) +This is a bridge that extends Nerlnet to support libtorch as cpp neural network library. + +### Installation + +1. Go to [Pytorch site](https://pytorch.org/get-started/locally/) and download libtorch +2. Extract libtorch to ```/usr/local/lib/libtorch``` +3. Execute Nerlnet build with -t=ON or --torch=ON +4. Select worker infrastructure in Nerlplanner as torch. + +For apple silicon use [this repo](https://github.com/Nerlnet/libtorch_compiled) to download compiled libtorch. 
diff --git a/src_cpp/torchBridge/nerltensorTorchDefs.h b/src_cpp/torchBridge/nerltensorTorchDefs.h new file mode 100644 index 00000000..37fa9ff1 --- /dev/null +++ b/src_cpp/torchBridge/nerltensorTorchDefs.h @@ -0,0 +1,50 @@ +#pragma once + +#include +#include +#include +#include + +namespace nerlnet +{ + +using TorchTensor = torch::Tensor; + +enum {DIMS_CASE_1D,DIMS_CASE_2D,DIMS_CASE_3D}; +enum {DIMS_X_IDX,DIMS_Y_IDX,DIMS_Z_IDX,DIMS_TOTAL}; + +const std::map torch_dtype_map = { + {"float", c10::ScalarType::Float}, + {"double", c10::ScalarType::Double}, + {"int", c10::ScalarType::Int}, + {"long", c10::ScalarType::Long}, + {"bool", c10::ScalarType::Bool}, + {"uint8", c10::ScalarType::Byte}, + {"int8", c10::ScalarType::Char}, + {"int16", c10::ScalarType::Short}, + {"int32", c10::ScalarType::Int}, + {"int64", c10::ScalarType::Long}, + {"float16", c10::ScalarType::Half}, + {"float32", c10::ScalarType::Float}, + {"float64", c10::ScalarType::Double}, + {"qint8", c10::ScalarType::QInt8}, + {"quint8", c10::ScalarType::QUInt8}, + {"qint32", c10::ScalarType::QInt32}, + {"bfloat16", c10::ScalarType::BFloat16}, + {"bool", c10::ScalarType::Bool}, + {"uint8", c10::ScalarType::Byte}, + {"int8", c10::ScalarType::Char}, + {"int16", c10::ScalarType::Short}, + {"int32", c10::ScalarType::Int}, + {"int64", c10::ScalarType::Long}, + {"float16", c10::ScalarType::Half}, + {"float32", c10::ScalarType::Float}, + {"float64", c10::ScalarType::Double}}; + +inline c10::ScalarType get_torch_dtype(const std::string &dtype_str) +{ + assert (torch_dtype_map.find(dtype_str) != torch_dtype_map.end()); + return torch_dtype_map.at(dtype_str); +} + +} // namespace nerlnet \ No newline at end of file diff --git a/src_cpp/torchBridge/nifppNerlTensorTorch.h b/src_cpp/torchBridge/nifppNerlTensorTorch.h new file mode 100644 index 00000000..e7f81ab6 --- /dev/null +++ b/src_cpp/torchBridge/nifppNerlTensorTorch.h @@ -0,0 +1,120 @@ +#pragma once + +#include "nifpp.h" +#include "nerltensorTorchDefs.h" + + 
+namespace nifpp +{ + using namespace nerlnet; + + struct nerltensor_dims + { + int dimx; + int dimy; + int dimz; + int total_size; + int dims_case; + }; + + // Declarations + template int get_nerltensor_dims(ErlNifEnv *env , ERL_NIF_TERM bin_term, nerltensor_dims &dims_info); + template int get_nerltensor(ErlNifEnv *env , ERL_NIF_TERM bin_term, TorchTensor &tensor, torch::ScalarType torch_dtype); + template void make_tensor(ErlNifEnv *env , nifpp::TERM &ret_bin_term, TorchTensor &tensor); + + + // Definitions + template int get_nerltensor_dims(ErlNifEnv *env , ERL_NIF_TERM bin_term, nerltensor_dims &dims_info) + { + ErlNifBinary bin; + int ret = enif_inspect_binary(env, bin_term, &bin); + assert(ret != 0); + + std::vector dims; + // extract dims and data size + dims.resize(DIMS_TOTAL); + memcpy(dims.data(), bin.data, DIMS_TOTAL * sizeof(BasicType)); + + dims_info.total_size = 1; + for (int i=0; i < DIMS_TOTAL; i++) + { + dims_info.total_size *= dims[i]; + if (dims[i] > 1) + { + dims_info.dims_case = i; + } + } + assert(("Negative Or zero value of dimension", dims_info.total_size > 0)); + + + dims_info.dimx = static_cast(dims[DIMS_X_IDX]); + dims_info.dimy = static_cast(dims[DIMS_Y_IDX]); + dims_info.dimz = static_cast(dims[DIMS_Z_IDX]); + } + + + template int get_nerltensor(ErlNifEnv *env , ERL_NIF_TERM bin_term, TorchTensor &tensor, torch::ScalarType torch_dtype) + { + ErlNifBinary bin; + int ret = enif_inspect_binary(env, bin_term, &bin); + assert(ret != 0); + + // extract dims and data size + nerltensor_dims dims_info; + get_nerltensor_dims(env, bin_term, dims_info); + + switch (dims_info.dims_case) + { + case DIMS_CASE_1D: + { + tensor = torch::zeros(dims_info.dimx, torch_dtype); + break; + } + case DIMS_CASE_2D: + { + tensor = torch::zeros({dims_info.dimx, dims_info.dimy}, torch_dtype); + break; + } + case DIMS_CASE_3D: + { + tensor = torch::zeros({dims_info.dimx, dims_info.dimy, dims_info.dimz}, torch_dtype); + break; + } + } + + assert((sizeof(BasicType) == 
tensor.element_size(), "Size of BasicType and torch tensor element size mismatch")); + + // copy data from nerltensor to torch tensor + int skip_dims_bytes = (DIMS_TOTAL * sizeof(BasicType)); + std::memcpy(tensor.data_ptr(),bin.data + skip_dims_bytes, sizeof(BasicType)*tensor.numel()); + } + + template void make_tensor(ErlNifEnv *env , nifpp::TERM &ret_bin_term, TorchTensor &tensor) + { + std::vector dims; + dims.resize(DIMS_TOTAL); + for (int dim=0; dim < DIMS_TOTAL; dim++) + { + if (dim < tensor.sizes().size()) + { + dims[dim] = static_cast(tensor.sizes()[dim]); + } + else + { + dims[dim] = 1; + } + } + size_t dims_size = DIMS_TOTAL * sizeof(BasicType); + size_t data_size = tensor.numel() * sizeof(BasicType); + + nifpp::binary nifpp_bin(dims_size + data_size); + + assert((sizeof(BasicType) == tensor.element_size(), "Size of BasicType and torch tensor element size mismatch")); + + std::memcpy(nifpp_bin.data, dims.data(), dims_size); + std::memcpy(nifpp_bin.data + dims_size, tensor.data_ptr(), data_size); + + ret_bin_term = nifpp::make(env, nifpp_bin); + } + +} \ No newline at end of file diff --git a/src_cpp/torchBridge/torchNIF.cpp b/src_cpp/torchBridge/torchNIF.cpp new file mode 100644 index 00000000..1cd21b17 --- /dev/null +++ b/src_cpp/torchBridge/torchNIF.cpp @@ -0,0 +1,21 @@ +#include "torchNIF.h" + + +void* train_threaded_function(void* args) +{ + std::shared_ptr* p_thread_args_ptr = static_cast*>(args); + std::shared_ptr thread_args_ptr = *p_thread_args_ptr; + delete p_thread_args_ptr; + + // TODO implement training +} + + +void* predict_threaded_function(void* args) +{ + std::shared_ptr* p_thread_args_ptr = static_cast*>(args); + std::shared_ptr thread_args_ptr = *p_thread_args_ptr; + delete p_thread_args_ptr; + + // TODO implement prediction +} \ No newline at end of file diff --git a/src_cpp/torchBridge/torchNIF.h b/src_cpp/torchBridge/torchNIF.h new file mode 100644 index 00000000..1593552f --- /dev/null +++ b/src_cpp/torchBridge/torchNIF.h @@ -0,0 
+1,132 @@ +#pragma once + +#include "NerlWorkerTorch.h" +#include "bridgeController.h" + +#include + +class dirty_thread_args +{ +public: + long int mid; // model id + std::shared_ptr data; + nifpp::str_atom return_tensor_type; // holds the type of tensor should be returned + std::chrono::high_resolution_clock::time_point start_time; + + + ErlNifTid tid; + ErlNifPid pid; +}; + +void* train_threaded_function(void* args); +void* predict_threaded_function(void* args); + +/* +train_nif function is called by NIF from Erlang. +It creates a TorchTensor from input data and calls the threaded train funciton +*/ +static ERL_NIF_TERM train_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + std::shared_ptr* p_thread_args_ptr = new std::shared_ptr(std::make_shared()); + std::shared_ptr thread_args_ptr = *p_thread_args_ptr; + + nifpp::str_atom tensor_type; + c10::ScalarType torch_dtype; + + thread_args_ptr->start_time = std::chrono::high_resolution_clock::now(); + + enum{ARG_MODEL_ID, ARG_NERLTENSOR, ARG_NERLTENSOR_TYPE}; + + nifpp::get_throws(env, argv[ARG_NERLTENSOR_TYPE],tensor_type); + thread_args_ptr->return_tensor_type = tensor_type; + // extract model id + nifpp::get_throws(env, argv[ARG_MODEL_ID], thread_args_ptr->mid); + torch_dtype = nerlnet::get_torch_dtype(tensor_type); + // extract nerltensor + nifpp::get_nerltensor(env, argv[ARG_NERLTENSOR], *(thread_args_ptr->data), torch_dtype); + + char* thread_name = "train_thread"; + int thread_create_status = enif_thread_create(thread_name, &(thread_args_ptr->tid), train_threaded_function, (void*) p_thread_args_ptr, NULL); + void** exit_code; + if (thread_create_status != 0) + { + LogError("failed to call enif_thread_create with train_nif"); + nifpp::str_atom ret_status("train_nif_error"); + return nifpp::make(env, ret_status); + } + else + { + thread_create_status = enif_thread_join(thread_args_ptr->tid, exit_code ); + if (thread_create_status != 0) + { + LogError("failed to join with train_nif"); + 
nifpp::str_atom ret_status("train_nif_error"); + return nifpp::make(env, ret_status); + } + } + return nifpp::make(env, "ok"); +} +/* +predict_nif function is called by NIF from Erlang. +It creates a TorchTensor from input data and calls the threaded predict function +*/ +static ERL_NIF_TERM predict_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + std::shared_ptr* p_thread_args_ptr = new std::shared_ptr(std::make_shared()); + std::shared_ptr thread_args_ptr = *p_thread_args_ptr; + + nifpp::str_atom tensor_type; + c10::ScalarType torch_dtype; + + thread_args_ptr->start_time = std::chrono::high_resolution_clock::now(); + + enum{ARG_MODEL_ID, ARG_NERLTENSOR, ARG_NERLTENSOR_TYPE}; + + nifpp::get_throws(env, argv[ARG_NERLTENSOR_TYPE],tensor_type); + thread_args_ptr->return_tensor_type = tensor_type; + // extract model id + nifpp::get_throws(env, argv[ARG_MODEL_ID], thread_args_ptr->mid); + torch_dtype = nerlnet::get_torch_dtype(tensor_type); + // extract nerltensor + nifpp::get_nerltensor(env, argv[ARG_NERLTENSOR], *(thread_args_ptr->data), torch_dtype); + + char* thread_name = "predict_thread"; + int thread_create_status = enif_thread_create(thread_name, &(thread_args_ptr->tid), predict_threaded_function, (void*) p_thread_args_ptr, NULL); + void** exit_code; + if (thread_create_status != 0) + { + LogError("failed to call enif_thread_create with predict_nif"); + nifpp::str_atom ret_status("predict_nif_error"); + return nifpp::make(env, ret_status); + } + else + { + thread_create_status = enif_thread_join(thread_args_ptr->tid, exit_code ); + if (thread_create_status != 0) + { + LogError("failed to join with predict_nif"); + nifpp::str_atom ret_status("predict_nif_error"); + return nifpp::make(env, ret_status); + } + } + return nifpp::make(env, "ok"); +} + +static ErlNifFunc nif_funcs[] = +{ + {"get_active_models_ids_list",0, get_active_models_ids_list_nif}, + {"train_nif", 3 , train_nif}, + {"predict_nif", 3 , predict_nif}, + // {"get_weights_nif",1, 
get_weights_nif}, + // {"set_weights_nif",3, set_weights_nif}, + // {"encode_nif",2, encode_nif}, + // {"decode_nif",2, decode_nif}, + // {"nerltensor_sum_nif",3, nerltensor_sum_nif}, + // {"nerltensor_scalar_multiplication_nif",3,nerltensor_scalar_multiplication_nif}, + // // nerlworker functions + // {"new_nerlworker_nif", 13, new_nerlworker_nif}, + // {"test_nerlworker_nif", 13, test_nerlworker_nif}, + // {"update_nerlworker_train_params_nif", 6, update_nerlworker_train_params_nif}, + // {"remove_nerlworker_nif", 1, remove_nerlworker_nif}, + // {"get_distributed_system_train_labels_count_nif", 1, get_distributed_system_train_labels_count_nif} +}; diff --git a/src_erl/NerlnetApp/src/Bridge/onnWorkers/w2wCom.erl b/src_erl/NerlnetApp/src/Bridge/Common/w2wCom.erl similarity index 100% rename from src_erl/NerlnetApp/src/Bridge/onnWorkers/w2wCom.erl rename to src_erl/NerlnetApp/src/Bridge/Common/w2wCom.erl diff --git a/src_erl/NerlnetApp/src/Bridge/onnWorkers/w2wCom.hrl b/src_erl/NerlnetApp/src/Bridge/Common/w2wCom.hrl similarity index 100% rename from src_erl/NerlnetApp/src/Bridge/onnWorkers/w2wCom.hrl rename to src_erl/NerlnetApp/src/Bridge/Common/w2wCom.hrl diff --git a/src_erl/NerlnetApp/src/Bridge/onnWorkers/workerDefinitions.hrl b/src_erl/NerlnetApp/src/Bridge/Common/workerDefinitions.hrl similarity index 100% rename from src_erl/NerlnetApp/src/Bridge/onnWorkers/workerDefinitions.hrl rename to src_erl/NerlnetApp/src/Bridge/Common/workerDefinitions.hrl diff --git a/src_erl/NerlnetApp/src/Bridge/nerlNIF.erl b/src_erl/NerlnetApp/src/Bridge/onnWorkers/nerlNIF.erl similarity index 99% rename from src_erl/NerlnetApp/src/Bridge/nerlNIF.erl rename to src_erl/NerlnetApp/src/Bridge/onnWorkers/nerlNIF.erl index c70194cd..496b0df5 100644 --- a/src_erl/NerlnetApp/src/Bridge/nerlNIF.erl +++ b/src_erl/NerlnetApp/src/Bridge/onnWorkers/nerlNIF.erl @@ -1,6 +1,6 @@ -module(nerlNIF). -include_lib("kernel/include/logger.hrl"). --include("nerlTensor.hrl"). 
+-include("../nerlTensor.hrl"). -export([init/0,nif_preload/0,get_active_models_ids_list/0, train_nif/3,update_nerlworker_train_params_nif/6,call_to_train/5,predict_nif/3,call_to_predict/5,get_weights_nif/1,printTensor/2]). -export([call_to_get_weights/1,call_to_set_weights/2]). diff --git a/src_erl/NerlnetApp/src/Bridge/nerlTests.erl b/src_erl/NerlnetApp/src/Bridge/onnWorkers/nerlTests.erl similarity index 99% rename from src_erl/NerlnetApp/src/Bridge/nerlTests.erl rename to src_erl/NerlnetApp/src/Bridge/onnWorkers/nerlTests.erl index 3e17c5cb..8b3a6057 100644 --- a/src_erl/NerlnetApp/src/Bridge/nerlTests.erl +++ b/src_erl/NerlnetApp/src/Bridge/onnWorkers/nerlTests.erl @@ -1,9 +1,9 @@ -module(nerlTests). -author("David Leon"). --include("nerlTensor.hrl"). +-include("../nerlTensor.hrl"). -include("neural_networks_testing_models.hrl"). --include("layers_types_ag.hrl"). --include("models_types_ag.hrl"). +-include("../layers_types_ag.hrl"). +-include("../models_types_ag.hrl"). -compile(nerlNIF). -export([run_tests/0]). 
@@ -211,7 +211,6 @@ sum_nerltensors_lists_test(Type, N, Performance) -> encode_decode_nifs_test(0, _Res, Performance) -> Performance ; encode_decode_nifs_test(N, Res, Performance) -> - %io:format("GOT HERE~n"), EncodeType = random_pick_nerltensor_type(), NerlTensor = generate_nerltensor_rand_dims(EncodeType), Tic = nerl:tic(), diff --git a/src_erl/NerlnetApp/src/Bridge/neural_networks_testing_models.hrl b/src_erl/NerlnetApp/src/Bridge/onnWorkers/neural_networks_testing_models.hrl similarity index 100% rename from src_erl/NerlnetApp/src/Bridge/neural_networks_testing_models.hrl rename to src_erl/NerlnetApp/src/Bridge/onnWorkers/neural_networks_testing_models.hrl diff --git a/src_erl/NerlnetApp/src/Bridge/onnWorkers/workerFederatedClient.erl b/src_erl/NerlnetApp/src/Bridge/onnWorkers/workerFederatedClient.erl index ec808b95..3b7879b3 100644 --- a/src_erl/NerlnetApp/src/Bridge/onnWorkers/workerFederatedClient.erl +++ b/src_erl/NerlnetApp/src/Bridge/onnWorkers/workerFederatedClient.erl @@ -3,7 +3,7 @@ -export([controller/2]). -include("/usr/local/lib/nerlnet-lib/NErlNet/src_erl/NerlnetApp/src/nerl_tools.hrl"). --include("w2wCom.hrl"). +-include("/usr/local/lib/nerlnet-lib/NErlNet/src_erl/NerlnetApp/src/Bridge/Common/w2wCom.hrl"). -import(nerlNIF, [call_to_get_weights/2, call_to_set_weights/2]). 
@@ -113,7 +113,7 @@ end_stream({GenWorkerEts, WorkerData}) -> % WorkerData is currently a list of [S W2WPid = ets:lookup_element(ThisEts, w2wcom_pid, ?ETS_KEYVAL_VAL_IDX), ActiveStreams = ets:lookup_element(GenWorkerEts, active_streams, ?ETS_KEYVAL_VAL_IDX), case length(ActiveStreams) of % Send to server an updater after got start_stream from the first source - 0 -> io:format("Worker ~p ending stream with ~p~n", [MyName, SourceName]), + 0 -> % io:format("Worker ~p ending stream with ~p~n", [MyName, SourceName]), w2wCom:send_message_with_event(W2WPid, MyName, ServerName , end_stream, {MyName, SourceName}); % Mimic source behavior _ -> ok end @@ -152,6 +152,7 @@ post_train({GenWorkerEts, {post_train_update, {_SyncIdx, UpdatedWeights}}}) -> case WeightsUpdateFlag of false -> throw("Received weights update but not waiting for it"); true -> + % io:format("Got updated weights from server~n"), ModelID = ets:lookup_element(GenWorkerEts, model_id, ?ETS_KEYVAL_VAL_IDX), nerlNIF:call_to_set_weights(ModelID, UpdatedWeights), ets:update_element(FedClientEts, wait_for_weights_update, {?ETS_KEYVAL_VAL_IDX, false}), @@ -172,6 +173,7 @@ post_train({GenWorkerEts, _Data}) -> SyncCount = ets:lookup_element(FedClientEts, sync_count, ?ETS_KEYVAL_VAL_IDX), MaxSyncCount = ets:lookup_element(FedClientEts, sync_max_count, ?ETS_KEYVAL_VAL_IDX), if SyncCount == MaxSyncCount -> + % io:format("~p sent averaging request to server~n", [MyName]), ModelID = ets:lookup_element(GenWorkerEts, model_id, ?ETS_KEYVAL_VAL_IDX), WeightsTensor = nerlNIF:call_to_get_weights(ModelID), ServerName = ets:lookup_element(FedClientEts, server_name, ?ETS_KEYVAL_VAL_IDX), diff --git a/src_erl/NerlnetApp/src/Bridge/onnWorkers/workerFederatedServer.erl b/src_erl/NerlnetApp/src/Bridge/onnWorkers/workerFederatedServer.erl index 4eec7dfc..3bcf9dd8 100644 --- a/src_erl/NerlnetApp/src/Bridge/onnWorkers/workerFederatedServer.erl +++ b/src_erl/NerlnetApp/src/Bridge/onnWorkers/workerFederatedServer.erl @@ -2,7 +2,7 @@ 
-export([controller/2]). --include("w2wCom.hrl"). +-include("/usr/local/lib/nerlnet-lib/NErlNet/src_erl/NerlnetApp/src/Bridge/Common/w2wCom.hrl"). -import(nerlNIF,[nerltensor_scalar_multiplication_nif/3, call_to_get_weights/1, call_to_set_weights/2]). -import(nerlTensor,[sum_nerltensors_lists/2]). @@ -145,18 +145,15 @@ post_train({GenWorkerEts, WeightsTensor}) -> NumOfActiveWorkers = length([FedWorker || {_MyName, {FedWorker, _Source}} <- ActiveWorkersSourcesList]), case length(TotalWorkersWeights) of NumOfActiveWorkers -> + % io:format("Averaging model weights...~n"), ets:update_counter(FedServerEts, total_syncs, 1), SyncIdx = ets:lookup_element(FedServerEts, total_syncs, ?ETS_KEYVAL_VAL_IDX), ModelID = ets:lookup_element(GenWorkerEts, model_id, ?ETS_KEYVAL_VAL_IDX), - % io:format("Averaging model weights...~n"), {CurrentModelWeights, BinaryType} = nerlNIF:call_to_get_weights(ModelID), FedServerName = ets:lookup_element(FedServerEts, my_name, ?ETS_KEYVAL_VAL_IDX), AllWorkersWeightsList = TotalWorkersWeights ++ [CurrentModelWeights], - io:format("GOT HERE1~n"), AvgWeightsNerlTensor = generate_avg_weights(AllWorkersWeightsList, BinaryType), - io:format("GOT HERE2~n"), nerlNIF:call_to_set_weights(ModelID, AvgWeightsNerlTensor), %% update self weights to new model - io:format("GOT HERE3~n"), Func = fun(FedClient) -> FedServerName = ets:lookup_element(ThisEts, my_name, ?ETS_KEYVAL_VAL_IDX), W2WPid = ets:lookup_element(ThisEts, w2wcom_pid, ?ETS_KEYVAL_VAL_IDX), diff --git a/src_erl/NerlnetApp/src/Bridge/onnWorkers/workerGeneric.erl b/src_erl/NerlnetApp/src/Bridge/onnWorkers/workerGeneric.erl index 8aea9582..faa88c30 100644 --- a/src_erl/NerlnetApp/src/Bridge/onnWorkers/workerGeneric.erl +++ b/src_erl/NerlnetApp/src/Bridge/onnWorkers/workerGeneric.erl @@ -1,5 +1,4 @@ -module(workerGeneric). --include("workerDefinitions.hrl"). 
%%%------------------------------------------------------------------- %%% @copyright (C) 2023, Nerlnet %%% @doc @@ -11,6 +10,9 @@ -import(nerlNIF,[encode_nif/2, nerltensor_encode/5, nerltensor_conversion/2, get_all_binary_types/0]). -import(nerlNIF,[erl_type_conversion/1]). -import(w2wCom,[send_message/3, get_inbox_queue/1]). + +% includes +-include("/usr/local/lib/nerlnet-lib/NErlNet/src_erl/NerlnetApp/src/Bridge/Common/workerDefinitions.hrl"). -include("/usr/local/lib/nerlnet-lib/NErlNet/src_erl/NerlnetApp/src/nerl_tools.hrl"). -include("/usr/local/lib/nerlnet-lib/NErlNet/src_erl/NerlnetApp/src/Bridge/nerlTensor.hrl"). @@ -175,18 +177,7 @@ wait(cast, {loss, nan , TrainTime , BatchID , SourceName}, State = #workerGeneri WorkerToken = ets:lookup_element(get(generic_worker_ets), distributed_system_token, ?ETS_KEYVAL_VAL_IDX), gen_statem:cast(get(client_pid),{loss, MyName , SourceName ,nan , TrainTime, WorkerToken ,BatchID}), NextStateBehavior = DistributedBehaviorFunc(post_train, {get(generic_worker_ets),[]}), %% First call sends empty list , then it will be updated by the federated server and clients - EndStreamWaitingList = ets:lookup_element(get(generic_worker_ets), end_streams_waiting_list, ?ETS_KEYVAL_VAL_IDX), - case length(EndStreamWaitingList) of - 0 -> ok; - _ -> - Func = fun(StreamName) -> - stream_handler(end_stream, train, StreamName, DistributedBehaviorFunc), - CurrentEndStreamWaitingList = ets:lookup_element(get(generic_worker_ets), end_streams_waiting_list, ?ETS_KEYVAL_VAL_IDX), - NewEndStreamWaitingList = CurrentEndStreamWaitingList -- [StreamName], - ets:update_element(get(generic_worker_ets), end_streams_waiting_list, {?ETS_KEYVAL_VAL_IDX, NewEndStreamWaitingList}) - end, - lists:foreach(Func, EndStreamWaitingList) - end, + handle_end_stream_waiting_list(DistributedBehaviorFunc, train), {next_state, NextStateBehavior, State}; @@ -195,18 +186,7 @@ wait(cast, {loss, {LossTensor, LossTensorType} , TrainTime , BatchID , SourceNam WorkerToken = 
ets:lookup_element(get(generic_worker_ets), distributed_system_token, ?ETS_KEYVAL_VAL_IDX), gen_statem:cast(get(client_pid),{loss, MyName, SourceName ,{LossTensor, LossTensorType} , TrainTime , WorkerToken, BatchID , BatchTimeStamp}), NextStateBehavior = DistributedBehaviorFunc(post_train, {get(generic_worker_ets),[]}), %% First call sends empty list , then it will be updated by the federated server and clients - EndStreamWaitingList = ets:lookup_element(get(generic_worker_ets), end_streams_waiting_list, ?ETS_KEYVAL_VAL_IDX), - case length(EndStreamWaitingList) of - 0 -> ok; - _ -> - Func = fun(StreamName) -> - stream_handler(end_stream, train, StreamName, DistributedBehaviorFunc), - CurrentEndStreamWaitingList = ets:lookup_element(get(generic_worker_ets), end_streams_waiting_list, ?ETS_KEYVAL_VAL_IDX), - NewEndStreamWaitingList = CurrentEndStreamWaitingList -- [StreamName], - ets:update_element(get(generic_worker_ets), end_streams_waiting_list, {?ETS_KEYVAL_VAL_IDX, NewEndStreamWaitingList}) - end, - lists:foreach(Func, EndStreamWaitingList) - end, + handle_end_stream_waiting_list(DistributedBehaviorFunc, train), {next_state, NextStateBehavior, State}; wait(cast, {predictRes, PredNerlTensor, PredNerlTensorType, TimeNif, BatchID , SourceName}, State = #workerGeneric_state{myName = MyName, nextState = NextState, distributedBehaviorFunc = DistributedBehaviorFunc, distributedWorkerData = DistributedWorkerData}) -> @@ -214,37 +194,35 @@ wait(cast, {predictRes, PredNerlTensor, PredNerlTensorType, TimeNif, BatchID , S WorkerToken = ets:lookup_element(get(generic_worker_ets), distributed_system_token, ?ETS_KEYVAL_VAL_IDX), gen_statem:cast(get(client_pid),{predictRes,MyName, SourceName, {PredNerlTensor, PredNerlTensorType}, TimeNif , WorkerToken, BatchID , BatchTimeStamp}), DistributedBehaviorFunc(post_predict, {get(generic_worker_ets),DistributedWorkerData}), - EndStreamWaitingList = ets:lookup_element(get(generic_worker_ets), end_streams_waiting_list, 
?ETS_KEYVAL_VAL_IDX), - case length(EndStreamWaitingList) of - 0 -> ok; - _ -> - Func = fun(StreamName) -> - stream_handler(end_stream, train, StreamName, DistributedBehaviorFunc), - CurrentEndStreamWaitingList = ets:lookup_element(get(generic_worker_ets), end_streams_waiting_list, ?ETS_KEYVAL_VAL_IDX), - NewEndStreamWaitingList = CurrentEndStreamWaitingList -- [StreamName], - ets:update_element(get(generic_worker_ets), end_streams_waiting_list, {?ETS_KEYVAL_VAL_IDX, NewEndStreamWaitingList}) - end, - lists:foreach(Func, EndStreamWaitingList) - end, + handle_end_stream_waiting_list(DistributedBehaviorFunc, predict), {next_state, NextState, State}; wait(cast, {end_stream , StreamName}, State = #workerGeneric_state{myName = _MyName, distributedBehaviorFunc = _DistributedBehaviorFunc}) -> %logger:notice("Waiting, next state - idle"), CurrentEndStreamWaitingList = ets:lookup_element(get(generic_worker_ets), end_streams_waiting_list, ?ETS_KEYVAL_VAL_IDX), NewEndStreamWaitingList = CurrentEndStreamWaitingList ++ [StreamName], + % io:format("Got end_stream @wait: NewWaitingList: ~p~n",[NewEndStreamWaitingList]), ets:update_element(get(generic_worker_ets), end_streams_waiting_list, {?ETS_KEYVAL_VAL_IDX, NewEndStreamWaitingList}), % io:format("@wait ~p got end stream from ~p~n",[MyName, StreamName]), {next_state, wait, State}; -wait(cast, {post_train_update, Data}, State = #workerGeneric_state{myName = _MyName, distributedBehaviorFunc = DistributedBehaviorFunc, postBatchFunc = PostBatchFunc}) -> +wait(cast, {post_train_update, Data}, State = #workerGeneric_state{myName = _MyName, distributedBehaviorFunc = DistributedBehaviorFunc}) -> NextStateBehavior = DistributedBehaviorFunc(post_train, {get(generic_worker_ets), {post_train_update, Data}}), - PostBatchFunc(), - {next_state, NextStateBehavior, State#workerGeneric_state{postBatchFunc = ?EMPTY_FUNC}}; + if + NextStateBehavior == train -> + ok; + true -> + ?LOG_ERROR("@wait: post_train controller method must return train 
atom!"), + throw("@wait: post_train controller method must return train atom!") + end, + handle_end_stream_waiting_list(DistributedBehaviorFunc, train), + {next_state, NextStateBehavior, State}; % CANNOT HAPPEN wait(cast, {idle}, State= #workerGeneric_state{myName = MyName, distributedBehaviorFunc = DistributedBehaviorFunc}) -> %logger:notice("Waiting, next state - idle"), + % io:format("@wait: Got idle message, next state - idle~n"), DistributedBehaviorFunc(pre_idle, {get(generic_worker_ets), train}), update_client_avilable_worker(MyName), {next_state, idle, State#workerGeneric_state{nextState = idle}}; @@ -260,6 +238,7 @@ wait(cast, {predict}, State) -> %% Worker in wait can't treat incoming message wait(cast, BatchTuple , State = #workerGeneric_state{lastPhase = LastPhase, myName= _MyName}) when element(1, BatchTuple) == sample -> + % io:format("@wait: Dropped batch state...~n"), case LastPhase of train -> ets:update_counter(get(worker_stats_ets), batches_dropped_train , 1); @@ -303,13 +282,13 @@ train(cast, {post_train_update , Weights}, State = #workerGeneric_state{myName = DistributedBehaviorFunc(post_train, {get(generic_worker_ets), Weights}), {next_state, train, State}; -train(cast, {start_stream , StreamName}, State = #workerGeneric_state{myName = MyName , distributedBehaviorFunc = DistributedBehaviorFunc}) -> - % io:format("~p start stream ~p~n",[MyName, StreamName]), +train(cast, {start_stream , StreamName}, State = #workerGeneric_state{myName = _MyName , distributedBehaviorFunc = DistributedBehaviorFunc}) -> stream_handler(start_stream, train, StreamName, DistributedBehaviorFunc), + % io:format("~p start stream ~p~n",[MyName, StreamName]), {next_state, train, State}; -train(cast, {end_stream , StreamName}, State = #workerGeneric_state{myName = MyName , distributedBehaviorFunc = DistributedBehaviorFunc}) -> - % io:format("~p end stream ~p~n",[MyName, StreamName]), +train(cast, {end_stream , StreamName}, State = #workerGeneric_state{myName = _MyName , 
distributedBehaviorFunc = DistributedBehaviorFunc}) -> + % io:format("@train: ~p end stream ~p~n",[MyName, StreamName]), stream_handler(end_stream, train, StreamName, DistributedBehaviorFunc), {next_state, train, State}; @@ -377,3 +356,20 @@ stream_handler(StreamPhase , ModelPhase , StreamName , DistributedBehaviorFunc) end, ets:update_element(GenWorkerEts, active_streams, {?ETS_KEYVAL_VAL_IDX, NewActiveStreams}), DistributedBehaviorFunc(StreamPhase, {GenWorkerEts, [StreamName , ModelPhase]}). + +handle_end_stream_waiting_list(DistributedBehaviorFunc, ModelPhase) -> + EndStreamWaitingList = ets:lookup_element(get(generic_worker_ets), end_streams_waiting_list, ?ETS_KEYVAL_VAL_IDX), + % io:format("EndStreamWaitingList: ~p~n",[EndStreamWaitingList]), + case length(EndStreamWaitingList) of + 0 -> ok; + _ -> + % io:format("Removing from waiting list...~n"), + Func = fun(StreamName) -> + stream_handler(end_stream, ModelPhase, StreamName, DistributedBehaviorFunc), + CurrentEndStreamWaitingList = ets:lookup_element(get(generic_worker_ets), end_streams_waiting_list, ?ETS_KEYVAL_VAL_IDX), + NewEndStreamWaitingList = CurrentEndStreamWaitingList -- [StreamName], + ets:update_element(get(generic_worker_ets), end_streams_waiting_list, {?ETS_KEYVAL_VAL_IDX, NewEndStreamWaitingList}) + end, + lists:foreach(Func, EndStreamWaitingList) + end. + diff --git a/src_erl/NerlnetApp/src/Bridge/torchWorkers/torchDefs.hrl b/src_erl/NerlnetApp/src/Bridge/torchWorkers/torchDefs.hrl new file mode 100644 index 00000000..6342fd42 --- /dev/null +++ b/src_erl/NerlnetApp/src/Bridge/torchWorkers/torchDefs.hrl @@ -0,0 +1,3 @@ + +-define(NERLNET_LIB_PATH,"/usr/local/lib/nerlnet-lib/NErlNet"). +-include("/usr/local/lib/nerlnet-lib/NErlNet/src_erl/NerlnetApp/src/Bridge/nerlTensor.hrl"). 
diff --git a/src_erl/NerlnetApp/src/Bridge/torchWorkers/torchNIF.erl b/src_erl/NerlnetApp/src/Bridge/torchWorkers/torchNIF.erl new file mode 100644 index 00000000..0e311634 --- /dev/null +++ b/src_erl/NerlnetApp/src/Bridge/torchWorkers/torchNIF.erl @@ -0,0 +1,15 @@ +-module(torchNIF). + +-include_lib("kernel/include/logger.hrl"). +-include("torchDefs.hrl"). + +-author("David Leon"). + + + +% -export([init/0,nif_preload/0,get_active_models_ids_list/0, train_nif/3,update_nerlworker_train_params_nif/6,call_to_train/5,predict_nif/3,call_to_predict/5,get_weights_nif/1,printTensor/2]). +% -export([call_to_get_weights/1,call_to_set_weights/2]). +% -export([decode_nif/2, nerltensor_binary_decode/2]). +% -export([encode_nif/2, nerltensor_encode/5, nerltensor_conversion/2, get_all_binary_types/0, get_all_nerltensor_list_types/0]). +% -export([erl_type_conversion/1]). + diff --git a/src_erl/NerlnetApp/src/Bridge/torchWorkers/torchTests.erl b/src_erl/NerlnetApp/src/Bridge/torchWorkers/torchTests.erl new file mode 100644 index 00000000..4eafbf22 --- /dev/null +++ b/src_erl/NerlnetApp/src/Bridge/torchWorkers/torchTests.erl @@ -0,0 +1,26 @@ +-module(torchTests). +-author("David Leon"). + +-include_lib("kernel/include/logger.hrl"). +-include("torchTestsDefs.hrl"). + +-define(NERLTEST_PRINT_STR, "[NERLTEST] "). + +-export([run_tests/0]). + +nerltest_print(String) -> + logger:notice(?NERLTEST_PRINT_STR++String). + +test_envelope(Func, TestName, Rounds) -> + nerltest_print(nerl:string_format("~p test starts for ~p rounds",[TestName, Rounds])), + {TimeTookMicro, _RetVal} = timer:tc(Func, [Rounds]), + nerltest_print(nerl:string_format("Elapsed: ~p~p",[TimeTookMicro / 1000, ms])), ok. 
+ +test_envelope_nif_performance(Func, TestName, Rounds) -> + nerltest_print(nerl:string_format("~p test starts for ~p rounds",[TestName, Rounds])), + {TimeTookMicro, AccPerfromance} = timer:tc(Func, [Rounds]), + AveragedPerformance = AccPerfromance/Rounds, + nerltest_print(nerl:string_format("Elapsed: ~p~p Average nif performance: ~.3f~p",[TimeTookMicro/1000,ms, AveragedPerformance, ms])), ok. + +run_tests()-> + nerl:logger_settings(nerlTests). \ No newline at end of file diff --git a/src_erl/NerlnetApp/src/Bridge/torchWorkers/torchTestsDefs.hrl b/src_erl/NerlnetApp/src/Bridge/torchWorkers/torchTestsDefs.hrl new file mode 100644 index 00000000..0e7875ad --- /dev/null +++ b/src_erl/NerlnetApp/src/Bridge/torchWorkers/torchTestsDefs.hrl @@ -0,0 +1,2 @@ + +-include("torchTestsModels.hrl"). \ No newline at end of file diff --git a/src_erl/NerlnetApp/src/Bridge/torchWorkers/torchTestsModels.hrl b/src_erl/NerlnetApp/src/Bridge/torchWorkers/torchTestsModels.hrl new file mode 100644 index 00000000..2f66460a --- /dev/null +++ b/src_erl/NerlnetApp/src/Bridge/torchWorkers/torchTestsModels.hrl @@ -0,0 +1,74 @@ + + +-define(PERCEPTRON_TESTING_NN,{ _ModelId = erlang:unique_integer([positive]), + _ModelType = "0", + _ModelArgs = "", + _LayersSizes = "5,30,5,3", + _LayersTypes = "1,3,3,3", + _LayersFunctionalityCodes = "1,6,6,6", % change scaler functionality to 6 to check exception handling + _LearningRate = "0.01", + _Epochs = "50", + _OptimizerType = "2", + _OptimizerArgs = "", + _LossMethod = "2", + _DistributedSystemType = "0", + _DistributedSystemArg = ""} ). 
+ +-define(PERCEPTRON_TESTING_DISTRIBUTED_NN,{ _ModelId = erlang:unique_integer([positive]), + _ModelType = "0", + _ModelArgs = "", + _LayersSizes = "5,30,5,3", + _LayersTypes = "1,3,3,3", + _LayersFunctionalityCodes = "1,6,6,6", % change scaler functionality to 6 to check exception handling + _LearningRate = "0.01", + _Epochs = "50", + _OptimizerType = "2", + _OptimizerArgs = "", + _LossMethod = "2", + _DistributedSystemType = "0", % TODO Ori put the correct value + _DistributedSystemArg = ""} ). + +-define(CNN_TESTING_NN,{ _ModelIdCNN = erlang:unique_integer([positive]), + _ModelTypeCNN = "0", + _ModelArgsCNN = "", + _LayersSizesCNN = "28x28x1k5x5x1x6p0s1t1,28x28x6k2x2p0s2,14x14x6k4x4x6x12p0s1t0,1,32,10", + _LayersTypesCNN = "2,4,2,9,3,5", + _LayersFunctionalityCodesCNN = "6,2,6,6,6,4", % change scaler functionality to 6 to check exception handling + _LearningRateCNN = "0.01", + _EpochsCNN = "50", + _OptimizerTypeCNN = "5", + _OptimizerArgsCNN = "", + _LossMethodCNN = "2", + _DistributedSystemTypeCNN = "0", + _DistributedSystemArgCNN = ""} ). + +-define(AEC_TESTING_NN,{ _ModelIdAEC = erlang:unique_integer([positive]), + _ModelTypeAEC = "9", + _ModelArgsAEC = "", + _LayersSizesAEC = "32,16,8,4,8,16,32,32", % last layer (perceptron) should be the same as the input layer , followed by bounding layer + _LayersTypesAEC = "1,3,3,3,3,3,3,10", + _LayersFunctionalityCodesAEC = "1,11,11,11,11,11,11,1", + _LearningRateAEC = "0.01", + _EpochsAEC = "50", + _OptimizerTypeAEC = "5", + _OptimizerArgsAEC = "", + _LossMethodAEC = "2", + _DistributedSystemTypeAEC = "0", + _DistributedSystemArgAEC = ""} ). 
+ +-define(AE_TESTING_NN, { _ModelIdAE = erlang:unique_integer([positive]), + _ModelTypeAE = "8", + _ModelArgsAE = "", + _LayersSizesAE = "32,16,8,4,8,16,32,32", % last layer (perceptron) should be the same as the input layer , followed by bounding layer + _LayersTypesAE = "1,3,3,3,3,3,3,10", + _LayersFunctionalityCodesAE = "1,11,11,11,11,11,11,1", + _LearningRateAE = "0.01", + _EpochsAE = "50", + _OptimizerTypeAE = "5", + _OptimizerArgsAE = "", + _LossMethodAE = "2", + _DistributedSystemTypeAE = "0", + _DistributedSystemArgAE = ""} ). + +-define(NEURAL_NETWORK_TESTING_MODELS_LIST, [?PERCEPTRON_TESTING_NN ,?AEC_TESTING_NN , ?CNN_TESTING_NN]). +-define(NEURAL_NETWORK_TESTING_MODELS_LIST_NAMES, ["Perceptron" ,"AEC" ,"CNN"]). \ No newline at end of file diff --git a/src_erl/NerlnetApp/src/Client/clientWorkersFunctions.erl b/src_erl/NerlnetApp/src/Client/clientWorkersFunctions.erl index 2d1a2119..44b29384 100644 --- a/src_erl/NerlnetApp/src/Client/clientWorkersFunctions.erl +++ b/src_erl/NerlnetApp/src/Client/clientWorkersFunctions.erl @@ -48,6 +48,15 @@ create_workers(ClientName, ClientEtsRef , ShaToModelArgsMap , EtsStats) -> {ModelType, ModelArgs, LayersSizes, LayersTypes, LayersFunctions, LossMethod, LearningRate, Epochs, Optimizer, OptimizerArgs, _InfraType, DistributedSystemType, DistributedSystemArgs, DistributedSystemToken} = maps:get(SHA, ShaToModelArgsMap), + DistributedTypeInteger = list_to_integer(DistributedSystemType), + if + DistributedTypeInteger > 0 -> % not none (distributed) + if length(DistributedSystemToken) == 5 -> + ?LOG_INFO("~p Running a Distributed independent system on this device with Token: ~p",[WorkerName, DistributedSystemToken]); + true -> throw("Distributed non-independent system (e.g., Federated Learning) must have a token which is NOT none. 
Add it to the Distributed Config json file under distributedSystemToken field, make sure it is 5 characters long") + end; + true -> ok + end, MyClientPid = self(), % TODO add documentation about this case of % move this case to module called client_controller diff --git a/src_erl/NerlnetApp/src/Init/jsonParser.erl b/src_erl/NerlnetApp/src/Init/jsonParser.erl index d433b01d..02eba428 100644 --- a/src_erl/NerlnetApp/src/Init/jsonParser.erl +++ b/src_erl/NerlnetApp/src/Init/jsonParser.erl @@ -139,7 +139,9 @@ get_device_routers(DCMap, DeviceEntities) -> %% return the ets name %% -------------------------------------------------------------- json_to_ets(IPv4, JsonDCMap) -> - + % Auto generated definitions validation + if ?DC_DISTRIBUTED_SYSTEM_TYPE_NONE_IDX_STR == "0" -> ok; + true -> throw("Auto generated definitions are not valid, none-distributed system type should be 0") end, % update DeviceName ets:insert(nerlnet_data, {?DC_IPV4_FIELD_ATOM, IPv4}), ets:insert(nerlnet_data, {ipv4_bin, list_to_binary(IPv4)}), %% ? 
is this needed diff --git a/src_py/apiServer/nerl_model_db.py b/src_py/apiServer/nerl_model_db.py index f2dfcd3c..bd048770 100644 --- a/src_py/apiServer/nerl_model_db.py +++ b/src_py/apiServer/nerl_model_db.py @@ -57,11 +57,11 @@ def get_total_batches_per_source(self, source_name): def get_worker_name(self): return self.worker_name - def get_batches_ts_tansor_data_dict(self): - batches_ts_tansor_data_dict = {} + def get_batches_ts_tensor_data_dict(self): + batches_ts_tensor_data_dict = {} for batch_db in self.batches_ts_dict.values(): - batches_ts_tansor_data_dict[batch_db.batch_timestamp] = batch_db.tensor_data - return batches_ts_tansor_data_dict + batches_ts_tensor_data_dict[batch_db.batch_timestamp] = batch_db.tensor_data + return batches_ts_tensor_data_dict def get_batches_dict(self): return self.batches_dict diff --git a/src_py/apiServer/stats.py b/src_py/apiServer/stats.py index 6a81b267..d32ac3d7 100644 --- a/src_py/apiServer/stats.py +++ b/src_py/apiServer/stats.py @@ -72,9 +72,9 @@ def get_loss_ts(self , plot : bool = False , saveToFile : bool = False): # Todo for worker_db in workers_model_db_list: worker_name = worker_db.get_worker_name() loss_dict[worker_name] = [] - batches_ts_tansor_data_dict =worker_db.get_batches_ts_tansor_data_dict() - sorted_batches_ts_tansor_data_dict = dict(sorted(batches_ts_tansor_data_dict.items())) - loss_dict[worker_name] = [sorted_batches_ts_tansor_data_dict[key] for key in sorted(sorted_batches_ts_tansor_data_dict)] + batches_ts_tensor_data_dict =worker_db.get_batches_ts_tensor_data_dict() + sorted_batches_ts_tensor_data_dict = dict(sorted(batches_ts_tensor_data_dict.items())) + loss_dict[worker_name] = [sorted_batches_ts_tensor_data_dict[key] for key in sorted(sorted_batches_ts_tensor_data_dict)] # Convert NumPy arrays to floats for worker_name in loss_dict: diff --git a/src_py/apiServer/stats_aec.py b/src_py/apiServer/stats_aec.py index ed1b1bdd..44cf4436 100644 --- a/src_py/apiServer/stats_aec.py +++ 
b/src_py/apiServer/stats_aec.py @@ -28,9 +28,9 @@ def get_aec_loss(self, plot=False): for worker_db in workers_model_db_list: worker_name = worker_db.get_worker_name() - batches_ts_tansor_data_dict = worker_db.get_batches_ts_tansor_data_dict() - sorted_batches_ts_tansor_data_dict = dict(sorted(batches_ts_tansor_data_dict.items())) - loss_dict[worker_name] = [sorted_batches_ts_tansor_data_dict[key][0] for key in sorted(sorted_batches_ts_tansor_data_dict)] + batches_ts_tensor_data_dict = worker_db.get_batches_ts_tensor_data_dict() + sorted_batches_ts_tensor_data_dict = dict(sorted(batches_ts_tensor_data_dict.items())) + loss_dict[worker_name] = [sorted_batches_ts_tensor_data_dict[key][0] for key in sorted(sorted_batches_ts_tensor_data_dict)] for worker_name in loss_dict: loss_dict[worker_name] = [float(arr) for sublist in loss_dict[worker_name] for arr in sublist] @@ -56,10 +56,10 @@ def get_aec_boundaries(self, plot=False): for worker_db in workers_model_db_list: worker_name = worker_db.get_worker_name() - batches_ts_tansor_data_dict = worker_db.get_batches_ts_tansor_data_dict() - sorted_batches_ts_tansor_data_dict = dict(sorted(batches_ts_tansor_data_dict.items())) - upper_boundaries_dict[worker_name] = [sorted_batches_ts_tansor_data_dict[key][1] for key in sorted(sorted_batches_ts_tansor_data_dict)] - lower_boundaries_dict[worker_name] = [sorted_batches_ts_tansor_data_dict[key][2] for key in sorted(sorted_batches_ts_tansor_data_dict)] + batches_ts_tensor_data_dict = worker_db.get_batches_ts_tensor_data_dict() + sorted_batches_ts_tensor_data_dict = dict(sorted(batches_ts_tensor_data_dict.items())) + upper_boundaries_dict[worker_name] = [sorted_batches_ts_tensor_data_dict[key][1] for key in sorted(sorted_batches_ts_tensor_data_dict)] + lower_boundaries_dict[worker_name] = [sorted_batches_ts_tensor_data_dict[key][2] for key in sorted(sorted_batches_ts_tensor_data_dict)] for worker_name in upper_boundaries_dict: upper_boundaries_dict[worker_name] = [float(arr) for 
sublist in upper_boundaries_dict[worker_name] for arr in sublist] diff --git a/tests/NerlnetNifTest.sh b/tests/NerlnetNifTest.sh index cca2690f..81e2193f 100755 --- a/tests/NerlnetNifTest.sh +++ b/tests/NerlnetNifTest.sh @@ -12,9 +12,11 @@ TEST_LOG_PATH="/usr/local/lib/nerlnet-lib/log" TEST_LOG_FILE_PATH="$TEST_LOG_PATH/$LOG_FILE" ERL_BRIDGE_SOURCE_PATH="$NERLNET_PATH/src_erl/NerlnetApp/src/Bridge" NERLNET_BUILD_DIR="$NERLNET_PATH/build" -NERLNET_TEST_DIR="$NERLNET_BUILD_DIR/test" +NERLNET_TEST_DIR="$NERLNET_BUILD_DIR/test/onnNifTest" +ONN_WORKER_DIR="onnWorkers" +ONN_WORKER_PATH_FULL="$NERLNET_PATH/src_erl/NerlnetApp/src/Bridge/$ONN_WORKER_DIR" -print "Nerlnet testing script initiated" +print "Nerlnet testing script initiated for ONN-NIF" print "Copy files to $NERLNET_TEST_DIR" if [ -d "$NERLNET_TEST_DIR" ]; @@ -28,13 +30,20 @@ else mkdir -p $NERLNET_TEST_DIR fi +if [ -d "$NERLNET_TEST_DIR/$ONN_WORKER_DIR" ]; +then + : +else + mkdir -p $NERLNET_TEST_DIR/$ONN_WORKER_DIR +fi + cd $NERLNET_PATH -cp $ERL_BRIDGE_SOURCE_PATH/nerlTests.erl $NERLNET_TEST_DIR/nerlTests.erl -cp $ERL_BRIDGE_SOURCE_PATH/nerlNIF.erl $NERLNET_TEST_DIR/nerlNIF.erl +cp $ONN_WORKER_PATH_FULL/nerlTests.erl $NERLNET_TEST_DIR/$ONN_WORKER_DIR/nerlTests.erl +cp $ONN_WORKER_PATH_FULL/nerlNIF.erl $NERLNET_TEST_DIR/$ONN_WORKER_DIR/nerlNIF.erl +cp $ONN_WORKER_PATH_FULL/neural_networks_testing_models.hrl $NERLNET_TEST_DIR/$ONN_WORKER_DIR/neural_networks_testing_models.hrl cp $ERL_BRIDGE_SOURCE_PATH/nerl.erl $NERLNET_TEST_DIR/nerl.erl cp $ERL_BRIDGE_SOURCE_PATH/nerlTensor.hrl $NERLNET_TEST_DIR/nerlTensor.hrl cp $ERL_BRIDGE_SOURCE_PATH/nerlTensor.erl $NERLNET_TEST_DIR/nerlTensor.erl -cp $ERL_BRIDGE_SOURCE_PATH/neural_networks_testing_models.hrl $NERLNET_TEST_DIR/neural_networks_testing_models.hrl cp $ERL_BRIDGE_SOURCE_PATH/layers_types_ag.hrl $NERLNET_TEST_DIR/layers_types_ag.hrl cp $ERL_BRIDGE_SOURCE_PATH/models_types_ag.hrl $NERLNET_TEST_DIR/models_types_ag.hrl @@ -49,8 +58,8 @@ print "Change directory to 
$NERLNET_TEST_DIR:" cd $NERLNET_TEST_DIR print "Running nerlTests.erl" -COMPILE_NERLNIF="compile:file(\"nerlNIF.erl\")" -COMPILE_NERLTEST="compile:file(\"nerlTests.erl\")" +COMPILE_NERLNIF="compile:file(\"$ONN_WORKER_DIR/nerlNIF.erl\")" +COMPILE_NERLTEST="compile:file(\"$ONN_WORKER_DIR/nerlTests.erl\")" COMPILE_NERL="compile:file(\"nerl.erl\")" COMPILE_NERLTENSOR="compile:file(\"nerlTensor.erl\")"