Skip to content

Commit

Permalink
Simplify DataStorm callback executor (#2964)
Browse files Browse the repository at this point in the history
  • Loading branch information
pepone authored Oct 24, 2024
1 parent d177c1b commit f604fb2
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 37 deletions.
19 changes: 9 additions & 10 deletions cpp/src/DataStorm/CallbackExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
using namespace std;
using namespace DataStormI;

CallbackExecutor::CallbackExecutor(function<void(function<void()> call)> callbackExecutor)
CallbackExecutor::CallbackExecutor(function<void(function<void()> call)> customExecutor)
: _flush(false),
_destroyed(false),
_callbackExecutor(std::move(callbackExecutor))
_customExecutor(std::move(customExecutor))
{
_thread = thread(
[this]
{
std::vector<std::pair<std::shared_ptr<DataElementI>, std::function<void()>>> queue;
std::vector<function<void()>> queue;
while (true)
{
unique_lock<mutex> lock(_mutex);
Expand All @@ -31,18 +31,17 @@ CallbackExecutor::CallbackExecutor(function<void(function<void()> call)> callbac
}

lock.unlock();
for (const auto& p : queue)
for (const auto& callback : queue)
{
try
{
if (_callbackExecutor)
if (_customExecutor)
{
// TODO do we need to ensure p.second is executed before we continue?
_callbackExecutor(p.second);
_customExecutor(callback);
}
else
{
p.second();
callback();
}
}
catch (...)
Expand All @@ -56,10 +55,10 @@ CallbackExecutor::CallbackExecutor(function<void(function<void()> call)> callbac
}

void
CallbackExecutor::queue(const std::shared_ptr<DataElementI>& element, std::function<void()> cb, bool flush)
CallbackExecutor::queue(function<void()> cb, bool flush)
{
unique_lock<mutex> lock(_mutex);
_queue.emplace_back(element, cb);
_queue.emplace_back(std::move(cb));
if (flush)
{
_flush = true;
Expand Down
9 changes: 5 additions & 4 deletions cpp/src/DataStorm/CallbackExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ namespace DataStormI
class CallbackExecutor
{
public:
CallbackExecutor(std::function<void(std::function<void()> call)> callbackExecutor);
CallbackExecutor(std::function<void(std::function<void()> call)> customExecutor);

void queue(const std::shared_ptr<DataElementI>&, std::function<void()>, bool = false);
void queue(std::function<void()>, bool = false);
void flush();
void destroy();

Expand All @@ -31,8 +31,9 @@ namespace DataStormI
std::condition_variable _cond;
bool _flush;
bool _destroyed;
std::vector<std::pair<std::shared_ptr<DataElementI>, std::function<void()>>> _queue;
std::function<void(std::function<void()> call)> _callbackExecutor;
std::vector<std::function<void()>> _queue;
// An optional executor or null if no o custom executor is provided during Node construction.
std::function<void(std::function<void()> call)> _customExecutor;
};
}

Expand Down
42 changes: 19 additions & 23 deletions cpp/src/DataStorm/DataElementI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,8 @@ DataElementI::attachKey(
auto subscriber = p->second.addOrGet(topicId, elementId, keyId, nullptr, sampleFilter, name, priority, added);
if (_onConnectedElements && added)
{
_executor->queue(
shared_from_this(),
[=, this] { _onConnectedElements(DataStorm::CallbackReason::Connect, name); });
_executor->queue([self = shared_from_this(), name]
{ self->_onConnectedElements(DataStorm::CallbackReason::Connect, name); });
}
if (addConnectedKey(key, subscriber))
{
Expand Down Expand Up @@ -265,8 +264,8 @@ DataElementI::detachKey(
if (_onConnectedElements)
{
_executor->queue(
shared_from_this(),
[=, this] { _onConnectedElements(DataStorm::CallbackReason::Disconnect, subscriber->name); });
[self = shared_from_this(), subscriber]
{ self->_onConnectedElements(DataStorm::CallbackReason::Disconnect, subscriber->name); });
}
if (p->second.remove(topicId, elementId))
{
Expand Down Expand Up @@ -319,9 +318,8 @@ DataElementI::attachFilter(
auto subscriber = p->second.addOrGet(topicId, -elementId, filterId, filter, sampleFilter, name, priority, added);
if (_onConnectedElements && added)
{
_executor->queue(
shared_from_this(),
[=, this] { _onConnectedElements(DataStorm::CallbackReason::Connect, name); });
_executor->queue([self = shared_from_this(), name]
{ self->_onConnectedElements(DataStorm::CallbackReason::Connect, name); });
}
if (addConnectedKey(key, subscriber))
{
Expand Down Expand Up @@ -377,8 +375,8 @@ DataElementI::detachFilter(
if (_onConnectedElements)
{
_executor->queue(
shared_from_this(),
[=, this] { _onConnectedElements(DataStorm::CallbackReason::Disconnect, subscriber->name); });
[self = shared_from_this(), subscriber]
{ self->_onConnectedElements(DataStorm::CallbackReason::Disconnect, subscriber->name); });
}
if (p->second.remove(topicId, -elementId))
{
Expand Down Expand Up @@ -449,7 +447,7 @@ DataElementI::onConnectedKeys(
{
keys.push_back(key.first);
}
_executor->queue(shared_from_this(), [init, keys] { init(keys); }, true);
_executor->queue([self = shared_from_this(), init, keys = std::move(keys)] { init(std::move(keys)); }, true);
}
}

Expand All @@ -470,7 +468,7 @@ DataElementI::onConnectedElements(
elements.push_back(subscriber.second->name);
}
}
_executor->queue(shared_from_this(), [init, elements] { init(elements); }, true);
_executor->queue([init, elements = std::move(elements)] { init(std::move(elements)); }, true);
}
}

Expand Down Expand Up @@ -558,9 +556,8 @@ DataElementI::addConnectedKey(const shared_ptr<Key>& key, const shared_ptr<Subsc
{
if (key && subscribers.empty() && _onConnectedKeys)
{
_executor->queue(
shared_from_this(),
[=, this] { _onConnectedKeys(DataStorm::CallbackReason::Connect, key); });
_executor->queue([self = shared_from_this(), key]
{ self->_onConnectedKeys(DataStorm::CallbackReason::Connect, key); });
}
subscribers.push_back(subscriber);
return true;
Expand All @@ -580,9 +577,8 @@ DataElementI::removeConnectedKey(const shared_ptr<Key>& key, const shared_ptr<Su
{
if (key && _onConnectedKeys)
{
_executor->queue(
shared_from_this(),
[=, this] { _onConnectedKeys(DataStorm::CallbackReason::Disconnect, key); });
_executor->queue([self = shared_from_this(), key]
{ self->_onConnectedKeys(DataStorm::CallbackReason::Disconnect, key); });
}
_connectedKeys.erase(key);
}
Expand Down Expand Up @@ -773,12 +769,11 @@ DataReaderI::initSamples(
if (_onSamples)
{
_executor->queue(
shared_from_this(),
[this, valid]
[self = dynamic_pointer_cast<DataReaderI>(shared_from_this()), valid]
{
for (const auto& s : valid)
{
_onSamples(s);
self->_onSamples(s);
}
});
}
Expand Down Expand Up @@ -903,7 +898,8 @@ DataReaderI::queue(

if (_onSamples)
{
_executor->queue(shared_from_this(), [this, sample] { _onSamples(sample); });
_executor->queue([self = dynamic_pointer_cast<DataReaderI>(shared_from_this()), sample]
{ self->_onSamples(sample); });
}

if (_config->sampleLifetime && *_config->sampleLifetime > 0)
Expand Down Expand Up @@ -956,7 +952,7 @@ DataReaderI::onSamples(
if (init && !_samples.empty())
{
vector<shared_ptr<Sample>> samples(_samples.begin(), _samples.end());
_executor->queue(shared_from_this(), [init, samples] { init(samples); }, true);
_executor->queue([init, samples] { init(samples); }, true);
}
}

Expand Down

0 comments on commit f604fb2

Please sign in to comment.