Skip to content

Commit

Permalink
[fix](cloud-mow) FE should release delete bitmap lock when calculatin…
Browse files Browse the repository at this point in the history
…g delete bitmap failed
  • Loading branch information
hust-hhb committed Dec 19, 2024
1 parent 55c26e0 commit dbcf5b8
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,10 @@ private void commitTransaction(long dbId, List<Table> tableList, long transactio

private void executeCommitTxnRequest(CommitTxnRequest commitTxnRequest, long transactionId, boolean is2PC,
TxnCommitAttachment txnCommitAttachment) throws UserException {
if (DebugPointUtil.isEnable("FE.mow.commit.exception")) {
LOG.info("debug point FE.mow.commit.exception, throw e");
throw new UserException("debug point FE.mow.commit.exception");
}
boolean txnOperated = false;
TransactionState txnState = null;
TxnStateChangeCallback cb = null;
Expand Down Expand Up @@ -679,15 +683,9 @@ private void calcDeleteBitmapForMow(long dbId, List<OlapTable> tableList, long t
Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos = getCalcDeleteBitmapInfo(
backendToPartitionTablets, partitionVersions, baseCompactionCnts, cumulativeCompactionCnts,
cumulativePoints, partitionToSubTxnIds);
try {
sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos,
subTransactionStates == null ? Config.calculate_delete_bitmap_task_timeout_seconds
: Config.calculate_delete_bitmap_task_timeout_seconds_for_transaction_load);
} catch (UserException e) {
LOG.warn("failed to sendCalcDeleteBitmaptask for txn=" + transactionId + ",exception=" + e.getMessage());
removeDeleteBitmapUpdateLock(tableToPartitions, transactionId);
throw e;
}
sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos,
subTransactionStates == null ? Config.calculate_delete_bitmap_task_timeout_seconds
: Config.calculate_delete_bitmap_task_timeout_seconds_for_transaction_load);
}

private Map<Long, List<Long>> getPartitionSubTxnIds(List<SubTransactionState> subTransactionStates,
Expand Down Expand Up @@ -948,10 +946,10 @@ private void getDeleteBitmapUpdateLock(Map<Long, Set<Long>> tableToParttions, lo
}
}

