Skip to content

Commit

Permalink
remove wrong impl for replace_column_data
Browse files Browse the repository at this point in the history
  • Loading branch information
amorynan committed Nov 20, 2024
1 parent d1867fe commit 702a89d
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 33 deletions.
28 changes: 5 additions & 23 deletions be/src/vec/columns/column_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -553,33 +553,15 @@ class ColumnString final : public COWHelper<IColumn, ColumnString> {
}

// Row-wise in-place replacement is not supported for variable-length string
// columns: swapping one row's payload requires rebuilding the chars/offsets
// layout for every following row. The previous in-place implementation was
// wrong and has been removed; note it also mutated chars/offsets *before*
// throwing, leaving the column corrupted — do not reintroduce that pattern.
// @throws doris::Exception(INTERNAL_ERROR) always.
void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 0) override {
    throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                           "Method replace_column_data is not supported for " + get_name());
}

// should replace according to 0,1,2... ,size,0,1,2...
// Default-value row replacement is likewise unsupported for ColumnString
// (variable-length layout, see replace_column_data). The old body mutated
// offsets before unconditionally throwing, which corrupted the column state;
// the mutation has been removed so the method now only signals the error.
// @throws doris::Exception(INTERNAL_ERROR) always.
void replace_column_data_default(size_t self_row = 0) override {
    throw doris::Exception(
            ErrorCode::INTERNAL_ERROR,
            "Method replace_column_data_default is not supported for " + get_name());
}

