Skip to content

Commit

Permalink
NED
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiqiang-hhhh committed Feb 17, 2025
1 parent e126c05 commit 53f8f2a
Show file tree
Hide file tree
Showing 4 changed files with 365 additions and 7 deletions.
4 changes: 2 additions & 2 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ class RuntimeState {
// _unreported_error_idx to _errors_log.size()
void get_unreported_errors(std::vector<std::string>* new_errors);

[[nodiscard]] bool is_cancelled() const;
Status cancel_reason() const;
[[nodiscard]] MOCK_FUNCTION bool is_cancelled() const;
MOCK_FUNCTION Status cancel_reason() const;
void cancel(const Status& reason) {
if (_exec_status.update(reason)) {
// Create a error status, so that we could print error stack, and
Expand Down
7 changes: 4 additions & 3 deletions be/src/vec/core/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <utility>
#include <vector>

#include "common/be_mock_util.h"
#include "common/exception.h"
#include "common/factory_creator.h"
#include "common/status.h"
Expand Down Expand Up @@ -91,7 +92,7 @@ class Block {
Block(const std::vector<SlotDescriptor>& slots, size_t block_size,
bool ignore_trivial_slot = false);

~Block() = default;
MOCK_FUNCTION ~Block() = default;
Block(const Block& block) = default;
Block& operator=(const Block& p) = default;
Block(Block&& block) = default;
Expand Down Expand Up @@ -209,7 +210,7 @@ class Block {
std::string columns_bytes() const;

/// Approximate number of allocated bytes in memory - for profiling and limits.
size_t allocated_bytes() const;
MOCK_FUNCTION size_t allocated_bytes() const;

/** Get a list of column names separated by commas. */
std::string dump_names() const;
Expand Down Expand Up @@ -253,7 +254,7 @@ class Block {
// Else clear column [0, column_size) delete column [column_size, data.size)
void clear_column_data(int64_t column_size = -1) noexcept;

bool mem_reuse() { return !data.empty(); }
MOCK_FUNCTION bool mem_reuse() { return !data.empty(); }

bool is_empty_column() { return data.empty(); }

Expand Down
248 changes: 246 additions & 2 deletions be/test/scan/scanner_context_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class ScannerContextTest : public testing::Test {
// First one is input tuple, second one is output tuple.
tnode.row_tuples.push_back(TTupleId(0));
tnode.row_tuples.push_back(TTupleId(1));
std::vector<bool> null_map {false, false};
tnode.nullable_tuples = null_map;
tbl_desc.tableType = TTableType::OLAP_TABLE;

tuple_desc.id = 0;
Expand All @@ -66,8 +68,6 @@ class ScannerContextTest : public testing::Test {
slot_desc.id = 1;
slot_desc.parent = 1;
slot_descs.push_back(slot_desc);
std::vector<bool> null_map {false, false};
tnode.nullable_tuples = null_map;
thrift_tbl.tableDescriptors.push_back(tbl_desc);
thrift_tbl.tupleDescriptors = tuple_descs;
thrift_tbl.slotDescriptors = slot_descs;
Expand All @@ -78,6 +78,19 @@ class ScannerContextTest : public testing::Test {
}

private:
class MockBlock : public Block {
MockBlock() = default;
MOCK_CONST_METHOD0(allocated_bytes, size_t());
MOCK_METHOD0(mem_reuse, bool());
MOCK_METHOD1(clear_column_data, void(int64_t));
};

class MockRuntimeState : public RuntimeState {
MockRuntimeState() = default;
MOCK_CONST_METHOD0(is_cancelled, bool());
MOCK_CONST_METHOD0(cancel_reason, Status());
};

std::unique_ptr<ObjectPool> obj_pool;
TPlanNode tnode;
TTableDescriptor tbl_desc;
Expand All @@ -95,6 +108,12 @@ class ScannerContextTest : public testing::Test {
std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT, 1, 3);
std::unique_ptr<RuntimeProfile::Counter> min_concurrency_counter =
std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT, 1, 3);

std::unique_ptr<RuntimeProfile::Counter> newly_create_free_blocks_num =
std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT, 1, 3);
std::unique_ptr<RuntimeProfile::Counter> scanner_memory_used_counter =
std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT, 1, 3);

TupleDescriptor* output_tuple_desc = nullptr;
RowDescriptor* output_row_descriptor = nullptr;
std::shared_ptr<pipeline::Dependency> scan_dependency =
Expand Down Expand Up @@ -139,6 +158,7 @@ TEST_F(ScannerContextTest, test_init) {

olap_scan_local_state->_parent = scan_operator.get();

// User specified num_scanner_threads is less than _max_scan_concurrency that we calculated
TQueryOptions query_options;
query_options.__set_num_scanner_threads(2);
query_options.__set_max_column_reader_num(0);
Expand All @@ -149,9 +169,11 @@ TEST_F(ScannerContextTest, test_init) {
.WillRepeatedly(testing::Return(Status::OK()));
scanner_context->_scanner_scheduler = scheduler.get();

// max_scan_concurrency that we calculate will be 10 / 1 = 10;
scanner_context->_min_scan_concurrency_of_scan_scheduler = 10;
Status st = scanner_context->init();
ASSERT_TRUE(st.ok());
// actual max_scan_concurrency will be 2 since user specified num_scanner_threads is 2.
ASSERT_EQ(scanner_context->_max_scan_concurrency, 2);

query_options.__set_num_scanner_threads(0);
Expand Down Expand Up @@ -600,4 +622,226 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
scheduler_lock));
}

TEST_F(ScannerContextTest, scan_queue_mem_limit) {
state->_query_options.__set_scan_queue_mem_limit(100);
ASSERT_EQ(state->scan_queue_mem_limit(), 100);

state->_query_options.__isset.scan_queue_mem_limit = false;
state->_query_options.__set_mem_limit(200);
ASSERT_EQ(state->scan_queue_mem_limit(), 200 / 20);

const int parallel_tasks = 1;
auto scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam {});

auto olap_scan_local_state =
pipeline::OlapScanLocalState::create_unique(state.get(), scan_operator.get());
olap_scan_local_state->_max_scan_concurrency = max_concurrency_counter.get();
olap_scan_local_state->_min_scan_concurrency = min_concurrency_counter.get();

olap_scan_local_state->_parent = scan_operator.get();

const int64_t limit = 100;

NewOlapScanner::Params scanner_params;
scanner_params.state = state.get();
scanner_params.profile = profile.get();
scanner_params.limit = limit;
scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty

std::shared_ptr<VScanner> scanner =
NewOlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params));

std::list<std::shared_ptr<ScannerDelegate>> scanners;
for (int i = 0; i < 11; ++i) {
scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
}

std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
scanners, limit, scan_dependency, parallel_tasks);

std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
EXPECT_CALL(*scheduler, schedule_scan_task(testing::_, testing::_, testing::_))
.WillRepeatedly(testing::Return(Status::OK()));
scanner_context->_scanner_scheduler = scheduler.get();
// max_scan_concurrency that we calculate will be 10 / 1 = 10;
scanner_context->_min_scan_concurrency_of_scan_scheduler = 10;

std::ignore = scanner_context->init();
ASSERT_EQ(scanner_context->_max_bytes_in_queue, (1024 * 1024 * 10) * (1 / 300 + 1));
}

