Skip to content

Commit

Permalink
continue experiments
Browse files Browse the repository at this point in the history
  • Loading branch information
filimonov committed Jan 22, 2024
1 parent 0350103 commit 485917f
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 64 deletions.
5 changes: 5 additions & 0 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,14 @@
M(GlobalThreadPoolExpansions, "Counts the total number of times new threads were added to the global thread pool. This metric indicates the frequency of global thread pool expansions to accommodate increased processing demands.") \
M(GlobalThreadPoolShrinks, "Counts the total number of times the global thread pool was shrunk by removing threads, triggered when the number of idle threads exceeded max_thread_pool_free_size. This metric highlights the adjustments in the global thread pool size in response to decreased thread utilization") \
M(GlobalThreadPoolJobScheduleMicroseconds, "Total time spent waiting to schedule a job in the global thread pool. This metric accounts for the time elapsed from the moment a job scheduling request is made until the job is successfully queued in the global thread pool, reflecting the responsiveness and scheduling efficiency of the pool.") \
M(GlobalThreadPoolThreadCreationMicroseconds, "") \
M(GlobalThreadPoolJobScheduleLockWaitMicroseconds, "") \
\
M(LocalThreadPoolExpansions, "Counts the total number of times threads were borrowed from the global thread pool to expand local thread pools.") \
M(LocalThreadPoolShrinks, "Counts the total number of times threads were returned to the global thread pool from local thread pools.") \
M(LocalThreadPoolJobScheduleMicroseconds, "Total time spent waiting to schedule a job in a local thread pool. This metric measures the time elapsed from when a job scheduling request is initiated until the job is successfully queued in the local thread pool. Shows how much time the jobs were waiting for a free slot in the local pool.") \
M(LocalThreadPoolThreadCreationMicroseconds, "") \
M(LocalThreadPoolJobScheduleLockWaitMicroseconds, "") \
\
M(DiskS3GetRequestThrottlerCount, "Number of DiskS3 GET and SELECT requests passed through throttler.") \
M(DiskS3GetRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform DiskS3 GET and SELECT request throttling.") \
Expand Down
164 changes: 105 additions & 59 deletions src/Common/ThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@ namespace ProfileEvents
extern const Event GlobalThreadPoolExpansions;
extern const Event GlobalThreadPoolShrinks;
extern const Event GlobalThreadPoolJobScheduleMicroseconds;
extern const Event GlobalThreadPoolThreadCreationMicroseconds;
extern const Event GlobalThreadPoolJobScheduleLockWaitMicroseconds;
extern const Event LocalThreadPoolExpansions;
extern const Event LocalThreadPoolShrinks;
extern const Event LocalThreadPoolJobScheduleMicroseconds;
extern const Event LocalThreadPoolThreadCreationMicroseconds;
extern const Event LocalThreadPoolJobScheduleLockWaitMicroseconds;
}

class JobWithPriority
Expand Down Expand Up @@ -109,6 +113,19 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(
, queue_size(queue_size_ ? std::max(queue_size_, max_threads) : 0 /* zero means the queue is unlimited */)
, shutdown_on_exception(shutdown_on_exception_)
{
std::lock_guard lock(mutex);
jobs.reserve(queue_size);
while (threads.size() < std::min(max_threads, scheduled_jobs + (std::is_same_v<Thread, std::thread> ? 256 : 1) ) )
{
try
{
createThreadNoLock();
}
catch (...)
{
break; /// failed to start more threads
}
}
}

template <typename Thread>
Expand All @@ -128,7 +145,17 @@ void ThreadPoolImpl<Thread>::setMaxThreads(size_t value)
if (need_start_threads)
{
/// Start new threads while there are more scheduled jobs in the queue and the limit `max_threads` is not reached.
startNewThreadsNoLock();
while (!shutdown && threads.size() < std::min(max_threads, scheduled_jobs))
{
try
{
createThreadNoLock();
}
catch (...)
{
break; /// failed to start more threads
}
}
}
else if (need_finish_free_threads)
{
Expand Down Expand Up @@ -168,6 +195,48 @@ void ThreadPoolImpl<Thread>::setQueueSize(size_t value)
jobs.reserve(queue_size);
}

template <typename Thread>
void ThreadPoolImpl<Thread>::createThreadNoLock()
{
Stopwatch watch;

try
{
threads.emplace_front();
}
catch (const std::exception & e)
{
/// Most likely this is a std::bad_alloc exception
throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot allocate thread slot: {})", e.what());
}
catch (...)
{
throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot allocate thread slot");
}

try
{
threads.front() = Thread([this, it = threads.begin()] { worker(it); });
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions
);
}
catch (const std::exception & e)
{
threads.pop_front();
throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot allocate thread: {})", e.what());
}
catch (...)
{
threads.pop_front();
throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "cannot allocate thread");
}
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds,
watch.elapsedMicroseconds());

}


template <typename Thread>
template <typename ReturnType>
Expand All @@ -194,6 +263,10 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:

{
std::unique_lock lock(mutex);
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolJobScheduleLockWaitMicroseconds : ProfileEvents::LocalThreadPoolJobScheduleLockWaitMicroseconds,
watch.elapsedMicroseconds());


auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };

Expand All @@ -211,31 +284,23 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:
/// We must not to allocate any memory after we emplaced a job in a queue.
/// Because if an exception would be thrown, we won't notify a thread about job occurrence.

/// Check if there are enough threads to process job.
/// Check if there are enough threads to process job
if (threads.size() < std::min(max_threads, scheduled_jobs + 1))
{
try
while (threads.size() < std::min(max_threads, scheduled_jobs + (std::is_same_v<Thread, std::thread> ? 32 : 1) ) )
{
threads.emplace_front();
}
catch (...)
{
/// Most likely this is a std::bad_alloc exception
return on_error("cannot allocate thread slot");
}

try
{
threads.front() = Thread([this, it = threads.begin()] { worker(it); });
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions
);

}
catch (...)
{
threads.pop_front();
return on_error("cannot allocate thread");
try
{
createThreadNoLock();
}
catch (DB::Exception & e)
{
on_error(e.what());
}
catch (...)
{
on_error("can not start new thread");
}
}
}

Expand All @@ -252,45 +317,14 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, Priority priority, std:

/// Wake up a free thread to run the new job.
new_job_or_shutdown.notify_one();


ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolJobScheduleMicroseconds : ProfileEvents::LocalThreadPoolJobScheduleMicroseconds,
watch.elapsedMicroseconds());
return static_cast<ReturnType>(true);
}

template <typename Thread>
void ThreadPoolImpl<Thread>::startNewThreadsNoLock()
{
if (shutdown)
return;

/// Start new threads while there are more scheduled jobs in the queue and the limit `max_threads` is not reached.
while (threads.size() < std::min(scheduled_jobs, max_threads))
{
try
{
threads.emplace_front();
}
catch (...)
{
break; /// failed to start more threads
}

try
{
threads.front() = Thread([this, it = threads.begin()] { worker(it); });
ProfileEvents::increment(
std::is_same_v<Thread, std::thread> ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions
);
}
catch (...)
{
threads.pop_front();
break; /// failed to start more threads
}
}
}

template <typename Thread>
void ThreadPoolImpl<Thread>::scheduleOrThrowOnError(Job job, Priority priority)
{
Expand All @@ -317,7 +351,8 @@ void ThreadPoolImpl<Thread>::wait()
/// If threads are waiting on condition variables, but there are some jobs in the queue
/// then it will prevent us from deadlock.
new_job_or_shutdown.notify_all();
job_finished.wait(lock, [this] { return scheduled_jobs == 0; });

no_jobs.wait(lock, [this] { return scheduled_jobs == 0; });

if (first_exception)
{
Expand Down Expand Up @@ -432,9 +467,20 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_

--scheduled_jobs;

job_finished.notify_all();
if (shutdown)
if (!shutdown)
{
job_finished.notify_one();
if (!scheduled_jobs)
{
no_jobs.notify_all();
}
}
else
{
job_finished.notify_one();
new_job_or_shutdown.notify_all(); /// `shutdown` was set, wake up other threads so they can finish themselves.
no_jobs.notify_all();
}
}

new_job_or_shutdown.wait(lock, [&] { return !jobs.empty() || shutdown || threads.size() > std::min(max_threads, scheduled_jobs + max_free_threads); });
Expand Down
5 changes: 3 additions & 2 deletions src/Common/ThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class ThreadPoolImpl
mutable std::mutex mutex;
std::condition_variable job_finished;
std::condition_variable new_job_or_shutdown;
std::condition_variable no_jobs;

Metric metric_threads;
Metric metric_active_threads;
Expand All @@ -139,8 +140,8 @@ class ThreadPoolImpl

void worker(typename std::list<Thread>::iterator thread_it);

/// Tries to start new threads if there are scheduled jobs and the limit `max_threads` is not reached. Must be called with `mutex` locked.
void startNewThreadsNoLock();
/// starts one thread (unless limit is reached). Must be called with `mutex` locked.
void createThreadNoLock();

void finalize();
void onDestroy();
Expand Down
1 change: 1 addition & 0 deletions src/Databases/DatabaseOnDisk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Common/logger_useful.h>
#include <Common/filesystemHelpers.h>
#include <Common/CurrentMetrics.h>
#include <Common/ThreadPool.h>
#include <Common/assert_cast.h>
#include <Databases/DatabaseOrdinary.h>
#include <Databases/DatabaseAtomic.h>
Expand Down
2 changes: 0 additions & 2 deletions src/Databases/DatabaseOrdinary.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#pragma once

#include <Databases/DatabaseOnDisk.h>
#include <Common/ThreadPool.h>


namespace DB
{
Expand Down
1 change: 0 additions & 1 deletion src/Disks/ObjectStorages/IObjectStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

#include <Disks/ObjectStorages/StoredObject.h>
#include <Disks/DiskType.h>
#include <Common/ThreadPool_fwd.h>
#include <Common/ObjectStorageKey.h>
#include <Disks/WriteMode.h>
#include <Interpreters/Context_fwd.h>
Expand Down

0 comments on commit 485917f

Please sign in to comment.