From 5cec5691badb1e343b6d2d26e49339fe8e56a9c9 Mon Sep 17 00:00:00 2001 From: zhiqiang Date: Tue, 18 Feb 2025 23:34:04 +0800 Subject: [PATCH] [test](scanner) Scanner scheduler unit test (#47783) ### What problem does this PR solve? Unit test for scanner schedule. Adaptive scan schedule is introduced by https://github.com/apache/doris/pull/44690 * ScannerContext::init * ScannerContext::_push_back_scan_task * ScannerContext::_get_margin * ScannerContext::_pull_next_scan_task * ScannerContext::_schedule_scan_task * Additional test for scan operator, make sure `adaptive_pipeline_task_serial_read_on_limit` is working correctly. * ScannerContext::get_free_block * ScannerContext::return_free_block * ScannerContext::get_block_from_queue --- be/src/runtime/runtime_state.h | 9 +- be/src/vec/core/block.h | 7 +- be/src/vec/exec/scan/scanner_context.cpp | 89 +- be/src/vec/exec/scan/scanner_scheduler.h | 22 +- be/src/vec/exec/scan/vscanner.cpp | 3 +- be/test/scan/mock_scanner_scheduler.h | 30 + be/test/scan/mock_simplified_scan_scheduler.h | 34 + be/test/scan/scanner_context_test.cpp | 847 ++++++++++++++++++ be/test/vec/exec/scan_operator_test.cpp | 114 +++ 9 files changed, 1104 insertions(+), 51 deletions(-) create mode 100644 be/test/scan/mock_scanner_scheduler.h create mode 100644 be/test/scan/mock_simplified_scan_scheduler.h create mode 100644 be/test/scan/scanner_context_test.cpp create mode 100644 be/test/vec/exec/scan_operator_test.cpp diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 0fb53170c02f9e..bc7de94d8a6142 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -115,6 +115,11 @@ class RuntimeState { : _query_options.mem_limit / 20; } + int32_t max_column_reader_num() const { + return _query_options.__isset.max_column_reader_num ? 
_query_options.max_column_reader_num + : 20000; + } + ObjectPool* obj_pool() const { return _obj_pool.get(); } const DescriptorTbl& desc_tbl() const { return *_desc_tbl; } @@ -215,8 +220,8 @@ class RuntimeState { // _unreported_error_idx to _errors_log.size() void get_unreported_errors(std::vector* new_errors); - [[nodiscard]] bool is_cancelled() const; - Status cancel_reason() const; + [[nodiscard]] MOCK_FUNCTION bool is_cancelled() const; + MOCK_FUNCTION Status cancel_reason() const; void cancel(const Status& reason) { if (_exec_status.update(reason)) { // Create a error status, so that we could print error stack, and diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index c15d9dc681fe30..8f2d56f5de3805 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -34,6 +34,7 @@ #include #include +#include "common/be_mock_util.h" #include "common/exception.h" #include "common/factory_creator.h" #include "common/status.h" @@ -91,7 +92,7 @@ class Block { Block(const std::vector& slots, size_t block_size, bool ignore_trivial_slot = false); - ~Block() = default; + MOCK_FUNCTION ~Block() = default; Block(const Block& block) = default; Block& operator=(const Block& p) = default; Block(Block&& block) = default; @@ -209,7 +210,7 @@ class Block { std::string columns_bytes() const; /// Approximate number of allocated bytes in memory - for profiling and limits. - size_t allocated_bytes() const; + MOCK_FUNCTION size_t allocated_bytes() const; /** Get a list of column names separated by commas. 
*/ std::string dump_names() const; @@ -253,7 +254,7 @@ class Block { // Else clear column [0, column_size) delete column [column_size, data.size) void clear_column_data(int64_t column_size = -1) noexcept; - bool mem_reuse() { return !data.empty(); } + MOCK_FUNCTION bool mem_reuse() { return !data.empty(); } bool is_empty_column() { return data.empty(); } diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 8cd2b843f4c092..74d5cdc21b3666 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -71,9 +71,13 @@ ScannerContext::ScannerContext( _parallism_of_scan_operator(parallism_of_scan_operator), _min_scan_concurrency_of_scan_scheduler(_state->min_scan_concurrency_of_scan_scheduler()), _min_scan_concurrency(_state->min_scan_concurrency_of_scanner()) { + DCHECK(_state != nullptr); DCHECK(_output_row_descriptor == nullptr || _output_row_descriptor->tuple_descriptors().size() == 1); +#ifndef BE_TEST _query_id = _state->get_query_ctx()->query_id(); + _resource_ctx = _state->get_query_ctx()->resource_ctx(); +#endif ctx_id = UniqueId::gen_uid().to_string(); for (auto& scanner : _all_scanners) { _pending_scanners.push(scanner); @@ -81,21 +85,17 @@ ScannerContext::ScannerContext( if (limit < 0) { limit = -1; } - _resource_ctx = _state->get_query_ctx()->resource_ctx(); _dependency = dependency; - if (_min_scan_concurrency_of_scan_scheduler == 0) { - _min_scan_concurrency_of_scan_scheduler = 2 * config::doris_scanner_thread_pool_thread_num; - } DorisMetrics::instance()->scanner_ctx_cnt->increment(1); } // After init function call, should not access _parent Status ScannerContext::init() { +#ifndef BE_TEST _scanner_profile = _local_state->_scanner_profile; _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num; _scanner_memory_used_counter = _local_state->_memory_used_counter; -#ifndef BE_TEST // 3. 
get thread token if (!_state->get_query_ctx()) { return Status::InternalError("Query context of {} is not set", @@ -108,26 +108,13 @@ Status ScannerContext::init() { _should_reset_thread_name = false; } -#endif _local_state->_runtime_profile->add_info_string("UseSpecificThreadToken", thread_token == nullptr ? "False" : "True"); - // _max_bytes_in_queue controls the maximum memory that can be used by a single scan instance. - // scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value - // is larger than 10MB. - _max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10); - - // Provide more memory for wide tables, increase proportionally by multiples of 300 - _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1; - - // TODO: Where is the proper position to place this code? - if (_all_scanners.empty()) { - _is_finished = true; - _set_scanner_done(); - } - auto scanner = _all_scanners.front().lock(); DCHECK(scanner != nullptr); + + // TODO: Maybe need refactor. // A query could have remote scan task and local scan task at the same time. // So we need to compute the _scanner_scheduler in each scan operator instead of query context. SimplifiedScanScheduler* simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler(); @@ -148,6 +135,24 @@ Status ScannerContext::init() { _scanner_scheduler = _scanner_scheduler_global->get_remote_scan_thread_pool(); } } +#endif + // _max_bytes_in_queue controls the maximum memory that can be used by a single scan operator. + // scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value + // is larger than 10MB. 
+ _max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10); + + // Provide more memory for wide tables, increase proportionally by multiples of 300 + _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1; + + if (_min_scan_concurrency_of_scan_scheduler == 0) { + // _scanner_scheduler->get_max_threads() is set by the workload group. + _min_scan_concurrency_of_scan_scheduler = 2 * _scanner_scheduler->get_max_threads(); + } + + if (_all_scanners.empty()) { + _is_finished = true; + _set_scanner_done(); + } // The overall target of our system is to make full utilization of the resources. // At the same time, we dont want too many tasks are queued by scheduler, that is not necessary. @@ -155,7 +160,6 @@ Status ScannerContext::init() { // So that for a single query, we can make sure it could make full utilization of the resource. _max_scan_concurrency = _state->num_scanner_threads(); if (_max_scan_concurrency == 0) { - // TODO: Add unit test. // Why this is safe: /* 1. If num cpu cores is less than or equal to 24: @@ -172,11 +176,6 @@ Status ScannerContext::init() { */ _max_scan_concurrency = _min_scan_concurrency_of_scan_scheduler / _parallism_of_scan_operator; - // In some rare cases, user may set parallel_pipeline_task_num to 1 handly to make many query could be executed - // in parallel. We need to make sure the _max_thread_num is smaller than previous value in this situation. - _max_scan_concurrency = - std::min(_max_scan_concurrency, config::doris_scanner_thread_pool_thread_num); - _max_scan_concurrency = _max_scan_concurrency == 0 ? 1 : _max_scan_concurrency; } @@ -185,7 +184,7 @@ Status ScannerContext::init() { // when user not specify scan_thread_num, so we can try downgrade _max_thread_num. // becaue we found in a table with 5k columns, column reader may ocuppy too much memory. // you can refer https://github.com/apache/doris/issues/35340 for details. 
- int32_t max_column_reader_num = _state->query_options().max_column_reader_num; + const int32_t max_column_reader_num = _state->max_column_reader_num(); if (_max_scan_concurrency != 1 && max_column_reader_num > 0) { int32_t scan_column_num = _output_tuple_desc->slots().size(); @@ -478,6 +477,15 @@ int32_t ScannerContext::_get_margin(std::unique_lock& transfer_lock, _min_scan_concurrency_of_scan_scheduler - (_scanner_scheduler->get_active_threads() + _scanner_scheduler->get_queue_size()); + // Remaining margin is less than _parallism_of_scan_operator of this ScanNode. + if (margin_2 > 0 && margin_2 < _parallism_of_scan_operator) { + // Each scan operator will get at most one scanner. + margin_2 = 1; + } else { + // The margin is distributed evenly to each scan operator. + margin_2 = margin_2 / _parallism_of_scan_operator; + } + if (margin_1 <= 0 && margin_2 <= 0) { return 0; } @@ -500,6 +508,11 @@ int32_t ScannerContext::_get_margin(std::unique_lock& transfer_lock, Status ScannerContext::_schedule_scan_task(std::shared_ptr current_scan_task, std::unique_lock& transfer_lock, std::unique_lock& scheduler_lock) { + if (current_scan_task && + (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos())) { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error."); + } + std::list> tasks_to_submit; int32_t margin = _get_margin(transfer_lock, scheduler_lock); @@ -509,12 +522,6 @@ Status ScannerContext::_schedule_scan_task(std::shared_ptr current_sca // Be careful with current scan task. // We need to add it back to task queue to make sure it could be resubmitted. if (current_scan_task) { - DCHECK(current_scan_task->cached_blocks.empty()); - DCHECK(!current_scan_task->is_eos()); - if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) { - throw doris::Exception(ErrorCode::INTERNAL_ERROR, - "Scanner schduler logical error."); - } // This usually happens when we should downgrade the concurrency. 
_pending_scanners.push(current_scan_task->scanner); VLOG_DEBUG << fmt::format( @@ -522,6 +529,16 @@ Status ScannerContext::_schedule_scan_task(std::shared_ptr current_sca "{}, _num_scheduled_scanners {}", ctx_id, _tasks_queue.size(), _num_scheduled_scanners); } + +#ifndef NDEBUG + // This DCHECK is necessary. + // We need to make sure each scan operator could have at least 1 scan tasks. + // Or this scan operator will not be re-scheduled. + if (!_pending_scanners.empty() && _num_scheduled_scanners == 0 && _tasks_queue.empty()) { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error."); + } +#endif + return Status::OK(); } @@ -546,7 +563,7 @@ Status ScannerContext::_schedule_scan_task(std::shared_ptr current_sca if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) { // This should not happen. throw doris::Exception(ErrorCode::INTERNAL_ERROR, - "Scanner schduler logical error."); + "Scanner scheduler logical error."); } // Current scan task is not eos, but we can not resubmit it. // Add current_scan_task back to task queue, so that we have chance to resubmit it in the future. @@ -595,11 +612,9 @@ std::shared_ptr ScannerContext::_pull_next_scan_task( } if (current_scan_task != nullptr) { - DCHECK(current_scan_task->cached_blocks.empty()); - DCHECK(!current_scan_task->is_eos()); if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) { // This should not happen. 
- throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner schduler logical error."); + throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error."); } return current_scan_task; } diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 9202f845345cda..b6d905eacd9212 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -21,6 +21,7 @@ #include #include +#include "common/be_mock_util.h" #include "common/status.h" #include "util/threadpool.h" @@ -53,11 +54,12 @@ class SimplifiedScanScheduler; class ScannerScheduler { public: ScannerScheduler(); - ~ScannerScheduler(); + virtual ~ScannerScheduler(); [[nodiscard]] Status init(ExecEnv* env); - Status submit(std::shared_ptr ctx, std::shared_ptr scan_task); + MOCK_FUNCTION Status submit(std::shared_ptr ctx, + std::shared_ptr scan_task); void stop(); @@ -119,8 +121,10 @@ class SimplifiedScanScheduler { _sched_name(sched_name), _workload_group(workload_group) {} - ~SimplifiedScanScheduler() { + MOCK_FUNCTION ~SimplifiedScanScheduler() { +#ifndef BE_TEST stop(); +#endif LOG(INFO) << "Scanner sche " << _sched_name << " shutdown"; } @@ -201,15 +205,17 @@ class SimplifiedScanScheduler { } } - int get_queue_size() { return _scan_thread_pool->get_queue_size(); } + MOCK_FUNCTION int get_queue_size() { return _scan_thread_pool->get_queue_size(); } - int get_active_threads() { return _scan_thread_pool->num_active_threads(); } + MOCK_FUNCTION int get_active_threads() { return _scan_thread_pool->num_active_threads(); } + + int get_max_threads() { return _scan_thread_pool->max_threads(); } std::vector thread_debug_info() { return _scan_thread_pool->debug_info(); } - Status schedule_scan_task(std::shared_ptr scanner_ctx, - std::shared_ptr current_scan_task, - std::unique_lock& transfer_lock); + MOCK_FUNCTION Status schedule_scan_task(std::shared_ptr scanner_ctx, + std::shared_ptr current_scan_task, + 
std::unique_lock& transfer_lock); private: std::unique_ptr _scan_thread_pool; diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index 5baf2ae9dad6f0..ff928732510fd1 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -246,8 +246,9 @@ Status VScanner::close(RuntimeState* state) { if (_is_closed) { return Status::OK(); } - +#ifndef BE_TEST COUNTER_UPDATE(_local_state->_scanner_wait_worker_timer, _scanner_wait_worker_timer); +#endif _is_closed = true; return Status::OK(); } diff --git a/be/test/scan/mock_scanner_scheduler.h b/be/test/scan/mock_scanner_scheduler.h new file mode 100644 index 00000000000000..2033a105b81cec --- /dev/null +++ b/be/test/scan/mock_scanner_scheduler.h @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include + +#include "common/status.h" +#include "vec/exec/scan/scanner_scheduler.h" + +namespace doris::vectorized { +class MockScannerScheduler : ScannerScheduler { +public: + MockScannerScheduler() = default; + + MOCK_METHOD2(submit, Status(std::shared_ptr, std::shared_ptr)); +}; +} // namespace doris::vectorized diff --git a/be/test/scan/mock_simplified_scan_scheduler.h b/be/test/scan/mock_simplified_scan_scheduler.h new file mode 100644 index 00000000000000..6a139ac7ae6908 --- /dev/null +++ b/be/test/scan/mock_simplified_scan_scheduler.h @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include + +#include "vec/exec/scan/scanner_scheduler.h" + +namespace doris::vectorized { +class MockSimplifiedScanScheduler : SimplifiedScanScheduler { +public: + MockSimplifiedScanScheduler(std::shared_ptr cgroup_cpu_ctl) + : SimplifiedScanScheduler("ForTest", cgroup_cpu_ctl) {} + + MOCK_METHOD0(get_active_threads, int()); + MOCK_METHOD0(get_queue_size, int()); + MOCK_METHOD3(schedule_scan_task, Status(std::shared_ptr scanner_ctx, + std::shared_ptr current_scan_task, + std::unique_lock& transfer_lock)); +}; +} // namespace doris::vectorized diff --git a/be/test/scan/scanner_context_test.cpp b/be/test/scan/scanner_context_test.cpp new file mode 100644 index 00000000000000..f482fd500e9ed9 --- /dev/null +++ b/be/test/scan/scanner_context_test.cpp @@ -0,0 +1,847 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/exec/scan/scanner_context.h" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "common/object_pool.h" +#include "mock_scanner_scheduler.h" +#include "mock_simplified_scan_scheduler.h" +#include "pipeline/dependency.h" +#include "pipeline/exec/olap_scan_operator.h" +#include "runtime/descriptors.h" +#include "vec/core/block.h" +#include "vec/exec/scan/new_olap_scanner.h" +#include "vec/exec/scan/scanner_scheduler.h" +#include "vec/exec/scan/vscan_node.h" + +namespace doris::vectorized { +class ScannerContextTest : public testing::Test { +public: + void SetUp() override { + obj_pool = std::make_unique(); + // This ScanNode has two tuples. + // First one is input tuple, second one is output tuple. + tnode.row_tuples.push_back(TTupleId(0)); + tnode.row_tuples.push_back(TTupleId(1)); + std::vector null_map {false, false}; + tnode.nullable_tuples = null_map; + tbl_desc.tableType = TTableType::OLAP_TABLE; + + tuple_desc.id = 0; + tuple_descs.push_back(tuple_desc); + tuple_desc.id = 1; + tuple_descs.push_back(tuple_desc); + + type_node.type = TTypeNodeType::SCALAR; + + scalar_type.__set_type(TPrimitiveType::STRING); + type_node.__set_scalar_type(scalar_type); + slot_desc.slotType.types.push_back(type_node); + slot_desc.id = 0; + slot_desc.parent = 0; + slot_descs.push_back(slot_desc); + slot_desc.id = 1; + slot_desc.parent = 1; + slot_descs.push_back(slot_desc); + thrift_tbl.tableDescriptors.push_back(tbl_desc); + thrift_tbl.tupleDescriptors = tuple_descs; + thrift_tbl.slotDescriptors = slot_descs; + std::ignore = DescriptorTbl::create(obj_pool.get(), thrift_tbl, &descs); + auto task_exec_ctx = std::make_shared(); + state->set_task_execution_context(task_exec_ctx); + output_tuple_desc = descs->get_tuple_descriptor(0); + } + +private: + class MockBlock : public Block { + MockBlock() = default; + MOCK_CONST_METHOD0(allocated_bytes, size_t()); + MOCK_METHOD0(mem_reuse, bool()); + 
MOCK_METHOD1(clear_column_data, void(int64_t)); + }; + + class MockRuntimeState : public RuntimeState { + MockRuntimeState() = default; + MOCK_CONST_METHOD0(is_cancelled, bool()); + MOCK_CONST_METHOD0(cancel_reason, Status()); + }; + + std::unique_ptr obj_pool; + TPlanNode tnode; + TTableDescriptor tbl_desc; + std::vector tuple_descs; + TTupleDescriptor tuple_desc; + std::vector slot_descs; + TSlotDescriptor slot_desc; + TTypeNode type_node; + TScalarType scalar_type; + TDescriptorTable thrift_tbl; + DescriptorTbl* descs = nullptr; + std::unique_ptr state = std::make_unique(); + std::unique_ptr profile = std::make_unique("TestProfile"); + std::unique_ptr max_concurrency_counter = + std::make_unique(TUnit::UNIT, 1, 3); + std::unique_ptr min_concurrency_counter = + std::make_unique(TUnit::UNIT, 1, 3); + + std::unique_ptr newly_create_free_blocks_num = + std::make_unique(TUnit::UNIT, 1, 3); + std::unique_ptr scanner_memory_used_counter = + std::make_unique(TUnit::UNIT, 1, 3); + + TupleDescriptor* output_tuple_desc = nullptr; + RowDescriptor* output_row_descriptor = nullptr; + std::shared_ptr scan_dependency = + pipeline::Dependency::create_shared(0, 0, "TestScanDependency"); + std::shared_ptr cgroup_cpu_ctl = std::make_shared(1); + std::unique_ptr scan_scheduler = + std::make_unique("ForTest", cgroup_cpu_ctl); +}; + +TEST_F(ScannerContextTest, test_init) { + const int parallel_tasks = 1; + auto scan_operator = std::make_unique( + obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam {}); + + auto olap_scan_local_state = + pipeline::OlapScanLocalState::create_unique(state.get(), scan_operator.get()); + + const int64_t limit = 100; + + NewOlapScanner::Params scanner_params; + scanner_params.state = state.get(); + scanner_params.profile = profile.get(); + scanner_params.limit = limit; + scanner_params.key_ranges = std::vector(); // empty + + std::shared_ptr scanner = + NewOlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params)); + + 
std::list> scanners; + for (int i = 0; i < 11; ++i) { + scanners.push_back(std::make_shared(scanner)); + } + + std::shared_ptr scanner_context = ScannerContext::create_shared( + state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, + scanners, limit, scan_dependency, parallel_tasks); + + scan_operator->_should_run_serial = false; + + olap_scan_local_state->_max_scan_concurrency = max_concurrency_counter.get(); + olap_scan_local_state->_min_scan_concurrency = min_concurrency_counter.get(); + + olap_scan_local_state->_parent = scan_operator.get(); + + // User specified num_scanner_threads is less than _max_scan_concurrency that we calculated + TQueryOptions query_options; + query_options.__set_num_scanner_threads(2); + query_options.__set_max_column_reader_num(0); + state->set_query_options(query_options); + std::unique_ptr scheduler = + std::make_unique(cgroup_cpu_ctl); + EXPECT_CALL(*scheduler, schedule_scan_task(testing::_, testing::_, testing::_)) + .WillRepeatedly(testing::Return(Status::OK())); + scanner_context->_scanner_scheduler = scheduler.get(); + + // max_scan_concurrency that we calculate will be 10 / 1 = 10; + scanner_context->_min_scan_concurrency_of_scan_scheduler = 10; + Status st = scanner_context->init(); + ASSERT_TRUE(st.ok()); + // actual max_scan_concurrency will be 2 since user specified num_scanner_threads is 2. 
+ ASSERT_EQ(scanner_context->_max_scan_concurrency, 2); + + query_options.__set_num_scanner_threads(0); + state->set_query_options(query_options); + + st = scanner_context->init(); + ASSERT_TRUE(st.ok()); + + ASSERT_EQ(scanner_context->_max_scan_concurrency, + scanner_context->_min_scan_concurrency_of_scan_scheduler / parallel_tasks); +} + +TEST_F(ScannerContextTest, test_serial_run) { + const int parallel_tasks = 1; + auto scan_operator = std::make_unique( + obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam {}); + + auto olap_scan_local_state = + pipeline::OlapScanLocalState::create_unique(state.get(), scan_operator.get()); + + const int64_t limit = 100; + + NewOlapScanner::Params scanner_params; + scanner_params.state = state.get(); + scanner_params.profile = profile.get(); + scanner_params.limit = limit; + scanner_params.key_ranges = std::vector(); // empty + + std::shared_ptr scanner = + NewOlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params)); + + std::list> scanners; + for (int i = 0; i < 11; ++i) { + scanners.push_back(std::make_shared(scanner)); + } + + std::shared_ptr scanner_context = ScannerContext::create_shared( + state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, + scanners, limit, scan_dependency, parallel_tasks); + + scan_operator->_should_run_serial = true; + + olap_scan_local_state->_max_scan_concurrency = max_concurrency_counter.get(); + olap_scan_local_state->_min_scan_concurrency = min_concurrency_counter.get(); + + olap_scan_local_state->_parent = scan_operator.get(); + + TQueryOptions query_options; + query_options.__set_num_scanner_threads(2); + query_options.__set_max_column_reader_num(0); + state->set_query_options(query_options); + std::unique_ptr scheduler = + std::make_unique(cgroup_cpu_ctl); + EXPECT_CALL(*scheduler, schedule_scan_task(testing::_, testing::_, testing::_)) + .WillRepeatedly(testing::Return(Status::OK())); + scanner_context->_scanner_scheduler 
= scheduler.get(); + + scanner_context->_min_scan_concurrency_of_scan_scheduler = 10; + Status st = scanner_context->init(); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(scanner_context->_max_scan_concurrency, 1); + + query_options.__set_num_scanner_threads(0); + state->set_query_options(query_options); + st = scanner_context->init(); + ASSERT_TRUE(st.ok()); + + ASSERT_EQ(scanner_context->_max_scan_concurrency, 1); +} + +TEST_F(ScannerContextTest, test_max_column_reader_num) { + const int parallel_tasks = 1; + auto scan_operator = std::make_unique( + obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam {}); + + auto olap_scan_local_state = + pipeline::OlapScanLocalState::create_unique(state.get(), scan_operator.get()); + + const int64_t limit = 100; + + NewOlapScanner::Params scanner_params; + scanner_params.state = state.get(); + scanner_params.profile = profile.get(); + scanner_params.limit = limit; + scanner_params.key_ranges = std::vector(); // empty + + std::shared_ptr scanner = + NewOlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params)); + + std::list> scanners; + for (int i = 0; i < 20; ++i) { + scanners.push_back(std::make_shared(scanner)); + } + + std::shared_ptr scanner_context = ScannerContext::create_shared( + state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, + scanners, limit, scan_dependency, parallel_tasks); + + scan_operator->_should_run_serial = false; + + olap_scan_local_state->_max_scan_concurrency = max_concurrency_counter.get(); + olap_scan_local_state->_min_scan_concurrency = min_concurrency_counter.get(); + + olap_scan_local_state->_parent = scan_operator.get(); + + TQueryOptions query_options; + query_options.__set_num_scanner_threads(20); + query_options.__set_max_column_reader_num(1); + state->set_query_options(query_options); + std::unique_ptr scheduler = + std::make_unique(cgroup_cpu_ctl); + EXPECT_CALL(*scheduler, schedule_scan_task(testing::_, testing::_, testing::_)) + 
.WillRepeatedly(testing::Return(Status::OK())); + scanner_context->_scanner_scheduler = scheduler.get(); + scanner_context->_min_scan_concurrency_of_scan_scheduler = 10; + Status st = scanner_context->init(); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(scanner_context->_max_scan_concurrency, 1); +} + +TEST_F(ScannerContextTest, test_push_back_scan_task) { + const int parallel_tasks = 1; + auto scan_operator = std::make_unique( + obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam {}); + + auto olap_scan_local_state = + pipeline::OlapScanLocalState::create_unique(state.get(), scan_operator.get()); + + const int64_t limit = 100; + + NewOlapScanner::Params scanner_params; + scanner_params.state = state.get(); + scanner_params.profile = profile.get(); + scanner_params.limit = limit; + scanner_params.key_ranges = std::vector(); // empty + + std::shared_ptr scanner = + NewOlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params)); + + std::list> scanners; + for (int i = 0; i < 11; ++i) { + scanners.push_back(std::make_shared(scanner)); + } + + std::shared_ptr scanner_context = ScannerContext::create_shared( + state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, + scanners, limit, scan_dependency, parallel_tasks); + + scanner_context->_num_scheduled_scanners = 11; + + for (int i = 0; i < 5; ++i) { + auto scan_task = std::make_shared(std::make_shared(scanner)); + scanner_context->push_back_scan_task(scan_task); + ASSERT_EQ(scanner_context->_num_scheduled_scanners, 10 - i); + } +} + +TEST_F(ScannerContextTest, get_margin) { + const int parallel_tasks = 4; + auto scan_operator = std::make_unique( + obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam {}); + + auto olap_scan_local_state = + pipeline::OlapScanLocalState::create_unique(state.get(), scan_operator.get()); + + const int64_t limit = 100; + + NewOlapScanner::Params scanner_params; + scanner_params.state = state.get(); + 
scanner_params.profile = profile.get(); + scanner_params.limit = limit; + scanner_params.key_ranges = std::vector(); // empty + + std::shared_ptr scanner = + NewOlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params)); + + std::list> scanners; + for (int i = 0; i < 11; ++i) { + scanners.push_back(std::make_shared(scanner)); + } + + std::shared_ptr scanner_context = ScannerContext::create_shared( + state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, + scanners, limit, scan_dependency, parallel_tasks); + + std::mutex transfer_mutex; + std::unique_lock transfer_lock(transfer_mutex); + std::shared_mutex scheduler_mutex; + std::unique_lock scheduler_lock(scheduler_mutex); + scanner_context->_scanner_scheduler = scan_scheduler.get(); + scanner_context->_min_scan_concurrency_of_scan_scheduler = 20; + // _task_queue.size is 0. + // _num_schedule_scanners is 0. + std::shared_ptr cgroup_cpu_ctl = std::make_shared(1); + + // Has not submitted any scan tasks. + // ScanScheduler is empty too. + // So margin should be equal to _min_scan_concurrency_of_scan_scheduler / parallel_tasks. + // We can make full utilization of the resource. + std::unique_ptr scheduler = + std::make_unique(cgroup_cpu_ctl); + EXPECT_CALL(*scheduler, get_active_threads()).WillOnce(testing::Return(0)); + EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(0)); + scanner_context->_scanner_scheduler = scheduler.get(); + int32_t margin = scanner_context->_get_margin(transfer_lock, scheduler_lock); + + ASSERT_EQ(margin, scanner_context->_min_scan_concurrency_of_scan_scheduler / parallel_tasks); + + // ScanSchedule has 5 active threads and 10 tasks in queue. + // So remaining margin(3) is less than parallel_tasks(4). 
+ scheduler = std::make_unique(cgroup_cpu_ctl); + EXPECT_CALL(*scheduler, get_active_threads()).WillOnce(testing::Return(5)); + EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(10)); + scanner_context->_scanner_scheduler = scheduler.get(); + scanner_context->_min_scan_concurrency_of_scan_scheduler = 18; + margin = scanner_context->_get_margin(transfer_lock, scheduler_lock); + ASSERT_EQ(margin, 1); + + // ScanScheduler has 10 active threads and 2 tasks in queue. + // Remaining margin(8) is greater than parallel_tasks(4). + // So margin should be equal to margin(8)/parallel_tasks(4) == 2. + scheduler = std::make_unique(cgroup_cpu_ctl); + EXPECT_CALL(*scheduler, get_active_threads()).WillOnce(testing::Return(10)); + EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(2)); + scanner_context->_scanner_scheduler = scheduler.get(); + scanner_context->_min_scan_concurrency_of_scan_scheduler = 20; + margin = scanner_context->_get_margin(transfer_lock, scheduler_lock); + ASSERT_EQ(margin, + (scanner_context->_min_scan_concurrency_of_scan_scheduler - 12) / parallel_tasks); + + // ScanScheduler is busy. + // Just submit _min_scan_concurrency tasks. + scheduler = std::make_unique(cgroup_cpu_ctl); + EXPECT_CALL(*scheduler, get_active_threads()).WillOnce(testing::Return(50)); + EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(10)); + scanner_context->_scanner_scheduler = scheduler.get(); + scanner_context->_min_scan_concurrency_of_scan_scheduler = 20; + scanner_context->_num_scheduled_scanners = 0; + margin = scanner_context->_get_margin(transfer_lock, scheduler_lock); + ASSERT_EQ(margin, scanner_context->_min_scan_concurrency); + + // ScanScheduler is busy. + // _min_scan_concurrency is already satisfied.
+ scheduler = std::make_unique(cgroup_cpu_ctl); + EXPECT_CALL(*scheduler, get_active_threads()).WillOnce(testing::Return(50)); + EXPECT_CALL(*scheduler, get_queue_size()).WillOnce(testing::Return(10)); + scanner_context->_scanner_scheduler = scheduler.get(); + scanner_context->_min_scan_concurrency_of_scan_scheduler = 20; + scanner_context->_num_scheduled_scanners = 20; + margin = scanner_context->_get_margin(transfer_lock, scheduler_lock); + ASSERT_EQ(margin, 0); +} + +TEST_F(ScannerContextTest, pull_next_scan_task) { + const int parallel_tasks = 4; + auto scan_operator = std::make_unique( + obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam {}); + + auto olap_scan_local_state = + pipeline::OlapScanLocalState::create_unique(state.get(), scan_operator.get()); + + const int64_t limit = 100; + + NewOlapScanner::Params scanner_params; + scanner_params.state = state.get(); + scanner_params.profile = profile.get(); + scanner_params.limit = limit; + scanner_params.key_ranges = std::vector(); // empty + + std::shared_ptr scanner = + NewOlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params)); + + std::list> scanners; + for (int i = 0; i < 11; ++i) { + scanners.push_back(std::make_shared(scanner)); + } + + std::shared_ptr scanner_context = ScannerContext::create_shared( + state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, + scanners, limit, scan_dependency, parallel_tasks); + + std::mutex transfer_mutex; + std::unique_lock transfer_lock(transfer_mutex); + std::shared_mutex scheduler_mutex; + std::unique_lock scheduler_lock(scheduler_mutex); + scanner_context->_scanner_scheduler = scan_scheduler.get(); + scanner_context->_min_scan_concurrency_of_scan_scheduler = 20; + std::shared_ptr cgroup_cpu_ctl = std::make_shared(1); + std::unique_ptr scheduler = + std::make_unique(cgroup_cpu_ctl); + + scanner_context->_scanner_scheduler = scan_scheduler.get(); + scanner_context->_max_scan_concurrency = 1; + 
std::shared_ptr pull_scan_task = + scanner_context->_pull_next_scan_task(nullptr, scanner_context->_max_scan_concurrency); + ASSERT_EQ(pull_scan_task, nullptr); + auto scan_task = std::make_shared(std::make_shared(scanner)); + pull_scan_task = scanner_context->_pull_next_scan_task(scan_task, + scanner_context->_max_scan_concurrency); + ASSERT_EQ(pull_scan_task, nullptr); + + scanner_context->_max_scan_concurrency = 2; + BlockUPtr cached_block = Block::create_unique(); + scan_task->cached_blocks.emplace_back(std::move(cached_block), 0); + EXPECT_ANY_THROW(scanner_context->_pull_next_scan_task( + scan_task, scanner_context->_max_scan_concurrency - 1)); + scan_task->cached_blocks.clear(); + scan_task->eos = true; + EXPECT_ANY_THROW(scanner_context->_pull_next_scan_task( + scan_task, scanner_context->_max_scan_concurrency - 1)); + + scan_task->cached_blocks.clear(); + scan_task->eos = false; + pull_scan_task = scanner_context->_pull_next_scan_task( + scan_task, scanner_context->_max_scan_concurrency - 1); + EXPECT_EQ(pull_scan_task.get(), scan_task.get()); + + scanner_context->_pending_scanners = std::stack>(); + pull_scan_task = scanner_context->_pull_next_scan_task( + nullptr, scanner_context->_max_scan_concurrency - 1); + EXPECT_EQ(pull_scan_task, nullptr); + + scanner_context->_pending_scanners.push(std::make_shared(scanner)); + pull_scan_task = scanner_context->_pull_next_scan_task( + nullptr, scanner_context->_max_scan_concurrency - 1); + EXPECT_NE(pull_scan_task, nullptr); +} + +TEST_F(ScannerContextTest, schedule_scan_task) { + const int parallel_tasks = 4; + auto scan_operator = std::make_unique( + obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam {}); + + auto olap_scan_local_state = + pipeline::OlapScanLocalState::create_unique(state.get(), scan_operator.get()); + + const int64_t limit = 100; + + NewOlapScanner::Params scanner_params; + scanner_params.state = state.get(); + scanner_params.profile = profile.get(); + scanner_params.limit = 
limit; + scanner_params.key_ranges = std::vector(); // empty + + std::shared_ptr scanner = + NewOlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params)); + + std::list> scanners; + for (int i = 0; i < 15; ++i) { + scanners.push_back(std::make_shared(scanner)); + } + + std::shared_ptr scanner_context = ScannerContext::create_shared( + state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, + scanners, limit, scan_dependency, parallel_tasks); + + std::mutex transfer_mutex; + std::unique_lock transfer_lock(transfer_mutex); + std::shared_mutex scheduler_mutex; + std::unique_lock scheduler_lock(scheduler_mutex); + std::shared_ptr cgroup_cpu_ctl = std::make_shared(1); + + // Scan resource is enough. + std::unique_ptr scheduler = + std::make_unique(cgroup_cpu_ctl); + EXPECT_CALL(*scheduler, get_active_threads()).WillRepeatedly(testing::Return(0)); + EXPECT_CALL(*scheduler, get_queue_size()).WillRepeatedly(testing::Return(0)); + + std::unique_ptr scanner_scheduler = + std::make_unique(); + EXPECT_CALL(*scanner_scheduler, submit(testing::_, testing::_)) + .WillRepeatedly(testing::Return(Status::OK())); + + scanner_context->_scanner_scheduler_global = scanner_scheduler.get(); + scanner_context->_scanner_scheduler = scheduler.get(); + scanner_context->_max_scan_concurrency = 1; + scanner_context->_max_scan_concurrency = 1; + scanner_context->_min_scan_concurrency_of_scan_scheduler = 20; + + Status st = scanner_context->_schedule_scan_task(nullptr, transfer_lock, scheduler_lock); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(scanner_context->_num_scheduled_scanners, 1); + + scanner_context->_max_scan_concurrency = 10; + scanner_context->_max_scan_concurrency = 1; + scanner_context->_min_scan_concurrency_of_scan_scheduler = 20; + st = scanner_context->_schedule_scan_task(nullptr, transfer_lock, scheduler_lock); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(scanner_context->_num_scheduled_scanners, scanner_context->_max_scan_concurrency); + + 
scanner_context = ScannerContext::create_shared( + state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, + scanners, limit, scan_dependency, parallel_tasks); + + scanner_context->_scanner_scheduler_global = scanner_scheduler.get(); + scanner_context->_scanner_scheduler = scheduler.get(); + + scanner_context->_max_scan_concurrency = 100; + scanner_context->_min_scan_concurrency = 1; + scanner_context->_min_scan_concurrency_of_scan_scheduler = 20; + int margin = scanner_context->_get_margin(transfer_lock, scheduler_lock); + ASSERT_EQ(margin, scanner_context->_min_scan_concurrency_of_scan_scheduler / parallel_tasks); + st = scanner_context->_schedule_scan_task(nullptr, transfer_lock, scheduler_lock); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(scanner_context->_num_scheduled_scanners, + scanner_context->_min_scan_concurrency_of_scan_scheduler / parallel_tasks); + + scanners = std::list>(); + for (int i = 0; i < 1; ++i) { + scanners.push_back(std::make_shared(scanner)); + } + + scanner_context = ScannerContext::create_shared( + state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, + scanners, limit, scan_dependency, parallel_tasks); + + scanner_context->_scanner_scheduler_global = scanner_scheduler.get(); + scanner_context->_scanner_scheduler = scheduler.get(); + + scanner_context->_max_scan_concurrency = 1; + scanner_context->_min_scan_concurrency = 1; + scanner_context->_min_scan_concurrency_of_scan_scheduler = 20; + st = scanner_context->_schedule_scan_task(nullptr, transfer_lock, scheduler_lock); + auto scan_task = std::make_shared(std::make_shared(scanner)); + st = scanner_context->_schedule_scan_task(scan_task, transfer_lock, scheduler_lock); + // current scan task is added back. 
+ ASSERT_EQ(scanner_context->_pending_scanners.size(), 1); + ASSERT_EQ(scanner_context->_num_scheduled_scanners, 1); + + scanner_context = ScannerContext::create_shared( + state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, + scanners, limit, scan_dependency, parallel_tasks); + + scanner_context->_scanner_scheduler_global = scanner_scheduler.get(); + scanner_context->_scanner_scheduler = scheduler.get(); + + scanner_context->_max_scan_concurrency = 1; + scanner_context->_min_scan_concurrency = 1; + scanner_context->_min_scan_concurrency_of_scan_scheduler = 20; + st = scanner_context->_schedule_scan_task(nullptr, transfer_lock, scheduler_lock); + scan_task = std::make_shared(std::make_shared(scanner)); + scan_task->cached_blocks.emplace_back(Block::create_unique(), 0); + // Illegal situation. + // This method must not be called with a scan task that still has cached blocks. + EXPECT_ANY_THROW(std::ignore = scanner_context->_schedule_scan_task(scan_task, transfer_lock, + scheduler_lock)); +} + +TEST_F(ScannerContextTest, scan_queue_mem_limit) { + state->_query_options.__set_scan_queue_mem_limit(100); + ASSERT_EQ(state->scan_queue_mem_limit(), 100); + + state->_query_options.__isset.scan_queue_mem_limit = false; + state->_query_options.__set_mem_limit(200); + ASSERT_EQ(state->scan_queue_mem_limit(), 200 / 20); + + const int parallel_tasks = 1; + auto scan_operator = std::make_unique( + obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam {}); + + auto olap_scan_local_state = + pipeline::OlapScanLocalState::create_unique(state.get(), scan_operator.get()); + olap_scan_local_state->_max_scan_concurrency = max_concurrency_counter.get(); + olap_scan_local_state->_min_scan_concurrency = min_concurrency_counter.get(); + + olap_scan_local_state->_parent = scan_operator.get(); + + const int64_t limit = 100; + + NewOlapScanner::Params scanner_params; + scanner_params.state = state.get(); + scanner_params.profile =
profile.get(); + scanner_params.limit = limit; + scanner_params.key_ranges = std::vector(); // empty + + std::shared_ptr scanner = + NewOlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params)); + + std::list> scanners; + for (int i = 0; i < 11; ++i) { + scanners.push_back(std::make_shared(scanner)); + } + + std::shared_ptr scanner_context = ScannerContext::create_shared( + state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, + scanners, limit, scan_dependency, parallel_tasks); + + std::unique_ptr scheduler = + std::make_unique(cgroup_cpu_ctl); + EXPECT_CALL(*scheduler, schedule_scan_task(testing::_, testing::_, testing::_)) + .WillRepeatedly(testing::Return(Status::OK())); + scanner_context->_scanner_scheduler = scheduler.get(); + // max_scan_concurrency that we calculate will be 10 / 1 = 10; + scanner_context->_min_scan_concurrency_of_scan_scheduler = 10; + + std::ignore = scanner_context->init(); + ASSERT_EQ(scanner_context->_max_bytes_in_queue, (1024 * 1024 * 10) * (1 / 300 + 1)); +} + +TEST_F(ScannerContextTest, get_free_block) { + const int parallel_tasks = 1; + auto scan_operator = std::make_unique( + obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam {}); + + auto olap_scan_local_state = + pipeline::OlapScanLocalState::create_unique(state.get(), scan_operator.get()); + + const int64_t limit = 100; + + NewOlapScanner::Params scanner_params; + scanner_params.state = state.get(); + scanner_params.profile = profile.get(); + scanner_params.limit = limit; + scanner_params.key_ranges = std::vector(); // empty + + std::shared_ptr scanner = + NewOlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params)); + + std::list> scanners; + for (int i = 0; i < 11; ++i) { + scanners.push_back(std::make_shared(scanner)); + } + + std::shared_ptr scanner_context = ScannerContext::create_shared( + state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, + 
scanners, limit, scan_dependency, parallel_tasks); + scanner_context->_newly_create_free_blocks_num = newly_create_free_blocks_num.get(); + scanner_context->_newly_create_free_blocks_num->set(0L); + scanner_context->_scanner_memory_used_counter = scanner_memory_used_counter.get(); + scanner_context->_scanner_memory_used_counter->set(0L); + BlockUPtr block = scanner_context->get_free_block(/*force=*/true); + ASSERT_NE(block, nullptr); + ASSERT_TRUE(scanner_context->_newly_create_free_blocks_num->value() == 1); + + scanner_context->_max_bytes_in_queue = 200; + // no free block + // force is false, _block_memory_usage < _max_bytes_in_queue + block = scanner_context->get_free_block(/*force=*/false); + ASSERT_NE(block, nullptr); + ASSERT_TRUE(scanner_context->_newly_create_free_blocks_num->value() == 2); + + std::unique_ptr return_block = std::make_unique(); + EXPECT_CALL(*return_block, allocated_bytes()).WillRepeatedly(testing::Return(100)); + EXPECT_CALL(*return_block, mem_reuse()).WillRepeatedly(testing::Return(true)); + scanner_context->_free_blocks.enqueue(std::move(return_block)); + // get free block from queue + block = scanner_context->get_free_block(/*force=*/false); + ASSERT_NE(block, nullptr); + ASSERT_EQ(scanner_context->_block_memory_usage, -100); + ASSERT_EQ(scanner_context->_scanner_memory_used_counter->value(), -100); +} + +TEST_F(ScannerContextTest, return_free_block) { + const int parallel_tasks = 1; + auto scan_operator = std::make_unique( + obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam {}); + + auto olap_scan_local_state = + pipeline::OlapScanLocalState::create_unique(state.get(), scan_operator.get()); + + const int64_t limit = 100; + + NewOlapScanner::Params scanner_params; + scanner_params.state = state.get(); + scanner_params.profile = profile.get(); + scanner_params.limit = limit; + scanner_params.key_ranges = std::vector(); // empty + + std::shared_ptr scanner = + NewOlapScanner::create_shared(olap_scan_local_state.get(), 
std::move(scanner_params)); + + std::list> scanners; + for (int i = 0; i < 11; ++i) { + scanners.push_back(std::make_shared(scanner)); + } + + std::shared_ptr scanner_context = ScannerContext::create_shared( + state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, + scanners, limit, scan_dependency, parallel_tasks); + scanner_context->_newly_create_free_blocks_num = newly_create_free_blocks_num.get(); + scanner_context->_scanner_memory_used_counter = scanner_memory_used_counter.get(); + scanner_context->_max_bytes_in_queue = 200; + scanner_context->_block_memory_usage = 0; + + std::unique_ptr return_block = std::make_unique(); + EXPECT_CALL(*return_block, allocated_bytes()).WillRepeatedly(testing::Return(100)); + EXPECT_CALL(*return_block, mem_reuse()).WillRepeatedly(testing::Return(true)); + EXPECT_CALL(*return_block, clear_column_data(testing::_)).WillRepeatedly(testing::Return()); + + scanner_context->return_free_block(std::move(return_block)); + ASSERT_EQ(scanner_context->_block_memory_usage, 100); + ASSERT_EQ(scanner_context->_scanner_memory_used_counter->value(), 100); + // free_block queue is stabilized, so size_approx is accurate. 
+ ASSERT_EQ(scanner_context->_free_blocks.size_approx(), 1); +} + +TEST_F(ScannerContextTest, get_block_from_queue) { + const int parallel_tasks = 1; + auto scan_operator = std::make_unique( + obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam {}); + + auto olap_scan_local_state = + pipeline::OlapScanLocalState::create_unique(state.get(), scan_operator.get()); + + const int64_t limit = 100; + + NewOlapScanner::Params scanner_params; + scanner_params.state = state.get(); + scanner_params.profile = profile.get(); + scanner_params.limit = limit; + scanner_params.key_ranges = std::vector(); // empty + + std::shared_ptr scanner = + NewOlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params)); + + std::list> scanners; + for (int i = 0; i < 11; ++i) { + scanners.push_back(std::make_shared(scanner)); + } + + std::shared_ptr scanner_context = ScannerContext::create_shared( + state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor, + scanners, limit, scan_dependency, parallel_tasks); + scanner_context->_newly_create_free_blocks_num = newly_create_free_blocks_num.get(); + scanner_context->_scanner_memory_used_counter = scanner_memory_used_counter.get(); + scanner_context->_max_bytes_in_queue = 200; + scanner_context->_block_memory_usage = 0; + + std::unique_ptr return_block = std::make_unique(); + EXPECT_CALL(*return_block, allocated_bytes()).WillRepeatedly(testing::Return(100)); + EXPECT_CALL(*return_block, mem_reuse()).WillRepeatedly(testing::Return(true)); + EXPECT_CALL(*return_block, clear_column_data(testing::_)).WillRepeatedly(testing::Return()); + + std::unique_ptr mock_runtime_state = std::make_unique(); + EXPECT_CALL(*mock_runtime_state, is_cancelled()).WillOnce(testing::Return(true)); + EXPECT_CALL(*mock_runtime_state, cancel_reason()) + .WillOnce(testing::Return(Status::Cancelled("TestCancelMsg"))); + bool eos = false; + Status st = scanner_context->get_block_from_queue(mock_runtime_state.get(), 
return_block.get(), + &eos, 0); + EXPECT_TRUE(!st.ok()); + EXPECT_EQ(st.msg(), "TestCancelMsg"); + + EXPECT_CALL(*mock_runtime_state, is_cancelled()).WillRepeatedly(testing::Return(false)); + + scanner_context->_process_status = Status::InternalError("TestCancel"); + st = scanner_context->get_block_from_queue(mock_runtime_state.get(), return_block.get(), &eos, + 0); + EXPECT_TRUE(!st.ok()); + EXPECT_TRUE(st.msg() == "TestCancel"); + + scanner_context->_process_status = Status::OK(); + scanner_context->_is_finished = false; + scanner_context->_should_stop = false; + auto scan_task = std::make_shared(std::make_shared(scanner)); + scan_task->set_eos(true); + scanner_context->_tasks_queue.push_back(scan_task); + std::unique_ptr scheduler = + std::make_unique(cgroup_cpu_ctl); + EXPECT_CALL(*scheduler, schedule_scan_task(testing::_, testing::_, testing::_)) + .WillOnce(testing::Return(Status::OK())); + scanner_context->_scanner_scheduler = scheduler.get(); + scanner_context->_num_finished_scanners = 0; + EXPECT_CALL(*return_block, mem_reuse()).WillRepeatedly(testing::Return(false)); + st = scanner_context->get_block_from_queue(mock_runtime_state.get(), return_block.get(), &eos, + 0); + EXPECT_TRUE(st.ok()); + EXPECT_EQ(scanner_context->_num_finished_scanners, 1); +} + +} // namespace doris::vectorized diff --git a/be/test/vec/exec/scan_operator_test.cpp b/be/test/vec/exec/scan_operator_test.cpp new file mode 100644 index 00000000000000..d5f36e31b3c20e --- /dev/null +++ b/be/test/vec/exec/scan_operator_test.cpp @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include + +#include "common/object_pool.h" +#include "pipeline/exec/olap_scan_operator.h" +#include "runtime/descriptors.h" + +namespace doris::vectorized { +class ScanOperatorTest : public testing::Test { +public: + void SetUp() override { + obj_pool = std::make_unique(); + // This ScanNode has two tuples. + // First one is input tuple, second one is output tuple. + tbl_desc.tableType = TTableType::OLAP_TABLE; + + tuple_desc.id = 0; + tuple_descs.push_back(tuple_desc); + tuple_desc.id = 1; + tuple_descs.push_back(tuple_desc); + thrift_tbl.tableDescriptors.push_back(tbl_desc); + thrift_tbl.tupleDescriptors = tuple_descs; + thrift_tbl.slotDescriptors = slot_descs; + scalar_type.__set_type(TPrimitiveType::STRING); + std::ignore = DescriptorTbl::create(obj_pool.get(), thrift_tbl, &descs); + } + +private: + std::unique_ptr obj_pool; + TTupleDescriptor tuple_desc; + std::vector tuple_descs; + DescriptorTbl* descs = nullptr; + TTableDescriptor tbl_desc; + TScalarType scalar_type; + TDescriptorTable thrift_tbl; + std::vector slot_descs; + std::unique_ptr state = std::make_unique(); +}; + +TEST_F(ScanOperatorTest, adaptive_pipeline_task_serial_read_on_limit) { + const int parallel_pipeline_task_num = 24; + TPlanNode tnode; + tnode.row_tuples.push_back(TTupleId(0)); + tnode.row_tuples.push_back(TTupleId(1)); + std::vector null_map {false, false}; + tnode.nullable_tuples = null_map; + + // Scan with conjuncts + TExpr conjunct; + std::vector conjuncts {conjunct}; + tnode.__set_conjuncts(conjuncts); + auto scan_operator = 
std::make_unique( + obj_pool.get(), tnode, 0, *descs, parallel_pipeline_task_num, TQueryCacheParam {}); + + TQueryOptions query_options; + // enable_adaptive_pipeline_task_serial_read_on_limit is true + query_options.__set_enable_adaptive_pipeline_task_serial_read_on_limit(true); + state->set_query_options(query_options); + + std::ignore = scan_operator->init(tnode, state.get()); + // With conjuncts, should_run_serial is false + ASSERT_EQ(scan_operator->_should_run_serial, false); + + // Scan without conjuncts + conjuncts.clear(); + tnode.__set_conjuncts(conjuncts); + // limit 10 + tnode.__set_limit(10); + scan_operator = std::make_unique( + obj_pool.get(), tnode, 0, *descs, parallel_pipeline_task_num, TQueryCacheParam {}); + + // enable_adaptive_pipeline_task_serial_read_on_limit is true + query_options.__set_enable_adaptive_pipeline_task_serial_read_on_limit(true); + query_options.__set_adaptive_pipeline_task_serial_read_on_limit(10); + state->set_query_options(query_options); + + std::ignore = scan_operator->init(tnode, state.get()); + // Without conjuncts, limit 10 <= adaptive_pipeline_task_serial_read_on_limit 10 + ASSERT_EQ(scan_operator->_should_run_serial, true); + + query_options.__set_adaptive_pipeline_task_serial_read_on_limit(9); + state->set_query_options(query_options); + std::ignore = scan_operator->init(tnode, state.get()); + // Without conjuncts, limit 10 > adaptive_pipeline_task_serial_read_on_limit 9 + ASSERT_EQ(scan_operator->_should_run_serial, true); + + query_options.__set_enable_adaptive_pipeline_task_serial_read_on_limit(false); + query_options.__set_adaptive_pipeline_task_serial_read_on_limit(900); + state->set_query_options(query_options); + scan_operator->_should_run_serial = false; + std::ignore = scan_operator->init(tnode, state.get()); + // Without conjuncts, enable_adaptive_pipeline_task_serial_read_on_limit is false + ASSERT_EQ(scan_operator->_should_run_serial, false); +} +} // namespace doris::vectorized \ No newline at end of file