Skip to content

Commit

Permalink
Merge pull request #3048 from cloudflare/kenton/event-loop-local
Browse files Browse the repository at this point in the history
Pull in and use kj::EventLoopLocal.
  • Loading branch information
kentonv authored Nov 20, 2024
2 parents e789b3a + ade8460 commit 773987b
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 21 deletions.
8 changes: 4 additions & 4 deletions build/deps/gen/dep_capnp_cpp.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

load("@//:build/http.bzl", "http_archive")

URL = "https://github.com/capnproto/capnproto/tarball/6e071e34d88a8fc489638535899cd9d02e55bf76"
STRIP_PREFIX = "capnproto-capnproto-6e071e3/c++"
SHA256 = "78bad43b723d3b5a21e50424ad20b0d045f5d963c59d677cd5035ef5c68aabaa"
URL = "https://github.com/capnproto/capnproto/tarball/14132442b125d0383285e36809e467c6b6a759aa"
STRIP_PREFIX = "capnproto-capnproto-1413244/c++"
SHA256 = "d1e1ff677a53aaf840a8ae624af4e2fed2b08e324ad82b0efd821926d16d6ce6"
TYPE = "tgz"
COMMIT = "6e071e34d88a8fc489638535899cd9d02e55bf76"
COMMIT = "14132442b125d0383285e36809e467c6b6a759aa"

def dep_capnp_cpp():
http_archive(
Expand Down
6 changes: 3 additions & 3 deletions src/workerd/io/io-context.c++
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
namespace workerd {

static thread_local IoContext* threadLocalRequest = nullptr;
static thread_local void* threadId = nullptr;

static const kj::EventLoopLocal<int> threadId;

static void* getThreadId() {
if (threadId == nullptr) threadId = new int;
return threadId;
return threadId.get();
}

class IoContext::TimeoutManagerImpl final: public TimeoutManager {
Expand Down
20 changes: 11 additions & 9 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ private:
kj::Maybe<AsyncWaiter&> next;
kj::Maybe<AsyncWaiter&>* prev;

static thread_local AsyncWaiter* threadCurrentWaiter;
static const kj::EventLoopLocal<AsyncWaiter*> threadCurrentWaiter;

friend class Worker::Isolate;
friend class Worker::AsyncLock;
Expand Down Expand Up @@ -2228,7 +2228,7 @@ void Worker::Lock::validateHandlers(ValidationErrorReporter& errorReporter) {
// =======================================================================================
// AsyncLock implementation

thread_local Worker::AsyncWaiter* Worker::AsyncWaiter::threadCurrentWaiter = nullptr;
const kj::EventLoopLocal<Worker::AsyncWaiter*> Worker::AsyncWaiter::threadCurrentWaiter;

Worker::Isolate::AsyncWaiterList::~AsyncWaiterList() noexcept {
// It should be impossible for this list to be non-empty since each member of the list holds a
Expand Down Expand Up @@ -2257,7 +2257,7 @@ kj::Promise<Worker::AsyncLock> Worker::Isolate::takeAsyncLockImpl(
}

for (uint threadWaitingDifferentLockCount = 0;; ++threadWaitingDifferentLockCount) {
AsyncWaiter* waiter = AsyncWaiter::threadCurrentWaiter;
AsyncWaiter* waiter = *AsyncWaiter::threadCurrentWaiter;

if (waiter == nullptr) {
// Thread is not currently waiting on a lock.
Expand Down Expand Up @@ -2327,7 +2327,7 @@ Worker::AsyncWaiter::AsyncWaiter(kj::Own<const Isolate> isolateParam)
*lock->tail = this;
lock->tail = &next;

threadCurrentWaiter = this;
*threadCurrentWaiter = this;

__atomic_add_fetch(&isolate->impl->lockAttemptGauge, 1, __ATOMIC_RELAXED);
}
Expand Down Expand Up @@ -2358,20 +2358,22 @@ Worker::AsyncWaiter::~AsyncWaiter() noexcept {
}
}

KJ_ASSERT(threadCurrentWaiter == this);
threadCurrentWaiter = nullptr;
auto& w = *threadCurrentWaiter;
KJ_ASSERT(w == this);
w = nullptr;
}

kj::Promise<void> Worker::AsyncLock::whenThreadIdle() {
AsyncWaiter*& currentWaiter = *AsyncWaiter::threadCurrentWaiter;
for (;;) {
if (auto waiter = AsyncWaiter::threadCurrentWaiter; waiter != nullptr) {
co_await waiter->releasePromise;
if (currentWaiter != nullptr) {
co_await currentWaiter->releasePromise;
continue;
}

co_await kj::yieldUntilQueueEmpty();

if (AsyncWaiter::threadCurrentWaiter == nullptr) {
if (currentWaiter == nullptr) {
co_return;
}
// Whoops, a new lock attempt appeared, loop.
Expand Down
10 changes: 5 additions & 5 deletions src/workerd/util/wait-list.c++
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace workerd {
namespace {
// Optimization: If the same wait list is waited multiple times in the same thread, we want to
// share the signal rather than send two cross-thread signals.
thread_local CrossThreadWaitList::WaiterMap threadLocalWaiters;
static const kj::EventLoopLocal<CrossThreadWaitList::WaiterMap> threadLocalWaiters;

void END_WAIT_LIST_CANCELER_STACK_START_CANCELEE_STACK() {}
} // namespace
Expand Down Expand Up @@ -50,9 +50,9 @@ CrossThreadWaitList::Waiter::~Waiter() noexcept(false) {
}

if (state->useThreadLocalOptimization) {
auto& entry = KJ_ASSERT_NONNULL(threadLocalWaiters.findEntry(state.get()));
auto& entry = KJ_ASSERT_NONNULL(threadLocalWaiters->findEntry(state.get()));
KJ_ASSERT(entry.value == this);
threadLocalWaiters.erase(entry);
threadLocalWaiters->erase(entry);
}
}

Expand All @@ -68,8 +68,8 @@ kj::Promise<void> CrossThreadWaitList::addWaiter() const {
if (state->useThreadLocalOptimization) {
kj::Own<Waiter> ownWaiter;

auto& waiter =
threadLocalWaiters.findOrCreate(state.get(), [&]() -> decltype(threadLocalWaiters)::Entry {
auto& waiter = threadLocalWaiters->findOrCreate(
state.get(), [&]() -> CrossThreadWaitList::WaiterMap::Entry {
auto paf = kj::newPromiseAndCrossThreadFulfiller<void>();
ownWaiter = kj::refcounted<Waiter>(*state, kj::mv(paf.fulfiller));
ownWaiter->forkedPromise = paf.promise.fork();
Expand Down

0 comments on commit 773987b

Please sign in to comment.