Skip to content

Commit

Permalink
[opt](exchange)Optimize read performance in scenarios with large data…
Browse files Browse the repository at this point in the history
… volumes, high filtering rates, and limits.
  • Loading branch information
hubgeter committed Jan 20, 2025
1 parent 744691a commit 4dc20b8
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 1 deletion.
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ DEFINE_mInt32(doris_scanner_row_num, "16384");
DEFINE_mInt32(doris_scanner_row_bytes, "10485760");
// single read execute fragment max run time millseconds
DEFINE_mInt32(doris_scanner_max_run_time_ms, "1000");
DEFINE_mInt32(doris_exchange_block_max_wait_time_ms, "500");
// (Advanced) Maximum size of per-query receive-side buffer
DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760");
DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64");
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ DECLARE_mInt32(doris_scanner_row_num);
DECLARE_mInt32(doris_scanner_row_bytes);
// single read execute fragment max run time millseconds
DECLARE_mInt32(doris_scanner_max_run_time_ms);
//In the exchange sink operator, the maximum waiting time for a block to be sent.
DECLARE_mInt32(doris_exchange_block_max_wait_time_ms);
// (Advanced) Maximum size of per-query receive-side buffer
DECLARE_mInt32(exchg_node_buffer_size_bytes);
DECLARE_mInt32(exchg_buffer_queue_capacity_factor);
Expand Down
2 changes: 2 additions & 0 deletions be/src/util/stopwatch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ class CustomStopWatch {
return end.tv_sec - _start.tv_sec;
}

bool is_running() { return _running; }

private:
timespec _start;
uint64_t _total_time; // in nanosec
Expand Down
14 changes: 13 additions & 1 deletion be/src/vec/sink/vdata_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, size_t
_mutable_block = MutableBlock::create_unique(block->clone_empty());
}

if (_mutable_block->rows() == 0 && block->rows() != 0) {
_max_block_life_time.start();
}

{
SCOPED_TIMER(_parent->merge_block_timer());
if (data) {
Expand All @@ -275,7 +279,15 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, size_t
}
}

if (_mutable_block->rows() >= _batch_size || eos) {
// Avoid the need to execute until the EOS before sending data
// in scenarios with large data volumes, high filtering rates, and limit.
// (select * from xxx where xxx limit 1;)
if (_mutable_block->rows() >= _batch_size ||
(_max_block_life_time.is_running() &&
_max_block_life_time.elapsed_time() >
config::doris_exchange_block_max_wait_time_ms * 1000000) ||
eos) {
_max_block_life_time.stop();
if (!_is_local) {
RETURN_IF_ERROR(serialize_block(dest, num_receivers));
}
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/sink/vdata_stream_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class BlockSerializer {
private:
pipeline::ExchangeSinkLocalState* _parent;
std::unique_ptr<MutableBlock> _mutable_block;
MonotonicStopWatch _max_block_life_time;

bool _is_local;
const int _batch_size;
Expand Down

0 comments on commit 4dc20b8

Please sign in to comment.