Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](replace_column_data) Fix the semantics of replace_column_data #44309

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions be/src/vec/columns/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -700,12 +700,18 @@ class IColumn : public COW<IColumn> {
*/
String dump_structure() const;

// only used in agg value replace
// ColumnString should replace according to 0,1,2... ,size,0,1,2...
virtual void replace_column_data(const IColumn&, size_t row, size_t self_row = 0) = 0;
// Only used in the aggregate-value replace path. The base implementation
// rejects the call: fixed-length columns override this with an in-place
// replacement, while variable-length columns (string/array/map/struct)
// intentionally do not support it and fall through to this throw.
// @param row      row index in the source column to copy from
// @param self_row destination row index in this column (defaults to 0)
virtual void replace_column_data(const IColumn&, size_t row, size_t self_row = 0) {
    throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                           "Method replace_column_data is not supported for " + get_name());
}

// Only used by ColumnNullable's replace_column_data to reset a row to its
// default value. Like replace_column_data above, the base implementation
// rejects the call; columns that support in-place replacement override it.
virtual void replace_column_data_default(size_t self_row = 0) {
    throw doris::Exception(
            ErrorCode::NOT_IMPLEMENTED_ERROR,
            "Method replace_column_data_default is not supported for " + get_name());
}

// Hook invoked with a per-row null bitmap; default is a no-op. Overriding
// columns presumably adjust stored values for rows flagged null — see
// subclass implementations for the concrete semantics.
virtual void replace_column_null_data(const uint8_t* __restrict null_map) {}

Expand Down
20 changes: 0 additions & 20 deletions be/src/vec/columns/column_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,26 +231,6 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray> {
void insert_indices_from(const IColumn& src, const int* indices_begin,
const int* indices_end) override;

// Replace row `self_row` of this array column with row `row` of `rhs`.
// Caller contract: rows are replaced strictly in order 0,1,2,..., and the
// column was resize()d beforehand, so the nested data of the previous batch
// must be dropped on the first call.
void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 0) override {
    DCHECK(size() > self_row);
    const auto& r = assert_cast<const ColumnArray&>(rhs);
    const size_t nested_row_size = r.size_at(row);
    const size_t r_nested_start_off = r.offset_at(row);

    // we should clear data because we call resize() before replace_column_data()
    if (self_row == 0) {
        data->clear();
    }
    // NOTE(review): for self_row == 0 this reads get_offsets()[-1]; assumes
    // the padded offsets container yields 0 at index -1 — confirm.
    get_offsets()[self_row] = get_offsets()[self_row - 1] + nested_row_size;
    // we make sure call replace_column_data() by order so, here we just insert data for nested
    data->insert_range_from(r.get_data(), r_nested_start_off, nested_row_size);
}

// Replace row `self_row` with the default (empty) array: repeating the
// previous row's end offset makes this row span zero nested elements.
void replace_column_data_default(size_t self_row = 0) override {
    DCHECK(size() > self_row);
    auto& offsets_ref = get_offsets();
    offsets_ref[self_row] = offsets_ref[self_row - 1];
}

void clear() override {
data->clear();
offsets->clear();
Expand Down
22 changes: 0 additions & 22 deletions be/src/vec/columns/column_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,28 +145,6 @@ class ColumnMap final : public COWHelper<IColumn, ColumnMap> {
return append_data_by_selector_impl<ColumnMap>(res, selector);
}

// Replace row `self_row` of this map column with row `row` of `rhs`.
// Caller contract matches ColumnArray: rows replaced in order 0,1,2,...,
// after a resize(), so nested key/value data is cleared on the first call.
void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 0) override {
    DCHECK(size() > self_row);
    const auto& r = assert_cast<const ColumnMap&>(rhs);
    const size_t nested_row_size = r.size_at(row);
    // Keys and values share a single offsets column, so both start offsets
    // come from the same offset_at(row) (the two locals are identical).
    const size_t r_key_nested_start_off = r.offset_at(row);
    const size_t r_val_nested_start_off = r.offset_at(row);

    if (self_row == 0) {
        keys_column->clear();
        values_column->clear();
    }
    // NOTE(review): for self_row == 0 this reads get_offsets()[-1]; assumes
    // the padded offsets container yields 0 at index -1 — confirm.
    get_offsets()[self_row] = get_offsets()[self_row - 1] + nested_row_size;
    // here we use batch size to avoid many virtual call in nested column
    keys_column->insert_range_from(r.get_keys(), r_key_nested_start_off, nested_row_size);
    values_column->insert_range_from(r.get_values(), r_val_nested_start_off, nested_row_size);
}

