diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp
index fc2c9675b11e..5e11bc0e055f 100644
--- a/src/Common/ProfileEvents.cpp
+++ b/src/Common/ProfileEvents.cpp
@@ -75,9 +75,14 @@
     M(GlobalThreadPoolExpansions, "Counts the total number of times new threads were added to the global thread pool. This metric indicates the frequency of global thread pool expansions to accommodate increased processing demands.") \
     M(GlobalThreadPoolShrinks, "Counts the total number of times the global thread pool was shrunk by removing threads, triggered when the number of idle threads exceeded max_thread_pool_free_size. This metric highlights the adjustments in the global thread pool size in response to decreased thread utilization") \
     M(GlobalThreadPoolJobScheduleMicroseconds, "Total time spent waiting to schedule a job in the global thread pool. This metric accounts for the time elapsed from the moment a job scheduling request is made until the job is successfully queued in the global thread pool, reflecting the responsiveness and scheduling efficiency of the pool.") \
+    M(GlobalThreadPoolThreadCreationMicroseconds, "Total time spent creating new threads in the global thread pool. Only successful thread creations are accounted.") \
+    M(GlobalThreadPoolJobScheduleLockWaitMicroseconds, "Total time that job scheduling requests spent waiting to acquire the mutex of the global thread pool.") \
+    \
     M(LocalThreadPoolExpansions, "Counts the total number of times threads were borrowed from the global thread pool to expand local thread pools.") \
     M(LocalThreadPoolShrinks, "Counts the total number of times threads were returned to the global thread pool from local thread pools.") \
     M(LocalThreadPoolJobScheduleMicroseconds, "Total time spent waiting to schedule a job in a local thread pool. This metric measures the time elapsed from when a job scheduling request is initiated until the job is successfully queued in the local thread pool. Shows how much time the jobs were waiting for a free slot in the local pool.") \
+    M(LocalThreadPoolThreadCreationMicroseconds, "Total time local thread pools spent adding new threads, i.e. borrowing them from the global thread pool. Only successful thread creations are accounted.") \
+    M(LocalThreadPoolJobScheduleLockWaitMicroseconds, "Total time that job scheduling requests spent waiting to acquire the mutex of a local thread pool.") \
     \
     M(DiskS3GetRequestThrottlerCount, "Number of DiskS3 GET and SELECT requests passed through throttler.") \
     M(DiskS3GetRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform DiskS3 GET and SELECT request throttling.") \
diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp
index 7e6aad8b89d9..1cab39f37012 100644
--- a/src/Common/ThreadPool.cpp
+++ b/src/Common/ThreadPool.cpp
@@ -34,9 +34,13 @@ namespace ProfileEvents
     extern const Event GlobalThreadPoolExpansions;
     extern const Event GlobalThreadPoolShrinks;
     extern const Event GlobalThreadPoolJobScheduleMicroseconds;
+    extern const Event GlobalThreadPoolThreadCreationMicroseconds;
+    extern const Event GlobalThreadPoolJobScheduleLockWaitMicroseconds;
     extern const Event LocalThreadPoolExpansions;
     extern const Event LocalThreadPoolShrinks;
     extern const Event LocalThreadPoolJobScheduleMicroseconds;
+    extern const Event LocalThreadPoolThreadCreationMicroseconds;
+    extern const Event LocalThreadPoolJobScheduleLockWaitMicroseconds;
 }
 
 class JobWithPriority
