Skip to content

Commit

Permalink
3
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian committed Dec 4, 2024
1 parent c146747 commit 96ab537
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 118 deletions.
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ Status CloudCumulativeCompaction::prepare_compact() {
// plus 1 to skip the delete version.
// NOTICE: after that, the cumulative point may be larger than max version of this tablet, but it doesn't matter.
update_cumulative_point();
st = Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>(
"_last_delete_version.first not equal to -1");
}
return st;
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,8 @@ Status CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS
auto st = compaction->prepare_compact();
if (!st.ok()) {
long now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
if (st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) {
if (st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>() &&
st.msg() != "_last_delete_version.first not equal to -1") {
// Backoff strategy if no suitable version
tablet->last_cumu_no_suitable_version_ms = now;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !1 --
1 1
2 2
3 3
4 4
5 5
6 6

-- !3 --
2 2
3 3
4 4
5 5
6 6

-- !4 --
3 3
4 4
5 5
6 6

-- !5 --
4 4
5 5
6 6

-- !5 --
5 5
6 6

-- !5 --
6 6

-- !6 --
6 6

228 changes: 111 additions & 117 deletions regression-test/suites/compaction/test_cumu_compaction_with_delete.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,40 @@
import org.codehaus.groovy.runtime.IOGroovyMethods

suite("test_cumu_compaction_with_delete") {
def tableName = "test_full_compaction"
def tableName = "test_cumu_compaction_with_delete"

def check_version_and_cumu_point = { tablets, version, cumu_point ->
// before compaction, there are 6 rowsets.
int rowsetCount = 0
int cumuPoint = 0
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
(code, out, err) = curl("GET", tablet.CompactionStatus)
logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def tabletJson = parseJson(out.trim())
assert tabletJson.rowsets instanceof List
rowsetCount +=((List<String>) tabletJson.rowsets).size()
//logger.info(tabletJson)
cumuPoint = tabletJson["cumulative point"]
}
assert (rowsetCount == version * 1)
assert (cumuPoint == cumu_point)
}

def cumulative_compaction = { tablets, backendId_to_backendIP, backendId_to_backendHttpPort, backend_id ->
// trigger cumu compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId

(code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
sleep(1000)

def compactJson = parseJson(out.trim())
}
}

try {
String backend_id;
Expand All @@ -28,135 +61,102 @@ suite("test_cumu_compaction_with_delete") {
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);

backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
assert configList instanceof List

boolean disableAutoCompaction = true
for (Object ele in (List) configList) {
assert ele instanceof List<String>
if (((List<String>) ele)[0] == "disable_auto_compaction") {
disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
}
}

sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
`user_id` INT NOT NULL, `value` INT NOT NULL)
`user_id` INT NOT NULL,
`value` INT NOT NULL)
UNIQUE KEY(`user_id`)
DISTRIBUTED BY HASH(`user_id`)
BUCKETS 1
PROPERTIES ("replication_allocation" = "tag.location.default: 1",
"disable_auto_compaction" = "true",
"enable_mow_light_delete" = "false",
"enable_unique_key_merge_on_write" = "true");"""

// version1 (1,1)(2,2)
sql """ INSERT INTO ${tableName} VALUES
(1,1),(2,2)
"""
qt_1 """select * from ${tableName} order by user_id"""


// version2 (1,10)(2,20)
sql """ INSERT INTO ${tableName} VALUES
(1,10),(2,20)
"""
qt_2 """select * from ${tableName} order by user_id"""


// version3 (1,100)(2,200)
sql """ INSERT INTO ${tableName} VALUES
(1,100),(2,200)
"""
qt_3 """select * from ${tableName} order by user_id"""


// version4 (1,100)(2,200)(3,300)
sql """ INSERT INTO ${tableName} VALUES
(3,300)
"""
qt_4 """select * from ${tableName} order by user_id"""


// version5 (1,100)(2,200)(3,100)
sql """update ${tableName} set value = 100 where user_id = 3"""
qt_5 """select * from ${tableName} order by user_id"""


// version6 (1,100)(2,200)
sql """delete from ${tableName} where user_id = 3"""
qt_6 """select * from ${tableName} order by user_id"""

sql "SET skip_delete_predicate = true"
sql "SET skip_delete_sign = true"
sql "SET skip_delete_bitmap = true"
// show all hidden data
// (1,10)(1,100)(2,2)(2,20)(2,200)(3,300)(3,100)
qt_skip_delete """select * from ${tableName} order by user_id, value"""
"enable_mow_light_delete" = "true")"""

// [0-1] | [2-2] [3-3] [4-4] [5-5] [6-6] [7-7]
// cumu point = 2
sql """ INSERT INTO ${tableName} VALUES (1,1)"""
sql """ INSERT INTO ${tableName} VALUES (2,2)"""
sql """ INSERT INTO ${tableName} VALUES (3,3)"""
sql """ INSERT INTO ${tableName} VALUES (4,4)"""
sql """ INSERT INTO ${tableName} VALUES (5,5)"""
sql """ INSERT INTO ${tableName} VALUES (6,6)"""
qt_1 """select * from ${tableName} order by user_id, value"""

//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus
def tablets = sql_return_maparray """ show tablets from ${tableName}; """

def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)
// before full compaction, there are 7 rowsets.
int rowsetCount = 0
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
(code, out, err) = curl("GET", tablet.CompactionStatus)
logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def tabletJson = parseJson(out.trim())
assert tabletJson.rowsets instanceof List
rowsetCount +=((List<String>) tabletJson.rowsets).size()
}
assert (rowsetCount == 7 * replicaNum)