// Replace row `self_row` with the default (empty) map by repeating the
// previous row's end offset, so the row covers zero key/value pairs.
void replace_column_data_default(size_t self_row = 0) override {
    DCHECK(size() > self_row);
    auto& offsets_ref = get_offsets();
    offsets_ref[self_row] = offsets_ref[self_row - 1];
}

// Mutable access to the offsets data; a single offsets column is shared by
// both the keys and the values sub-columns.
ColumnArray::Offsets64& ALWAYS_INLINE get_offsets() {
    return assert_cast<COffsets&>(*offsets_column).get_data();
}
Expand Down
30 changes: 0 additions & 30 deletions be/src/vec/columns/column_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -552,36 +552,6 @@ class ColumnString final : public COWHelper<IColumn, ColumnString> {
offsets.clear();
}

// Replace row `self_row` of this string column with row `row` of `rhs`.
// Must be called with ascending self_row (0,1,2,...,size,0,1,2,...): each
// call appends the new row's bytes, so out-of-order calls would corrupt
// the chars/offsets invariants.
void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 0) override {
    DCHECK(size() > self_row);
    const auto& r = assert_cast<const ColumnString&>(rhs);
    auto data = r.get_data_at(row);

    if (!self_row) {
        // self_row == 0 means we first call replace_column_data() with batch column data. so we
        // should clean last batch column data.
        chars.clear();
        // NOTE(review): this branch skips check_chars_length(), unlike the
        // else branch below — confirm a single row can never exceed the limit.
        offsets[self_row] = data.size;
    } else {
        offsets[self_row] = offsets[self_row - 1] + data.size;
        check_chars_length(offsets[self_row], self_row);
    }

    chars.insert(data.data, data.data + data.size);
}

// Rows must be replaced in ascending order (0,1,2,...,size,0,1,2,...);
// each call sets row `self_row` to the empty string by repeating the
// previous row's end offset (0 for the first row of a batch).
void replace_column_data_default(size_t self_row = 0) override {
    DCHECK(size() > self_row);

    if (self_row == 0) {
        // First row of a new batch: drop the previous batch's bytes.
        chars.clear();
        offsets[self_row] = 0;
    } else {
        offsets[self_row] = offsets[self_row - 1];
    }
}

