diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index 6e610c395e8b73..0807ebca43466c 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -92,6 +92,10 @@ Status CloudCumulativeCompaction::prepare_compact() { // plus 1 to skip the delete version. // NOTICE: after that, the cumulative point may be larger than max version of this tablet, but it doesn't matter. update_cumulative_point(); + if (!config::enable_sleep_between_delete_cumu_compaction) { + st = Status::Error( + "_last_delete_version.first not equal to -1"); + } } return st; } diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index a49809bd59c02b..c46811f8f99a9d 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -664,7 +664,8 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS auto st = compaction->prepare_compact(); if (!st.ok()) { long now = duration_cast(system_clock::now().time_since_epoch()).count(); - if (st.is()) { + if (st.is() && + st.msg() != "_last_delete_version.first not equal to -1") { // Backoff strategy if no suitable version tablet->last_cumu_no_suitable_version_ms = now; } diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index b73c4aae7a0f1c..8798e9008c260b 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1397,6 +1397,7 @@ DEFINE_mBool(enable_delete_bitmap_merge_on_compaction, "false"); // Enable validation to check the correctness of table size. DEFINE_Bool(enable_table_size_correctness_check, "false"); DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false"); +DEFINE_mBool(enable_sleep_between_delete_cumu_compaction, "false"); // clang-format off #ifdef BE_TEST diff --git a/be/src/common/config.h b/be/src/common/config.h index 95b04b56a5c6c7..6d85db7af6af94 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1482,6 +1482,8 @@ DECLARE_mBool(enable_delete_bitmap_merge_on_compaction); // Enable validation to check the correctness of table size. DECLARE_Bool(enable_table_size_correctness_check); +// Enable sleep 5s between delete cumulative compaction. +DECLARE_mBool(enable_sleep_between_delete_cumu_compaction); #ifdef BE_TEST // test s3 diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 4349bfa9b95384..ee21a3b5f2ab23 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1693,8 +1693,13 @@ Status Tablet::prepare_compaction_and_calculate_permits( } if (!res.ok()) { - tablet->set_last_cumu_compaction_failure_time(UnixMillis()); permits = 0; + // if we meet a delete version, should increase the cumulative point to let base compaction handle the delete version. + // no need to wait 5s. + if (!(res.msg() == "_last_delete_version.first not equal to -1") || + config::enable_sleep_between_delete_cumu_compaction) { + tablet->set_last_cumu_compaction_failure_time(UnixMillis()); + } if (!res.is()) { DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1); return Status::InternalError("prepare cumulative compaction with err: {}", res); @@ -1702,6 +1707,12 @@ Status Tablet::prepare_compaction_and_calculate_permits( // return OK if OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION, so that we don't need to // print too much useless logs. // And because we set permits to 0, so even if we return OK here, nothing will be done. + LOG_INFO( + "cumulative compaction meet delete rowset, increase cumu point without other " + "operation.") + .tag("tablet id:", tablet->tablet_id()) + .tag("after cumulative compaction, cumu point:", + tablet->cumulative_layer_point()); return Status::OK(); } } else if (compaction_type == CompactionType::BASE_COMPACTION) { diff --git a/regression-test/data/compaction/test_cumu_compaction_with_delete.out b/regression-test/data/compaction/test_cumu_compaction_with_delete.out new file mode 100644 index 00000000000000..642559699ac60c --- /dev/null +++ b/regression-test/data/compaction/test_cumu_compaction_with_delete.out @@ -0,0 +1,5 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select1 -- + +-- !select2 -- + diff --git a/regression-test/suites/compaction/test_cumu_compaction_with_delete.groovy b/regression-test/suites/compaction/test_cumu_compaction_with_delete.groovy new file mode 100644 index 00000000000000..7c6be0b177ce1e --- /dev/null +++ b/regression-test/suites/compaction/test_cumu_compaction_with_delete.groovy @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_cumu_compaction_with_delete") { + def tableName = "test_cumu_compaction_with_delete" + def check_cumu_point = { cumu_point -> + def tablets = sql_return_maparray """ show tablets from ${tableName}; """ + int cumuPoint = 0 + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + cumuPoint = tabletJson["cumulative point"] + } + return cumuPoint > cumu_point + } + + try { + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `user_id` INT NOT NULL, + `value` INT NOT NULL) + UNIQUE KEY(`user_id`) + DISTRIBUTED BY HASH(`user_id`) + BUCKETS 1 + PROPERTIES ("replication_allocation" = "tag.location.default: 1", + "enable_mow_light_delete" = "true")""" + + for(int i = 1; i <= 100; ++i){ + sql """ INSERT INTO ${tableName} VALUES (1,1)""" + sql """ delete from ${tableName} where user_id = 1""" + } + + now = System.currentTimeMillis() + + while(true){ + if(check_cumu_point(100)){ + break; + } + Thread.sleep(1000) + } + time_diff = System.currentTimeMillis() - now + logger.info("time_diff:" + time_diff) + assertTrue(time_diff<200*1000) + + qt_select1 """select * from ${tableName} order by user_id, value""" + } catch (Exception e){ + logger.info(e.getMessage()) + assertFalse(true) + } finally { + try_sql("DROP TABLE IF EXISTS ${tableName} FORCE") + } + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def set_be_config = { key, value -> + for (String backend_id: backendId_to_backendIP.keySet()) { + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + } + + try { + set_be_config.call("enable_sleep_between_delete_cumu_compaction", "true") + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `user_id` INT NOT NULL, + `value` INT NOT NULL) + UNIQUE KEY(`user_id`) + DISTRIBUTED BY HASH(`user_id`) + BUCKETS 1 + PROPERTIES ("replication_allocation" = "tag.location.default: 1", + "enable_mow_light_delete" = "true")""" + + for(int i = 1; i <= 100; ++i){ + sql """ INSERT INTO ${tableName} VALUES (1,1)""" + sql """ delete from ${tableName} where user_id = 1""" + } + + now = System.currentTimeMillis() + + while(true){ + if(check_cumu_point(100)){ + break; + } + Thread.sleep(1000) + } + time_diff = System.currentTimeMillis() - now + logger.info("time_diff:" + time_diff) + assertTrue(time_diff>=200*1000) + + qt_select2 """select * from ${tableName} order by user_id, value""" + } catch (Exception e){ + logger.info(e.getMessage()) + assertFalse(true) + } finally { + try_sql("DROP TABLE IF EXISTS ${tableName} FORCE") + set_be_config.call("enable_sleep_between_delete_cumu_compaction", "false") + } +} \ No newline at end of file