// trigger full compactions for all tablets in ${tableName}
check_version_and_cumu_point(tablets, 7, 2)

// [0-1] [2-7] |
// cumu point = 2
cumulative_compaction(tablets, backendId_to_backendIP, backendId_to_backendHttpPort, backend_id)
check_version_and_cumu_point(tablets, 2, 2)

// [0-1] [2-7] | [8-8]
// cumu point = 8
sql """ delete from ${tableName} where user_id = 1"""
qt_3 """select * from ${tableName} order by user_id, value"""

// [0-1] [2-7] [8-8] |
// cumu point = 9
cumulative_compaction(tablets, backendId_to_backendIP, backendId_to_backendHttpPort, backend_id)
check_version_and_cumu_point(tablets, 3, 9)

// [0-1] [2-7] [8-8] | [9-9]
// cumu point = 9
sql """ delete from ${tableName} where user_id = 2"""
qt_4 """select * from ${tableName} order by user_id, value"""

// [0-1] [2-7] [8-8] [9-9] |
// cumu point = 10
cumulative_compaction(tablets, backendId_to_backendIP, backendId_to_backendHttpPort, backend_id)
check_version_and_cumu_point(tablets, 4, 10)

// [0-1] [2-7] [8-8] [9-9] | [10-10]
// cumu point = 10
sql """ delete from ${tableName} where user_id = 3"""
qt_5 """select * from ${tableName} order by user_id, value"""

// [0-1] [2-7] [8-8] [9-9] [10-10] |
// cumu point = 11
cumulative_compaction(tablets, backendId_to_backendIP, backendId_to_backendHttpPort, backend_id)
check_version_and_cumu_point(tablets, 5, 11)

// [0-1] [2-7] [8-8] [9-9] [10-10] | [11-11]
// cumu point = 11
sql """ delete from ${tableName} where user_id = 4"""
qt_5 """select * from ${tableName} order by user_id, value"""

// [0-1] [2-7] [8-8] [9-9] [10-10] [11-11] |
// cumu point = 12
cumulative_compaction(tablets, backendId_to_backendIP, backendId_to_backendHttpPort, backend_id)
check_version_and_cumu_point(tablets, 6, 12)

// [0-1] [2-7] [8-8] [9-9] [10-10] [11-11] | [12-12]
// cumu point = 12
sql """ delete from ${tableName} where user_id = 5"""
qt_5 """select * from ${tableName} order by user_id, value"""

// [0-1] [2-7] [8-8] [9-9] [10-10] [11-11] [12-12] |
// cumu point = 13
cumulative_compaction(tablets, backendId_to_backendIP, backendId_to_backendHttpPort, backend_id)
check_version_and_cumu_point(tablets, 7, 13)

// trigger base compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
times = 1

do{
(code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
++times
sleep(2000)
} while (parseJson(out.trim()).status.toLowerCase()!="success" && times<=10)
(code, out, err) = be_run_base_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
sleep(1000)

def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}

// wait for full compaction done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}

// after full compaction, there is only 1 rowset.

// after base compaction, there is only 1 rowset.
rowsetCount = 0
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
Expand All @@ -171,17 +171,11 @@ suite("test_cumu_compaction_with_delete") {
if (cloudMode) {
assert (rowsetCount == 2)
} else {
assert (rowsetCount == 1 * replicaNum)
assert (rowsetCount == 1)
}

// make sure all hidden data has been deleted
// (1,100)(2,200)
qt_select_final """select * from ${tableName} order by user_id"""
qt_6 """select * from ${tableName} order by user_id, value"""

sql "SET skip_delete_predicate = false"
sql "SET skip_delete_sign = false"
sql "SET skip_delete_bitmap = false"
qt_select_final2 """select * from ${tableName} order by user_id"""
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName}")
}
Expand Down

0 comments on commit 96ab537

Please sign in to comment.