void compare_internal(size_t rhs_row_id, const IColumn& rhs, int nan_direction_hint,
int direction, std::vector<uint8>& cmp_res,
uint8* __restrict filter) const override;
Expand Down
18 changes: 2 additions & 16 deletions be/src/vec/columns/column_struct.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ class ColumnStruct final : public COWHelper<IColumn, ColumnStruct> {
static MutablePtr create(Arg&& arg) {
return Base::create(std::forward<Arg>(arg));
}

std::string get_name() const override;
bool is_column_struct() const override { return true; }
const char* get_family_name() const override { return "Struct"; }
Expand All @@ -89,6 +88,8 @@ class ColumnStruct final : public COWHelper<IColumn, ColumnStruct> {
MutableColumnPtr clone_resized(size_t size) const override;
// Row count is taken from the first member column — assumes all member
// columns are kept at equal length (enforced elsewhere; TODO confirm).
size_t size() const override { return columns.at(0)->size(); }

// Structs are treated as variable-length (members may be variable length),
// which routes them to the throwing replace_column_data default in IColumn.
[[nodiscard]] bool is_variable_length() const override { return true; }

bool is_exclusive() const override {
for (const auto& col : columns) {
if (!col->is_exclusive()) {
Expand Down Expand Up @@ -140,21 +141,6 @@ class ColumnStruct final : public COWHelper<IColumn, ColumnStruct> {
// Append rows chosen by `selector` into `res`; delegates to the shared
// CRTP helper instantiated for ColumnStruct.
void append_data_by_selector(MutableColumnPtr& res, const Selector& selector) const override {
    return append_data_by_selector_impl<ColumnStruct>(res, selector);
}
// Replace row `self_row` of this struct column with row `row` of `rhs` by
// delegating to each member column in field order.
void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 0) override {
    DCHECK(size() > self_row);
    const auto& r = assert_cast<const ColumnStruct&>(rhs);

    for (size_t idx = 0; idx < columns.size(); ++idx) {
        columns[idx]->replace_column_data(r.get_column(idx), row, self_row);
    }
}

// Reset row `self_row` of every member column to its default value.
void replace_column_data_default(size_t self_row = 0) override {
    DCHECK(size() > self_row);
    for (auto& field_column : columns) {
        field_column->replace_column_data_default(self_row);
    }
}

void insert_range_from(const IColumn& src, size_t start, size_t length) override;
ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const override;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_default --
1 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 ["a", "b", "c"] {"b":1, "c":2} {"f1": "b"}
2 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 \N 10 1 ["amory", "doris", "commiter"] {"b":1} {"f1": "c"}
2 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 ["amory", "doris", "2024-04-29"] {"c":2} {"f1": "d"}
3 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 \N 10 1 ["e", "f", "g", "d"] {} {"f1": "e"}
3 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 \N \N {"f1": "f"}
4 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 [null, "sdf"] \N \N

-- !select_default2 --
1 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 ["a", "b", "c"] {"b":1, "c":2} {"f1": "b"}
2 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 \N 10 1 ["amory", "doris", "commiter"] {"b":1} {"f1": "c"}
2 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 ["amory", "doris", "2024-04-29"] {"c":2} {"f1": "d"}
3 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 \N 10 1 ["e", "f", "g", "d"] {} {"f1": "e"}
3 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 \N \N {"f1": "f"}
4 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 [null, "sdf"] \N \N

Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods

// Regression test for replace_column_data semantics on variable-length
// columns: an AGGREGATE KEY table with ARRAY/MAP/STRUCT REPLACE columns is
// loaded with duplicate keys, cumulative compaction is triggered on every
// tablet, and the test verifies that rows were merged and that query
// results are identical before and after compaction.
suite("test_agg_variable_compaction") {
    def tableName = "test_compaction_agg_keys_with_variable_col"

    try {
        String backend_id;
        def backendId_to_backendIP = [:]
        def backendId_to_backendHttpPort = [:]
        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
        // Use an arbitrary backend to read BE configuration from.
        backend_id = backendId_to_backendIP.keySet()[0]

        def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
        logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
        assertEquals(code, 0)
        def configList = parseJson(out.trim())
        assert configList instanceof List

        // Read disable_auto_compaction so we know later whether a manual
        // compaction request may legitimately report failure (because an
        // automatic compaction already ran).
        boolean disableAutoCompaction = true
        for (Object ele in (List) configList) {
            assert ele instanceof List<String>
            if (((List<String>) ele)[0] == "disable_auto_compaction") {
                disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
            }
        }

        sql """ DROP TABLE IF EXISTS ${tableName} """
        sql """
            CREATE TABLE IF NOT EXISTS ${tableName} (
                `user_id` LARGEINT NOT NULL COMMENT "用户id",
                `date` DATE NOT NULL COMMENT "数据灌入日期时间",
                `datev2` DATEV2 NOT NULL COMMENT "数据灌入日期时间",
                `datetimev2_1` DATETIMEV2(3) NOT NULL COMMENT "数据灌入日期时间",
                `datetimev2_2` DATETIMEV2(6) NOT NULL COMMENT "数据灌入日期时间",
                `city` VARCHAR(20) COMMENT "用户所在城市",
                `age` SMALLINT COMMENT "用户年龄",
                `sex` TINYINT COMMENT "用户性别",
                `array_col` ARRAY<STRING> REPLACE NULL COMMENT "array column",
                `map_col` MAP<STRING, INT> REPLACE NULL COMMENT "map column",
                `struct_col` STRUCT<f1: STRING> REPLACE NULL COMMENT "struct column")
            AGGREGATE KEY(`user_id`, `date`, `datev2`, `datetimev2_1`, `datetimev2_2`, `city`, `age`, `sex`) DISTRIBUTED BY HASH(`user_id`)
            PROPERTIES ( "replication_num" = "1", "disable_auto_compaction" = "true");
        """

        // Each insert creates a separate rowset; several rows share the same
        // aggregate key so the REPLACE columns must be merged at compaction.
        sql """ INSERT INTO ${tableName} VALUES
            (1, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['a', 'b'], map('a', 1), named_struct('f1', 'a'));
        """

        sql """ INSERT INTO ${tableName} VALUES
            (1, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['a', 'b', 'c'], map('b', 1, 'c', 2), named_struct('f1', 'b'));
        """

        sql """ INSERT INTO ${tableName} VALUES
            (2, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', NULL, 10, 1, ['amory', 'doris', 'commiter'], map('b', 1), named_struct('f1', 'c'));
        """

        sql """ INSERT INTO ${tableName} VALUES
            (2, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['amory', 'doris', '2024-04-29'], map('c', 2), named_struct('f1', 'd'));
        """

        sql """ INSERT INTO ${tableName} VALUES
            (3, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', NULL, 10, 1, ['e', 'f', 'g', 'd'], map(), named_struct('f1', 'e'));
        """

        sql """ INSERT INTO ${tableName} VALUES
            (3, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['e', 'f', 'g', 'd'], map('a', 1, 'b', 2), NULL);
        """

        sql """ INSERT INTO ${tableName} VALUES
            (3, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, NULL, NULL, named_struct('f1', 'f'));
        """

        sql """ INSERT INTO ${tableName} VALUES
            (4, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, [NULL, 'sdf'], NULL, NULL);
        """

        // Snapshot of results before compaction (compared against .out file).
        qt_select_default """ SELECT * FROM ${tableName} t ORDER BY user_id; """

        //TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,QueryHits,VersionCount,PathHash,MetaUrl,CompactionStatus
        def tablets = sql """ show tablets from ${tableName}; """

        // trigger compactions for all tablets in ${tableName}
        for (def tablet in tablets) {
            String tablet_id = tablet[0]
            backend_id = tablet[2]
            (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
            assertEquals(code, 0)
            def compactJson = parseJson(out.trim())
            // A "fail" status is only acceptable when auto compaction is on
            // (i.e. the compaction may already have happened automatically).
            if (compactJson.status.toLowerCase() == "fail") {
                assertEquals(disableAutoCompaction, false)
                logger.info("Compaction was done automatically!")
            }
            if (disableAutoCompaction) {
                assertEquals("success", compactJson.status.toLowerCase())
            }
        }

        // wait for all compactions done
        for (def tablet in tablets) {
            boolean running = true
            do {
                // Poll the BE once per second until run_status reports idle.
                Thread.sleep(1000)
                String tablet_id = tablet[0]
                backend_id = tablet[2]
                (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
                assertEquals(code, 0)

                def compactionStatus = parseJson(out.trim())
                assertEquals("success", compactionStatus.status.toLowerCase())
                running = compactionStatus.run_status
            } while (running)
        }

        // check agg table rowcount should less than times of insert into
        def replicaNum = 1
        logger.info("get table replica num: " + replicaNum)
        int rowCount = 0
        for (def tablet in tablets) {
            String tablet_id = tablet[0]

            // tablet[18] is the CompactionStatus URL (see column list above);
            // each rowset line's second field is its row count.
            (code, out, err) = curl("GET", tablet[18])
            logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)

            assertEquals(code, 0)
            def tabletJson = parseJson(out.trim())
            assert tabletJson.rowsets instanceof List
            for (String rowset in (List<String>) tabletJson.rowsets) {
                rowCount += Integer.parseInt(rowset.split(" ")[1])
            }
        }
        // 8 inserts with duplicate keys must have merged into fewer rows.
        assert (rowCount < 8 * replicaNum)
        // Results after compaction must match the pre-compaction snapshot.
        qt_select_default2 """ SELECT * FROM ${tableName} t ORDER BY user_id; """
    } finally {
        // try_sql("DROP TABLE IF EXISTS ${tableName}")
    }
}
Loading