From 5016ffa874ca51cab73f0dc5fde3a98f88cab17f Mon Sep 17 00:00:00 2001 From: kakachen Date: Tue, 26 Nov 2024 17:08:23 +0800 Subject: [PATCH] [Opt] opt max scanner thread number in batch split mode. --- be/src/pipeline/exec/file_scan_operator.cpp | 44 ++++++++++++++++----- be/src/pipeline/exec/file_scan_operator.h | 3 ++ be/src/pipeline/exec/scan_operator.cpp | 7 ++-- be/src/pipeline/exec/scan_operator.h | 3 +- be/src/vec/exec/scan/scanner_context.cpp | 9 ++--- be/src/vec/exec/scan/scanner_context.h | 4 +- 6 files changed, 50 insertions(+), 20 deletions(-) diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index 7afbb29134c0793..bf0d34f28154e83 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -26,6 +26,7 @@ #include "pipeline/exec/olap_scan_operator.h" #include "pipeline/exec/scan_operator.h" #include "vec/exec/format/format_common.h" +#include "vec/exec/scan/scanner_context.h" #include "vec/exec/scan/vfile_scanner.h" namespace doris::pipeline { @@ -37,9 +38,10 @@ Status FileScanLocalState::_init_scanners(std::list* s } auto& p = _parent->cast(); - uint32_t shard_num = std::min( - config::doris_scanner_thread_pool_thread_num / state()->query_parallel_instance_num(), - _max_scanners); + uint32_t shard_num = + std::min(config::doris_scanner_thread_pool_thread_num / + (_batch_split_mode ? 1 : state()->query_parallel_instance_num()), + _max_scanners); shard_num = std::max(shard_num, 1U); _kv_cache.reset(new vectorized::ShardedKVCache(shard_num)); for (int i = 0; i < _max_scanners; ++i) { @@ -52,6 +54,16 @@ Status FileScanLocalState::_init_scanners(std::list* s return Status::OK(); } +Status FileScanLocalState::start_scanners( + const std::list>& scanners) { + auto& p = _parent->cast(); + _scanner_ctx = vectorized::ScannerContext::create_shared( + state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(), + _scan_dependency, p.is_serial_operator(), true, + _batch_split_mode ? 1 : state()->query_parallel_instance_num()); + return Status::OK(); +} + std::string FileScanLocalState::name_suffix() const { return fmt::format(" (id={}. nereids_id={}. table name = {})", std::to_string(_parent->node_id()), std::to_string(_parent->nereids_id()), @@ -60,21 +72,33 @@ std::string FileScanLocalState::name_suffix() const { void FileScanLocalState::set_scan_ranges(RuntimeState* state, const std::vector& scan_ranges) { - _max_scanners = - config::doris_scanner_thread_pool_thread_num / state->query_parallel_instance_num(); - _max_scanners = std::max(std::max(_max_scanners, state->parallel_scan_max_scanners_count()), 1); - // For select * from table limit 10; should just use one thread. - if (should_run_serial()) { - _max_scanners = 1; - } if (scan_ranges.size() == 1) { auto scan_range = scan_ranges[0].scan_range.ext_scan_range.file_scan_range; if (scan_range.__isset.split_source) { auto split_source = scan_range.split_source; RuntimeProfile::Counter* get_split_timer = ADD_TIMER(_runtime_profile, "GetSplitTime"); + _max_scanners = config::doris_scanner_thread_pool_thread_num; + _max_scanners = + std::max(std::max(_max_scanners, state->parallel_scan_max_scanners_count()), 1); + // For select * from table limit 10; should just use one thread. + if (should_run_serial()) { + _max_scanners = 1; + } _split_source = std::make_shared( state, get_split_timer, split_source.split_source_id, split_source.num_splits, _max_scanners); + _batch_split_mode = true; + } + } + + if (!_batch_split_mode) { + _max_scanners = + config::doris_scanner_thread_pool_thread_num / state->query_parallel_instance_num(); + _max_scanners = + std::max(std::max(_max_scanners, state->parallel_scan_max_scanners_count()), 1); + // For select * from table limit 10; should just use one thread. + if (should_run_serial()) { + _max_scanners = 1; } } if (_split_source == nullptr) { diff --git a/be/src/pipeline/exec/file_scan_operator.h b/be/src/pipeline/exec/file_scan_operator.h index 2777a013d62f61a..8c726d0ff06200e 100644 --- a/be/src/pipeline/exec/file_scan_operator.h +++ b/be/src/pipeline/exec/file_scan_operator.h @@ -49,6 +49,8 @@ class FileScanLocalState final : public ScanLocalState { Status _process_conjuncts(RuntimeState* state) override; Status _init_scanners(std::list* scanners) override; + Status start_scanners( + const std::list>& scanners) override; void set_scan_ranges(RuntimeState* state, const std::vector& scan_ranges) override; int parent_id() { return _parent->node_id(); } @@ -64,6 +66,7 @@ class FileScanLocalState final : public ScanLocalState { // KVCache _kv_cache; std::unique_ptr _kv_cache; TupleId _output_tuple_id = -1; + bool _batch_split_mode = false; }; class FileScanOperatorX final : public ScanOperatorX { diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 21c3103fe5a708a..875d20986f4b870 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -979,18 +979,19 @@ Status ScanLocalState::_prepare_scanners() { scanner->set_query_statistics(_query_statistics.get()); } COUNTER_SET(_num_scanners, static_cast(scanners.size())); - RETURN_IF_ERROR(_start_scanners(_scanners)); + RETURN_IF_ERROR(start_scanners(_scanners)); } return Status::OK(); } template -Status ScanLocalState::_start_scanners( +Status ScanLocalState::start_scanners( const std::list>& scanners) { auto& p = _parent->cast(); _scanner_ctx = vectorized::ScannerContext::create_shared( state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(), - _scan_dependency, p.is_serial_operator(), p.is_file_scan_operator()); + _scan_dependency, p.is_serial_operator(), p.is_file_scan_operator(), + state()->query_parallel_instance_num()); return Status::OK(); } diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 5d41c800383bd06..1338a975e5a94ee 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -298,7 +298,8 @@ class ScanLocalState : public ScanLocalStateBase { Status _prepare_scanners(); // Submit the scanner to the thread pool and start execution - Status _start_scanners(const std::list>& scanners); + virtual Status start_scanners( + const std::list>& scanners); // For some conjunct there is chance to elimate cast operator // Eg. Variant's sub column could eliminate cast in storage layer if diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index d37d26b09f78156..825c675f2b060ce 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -47,7 +47,7 @@ ScannerContext::ScannerContext( const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list>& scanners, int64_t limit_, std::shared_ptr dependency, bool ignore_data_distribution, - bool is_file_scan_operator) + bool is_file_scan_operator, int num_parallel_instances) : HasTaskExecutionCtx(state), _state(state), _local_state(local_state), @@ -60,7 +60,8 @@ ScannerContext::ScannerContext( _scanner_scheduler_global(state->exec_env()->scanner_scheduler()), _all_scanners(scanners.begin(), scanners.end()), _ignore_data_distribution(ignore_data_distribution), - _is_file_scan_operator(is_file_scan_operator) { + _is_file_scan_operator(is_file_scan_operator), + _num_parallel_instances(num_parallel_instances) { DCHECK(_output_row_descriptor == nullptr || _output_row_descriptor->tuple_descriptors().size() == 1); _query_id = _state->get_query_ctx()->query_id(); @@ -102,8 +103,6 @@ Status ScannerContext::init() { _local_state->_runtime_profile->add_info_string("UseSpecificThreadToken", thread_token == nullptr ? "False" : "True"); - const int num_parallel_instances = _state->query_parallel_instance_num(); - // _max_bytes_in_queue controls the maximum memory that can be used by a single scan instance. // scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value // is larger than 10MB. @@ -173,7 +172,7 @@ Status ScannerContext::init() { } else { const size_t factor = _is_file_scan_operator ? 1 : 4; _max_thread_num = factor * (config::doris_scanner_thread_pool_thread_num / - num_parallel_instances); + _num_parallel_instances); // In some rare cases, user may set num_parallel_instances to 1 handly to make many query could be executed // in parallel. We need to make sure the _max_thread_num is smaller than previous value. _max_thread_num = diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 82e0a06799940bd..fb1073f85c5c774 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -107,7 +107,8 @@ class ScannerContext : public std::enable_shared_from_this, const RowDescriptor* output_row_descriptor, const std::list>& scanners, int64_t limit_, std::shared_ptr dependency, - bool ignore_data_distribution, bool is_file_scan_operator); + bool ignore_data_distribution, bool is_file_scan_operator, + int num_parallel_instances); ~ScannerContext() override { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker); @@ -215,6 +216,7 @@ class ScannerContext : public std::enable_shared_from_this, std::shared_ptr _dependency = nullptr; bool _ignore_data_distribution = false; bool _is_file_scan_operator = false; + int _num_parallel_instances; // for scaling up the running scanners size_t _estimated_block_size = 0;