diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 910bf69609edd1..563e4750165046 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -671,6 +671,8 @@ DEFINE_Int32(flush_thread_num_per_store, "6"); // number of thread for flushing memtable per store, for high priority load task DEFINE_Int32(high_priority_flush_thread_num_per_store, "6"); +DEFINE_Int32(wg_flush_thread_num_per_store, "6"); + // config for tablet meta checkpoint DEFINE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10"); DEFINE_mInt32(tablet_meta_checkpoint_min_interval_secs, "600"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 2d0dc128a2a93e..21325a0f011954 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -726,6 +726,9 @@ DECLARE_Int32(flush_thread_num_per_store); // number of thread for flushing memtable per store, for high priority load task DECLARE_Int32(high_priority_flush_thread_num_per_store); +// workload group's flush thread num +DECLARE_Int32(wg_flush_thread_num_per_store); + // config for tablet meta checkpoint DECLARE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num); DECLARE_mInt32(tablet_meta_checkpoint_min_interval_secs); diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 5cfc260d1b584b..378728f025cdb4 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -128,7 +128,7 @@ Status DeltaWriterV2::init() { RETURN_IF_ERROR(_rowset_writer->init(context)); ThreadPool* wg_thread_pool_ptr = nullptr; if (_state->get_query_ctx()) { - wg_thread_pool_ptr = _state->get_query_ctx()->get_non_pipe_exec_thread_pool(); + wg_thread_pool_ptr = _state->get_query_ctx()->get_memtable_flush_pool(); } RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info, wg_thread_pool_ptr, diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index b838af570a2f44..91c297b1960ca9 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -181,6 +181,7 @@ Status StorageEngine::_open() { RETURN_NOT_OK_STATUS_WITH_WARN(_check_file_descriptor_number(), "check fd number failed"); auto dirs = get_stores(); + _disk_num = dirs.size(); RETURN_IF_ERROR(load_data_dirs(dirs)); _memtable_flush_executor.reset(new MemTableFlushExecutor()); diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index f2b5f421670e3a..9dc18dfb276ba6 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -224,6 +224,8 @@ class StorageEngine { std::set get_broken_paths() { return _broken_paths; } + int get_disk_num() { return _disk_num; } + private: // Instance should be inited from `static open()` // MUST NOT be called in other circumstances. @@ -469,6 +471,8 @@ class StorageEngine { std::unique_ptr _create_tablet_idx_lru_cache; + int _disk_num {-1}; + DISALLOW_COPY_AND_ASSIGN(StorageEngine); }; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 08de61f8931c67..bd5308aeba1b3f 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -775,11 +775,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, std::make_pair(params.params.fragment_instance_id, fragment_executor)); } - auto* current_thread_pool = query_ctx->get_non_pipe_exec_thread_pool(); - if (!current_thread_pool) { - current_thread_pool = _thread_pool.get(); - } - auto st = current_thread_pool->submit_func([this, fragment_executor, cb]() { + auto st = _thread_pool->submit_func([this, fragment_executor, cb]() { #ifndef BE_TEST SCOPED_ATTACH_TASK(fragment_executor->runtime_state()); #endif diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index bbcdc3b47711b4..40518e62cc8610 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -307,9 +307,9 @@ doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() { return _exec_env->pipeline_task_scheduler(); } -ThreadPool* QueryContext::get_non_pipe_exec_thread_pool() { +ThreadPool* QueryContext::get_memtable_flush_pool() { if (_workload_group) { - return _non_pipe_thread_pool; + return _memtable_flush_pool; } else { return nullptr; } @@ -321,7 +321,7 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& tg) { // see task_group_manager::delete_workload_group_by_ids _workload_group->add_mem_tracker_limiter(query_mem_tracker); _workload_group->get_query_scheduler(&_task_scheduler, &_scan_task_scheduler, - &_non_pipe_thread_pool, &_remote_scan_task_scheduler); + &_memtable_flush_pool, &_remote_scan_task_scheduler); return Status::OK(); } diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 2653eeddc8b32d..82e75b72ceeff5 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -236,7 +236,7 @@ class QueryContext { doris::pipeline::TaskScheduler* get_pipe_exec_scheduler(); - ThreadPool* get_non_pipe_exec_thread_pool(); + ThreadPool* get_memtable_flush_pool(); int64_t mem_limit() const { return _bytes_limit; } @@ -337,7 +337,7 @@ class QueryContext { doris::pipeline::TaskScheduler* _task_scheduler = nullptr; vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr; - ThreadPool* _non_pipe_thread_pool = nullptr; + ThreadPool* _memtable_flush_pool = nullptr; vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr; std::unique_ptr _execution_dependency; diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index fd885aaacb186d..372eecafd0770a 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -27,6 +27,7 @@ #include #include "common/logging.h" +#include "olap/storage_engine.h" #include "pipeline/task_queue.h" #include "pipeline/task_scheduler.h" #include "runtime/exec_env.h" @@ -429,19 +430,29 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e min_remote_scan_thread_num); } - if (_non_pipe_thread_pool == nullptr) { - std::unique_ptr thread_pool = nullptr; - auto ret = ThreadPoolBuilder("nonPip_" + tg_name) - .set_min_threads(1) - .set_max_threads(config::fragment_pool_thread_num_max) - .set_max_queue_size(config::fragment_pool_queue_size) - .set_cgroup_cpu_ctl(cg_cpu_ctl_ptr) - .build(&thread_pool); - if (!ret.ok()) { - LOG(INFO) << "[upsert wg thread pool] create non-pipline thread pool failed, gid=" - << tg_id; - } else { - _non_pipe_thread_pool = std::move(thread_pool); + if (_memtable_flush_pool == nullptr) { + int num_disk = ExecEnv::GetInstance()->get_storage_engine()->get_disk_num(); + // -1 means disk num may not be inited, so not create flush pool + if (num_disk != -1) { + std::unique_ptr thread_pool = nullptr; + + size_t min_threads = std::max(1, config::wg_flush_thread_num_per_store); + size_t max_threads = num_disk * min_threads; + std::string pool_name = "wg_flush_" + tg_name; + auto ret = ThreadPoolBuilder(pool_name) + .set_min_threads(min_threads) + .set_max_threads(max_threads) + .set_cgroup_cpu_ctl(cg_cpu_ctl_ptr) + .build(&thread_pool); + if (!ret.ok()) { + LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " failed, gid=" + << tg_id; + } else { + _memtable_flush_pool = std::move(thread_pool); + LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " succ, gid=" << tg_id + << ", max thread num=" << max_threads + << ", min thread num=" << min_threads; + } } } @@ -469,13 +480,13 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e void WorkloadGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched, vectorized::SimplifiedScanScheduler** scan_sched, - ThreadPool** non_pipe_thread_pool, + ThreadPool** memtable_flush_pool, vectorized::SimplifiedScanScheduler** remote_scan_sched) { std::shared_lock rlock(_task_sched_lock); *exec_sched = _task_sched.get(); *scan_sched = _scan_task_sched.get(); *remote_scan_sched = _remote_scan_task_sched.get(); - *non_pipe_thread_pool = _non_pipe_thread_pool.get(); + *memtable_flush_pool = _memtable_flush_pool.get(); } void WorkloadGroup::try_stop_schedulers() { @@ -489,9 +500,9 @@ void WorkloadGroup::try_stop_schedulers() { if (_remote_scan_task_sched) { _remote_scan_task_sched->stop(); } - if (_non_pipe_thread_pool) { - _non_pipe_thread_pool->shutdown(); - _non_pipe_thread_pool->wait(); + if (_memtable_flush_pool) { + _memtable_flush_pool->shutdown(); + _memtable_flush_pool->wait(); } } diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index cee35c66af8362..e41c1793233d64 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -190,7 +190,7 @@ class WorkloadGroup : public std::enable_shared_from_this { std::unique_ptr _task_sched {nullptr}; std::unique_ptr _scan_task_sched {nullptr}; std::unique_ptr _remote_scan_task_sched {nullptr}; - std::unique_ptr _non_pipe_thread_pool = nullptr; + std::unique_ptr _memtable_flush_pool = nullptr; }; using WorkloadGroupPtr = std::shared_ptr; diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 4ed878a4634476..2982bf8174a5b5 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -96,25 +96,14 @@ Status AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* prof // This is a async thread, should lock the task ctx, to make sure runtimestate and profile // not deconstructed before the thread exit. auto task_ctx = state->get_task_execution_context(); - if (state->get_query_ctx() && state->get_query_ctx()->get_non_pipe_exec_thread_pool()) { - ThreadPool* pool_ptr = state->get_query_ctx()->get_non_pipe_exec_thread_pool(); - RETURN_IF_ERROR(pool_ptr->submit_func([this, state, profile, task_ctx]() { - auto task_lock = task_ctx.lock(); - if (task_lock == nullptr) { - return; - } - this->process_block(state, profile); - })); - } else { - RETURN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func( - [this, state, profile, task_ctx]() { - auto task_lock = task_ctx.lock(); - if (task_lock == nullptr) { - return; - } - this->process_block(state, profile); - })); - } + RETURN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func( + [this, state, profile, task_ctx]() { + auto task_lock = task_ctx.lock(); + if (task_lock == nullptr) { + return; + } + this->process_block(state, profile); + })); return Status::OK(); }