Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[test](scanner) Scanner scheduler unit test #47783

Merged
merged 9 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ class RuntimeState {
: _query_options.mem_limit / 20;
}

int32_t max_column_reader_num() const {
return _query_options.__isset.max_column_reader_num ? _query_options.max_column_reader_num
: 20000;
}

ObjectPool* obj_pool() const { return _obj_pool.get(); }

const DescriptorTbl& desc_tbl() const { return *_desc_tbl; }
Expand Down Expand Up @@ -215,8 +220,8 @@ class RuntimeState {
// _unreported_error_idx to _errors_log.size()
void get_unreported_errors(std::vector<std::string>* new_errors);

[[nodiscard]] bool is_cancelled() const;
Status cancel_reason() const;
[[nodiscard]] MOCK_FUNCTION bool is_cancelled() const;
MOCK_FUNCTION Status cancel_reason() const;
void cancel(const Status& reason) {
if (_exec_status.update(reason)) {
// Create a error status, so that we could print error stack, and
Expand Down
7 changes: 4 additions & 3 deletions be/src/vec/core/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <utility>
#include <vector>

#include "common/be_mock_util.h"
#include "common/exception.h"
#include "common/factory_creator.h"
#include "common/status.h"
Expand Down Expand Up @@ -91,7 +92,7 @@ class Block {
Block(const std::vector<SlotDescriptor>& slots, size_t block_size,
bool ignore_trivial_slot = false);

~Block() = default;
MOCK_FUNCTION ~Block() = default;
Block(const Block& block) = default;
Block& operator=(const Block& p) = default;
Block(Block&& block) = default;
Expand Down Expand Up @@ -209,7 +210,7 @@ class Block {
std::string columns_bytes() const;

/// Approximate number of allocated bytes in memory - for profiling and limits.
size_t allocated_bytes() const;
MOCK_FUNCTION size_t allocated_bytes() const;

/** Get a list of column names separated by commas. */
std::string dump_names() const;
Expand Down Expand Up @@ -253,7 +254,7 @@ class Block {
// Else clear column [0, column_size) delete column [column_size, data.size)
void clear_column_data(int64_t column_size = -1) noexcept;

bool mem_reuse() { return !data.empty(); }
MOCK_FUNCTION bool mem_reuse() { return !data.empty(); }

bool is_empty_column() { return data.empty(); }

Expand Down
89 changes: 52 additions & 37 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,31 +71,31 @@ ScannerContext::ScannerContext(
_parallism_of_scan_operator(parallism_of_scan_operator),
_min_scan_concurrency_of_scan_scheduler(_state->min_scan_concurrency_of_scan_scheduler()),
_min_scan_concurrency(_state->min_scan_concurrency_of_scanner()) {
DCHECK(_state != nullptr);
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
#ifndef BE_TEST
_query_id = _state->get_query_ctx()->query_id();
_resource_ctx = _state->get_query_ctx()->resource_ctx();
#endif
ctx_id = UniqueId::gen_uid().to_string();
for (auto& scanner : _all_scanners) {
_pending_scanners.push(scanner);
};
if (limit < 0) {
limit = -1;
}
_resource_ctx = _state->get_query_ctx()->resource_ctx();
_dependency = dependency;
if (_min_scan_concurrency_of_scan_scheduler == 0) {
_min_scan_concurrency_of_scan_scheduler = 2 * config::doris_scanner_thread_pool_thread_num;
}
DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
}

// After init function call, should not access _parent
Status ScannerContext::init() {
#ifndef BE_TEST
_scanner_profile = _local_state->_scanner_profile;
_newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
_scanner_memory_used_counter = _local_state->_memory_used_counter;

#ifndef BE_TEST
// 3. get thread token
if (!_state->get_query_ctx()) {
return Status::InternalError("Query context of {} is not set",
Expand All @@ -108,26 +108,13 @@ Status ScannerContext::init() {
_should_reset_thread_name = false;
}

#endif
_local_state->_runtime_profile->add_info_string("UseSpecificThreadToken",
thread_token == nullptr ? "False" : "True");

// _max_bytes_in_queue controls the maximum memory that can be used by a single scan instance.
// scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value
// is larger than 10MB.
_max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10);

// Provide more memory for wide tables, increase proportionally by multiples of 300
_max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;

// TODO: Where is the proper position to place this code?
if (_all_scanners.empty()) {
_is_finished = true;
_set_scanner_done();
}

auto scanner = _all_scanners.front().lock();
DCHECK(scanner != nullptr);

// TODO: Maybe need refactor.
// A query could have remote scan task and local scan task at the same time.
// So we need to compute the _scanner_scheduler in each scan operator instead of query context.
SimplifiedScanScheduler* simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler();
Expand All @@ -148,14 +135,31 @@ Status ScannerContext::init() {
_scanner_scheduler = _scanner_scheduler_global->get_remote_scan_thread_pool();
}
}
#endif
// _max_bytes_in_queue controls the maximum memory that can be used by a single scan operator.
// scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value
// is larger than 10MB.
_max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10);

// Provide more memory for wide tables, increase proportionally by multiples of 300
_max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;

if (_min_scan_concurrency_of_scan_scheduler == 0) {
// _scanner_scheduler->get_max_threads() is setted by workload group.
_min_scan_concurrency_of_scan_scheduler = 2 * _scanner_scheduler->get_max_threads();
}

if (_all_scanners.empty()) {
_is_finished = true;
_set_scanner_done();
}

// The overall target of our system is to make full utilization of the resources.
// At the same time, we dont want too many tasks are queued by scheduler, that is not necessary.
// Each scan operator can submit _max_scan_concurrency scanner to scheduelr if scheduler has enough resource.
// So that for a single query, we can make sure it could make full utilization of the resource.
_max_scan_concurrency = _state->num_scanner_threads();
if (_max_scan_concurrency == 0) {
// TODO: Add unit test.
// Why this is safe:
/*
1. If num cpu cores is less than or equal to 24:
Expand All @@ -172,11 +176,6 @@ Status ScannerContext::init() {
*/
_max_scan_concurrency =
_min_scan_concurrency_of_scan_scheduler / _parallism_of_scan_operator;
// In some rare cases, user may set parallel_pipeline_task_num to 1 handly to make many query could be executed
// in parallel. We need to make sure the _max_thread_num is smaller than previous value in this situation.
_max_scan_concurrency =
std::min(_max_scan_concurrency, config::doris_scanner_thread_pool_thread_num);

_max_scan_concurrency = _max_scan_concurrency == 0 ? 1 : _max_scan_concurrency;
}

Expand All @@ -185,7 +184,7 @@ Status ScannerContext::init() {
// when user not specify scan_thread_num, so we can try downgrade _max_thread_num.
// becaue we found in a table with 5k columns, column reader may ocuppy too much memory.
// you can refer https://github.com/apache/doris/issues/35340 for details.
int32_t max_column_reader_num = _state->query_options().max_column_reader_num;
const int32_t max_column_reader_num = _state->max_column_reader_num();

if (_max_scan_concurrency != 1 && max_column_reader_num > 0) {
int32_t scan_column_num = _output_tuple_desc->slots().size();
Expand Down Expand Up @@ -478,6 +477,15 @@ int32_t ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock,
_min_scan_concurrency_of_scan_scheduler -
(_scanner_scheduler->get_active_threads() + _scanner_scheduler->get_queue_size());

// Remaing margin is less than _parallism_of_scan_operator of this ScanNode.
if (margin_2 > 0 && margin_2 < _parallism_of_scan_operator) {
// Each scan operator will at most one scanner.
margin_2 = 1;
} else {
// The margin is distributed evenly to each scan operator.
margin_2 = margin_2 / _parallism_of_scan_operator;
}

if (margin_1 <= 0 && margin_2 <= 0) {
return 0;
}
Expand All @@ -500,6 +508,11 @@ int32_t ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock,
Status ScannerContext::_schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task,
std::unique_lock<std::mutex>& transfer_lock,
std::unique_lock<std::shared_mutex>& scheduler_lock) {
if (current_scan_task &&
(!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos())) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
}

std::list<std::shared_ptr<ScanTask>> tasks_to_submit;

int32_t margin = _get_margin(transfer_lock, scheduler_lock);
Expand All @@ -509,19 +522,23 @@ Status ScannerContext::_schedule_scan_task(std::shared_ptr<ScanTask> current_sca
// Be careful with current scan task.
// We need to add it back to task queue to make sure it could be resubmitted.
if (current_scan_task) {
DCHECK(current_scan_task->cached_blocks.empty());
DCHECK(!current_scan_task->is_eos());
if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"Scanner schduler logical error.");
}
// This usually happens when we should downgrade the concurrency.
_pending_scanners.push(current_scan_task->scanner);
VLOG_DEBUG << fmt::format(
"{} push back scanner to task queue, because diff <= 0, task_queue size "
"{}, _num_scheduled_scanners {}",
ctx_id, _tasks_queue.size(), _num_scheduled_scanners);
}

#ifndef NDEBUG
// This DCHECK is necessary.
// We need to make sure each scan operator could have at least 1 scan tasks.
// Or this scan operator will not be re-scheduled.
if (!_pending_scanners.empty() && _num_scheduled_scanners == 0 && _tasks_queue.empty()) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
}
#endif

return Status::OK();
}

Expand All @@ -546,7 +563,7 @@ Status ScannerContext::_schedule_scan_task(std::shared_ptr<ScanTask> current_sca
if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) {
// This should not happen.
throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"Scanner schduler logical error.");
"Scanner scheduler logical error.");
}
// Current scan task is not eos, but we can not resubmit it.
// Add current_scan_task back to task queue, so that we have chance to resubmit it in the future.
Expand Down Expand Up @@ -595,11 +612,9 @@ std::shared_ptr<ScanTask> ScannerContext::_pull_next_scan_task(
}

if (current_scan_task != nullptr) {
DCHECK(current_scan_task->cached_blocks.empty());
DCHECK(!current_scan_task->is_eos());
if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) {
// This should not happen.
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner schduler logical error.");
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
}
return current_scan_task;
}
Expand Down
22 changes: 14 additions & 8 deletions be/src/vec/exec/scan/scanner_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cstdint>
#include <memory>

#include "common/be_mock_util.h"
#include "common/status.h"
#include "util/threadpool.h"

Expand Down Expand Up @@ -53,11 +54,12 @@ class SimplifiedScanScheduler;
class ScannerScheduler {
public:
ScannerScheduler();
~ScannerScheduler();
virtual ~ScannerScheduler();

[[nodiscard]] Status init(ExecEnv* env);

Status submit(std::shared_ptr<ScannerContext> ctx, std::shared_ptr<ScanTask> scan_task);
MOCK_FUNCTION Status submit(std::shared_ptr<ScannerContext> ctx,
std::shared_ptr<ScanTask> scan_task);

void stop();

Expand Down Expand Up @@ -119,8 +121,10 @@ class SimplifiedScanScheduler {
_sched_name(sched_name),
_workload_group(workload_group) {}

~SimplifiedScanScheduler() {
MOCK_FUNCTION ~SimplifiedScanScheduler() {
#ifndef BE_TEST
stop();
#endif
LOG(INFO) << "Scanner sche " << _sched_name << " shutdown";
}

Expand Down Expand Up @@ -201,15 +205,17 @@ class SimplifiedScanScheduler {
}
}

int get_queue_size() { return _scan_thread_pool->get_queue_size(); }
MOCK_FUNCTION int get_queue_size() { return _scan_thread_pool->get_queue_size(); }

int get_active_threads() { return _scan_thread_pool->num_active_threads(); }
MOCK_FUNCTION int get_active_threads() { return _scan_thread_pool->num_active_threads(); }

int get_max_threads() { return _scan_thread_pool->max_threads(); }

std::vector<int> thread_debug_info() { return _scan_thread_pool->debug_info(); }

Status schedule_scan_task(std::shared_ptr<ScannerContext> scanner_ctx,
std::shared_ptr<ScanTask> current_scan_task,
std::unique_lock<std::mutex>& transfer_lock);
MOCK_FUNCTION Status schedule_scan_task(std::shared_ptr<ScannerContext> scanner_ctx,
std::shared_ptr<ScanTask> current_scan_task,
std::unique_lock<std::mutex>& transfer_lock);

private:
std::unique_ptr<ThreadPool> _scan_thread_pool;
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exec/scan/vscanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,9 @@ Status VScanner::close(RuntimeState* state) {
if (_is_closed) {
return Status::OK();
}

#ifndef BE_TEST
COUNTER_UPDATE(_local_state->_scanner_wait_worker_timer, _scanner_wait_worker_timer);
#endif
_is_closed = true;
return Status::OK();
}
Expand Down
30 changes: 30 additions & 0 deletions be/test/scan/mock_scanner_scheduler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gmock/gmock.h>

#include "common/status.h"
#include "vec/exec/scan/scanner_scheduler.h"

namespace doris::vectorized {
class MockScannerScheduler : ScannerScheduler {
public:
MockScannerScheduler() = default;

MOCK_METHOD2(submit, Status(std::shared_ptr<ScannerContext>, std::shared_ptr<ScanTask>));
};
} // namespace doris::vectorized
34 changes: 34 additions & 0 deletions be/test/scan/mock_simplified_scan_scheduler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gmock/gmock.h>

#include "vec/exec/scan/scanner_scheduler.h"

namespace doris::vectorized {
class MockSimplifiedScanScheduler : SimplifiedScanScheduler {
public:
MockSimplifiedScanScheduler(std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
: SimplifiedScanScheduler("ForTest", cgroup_cpu_ctl) {}

MOCK_METHOD0(get_active_threads, int());
MOCK_METHOD0(get_queue_size, int());
MOCK_METHOD3(schedule_scan_task, Status(std::shared_ptr<ScannerContext> scanner_ctx,
std::shared_ptr<ScanTask> current_scan_task,
std::unique_lock<std::mutex>& transfer_lock));
};
} // namespace doris::vectorized
Loading
Loading