From ade846041911cf98acae8e798b8a7e25531df878 Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Sun, 27 Oct 2024 15:08:18 -0500 Subject: [PATCH] Pull in and use kj::EventLoopLocal. KJ PR: https://github.com/capnproto/capnproto/pull/2167 --- build/deps/gen/dep_capnp_cpp.bzl | 8 ++++---- src/workerd/io/io-context.c++ | 6 +++--- src/workerd/io/worker.c++ | 20 +++++++++++--------- src/workerd/util/wait-list.c++ | 10 +++++----- 4 files changed, 23 insertions(+), 21 deletions(-) diff --git a/build/deps/gen/dep_capnp_cpp.bzl b/build/deps/gen/dep_capnp_cpp.bzl index cf3577230ed..bdae10307ac 100644 --- a/build/deps/gen/dep_capnp_cpp.bzl +++ b/build/deps/gen/dep_capnp_cpp.bzl @@ -2,11 +2,11 @@ load("@//:build/http.bzl", "http_archive") -URL = "https://github.com/capnproto/capnproto/tarball/6e071e34d88a8fc489638535899cd9d02e55bf76" -STRIP_PREFIX = "capnproto-capnproto-6e071e3/c++" -SHA256 = "78bad43b723d3b5a21e50424ad20b0d045f5d963c59d677cd5035ef5c68aabaa" +URL = "https://github.com/capnproto/capnproto/tarball/14132442b125d0383285e36809e467c6b6a759aa" +STRIP_PREFIX = "capnproto-capnproto-1413244/c++" +SHA256 = "d1e1ff677a53aaf840a8ae624af4e2fed2b08e324ad82b0efd821926d16d6ce6" TYPE = "tgz" -COMMIT = "6e071e34d88a8fc489638535899cd9d02e55bf76" +COMMIT = "14132442b125d0383285e36809e467c6b6a759aa" def dep_capnp_cpp(): http_archive( diff --git a/src/workerd/io/io-context.c++ b/src/workerd/io/io-context.c++ index 5748e29b94d..55c4a75a370 100644 --- a/src/workerd/io/io-context.c++ +++ b/src/workerd/io/io-context.c++ @@ -18,11 +18,11 @@ namespace workerd { static thread_local IoContext* threadLocalRequest = nullptr; -static thread_local void* threadId = nullptr; + +static const kj::EventLoopLocal threadId; static void* getThreadId() { - if (threadId == nullptr) threadId = new int; - return threadId; + return threadId.get(); } class IoContext::TimeoutManagerImpl final: public TimeoutManager { diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index 26db2700414..5ca204fc259 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -371,7 +371,7 @@ private: kj::Maybe next; kj::Maybe* prev; - static thread_local AsyncWaiter* threadCurrentWaiter; + static const kj::EventLoopLocal threadCurrentWaiter; friend class Worker::Isolate; friend class Worker::AsyncLock; @@ -2228,7 +2228,7 @@ void Worker::Lock::validateHandlers(ValidationErrorReporter& errorReporter) { // ======================================================================================= // AsyncLock implementation -thread_local Worker::AsyncWaiter* Worker::AsyncWaiter::threadCurrentWaiter = nullptr; +const kj::EventLoopLocal Worker::AsyncWaiter::threadCurrentWaiter; Worker::Isolate::AsyncWaiterList::~AsyncWaiterList() noexcept { // It should be impossible for this list to be non-empty since each member of the list holds a @@ -2257,7 +2257,7 @@ kj::Promise Worker::Isolate::takeAsyncLockImpl( } for (uint threadWaitingDifferentLockCount = 0;; ++threadWaitingDifferentLockCount) { - AsyncWaiter* waiter = AsyncWaiter::threadCurrentWaiter; + AsyncWaiter* waiter = *AsyncWaiter::threadCurrentWaiter; if (waiter == nullptr) { // Thread is not currently waiting on a lock. @@ -2327,7 +2327,7 @@ Worker::AsyncWaiter::AsyncWaiter(kj::Own isolateParam) *lock->tail = this; lock->tail = &next; - threadCurrentWaiter = this; + *threadCurrentWaiter = this; __atomic_add_fetch(&isolate->impl->lockAttemptGauge, 1, __ATOMIC_RELAXED); } @@ -2358,20 +2358,22 @@ Worker::AsyncWaiter::~AsyncWaiter() noexcept { } } - KJ_ASSERT(threadCurrentWaiter == this); - threadCurrentWaiter = nullptr; + auto& w = *threadCurrentWaiter; + KJ_ASSERT(w == this); + w = nullptr; } kj::Promise Worker::AsyncLock::whenThreadIdle() { + AsyncWaiter*& currentWaiter = *AsyncWaiter::threadCurrentWaiter; for (;;) { - if (auto waiter = AsyncWaiter::threadCurrentWaiter; waiter != nullptr) { - co_await waiter->releasePromise; + if (currentWaiter != nullptr) { + co_await currentWaiter->releasePromise; continue; } co_await kj::yieldUntilQueueEmpty(); - if (AsyncWaiter::threadCurrentWaiter == nullptr) { + if (currentWaiter == nullptr) { co_return; } // Whoops, a new lock attempt appeared, loop. diff --git a/src/workerd/util/wait-list.c++ b/src/workerd/util/wait-list.c++ index 13a5aed5a58..0a8a00bd33f 100644 --- a/src/workerd/util/wait-list.c++ +++ b/src/workerd/util/wait-list.c++ @@ -11,7 +11,7 @@ namespace workerd { namespace { // Optimization: If the same wait list is waited multiple times in the same thread, we want to // share the signal rather than send two cross-thread signals. -thread_local CrossThreadWaitList::WaiterMap threadLocalWaiters; +static const kj::EventLoopLocal threadLocalWaiters; void END_WAIT_LIST_CANCELER_STACK_START_CANCELEE_STACK() {} } // namespace @@ -50,9 +50,9 @@ CrossThreadWaitList::Waiter::~Waiter() noexcept(false) { } if (state->useThreadLocalOptimization) { - auto& entry = KJ_ASSERT_NONNULL(threadLocalWaiters.findEntry(state.get())); + auto& entry = KJ_ASSERT_NONNULL(threadLocalWaiters->findEntry(state.get())); KJ_ASSERT(entry.value == this); - threadLocalWaiters.erase(entry); + threadLocalWaiters->erase(entry); } } @@ -68,8 +68,8 @@ kj::Promise CrossThreadWaitList::addWaiter() const { if (state->useThreadLocalOptimization) { kj::Own ownWaiter; - auto& waiter = - threadLocalWaiters.findOrCreate(state.get(), [&]() -> decltype(threadLocalWaiters)::Entry { + auto& waiter = threadLocalWaiters->findOrCreate( + state.get(), [&]() -> CrossThreadWaitList::WaiterMap::Entry { auto paf = kj::newPromiseAndCrossThreadFulfiller(); ownWaiter = kj::refcounted(*state, kj::mv(paf.fulfiller)); ownWaiter->forkedPromise = paf.promise.fork();