Skip to content

Commit

Permalink
[Opt] opt max scanner thread number in batch split mode.
Browse files Browse the repository at this point in the history
  • Loading branch information
kaka11chen committed Dec 19, 2024
1 parent 14d928b commit 5016ffa
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 20 deletions.
44 changes: 34 additions & 10 deletions be/src/pipeline/exec/file_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/scan_operator.h"
#include "vec/exec/format/format_common.h"
#include "vec/exec/scan/scanner_context.h"
#include "vec/exec/scan/vfile_scanner.h"

namespace doris::pipeline {
Expand All @@ -37,9 +38,10 @@ Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
}

auto& p = _parent->cast<FileScanOperatorX>();
uint32_t shard_num = std::min(
config::doris_scanner_thread_pool_thread_num / state()->query_parallel_instance_num(),
_max_scanners);
uint32_t shard_num =
std::min(config::doris_scanner_thread_pool_thread_num /
(_batch_split_mode ? 1 : state()->query_parallel_instance_num()),
_max_scanners);
shard_num = std::max(shard_num, 1U);
_kv_cache.reset(new vectorized::ShardedKVCache(shard_num));
for (int i = 0; i < _max_scanners; ++i) {
Expand All @@ -52,6 +54,16 @@ Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
return Status::OK();
}

Status FileScanLocalState::start_scanners(
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners) {
auto& p = _parent->cast<FileScanOperatorX>();
_scanner_ctx = vectorized::ScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
_scan_dependency, p.is_serial_operator(), true,
_batch_split_mode ? 1 : state()->query_parallel_instance_num());
return Status::OK();
}

