Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian committed Oct 27, 2023
1 parent 31d2a9a commit 2f30556
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 6 deletions.
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1117,6 +1117,9 @@ DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
// Dir of default timezone files
DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo");

// Max size(bytes) of group commit queues, used for mem back pressure.
DEFINE_Int32(group_commit_max_queue_size, "65536");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,9 @@ DECLARE_Bool(ignore_always_true_predicate_for_segment);
// Dir of default timezone files
DECLARE_String(default_tzfiles_path);

// Max size(bytes) of group commit queues, used for mem back pressure.
DECLARE_Int32(group_commit_max_queue_size);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
37 changes: 33 additions & 4 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Types_types.h>

#include <memory>
#include <numeric>

#include "client_cache.h"
#include "common/config.h"
#include "common/object_pool.h"
#include "exec/data_sink.h"
#include "io/fs/stream_load_pipe.h"
Expand All @@ -34,6 +38,7 @@
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_context.h"
#include "util/thrift_rpc_helper.h"
#include "vec/core/future_block.h"
#include "vec/exec/scan/new_file_scan_node.h"
#include "vec/sink/group_commit_block_sink.h"

Expand All @@ -45,10 +50,14 @@ Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::FutureBlock> block)
DCHECK(block->get_schema_version() == schema_version);
std::unique_lock l(*_mutex);
RETURN_IF_ERROR(_status);
// TODO: change it.
while (bytes() > config::group_commit_max_queue_size) {
_put_cond.wait(l);
}
if (block->rows() > 0) {
_block_queue.push_back(block);
}
_cv->notify_one();
_get_cond.notify_all();
return Status::OK();
}

Expand Down Expand Up @@ -79,9 +88,9 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
}
}
#if !defined(USE_BTHREAD_SCANNER)
_cv->wait_for(l, std::chrono::milliseconds(left_milliseconds));
_get_cond.wait_for(l, std::chrono::milliseconds(left_milliseconds));
#else
_cv->wait_for(l, left_milliseconds * 1000);
_get_cond.wait_for(l, left_milliseconds * 1000);
#endif
}
if (!_block_queue.empty()) {
Expand All @@ -90,6 +99,7 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
fblock->swap_future_block(future_block);
*find_block = true;
_block_queue.pop_front();
_put_cond.notify_all();
}
if (_block_queue.empty() && need_commit && _load_ids.empty()) {
*eos = true;
Expand All @@ -103,7 +113,7 @@ void LoadBlockQueue::remove_load_id(const UniqueId& load_id) {
std::unique_lock l(*_mutex);
if (_load_ids.find(load_id) != _load_ids.end()) {
_load_ids.erase(load_id);
_cv->notify_one();
_get_cond.notify_all();
}
}

Expand Down Expand Up @@ -132,6 +142,14 @@ void LoadBlockQueue::cancel(const Status& st) {
}
}

size_t LoadBlockQueue::bytes() {
return std::accumulate(
_block_queue.begin(), _block_queue.end(), 0,
[](size_t block_queue_mem_size, std::shared_ptr<vectorized::FutureBlock> block) {
return block_queue_mem_size + block->bytes();
});
}

Status GroupCommitTable::get_first_block_load_queue(
int64_t table_id, std::shared_ptr<vectorized::FutureBlock> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
Expand Down Expand Up @@ -389,6 +407,14 @@ Status GroupCommitTable::get_load_block_queue(const TUniqueId& instance_id,
return Status::OK();
}

size_t GroupCommitTable::load_block_queues_bytes() {
return std::accumulate(
_load_block_queues.begin(), _load_block_queues.end(), 0,
[](size_t block_queues_size,
std::pair<const doris::UniqueId, std::shared_ptr<doris::LoadBlockQueue>>
block_queues) { return block_queues_size + block_queues.second->bytes(); });
}

GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) : _exec_env(exec_env) {
static_cast<void>(ThreadPoolBuilder("InsertIntoGroupCommitThreadPool")
.set_min_threads(config::group_commit_insert_threads)
Expand Down Expand Up @@ -557,4 +583,7 @@ Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& i
}
return group_commit_table->get_load_block_queue(instance_id, load_block_queue);
}
size_t GroupCommitMgr::all_block_queues_bytes() {
return std::accumulate(, , 0, )
}
} // namespace doris
10 changes: 8 additions & 2 deletions be/src/runtime/group_commit_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,15 @@ class LoadBlockQueue {
schema_version(schema_version),
_start_time(std::chrono::steady_clock::now()) {
_mutex = std::make_shared<doris::Mutex>();
_cv = std::make_shared<doris::ConditionVariable>();
};

Status add_block(std::shared_ptr<vectorized::FutureBlock> block);
Status get_block(vectorized::Block* block, bool* find_block, bool* eos);
Status add_load_id(const UniqueId& load_id);
void remove_load_id(const UniqueId& load_id);
void cancel(const Status& st);
// memory consumption of one load block queue, used for back pressure.
size_t bytes();

UniqueId load_instance_id;
std::string label;
Expand All @@ -67,7 +68,8 @@ class LoadBlockQueue {
std::chrono::steady_clock::time_point _start_time;

std::shared_ptr<doris::Mutex> _mutex;
std::shared_ptr<doris::ConditionVariable> _cv;
doris::ConditionVariable _put_cond;
doris::ConditionVariable _get_cond;
// the set of load ids of all blocks in this queue
std::set<UniqueId> _load_ids;
std::list<std::shared_ptr<vectorized::FutureBlock>> _block_queue;
Expand All @@ -85,6 +87,8 @@ class GroupCommitTable {
std::shared_ptr<LoadBlockQueue>& load_block_queue);
Status get_load_block_queue(const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue);
// memory consumption of load block queues, used for back pressure.
size_t load_block_queues_bytes();

private:
Status _create_group_commit_load(std::shared_ptr<LoadBlockQueue>& load_block_queue);
Expand Down Expand Up @@ -127,6 +131,8 @@ class GroupCommitMgr {
Status get_first_block_load_queue(int64_t db_id, int64_t table_id,
std::shared_ptr<vectorized::FutureBlock> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue);
// memory consumption of all tables' load block queues, used for back pressure.
size_t all_block_queues_bytes();

private:
// used by insert into
Expand Down

0 comments on commit 2f30556

Please sign in to comment.