diff --git a/cpp/src/DataStorm/CallbackExecutor.cpp b/cpp/src/DataStorm/CallbackExecutor.cpp index f9c7fab8357..6baad52c105 100644 --- a/cpp/src/DataStorm/CallbackExecutor.cpp +++ b/cpp/src/DataStorm/CallbackExecutor.cpp @@ -7,15 +7,15 @@ using namespace std; using namespace DataStormI; -CallbackExecutor::CallbackExecutor(function call)> callbackExecutor) +CallbackExecutor::CallbackExecutor(function call)> customExecutor) : _flush(false), _destroyed(false), - _callbackExecutor(std::move(callbackExecutor)) + _customExecutor(std::move(customExecutor)) { _thread = thread( [this] { - std::vector, std::function>> queue; + std::vector> queue; while (true) { unique_lock lock(_mutex); @@ -31,18 +31,17 @@ CallbackExecutor::CallbackExecutor(function call)> callbac } lock.unlock(); - for (const auto& p : queue) + for (const auto& callback : queue) { try { - if (_callbackExecutor) + if (_customExecutor) { - // TODO do we need to ensure p.second is executed before we continue? - _callbackExecutor(p.second); + _customExecutor(callback); } else { - p.second(); + callback(); } } catch (...) @@ -56,10 +55,10 @@ CallbackExecutor::CallbackExecutor(function call)> callbac } void -CallbackExecutor::queue(const std::shared_ptr& element, std::function cb, bool flush) +CallbackExecutor::queue(function cb, bool flush) { unique_lock lock(_mutex); - _queue.emplace_back(element, cb); + _queue.emplace_back(std::move(cb)); if (flush) { _flush = true; diff --git a/cpp/src/DataStorm/CallbackExecutor.h b/cpp/src/DataStorm/CallbackExecutor.h index e46a1462fe1..99429e48312 100644 --- a/cpp/src/DataStorm/CallbackExecutor.h +++ b/cpp/src/DataStorm/CallbackExecutor.h @@ -19,9 +19,9 @@ namespace DataStormI class CallbackExecutor { public: - CallbackExecutor(std::function call)> callbackExecutor); + CallbackExecutor(std::function call)> customExecutor); - void queue(const std::shared_ptr&, std::function, bool = false); + void queue(std::function, bool = false); void flush(); void destroy(); @@ -31,8 +31,9 @@ namespace DataStormI std::condition_variable _cond; bool _flush; bool _destroyed; - std::vector, std::function>> _queue; - std::function call)> _callbackExecutor; + std::vector> _queue; + // An optional executor or null if no o custom executor is provided during Node construction. + std::function call)> _customExecutor; }; } diff --git a/cpp/src/DataStorm/DataElementI.cpp b/cpp/src/DataStorm/DataElementI.cpp index c5d8f3cd1cf..ef7d56321ed 100644 --- a/cpp/src/DataStorm/DataElementI.cpp +++ b/cpp/src/DataStorm/DataElementI.cpp @@ -207,9 +207,8 @@ DataElementI::attachKey( auto subscriber = p->second.addOrGet(topicId, elementId, keyId, nullptr, sampleFilter, name, priority, added); if (_onConnectedElements && added) { - _executor->queue( - shared_from_this(), - [=, this] { _onConnectedElements(DataStorm::CallbackReason::Connect, name); }); + _executor->queue([self = shared_from_this(), name] + { self->_onConnectedElements(DataStorm::CallbackReason::Connect, name); }); } if (addConnectedKey(key, subscriber)) { @@ -265,8 +264,8 @@ DataElementI::detachKey( if (_onConnectedElements) { _executor->queue( - shared_from_this(), - [=, this] { _onConnectedElements(DataStorm::CallbackReason::Disconnect, subscriber->name); }); + [self = shared_from_this(), subscriber] + { self->_onConnectedElements(DataStorm::CallbackReason::Disconnect, subscriber->name); }); } if (p->second.remove(topicId, elementId)) { @@ -319,9 +318,8 @@ DataElementI::attachFilter( auto subscriber = p->second.addOrGet(topicId, -elementId, filterId, filter, sampleFilter, name, priority, added); if (_onConnectedElements && added) { - _executor->queue( - shared_from_this(), - [=, this] { _onConnectedElements(DataStorm::CallbackReason::Connect, name); }); + _executor->queue([self = shared_from_this(), name] + { self->_onConnectedElements(DataStorm::CallbackReason::Connect, name); }); } if (addConnectedKey(key, subscriber)) { @@ -377,8 +375,8 @@ DataElementI::detachFilter( if (_onConnectedElements) { _executor->queue( - shared_from_this(), - [=, this] { _onConnectedElements(DataStorm::CallbackReason::Disconnect, subscriber->name); }); + [self = shared_from_this(), subscriber] + { self->_onConnectedElements(DataStorm::CallbackReason::Disconnect, subscriber->name); }); } if (p->second.remove(topicId, -elementId)) { @@ -449,7 +447,7 @@ DataElementI::onConnectedKeys( { keys.push_back(key.first); } - _executor->queue(shared_from_this(), [init, keys] { init(keys); }, true); + _executor->queue([self = shared_from_this(), init, keys = std::move(keys)] { init(std::move(keys)); }, true); } } @@ -470,7 +468,7 @@ DataElementI::onConnectedElements( elements.push_back(subscriber.second->name); } } - _executor->queue(shared_from_this(), [init, elements] { init(elements); }, true); + _executor->queue([init, elements = std::move(elements)] { init(std::move(elements)); }, true); } } @@ -558,9 +556,8 @@ DataElementI::addConnectedKey(const shared_ptr& key, const shared_ptrqueue( - shared_from_this(), - [=, this] { _onConnectedKeys(DataStorm::CallbackReason::Connect, key); }); + _executor->queue([self = shared_from_this(), key] + { self->_onConnectedKeys(DataStorm::CallbackReason::Connect, key); }); } subscribers.push_back(subscriber); return true; @@ -580,9 +577,8 @@ DataElementI::removeConnectedKey(const shared_ptr& key, const shared_ptrqueue( - shared_from_this(), - [=, this] { _onConnectedKeys(DataStorm::CallbackReason::Disconnect, key); }); + _executor->queue([self = shared_from_this(), key] + { self->_onConnectedKeys(DataStorm::CallbackReason::Disconnect, key); }); } _connectedKeys.erase(key); } @@ -773,12 +769,11 @@ DataReaderI::initSamples( if (_onSamples) { _executor->queue( - shared_from_this(), - [this, valid] + [self = dynamic_pointer_cast(shared_from_this()), valid] { for (const auto& s : valid) { - _onSamples(s); + self->_onSamples(s); } }); } @@ -903,7 +898,8 @@ DataReaderI::queue( if (_onSamples) { - _executor->queue(shared_from_this(), [this, sample] { _onSamples(sample); }); + _executor->queue([self = dynamic_pointer_cast(shared_from_this()), sample] + { self->_onSamples(sample); }); } if (_config->sampleLifetime && *_config->sampleLifetime > 0) @@ -956,7 +952,7 @@ DataReaderI::onSamples( if (init && !_samples.empty()) { vector> samples(_samples.begin(), _samples.end()); - _executor->queue(shared_from_this(), [init, samples] { init(samples); }, true); + _executor->queue([init, samples] { init(samples); }, true); } }