diff --git a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc index 56b008a45fbfb..bbd4f02807840 100644 --- a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc +++ b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc @@ -605,6 +605,8 @@ absl::Status Epoll1Poller::RestartOnFork() { return absl::OkStatus(); } +void Epoll1Poller::FinishPolling() { grpc_core::Crash("Not implemented"); } + } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h index b757dadea98c8..863aba0ab3c90 100644 --- a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h +++ b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h @@ -80,6 +80,7 @@ class Epoll1Poller : public PosixEventPoller { void Close(); SystemApi* GetSystemApi() override { return &system_api_; } + void FinishPolling() override; private: // This initial vector size may need to be tuned diff --git a/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc b/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc index 5876752e2f078..40102ec89d136 100644 --- a/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc +++ b/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc @@ -639,9 +639,16 @@ PollPoller::~PollPoller() { Poller::WorkResult PollPoller::Work( EventEngine::Duration timeout, absl::FunctionRef schedule_poll_again) { - LOG(INFO) << "Polling"; - auto cleanup = absl::MakeCleanup([]() { LOG(INFO) << "Polling done"; }); CHECK(!in_fork_.load()); + { + grpc_core::MutexLock lock(&polling_mu_); + polling_ = true; + } + auto cleanup = absl::MakeCleanup([this]() { + grpc_core::MutexLock lock(&polling_mu_); + polling_ = false; + polling_cond_.SignalAll(); + }); // Avoid malloc for small number of elements. enum { inline_elements = 96 }; struct pollfd pollfd_space[inline_elements]; @@ -864,6 +871,14 @@ void PollPoller::Close() { closed_ = true; } +void PollPoller::FinishPolling() { + grpc_core::MutexLock lock(&polling_mu_); + if (polling_) { + Kick(); + polling_cond_.Wait(&polling_mu_); + } +} + std::shared_ptr MakePollPoller(Scheduler* scheduler, bool use_phony_poll) { static bool kPollPollerSupported = InitPollPollerPosix(); diff --git a/src/core/lib/event_engine/posix_engine/ev_poll_posix.h b/src/core/lib/event_engine/posix_engine/ev_poll_posix.h index 080f233d6adac..838c45dc227b1 100644 --- a/src/core/lib/event_engine/posix_engine/ev_poll_posix.h +++ b/src/core/lib/event_engine/posix_engine/ev_poll_posix.h @@ -64,6 +64,8 @@ class PollPoller : public PosixEventPoller, SystemApi* GetSystemApi() override { return &system_api_; } + void FinishPolling() override; + private: void KickExternal(bool ext); void PollerHandlesListAddHandle(PollEventHandle* handle); @@ -87,6 +89,9 @@ class PollPoller : public PosixEventPoller, bool closed_ ABSL_GUARDED_BY(mu_); SystemApi system_api_; std::atomic_bool in_fork_{false}; + grpc_core::Mutex polling_mu_; + grpc_core::CondVar polling_cond_; + bool polling_; }; // Return an instance of a poll based poller tied to the specified scheduler. diff --git a/src/core/lib/event_engine/posix_engine/event_poller.h b/src/core/lib/event_engine/posix_engine/event_poller.h index 0805222ecd345..ab74ed78e05c0 100644 --- a/src/core/lib/event_engine/posix_engine/event_poller.h +++ b/src/core/lib/event_engine/posix_engine/event_poller.h @@ -107,7 +107,7 @@ class PosixEventPoller : public grpc_event_engine::experimental::Poller, virtual SystemApi* GetSystemApi() = 0; virtual absl::Status PrepareForkNew() = 0; virtual absl::Status RestartOnFork() = 0; - // virtual void Suspend() = 0; + virtual void FinishPolling() = 0; ~PosixEventPoller() override = default; }; diff --git a/src/core/lib/event_engine/posix_engine/fork_support.h b/src/core/lib/event_engine/posix_engine/fork_support.h index 1df7db1ade70e..cc2ec8eda421e 100644 --- a/src/core/lib/event_engine/posix_engine/fork_support.h +++ b/src/core/lib/event_engine/posix_engine/fork_support.h @@ -51,7 +51,6 @@ class ForkSupport { grpc_core::MutexLock lock(&mu_); int key = next_key_++; listeners_.emplace(key, std::move(listener)); - LOG(INFO) << "Subbed " << key; return ForkSubscription(this, key); } @@ -71,7 +70,6 @@ class ForkSupport { void Unsubscribe(int key) { grpc_core::MutexLock lock(&mu_); listeners_.erase(key); - LOG(INFO) << "Unsubbed " << key; } grpc_core::Mutex mu_; diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.cc b/src/core/lib/event_engine/posix_engine/posix_engine.cc index e024efc71297a..09c10810f2409 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine.cc @@ -355,6 +355,12 @@ void PosixEnginePollerManager::TriggerShutdown() { poller_->Kick(); } +void PosixEnginePollerManager::Suspend() { + PollerState expected = PollerState::kOk; + CHECK(poller_state_.compare_exchange_weak(expected, PollerState::kSuspended)); + poller_->FinishPolling(); +} + PosixEnginePollerManager::~PosixEnginePollerManager() { if (poller_ != nullptr) { poller_->Shutdown(); @@ -396,6 +402,10 @@ PosixEventEngine::PosixEventEngine() void PosixEventEngine::PollerWorkInternal( std::shared_ptr poller_manager) { + if (poller_manager->IsSuspended()) { + LOG(INFO) << "Suspended!"; + return; + } // TODO(vigneshbabu): The timeout specified here is arbitrary. For instance, // this can be improved by setting the timeout to the next expiring timer. PosixEventPoller* poller = poller_manager->Poller(); @@ -754,14 +764,11 @@ PosixEventEngine::CreatePosixListener( absl::Status PosixEventEngine::HandlePreFork() { #if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING grpc_core::MutexLock lock(&fork_mutex_); - // absl::AnyInv - - // action(); - auto poller = poller_manager_->Poller(); - if (poller != nullptr) { - return poller->PrepareForkNew(); - } - LOG(INFO) << "Pool shutdown"; + LOG(INFO) << "Suspending poller"; + poller_manager_->Suspend(); + LOG(INFO) << "Suspended polling, stopping timer manager."; + timer_manager_->PrepareFork(); + LOG(INFO) << "Suspended timer manager, stopping thread pool."; executor_->Quiesce([]() {}); #endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING return absl::OkStatus(); diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.h b/src/core/lib/event_engine/posix_engine/posix_engine.h index f7b4c3cf14cc9..248695ed4ef1f 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.h +++ b/src/core/lib/event_engine/posix_engine/posix_engine.h @@ -111,6 +111,11 @@ class PosixEnginePollerManager void Run(experimental::EventEngine::Closure* closure) override; void Run(absl::AnyInvocable) override; + void Suspend(); + bool IsSuspended() { + return poller_state_.load(std::memory_order_relaxed) == + PollerState::kSuspended; + } bool IsShuttingDown() { return poller_state_.load(std::memory_order_acquire) == @@ -121,7 +126,7 @@ class PosixEnginePollerManager ~PosixEnginePollerManager() override; private: - enum class PollerState { kExternal, kOk, kShuttingDown }; + enum class PollerState { kExternal, kOk, kShuttingDown, kSuspended }; std::shared_ptr poller_; std::atomic poller_state_{PollerState::kOk}; std::shared_ptr executor_;