Skip to content

Commit

Permalink
4
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian committed Oct 28, 2023
1 parent f00cea5 commit 608e899
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
9 changes: 5 additions & 4 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::FutureBlock> block)
if (block->rows() > 0) {
_block_queue.push_back(block);
*_all_block_queues_bytes += block->bytes();
*_single_block_queue_bytes += block->bytes();
}
_get_cond.notify_all();
return Status::OK();
Expand All @@ -77,7 +78,7 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
}
while (_status.ok() && _block_queue.empty() &&
(!need_commit || (need_commit && !_load_ids.empty()))) {
CHECK(*_all_block_queues_bytes == 0);
CHECK(*_single_block_queue_bytes == 0);
auto left_milliseconds = config::group_commit_interval_ms;
if (!need_commit) {
left_milliseconds = config::group_commit_interval_ms -
Expand All @@ -102,11 +103,11 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
*find_block = true;
_block_queue.pop_front();
*_all_block_queues_bytes -= fblock->bytes();
CHECK(*_all_block_queues_bytes >= 0);
*_single_block_queue_bytes -= block->bytes();
_put_cond.notify_all();
}
if (_block_queue.empty() && need_commit && _load_ids.empty()) {
CHECK(*_all_block_queues_bytes == 0);
CHECK(*_single_block_queue_bytes == 0);
*eos = true;
} else {
*eos = false;
Expand Down Expand Up @@ -142,7 +143,7 @@ void LoadBlockQueue::cancel(const Status& st) {
std::unique_lock<doris::Mutex> l0(*(future_block->lock));
future_block->set_result(st, future_block->rows(), 0);
*_all_block_queues_bytes -= future_block->bytes();
CHECK(*_all_block_queues_bytes >= 0);
*_single_block_queue_bytes -= future_block->bytes();
future_block->cv->notify_all();
}
_block_queue.pop_front();
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/group_commit_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class LoadBlockQueue {
_start_time(std::chrono::steady_clock::now()),
_all_block_queues_bytes(all_block_queues_bytes) {
_mutex = std::make_shared<doris::Mutex>();
_single_block_queue_bytes = std::make_shared<std::atomic_size_t>(0);
};

Status add_block(std::shared_ptr<vectorized::FutureBlock> block);
Expand Down Expand Up @@ -80,6 +81,8 @@ class LoadBlockQueue {
Status _status = Status::OK();
// memory consumption of all tables' load block queues, used for back pressure.
std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
// memory consumption of one load block queue, used for correctness check.
std::shared_ptr<std::atomic_size_t> _single_block_queue_bytes;
};

class GroupCommitTable {
Expand Down

0 comments on commit 608e899

Please sign in to comment.