diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 29158c56bb5070..d2bcb426bad400 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -314,6 +314,7 @@ DEFINE_mInt32(doris_scanner_row_num, "16384"); DEFINE_mInt32(doris_scanner_row_bytes, "10485760"); // single read execute fragment max run time millseconds DEFINE_mInt32(doris_scanner_max_run_time_ms, "1000"); +DEFINE_mInt32(doris_exchange_block_max_wait_time_ms, "500"); // (Advanced) Maximum size of per-query receive-side buffer DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760"); DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 6c84d89fabac50..8a3ff93867c12e 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -358,6 +358,8 @@ DECLARE_mInt32(doris_scanner_row_num); DECLARE_mInt32(doris_scanner_row_bytes); // single read execute fragment max run time millseconds DECLARE_mInt32(doris_scanner_max_run_time_ms); +// In the exchange sink operator, the maximum time (in milliseconds) that buffered rows may wait before the block is sent. 
+DECLARE_mInt32(doris_exchange_block_max_wait_time_ms); // (Advanced) Maximum size of per-query receive-side buffer DECLARE_mInt32(exchg_node_buffer_size_bytes); DECLARE_mInt32(exchg_buffer_queue_capacity_factor); diff --git a/be/src/util/stopwatch.hpp b/be/src/util/stopwatch.hpp index ef9c71d16e5cbb..93ba381209a028 100644 --- a/be/src/util/stopwatch.hpp +++ b/be/src/util/stopwatch.hpp @@ -85,6 +85,8 @@ class CustomStopWatch { return end.tv_sec - _start.tv_sec; } + bool is_running() const { return _running; } + private: timespec _start; uint64_t _total_time; // in nanosec diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index f46602e8d67158..939493db9a804a 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -263,6 +263,10 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, size_t _mutable_block = MutableBlock::create_unique(block->clone_empty()); } + if (_mutable_block->rows() == 0 && block->rows() != 0) { + _max_block_life_time.start(); + } + { SCOPED_TIMER(_parent->merge_block_timer()); if (data) { @@ -275,7 +279,15 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, size_t } } - if (_mutable_block->rows() >= _batch_size || eos) { + // Avoid the need to execute until the EOS before sending data + // in scenarios with large data volumes, high filtering rates, and limit. 
+ // (select * from xxx where xxx limit 1;) + if (_mutable_block->rows() >= _batch_size || + (_max_block_life_time.is_running() && + _max_block_life_time.elapsed_time() > + config::doris_exchange_block_max_wait_time_ms * 1000000LL) || + eos) { + _max_block_life_time.stop(); if (!_is_local) { RETURN_IF_ERROR(serialize_block(dest, num_receivers)); } diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 0ff1f252d5441f..16efc5e4789e99 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -95,6 +95,7 @@ class BlockSerializer { private: pipeline::ExchangeSinkLocalState* _parent; std::unique_ptr _mutable_block; + MonotonicStopWatch _max_block_life_time; bool _is_local; const int _batch_size;