std::string FileScanLocalState::name_suffix() const {
return fmt::format(" (id={}. nereids_id={}. table name = {})",
std::to_string(_parent->node_id()), std::to_string(_parent->nereids_id()),
Expand All @@ -60,21 +72,33 @@ std::string FileScanLocalState::name_suffix() const {

void FileScanLocalState::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) {
_max_scanners =
config::doris_scanner_thread_pool_thread_num / state->query_parallel_instance_num();
_max_scanners = std::max(std::max(_max_scanners, state->parallel_scan_max_scanners_count()), 1);
// For select * from table limit 10; should just use one thread.
if (should_run_serial()) {
_max_scanners = 1;
}
if (scan_ranges.size() == 1) {
auto scan_range = scan_ranges[0].scan_range.ext_scan_range.file_scan_range;
if (scan_range.__isset.split_source) {
auto split_source = scan_range.split_source;
RuntimeProfile::Counter* get_split_timer = ADD_TIMER(_runtime_profile, "GetSplitTime");
_max_scanners = config::doris_scanner_thread_pool_thread_num;
_max_scanners =
std::max(std::max(_max_scanners, state->parallel_scan_max_scanners_count()), 1);
// For select * from table limit 10; should just use one thread.
if (should_run_serial()) {
_max_scanners = 1;
}
_split_source = std::make_shared<vectorized::RemoteSplitSourceConnector>(
state, get_split_timer, split_source.split_source_id, split_source.num_splits,
_max_scanners);
_batch_split_mode = true;
}
}

if (!_batch_split_mode) {
_max_scanners =
config::doris_scanner_thread_pool_thread_num / state->query_parallel_instance_num();
_max_scanners =
std::max(std::max(_max_scanners, state->parallel_scan_max_scanners_count()), 1);
// For select * from table limit 10; should just use one thread.
if (should_run_serial()) {
_max_scanners = 1;
}
}
if (_split_source == nullptr) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/file_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class FileScanLocalState final : public ScanLocalState<FileScanLocalState> {

Status _process_conjuncts(RuntimeState* state) override;
Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override;
Status start_scanners(
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners) override;
void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override;
int parent_id() { return _parent->node_id(); }
Expand All @@ -64,6 +66,7 @@ class FileScanLocalState final : public ScanLocalState<FileScanLocalState> {
// KVCache<std::string> _kv_cache;
std::unique_ptr<vectorized::ShardedKVCache> _kv_cache;
TupleId _output_tuple_id = -1;
bool _batch_split_mode = false;
};

class FileScanOperatorX final : public ScanOperatorX<FileScanLocalState> {
Expand Down
7 changes: 4 additions & 3 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -979,18 +979,19 @@ Status ScanLocalState<Derived>::_prepare_scanners() {
scanner->set_query_statistics(_query_statistics.get());
}
COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
RETURN_IF_ERROR(_start_scanners(_scanners));
RETURN_IF_ERROR(start_scanners(_scanners));
}
return Status::OK();
}

template <typename Derived>
Status ScanLocalState<Derived>::_start_scanners(
Status ScanLocalState<Derived>::start_scanners(
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners) {
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = vectorized::ScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
_scan_dependency, p.is_serial_operator(), p.is_file_scan_operator());
_scan_dependency, p.is_serial_operator(), p.is_file_scan_operator(),
state()->query_parallel_instance_num());
return Status::OK();
}

Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,8 @@ class ScanLocalState : public ScanLocalStateBase {
Status _prepare_scanners();

// Submit the scanner to the thread pool and start execution
Status _start_scanners(const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners);
virtual Status start_scanners(
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners);

// For some conjunct there is chance to elimate cast operator
// Eg. Variant's sub column could eliminate cast in storage layer if
Expand Down
9 changes: 4 additions & 5 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ ScannerContext::ScannerContext(
const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners, int64_t limit_,
std::shared_ptr<pipeline::Dependency> dependency, bool ignore_data_distribution,
bool is_file_scan_operator)
bool is_file_scan_operator, int num_parallel_instances)
: HasTaskExecutionCtx(state),
_state(state),
_local_state(local_state),
Expand All @@ -60,7 +60,8 @@ ScannerContext::ScannerContext(
_scanner_scheduler_global(state->exec_env()->scanner_scheduler()),
_all_scanners(scanners.begin(), scanners.end()),
_ignore_data_distribution(ignore_data_distribution),
_is_file_scan_operator(is_file_scan_operator) {
_is_file_scan_operator(is_file_scan_operator),
_num_parallel_instances(num_parallel_instances) {
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
_query_id = _state->get_query_ctx()->query_id();
Expand Down Expand Up @@ -102,8 +103,6 @@ Status ScannerContext::init() {
_local_state->_runtime_profile->add_info_string("UseSpecificThreadToken",
thread_token == nullptr ? "False" : "True");

const int num_parallel_instances = _state->query_parallel_instance_num();

// _max_bytes_in_queue controls the maximum memory that can be used by a single scan instance.
// scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value
// is larger than 10MB.
Expand Down Expand Up @@ -173,7 +172,7 @@ Status ScannerContext::init() {
} else {
const size_t factor = _is_file_scan_operator ? 1 : 4;
_max_thread_num = factor * (config::doris_scanner_thread_pool_thread_num /
num_parallel_instances);
_num_parallel_instances);
// In some rare cases, user may set num_parallel_instances to 1 handly to make many query could be executed
// in parallel. We need to make sure the _max_thread_num is smaller than previous value.
_max_thread_num =
Expand Down
4 changes: 3 additions & 1 deletion be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
int64_t limit_, std::shared_ptr<pipeline::Dependency> dependency,
bool ignore_data_distribution, bool is_file_scan_operator);
bool ignore_data_distribution, bool is_file_scan_operator,
int num_parallel_instances);

~ScannerContext() override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
Expand Down Expand Up @@ -215,6 +216,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
bool _ignore_data_distribution = false;
bool _is_file_scan_operator = false;
int _num_parallel_instances;

// for scaling up the running scanners
size_t _estimated_block_size = 0;
Expand Down

0 comments on commit 5016ffa

Please sign in to comment.