diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp index 83f19881b72bca..9de00993117f7a 100644 --- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp +++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp @@ -228,8 +228,13 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { // if version or compaction stats can't match, it means that this is a retry and there are // compaction or other loads finished successfully on the same tablet. So the previous publish // is stale and we should re-calculate the delete bitmap - LOG(INFO) << "tablet=" << _tablet_id << ",txn=" << _transaction_id - << ",publish_status=SUCCEED,not need to recalculate and update delete_bitmap."; + + // we still need to update delete bitmap KVs to MS when we skip to calcalate delete bitmaps, + // because the pending delete bitmap KVs in MS we wrote before may have been removed and replaced by other txns + RETURN_IF_ERROR(tablet->save_delete_bitmap_to_ms(_version, _transaction_id, delete_bitmap)); + + LOG(INFO) << "tablet=" << _tablet_id << ", txn=" << _transaction_id + << ", publish_status=SUCCEED, not need to re-calculate delete_bitmaps."; } else { status = CloudTablet::update_delete_bitmap(tablet, &txn_info, _transaction_id, txn_expiration); diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 0fe9f48ceb9ef3..e9693097d9b4ac 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -723,6 +723,35 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta)); } + RETURN_IF_ERROR(save_delete_bitmap_to_ms(cur_version, txn_id, delete_bitmap)); + + // store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason, + // it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do + // delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail + RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info( + txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED, + txn_info->publish_info)); + + DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.enable_sleep", { + auto sleep_sec = dp->param("sleep", 5); + std::this_thread::sleep_for(std::chrono::seconds(sleep_sec)); + }); + + DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.injected_error", { + auto retry = dp->param("retry", false); + if (retry) { // return DELETE_BITMAP_LOCK_ERROR to let it retry + return Status::Error( + "injected DELETE_BITMAP_LOCK_ERROR"); + } else { + return Status::InternalError("injected non-retryable error"); + } + }); + + return Status::OK(); +} + +Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id, + DeleteBitmapPtr delete_bitmap) { DeleteBitmapPtr new_delete_bitmap = std::make_shared(tablet_id()); for (auto iter = delete_bitmap->delete_bitmap.begin(); iter != delete_bitmap->delete_bitmap.end(); ++iter) { @@ -736,14 +765,6 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, txn_id, LOAD_INITIATOR_ID, new_delete_bitmap.get())); - - // store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason, - // it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do - // delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail - RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info( - txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED, - txn_info->publish_info)); - return Status::OK(); } diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index 458964bac2cdda..7897786eb0395f 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -172,6 +172,9 @@ class CloudTablet final : public BaseTablet { DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer, const RowsetIdUnorderedSet& cur_rowset_ids) override; + Status save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id, + DeleteBitmapPtr delete_bitmap); + Status calc_delete_bitmap_for_compaction(const std::vector& input_rowsets, const RowsetSharedPtr& output_rowset, const RowIdConversion& rowid_conversion, diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 3a2ae311f0d8f7..2ca8379f0baa10 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -1813,6 +1813,7 @@ void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont // 3. store all pending delete bitmap for this txn PendingDeleteBitmapPB delete_bitmap_keys; + delete_bitmap_keys.set_lock_id(request->lock_id()); for (size_t i = 0; i < request->rowset_ids_size(); ++i) { MetaDeleteBitmapInfo key_info {instance_id, tablet_id, request->rowset_ids(i), request->versions(i), request->segment_ids(i)}; diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 1d9a0154571467..d624d428ef9ea1 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -1229,8 +1229,37 @@ void commit_txn_immediately( LOG(INFO) << "xxx remove delete bitmap lock, lock_key=" << hex(lock_keys[i]) << " txn_id=" << txn_id; + int64_t lock_id = lock_info.lock_id(); for (auto tablet_id : table_id_tablet_ids[table_id]) { std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id}); + + // check that if the pending info's lock_id is correct + std::string pending_val; + err = txn->get(pending_key, &pending_val); + if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { + ss << "failed to get delete bitmap pending info, instance_id=" << instance_id + << " tablet_id=" << tablet_id << " key=" << hex(pending_key) + << " err=" << err; + msg = ss.str(); + code = cast_as(err); + return; + } + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) continue; + PendingDeleteBitmapPB pending_info; + if (!pending_info.ParseFromString(pending_val)) [[unlikely]] { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = "failed to parse PendingDeleteBitmapPB"; + return; + } + if (pending_info.has_lock_id() && pending_info.lock_id() != lock_id) { + code = MetaServiceCode::PENDING_DELETE_BITMAP_WRONG; + msg = fmt::format( + "wrong lock_id in pending delete bitmap infos, expect lock_id={}, but " + "found {} tablet_id={} instance_id={}", + lock_id, pending_info.lock_id(), tablet_id, instance_id); + return; + } + txn->remove(pending_key); LOG(INFO) << "xxx remove delete bitmap pending key, pending_key=" << hex(pending_key) << " txn_id=" << txn_id; diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index 777c7419b701ad..49b1587228c912 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -5351,7 +5351,7 @@ TEST(MetaServiceTest, DeleteBimapCommitTxnTest) { // case: first version of rowset { - int64_t txn_id = -1; + int64_t txn_id = 98765; int64_t table_id = 123456; // same as table_id of tmp rowset int64_t db_id = 222; int64_t tablet_id_base = 8113; @@ -5430,11 +5430,16 @@ TEST(MetaServiceTest, DeleteBimapCommitTxnTest) { std::string lock_val; auto ret = txn->get(lock_key, &lock_val); ASSERT_EQ(ret, TxnErrorCode::TXN_OK); + DeleteBitmapUpdateLockPB lock_info; + ASSERT_TRUE(lock_info.ParseFromString(lock_val)); std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id_base}); std::string pending_val; ret = txn->get(pending_key, &pending_val); ASSERT_EQ(ret, TxnErrorCode::TXN_OK); + PendingDeleteBitmapPB pending_info; + ASSERT_TRUE(pending_info.ParseFromString(pending_val)); + ASSERT_EQ(pending_info.lock_id(), lock_info.lock_id()); } // commit txn @@ -5468,6 +5473,138 @@ TEST(MetaServiceTest, DeleteBimapCommitTxnTest) { } } +TEST(MetaServiceTest, WrongPendingBitmapTest) { + auto meta_service = get_meta_service(); + extern std::string get_instance_id(const std::shared_ptr& rc_mgr, + const std::string& cloud_unique_id); + auto instance_id = get_instance_id(meta_service->resource_mgr(), "test_cloud_unique_id"); + + // case: first version of rowset + { + int64_t txn_id = 56789; + int64_t table_id = 123456; // same as table_id of tmp rowset + int64_t db_id = 222; + int64_t tablet_id_base = 8113; + int64_t partition_id = 1234; + // begin txn + { + brpc::Controller cntl; + BeginTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + TxnInfoPB txn_info_pb; + txn_info_pb.set_db_id(db_id); + txn_info_pb.set_label("test_label"); + txn_info_pb.add_table_ids(table_id); + txn_info_pb.set_timeout_ms(36000); + req.mutable_txn_info()->CopyFrom(txn_info_pb); + BeginTxnResponse res; + meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + txn_id = res.txn_id(); + } + + // mock rowset and tablet + for (int i = 0; i < 5; ++i) { + create_tablet(meta_service.get(), table_id, 1235, partition_id, tablet_id_base + i); + auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i); + tmp_rowset.set_partition_id(partition_id); + CreateRowsetResponse res; + commit_rowset(meta_service.get(), tmp_rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + + // update delete bitmap + { + // get delete bitmap update lock + brpc::Controller cntl; + GetDeleteBitmapUpdateLockRequest get_lock_req; + GetDeleteBitmapUpdateLockResponse get_lock_res; + get_lock_req.set_cloud_unique_id("test_cloud_unique_id"); + get_lock_req.set_table_id(table_id); + get_lock_req.add_partition_ids(partition_id); + get_lock_req.set_expiration(5); + get_lock_req.set_lock_id(txn_id); + get_lock_req.set_initiator(-1); + meta_service->get_delete_bitmap_update_lock( + reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &get_lock_req, + &get_lock_res, nullptr); + ASSERT_EQ(get_lock_res.status().code(), MetaServiceCode::OK); + + // first update delete bitmap + UpdateDeleteBitmapRequest update_delete_bitmap_req; + UpdateDeleteBitmapResponse update_delete_bitmap_res; + update_delete_bitmap_req.set_cloud_unique_id("test_cloud_unique_id"); + update_delete_bitmap_req.set_table_id(table_id); + update_delete_bitmap_req.set_partition_id(partition_id); + update_delete_bitmap_req.set_lock_id(txn_id); + update_delete_bitmap_req.set_initiator(-1); + update_delete_bitmap_req.set_tablet_id(tablet_id_base); + + update_delete_bitmap_req.add_rowset_ids("123"); + update_delete_bitmap_req.add_segment_ids(1); + update_delete_bitmap_req.add_versions(2); + update_delete_bitmap_req.add_segment_delete_bitmaps("abc0"); + + meta_service->update_delete_bitmap( + reinterpret_cast(&cntl), + &update_delete_bitmap_req, &update_delete_bitmap_res, nullptr); + ASSERT_EQ(update_delete_bitmap_res.status().code(), MetaServiceCode::OK); + } + + // check delete bitmap update lock and pending delete bitmap + { + std::unique_ptr txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); + std::string lock_val; + auto ret = txn->get(lock_key, &lock_val); + ASSERT_EQ(ret, TxnErrorCode::TXN_OK); + DeleteBitmapUpdateLockPB lock_info; + ASSERT_TRUE(lock_info.ParseFromString(lock_val)); + + std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id_base}); + std::string pending_val; + ret = txn->get(pending_key, &pending_val); + ASSERT_EQ(ret, TxnErrorCode::TXN_OK); + PendingDeleteBitmapPB pending_info; + ASSERT_TRUE(pending_info.ParseFromString(pending_val)); + ASSERT_EQ(pending_info.lock_id(), lock_info.lock_id()); + } + + { + std::unique_ptr txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + // pending bitmap have been modified by other txn + std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id_base}); + std::string pending_val; + auto ret = txn->get(pending_key, &pending_val); + ASSERT_EQ(ret, TxnErrorCode::TXN_OK); + PendingDeleteBitmapPB pending_info; + ASSERT_TRUE(pending_info.ParseFromString(pending_val)); + // change pending bitmap's lock_id + pending_info.set_lock_id(pending_info.lock_id() + 1); + ASSERT_TRUE(pending_info.SerializeToString(&pending_val)); + txn->put(pending_key, pending_val); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + } + + // commit txn + { + brpc::Controller cntl; + CommitTxnRequest req; + req.set_cloud_unique_id("test_cloud_unique_id"); + req.set_db_id(db_id); + req.set_txn_id(txn_id); + req.add_mow_table_ids(table_id); + CommitTxnResponse res; + meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::PENDING_DELETE_BITMAP_WRONG); + } + } +} + TEST(MetaServiceTest, GetDeleteBitmapWithRetryTest1) { auto meta_service = get_meta_service(); SyncPoint::get_instance()->enable_processing(); diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index c113868a2c3286..4e00faa0c6f4eb 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -1381,6 +1381,7 @@ enum MetaServiceCode { LOCK_EXPIRED = 8001; LOCK_CONFLICT = 8002; ROWSETS_EXPIRED = 8003; + PENDING_DELETE_BITMAP_WRONG = 8004; // partial update ROWSET_META_NOT_FOUND = 9001; @@ -1446,6 +1447,7 @@ message RemoveDeleteBitmapResponse { message PendingDeleteBitmapPB { repeated bytes delete_bitmap_keys = 1; + optional int64 lock_id = 2; } message DeleteBitmapUpdateLockPB { diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_pending_delete_bitmaps_removed_by_other_txn.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_pending_delete_bitmaps_removed_by_other_txn.out new file mode 100644 index 00000000000000..8b8e97822cf766 --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_pending_delete_bitmaps_removed_by_other_txn.out @@ -0,0 +1,11 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 +2 2 2 +3 3 3 + +-- !sql -- +1 999 999 +2 2 2 +3 3 3 + diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_pending_delete_bitmaps_removed_by_other_txn.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_pending_delete_bitmaps_removed_by_other_txn.groovy new file mode 100644 index 00000000000000..2b34d2bbb49d63 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_pending_delete_bitmaps_removed_by_other_txn.groovy @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_cloud_pending_delete_bitmaps_removed_by_other_txn", "nonConcurrent") { + if (!isCloudMode()) { + return + } + + def customFeConfig = [ + delete_bitmap_lock_expiration_seconds : 5, + calculate_delete_bitmap_task_timeout_seconds : 20, + ] + + setFeConfigTemporary(customFeConfig) { + + def table1 = "test_cloud_pending_delete_bitmaps_removed_by_other_txn" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1);" + sql "insert into ${table1} values(2,2,2);" + sql "insert into ${table1} values(3,3,3);" + sql "sync;" + order_qt_sql "select * from ${table1};" + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + // let the first load fail and retry after it writes pending delete bitmaps in MS and before + // it commit txn in MS + GetDebugPoint().enableDebugPointForAllBEs("CloudTablet::save_delete_bitmap.enable_sleep", [sleep: 5]) + GetDebugPoint().enableDebugPointForAllBEs("CloudTablet::save_delete_bitmap.injected_error", [retry: true]) + + // the first load + def t1 = Thread.start { + sql "insert into ${table1} values(1,999,999);" + } + + Thread.sleep(1000) + + def t2 = Thread.start { + try { + // this should fail + sql "insert into ${table1} values(2,888,888);" + } catch(Exception e) { + logger.info(e.getMessage()) + } + } + + // let the second load fail after it remove the pending delete bitmaps in MS written by load 1 + Thread.sleep(5000) + GetDebugPoint().enableDebugPointForAllBEs("CloudTablet::save_delete_bitmap.injected_error", [retry: false]) + + + Thread.sleep(5000) + GetDebugPoint().disableDebugPointForAllBEs("CloudTablet::save_delete_bitmap.injected_error") + + t1.join() + t2.join() + + Thread.sleep(300) + // force it read delete bitmaps from MS rather than BE's cache + GetDebugPoint().enableDebugPointForAllBEs("CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss") + qt_sql "select * from ${table1} order by k1,c1,c2;" + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + } +}