Skip to content

Commit

Permalink
[pick]reset memtable flush thread num (apache#37092)
Browse files Browse the repository at this point in the history
## Proposed changes

pick apache#37028
  • Loading branch information
wangbo authored Jul 2, 2024
1 parent b63e6ac commit f5572ac
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 49 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,8 @@ DEFINE_Int32(flush_thread_num_per_store, "6");
// number of thread for flushing memtable per store, for high priority load task
DEFINE_Int32(high_priority_flush_thread_num_per_store, "6");

DEFINE_Int32(wg_flush_thread_num_per_store, "6");

// config for tablet meta checkpoint
DEFINE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10");
DEFINE_mInt32(tablet_meta_checkpoint_min_interval_secs, "600");
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,9 @@ DECLARE_Int32(flush_thread_num_per_store);
// number of thread for flushing memtable per store, for high priority load task
DECLARE_Int32(high_priority_flush_thread_num_per_store);

// workload group's flush thread num
DECLARE_Int32(wg_flush_thread_num_per_store);

// config for tablet meta checkpoint
DECLARE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num);
DECLARE_mInt32(tablet_meta_checkpoint_min_interval_secs);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ Status DeltaWriterV2::init() {
RETURN_IF_ERROR(_rowset_writer->init(context));
ThreadPool* wg_thread_pool_ptr = nullptr;
if (_state->get_query_ctx()) {
wg_thread_pool_ptr = _state->get_query_ctx()->get_non_pipe_exec_thread_pool();
wg_thread_pool_ptr = _state->get_query_ctx()->get_memtable_flush_pool();
}
RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info,
wg_thread_pool_ptr,
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ Status StorageEngine::_open() {
RETURN_NOT_OK_STATUS_WITH_WARN(_check_file_descriptor_number(), "check fd number failed");

auto dirs = get_stores<false>();
_disk_num = dirs.size();
RETURN_IF_ERROR(load_data_dirs(dirs));

_memtable_flush_executor.reset(new MemTableFlushExecutor());
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ class StorageEngine {

std::set<string> get_broken_paths() { return _broken_paths; }

int get_disk_num() { return _disk_num; }

private:
// Instance should be inited from `static open()`
// MUST NOT be called in other circumstances.
Expand Down Expand Up @@ -469,6 +471,8 @@ class StorageEngine {

std::unique_ptr<CreateTabletIdxCache> _create_tablet_idx_lru_cache;

int _disk_num {-1};

DISALLOW_COPY_AND_ASSIGN(StorageEngine);
};

Expand Down
6 changes: 1 addition & 5 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -775,11 +775,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
std::make_pair(params.params.fragment_instance_id, fragment_executor));
}

auto* current_thread_pool = query_ctx->get_non_pipe_exec_thread_pool();
if (!current_thread_pool) {
current_thread_pool = _thread_pool.get();
}
auto st = current_thread_pool->submit_func([this, fragment_executor, cb]() {
auto st = _thread_pool->submit_func([this, fragment_executor, cb]() {
#ifndef BE_TEST
SCOPED_ATTACH_TASK(fragment_executor->runtime_state());
#endif
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,9 @@ doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
return _exec_env->pipeline_task_scheduler();
}

ThreadPool* QueryContext::get_non_pipe_exec_thread_pool() {
ThreadPool* QueryContext::get_memtable_flush_pool() {
if (_workload_group) {
return _non_pipe_thread_pool;
return _memtable_flush_pool;
} else {
return nullptr;
}
Expand All @@ -321,7 +321,7 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& tg) {
// see task_group_manager::delete_workload_group_by_ids
_workload_group->add_mem_tracker_limiter(query_mem_tracker);
_workload_group->get_query_scheduler(&_task_scheduler, &_scan_task_scheduler,
&_non_pipe_thread_pool, &_remote_scan_task_scheduler);
&_memtable_flush_pool, &_remote_scan_task_scheduler);
return Status::OK();
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ class QueryContext {

doris::pipeline::TaskScheduler* get_pipe_exec_scheduler();

ThreadPool* get_non_pipe_exec_thread_pool();
ThreadPool* get_memtable_flush_pool();

int64_t mem_limit() const { return _bytes_limit; }

Expand Down Expand Up @@ -337,7 +337,7 @@ class QueryContext {

doris::pipeline::TaskScheduler* _task_scheduler = nullptr;
vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
ThreadPool* _non_pipe_thread_pool = nullptr;
ThreadPool* _memtable_flush_pool = nullptr;
vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
std::unique_ptr<pipeline::Dependency> _execution_dependency;

Expand Down
47 changes: 29 additions & 18 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <utility>

#include "common/logging.h"
#include "olap/storage_engine.h"
#include "pipeline/task_queue.h"
#include "pipeline/task_scheduler.h"
#include "runtime/exec_env.h"
Expand Down Expand Up @@ -429,19 +430,29 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
min_remote_scan_thread_num);
}

if (_non_pipe_thread_pool == nullptr) {
std::unique_ptr<ThreadPool> thread_pool = nullptr;
auto ret = ThreadPoolBuilder("nonPip_" + tg_name)
.set_min_threads(1)
.set_max_threads(config::fragment_pool_thread_num_max)
.set_max_queue_size(config::fragment_pool_queue_size)
.set_cgroup_cpu_ctl(cg_cpu_ctl_ptr)
.build(&thread_pool);
if (!ret.ok()) {
LOG(INFO) << "[upsert wg thread pool] create non-pipline thread pool failed, gid="
<< tg_id;
} else {
_non_pipe_thread_pool = std::move(thread_pool);
if (_memtable_flush_pool == nullptr) {
int num_disk = ExecEnv::GetInstance()->get_storage_engine()->get_disk_num();
// -1 means disk num may not be inited, so not create flush pool
if (num_disk != -1) {
std::unique_ptr<ThreadPool> thread_pool = nullptr;

size_t min_threads = std::max(1, config::wg_flush_thread_num_per_store);
size_t max_threads = num_disk * min_threads;
std::string pool_name = "wg_flush_" + tg_name;
auto ret = ThreadPoolBuilder(pool_name)
.set_min_threads(min_threads)
.set_max_threads(max_threads)
.set_cgroup_cpu_ctl(cg_cpu_ctl_ptr)
.build(&thread_pool);
if (!ret.ok()) {
LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " failed, gid="
<< tg_id;
} else {
_memtable_flush_pool = std::move(thread_pool);
LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " succ, gid=" << tg_id
<< ", max thread num=" << max_threads
<< ", min thread num=" << min_threads;
}
}
}

Expand Down Expand Up @@ -469,13 +480,13 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e

void WorkloadGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
vectorized::SimplifiedScanScheduler** scan_sched,
ThreadPool** non_pipe_thread_pool,
ThreadPool** memtable_flush_pool,
vectorized::SimplifiedScanScheduler** remote_scan_sched) {
std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
*exec_sched = _task_sched.get();
*scan_sched = _scan_task_sched.get();
*remote_scan_sched = _remote_scan_task_sched.get();
*non_pipe_thread_pool = _non_pipe_thread_pool.get();
*memtable_flush_pool = _memtable_flush_pool.get();
}

void WorkloadGroup::try_stop_schedulers() {
Expand All @@ -489,9 +500,9 @@ void WorkloadGroup::try_stop_schedulers() {
if (_remote_scan_task_sched) {
_remote_scan_task_sched->stop();
}
if (_non_pipe_thread_pool) {
_non_pipe_thread_pool->shutdown();
_non_pipe_thread_pool->wait();
if (_memtable_flush_pool) {
_memtable_flush_pool->shutdown();
_memtable_flush_pool->wait();
}
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/workload_group/workload_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
std::unique_ptr<doris::pipeline::TaskScheduler> _task_sched {nullptr};
std::unique_ptr<vectorized::SimplifiedScanScheduler> _scan_task_sched {nullptr};
std::unique_ptr<vectorized::SimplifiedScanScheduler> _remote_scan_task_sched {nullptr};
std::unique_ptr<ThreadPool> _non_pipe_thread_pool = nullptr;
std::unique_ptr<ThreadPool> _memtable_flush_pool = nullptr;
};

using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>;
Expand Down
27 changes: 8 additions & 19 deletions be/src/vec/sink/writer/async_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,14 @@ Status AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* prof
// This is a async thread, should lock the task ctx, to make sure runtimestate and profile
// not deconstructed before the thread exit.
auto task_ctx = state->get_task_execution_context();
if (state->get_query_ctx() && state->get_query_ctx()->get_non_pipe_exec_thread_pool()) {
ThreadPool* pool_ptr = state->get_query_ctx()->get_non_pipe_exec_thread_pool();
RETURN_IF_ERROR(pool_ptr->submit_func([this, state, profile, task_ctx]() {
auto task_lock = task_ctx.lock();
if (task_lock == nullptr) {
return;
}
this->process_block(state, profile);
}));
} else {
RETURN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
[this, state, profile, task_ctx]() {
auto task_lock = task_ctx.lock();
if (task_lock == nullptr) {
return;
}
this->process_block(state, profile);
}));
}
RETURN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
[this, state, profile, task_ctx]() {
auto task_lock = task_ctx.lock();
if (task_lock == nullptr) {
return;
}
this->process_block(state, profile);
}));
return Status::OK();
}

Expand Down

0 comments on commit f5572ac

Please sign in to comment.