@@ -109,6 +113,19 @@ ThreadPoolImpl::ThreadPoolImpl(
     , queue_size(queue_size_ ? std::max(queue_size_, max_threads) : 0 /* zero means the queue is unlimited */)
     , shutdown_on_exception(shutdown_on_exception_)
 {
+    std::lock_guard lock(mutex);
+    jobs.reserve(queue_size);
+    while (threads.size() < std::min(max_threads, scheduled_jobs + (std::is_same_v ? 256 : 1) ) )
+    {
+        try
+        {
+            createThreadNoLock();
+        }
+        catch (...)
+        {
+            break; /// failed to start more threads
+        }
+    }
 }
 
 template
@@ -128,7 +145,17 @@ void ThreadPoolImpl::setMaxThreads(size_t value)
     if (need_start_threads)
     {
         /// Start new threads while there are more scheduled jobs in the queue and the limit `max_threads` is not reached.
-        startNewThreadsNoLock();
+        while (!shutdown && threads.size() < std::min(max_threads, scheduled_jobs))
+        {
+            try
+            {
+                createThreadNoLock();
+            }
+            catch (...)
+            {
+                break; /// failed to start more threads
+            }
+        }
     }
     else if (need_finish_free_threads)
     {
@@ -168,6 +195,48 @@ void ThreadPoolImpl::setQueueSize(size_t value)
     jobs.reserve(queue_size);
 }
 
+/// Starts one worker thread at the front of `threads` and accounts the time it took. Does not check `max_threads`. Throws DB::Exception (CANNOT_SCHEDULE_TASK) on failure. Must be called with `mutex` locked.
+template
+void ThreadPoolImpl::createThreadNoLock()
+{
+    Stopwatch watch;
+
+    try
+    {
+        threads.emplace_front();
+    }
+    catch (const std::exception & e)
+    {
+        /// Most likely this is a std::bad_alloc exception
+        throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot allocate thread slot: {}", e.what());
+    }
+    catch (...)
+    {
+        throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot allocate thread slot");
+    }
+
+    try
+    {
+        threads.front() = Thread([this, it = threads.begin()] { worker(it); });
+        ProfileEvents::increment(
+            std::is_same_v ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions
+        );
+    }
+    catch (const std::exception & e)
+    {
+        threads.pop_front();
+        throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot allocate thread: {}", e.what());
+    }
+    catch (...)
+    {
+        threads.pop_front();
+        throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot allocate thread");
+    }
+    ProfileEvents::increment(
+        std::is_same_v ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
+        watch.elapsedMicroseconds());
+}
+
 
 template
 template
@@ -194,6 +263,10 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std:
     {
         std::unique_lock lock(mutex);
 
+        ProfileEvents::increment(
+            std::is_same_v ? ProfileEvents::GlobalThreadPoolJobScheduleLockWaitMicroseconds : ProfileEvents::LocalThreadPoolJobScheduleLockWaitMicroseconds,
+            watch.elapsedMicroseconds());
+
         auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };
 
 
@@ -211,31 +284,29 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std:
         /// We must not to allocate any memory after we emplaced a job in a queue.
         /// Because if an exception would be thrown, we won't notify a thread about job occurrence.
 
-        /// Check if there are enough threads to process job.
+        /// Check if there are enough threads to process job
         if (threads.size() < std::min(max_threads, scheduled_jobs + 1))
         {
-            try
+            while (threads.size() < std::min(max_threads, scheduled_jobs + (std::is_same_v ? 32 : 1) ) )
             {
-                threads.emplace_front();
-            }
-            catch (...)
-            {
-                /// Most likely this is a std::bad_alloc exception
-                return on_error("cannot allocate thread slot");
-            }
-
-            try
-            {
-                threads.front() = Thread([this, it = threads.begin()] { worker(it); });
-                ProfileEvents::increment(
-                    std::is_same_v ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions
-                );
-
-            }
-            catch (...)
-            {
-                threads.pop_front();
-                return on_error("cannot allocate thread");
+                /// Only the thread needed to run this job is mandatory; the extra ones are pre-started on a best-effort basis.
+                const bool thread_required = threads.size() < std::min(max_threads, scheduled_jobs + 1);
+                try
+                {
+                    createThreadNoLock();
+                }
+                catch (const DB::Exception & e)
+                {
+                    if (!thread_required)
+                        break;
+                    return on_error(e.what());
+                }
+                catch (...)
+                {
+                    if (!thread_required)
+                        break;
+                    return on_error("can not start new thread");
+                }
             }
         }
 
@@ -252,45 +323,14 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std:
 
     /// Wake up a free thread to run the new job.
     new_job_or_shutdown.notify_one();
+
+
     ProfileEvents::increment(
         std::is_same_v ? ProfileEvents::GlobalThreadPoolJobScheduleMicroseconds : ProfileEvents::LocalThreadPoolJobScheduleMicroseconds,
         watch.elapsedMicroseconds());
     return static_cast(true);
 }
 
