Skip to content

Commit

Permalink
[test](scanner) Scanner scheduler unit test (#47783)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Unit test for scanner schedule. Adaptive scan schedule is introduced by
#44690

* ScannerContext::init
* ScannerContext::_push_back_scan_task
* ScannerContext::_get_margin
* ScannerContext::_pull_next_scan_task
* ScannerContext::_schedule_scan_task 
* Additional test for scan operator, make sure
`adaptive_pipeline_task_serial_read_on_limit` is working correctlly.
* ScannerContext::get_free_block
* ScannerContext::return_free_block
* ScannerContext::get_block_from_queue
  • Loading branch information
zhiqiang-hhhh authored Feb 18, 2025
1 parent dd0c2d8 commit 5cec569
Show file tree
Hide file tree
Showing 9 changed files with 1,104 additions and 51 deletions.
9 changes: 7 additions & 2 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ class RuntimeState {
: _query_options.mem_limit / 20;
}

int32_t max_column_reader_num() const {
return _query_options.__isset.max_column_reader_num ? _query_options.max_column_reader_num
: 20000;
}

ObjectPool* obj_pool() const { return _obj_pool.get(); }

const DescriptorTbl& desc_tbl() const { return *_desc_tbl; }
Expand Down Expand Up @@ -215,8 +220,8 @@ class RuntimeState {
// _unreported_error_idx to _errors_log.size()
void get_unreported_errors(std::vector<std::string>* new_errors);

[[nodiscard]] bool is_cancelled() const;
Status cancel_reason() const;
[[nodiscard]] MOCK_FUNCTION bool is_cancelled() const;
MOCK_FUNCTION Status cancel_reason() const;
void cancel(const Status& reason) {
if (_exec_status.update(reason)) {
// Create a error status, so that we could print error stack, and
Expand Down
7 changes: 4 additions & 3 deletions be/src/vec/core/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <utility>
#include <vector>

#include "common/be_mock_util.h"
#include "common/exception.h"
#include "common/factory_creator.h"
#include "common/status.h"
Expand Down Expand Up @@ -91,7 +92,7 @@ class Block {
Block(const std::vector<SlotDescriptor>& slots, size_t block_size,
bool ignore_trivial_slot = false);

~Block() = default;
MOCK_FUNCTION ~Block() = default;
Block(const Block& block) = default;
Block& operator=(const Block& p) = default;
Block(Block&& block) = default;
Expand Down Expand Up @@ -209,7 +210,7 @@ class Block {
std::string columns_bytes() const;

/// Approximate number of allocated bytes in memory - for profiling and limits.
size_t allocated_bytes() const;
MOCK_FUNCTION size_t allocated_bytes() const;

/** Get a list of column names separated by commas. */
std::string dump_names() const;
Expand Down Expand Up @@ -253,7 +254,7 @@ class Block {
// Else clear column [0, column_size) delete column [column_size, data.size)
void clear_column_data(int64_t column_size = -1) noexcept;

bool mem_reuse() { return !data.empty(); }
MOCK_FUNCTION bool mem_reuse() { return !data.empty(); }

bool is_empty_column() { return data.empty(); }

Expand Down
89 changes: 52 additions & 37 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,31 +71,31 @@ ScannerContext::ScannerContext(
_parallism_of_scan_operator(parallism_of_scan_operator),
_min_scan_concurrency_of_scan_scheduler(_state->min_scan_concurrency_of_scan_scheduler()),
_min_scan_concurrency(_state->min_scan_concurrency_of_scanner()) {
DCHECK(_state != nullptr);
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
#ifndef BE_TEST
_query_id = _state->get_query_ctx()->query_id();
_resource_ctx = _state->get_query_ctx()->resource_ctx();
#endif
ctx_id = UniqueId::gen_uid().to_string();
for (auto& scanner : _all_scanners) {
_pending_scanners.push(scanner);
};
if (limit < 0) {
limit = -1;
}
_resource_ctx = _state->get_query_ctx()->resource_ctx();
_dependency = dependency;
if (_min_scan_concurrency_of_scan_scheduler == 0) {
_min_scan_concurrency_of_scan_scheduler = 2 * config::doris_scanner_thread_pool_thread_num;
}
DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
}

// After init function call, should not access _parent
Status ScannerContext::init() {
#ifndef BE_TEST
_scanner_profile = _local_state->_scanner_profile;
_newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
_scanner_memory_used_counter = _local_state->_memory_used_counter;

#ifndef BE_TEST
// 3. get thread token
if (!_state->get_query_ctx()) {
return Status::InternalError("Query context of {} is not set",
Expand All @@ -108,26 +108,13 @@ Status ScannerContext::init() {
_should_reset_thread_name = false;
}

#endif
_local_state->_runtime_profile->add_info_string("UseSpecificThreadToken",
thread_token == nullptr ? "False" : "True");

// _max_bytes_in_queue controls the maximum memory that can be used by a single scan instance.
// scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value
// is larger than 10MB.
_max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10);

// Provide more memory for wide tables, increase proportionally by multiples of 300
_max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;

// TODO: Where is the proper position to place this code?
if (_all_scanners.empty()) {
_is_finished = true;
_set_scanner_done();
}

auto scanner = _all_scanners.front().lock();
DCHECK(scanner != nullptr);

// TODO: Maybe need refactor.
// A query could have remote scan task and local scan task at the same time.
// So we need to compute the _scanner_scheduler in each scan operator instead of query context.
SimplifiedScanScheduler* simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler();
Expand All @@ -148,14 +135,31 @@ Status ScannerContext::init() {
_scanner_scheduler = _scanner_scheduler_global->get_remote_scan_thread_pool();
}
}
#endif
// _max_bytes_in_queue controls the maximum memory that can be used by a single scan operator.
// scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value
// is larger than 10MB.
_max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10);

// Provide more memory for wide tables, increase proportionally by multiples of 300
_max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;

if (_min_scan_concurrency_of_scan_scheduler == 0) {
// _scanner_scheduler->get_max_threads() is setted by workload group.
_min_scan_concurrency_of_scan_scheduler = 2 * _scanner_scheduler->get_max_threads();
}

if (_all_scanners.empty()) {
_is_finished = true;
_set_scanner_done();
}

// The overall target of our system is to make full utilization of the resources.
// At the same time, we dont want too many tasks are queued by scheduler, that is not necessary.
// Each scan operator can submit _max_scan_concurrency scanner to scheduelr if scheduler has enough resource.
// So that for a single query, we can make sure it could make full utilization of the resource.
_max_scan_concurrency = _state->num_scanner_threads();
if (_max_scan_concurrency == 0) {
// TODO: Add unit test.
// Why this is safe:
/*
1. If num cpu cores is less than or equal to 24:
Expand All @@ -172,11 +176,6 @@ Status ScannerContext::init() {
*/
_max_scan_concurrency =
_min_scan_concurrency_of_scan_scheduler / _parallism_of_scan_operator;
// In some rare cases, user may set parallel_pipeline_task_num to 1 handly to make many query could be executed
// in parallel. We need to make sure the _max_thread_num is smaller than previous value in this situation.
_max_scan_concurrency =
std::min(_max_scan_concurrency, config::doris_scanner_thread_pool_thread_num);

_max_scan_concurrency = _max_scan_concurrency == 0 ? 1 : _max_scan_concurrency;
}

Expand All @@ -185,7 +184,7 @@ Status ScannerContext::init() {
// when user not specify scan_thread_num, so we can try downgrade _max_thread_num.
// becaue we found in a table with 5k columns, column reader may ocuppy too much memory.
// you can refer https://github.com/apache/doris/issues/35340 for details.
int32_t max_column_reader_num = _state->query_options().max_column_reader_num;
const int32_t max_column_reader_num = _state->max_column_reader_num();

if (_max_scan_concurrency != 1 && max_column_reader_num > 0) {
int32_t scan_column_num = _output_tuple_desc->slots().size();
Expand Down Expand Up @@ -478,6 +477,15 @@ int32_t ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock,
_min_scan_concurrency_of_scan_scheduler -
(_scanner_scheduler->get_active_threads() + _scanner_scheduler->get_queue_size());

// Remaing margin is less than _parallism_of_scan_operator of this ScanNode.
if (margin_2 > 0 && margin_2 < _parallism_of_scan_operator) {
// Each scan operator will at most one scanner.
margin_2 = 1;
} else {
// The margin is distributed evenly to each scan operator.
margin_2 = margin_2 / _parallism_of_scan_operator;
}

if (margin_1 <= 0 && margin_2 <= 0) {
return 0;
}
Expand All @@ -500,6 +508,11 @@ int32_t ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock,
Status ScannerContext::_schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task,
std::unique_lock<std::mutex>& transfer_lock,
std::unique_lock<std::shared_mutex>& scheduler_lock) {
if (current_scan_task &&
(!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos())) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
}

std::list<std::shared_ptr<ScanTask>> tasks_to_submit;

int32_t margin = _get_margin(transfer_lock, scheduler_lock);
Expand All @@ -509,19 +522,23 @@ Status ScannerContext::_schedule_scan_task(std::shared_ptr<ScanTask> current_sca
// Be careful with current scan task.
// We need to add it back to task queue to make sure it could be resubmitted.
if (current_scan_task) {
DCHECK(current_scan_task->cached_blocks.empty());
DCHECK(!current_scan_task->is_eos());
if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"Scanner schduler logical error.");
}
// This usually happens when we should downgrade the concurrency.
_pending_scanners.push(current_scan_task->scanner);
VLOG_DEBUG << fmt::format(
"{} push back scanner to task queue, because diff <= 0, task_queue size "
"{}, _num_scheduled_scanners {}",
ctx_id, _tasks_queue.size(), _num_scheduled_scanners);
}

#ifndef NDEBUG
// This DCHECK is necessary.
// We need to make sure each scan operator could have at least 1 scan tasks.
// Or this scan operator will not be re-scheduled.
if (!_pending_scanners.empty() && _num_scheduled_scanners == 0 && _tasks_queue.empty()) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
}
#endif

return Status::OK();
}

Expand All @@ -546,7 +563,7 @@ Status ScannerContext::_schedule_scan_task(std::shared_ptr<ScanTask> current_sca
if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) {
// This should not happen.
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"Scanner schduler logical error.");
"Scanner scheduler logical error.");
}
// Current scan task is not eos, but we can not resubmit it.
// Add current_scan_task back to task queue, so that we have chance to resubmit it in the future.
Expand Down Expand Up @@ -595,11 +612,9 @@ std::shared_ptr<ScanTask> ScannerContext::_pull_next_scan_task(
}

if (current_scan_task != nullptr) {
DCHECK(current_scan_task->cached_blocks.empty());
DCHECK(!current_scan_task->is_eos());
if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) {
// This should not happen.
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner schduler logical error.");
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
}
return current_scan_task;
}
Expand Down
22 changes: 14 additions & 8 deletions be/src/vec/exec/scan/scanner_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cstdint>
#include <memory>

#include "common/be_mock_util.h"
#include "common/status.h"
#include "util/threadpool.h"

Expand Down Expand Up @@ -53,11 +54,12 @@ class SimplifiedScanScheduler;
class ScannerScheduler {
public:
ScannerScheduler();
~ScannerScheduler();
virtual ~ScannerScheduler();

[[nodiscard]] Status init(ExecEnv* env);

Status submit(std::shared_ptr<ScannerContext> ctx, std::shared_ptr<ScanTask> scan_task);
MOCK_FUNCTION Status submit(std::shared_ptr<ScannerContext> ctx,
std::shared_ptr<ScanTask> scan_task);

void stop();

Expand Down Expand Up @@ -119,8 +121,10 @@ class SimplifiedScanScheduler {
_sched_name(sched_name),
_workload_group(workload_group) {}

~SimplifiedScanScheduler() {
MOCK_FUNCTION ~SimplifiedScanScheduler() {
#ifndef BE_TEST
stop();
#endif
LOG(INFO) << "Scanner sche " << _sched_name << " shutdown";
}

Expand Down Expand Up @@ -201,15 +205,17 @@ class SimplifiedScanScheduler {
}
}

int get_queue_size() { return _scan_thread_pool->get_queue_size(); }
MOCK_FUNCTION int get_queue_size() { return _scan_thread_pool->get_queue_size(); }

int get_active_threads() { return _scan_thread_pool->num_active_threads(); }
MOCK_FUNCTION int get_active_threads() { return _scan_thread_pool->num_active_threads(); }

int get_max_threads() { return _scan_thread_pool->max_threads(); }

std::vector<int> thread_debug_info() { return _scan_thread_pool->debug_info(); }

Status schedule_scan_task(std::shared_ptr<ScannerContext> scanner_ctx,
std::shared_ptr<ScanTask> current_scan_task,
std::unique_lock<std::mutex>& transfer_lock);
MOCK_FUNCTION Status schedule_scan_task(std::shared_ptr<ScannerContext> scanner_ctx,
std::shared_ptr<ScanTask> current_scan_task,
std::unique_lock<std::mutex>& transfer_lock);

private:
std::unique_ptr<ThreadPool> _scan_thread_pool;
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exec/scan/vscanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,9 @@ Status VScanner::close(RuntimeState* state) {
if (_is_closed) {
return Status::OK();
}

#ifndef BE_TEST
COUNTER_UPDATE(_local_state->_scanner_wait_worker_timer, _scanner_wait_worker_timer);
#endif
_is_closed = true;
return Status::OK();
}
Expand Down
30 changes: 30 additions & 0 deletions be/test/scan/mock_scanner_scheduler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gmock/gmock.h>

#include "common/status.h"
#include "vec/exec/scan/scanner_scheduler.h"

namespace doris::vectorized {
class MockScannerScheduler : ScannerScheduler {
public:
MockScannerScheduler() = default;

MOCK_METHOD2(submit, Status(std::shared_ptr<ScannerContext>, std::shared_ptr<ScanTask>));
};
} // namespace doris::vectorized
34 changes: 34 additions & 0 deletions be/test/scan/mock_simplified_scan_scheduler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gmock/gmock.h>

#include "vec/exec/scan/scanner_scheduler.h"

namespace doris::vectorized {
class MockSimplifiedScanScheduler : SimplifiedScanScheduler {
public:
MockSimplifiedScanScheduler(std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
: SimplifiedScanScheduler("ForTest", cgroup_cpu_ctl) {}

MOCK_METHOD0(get_active_threads, int());
MOCK_METHOD0(get_queue_size, int());
MOCK_METHOD3(schedule_scan_task, Status(std::shared_ptr<ScannerContext> scanner_ctx,
std::shared_ptr<ScanTask> current_scan_task,
std::unique_lock<std::mutex>& transfer_lock));
};
} // namespace doris::vectorized
Loading

0 comments on commit 5cec569

Please sign in to comment.