private void removeDeleteBitmapUpdateLock(Map<Long, Set<Long>> tableToParttions, long transactionId) {
for (Map.Entry<Long, Set<Long>> entry : tableToParttions.entrySet()) {
private void removeDeleteBitmapUpdateLock(List<Table> tableList, long transactionId) {
for (Table table : tableList) {
RemoveDeleteBitmapUpdateLockRequest.Builder builder = RemoveDeleteBitmapUpdateLockRequest.newBuilder();
builder.setTableId(entry.getKey())
builder.setTableId(table.getId())
.setLockId(transactionId)
.setInitiator(-1);
final RemoveDeleteBitmapUpdateLockRequest request = builder.build();
Expand Down Expand Up @@ -1102,6 +1100,10 @@ public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId,
beforeCommitTransaction(tableList, transactionId, timeoutMillis);
try {
commitTransactionWithSubTxns(db.getId(), tableList, transactionId, subTransactionStates);
} catch (Exception e) {
LOG.info("release delete bitmap lock,commit txn=" + transactionId + ",catch exception=" + e.getMessage());
removeDeleteBitmapUpdateLock(tableList, transactionId);
throw e;
} finally {
afterCommitTransaction(tableList);
}
Expand Down Expand Up @@ -1187,6 +1189,10 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList,
beforeCommitTransaction(tableList, transactionId, timeoutMillis);
try {
commitTransaction(db.getId(), tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
} catch (Exception e) {
LOG.info("release delete bitmap lock,commit txn=" + transactionId + ",catch exception=" + e.getMessage());
removeDeleteBitmapUpdateLock(tableList, transactionId);
throw e;
} finally {
afterCommitTransaction(tableList);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --

-- !sql --
5 e 90
6 f 100

Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression suite: a stream load into a merge-on-write unique-key table is made to
// fail in the FE commit phase via the debug point 'FE.mow.commit.exception', then the
// same load is retried with the debug point disabled and is expected to succeed.
// Runs as "nonConcurrent" because it toggles FE debug points and BE configuration
// that are shared by the whole cluster.
suite("test_cloud_mow_stream_load_with_commit_fail", "nonConcurrent") {
    // Start from a clean state in case debug points were left enabled earlier.
    GetDebugPoint().clearDebugPointsForAllFEs()

    def backendId_to_backendIP = [:]
    def backendId_to_backendHttpPort = [:]
    // backend id -> (param name -> original value); populated by get_be_param.
    def backendId_to_params = [:]
    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);

    def set_be_param = { paramName, paramValue ->
        // for each be node, set paramName=paramValue
        for (String id in backendId_to_backendIP.keySet()) {
            def beIp = backendId_to_backendIP.get(id)
            def bePort = backendId_to_backendHttpPort.get(id)
            def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue))
            assertTrue(out.contains("OK"))
        }
    }

    def reset_be_param = { paramName ->
        // for each be node, reset paramName to the value recorded by get_be_param
        for (String id in backendId_to_backendIP.keySet()) {
            def beIp = backendId_to_backendIP.get(id)
            def bePort = backendId_to_backendHttpPort.get(id)
            def original_value = backendId_to_params.get(id)?.get(paramName)
            if (original_value == null) {
                // Nothing was recorded for this backend; do not push the literal
                // string "null" into its configuration.
                continue
            }
            def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value))
            assertTrue(out.contains("OK"))
        }
    }

    def get_be_param = { paramName ->
        // for each be node, read the current value of paramName and remember it
        def paramValue = ""
        for (String id in backendId_to_backendIP.keySet()) {
            def beIp = backendId_to_backendIP.get(id)
            def bePort = backendId_to_backendHttpPort.get(id)
            // get the config value from be
            def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName))
            assertTrue(code == 0)
            assertTrue(out.contains(paramName))
            // parsing: the first element of the response is expected to have 4 fields
            def resultList = parseJson(out)[0]
            assertTrue(resultList.size() == 4)
            // get original value (third field of the entry)
            paramValue = resultList[2]
            // Map.get(key, default) inserts the default when the key is missing.
            backendId_to_params.get(id, [:]).put(paramName, paramValue)
        }
    }

    def customFeConfig = [
            calculate_delete_bitmap_task_timeout_seconds: 2,
            meta_service_rpc_retry_times                : 5
    ]

    // store the original value
    get_be_param("mow_stream_load_commit_retry_times")

    def tableName = "tbl_basic"
    setFeConfigTemporary(customFeConfig) {
        try {
            // disable retry to make this problem more clear; done inside the try so
            // the finally block restores the BEs even if this step fails half-way
            set_be_param("mow_stream_load_commit_retry_times", "1")

            // create table
            sql """ drop table if exists ${tableName}; """

            sql """
CREATE TABLE `${tableName}` (
`id` int(11) NOT NULL,
`name` varchar(1100) NULL,
`score` int(11) NULL default "-1"
) ENGINE=OLAP
UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
"replication_num" = "1"
);
"""
            // this streamLoad will fail on fe commit phase
            GetDebugPoint().enableDebugPointForAllFEs('FE.mow.commit.exception', null)
            streamLoad {
                table "${tableName}"

                set 'column_separator', ','
                set 'columns', 'id, name, score'
                file "test_stream_load.csv"

                time 10000 // limit inflight 10s

                check { result, exception, startTime, endTime ->
                    // Surface transport-level failures directly instead of letting
                    // them show up as a JSON parse error on a null result.
                    if (exception != null) {
                        throw exception
                    }
                    log.info("Stream load result: ${result}")
                    def json = parseJson(result)
                    assertEquals("fail", json.Status.toLowerCase())
                    assertTrue(json.Message.contains("FE.mow.commit.exception"))
                }
            }
            qt_sql """ select * from ${tableName} order by id"""

            // this streamLoad will success because of removing exception injection
            GetDebugPoint().disableDebugPointForAllFEs('FE.mow.commit.exception')
            streamLoad {
                table "${tableName}"

                set 'column_separator', ','
                set 'columns', 'id, name, score'
                file "test_stream_load.csv"

                time 10000 // limit inflight 10s

                check { result, exception, startTime, endTime ->
                    if (exception != null) {
                        throw exception
                    }
                    log.info("Stream load result: ${result}")
                    def json = parseJson(result)
                    assertEquals("success", json.Status.toLowerCase())
                }
            }
            qt_sql """ select * from ${tableName} order by id"""
        } finally {
            try {
                reset_be_param("mow_stream_load_commit_retry_times")
            } finally {
                // Always runs, even when restoring the BE parameter fails, so the
                // injected commit failure cannot leak into later suites.
                GetDebugPoint().disableDebugPointForAllFEs('FE.mow.commit.exception')
                sql "DROP TABLE IF EXISTS ${tableName};"
                GetDebugPoint().clearDebugPointsForAllFEs()
            }
        }

    }
}

0 comments on commit dbcf5b8

Please sign in to comment.