Skip to content

Commit

Permalink
Checkpoint, fork does not hang.
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneo committed Dec 23, 2024
1 parent d2a76b3 commit 9c25dc1
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 14 deletions.
2 changes: 2 additions & 0 deletions src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,8 @@ absl::Status Epoll1Poller::RestartOnFork() {
return absl::OkStatus();
}

void Epoll1Poller::FinishPolling() { grpc_core::Crash("Not implemented"); }

} // namespace experimental
} // namespace grpc_event_engine

Expand Down
1 change: 1 addition & 0 deletions src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class Epoll1Poller : public PosixEventPoller {

void Close();
SystemApi* GetSystemApi() override { return &system_api_; }
void FinishPolling() override;

private:
// This initial vector size may need to be tuned
Expand Down
19 changes: 17 additions & 2 deletions src/core/lib/event_engine/posix_engine/ev_poll_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -639,9 +639,16 @@ PollPoller::~PollPoller() {
Poller::WorkResult PollPoller::Work(
EventEngine::Duration timeout,
absl::FunctionRef<void()> schedule_poll_again) {
LOG(INFO) << "Polling";
auto cleanup = absl::MakeCleanup([]() { LOG(INFO) << "Polling done"; });
CHECK(!in_fork_.load());
{
grpc_core::MutexLock lock(&polling_mu_);
polling_ = true;
}
auto cleanup = absl::MakeCleanup([this]() {
grpc_core::MutexLock lock(&polling_mu_);
polling_ = false;
polling_cond_.SignalAll();
});
// Avoid malloc for small number of elements.
enum { inline_elements = 96 };
struct pollfd pollfd_space[inline_elements];
Expand Down Expand Up @@ -864,6 +871,14 @@ void PollPoller::Close() {
closed_ = true;
}

void PollPoller::FinishPolling() {
grpc_core::MutexLock lock(&polling_mu_);
if (polling_) {
Kick();
polling_cond_.Wait(&polling_mu_);
}
}

std::shared_ptr<PollPoller> MakePollPoller(Scheduler* scheduler,
bool use_phony_poll) {
static bool kPollPollerSupported = InitPollPollerPosix();
Expand Down
5 changes: 5 additions & 0 deletions src/core/lib/event_engine/posix_engine/ev_poll_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class PollPoller : public PosixEventPoller,

SystemApi* GetSystemApi() override { return &system_api_; }

void FinishPolling() override;

private:
void KickExternal(bool ext);
void PollerHandlesListAddHandle(PollEventHandle* handle);
Expand All @@ -87,6 +89,9 @@ class PollPoller : public PosixEventPoller,
bool closed_ ABSL_GUARDED_BY(mu_);
SystemApi system_api_;
std::atomic_bool in_fork_{false};
grpc_core::Mutex polling_mu_;
grpc_core::CondVar polling_cond_;
bool polling_;
};

// Return an instance of a poll based poller tied to the specified scheduler.
Expand Down
2 changes: 1 addition & 1 deletion src/core/lib/event_engine/posix_engine/event_poller.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class PosixEventPoller : public grpc_event_engine::experimental::Poller,
virtual SystemApi* GetSystemApi() = 0;
virtual absl::Status PrepareForkNew() = 0;
virtual absl::Status RestartOnFork() = 0;
// virtual void Suspend() = 0;
virtual void FinishPolling() = 0;
~PosixEventPoller() override = default;
};

Expand Down
2 changes: 0 additions & 2 deletions src/core/lib/event_engine/posix_engine/fork_support.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ class ForkSupport {
grpc_core::MutexLock lock(&mu_);
int key = next_key_++;
listeners_.emplace(key, std::move(listener));
LOG(INFO) << "Subbed " << key;
return ForkSubscription(this, key);
}

Expand All @@ -71,7 +70,6 @@ class ForkSupport {
void Unsubscribe(int key) {
grpc_core::MutexLock lock(&mu_);
listeners_.erase(key);
LOG(INFO) << "Unsubbed " << key;
}

grpc_core::Mutex mu_;
Expand Down
23 changes: 15 additions & 8 deletions src/core/lib/event_engine/posix_engine/posix_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,12 @@ void PosixEnginePollerManager::TriggerShutdown() {
poller_->Kick();
}

void PosixEnginePollerManager::Suspend() {
PollerState expected = PollerState::kOk;
CHECK(poller_state_.compare_exchange_weak(expected, PollerState::kSuspended));
poller_->FinishPolling();
}

PosixEnginePollerManager::~PosixEnginePollerManager() {
if (poller_ != nullptr) {
poller_->Shutdown();
Expand Down Expand Up @@ -396,6 +402,10 @@ PosixEventEngine::PosixEventEngine()

void PosixEventEngine::PollerWorkInternal(
std::shared_ptr<PosixEnginePollerManager> poller_manager) {
if (poller_manager->IsSuspended()) {
LOG(INFO) << "Suspended!";
return;
}
// TODO(vigneshbabu): The timeout specified here is arbitrary. For instance,
// this can be improved by setting the timeout to the next expiring timer.
PosixEventPoller* poller = poller_manager->Poller();
Expand Down Expand Up @@ -754,14 +764,11 @@ PosixEventEngine::CreatePosixListener(
absl::Status PosixEventEngine::HandlePreFork() {
#if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
grpc_core::MutexLock lock(&fork_mutex_);
// absl::AnyInv

// action();
auto poller = poller_manager_->Poller();
if (poller != nullptr) {
return poller->PrepareForkNew();
}
LOG(INFO) << "Pool shutdown";
LOG(INFO) << "Suspending poller";
poller_manager_->Suspend();
LOG(INFO) << "Suspended polling, stopping timer manager.";
timer_manager_->PrepareFork();
LOG(INFO) << "Suspended timer manager, stopping thread pool.";
executor_->Quiesce([]() {});
#endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
return absl::OkStatus();
Expand Down
7 changes: 6 additions & 1 deletion src/core/lib/event_engine/posix_engine/posix_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ class PosixEnginePollerManager

void Run(experimental::EventEngine::Closure* closure) override;
void Run(absl::AnyInvocable<void()>) override;
void Suspend();
bool IsSuspended() {
return poller_state_.load(std::memory_order_relaxed) ==
PollerState::kSuspended;
}

bool IsShuttingDown() {
return poller_state_.load(std::memory_order_acquire) ==
Expand All @@ -121,7 +126,7 @@ class PosixEnginePollerManager
~PosixEnginePollerManager() override;

private:
enum class PollerState { kExternal, kOk, kShuttingDown };
enum class PollerState { kExternal, kOk, kShuttingDown, kSuspended };
std::shared_ptr<grpc_event_engine::experimental::PosixEventPoller> poller_;
std::atomic<PollerState> poller_state_{PollerState::kOk};
std::shared_ptr<ThreadPool> executor_;
Expand Down

0 comments on commit 9c25dc1

Please sign in to comment.