Skip to content

Commit

Permalink
[Bug](compaction) pass arena to function->add_batch_range (apache#30709)
Browse files Browse the repository at this point in the history
  • Loading branch information
BiteTheDDDDt authored Feb 4, 2024
1 parent 4ebe18c commit a798f15
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 5 deletions.
13 changes: 8 additions & 5 deletions be/src/vec/olap/vertical_block_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para
return res;
}
_reader_context.is_vertical_compaction = true;
for (auto& rs_split : read_params.rs_splits) {
for (const auto& rs_split : read_params.rs_splits) {
// segment iterator will be initialized here
// In vertical compaction, every group will load the segment, so we should cache
// the segment to avoid too many S3 HEAD requests
Expand Down Expand Up @@ -191,7 +191,7 @@ void VerticalBlockReader::_init_agg_state(const ReaderParams& read_params) {
DCHECK(function != nullptr);
_agg_functions.push_back(function);
// create aggregate data
AggregateDataPtr place = new char[function->size_of_data()];
auto* place = new char[function->size_of_data()];
SAFE_CREATE(function->create(place), {
_agg_functions.pop_back();
delete[] place;
Expand Down Expand Up @@ -306,11 +306,11 @@ void VerticalBlockReader::_update_agg_value(MutableColumns& columns, int begin,
for (size_t idx = 0; idx < _return_columns.size(); ++idx) {
AggregateFunctionPtr function = _agg_functions[idx];
AggregateDataPtr place = _agg_places[idx];
auto column_ptr = _stored_data_columns[idx].get();
auto* column_ptr = _stored_data_columns[idx].get();

if (begin <= end) {
function->add_batch_range(begin, end, place, const_cast<const IColumn**>(&column_ptr),
nullptr, _stored_has_null_tag[idx]);
&_arena, _stored_has_null_tag[idx]);
}

if (is_close) {
Expand All @@ -319,6 +319,9 @@ void VerticalBlockReader::_update_agg_value(MutableColumns& columns, int begin,
function->reset(place);
}
}
if (is_close) {
_arena.clear();
}
}

size_t VerticalBlockReader::_copy_agg_data() {
Expand All @@ -340,7 +343,7 @@ size_t VerticalBlockReader::_copy_agg_data() {
} else {
for (auto& it : _temp_ref_map) {
if (!it.second.empty()) {
auto& src_column = *it.first->get_by_position(idx).column;
const auto& src_column = *it.first->get_by_position(idx).column;
for (auto& pos : it.second) {
dst_column->replace_column_data(src_column, pos.first, pos.second);
}
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/olap/vertical_block_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class VerticalBlockReader final : public TabletReader {
// for agg mode
std::vector<AggregateFunctionPtr> _agg_functions;
std::vector<AggregateDataPtr> _agg_places;
Arena _arena;

std::vector<int> _normal_columns_idx;
std::vector<int> _agg_columns_idx;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_default --
a ["aa", "a"]

-- !select_default --
a ["aaa", "aa", "a"]
b ["b"]

-- !select_default --
a ["aaa", "aa", "a"]
b ["b"]

Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods

suite("test_vertical_compaction_agg_state") {
// Regression test for apache#30709: vertical compaction must pass an Arena to
// AggregateFunction::add_batch_range so agg_state (collect_set) columns are
// merged correctly. The test builds several rowsets via separate INSERTs,
// triggers cumulative compaction on every tablet, then verifies the merged
// collect_set results are unchanged (qt_* results are compared against the
// generated .out fixture).
def tableName = "vertical_compaction_agg_state_regression_test"

try {
String backend_id;
// Maps from backend id to its IP / HTTP port, used to call BE REST endpoints.
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);

// Read one backend's config to learn whether automatic compaction is
// disabled; that determines how strictly we assert on the manual-compaction
// status further below.
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
assert configList instanceof List

// Scan the config list for the "disable_auto_compaction" entry.
// NOTE(review): each config entry appears to be [name, type, value, ...];
// index [2] is parsed as the boolean value — confirm against show_be_config output.
boolean disableAutoCompaction = true
for (Object ele in (List) configList) {
assert ele instanceof List<String>
if (((List<String>) ele)[0] == "disable_auto_compaction") {
disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
}
}

// Single-bucket, single-replica AGGREGATE KEY table with an agg_state
// collect_set(string) value column — the column type exercised by the fix.
sql """ DROP TABLE IF EXISTS ${tableName} """
sql "set enable_agg_state=true"
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
user_id VARCHAR,
agg_user_id agg_state collect_set(string)
)ENGINE=OLAP
AGGREGATE KEY(`user_id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES ( "replication_num" = "1" );
"""

// Each INSERT below lands in its own rowset, so compaction later has
// multiple rowsets to merge.
sql """ INSERT INTO ${tableName} VALUES
('a',collect_set_state('a'))
"""

sql """ INSERT INTO ${tableName} VALUES
('a',collect_set_state('aa'))
"""

// Baseline result before compaction (checked against the .out fixture).
qt_select_default """ SELECT user_id,collect_set_merge(agg_user_id) FROM ${tableName} t group by user_id ORDER BY user_id;"""

sql """ INSERT INTO ${tableName} VALUES
('b',collect_set_state('b'))
"""

sql """ INSERT INTO ${tableName} VALUES
('a',collect_set_state('aaa'))
"""

// Second baseline after all four inserts, still before manual compaction.
qt_select_default """ SELECT user_id,collect_set_merge(agg_user_id) FROM ${tableName} t group by user_id ORDER BY user_id;"""

def tablets = sql_return_maparray """ show tablets from ${tableName}; """

// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
// A "fail" status is tolerated only when auto compaction already ran;
// with auto compaction disabled the manual trigger must succeed.
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}

// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
do {
// Poll the BE once per second until run_status reports not running.
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}

// Sum the row counts of all remaining rowsets to confirm compaction
// actually merged rowsets (fewer than the 2-rows-per-insert upper bound).
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)
int rowCount = 0
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
(code, out, err) = curl("GET", tablet.CompactionStatus)
logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def tabletJson = parseJson(out.trim())
assert tabletJson.rowsets instanceof List
for (String rowset in (List<String>) tabletJson.rowsets) {
// NOTE(review): rowset string looks like "<version> <rowCount> ..." —
// the second space-separated field is taken as the row count; confirm format.
rowCount += Integer.parseInt(rowset.split(" ")[1])
}
}
assert (rowCount < 8 * replicaNum)
// The post-compaction result must match the pre-compaction baseline —
// this is the actual regression check for the Arena fix.
qt_select_default """ SELECT user_id,collect_set_merge(agg_user_id) FROM ${tableName} t group by user_id ORDER BY user_id;"""
} finally {
// Always drop the table, even if an assertion above failed.
try_sql("DROP TABLE IF EXISTS ${tableName}")
}
}

0 comments on commit a798f15

Please sign in to comment.