TEST_F(ScannerContextTest, get_free_block) {
const int parallel_tasks = 1;
auto scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam {});

auto olap_scan_local_state =
pipeline::OlapScanLocalState::create_unique(state.get(), scan_operator.get());

const int64_t limit = 100;

NewOlapScanner::Params scanner_params;
scanner_params.state = state.get();
scanner_params.profile = profile.get();
scanner_params.limit = limit;
scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty

std::shared_ptr<VScanner> scanner =
NewOlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params));

std::list<std::shared_ptr<ScannerDelegate>> scanners;
for (int i = 0; i < 11; ++i) {
scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
}

std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
scanners, limit, scan_dependency, parallel_tasks);
scanner_context->_newly_create_free_blocks_num = newly_create_free_blocks_num.get();
scanner_context->_newly_create_free_blocks_num->set(0L);
scanner_context->_scanner_memory_used_counter = scanner_memory_used_counter.get();
scanner_context->_scanner_memory_used_counter->set(0L);
BlockUPtr block = scanner_context->get_free_block(/*force=*/true);
ASSERT_NE(block, nullptr);
ASSERT_TRUE(scanner_context->_newly_create_free_blocks_num->value() == 1);

scanner_context->_max_bytes_in_queue = 200;
// no free block
// force is false, _block_memory_usage < _max_bytes_in_queue
block = scanner_context->get_free_block(/*force=*/false);
ASSERT_NE(block, nullptr);
ASSERT_TRUE(scanner_context->_newly_create_free_blocks_num->value() == 2);

std::unique_ptr<MockBlock> return_block = std::make_unique<MockBlock>();
EXPECT_CALL(*return_block, allocated_bytes()).WillRepeatedly(testing::Return(100));
EXPECT_CALL(*return_block, mem_reuse()).WillRepeatedly(testing::Return(true));
scanner_context->_free_blocks.enqueue(std::move(return_block));
// get free block from queue
block = scanner_context->get_free_block(/*force=*/false);
ASSERT_NE(block, nullptr);
ASSERT_EQ(scanner_context->_block_memory_usage, -100);
ASSERT_EQ(scanner_context->_scanner_memory_used_counter->value(), -100);
}

TEST_F(ScannerContextTest, return_free_block) {
const int parallel_tasks = 1;
auto scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam {});

auto olap_scan_local_state =
pipeline::OlapScanLocalState::create_unique(state.get(), scan_operator.get());

const int64_t limit = 100;

NewOlapScanner::Params scanner_params;
scanner_params.state = state.get();
scanner_params.profile = profile.get();
scanner_params.limit = limit;
scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty

std::shared_ptr<VScanner> scanner =
NewOlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params));

std::list<std::shared_ptr<ScannerDelegate>> scanners;
for (int i = 0; i < 11; ++i) {
scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
}

