From 2f30556185c23ff6f7ebbe073528994de7421655 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Fri, 27 Oct 2023 16:29:18 +0800 Subject: [PATCH] 1 --- be/src/common/config.cpp | 3 +++ be/src/common/config.h | 3 +++ be/src/runtime/group_commit_mgr.cpp | 37 +++++++++++++++++++++++++---- be/src/runtime/group_commit_mgr.h | 10 ++++++-- 4 files changed, 47 insertions(+), 6 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 58121768723d7d2..d105cbba4c7b45c 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1117,6 +1117,9 @@ DEFINE_Bool(ignore_always_true_predicate_for_segment, "true"); // Dir of default timezone files DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo"); +// Max size(bytes) of group commit queues, used for mem back pressure. +DEFINE_Int32(group_commit_max_queue_size, "65536"); + // clang-format off #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index f8350d8731666c2..8439668d20d6b76 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1187,6 +1187,9 @@ DECLARE_Bool(ignore_always_true_predicate_for_segment); // Dir of default timezone files DECLARE_String(default_tzfiles_path); +// Max size(bytes) of group commit queues, used for mem back pressure. +DECLARE_Int32(group_commit_max_queue_size); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index be49cca52581f44..7fa82781c634f81 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -23,7 +23,11 @@ #include #include +#include +#include + #include "client_cache.h" +#include "common/config.h" #include "common/object_pool.h" #include "exec/data_sink.h" #include "io/fs/stream_load_pipe.h" @@ -34,6 +38,7 @@ #include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_context.h" #include "util/thrift_rpc_helper.h" +#include "vec/core/future_block.h" #include "vec/exec/scan/new_file_scan_node.h" #include "vec/sink/group_commit_block_sink.h" @@ -45,10 +50,14 @@ Status LoadBlockQueue::add_block(std::shared_ptr block) DCHECK(block->get_schema_version() == schema_version); std::unique_lock l(*_mutex); RETURN_IF_ERROR(_status); + // TODO: change it. + while (bytes() > config::group_commit_max_queue_size) { + _put_cond.wait(l); + } if (block->rows() > 0) { _block_queue.push_back(block); } - _cv->notify_one(); + _get_cond.notify_all(); return Status::OK(); } @@ -79,9 +88,9 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo } } #if !defined(USE_BTHREAD_SCANNER) - _cv->wait_for(l, std::chrono::milliseconds(left_milliseconds)); + _get_cond.wait_for(l, std::chrono::milliseconds(left_milliseconds)); #else - _cv->wait_for(l, left_milliseconds * 1000); + _get_cond.wait_for(l, left_milliseconds * 1000); #endif } if (!_block_queue.empty()) { @@ -90,6 +99,7 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo fblock->swap_future_block(future_block); *find_block = true; _block_queue.pop_front(); + _put_cond.notify_all(); } if (_block_queue.empty() && need_commit && _load_ids.empty()) { *eos = true; @@ -103,7 +113,7 @@ void LoadBlockQueue::remove_load_id(const UniqueId& load_id) { std::unique_lock l(*_mutex); if (_load_ids.find(load_id) != _load_ids.end()) { _load_ids.erase(load_id); - _cv->notify_one(); + _get_cond.notify_all(); } } @@ -132,6 +142,14 @@ void LoadBlockQueue::cancel(const Status& st) { } } +size_t LoadBlockQueue::bytes() { + return std::accumulate( + _block_queue.begin(), _block_queue.end(), 0, + [](size_t block_queue_mem_size, std::shared_ptr block) { + return block_queue_mem_size + block->bytes(); + }); +} + Status GroupCommitTable::get_first_block_load_queue( int64_t table_id, std::shared_ptr block, std::shared_ptr& load_block_queue) { @@ -389,6 +407,14 @@ Status GroupCommitTable::get_load_block_queue(const TUniqueId& instance_id, return Status::OK(); } +size_t GroupCommitTable::load_block_queues_bytes() { + return std::accumulate( + _load_block_queues.begin(), _load_block_queues.end(), 0, + [](size_t block_queues_size, + std::pair> + block_queues) { return block_queues_size + block_queues.second->bytes(); }); +} + GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) : _exec_env(exec_env) { static_cast(ThreadPoolBuilder("InsertIntoGroupCommitThreadPool") .set_min_threads(config::group_commit_insert_threads) @@ -557,4 +583,7 @@ Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& i } return group_commit_table->get_load_block_queue(instance_id, load_block_queue); } +size_t GroupCommitMgr::all_block_queues_bytes() { + return std::accumulate(, , 0, ) +} } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index aa8d05534ca29cc..4baf7cc73213625 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -48,7 +48,6 @@ class LoadBlockQueue { schema_version(schema_version), _start_time(std::chrono::steady_clock::now()) { _mutex = std::make_shared(); - _cv = std::make_shared(); }; Status add_block(std::shared_ptr block); @@ -56,6 +55,8 @@ class LoadBlockQueue { Status add_load_id(const UniqueId& load_id); void remove_load_id(const UniqueId& load_id); void cancel(const Status& st); + // memory consumption of one load block queue, used for back pressure. + size_t bytes(); UniqueId load_instance_id; std::string label; @@ -67,7 +68,8 @@ class LoadBlockQueue { std::chrono::steady_clock::time_point _start_time; std::shared_ptr _mutex; - std::shared_ptr _cv; + doris::ConditionVariable _put_cond; + doris::ConditionVariable _get_cond; // the set of load ids of all blocks in this queue std::set _load_ids; std::list> _block_queue; @@ -85,6 +87,8 @@ class GroupCommitTable { std::shared_ptr& load_block_queue); Status get_load_block_queue(const TUniqueId& instance_id, std::shared_ptr& load_block_queue); + // memory consumption of load block queues, used for back pressure. + size_t load_block_queues_bytes(); private: Status _create_group_commit_load(std::shared_ptr& load_block_queue); @@ -127,6 +131,8 @@ class GroupCommitMgr { Status get_first_block_load_queue(int64_t db_id, int64_t table_id, std::shared_ptr block, std::shared_ptr& load_block_queue); + // memory consumption of all tables' load block queues, used for back pressure. + size_t all_block_queues_bytes(); private: // used by insert into