diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp index 33ce0ad2d7f2d6..3fc3d52f9bcdad 100644 --- a/be/src/vec/olap/vertical_block_reader.cpp +++ b/be/src/vec/olap/vertical_block_reader.cpp @@ -75,7 +75,7 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para return res; } _reader_context.is_vertical_compaction = true; - for (auto& rs_split : read_params.rs_splits) { + for (const auto& rs_split : read_params.rs_splits) { // segment iterator will be inited here // In vertical compaction, every group will load segment so we should cache // segment to avoid tot many s3 head request @@ -191,7 +191,7 @@ void VerticalBlockReader::_init_agg_state(const ReaderParams& read_params) { DCHECK(function != nullptr); _agg_functions.push_back(function); // create aggregate data - AggregateDataPtr place = new char[function->size_of_data()]; + auto* place = new char[function->size_of_data()]; SAFE_CREATE(function->create(place), { _agg_functions.pop_back(); delete[] place; @@ -306,11 +306,11 @@ void VerticalBlockReader::_update_agg_value(MutableColumns& columns, int begin, for (size_t idx = 0; idx < _return_columns.size(); ++idx) { AggregateFunctionPtr function = _agg_functions[idx]; AggregateDataPtr place = _agg_places[idx]; - auto column_ptr = _stored_data_columns[idx].get(); + auto* column_ptr = _stored_data_columns[idx].get(); if (begin <= end) { function->add_batch_range(begin, end, place, const_cast(&column_ptr), - nullptr, _stored_has_null_tag[idx]); + &_arena, _stored_has_null_tag[idx]); } if (is_close) { @@ -319,6 +319,9 @@ void VerticalBlockReader::_update_agg_value(MutableColumns& columns, int begin, function->reset(place); } } + if (is_close) { + _arena.clear(); + } } size_t VerticalBlockReader::_copy_agg_data() { @@ -340,7 +343,7 @@ size_t VerticalBlockReader::_copy_agg_data() { } else { for (auto& it : _temp_ref_map) { if (!it.second.empty()) { - auto& src_column = *it.first->get_by_position(idx).column; + const auto& src_column = *it.first->get_by_position(idx).column; for (auto& pos : it.second) { dst_column->replace_column_data(src_column, pos.first, pos.second); } diff --git a/be/src/vec/olap/vertical_block_reader.h b/be/src/vec/olap/vertical_block_reader.h index d3c500cbd8ee93..2c65fd616b1957 100644 --- a/be/src/vec/olap/vertical_block_reader.h +++ b/be/src/vec/olap/vertical_block_reader.h @@ -107,6 +107,7 @@ class VerticalBlockReader final : public TabletReader { // for agg mode std::vector _agg_functions; std::vector _agg_places; + Arena _arena; std::vector _normal_columns_idx; std::vector _agg_columns_idx; diff --git a/regression-test/data/compaction/test_vertical_compaction_agg_state.out b/regression-test/data/compaction/test_vertical_compaction_agg_state.out new file mode 100644 index 00000000000000..62a7a629187aa7 --- /dev/null +++ b/regression-test/data/compaction/test_vertical_compaction_agg_state.out @@ -0,0 +1,12 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_default -- +a ["aa", "a"] + +-- !select_default -- +a ["aaa", "aa", "a"] +b ["b"] + +-- !select_default -- +a ["aaa", "aa", "a"] +b ["b"] + diff --git a/regression-test/suites/compaction/test_vertical_compaction_agg_state.groovy b/regression-test/suites/compaction/test_vertical_compaction_agg_state.groovy new file mode 100644 index 00000000000000..9e0f99dd0c35b5 --- /dev/null +++ b/regression-test/suites/compaction/test_vertical_compaction_agg_state.groovy @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_vertical_compaction_agg_state") { + def tableName = "vertical_compaction_agg_state_regression_test" + + try { + String backend_id; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + + boolean disableAutoCompaction = true + for (Object ele in (List) configList) { + assert ele instanceof List + if (((List) ele)[0] == "disable_auto_compaction") { + disableAutoCompaction = Boolean.parseBoolean(((List) ele)[2]) + } + } + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql "set enable_agg_state=true" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + user_id VARCHAR, + agg_user_id agg_state collect_set(string) + )ENGINE=OLAP + AGGREGATE KEY(`user_id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`user_id`) BUCKETS 1 + PROPERTIES ( "replication_num" = "1" ); + """ + + sql """ INSERT INTO ${tableName} VALUES + ('a',collect_set_state('a')) + """ + + sql """ INSERT INTO ${tableName} VALUES + ('a',collect_set_state('aa')) + """ + + qt_select_default """ SELECT user_id,collect_set_merge(agg_user_id) FROM ${tableName} t group by user_id ORDER BY user_id;""" + + sql """ INSERT INTO ${tableName} VALUES + ('b',collect_set_state('b')) + """ + + sql """ INSERT INTO ${tableName} VALUES + ('a',collect_set_state('aaa')) + """ + + qt_select_default """ SELECT user_id,collect_set_merge(agg_user_id) FROM ${tableName} t group by user_id ORDER BY user_id;""" + + def tablets = sql_return_maparray """ show tablets from ${tableName}; """ + + // trigger compactions for all tablets in ${tableName} + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + if (compactJson.status.toLowerCase() == "fail") { + assertEquals(disableAutoCompaction, false) + logger.info("Compaction was done automatically!") + } + if (disableAutoCompaction) { + assertEquals("success", compactJson.status.toLowerCase()) + } + } + + // wait for all compactions done + for (def tablet in tablets) { + boolean running = true + do { + Thread.sleep(1000) + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + def replicaNum = get_table_replica_num(tableName) + logger.info("get table replica num: " + replicaNum) + int rowCount = 0 + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + for (String rowset in (List) tabletJson.rowsets) { + rowCount += Integer.parseInt(rowset.split(" ")[1]) + } + } + assert (rowCount < 8 * replicaNum) + qt_select_default """ SELECT user_id,collect_set_merge(agg_user_id) FROM ${tableName} t group by user_id ORDER BY user_id;""" + } finally { + try_sql("DROP TABLE IF EXISTS ${tableName}") + } +}