diff --git a/src/api/callback.cc b/src/api/callback.cc index 6c6aec4573ed04..b90f5ca92daf09 100644 --- a/src/api/callback.cc +++ b/src/api/callback.cc @@ -82,7 +82,7 @@ void InternalCallbackScope::Close() { HandleScope handle_scope(env_->isolate()); if (!env_->can_call_into_js()) return; - if (failed_ && !env_->is_main_thread() && env_->is_stopping_worker()) { + if (failed_ && !env_->is_main_thread() && env_->is_stopping()) { env_->async_hooks()->clear_async_id_stack(); } diff --git a/src/api/environment.cc b/src/api/environment.cc index 29cf7ab808a2aa..bbabafe7c96ae6 100644 --- a/src/api/environment.cc +++ b/src/api/environment.cc @@ -37,7 +37,7 @@ static bool ShouldAbortOnUncaughtException(Isolate* isolate) { DebugSealHandleScope scope(isolate); Environment* env = Environment::GetCurrent(isolate); return env != nullptr && - (env->is_main_thread() || !env->is_stopping_worker()) && + (env->is_main_thread() || !env->is_stopping()) && env->should_abort_on_uncaught_toggle()[0] && !env->inside_should_not_abort_on_uncaught_scope(); } diff --git a/src/env-inl.h b/src/env-inl.h index 3426393901966e..d6b1787be3661a 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -32,7 +32,6 @@ #include "v8.h" #include "node_perf_common.h" #include "node_context_data.h" -#include "node_worker.h" #include #include @@ -661,7 +660,7 @@ void Environment::SetUnrefImmediate(native_immediate_callback cb, } inline bool Environment::can_call_into_js() const { - return can_call_into_js_ && (is_main_thread() || !is_stopping_worker()); + return can_call_into_js_ && !is_stopping(); } inline void Environment::set_can_call_into_js(bool can_call_into_js) { @@ -709,9 +708,8 @@ inline void Environment::remove_sub_worker_context(worker::Worker* context) { sub_worker_contexts_.erase(context); } -inline bool Environment::is_stopping_worker() const { - CHECK(!is_main_thread()); - return worker_context_->is_stopped(); +inline bool Environment::is_stopping() const { + return thread_stopper_.IsStopped(); } inline performance::performance_state* Environment::performance_state() { diff --git a/src/env.cc b/src/env.cc index 095715380323ea..589972cff2adf2 100644 --- a/src/env.cc +++ b/src/env.cc @@ -350,6 +350,14 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) { uv_unref(reinterpret_cast(&idle_prepare_handle_)); uv_unref(reinterpret_cast(&idle_check_handle_)); + GetAsyncRequest()->Install( + this, static_cast(this), [](uv_async_t* handle) { + Environment* env = static_cast(handle->data); + uv_stop(env->event_loop()); + }); + GetAsyncRequest()->SetStopped(false); + uv_unref(reinterpret_cast(GetAsyncRequest()->GetHandle())); + // Register clean-up cb to be called to clean up the handles // when the environment is freed, note that they are not cleaned in // the one environment per process setup, but will be called in @@ -365,6 +373,12 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) { uv_key_set(&thread_local_env, this); } +void Environment::ExitEnv() { + set_can_call_into_js(false); + GetAsyncRequest()->Stop(); + isolate_->TerminateExecution(); +} + MaybeLocal Environment::ProcessCliArgs( const std::vector& args, const std::vector& exec_args) { @@ -529,6 +543,7 @@ void Environment::RunCleanup() { started_cleanup_ = true; TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment), "RunCleanup", this); + GetAsyncRequest()->Uninstall(); CleanupHandles(); while (!cleanup_hooks_.empty()) { @@ -961,6 +976,53 @@ char* Environment::Reallocate(char* data, size_t old_size, size_t size) { return new_data; } +void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) { + Mutex::ScopedLock lock(mutex_); + env_ = env; + async_ = new uv_async_t; + async_->data = data; + CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0); +} + +void AsyncRequest::Uninstall() { + Mutex::ScopedLock lock(mutex_); + if (async_ != nullptr) { + env_->CloseHandle(async_, [](uv_async_t* async) { delete async; }); + async_ = nullptr; + } +} + +void AsyncRequest::Stop() { + Mutex::ScopedLock lock(mutex_); + stop_ = true; + if (async_ != nullptr) uv_async_send(async_); +} + +void AsyncRequest::SetStopped(bool flag) { + Mutex::ScopedLock lock(mutex_); + stop_ = flag; +} + +bool AsyncRequest::IsStopped() const { + Mutex::ScopedLock lock(mutex_); + return stop_; +} + +uv_async_t* AsyncRequest::GetHandle() { + Mutex::ScopedLock lock(mutex_); + return async_; +} + +void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const { + Mutex::ScopedLock lock(mutex_); + if (async_ != nullptr) tracker->TrackField("async_request", *async_); +} + +AsyncRequest::~AsyncRequest() { + Mutex::ScopedLock lock(mutex_); + CHECK_NULL(async_); +} + // Not really any better place than env.cc at this moment. void BaseObject::DeleteMe(void* data) { BaseObject* self = static_cast(data); diff --git a/src/env.h b/src/env.h index 9583586aac1163..2fb6da349becbf 100644 --- a/src/env.h +++ b/src/env.h @@ -508,6 +508,27 @@ struct AllocatedBuffer { friend class Environment; }; +class AsyncRequest : public MemoryRetainer { + public: + AsyncRequest() {} + ~AsyncRequest(); + void Install(Environment* env, void* data, uv_async_cb target); + void Uninstall(); + void Stop(); + void SetStopped(bool flag); + bool IsStopped() const; + uv_async_t* GetHandle(); + void MemoryInfo(MemoryTracker* tracker) const override; + SET_MEMORY_INFO_NAME(AsyncRequest) + SET_SELF_SIZE(AsyncRequest) + + private: + Environment* env_; + uv_async_t* async_ = nullptr; + mutable Mutex mutex_; + bool stop_ = true; +}; + class Environment { public: class AsyncHooks { @@ -692,6 +713,7 @@ class Environment { void RegisterHandleCleanups(); void CleanupHandles(); void Exit(int code); + void ExitEnv(); // Register clean-up cb to be called on environment destruction. inline void RegisterHandleCleanup(uv_handle_t* handle, @@ -847,7 +869,7 @@ class Environment { inline void add_sub_worker_context(worker::Worker* context); inline void remove_sub_worker_context(worker::Worker* context); void stop_sub_worker_contexts(); - inline bool is_stopping_worker() const; + inline bool is_stopping() const; inline void ThrowError(const char* errmsg); inline void ThrowTypeError(const char* errmsg); @@ -1021,6 +1043,7 @@ class Environment { inline ExecutionMode execution_mode() { return execution_mode_; } inline void set_execution_mode(ExecutionMode mode) { execution_mode_ = mode; } + inline AsyncRequest* GetAsyncRequest() { return &thread_stopper_; } private: inline void CreateImmediate(native_immediate_callback cb, @@ -1177,6 +1200,10 @@ class Environment { uint64_t cleanup_hook_counter_ = 0; bool started_cleanup_ = false; + // A custom async abstraction (a pair of async handle and a state variable) + // Used by embedders to shutdown running Node instance. + AsyncRequest thread_stopper_; + static void EnvPromiseHook(v8::PromiseHookType type, v8::Local promise, v8::Local parent); diff --git a/src/module_wrap.cc b/src/module_wrap.cc index d311f7cacaee1e..ac5d28fb2352ba 100644 --- a/src/module_wrap.cc +++ b/src/module_wrap.cc @@ -302,7 +302,7 @@ void ModuleWrap::Evaluate(const FunctionCallbackInfo& args) { // Convert the termination exception into a regular exception. if (timed_out || received_signal) { - if (!env->is_main_thread() && env->is_stopping_worker()) + if (!env->is_main_thread() && env->is_stopping()) return; env->isolate()->CancelTerminateExecution(); // It is possible that execution was terminated by another timeout in diff --git a/src/node.cc b/src/node.cc index e5d671c58b489d..4a5b10ae5f828b 100644 --- a/src/node.cc +++ b/src/node.cc @@ -824,15 +824,14 @@ inline int StartNodeWithIsolate(Isolate* isolate, per_process::v8_platform.DrainVMTasks(isolate); more = uv_loop_alive(env.event_loop()); - if (more) - continue; + if (more && !env.GetAsyncRequest()->IsStopped()) continue; RunBeforeExit(&env); // Emit `beforeExit` if the loop became alive either after emitting // event, or after running some callbacks. more = uv_loop_alive(env.event_loop()); - } while (more == true); + } while (more == true && !env.GetAsyncRequest()->IsStopped()); env.performance_state()->Mark( node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT); } @@ -969,6 +968,11 @@ int Start(int argc, char** argv) { return exit_code; } +int Stop(Environment* env) { + env->ExitEnv(); + return 0; +} + } // namespace node #if !HAVE_INSPECTOR diff --git a/src/node.h b/src/node.h index 06b8bd0ff58bea..68cc41f3170a39 100644 --- a/src/node.h +++ b/src/node.h @@ -199,10 +199,17 @@ typedef intptr_t ssize_t; namespace node { +class IsolateData; +class Environment; + // TODO(addaleax): Officially deprecate this and replace it with something // better suited for a public embedder API. NODE_EXTERN int Start(int argc, char* argv[]); +// Tear down Node.js while it is running (there are active handles +// in the loop and / or actively executing JavaScript code). +NODE_EXTERN int Stop(Environment* env); + // TODO(addaleax): Officially deprecate this and replace it with something // better suited for a public embedder API. NODE_EXTERN void Init(int* argc, @@ -215,9 +222,6 @@ class ArrayBufferAllocator; NODE_EXTERN ArrayBufferAllocator* CreateArrayBufferAllocator(); NODE_EXTERN void FreeArrayBufferAllocator(ArrayBufferAllocator* allocator); -class IsolateData; -class Environment; - class NODE_EXTERN MultiIsolatePlatform : public v8::Platform { public: ~MultiIsolatePlatform() override { } diff --git a/src/node_contextify.cc b/src/node_contextify.cc index b9962f091d9e40..621fa7eb16fe79 100644 --- a/src/node_contextify.cc +++ b/src/node_contextify.cc @@ -924,7 +924,7 @@ bool ContextifyScript::EvalMachine(Environment* env, // Convert the termination exception into a regular exception. if (timed_out || received_signal) { - if (!env->is_main_thread() && env->is_stopping_worker()) + if (!env->is_main_thread() && env->is_stopping()) return false; env->isolate()->CancelTerminateExecution(); // It is possible that execution was terminated by another timeout in diff --git a/src/node_worker.cc b/src/node_worker.cc index b79ba29e3feba1..dc3fe8d554d520 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -56,46 +56,6 @@ void WaitForWorkerInspectorToStop(Environment* child) { } // anonymous namespace -void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) { - Mutex::ScopedLock lock(mutex_); - env_ = env; - async_ = new uv_async_t; - if (data != nullptr) async_->data = data; - CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0); -} - -void AsyncRequest::Uninstall() { - Mutex::ScopedLock lock(mutex_); - if (async_ != nullptr) - env_->CloseHandle(async_, [](uv_async_t* async) { delete async; }); -} - -void AsyncRequest::Stop() { - Mutex::ScopedLock lock(mutex_); - stop_ = true; - if (async_ != nullptr) uv_async_send(async_); -} - -void AsyncRequest::SetStopped(bool flag) { - Mutex::ScopedLock lock(mutex_); - stop_ = flag; -} - -bool AsyncRequest::IsStopped() const { - Mutex::ScopedLock lock(mutex_); - return stop_; -} - -uv_async_t* AsyncRequest::GetHandle() { - Mutex::ScopedLock lock(mutex_); - return async_; -} - -void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const { - Mutex::ScopedLock lock(mutex_); - if (async_ != nullptr) tracker->TrackField("async_request", *async_); -} - Worker::Worker(Environment* env, Local wrap, const std::string& url, @@ -141,7 +101,10 @@ Worker::Worker(Environment* env, } bool Worker::is_stopped() const { - return thread_stopper_.IsStopped(); + Mutex::ScopedLock lock(mutex_); + if (env_ != nullptr) + return env_->GetAsyncRequest()->IsStopped(); + return stopped_; } // This class contains data that is only relevant to the child thread itself, @@ -249,8 +212,12 @@ void Worker::Run() { Context::Scope context_scope(env_->context()); if (child_port != nullptr) child_port->Close(); - thread_stopper_.Uninstall(); - thread_stopper_.SetStopped(true); + { + Mutex::ScopedLock lock(mutex_); + stopped_ = true; + this->env_ = nullptr; + } + env_->GetAsyncRequest()->SetStopped(true); env_->stop_sub_worker_contexts(); env_->RunCleanup(); RunAtExit(env_.get()); @@ -266,12 +233,12 @@ void Worker::Run() { } }); - if (thread_stopper_.IsStopped()) return; + if (is_stopped()) return; { HandleScope handle_scope(isolate_); Local context = NewContext(isolate_); - if (thread_stopper_.IsStopped()) return; + if (is_stopped()) return; CHECK(!context.IsEmpty()); Context::Scope context_scope(context); { @@ -289,18 +256,13 @@ void Worker::Run() { env_->ProcessCliArgs(std::vector{}, std::move(exec_argv_)); } - + { + Mutex::ScopedLock lock(mutex_); + if (stopped_) return; + this->env_ = env_.get(); + } Debug(this, "Created Environment for worker with id %llu", thread_id_); - if (is_stopped()) return; - thread_stopper_.Install(env_.get(), env_.get(), [](uv_async_t* handle) { - Environment* env_ = static_cast(handle->data); - uv_stop(env_->event_loop()); - }); - uv_unref(reinterpret_cast(thread_stopper_.GetHandle())); - - Debug(this, "Created Environment for worker with id %llu", thread_id_); - if (thread_stopper_.IsStopped()) return; { HandleScope handle_scope(isolate_); Mutex::ScopedLock lock(mutex_); @@ -316,7 +278,7 @@ void Worker::Run() { Debug(this, "Created message port for worker %llu", thread_id_); } - if (thread_stopper_.IsStopped()) return; + if (is_stopped()) return; { #if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR StartWorkerInspector(env_.get(), @@ -337,28 +299,28 @@ void Worker::Run() { Debug(this, "Loaded environment for worker %llu", thread_id_); } - if (thread_stopper_.IsStopped()) return; + if (is_stopped()) return; { SealHandleScope seal(isolate_); bool more; env_->performance_state()->Mark( node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START); do { - if (thread_stopper_.IsStopped()) break; + if (is_stopped()) break; uv_run(&data.loop_, UV_RUN_DEFAULT); - if (thread_stopper_.IsStopped()) break; + if (is_stopped()) break; platform_->DrainTasks(isolate_); more = uv_loop_alive(&data.loop_); - if (more && !thread_stopper_.IsStopped()) continue; + if (more && !is_stopped()) continue; EmitBeforeExit(env_.get()); // Emit `beforeExit` if the loop became alive either after emitting // event, or after running some callbacks. more = uv_loop_alive(&data.loop_); - } while (more == true); + } while (more == true && !is_stopped()); env_->performance_state()->Mark( node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT); } @@ -366,7 +328,7 @@ void Worker::Run() { { int exit_code; - bool stopped = thread_stopper_.IsStopped(); + bool stopped = is_stopped(); if (!stopped) exit_code = EmitExit(env_.get()); Mutex::ScopedLock lock(mutex_); @@ -414,7 +376,7 @@ void Worker::OnThreadStopped() { Worker::~Worker() { Mutex::ScopedLock lock(mutex_); - CHECK(thread_stopper_.IsStopped()); + CHECK(stopped_ || env_ == nullptr || env_->GetAsyncRequest()->IsStopped()); CHECK(thread_joined_); Debug(this, "Worker %llu destroyed", thread_id_); @@ -510,12 +472,12 @@ void Worker::StartThread(const FunctionCallbackInfo& args) { w->ClearWeak(); w->env()->add_sub_worker_context(w); + w->stopped_ = false; w->thread_joined_ = false; - w->thread_stopper_.SetStopped(false); w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) { Worker* w_ = static_cast(handle->data); - CHECK(w_->thread_stopper_.IsStopped()); + CHECK(w_->is_stopped()); w_->parent_port_ = nullptr; w_->JoinThread(); delete w_; @@ -563,14 +525,12 @@ void Worker::Unref(const FunctionCallbackInfo& args) { void Worker::Exit(int code) { Mutex::ScopedLock lock(mutex_); - Debug(this, "Worker %llu called Exit(%d)", thread_id_, code); - if (!thread_stopper_.IsStopped()) { + if (env_ != nullptr) { exit_code_ = code; - Debug(this, "Received StopEventLoop request"); - thread_stopper_.Stop(); - if (isolate_ != nullptr) - isolate_->TerminateExecution(); + Stop(env_); + } else { + stopped_ = true; } } diff --git a/src/node_worker.h b/src/node_worker.h index 442056eaac1078..adc755426d03c8 100644 --- a/src/node_worker.h +++ b/src/node_worker.h @@ -12,26 +12,6 @@ namespace worker { class WorkerThreadData; -class AsyncRequest : public MemoryRetainer { - public: - AsyncRequest() {} - void Install(Environment* env, void* data, uv_async_cb target); - void Uninstall(); - void Stop(); - void SetStopped(bool flag); - bool IsStopped() const; - uv_async_t* GetHandle(); - void MemoryInfo(MemoryTracker* tracker) const override; - SET_MEMORY_INFO_NAME(AsyncRequest) - SET_SELF_SIZE(AsyncRequest) - - private: - Environment* env_; - uv_async_t* async_ = nullptr; - mutable Mutex mutex_; - bool stop_ = true; -}; - // A worker thread, as represented in its parent thread. class Worker : public AsyncWrap { public: @@ -54,7 +34,6 @@ class Worker : public AsyncWrap { void MemoryInfo(MemoryTracker* tracker) const override { tracker->TrackField("parent_port", parent_port_); - tracker->TrackInlineField(&thread_stopper_, "thread_stopper_"); tracker->TrackInlineField(&on_thread_finished_, "on_thread_finished_"); } @@ -107,9 +86,20 @@ class Worker : public AsyncWrap { // instance refers to it via its [kPort] property. MessagePort* parent_port_ = nullptr; - AsyncRequest thread_stopper_; AsyncRequest on_thread_finished_; + // A raw flag that is used by creator and worker threads to + // sync up on pre-mature termination of worker - while in the + // warmup phase. Once the worker is fully warmed up, use the + // async handle of the worker's Environment for the same purpose. + bool stopped_ = true; + + // The real Environment of the worker object. It has a lesser + // lifespan than the worker object itself - comes to life + // when the worker thread creates a new Environment, and gets + // destroyed alongwith the worker thread. + Environment* env_ = nullptr; + friend class WorkerThreadData; };