From 0e4f88d3f9c66c678cd56154e11827d5c9ec9f9e Mon Sep 17 00:00:00 2001
From: Ryan Ofsky
Date: Thu, 23 Jan 2025 08:13:06 -0500
Subject: [PATCH] Fix "disconnected: write(m_post_fd, &buffer, 1): Broken
 pipe" EventLoop shutdown races.

The EventLoop shutdown sequence has race conditions that could cause it to
shut down right before a `removeClient` `write(m_post_fd, ...)` call is
about to happen, if threads run in an unexpected order, and cause the write
to fail.

Cases where this can happen are described in
https://github.com/bitcoin/bitcoin/issues/31151#issuecomment-2609686156 and
the possible causes are that (1) `EventLoop::m_mutex` is not used to protect
some EventLoop member variables that are accessed from multiple threads
(particularly `m_num_clients` and `m_async_fns`) and (2) the `removeClient`
method can do unnecessary `write(m_post_fd, ...)` calls before the loop is
supposed to exit because it is not checking the `m_async_fns.empty()`
condition, and these multiple write calls can make the event loop exit early
and cause the final `write()` call to fail.

In practice, only the second cause seems to actually trigger this bug, but
this PR fixes both possible causes.

Fixes https://github.com/bitcoin/bitcoin/issues/31151

Co-authored-by: Vasil Dimov
---
 include/mp/proxy-io.h |  2 ++
 src/mp/proxy.cpp      | 49 ++++++++++++++++++++++++++++++++++++++++-------------------
 2 files changed, 32 insertions(+), 19 deletions(-)

diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h
index 29cd3a4..83b725d 100644
--- a/include/mp/proxy-io.h
+++ b/include/mp/proxy-io.h
@@ -165,6 +165,8 @@ class EventLoop
     //! Add/remove remote client reference counts.
     void addClient(std::unique_lock<std::mutex>& lock);
     void removeClient(std::unique_lock<std::mutex>& lock);
+    //! Check if loop should exit.
+    bool done(std::unique_lock<std::mutex>& lock);
 
     Logger log()
     {
diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp
index bcfbde8..c7516b5 100644
--- a/src/mp/proxy.cpp
+++ b/src/mp/proxy.cpp
@@ -188,29 +188,31 @@ void EventLoop::loop()
     kj::Own<kj::AsyncIoStream> wait_stream{
         m_io_context.lowLevelProvider->wrapSocketFd(m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
+    int post_fd{m_post_fd};
     char buffer = 0;
     for (;;) {
         size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope);
-        if (read_bytes == 1) {
-            std::unique_lock<std::mutex> lock(m_mutex);
-            if (m_post_fn) {
-                Unlock(lock, *m_post_fn);
-                m_post_fn = nullptr;
-            }
-        } else {
-            throw std::logic_error("EventLoop wait_stream closed unexpectedly");
-        }
-        m_cv.notify_all();
-        if (m_num_clients == 0 && m_async_fns.empty()) {
-            log() << "EventLoop::loop done, cancelling event listeners.";
-            m_task_set.reset();
-            log() << "EventLoop::loop bye.";
+        if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly");
+        std::unique_lock<std::mutex> lock(m_mutex);
+        if (m_post_fn) {
+            Unlock(lock, *m_post_fn);
+            m_post_fn = nullptr;
+            m_cv.notify_all();
+        } else if (done(lock)) {
+            // Intentionally do not break if m_post_fn was set, even if done()
+            // would return true, to ensure that the removeClient write(post_fd)
+            // call always succeeds and the loop does not exit between the time
+            // that the done condition is set and the write call is made.
             break;
         }
     }
+    log() << "EventLoop::loop done, cancelling event listeners.";
+    m_task_set.reset();
+    log() << "EventLoop::loop bye.";
     wait_stream = nullptr;
+    KJ_SYSCALL(::close(post_fd));
+    std::unique_lock<std::mutex> lock(m_mutex);
     m_wait_fd = -1;
-    KJ_SYSCALL(::close(m_post_fd));
     m_post_fd = -1;
 }
 
@@ -222,9 +224,10 @@ void EventLoop::post(const std::function<void()>& fn)
     std::unique_lock<std::mutex> lock(m_mutex);
     m_cv.wait(lock, [this] { return m_post_fn == nullptr; });
     m_post_fn = &fn;
+    int post_fd{m_post_fd};
     Unlock(lock, [&] {
         char buffer = 0;
-        KJ_SYSCALL(write(m_post_fd, &buffer, 1));
+        KJ_SYSCALL(write(post_fd, &buffer, 1));
     });
     m_cv.wait(lock, [this, &fn] { return m_post_fn != &fn; });
 }
 
@@ -233,13 +236,13 @@ void EventLoop::addClient(std::unique_lock<std::mutex>& lock) { m_num_clients +=
 
 void EventLoop::removeClient(std::unique_lock<std::mutex>& lock)
 {
-    assert(m_num_clients > 0);
     m_num_clients -= 1;
-    if (m_num_clients == 0) {
+    if (done(lock)) {
         m_cv.notify_all();
+        int post_fd{m_post_fd};
         Unlock(lock, [&] {
             char buffer = 0;
-            KJ_SYSCALL(write(m_post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
+            KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
         });
     }
 }
@@ -268,6 +271,14 @@ void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
     }
 }
 
+bool EventLoop::done(std::unique_lock<std::mutex>& lock)
+{
+    assert(m_num_clients >= 0);
+    assert(lock.owns_lock());
+    assert(lock.mutex() == &m_mutex);
+    return m_num_clients == 0 && m_async_fns.empty();
+}
+
 std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, std::function<Thread::Client()> make_thread)
 {
     std::unique_lock<std::mutex> lock(mutex);
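
The following is a minimal, self-contained sketch (not part of the patch) of the shutdown
predicate the patch introduces: the exit condition is checked under the same mutex that
protects the counters, and a disconnecting client only signals shutdown when both the client
count and the async work queue are drained. MiniLoop, client_count, async_fns and wakeup are
hypothetical stand-ins for EventLoop, m_num_clients, m_async_fns and the write(m_post_fd, ...)
pipe wakeup; the m_post_fn handling is omitted for brevity.

#include <cassert>
#include <condition_variable>
#include <functional>
#include <list>
#include <mutex>
#include <thread>

struct MiniLoop {
    std::mutex mutex;
    std::condition_variable cv;
    int client_count{0};
    std::list<std::function<void()>> async_fns;
    bool wakeup{false}; // stands in for a write() to the post fd

    // Mirrors EventLoop::done(): exit only when *both* the client count and the
    // async work queue are drained, and only while holding the mutex.
    bool done(const std::unique_lock<std::mutex>& lock) const
    {
        assert(lock.owns_lock());
        return client_count == 0 && async_fns.empty();
    }

    void loop()
    {
        std::unique_lock<std::mutex> lock{mutex};
        for (;;) {
            cv.wait(lock, [&] { return wakeup; }); // stands in for reading the wait fd
            wakeup = false;
            if (done(lock)) break; // checked under the same mutex as the counters
        }
    }

    void removeClient(std::unique_lock<std::mutex>& lock)
    {
        client_count -= 1;
        if (done(lock)) { // only signal shutdown when truly done, as in the patch
            wakeup = true;
            cv.notify_all();
        }
    }
};

int main()
{
    MiniLoop loop;
    loop.client_count = 2;
    std::thread event_thread{[&] { loop.loop(); }};
    // Two clients disconnect from other threads; only the removal that makes
    // done() true signals the loop, so there is no early or spurious shutdown.
    std::thread a{[&] { std::unique_lock<std::mutex> l{loop.mutex}; loop.removeClient(l); }};
    std::thread b{[&] { std::unique_lock<std::mutex> l{loop.mutex}; loop.removeClient(l); }};
    a.join();
    b.join();
    event_thread.join();
}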