diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp
index 6f9e2d0a18b2..736d5633ee67 100644
--- a/src/Common/ThreadPool.cpp
+++ b/src/Common/ThreadPool.cpp
@@ -8,7 +8,6 @@
 #include
 #include
-#include
 #include
 #include
@@ -96,7 +95,7 @@ class JobWithPriority
 static constexpr auto DEFAULT_THREAD_NAME = "ThreadPool";
 
 static constexpr const size_t GLOBAL_THREAD_POOL_MIN_FREE_THREADS = 32;
-static constexpr const size_t LOCAL_THREAD_POOL_MIN_FREE_THREADS = 0;
+static constexpr const size_t LOCAL_THREAD_POOL_MIN_FREE_THREADS = 1;
 
 static constexpr const size_t GLOBAL_THREAD_POOL_HOUSEKEEP_INTERVAL_MILLISECONDS = 10000; // 10 seconds
 /// static constexpr const size_t GLOBAL_THREAD_POOL_HOUSEKEEP_HISTORY_WINDOW_SECONDS = 600; // 10 minutes
@@ -138,7 +137,6 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(
     , queue_size(queue_size_ ? std::max(queue_size_, max_threads) : 0 /* zero means the queue is unlimited */)
     , shutdown_on_exception(shutdown_on_exception_)
 {
-    std::unique_lock lock(mutex);
     // LOG_ERROR(&Poco::Logger::get("ThreadPoolImpl"),
     //     "ThreadPoolImpl constructor [Instance Address: {}]: max_threads = {}, max_free_threads = {}, queue_size = {}, StackTrace: {}",
     //     static_cast(this), max_threads, max_free_threads, queue_size, StackTrace().toString());
@@ -148,14 +146,15 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(
 
     if constexpr (std::is_same_v<Thread, std::thread>) // global thread pool
     {
-        // for global thread pool we need to start housekeeping thread
-        // it will run during the whole lifetime of the global thread pool
-        housekeeping_thread.emplace(&ThreadPoolImpl::housekeep, this);
-
+        // {
+        //     std::lock_guard lock(mutex);
+        //     // for global thread pool we need to start housekeeping thread
+        //     // it will run during the whole lifetime of the global thread pool
+        //     // housekeeping_thread.emplace(&ThreadPoolImpl::housekeep, this);
+        // }
         // we will start GLOBAL_THREAD_POOL_MIN_FREE_THREADS immediately
         adjustThreadPoolSize();
     }
-
 }
 
 template <typename Thread>
@@ -172,7 +171,8 @@ void ThreadPoolImpl<Thread>::setMaxThreads(size_t value)
         jobs.reserve(queue_size ? queue_size : desired_pool_size.load());
     }
 
-    adjustThreadPoolSize();
+    if (current_pool_size > 0)
+        adjustThreadPoolSize();
 }
 
 template <typename Thread>
@@ -190,7 +190,9 @@ void ThreadPoolImpl<Thread>::setMaxFreeThreads(size_t value)
         max_free_threads = std::min(value, max_threads);
         calculateDesiredThreadPoolSizeNoLock();
     }
-    adjustThreadPoolSize();
+
+    if (current_pool_size > 0)
+        adjustThreadPoolSize();
 }
 
 template <typename Thread>
@@ -220,23 +222,58 @@ void ThreadPoolImpl<Thread>::calculateDesiredThreadPoolSizeNoLock()
     {
         desired_pool_size = std::min(max_threads, scheduled_jobs + (std::is_same_v<Thread, std::thread> ? GLOBAL_THREAD_POOL_MIN_FREE_THREADS : LOCAL_THREAD_POOL_MIN_FREE_THREADS));
     }
-    // else if (current_pool_size > std::min(max_threads, scheduled_jobs + max_free_threads))
-    // {
-    //     /// desired_pool_size = std::min(max_threads, scheduled_jobs + max_free_threads);
+    else if (current_pool_size > std::min(max_threads, scheduled_jobs + max_free_threads))
+    {
+        desired_pool_size = std::min(max_threads, scheduled_jobs + max_free_threads);
 
-    //     // ExponentiallySmoothedCounter.h / ExponentiallySmoothedAverage
-    //     //
-    //     /// TODO
-    //     /// our desired_pool_size should be at least as big as minimum utilization over last 10 minutes
-    //     /// and at least as small as maximum utilization over last 10 minutes
+        // // // ExponentiallySmoothedCounter.h / ExponentiallySmoothedAverage
+        // // //
+        // // /// TODO
+        // // /// our desired_pool_size should be at least as big as minimum utilization over last 10 minutes
+        // // /// and at least as small as maximum utilization over last 10 minutes
 
-    //     // we are in allowed range, let's try to guess the optimal number of threads based on the current & history of utilization
-    //     // we want to have some free threads in the pool, but not too many
+        // // // we are in allowed range, let's try to guess the optimal number of threads based on the current & history of utilization
+        // // // we want to have some free threads in the pool, but not too many
 
-    // }
+    }
 
     // todo: we need to shrink the pool if there are too many free threads
 }
+template <typename Thread>
+typename ThreadPoolImpl<Thread>::PreparedThread ThreadPoolImpl<Thread>::prepareThread()
+{
+    std::promise<typename std::list<Thread>::iterator> promise_thread_it;
+    std::shared_future<typename std::list<Thread>::iterator> future_thread_id = promise_thread_it.get_future().share();
+
+    Stopwatch watch;
+    std::unique_ptr<Thread> thread_ptr = std::make_unique<Thread>([this, ft = std::move(future_thread_id)] mutable
+    {
+        auto thread_it = ft.get();
+        worker(thread_it);
+    });
+
+    ProfileEvents::increment(
+        std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
+        watch.elapsedMicroseconds());
+
+    return {
+        std::move(promise_thread_it),
+        std::move(thread_ptr)
+    };
+}
+
+template <typename Thread>
+void ThreadPoolImpl<Thread>::activateThread(typename ThreadPoolImpl<Thread>::PreparedThread & prepared_thread)
+{
+    threads.push_front(std::move(*prepared_thread.thread_ptr));
+    prepared_thread.thread_ptr.reset();
+
+    prepared_thread.promise_thread_it.set_value( threads.begin() );
+    current_pool_size = threads.size();
+    ProfileEvents::increment(
+        std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);
+}
+
 template <typename Thread>
 template <typename ReturnType>
@@ -261,6 +298,25 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
         return false;
     };
 
+    std::optional<PreparedThread> prepared_thread;
+
+    if (current_pool_size < desired_pool_size)
+    {
+        try
+        {
+            prepared_thread = prepareThread();
+        }
+        catch (const std::exception & e)
+        {
+            std::lock_guard lock(mutex);
+            return on_error(fmt::format("cannot start threads: {}", e.what()));
+        }
+        catch (...)
+        {
+            std::lock_guard lock(mutex);
+            return on_error("cannot start thread");
+        }
+    }
     {
         Stopwatch watch;
@@ -269,6 +325,12 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
             std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds,
             watch.elapsedMicroseconds());
 
+        if (prepared_thread)
+        {
+            activateThread(*prepared_thread);
+            prepared_thread.reset();
+        }
+
         auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };
 
         if (wait_microseconds)  /// Check for optional. Condition is true if the optional is set. Even if the value is zero.
@@ -299,68 +361,9 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
     /// Wake up a free thread to run the new job.
     new_job_or_shutdown.notify_one();
 
     // maybe while we starting the new thread some of the existing will become free and take the job?
-    if (current_pool_size < desired_pool_size)
-    {
-        try
-        {
-            startThreads();
-        }
-        catch (const DB::Exception & e)
-        {
-            return on_error(fmt::format("cannot start threads: {}", e.what()));
-        }
-        catch (...)
-        {
-            return on_error("cannot start threads");
-        }
-    }
-
     return static_cast<ReturnType>(true);
 }
 
-template <typename Thread>
-void ThreadPoolImpl<Thread>::startThreads()
-{
-    std::lock_guard lock(threads_mutex);
-
-    while (threads.size() < desired_pool_size)
-    {
-        try
-        {
-            std::promise<typename std::list<Thread>::iterator> promise_thread_it;
-            std::shared_future<typename std::list<Thread>::iterator> future_thread_id = promise_thread_it.get_future().share();
-
-            Stopwatch watch;
-
-            std::unique_ptr<Thread> thread_ptr = std::make_unique<Thread>([this, ft = std::move(future_thread_id)] mutable
-            {
-                auto thread_it = ft.get();
-                worker(thread_it);
-            });
-
-            ProfileEvents::increment(
-                std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
-                watch.elapsedMicroseconds());
-
-            ProfileEvents::increment(
-                std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions);
-
-            threads.push_front(std::move(*thread_ptr));
-            promise_thread_it.set_value( threads.begin() );
-
-            current_pool_size = threads.size();
-            new_job_or_shutdown.notify_one(); // let's notify the new thread just in case
-        }
-        catch (const std::exception & e)
-        {
-            throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot start thread: {})", e.what());
-        }
-        catch (...)
-        {
-            throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot start thread");
-        }
-    }
-}
 
 template <typename Thread>
 void ThreadPoolImpl<Thread>::adjustThreadPoolSize()
@@ -369,7 +372,24 @@ void ThreadPoolImpl<Thread>::adjustThreadPoolSize()
     auto desired = desired_pool_size.load();
     if (pool_size < desired)
     {
-        startThreads();
+        std::lock_guard lock(mutex);
+
+        while (threads.size() < desired_pool_size)
+        {
+            try
+            {
+                auto prepared_thread = prepareThread();
+                activateThread(prepared_thread);
+            }
+            catch (const std::exception & e)
+            {
+                throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot start thread: {})", e.what());
+            }
+            catch (...)
+            {
+                throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot start thread");
+            }
+        }
     }
     else if (pool_size > desired)
     {
@@ -452,17 +472,25 @@ void ThreadPoolImpl<Thread>::finalize()
     /// Wake up threads so they can finish themselves.
     new_job_or_shutdown.notify_all();
 
-    /// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does).
-    for (auto & thread : threads)
     {
-        thread.join();
-        ProfileEvents::increment(
-            std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
+        /// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does).
+        for (auto & thread : threads)
+        {
+            thread.join();
+            ProfileEvents::increment(
+                std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
+        }
+        threads.clear();
     }
-    threads.clear();
 
     housekeeping_thread_cv.notify_all();
-    housekeeping_thread.reset();
+
+    if (housekeeping_thread)
+    {
+        std::lock_guard lock(mutex);
+        housekeeping_thread->join();
+        housekeeping_thread.reset();
+    }
 }
 
 template <typename Thread>
@@ -569,7 +597,6 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
     CurrentMetrics::Increment metric_pool_threads(metric_threads);
 
     bool job_is_done = false;
-    bool should_remove_myself = false;
     std::exception_ptr exception_from_job;
@@ -627,12 +654,14 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
 
                 if (threads_remove_themselves)
                 {
-                    should_remove_myself = true; // we will do it out the the lock
+                    thread_it->detach();
+                    threads.erase(thread_it);
+                    current_pool_size = threads.size();
+                    ProfileEvents::increment(
+                        std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
                 }
-                else
-                    return;
+                return;
             }
-
             /// boost::priority_queue does not provide interface for getting non-const reference to an element
             /// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority.
             job_data = std::move(const_cast<JobWithPriority &>(jobs.top()));
@@ -650,17 +679,6 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
             }
         }
 
-        if (should_remove_myself)
-        {
-            std::lock_guard lock(threads_mutex);
-            thread_it->detach();
-            threads.erase(thread_it);
-            current_pool_size = threads.size();
-            ProfileEvents::increment(
-                std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks);
-            return;
-        }
-
         // if (have_more_jobs)
         // {
         //     new_job_or_shutdown.notify_one(); // just in case some other thread missed the cond var signal
diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h
index f7a2040c9b4c..50b97fb2b643 100644
--- a/src/Common/ThreadPool.h
+++ b/src/Common/ThreadPool.h
@@ -9,6 +9,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
@@ -109,9 +110,14 @@ class ThreadPoolImpl
     using OnDestroyCallback = std::function<void()>;
     void addOnDestroyCallback(OnDestroyCallback && callback);
 
+
 private:
     friend class GlobalThreadPool;
+    struct PreparedThread {
+        std::promise<typename std::list<Thread>::iterator> promise_thread_it;
+        std::unique_ptr<Thread> thread_ptr;
+    };
 
     mutable std::mutex mutex;
     std::condition_variable job_finished;
@@ -135,9 +141,11 @@ class ThreadPoolImpl
     /// boost::heap::stable is used to preserve the FIFO order of jobs with same priority
     boost::heap::priority_queue<JobWithPriority, boost::heap::stable<true>> jobs;
 
-    mutable std::mutex threads_mutex;
-    std::list<Thread> threads; // modified when threads_mutex is locked
-    std::atomic<size_t> current_pool_size = 0; // modified when threads_mutex is locked
+    std::list<Thread> threads;
+    std::atomic<size_t> current_pool_size = 0;
+
+    PreparedThread prepareThread();
+    void activateThread(PreparedThread & prepared_thread);
 
     std::exception_ptr first_exception;
     std::stack<OnDestroyCallback> on_destroy_callbacks;
@@ -150,11 +158,7 @@ class ThreadPoolImpl
 
     void worker(typename std::list<Thread>::iterator thread_it);
 
-
-    /// if number of threads is less than desired, creates new threads
-    void startThreads();
-
-    /// will incrase number of threads if needed or decrease if there are too many
+    /// will increase number of threads if needed or decrease if there are too many
     void adjustThreadPoolSize();
 
     void finalize();
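Below is a minimal, self-contained sketch of the `prepareThread()` / `activateThread()` split used in the patch. It is not the ClickHouse code itself: the `Pool`, `prepare`, `activate` and `worker` names and the overall structure are simplified assumptions. What it illustrates is the handoff: the thread object is constructed outside the pool mutex (the expensive part), and it learns its own position in the thread list only later, through a `std::promise` / `std::shared_future`, so it can be erased from the list by iterator when the pool shrinks.

```cpp
// Simplified sketch (assumed names, not the patch itself) of the
// prepare/activate pattern: build the thread without the lock, then
// link it into the list and publish its iterator under the lock.
#include <future>
#include <iostream>
#include <list>
#include <mutex>
#include <thread>

struct Pool
{
    std::mutex mutex;
    std::list<std::thread> threads;

    struct PreparedThread
    {
        std::promise<std::list<std::thread>::iterator> promise_it;
        std::thread thread;
    };

    // Expensive part: create the OS thread without holding the pool mutex.
    PreparedThread prepare()
    {
        PreparedThread prepared;
        std::shared_future<std::list<std::thread>::iterator> future_it = prepared.promise_it.get_future().share();
        prepared.thread = std::thread([this, future_it]
        {
            auto self_it = future_it.get(); // blocks until activate() publishes the iterator
            worker(self_it);
        });
        return prepared;
    }

    // Cheap part: link the thread into the list under the mutex and publish its iterator.
    void activate(PreparedThread & prepared)
    {
        std::lock_guard lock(mutex);
        threads.push_front(std::move(prepared.thread));
        prepared.promise_it.set_value(threads.begin());
    }

    void worker(std::list<std::thread>::iterator self_it)
    {
        // A real pool would run jobs here and, when shrinking, detach and erase *self_it.
        (void)self_it;
        std::cout << "worker started\n";
    }
};

int main()
{
    Pool pool;
    auto prepared = pool.prepare(); // may throw without touching pool state
    pool.activate(prepared);        // fast, done under the lock
    pool.threads.front().join();
}
```

The promise is needed because the iterator only exists once the thread has been linked into `threads`, yet the thread object has to be constructed first; the `shared_future` lets the worker block until `activate()` publishes the iterator under the lock.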
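For the sizing logic in `calculateDesiredThreadPoolSizeNoLock()`, a hedged summary: grow toward `scheduled_jobs` plus a small reserve of free threads (capped by `max_threads`), and shrink once the pool exceeds `scheduled_jobs + max_free_threads`. The free-standing function below is illustrative only and not part of the patch; in particular, the grow condition is not visible in the hunk and is assumed here.

```cpp
// Illustrative sketch of the sizing rule; the grow condition is an assumption.
#include <algorithm>
#include <cassert>
#include <cstddef>

constexpr std::size_t GLOBAL_THREAD_POOL_MIN_FREE_THREADS = 32;

std::size_t desiredPoolSize(std::size_t current_pool_size,
                            std::size_t scheduled_jobs,
                            std::size_t max_threads,
                            std::size_t max_free_threads)
{
    // Grow: keep a small reserve of free threads, but never exceed max_threads.
    std::size_t grown = std::min(max_threads, scheduled_jobs + GLOBAL_THREAD_POOL_MIN_FREE_THREADS);
    if (current_pool_size < grown)
        return grown;

    // Shrink: too many idle threads relative to the configured max_free_threads.
    std::size_t upper = std::min(max_threads, scheduled_jobs + max_free_threads);
    if (current_pool_size > upper)
        return upper;

    // Otherwise keep the current size.
    return current_pool_size;
}

int main()
{
    assert(desiredPoolSize(/*current*/ 0,   /*jobs*/ 10, /*max*/ 1000, /*max_free*/ 100) == 42);
    assert(desiredPoolSize(/*current*/ 500, /*jobs*/ 10, /*max*/ 1000, /*max_free*/ 100) == 110);
    assert(desiredPoolSize(/*current*/ 80,  /*jobs*/ 10, /*max*/ 1000, /*max_free*/ 100) == 80);
}
```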