Skip to content

Commit

Permalink
Change Event::PostCb from std::function to absl::AnyInvocable (envoyp…
Browse files Browse the repository at this point in the history
…roxy#26296)

* Change Event::PostCb from std::function to absl::AnyInvocable

Signed-off-by: Raven Black <[email protected]>

* Fix more mistyped things

Signed-off-by: Raven Black <[email protected]>

* More bad PostCb usage

Signed-off-by: Raven Black <[email protected]>

* Another bad PostCb

Signed-off-by: Raven Black <[email protected]>

* More typefixes and SaveArg to lambda because move is required.

Signed-off-by: Raven Black <[email protected]>

* One more std::move

Signed-off-by: Raven Black <[email protected]>

* The rest of the mistyped things

Signed-off-by: Raven Black <[email protected]>

* More types

Signed-off-by: Raven Black <[email protected]>

* mobile/provisional_dispatcher PostCbs made move-compatible

Signed-off-by: Raven Black <[email protected]>

* MockProvisionalDispatcher

Signed-off-by: Raven Black <[email protected]>

* Moar mocks

Signed-off-by: Raven Black <[email protected]>

---------

Signed-off-by: Raven Black <[email protected]>
  • Loading branch information
ravenblackx authored Apr 4, 2023
1 parent 7c795f5 commit 39cf327
Show file tree
Hide file tree
Showing 42 changed files with 218 additions and 156 deletions.
8 changes: 8 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ behavior_changes:
minor_behavior_changes:
# *Changes that may cause incompatibilities for some users, but should not for most*
- area: event
change: |
``Event::PostCb`` type changed from ``std::function`` to ``absl::AnyInvocable``. This makes it possible
to capture unique_ptrs in dispatcher callbacks. If you have used ``Event::PostCb`` as shorthand for
``std::function<void()>`` in a non-post-callback-related context, you will have to change that.
If you have used ``std::function`` in a mock dispatcher, you will have to change that to ``Event::PostCb``
and may need to make it moveable. See https://github.com/envoyproxy/envoy/pull/26296 for a variety of
example fixes.
- area: build
change: |
Moved the REST config subscripton code into extensions. If you use REST for config updates and override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ void RichKafkaProducer::dr_cb(RdKafka::Message& message) {
const DeliveryMemento memento = {message.payload(), message.err(), message.offset()};
// Because this method gets executed in poller thread, we need to pass the data through
// dispatcher.
const Event::PostCb callback = [this, memento]() -> void { processDelivery(memento); };
dispatcher_.post(callback);
dispatcher_.post([this, memento]() -> void { processDelivery(memento); });
}

// We got the delivery data.
Expand Down
1 change: 1 addition & 0 deletions envoy/event/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ envoy_cc_library(
"//envoy/network:transport_socket_interface",
"//envoy/server:watchdog_interface",
"//envoy/thread:thread_interface",
"@com_google_absl//absl/functional:any_invocable",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)
Expand Down
4 changes: 3 additions & 1 deletion envoy/event/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include "envoy/stream_info/stream_info.h"
#include "envoy/thread/thread.h"

#include "absl/functional/any_invocable.h"

namespace Envoy {
namespace Event {

Expand All @@ -51,7 +53,7 @@ using DispatcherStatsPtr = std::unique_ptr<DispatcherStats>;
/**
* Callback invoked when a dispatcher post() runs.
*/
using PostCb = std::function<void()>;
using PostCb = absl::AnyInvocable<void()>;

using PostCbSharedPtr = std::shared_ptr<PostCb>;

Expand Down
2 changes: 1 addition & 1 deletion envoy/server/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class Worker {
* @param guard_dog supplies the guard dog to use for thread watching.
* @param cb a callback to run when the worker thread starts running.
*/
virtual void start(GuardDog& guard_dog, const Event::PostCb& cb) PURE;
virtual void start(GuardDog& guard_dog, const std::function<void()>& cb) PURE;

/**
* Initialize stats for this worker's dispatcher, if available. The worker will output
Expand Down
5 changes: 3 additions & 2 deletions envoy/thread_local/thread_local.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ class Slot {

// Callers must use the TypedSlot API, below.
virtual void runOnAllThreads(const UpdateCb& update_cb) PURE;
virtual void runOnAllThreads(const UpdateCb& update_cb, const Event::PostCb& complete_cb) PURE;
virtual void runOnAllThreads(const UpdateCb& update_cb,
const std::function<void()>& complete_cb) PURE;

/**
* Returns whether or not global threading has been shutdown.
Expand Down Expand Up @@ -180,7 +181,7 @@ template <class T> class TypedSlot {
*/
using UpdateCb = std::function<void(OptRef<T> obj)>;
void runOnAllThreads(const UpdateCb& cb) { slot_->runOnAllThreads(makeSlotUpdateCb(cb)); }
void runOnAllThreads(const UpdateCb& cb, const Event::PostCb& complete_cb) {
void runOnAllThreads(const UpdateCb& cb, const std::function<void()>& complete_cb) {
slot_->runOnAllThreads(makeSlotUpdateCb(cb), complete_cb);
}

Expand Down
9 changes: 5 additions & 4 deletions mobile/library/common/event/provisional_dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ void ProvisionalDispatcher::drain(Event::Dispatcher& event_dispatcher) {
drained_ = true;
event_dispatcher_ = &event_dispatcher;

for (const Event::PostCb& cb : init_queue_) {
event_dispatcher_->post(cb);
while (!init_queue_.empty()) {
event_dispatcher_->post(std::move(init_queue_.front()));
init_queue_.pop_front();
}
}

Expand All @@ -37,11 +38,11 @@ envoy_status_t ProvisionalDispatcher::post(Event::PostCb callback) {
}

if (drained_) {
event_dispatcher_->post(callback);
event_dispatcher_->post(std::move(callback));
return ENVOY_SUCCESS;
}

init_queue_.push_back(callback);
init_queue_.push_back(std::move(callback));
return ENVOY_SUCCESS;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,9 @@ platform_filter_name: StopOnRequestHeadersThenResumeOnResumeDecoding
EXPECT_EQ(invocations.on_request_headers_calls, 1);

Event::PostCb resume_post_cb;
EXPECT_CALL(dispatcher_, post(_)).WillOnce(SaveArg<0>(&resume_post_cb));
EXPECT_CALL(dispatcher_, post(_)).WillOnce([&resume_post_cb](Event::PostCb cb) {
resume_post_cb = std::move(cb);
});
EXPECT_CALL(decoder_callbacks_, continueDecoding());
filter_->resumeDecoding();
resume_post_cb();
Expand Down Expand Up @@ -401,7 +403,9 @@ platform_filter_name: StopOnRequestHeadersThenResumeOnResumeDecoding
EXPECT_EQ(invocations.on_request_headers_calls, 1);

Event::PostCb resume_post_cb;
EXPECT_CALL(dispatcher_, post(_)).WillOnce(SaveArg<0>(&resume_post_cb));
EXPECT_CALL(dispatcher_, post(_)).WillOnce([&resume_post_cb](Event::PostCb cb) {
resume_post_cb = std::move(cb);
});
EXPECT_CALL(decoder_callbacks_, continueDecoding());
filter_->resumeDecoding();
resume_post_cb();
Expand Down Expand Up @@ -473,7 +477,9 @@ platform_filter_name: StopOnRequestHeadersThenResumeOnResumeDecoding
EXPECT_EQ(invocations.on_request_headers_calls, 1);

Event::PostCb resume_post_cb;
EXPECT_CALL(dispatcher_, post(_)).WillOnce(SaveArg<0>(&resume_post_cb));
EXPECT_CALL(dispatcher_, post(_)).WillOnce([&resume_post_cb](Event::PostCb cb) {
resume_post_cb = std::move(cb);
});
Http::TestRequestTrailerMapImpl trailers;
EXPECT_CALL(decoder_callbacks_, addDecodedTrailers()).WillOnce(ReturnRef(trailers));
EXPECT_CALL(decoder_callbacks_, continueDecoding());
Expand Down Expand Up @@ -555,7 +561,9 @@ platform_filter_name: AsyncResumeDecodingIsNoopAfterPreviousResume
"12");

Event::PostCb resume_post_cb;
EXPECT_CALL(dispatcher_, post(_)).WillOnce(SaveArg<0>(&resume_post_cb));
EXPECT_CALL(dispatcher_, post(_)).WillOnce([&resume_post_cb](Event::PostCb cb) {
resume_post_cb = std::move(cb);
});
EXPECT_CALL(decoder_callbacks_, continueDecoding()).Times(0);
filter_->resumeDecoding();
resume_post_cb();
Expand Down Expand Up @@ -1174,7 +1182,9 @@ platform_filter_name: StopOnRequestHeadersThenBufferThenResumeOnResumeDecoding
EXPECT_EQ(invocations.on_request_trailers_calls, 1);

Event::PostCb resume_post_cb;
EXPECT_CALL(dispatcher_, post(_)).WillOnce(SaveArg<0>(&resume_post_cb));
EXPECT_CALL(dispatcher_, post(_)).WillOnce([&resume_post_cb](Event::PostCb cb) {
resume_post_cb = std::move(cb);
});
EXPECT_CALL(decoder_callbacks_, continueDecoding());
filter_->resumeDecoding();
resume_post_cb();
Expand Down Expand Up @@ -1303,7 +1313,9 @@ platform_filter_name: StopOnRequestHeadersThenBufferThenResumeOnResumeDecoding
EXPECT_EQ(invocations.on_request_trailers_calls, 1);

Event::PostCb resume_post_cb;
EXPECT_CALL(dispatcher_, post(_)).WillOnce(SaveArg<0>(&resume_post_cb));
EXPECT_CALL(dispatcher_, post(_)).WillOnce([&resume_post_cb](Event::PostCb cb) {
resume_post_cb = std::move(cb);
});
filter_->resumeDecoding();
resume_post_cb();
EXPECT_EQ(invocations.on_resume_request_calls, 1);
Expand Down Expand Up @@ -1459,7 +1471,9 @@ platform_filter_name: StopOnResponseHeadersThenResumeOnResumeEncoding
EXPECT_EQ(invocations.on_response_headers_calls, 1);

Event::PostCb resume_post_cb;
EXPECT_CALL(dispatcher_, post(_)).WillOnce(SaveArg<0>(&resume_post_cb));
EXPECT_CALL(dispatcher_, post(_)).WillOnce([&resume_post_cb](Event::PostCb cb) {
resume_post_cb = std::move(cb);
});
EXPECT_CALL(encoder_callbacks_, continueEncoding());
filter_->resumeEncoding();
resume_post_cb();
Expand Down Expand Up @@ -1535,7 +1549,9 @@ platform_filter_name: AsyncResumeEncodingIsNoopAfterPreviousResume
"13");

Event::PostCb resume_post_cb;
EXPECT_CALL(dispatcher_, post(_)).WillOnce(SaveArg<0>(&resume_post_cb));
EXPECT_CALL(dispatcher_, post(_)).WillOnce([&resume_post_cb](Event::PostCb cb) {
resume_post_cb = std::move(cb);
});
EXPECT_CALL(encoder_callbacks_, continueEncoding()).Times(0);
filter_->resumeEncoding();
resume_post_cb();
Expand Down Expand Up @@ -1592,7 +1608,9 @@ platform_filter_name: AsyncResumeEncodingIsNoopAfterFilterIsPendingDestruction

// Simulate posted resume call.
Event::PostCb resume_post_cb;
EXPECT_CALL(dispatcher_, post(_)).WillOnce(SaveArg<0>(&resume_post_cb));
EXPECT_CALL(dispatcher_, post(_)).WillOnce([&resume_post_cb](Event::PostCb cb) {
resume_post_cb = std::move(cb);
});
EXPECT_CALL(encoder_callbacks_, continueEncoding()).Times(0);
filter_->resumeEncoding();

Expand Down Expand Up @@ -2160,7 +2178,9 @@ platform_filter_name: StopOnResponseHeadersThenBufferThenResumeOnResumeEncoding
EXPECT_EQ(invocations.on_response_trailers_calls, 1);

Event::PostCb resume_post_cb;
EXPECT_CALL(dispatcher_, post(_)).WillOnce(SaveArg<0>(&resume_post_cb));
EXPECT_CALL(dispatcher_, post(_)).WillOnce([&resume_post_cb](Event::PostCb cb) {
resume_post_cb = std::move(cb);
});
EXPECT_CALL(encoder_callbacks_, continueEncoding());
filter_->resumeEncoding();
resume_post_cb();
Expand Down Expand Up @@ -2226,7 +2246,9 @@ platform_filter_name: StopOnRequestHeadersThenResumeOnResumeDecoding
EXPECT_EQ(invocations.on_request_headers_calls, 1);

Event::PostCb resume_post_cb;
EXPECT_CALL(dispatcher_, post(_)).WillOnce(SaveArg<0>(&resume_post_cb));
EXPECT_CALL(dispatcher_, post(_)).WillOnce([&resume_post_cb](Event::PostCb cb) {
resume_post_cb = std::move(cb);
});
EXPECT_CALL(decoder_callbacks_, continueDecoding());
filter_->resumeDecoding();
resume_post_cb();
Expand Down
8 changes: 2 additions & 6 deletions mobile/test/common/mocks/event/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,12 @@ class MockProvisionalDispatcher : public ProvisionalDispatcher {
}
}

envoy_status_t post(std::function<void()> callback) override {
callbacks_.push_back(callback);
return post_(callback);
}
envoy_status_t post(Event::PostCb callback) override { return post_(std::move(callback)); }

// Event::ProvisionalDispatcher
MOCK_METHOD(void, drain, (Event::Dispatcher & event_dispatcher));
MOCK_METHOD(void, deferredDelete_, (DeferredDeletable * to_delete));
MOCK_METHOD(envoy_status_t, post_, (std::function<void()> callback));
MOCK_METHOD(envoy_status_t, post_, (Event::PostCb callback));
MOCK_METHOD(Event::SchedulableCallbackPtr, createSchedulableCallback, (std::function<void()> cb));
MOCK_METHOD(bool, isThreadSafe, (), (const));
MOCK_METHOD(void, pushTrackedObject, (const ScopeTrackedObject* object));
Expand All @@ -47,7 +44,6 @@ class MockProvisionalDispatcher : public ProvisionalDispatcher {

Event::GlobalTimeSystem time_system_;
std::list<DeferredDeletablePtr> to_delete_;
std::list<std::function<void()>> callbacks_;
};

} // namespace Event
Expand Down
6 changes: 3 additions & 3 deletions source/common/event/dispatcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -261,12 +261,12 @@ SignalEventPtr DispatcherImpl::listenForSignal(signal_t signal_num, SignalCb cb)
return SignalEventPtr{new SignalEventImpl(*this, signal_num, cb)};
}

void DispatcherImpl::post(std::function<void()> callback) {
void DispatcherImpl::post(PostCb callback) {
bool do_post;
{
Thread::LockGuard lock(post_lock_);
do_post = post_callbacks_.empty();
post_callbacks_.push_back(callback);
post_callbacks_.push_back(std::move(callback));
}

if (do_post) {
Expand Down Expand Up @@ -359,7 +359,7 @@ void DispatcherImpl::runPostCallbacks() {
// objects that is being deferred deleted.
clearDeferredDeleteList();

std::list<std::function<void()>> callbacks;
std::list<PostCb> callbacks;
{
// Take ownership of the callbacks under the post_lock_. The lock must be released before
// callbacks execute. Callbacks added after this transfer will re-arm post_cb_ and will execute
Expand Down
4 changes: 2 additions & 2 deletions source/common/event/dispatcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
void deferredDelete(DeferredDeletablePtr&& to_delete) override;
void exit() override;
SignalEventPtr listenForSignal(signal_t signal_num, SignalCb cb) override;
void post(std::function<void()> callback) override;
void post(PostCb callback) override;
void deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) override;
void run(RunType type) override;
Buffer::WatermarkFactory& getWatermarkFactory() override { return *buffer_factory_; }
Expand Down Expand Up @@ -169,7 +169,7 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,

SchedulableCallbackPtr post_cb_;
Thread::MutexBasicLockable post_lock_;
std::list<std::function<void()>> post_callbacks_ ABSL_GUARDED_BY(post_lock_);
std::list<PostCb> post_callbacks_ ABSL_GUARDED_BY(post_lock_);

std::vector<DeferredDeletablePtr> to_delete_1_;
std::vector<DeferredDeletablePtr> to_delete_2_;
Expand Down
25 changes: 14 additions & 11 deletions source/common/thread_local/thread_local_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ SlotPtr InstanceImpl::allocateSlot() {
InstanceImpl::SlotImpl::SlotImpl(InstanceImpl& parent, uint32_t index)
: parent_(parent), index_(index), still_alive_guard_(std::make_shared<bool>(true)) {}

Event::PostCb InstanceImpl::SlotImpl::wrapCallback(const Event::PostCb& cb) {
std::function<void()> InstanceImpl::SlotImpl::wrapCallback(const std::function<void()>& cb) {
// See the header file comments for still_alive_guard_ for the purpose of this capture and the
// expired check below.
//
Expand Down Expand Up @@ -69,9 +69,10 @@ ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::getWorker(uint32_t index) {

ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() { return getWorker(index_); }

Event::PostCb InstanceImpl::SlotImpl::dataCallback(const UpdateCb& cb) {
std::function<void()> InstanceImpl::SlotImpl::dataCallback(const UpdateCb& cb) {
// See the header file comments for still_alive_guard_ for why we capture index_.
return [still_alive_guard = std::weak_ptr<bool>(still_alive_guard_), cb, index = index_] {
return [still_alive_guard = std::weak_ptr<bool>(still_alive_guard_), cb = std::move(cb),
index = index_]() mutable {
// This duplicates logic in wrapCallback() (above). Using wrapCallback also
// works, but incurs another indirection of lambda at runtime. As the
// duplicated logic is only an if-statement and a bool function, it doesn't
Expand All @@ -82,7 +83,8 @@ Event::PostCb InstanceImpl::SlotImpl::dataCallback(const UpdateCb& cb) {
};
}

void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb, const Event::PostCb& complete_cb) {
void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb,
const std::function<void()>& complete_cb) {
parent_.runOnAllThreads(dataCallback(cb), complete_cb);
}

Expand Down Expand Up @@ -145,7 +147,7 @@ void InstanceImpl::removeSlot(uint32_t slot) {
});
}

void InstanceImpl::runOnAllThreads(Event::PostCb cb) {
void InstanceImpl::runOnAllThreads(std::function<void()> cb) {
ASSERT_IS_MAIN_OR_TEST_THREAD();
ASSERT(!shutdown_);

Expand All @@ -157,19 +159,20 @@ void InstanceImpl::runOnAllThreads(Event::PostCb cb) {
cb();
}

void InstanceImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_complete_cb) {
void InstanceImpl::runOnAllThreads(std::function<void()> cb,
std::function<void()> all_threads_complete_cb) {
ASSERT_IS_MAIN_OR_TEST_THREAD();
ASSERT(!shutdown_);
// Handle main thread first so that when the last worker thread wins, we could just call the
// all_threads_complete_cb method. Parallelism of main thread execution is being traded off
// for programming simplicity here.
cb();

Event::PostCbSharedPtr cb_guard(new Event::PostCb(cb),
[this, all_threads_complete_cb](Event::PostCb* cb) {
main_thread_dispatcher_->post(all_threads_complete_cb);
delete cb;
});
std::shared_ptr<std::function<void()>> cb_guard(
new std::function<void()>(cb), [this, all_threads_complete_cb](std::function<void()>* cb) {
main_thread_dispatcher_->post(all_threads_complete_cb);
delete cb;
});

for (Event::Dispatcher& dispatcher : registered_threads_) {
dispatcher.post([cb_guard]() -> void { (*cb_guard)(); });
Expand Down
10 changes: 5 additions & 5 deletions source/common/thread_local/thread_local_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public NonCopyable, pub
struct SlotImpl : public Slot {
SlotImpl(InstanceImpl& parent, uint32_t index);
~SlotImpl() override { parent_.removeSlot(index_); }
Event::PostCb wrapCallback(const Event::PostCb& cb);
Event::PostCb dataCallback(const UpdateCb& cb);
std::function<void()> wrapCallback(const std::function<void()>& cb);
std::function<void()> dataCallback(const UpdateCb& cb);
static bool currentThreadRegisteredWorker(uint32_t index);
static ThreadLocalObjectSharedPtr getWorker(uint32_t index);

// ThreadLocal::Slot
ThreadLocalObjectSharedPtr get() override;
void runOnAllThreads(const UpdateCb& cb) override;
void runOnAllThreads(const UpdateCb& cb, const Event::PostCb& complete_cb) override;
void runOnAllThreads(const UpdateCb& cb, const std::function<void()>& complete_cb) override;
bool currentThreadRegistered() override;
void set(InitializeCb cb) override;
bool isShutdown() const override { return parent_.shutdown_; }
Expand Down Expand Up @@ -72,8 +72,8 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public NonCopyable, pub
};

void removeSlot(uint32_t slot);
void runOnAllThreads(Event::PostCb cb);
void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback);
void runOnAllThreads(std::function<void()> cb);
void runOnAllThreads(std::function<void()> cb, std::function<void()> main_callback);
static void setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object);

static thread_local ThreadLocalData thread_local_data_;
Expand Down
Loading

0 comments on commit 39cf327

Please sign in to comment.