std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
scanners, limit, scan_dependency, parallel_tasks);
scanner_context->_newly_create_free_blocks_num = newly_create_free_blocks_num.get();
scanner_context->_scanner_memory_used_counter = scanner_memory_used_counter.get();
scanner_context->_max_bytes_in_queue = 200;
scanner_context->_block_memory_usage = 0;

std::unique_ptr<MockBlock> return_block = std::make_unique<MockBlock>();
EXPECT_CALL(*return_block, allocated_bytes()).WillRepeatedly(testing::Return(100));
EXPECT_CALL(*return_block, mem_reuse()).WillRepeatedly(testing::Return(true));
EXPECT_CALL(*return_block, clear_column_data(testing::_)).WillRepeatedly(testing::Return());

scanner_context->return_free_block(std::move(return_block));
ASSERT_EQ(scanner_context->_block_memory_usage, 100);
ASSERT_EQ(scanner_context->_scanner_memory_used_counter->value(), 100);
// free_block queue is stabilized, so size_approx is accurate.
ASSERT_EQ(scanner_context->_free_blocks.size_approx(), 1);
}

TEST_F(ScannerContextTest, get_block_from_queue) {
const int parallel_tasks = 1;
auto scan_operator = std::make_unique<pipeline::OlapScanOperatorX>(
obj_pool.get(), tnode, 0, *descs, parallel_tasks, TQueryCacheParam {});

auto olap_scan_local_state =
pipeline::OlapScanLocalState::create_unique(state.get(), scan_operator.get());

const int64_t limit = 100;

NewOlapScanner::Params scanner_params;
scanner_params.state = state.get();
scanner_params.profile = profile.get();
scanner_params.limit = limit;
scanner_params.key_ranges = std::vector<OlapScanRange*>(); // empty

std::shared_ptr<VScanner> scanner =
NewOlapScanner::create_shared(olap_scan_local_state.get(), std::move(scanner_params));

std::list<std::shared_ptr<ScannerDelegate>> scanners;
for (int i = 0; i < 11; ++i) {
scanners.push_back(std::make_shared<ScannerDelegate>(scanner));
}

std::shared_ptr<ScannerContext> scanner_context = ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc, output_row_descriptor,
scanners, limit, scan_dependency, parallel_tasks);
scanner_context->_newly_create_free_blocks_num = newly_create_free_blocks_num.get();
scanner_context->_scanner_memory_used_counter = scanner_memory_used_counter.get();
scanner_context->_max_bytes_in_queue = 200;
scanner_context->_block_memory_usage = 0;

std::unique_ptr<MockBlock> return_block = std::make_unique<MockBlock>();
EXPECT_CALL(*return_block, allocated_bytes()).WillRepeatedly(testing::Return(100));
EXPECT_CALL(*return_block, mem_reuse()).WillRepeatedly(testing::Return(true));
EXPECT_CALL(*return_block, clear_column_data(testing::_)).WillRepeatedly(testing::Return());

std::unique_ptr<MockRuntimeState> mock_runtime_state = std::make_unique<MockRuntimeState>();
EXPECT_CALL(*mock_runtime_state, is_cancelled()).WillOnce(testing::Return(true));
EXPECT_CALL(*mock_runtime_state, cancel_reason())
.WillOnce(testing::Return(Status::Cancelled("TestCancelMsg")));
bool eos = false;
Status st = scanner_context->get_block_from_queue(mock_runtime_state.get(), return_block.get(),
&eos, 0);
EXPECT_TRUE(!st.ok());
EXPECT_EQ(st.msg(), "TestCancelMsg");

EXPECT_CALL(*mock_runtime_state, is_cancelled()).WillRepeatedly(testing::Return(false));

scanner_context->_process_status = Status::InternalError("TestCancel");
st = scanner_context->get_block_from_queue(mock_runtime_state.get(), return_block.get(), &eos,
0);
EXPECT_TRUE(!st.ok());
EXPECT_TRUE(st.msg() == "TestCancel");

scanner_context->_process_status = Status::OK();
scanner_context->_is_finished = false;
scanner_context->_should_stop = false;
auto scan_task = std::make_shared<ScanTask>(std::make_shared<ScannerDelegate>(scanner));
scan_task->set_eos(true);
scanner_context->_tasks_queue.push_back(scan_task);
std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
EXPECT_CALL(*scheduler, schedule_scan_task(testing::_, testing::_, testing::_))
.WillOnce(testing::Return(Status::OK()));
scanner_context->_scanner_scheduler = scheduler.get();
scanner_context->_num_finished_scanners = 0;
EXPECT_CALL(*return_block, mem_reuse()).WillRepeatedly(testing::Return(false));
st = scanner_context->get_block_from_queue(mock_runtime_state.get(), return_block.get(), &eos,
0);
EXPECT_TRUE(st.ok());
EXPECT_EQ(scanner_context->_num_finished_scanners, 1);
}

} // namespace doris::vectorized
Loading

0 comments on commit 53f8f2a

Please sign in to comment.