From eccac367dafd1fbbb9ecffe1caa50d9dd8775917 Mon Sep 17 00:00:00 2001 From: James O'Hea Date: Mon, 9 Sep 2024 09:22:46 +0000 Subject: [PATCH 1/5] Removing redundunt lines. hwm is being set by RECEIVE_HWM --- cpp/data/eigerfan/src/EigerFan.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/cpp/data/eigerfan/src/EigerFan.cpp b/cpp/data/eigerfan/src/EigerFan.cpp index 93b4702..e402975 100644 --- a/cpp/data/eigerfan/src/EigerFan.cpp +++ b/cpp/data/eigerfan/src/EigerFan.cpp @@ -363,8 +363,6 @@ void EigerFan::HandleRxSocket(std::string& endpoint, int num_zmq_context_threads zmq::socket_t rx_socket(inproc_context, ZMQ_PULL); rx_socket.setsockopt(ZMQ_RCVHWM, &RECEIVE_HWM, sizeof(RECEIVE_HWM)); rx_socket.bind(BROKER_INPROC_ENDPOINT.c_str()); - int hwm = 100000; - rx_socket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); int linger = 100; rx_socket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); this->broker.connect(endpoint, &inproc_context); From 6eb42d271f8da19c875ba32d63cd8ebc44cd580e Mon Sep 17 00:00:00 2001 From: James O'Hea Date: Mon, 9 Sep 2024 09:23:38 +0000 Subject: [PATCH 2/5] Use SEND_HWM instead of local hwm --- cpp/data/eigerfan/src/MultiPullBroker.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cpp/data/eigerfan/src/MultiPullBroker.cpp b/cpp/data/eigerfan/src/MultiPullBroker.cpp index 2fa5cd1..047bf8e 100644 --- a/cpp/data/eigerfan/src/MultiPullBroker.cpp +++ b/cpp/data/eigerfan/src/MultiPullBroker.cpp @@ -47,7 +47,6 @@ void MultiPullBroker::connect(std::string& endpoint, void* inproc_context) { * \param[in] endpoint Endpoint of socket to pull data from */ void MultiPullBroker::worker_loop(std::string& endpoint) { - int hwm = 10000; int linger = 100; // Create source in new isolated context @@ -56,7 +55,7 @@ void MultiPullBroker::worker_loop(std::string& endpoint) { // threads on the context is not sufficient. zmq::context_t source_context(1); zmq::socket_t source_socket(source_context, ZMQ_PULL); - source_socket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); + source_socket.setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof(SEND_HWM)); source_socket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); source_socket.connect(endpoint.c_str()); @@ -64,7 +63,7 @@ void MultiPullBroker::worker_loop(std::string& endpoint) { // The sink sockets must use the context from the main thread to use the inproc:// // protocol. If it uses a different context the client will not see the messages. zmq::socket_t sink_socket(*this->inproc_context_, ZMQ_PUSH); - sink_socket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); + sink_socket.setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof(SEND_HWM)); sink_socket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); sink_socket.connect(this->sink_endpoint_.c_str()); From 1f1b74944395afc4b402e7e6cb8a9de2ab9e629c Mon Sep 17 00:00:00 2001 From: James O'Hea Date: Mon, 9 Sep 2024 09:29:11 +0000 Subject: [PATCH 3/5] Introduce LINGER_TIMEOUT constant, replace local linger variables --- cpp/data/common/include/EigerDefinitions.h | 1 + cpp/data/eigerfan/src/EigerFan.cpp | 14 ++++++-------- cpp/data/eigerfan/src/MultiPullBroker.cpp | 5 ++--- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/cpp/data/common/include/EigerDefinitions.h b/cpp/data/common/include/EigerDefinitions.h index 7257b8c..8a834be 100644 --- a/cpp/data/common/include/EigerDefinitions.h +++ b/cpp/data/common/include/EigerDefinitions.h @@ -45,6 +45,7 @@ namespace Eiger { const int MORE_MESSAGES = 1; const int RECEIVE_HWM = 100000; const int SEND_HWM = 100000; + const int LINGER_TIMEOUT = 100; // Socket linger timeout in milliseconds const std::string CONTROL_CMD_KEY = "msg_val"; const std::string CONTROL_ID_KEY = "id"; diff --git a/cpp/data/eigerfan/src/EigerFan.cpp b/cpp/data/eigerfan/src/EigerFan.cpp index e402975..7244857 100644 --- a/cpp/data/eigerfan/src/EigerFan.cpp +++ b/cpp/data/eigerfan/src/EigerFan.cpp @@ -132,14 +132,13 @@ EigerFan::~EigerFan() { void EigerFan::run() { LOG4CXX_INFO(log, "EigerFan::run()"); LOG4CXX_INFO(log, "Starting EigerFan"); - int linger = 100; // Socket linger timeout in milliseconds // Setup Control socket std::string controlAddress("tcp://*:"); controlAddress.append(config.ctrl_channel_port); LOG4CXX_INFO(log, std::string("Binding control address to ").append(controlAddress)); controlSocket.bind (controlAddress.c_str()); - controlSocket.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); + controlSocket.setsockopt (ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT)); // Setup Fan Send Sockets for (int i = 0; i < config.num_consumers; i++) { @@ -152,7 +151,7 @@ void EigerFan::run() { boost::shared_ptr sendSocket(new zmq::socket_t(ctx_, ZMQ_PUSH)); sendSocket->setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof (SEND_HWM)); sendSocket->bind(fanAddress.str().c_str()); - sendSocket->setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); + sendSocket->setsockopt (ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT)); EigerConsumer consumer; consumer.connected = false; consumer.sendSocket = sendSocket; @@ -177,7 +176,7 @@ void EigerFan::run() { } boost::shared_ptr monitorSocket(new zmq::socket_t(ctx_, ZMQ_PAIR)); monitorSocket->connect(monitorAddress.str().c_str()); - monitorSocket->setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); + monitorSocket->setsockopt (ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT)); monitorSockets.push_back(monitorSocket); } @@ -187,7 +186,7 @@ void EigerFan::run() { LOG4CXX_INFO(log, std::string("Binding forwarding address to ").append(forwardAddress)); forwardSocket.setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof (SEND_HWM)); forwardSocket.bind(forwardAddress.c_str()); - forwardSocket.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); + forwardSocket.setsockopt (ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT)); // Setup Forwarding Socket monitor std::ostringstream forwardMonitorAddress; @@ -200,7 +199,7 @@ void EigerFan::run() { } zmq::socket_t forwardMonitorSocket(ctx_, ZMQ_PAIR); forwardMonitorSocket.connect(forwardMonitorAddress.str().c_str()); - forwardMonitorSocket.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); + forwardMonitorSocket.setsockopt (ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT)); // Wait for configured number of consumers to connect LOG4CXX_INFO(log, "Waiting for Consumers"); @@ -363,8 +362,7 @@ void EigerFan::HandleRxSocket(std::string& endpoint, int num_zmq_context_threads zmq::socket_t rx_socket(inproc_context, ZMQ_PULL); rx_socket.setsockopt(ZMQ_RCVHWM, &RECEIVE_HWM, sizeof(RECEIVE_HWM)); rx_socket.bind(BROKER_INPROC_ENDPOINT.c_str()); - int linger = 100; - rx_socket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); + rx_socket.setsockopt(ZMQ_LINGER, &LINGER_TIMEOUT, sizeof(LINGER_TIMEOUT)); this->broker.connect(endpoint, &inproc_context); state = WAITING_STREAM; diff --git a/cpp/data/eigerfan/src/MultiPullBroker.cpp b/cpp/data/eigerfan/src/MultiPullBroker.cpp index 047bf8e..8fc1cd1 100644 --- a/cpp/data/eigerfan/src/MultiPullBroker.cpp +++ b/cpp/data/eigerfan/src/MultiPullBroker.cpp @@ -47,7 +47,6 @@ void MultiPullBroker::connect(std::string& endpoint, void* inproc_context) { * \param[in] endpoint Endpoint of socket to pull data from */ void MultiPullBroker::worker_loop(std::string& endpoint) { - int linger = 100; // Create source in new isolated context // It is important to create a new context in each worker thread, as there are @@ -56,7 +55,7 @@ void MultiPullBroker::worker_loop(std::string& endpoint) { zmq::context_t source_context(1); zmq::socket_t source_socket(source_context, ZMQ_PULL); source_socket.setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof(SEND_HWM)); - source_socket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); + source_socket.setsockopt(ZMQ_LINGER, &LINGER_TIMEOUT, sizeof(LINGER_TIMEOUT)); source_socket.connect(endpoint.c_str()); // Create sink socket in context of main thread @@ -64,7 +63,7 @@ void MultiPullBroker::worker_loop(std::string& endpoint) { // protocol. If it uses a different context the client will not see the messages. zmq::socket_t sink_socket(*this->inproc_context_, ZMQ_PUSH); sink_socket.setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof(SEND_HWM)); - sink_socket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); + sink_socket.setsockopt(ZMQ_LINGER, &LINGER_TIMEOUT, sizeof(LINGER_TIMEOUT)); sink_socket.connect(this->sink_endpoint_.c_str()); // Initialise recv variables From 0932403581f420c584b1ad95e41e01f2ec89c942 Mon Sep 17 00:00:00 2001 From: James O'Hea Date: Mon, 9 Sep 2024 14:26:23 +0000 Subject: [PATCH 4/5] Use a different, lower HWM for the worker threads --- cpp/data/common/include/EigerDefinitions.h | 3 ++- cpp/data/eigerfan/src/MultiPullBroker.cpp | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cpp/data/common/include/EigerDefinitions.h b/cpp/data/common/include/EigerDefinitions.h index 8a834be..f054bfe 100644 --- a/cpp/data/common/include/EigerDefinitions.h +++ b/cpp/data/common/include/EigerDefinitions.h @@ -43,8 +43,9 @@ namespace Eiger { // EigerFan related constants const int MORE_MESSAGES = 1; - const int RECEIVE_HWM = 100000; + const int RECEIVE_HWM = 100000; // High water marks for the main receiver thread const int SEND_HWM = 100000; + const int WORKER_SEND_HWM = 10000; // A lower high water mark for the worker threads const int LINGER_TIMEOUT = 100; // Socket linger timeout in milliseconds const std::string CONTROL_CMD_KEY = "msg_val"; diff --git a/cpp/data/eigerfan/src/MultiPullBroker.cpp b/cpp/data/eigerfan/src/MultiPullBroker.cpp index 8fc1cd1..89f2cb8 100644 --- a/cpp/data/eigerfan/src/MultiPullBroker.cpp +++ b/cpp/data/eigerfan/src/MultiPullBroker.cpp @@ -54,7 +54,7 @@ void MultiPullBroker::worker_loop(std::string& endpoint) { // threads on the context is not sufficient. zmq::context_t source_context(1); zmq::socket_t source_socket(source_context, ZMQ_PULL); - source_socket.setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof(SEND_HWM)); + source_socket.setsockopt(ZMQ_SNDHWM, &WORKER_SEND_HWM, sizeof(WORKER_SEND_HWM)); source_socket.setsockopt(ZMQ_LINGER, &LINGER_TIMEOUT, sizeof(LINGER_TIMEOUT)); source_socket.connect(endpoint.c_str()); @@ -62,7 +62,7 @@ void MultiPullBroker::worker_loop(std::string& endpoint) { // The sink sockets must use the context from the main thread to use the inproc:// // protocol. If it uses a different context the client will not see the messages. zmq::socket_t sink_socket(*this->inproc_context_, ZMQ_PUSH); - sink_socket.setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof(SEND_HWM)); + sink_socket.setsockopt(ZMQ_SNDHWM, &WORKER_SEND_HWM, sizeof(WORKER_SEND_HWM)); sink_socket.setsockopt(ZMQ_LINGER, &LINGER_TIMEOUT, sizeof(LINGER_TIMEOUT)); sink_socket.connect(this->sink_endpoint_.c_str()); From 0626e439c589c543317c50526c29328f7a33a09a Mon Sep 17 00:00:00 2001 From: James O'Hea Date: Tue, 17 Sep 2024 12:54:59 +0000 Subject: [PATCH 5/5] Rename WORKER_SEND_HWM to WORKER_HWM --- cpp/data/common/include/EigerDefinitions.h | 2 +- cpp/data/eigerfan/src/MultiPullBroker.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/data/common/include/EigerDefinitions.h b/cpp/data/common/include/EigerDefinitions.h index f054bfe..987facc 100644 --- a/cpp/data/common/include/EigerDefinitions.h +++ b/cpp/data/common/include/EigerDefinitions.h @@ -45,7 +45,7 @@ namespace Eiger { const int MORE_MESSAGES = 1; const int RECEIVE_HWM = 100000; // High water marks for the main receiver thread const int SEND_HWM = 100000; - const int WORKER_SEND_HWM = 10000; // A lower high water mark for the worker threads + const int WORKER_HWM = 10000; // A lower high water mark for the worker threads const int LINGER_TIMEOUT = 100; // Socket linger timeout in milliseconds const std::string CONTROL_CMD_KEY = "msg_val"; diff --git a/cpp/data/eigerfan/src/MultiPullBroker.cpp b/cpp/data/eigerfan/src/MultiPullBroker.cpp index 89f2cb8..7fc54bf 100644 --- a/cpp/data/eigerfan/src/MultiPullBroker.cpp +++ b/cpp/data/eigerfan/src/MultiPullBroker.cpp @@ -54,7 +54,7 @@ void MultiPullBroker::worker_loop(std::string& endpoint) { // threads on the context is not sufficient. zmq::context_t source_context(1); zmq::socket_t source_socket(source_context, ZMQ_PULL); - source_socket.setsockopt(ZMQ_SNDHWM, &WORKER_SEND_HWM, sizeof(WORKER_SEND_HWM)); + source_socket.setsockopt(ZMQ_SNDHWM, &WORKER_HWM, sizeof(WORKER_HWM)); source_socket.setsockopt(ZMQ_LINGER, &LINGER_TIMEOUT, sizeof(LINGER_TIMEOUT)); source_socket.connect(endpoint.c_str()); @@ -62,7 +62,7 @@ void MultiPullBroker::worker_loop(std::string& endpoint) { // The sink sockets must use the context from the main thread to use the inproc:// // protocol. If it uses a different context the client will not see the messages. zmq::socket_t sink_socket(*this->inproc_context_, ZMQ_PUSH); - sink_socket.setsockopt(ZMQ_SNDHWM, &WORKER_SEND_HWM, sizeof(WORKER_SEND_HWM)); + sink_socket.setsockopt(ZMQ_SNDHWM, &WORKER_HWM, sizeof(WORKER_HWM)); sink_socket.setsockopt(ZMQ_LINGER, &LINGER_TIMEOUT, sizeof(LINGER_TIMEOUT)); sink_socket.connect(this->sink_endpoint_.c_str());