diff --git a/cpp/data/common/include/EigerDefinitions.h b/cpp/data/common/include/EigerDefinitions.h index 7257b8c..8a834be 100644 --- a/cpp/data/common/include/EigerDefinitions.h +++ b/cpp/data/common/include/EigerDefinitions.h @@ -45,6 +45,7 @@ namespace Eiger { const int MORE_MESSAGES = 1; const int RECEIVE_HWM = 100000; const int SEND_HWM = 100000; + const int LINGER_TIMEOUT = 100; // Socket linger timeout in milliseconds const std::string CONTROL_CMD_KEY = "msg_val"; const std::string CONTROL_ID_KEY = "id"; diff --git a/cpp/data/eigerfan/src/EigerFan.cpp b/cpp/data/eigerfan/src/EigerFan.cpp index e402975..7244857 100644 --- a/cpp/data/eigerfan/src/EigerFan.cpp +++ b/cpp/data/eigerfan/src/EigerFan.cpp @@ -132,14 +132,13 @@ EigerFan::~EigerFan() { void EigerFan::run() { LOG4CXX_INFO(log, "EigerFan::run()"); LOG4CXX_INFO(log, "Starting EigerFan"); - int linger = 100; // Socket linger timeout in milliseconds // Setup Control socket std::string controlAddress("tcp://*:"); controlAddress.append(config.ctrl_channel_port); LOG4CXX_INFO(log, std::string("Binding control address to ").append(controlAddress)); controlSocket.bind (controlAddress.c_str()); - controlSocket.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); + controlSocket.setsockopt (ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT)); // Setup Fan Send Sockets for (int i = 0; i < config.num_consumers; i++) { @@ -152,7 +151,7 @@ void EigerFan::run() { boost::shared_ptr sendSocket(new zmq::socket_t(ctx_, ZMQ_PUSH)); sendSocket->setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof (SEND_HWM)); sendSocket->bind(fanAddress.str().c_str()); - sendSocket->setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); + sendSocket->setsockopt (ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT)); EigerConsumer consumer; consumer.connected = false; consumer.sendSocket = sendSocket; @@ -177,7 +176,7 @@ void EigerFan::run() { } boost::shared_ptr monitorSocket(new zmq::socket_t(ctx_, ZMQ_PAIR)); monitorSocket->connect(monitorAddress.str().c_str()); - monitorSocket->setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); + monitorSocket->setsockopt (ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT)); monitorSockets.push_back(monitorSocket); } @@ -187,7 +186,7 @@ void EigerFan::run() { LOG4CXX_INFO(log, std::string("Binding forwarding address to ").append(forwardAddress)); forwardSocket.setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof (SEND_HWM)); forwardSocket.bind(forwardAddress.c_str()); - forwardSocket.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); + forwardSocket.setsockopt (ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT)); // Setup Forwarding Socket monitor std::ostringstream forwardMonitorAddress; @@ -200,7 +199,7 @@ void EigerFan::run() { } zmq::socket_t forwardMonitorSocket(ctx_, ZMQ_PAIR); forwardMonitorSocket.connect(forwardMonitorAddress.str().c_str()); - forwardMonitorSocket.setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); + forwardMonitorSocket.setsockopt (ZMQ_LINGER, &LINGER_TIMEOUT, sizeof (LINGER_TIMEOUT)); // Wait for configured number of consumers to connect LOG4CXX_INFO(log, "Waiting for Consumers"); @@ -363,8 +362,7 @@ void EigerFan::HandleRxSocket(std::string& endpoint, int num_zmq_context_threads zmq::socket_t rx_socket(inproc_context, ZMQ_PULL); rx_socket.setsockopt(ZMQ_RCVHWM, &RECEIVE_HWM, sizeof(RECEIVE_HWM)); rx_socket.bind(BROKER_INPROC_ENDPOINT.c_str()); - int linger = 100; - rx_socket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); + rx_socket.setsockopt(ZMQ_LINGER, &LINGER_TIMEOUT, sizeof(LINGER_TIMEOUT)); this->broker.connect(endpoint, &inproc_context); state = WAITING_STREAM; diff --git a/cpp/data/eigerfan/src/MultiPullBroker.cpp b/cpp/data/eigerfan/src/MultiPullBroker.cpp index 047bf8e..8fc1cd1 100644 --- a/cpp/data/eigerfan/src/MultiPullBroker.cpp +++ b/cpp/data/eigerfan/src/MultiPullBroker.cpp @@ -47,7 +47,6 @@ void MultiPullBroker::connect(std::string& endpoint, void* inproc_context) { * \param[in] endpoint Endpoint of socket to pull data from */ void MultiPullBroker::worker_loop(std::string& endpoint) { - int linger = 100; // Create source in new isolated context // It is important to create a new context in each worker thread, as there are @@ -56,7 +55,7 @@ void MultiPullBroker::worker_loop(std::string& endpoint) { zmq::context_t source_context(1); zmq::socket_t source_socket(source_context, ZMQ_PULL); source_socket.setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof(SEND_HWM)); - source_socket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); + source_socket.setsockopt(ZMQ_LINGER, &LINGER_TIMEOUT, sizeof(LINGER_TIMEOUT)); source_socket.connect(endpoint.c_str()); // Create sink socket in context of main thread @@ -64,7 +63,7 @@ void MultiPullBroker::worker_loop(std::string& endpoint) { // protocol. If it uses a different context the client will not see the messages. zmq::socket_t sink_socket(*this->inproc_context_, ZMQ_PUSH); sink_socket.setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof(SEND_HWM)); - sink_socket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); + sink_socket.setsockopt(ZMQ_LINGER, &LINGER_TIMEOUT, sizeof(LINGER_TIMEOUT)); sink_socket.connect(this->sink_endpoint_.c_str()); // Initialise recv variables