-template
-void ThreadPoolImpl::startNewThreadsNoLock()
-{
-    if (shutdown)
-        return;
-
-    /// Start new threads while there are more scheduled jobs in the queue and the limit `max_threads` is not reached.
-    while (threads.size() < std::min(scheduled_jobs, max_threads))
-    {
-        try
-        {
-            threads.emplace_front();
-        }
-        catch (...)
-        {
-            break; /// failed to start more threads
-        }
-
-        try
-        {
-            threads.front() = Thread([this, it = threads.begin()] { worker(it); });
-            ProfileEvents::increment(
-                std::is_same_v ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions
-            );
-        }
-        catch (...)
-        {
-            threads.pop_front();
-            break; /// failed to start more threads
-        }
-    }
-}
-
 template
 void ThreadPoolImpl::scheduleOrThrowOnError(Job job, Priority priority)
 {
@@ -317,7 +357,8 @@ void ThreadPoolImpl::wait()
     /// If threads are waiting on condition variables, but there are some jobs in the queue
     /// then it will prevent us from deadlock.
     new_job_or_shutdown.notify_all();
-    job_finished.wait(lock, [this] { return scheduled_jobs == 0; });
+
+    no_jobs.wait(lock, [this] { return scheduled_jobs == 0; });
 
     if (first_exception)
     {
@@ -432,9 +473,20 @@ void ThreadPoolImpl::worker(typename std::list::iterator thread_
 
                 --scheduled_jobs;
 
-                job_finished.notify_all();
-                if (shutdown)
+                if (!shutdown)
+                {
+                    job_finished.notify_one();
+                    if (!scheduled_jobs)
+                    {
+                        no_jobs.notify_all();
+                    }
+                }
+                else
+                {
+                    job_finished.notify_one();
                     new_job_or_shutdown.notify_all(); /// `shutdown` was set, wake up other threads so they can finish themselves.
+                    no_jobs.notify_all();
+                }
             }
 
             new_job_or_shutdown.wait(lock, [&] { return !jobs.empty() || shutdown || threads.size() > std::min(max_threads, scheduled_jobs + max_free_threads); });
diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h
index 31e4eabf63b8..6a839a5da24f 100644
--- a/src/Common/ThreadPool.h
+++ b/src/Common/ThreadPool.h
@@ -115,6 +115,7 @@ class ThreadPoolImpl
     mutable std::mutex mutex;
     std::condition_variable job_finished;
     std::condition_variable new_job_or_shutdown;
+    std::condition_variable no_jobs;
 
     Metric metric_threads;
     Metric metric_active_threads;
@@ -139,8 +140,8 @@ class ThreadPoolImpl
 
     void worker(typename std::list::iterator thread_it);
 
-    /// Tries to start new threads if there are scheduled jobs and the limit `max_threads` is not reached. Must be called with `mutex` locked.
-    void startNewThreadsNoLock();
+    /// Starts exactly one new thread. Does not check the `max_threads` limit (callers do) and throws DB::Exception on failure. Must be called with `mutex` locked.
+    void createThreadNoLock();
 
     void finalize();
     void onDestroy();
diff --git a/src/Databases/DatabaseOnDisk.cpp b/src/Databases/DatabaseOnDisk.cpp
index 12b0dc077990..202e8a086f70 100644
--- a/src/Databases/DatabaseOnDisk.cpp
+++ b/src/Databases/DatabaseOnDisk.cpp
@@ -19,6 +19,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
diff --git a/src/Databases/DatabaseOrdinary.h b/src/Databases/DatabaseOrdinary.h
index d1d7dfd83fa7..da2b446311bf 100644
--- a/src/Databases/DatabaseOrdinary.h
+++ b/src/Databases/DatabaseOrdinary.h
@@ -1,8 +1,6 @@
 #pragma once
 
 #include
-#include
-
 
 namespace DB
 {
diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h
index f405be72287b..051cd247f5ff 100644
--- a/src/Disks/ObjectStorages/IObjectStorage.h
+++ b/src/Disks/ObjectStorages/IObjectStorage.h
@@ -15,7 +15,6 @@
 
 #include
 #include
-#include
 #include
 #include
 #include