void compare_internal(size_t rhs_row_id, const IColumn& rhs, int nan_direction_hint,
Expand Down
16 changes: 6 additions & 10 deletions be/src/vec/columns/column_struct.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ class ColumnStruct final : public COWHelper<IColumn, ColumnStruct> {
MutableColumnPtr clone_resized(size_t size) const override;
size_t size() const override { return columns.at(0)->size(); }

bool is_variable_length() const override { return true; }

bool is_exclusive() const override {
for (const auto& col : columns) {
if (!col->is_exclusive()) {
Expand Down Expand Up @@ -141,19 +143,13 @@ class ColumnStruct final : public COWHelper<IColumn, ColumnStruct> {
return append_data_by_selector_impl<ColumnStruct>(res, selector);
}
// In-place row replacement is not supported for struct columns: child columns
// may be variable-length, and the removed implementation recursed into each
// child (mutating it) before this method could report failure — leaving the
// struct partially modified. It now fails fast with no side effects.
// @throws doris::Exception(INTERNAL_ERROR) always.
void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 0) override {
    throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                           "Method replace_column_data is not supported for " + get_name());
}

// Default-value row replacement is unsupported for struct columns (see
// replace_column_data above). The old body mutated every child column before
// throwing; that side effect is removed. The error message previously named
// "replace_column_data" (copy-paste slip) — it now names this method so logs
// point at the right call site.
// @throws doris::Exception(INTERNAL_ERROR) always.
void replace_column_data_default(size_t self_row = 0) override {
    throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                           "Method replace_column_data_default is not supported for " + get_name());
}

void insert_range_from(const IColumn& src, size_t start, size_t length) override;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_default --
1 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 ["a", "b", "c"] {"b":1, "c":2} {"f1": "b"}
2 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 \N 10 1 ["amory", "doris", "commiter"] {"b":1} {"f1": "c"}
2 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 ["amory", "doris", "2024-04-29"] {"c":2} {"f1": "d"}
3 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 \N 10 1 ["e", "f", "g", "d"] {} {"f1": "e"}
3 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 \N \N {"f1": "f"}
4 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 [null, "sdf"] \N \N

-- !select_default2 --
1 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 ["a", "b", "c"] {"b":1, "c":2} {"f1": "b"}
2 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 \N 10 1 ["amory", "doris", "commiter"] {"b":1} {"f1": "c"}
2 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 ["amory", "doris", "2024-04-29"] {"c":2} {"f1": "d"}
3 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 \N 10 1 ["e", "f", "g", "d"] {} {"f1": "e"}
3 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 \N \N {"f1": "f"}
4 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 [null, "sdf"] \N \N

Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods

// Regression test: run cumulative compaction on an AGGREGATE KEY table whose
// REPLACE value columns are variable-length complex types (ARRAY / MAP / STRUCT),
// then verify (a) the compacted row count shrank below the number of inserts and
// (b) query results are identical before and after compaction.
suite("test_agg_variable_compaction") {
def tableName = "test_compaction_agg_keys_with_variable_col"

try {
String backend_id;
// Map each backend id to its IP / HTTP port so we can call BE REST endpoints.
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
backend_id = backendId_to_backendIP.keySet()[0]

def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
assert configList instanceof List

// Read the BE's disable_auto_compaction flag; it determines how strictly the
// manual-compaction status is asserted further down.
boolean disableAutoCompaction = true
for (Object ele in (List) configList) {
assert ele instanceof List<String>
if (((List<String>) ele)[0] == "disable_auto_compaction") {
disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
}
}

// Fresh single-replica table with auto compaction disabled so the test fully
// controls when compaction happens.
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`user_id` LARGEINT NOT NULL COMMENT "用户id",
`date` DATE NOT NULL COMMENT "数据灌入日期时间",
`datev2` DATEV2 NOT NULL COMMENT "数据灌入日期时间",
`datetimev2_1` DATETIMEV2(3) NOT NULL COMMENT "数据灌入日期时间",
`datetimev2_2` DATETIMEV2(6) NOT NULL COMMENT "数据灌入日期时间",
`city` VARCHAR(20) COMMENT "用户所在城市",
`age` SMALLINT COMMENT "用户年龄",
`sex` TINYINT COMMENT "用户性别",
`array_col` ARRAY<STRING> REPLACE NULL COMMENT "array column",
`map_col` MAP<STRING, INT> REPLACE NULL COMMENT "map column",
`struct_col` STRUCT<f1: STRING> REPLACE NULL COMMENT "struct column")
AGGREGATE KEY(`user_id`, `date`, `datev2`, `datetimev2_1`, `datetimev2_2`, `city`, `age`, `sex`) DISTRIBUTED BY HASH(`user_id`)
PROPERTIES ( "replication_num" = "1", "disable_auto_compaction" = "true");
"""

// Eight separate INSERTs -> eight rowsets; duplicate keys (e.g. the two
// user_id=1 rows) exercise REPLACE aggregation on the complex-typed columns.
sql """ INSERT INTO ${tableName} VALUES
(1, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['a', 'b'], map('a', 1), named_struct('f1', 'a'));
"""

sql """ INSERT INTO ${tableName} VALUES
(1, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['a', 'b', 'c'], map('b', 1, 'c', 2), named_struct('f1', 'b'));
"""

sql """ INSERT INTO ${tableName} VALUES
(2, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', NULL, 10, 1, ['amory', 'doris', 'commiter'], map('b', 1), named_struct('f1', 'c'));
"""

sql """ INSERT INTO ${tableName} VALUES
(2, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['amory', 'doris', '2024-04-29'], map('c', 2), named_struct('f1', 'd'));
"""

sql """ INSERT INTO ${tableName} VALUES
(3, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', NULL, 10, 1, ['e', 'f', 'g', 'd'], map(), named_struct('f1', 'e'));
"""

sql """ INSERT INTO ${tableName} VALUES
(3, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['e', 'f', 'g', 'd'], map('a', 1, 'b', 2), NULL);
"""

sql """ INSERT INTO ${tableName} VALUES
(3, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, NULL, NULL, named_struct('f1', 'f'));
"""

sql """ INSERT INTO ${tableName} VALUES
(4, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, [NULL, 'sdf'], NULL, NULL);
"""

// Snapshot of query results BEFORE compaction (compared against the .out file).
qt_select_default """ SELECT * FROM ${tableName} t ORDER BY user_id; """

//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,QueryHits,VersionCount,PathHash,MetaUrl,CompactionStatus
def tablets = sql """ show tablets from ${tableName}; """

// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet[0]
backend_id = tablet[2]
(code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
// A "fail" status is only acceptable when auto compaction already ran.
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}

// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet[0]
backend_id = tablet[2]
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)

def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}

// check agg table rowcount should less than times of insert into
def replicaNum = 1
logger.info("get table replica num: " + replicaNum)
int rowCount = 0
for (def tablet in tablets) {
String tablet_id = tablet[0]

// tablet[18] is the tablet's CompactionStatus meta URL (see column list above).
(code, out, err) = curl("GET", tablet[18])
logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)

assertEquals(code, 0)
def tabletJson = parseJson(out.trim())
assert tabletJson.rowsets instanceof List
for (String rowset in (List<String>) tabletJson.rowsets) {
rowCount += Integer.parseInt(rowset.split(" ")[1])
}
}
// 8 inserts went in; after compaction the merged rowsets must hold fewer rows.
assert (rowCount < 8 * replicaNum)
// Snapshot AFTER compaction — must match the pre-compaction result exactly.
qt_select_default2 """ SELECT * FROM ${tableName} t ORDER BY user_id; """
} finally {
// try_sql("DROP TABLE IF EXISTS ${tableName}")
}
}

0 comments on commit 702a89d

Please sign in to comment.