Skip to content

Commit

Permalink
branch-3.0: [Fix](merge-on-write) Should update pending delete bitmap…
Browse files Browse the repository at this point in the history
… KVs in MS when no need to calc delete bitmaps in publish phase #46039 (#46139)

pick #46039
  • Loading branch information
bobhan1 authored Dec 31, 2024
1 parent 32a6de4 commit 1ae4d26
Show file tree
Hide file tree
Showing 9 changed files with 316 additions and 11 deletions.
9 changes: 7 additions & 2 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,13 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
// if version or compaction stats can't match, it means that this is a retry and there are
// compaction or other loads finished successfully on the same tablet. So the previous publish
// is stale and we should re-calculate the delete bitmap
LOG(INFO) << "tablet=" << _tablet_id << ",txn=" << _transaction_id
<< ",publish_status=SUCCEED,not need to recalculate and update delete_bitmap.";

// we still need to update delete bitmap KVs to MS when we skip to calcalate delete bitmaps,
// because the pending delete bitmap KVs in MS we wrote before may have been removed and replaced by other txns
RETURN_IF_ERROR(tablet->save_delete_bitmap_to_ms(_version, _transaction_id, delete_bitmap));

LOG(INFO) << "tablet=" << _tablet_id << ", txn=" << _transaction_id
<< ", publish_status=SUCCEED, not need to re-calculate delete_bitmaps.";
} else {
status = CloudTablet::update_delete_bitmap(tablet, &txn_info, _transaction_id,
txn_expiration);
Expand Down
37 changes: 29 additions & 8 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,35 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta));
}

RETURN_IF_ERROR(save_delete_bitmap_to_ms(cur_version, txn_id, delete_bitmap));

// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
// delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail
RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED,
txn_info->publish_info));

DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.enable_sleep", {
auto sleep_sec = dp->param<int>("sleep", 5);
std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
});

DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.injected_error", {
auto retry = dp->param<bool>("retry", false);
if (retry) { // return DELETE_BITMAP_LOCK_ERROR to let it retry
return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>(
"injected DELETE_BITMAP_LOCK_ERROR");
} else {
return Status::InternalError<false>("injected non-retryable error");
}
});

return Status::OK();
}

Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
DeleteBitmapPtr delete_bitmap) {
DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
for (auto iter = delete_bitmap->delete_bitmap.begin();
iter != delete_bitmap->delete_bitmap.end(); ++iter) {
Expand All @@ -736,14 +765,6 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx

RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, txn_id, LOAD_INITIATOR_ID,
new_delete_bitmap.get()));

// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
// delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail
RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED,
txn_info->publish_info));

return Status::OK();
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ class CloudTablet final : public BaseTablet {
DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
const RowsetIdUnorderedSet& cur_rowset_ids) override;

Status save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
DeleteBitmapPtr delete_bitmap);

Status calc_delete_bitmap_for_compaction(const std::vector<RowsetSharedPtr>& input_rowsets,
const RowsetSharedPtr& output_rowset,
const RowIdConversion& rowid_conversion,
Expand Down
1 change: 1 addition & 0 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1813,6 +1813,7 @@ void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont

// 3. store all pending delete bitmap for this txn
PendingDeleteBitmapPB delete_bitmap_keys;
delete_bitmap_keys.set_lock_id(request->lock_id());
for (size_t i = 0; i < request->rowset_ids_size(); ++i) {
MetaDeleteBitmapInfo key_info {instance_id, tablet_id, request->rowset_ids(i),
request->versions(i), request->segment_ids(i)};
Expand Down
29 changes: 29 additions & 0 deletions cloud/src/meta-service/meta_service_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1229,8 +1229,37 @@ void commit_txn_immediately(
LOG(INFO) << "xxx remove delete bitmap lock, lock_key=" << hex(lock_keys[i])
<< " txn_id=" << txn_id;

int64_t lock_id = lock_info.lock_id();
for (auto tablet_id : table_id_tablet_ids[table_id]) {
std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id});

// check that if the pending info's lock_id is correct
std::string pending_val;
err = txn->get(pending_key, &pending_val);
if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
ss << "failed to get delete bitmap pending info, instance_id=" << instance_id
<< " tablet_id=" << tablet_id << " key=" << hex(pending_key)
<< " err=" << err;
msg = ss.str();
code = cast_as<ErrCategory::READ>(err);
return;
}
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) continue;
PendingDeleteBitmapPB pending_info;
if (!pending_info.ParseFromString(pending_val)) [[unlikely]] {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "failed to parse PendingDeleteBitmapPB";
return;
}
if (pending_info.has_lock_id() && pending_info.lock_id() != lock_id) {
code = MetaServiceCode::PENDING_DELETE_BITMAP_WRONG;
msg = fmt::format(
"wrong lock_id in pending delete bitmap infos, expect lock_id={}, but "
"found {} tablet_id={} instance_id={}",
lock_id, pending_info.lock_id(), tablet_id, instance_id);
return;
}

txn->remove(pending_key);
LOG(INFO) << "xxx remove delete bitmap pending key, pending_key="
<< hex(pending_key) << " txn_id=" << txn_id;
Expand Down
139 changes: 138 additions & 1 deletion cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5351,7 +5351,7 @@ TEST(MetaServiceTest, DeleteBimapCommitTxnTest) {

// case: first version of rowset
{
int64_t txn_id = -1;
int64_t txn_id = 98765;
int64_t table_id = 123456; // same as table_id of tmp rowset
int64_t db_id = 222;
int64_t tablet_id_base = 8113;
Expand Down Expand Up @@ -5430,11 +5430,16 @@ TEST(MetaServiceTest, DeleteBimapCommitTxnTest) {
std::string lock_val;
auto ret = txn->get(lock_key, &lock_val);
ASSERT_EQ(ret, TxnErrorCode::TXN_OK);
DeleteBitmapUpdateLockPB lock_info;
ASSERT_TRUE(lock_info.ParseFromString(lock_val));

std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id_base});
std::string pending_val;
ret = txn->get(pending_key, &pending_val);
ASSERT_EQ(ret, TxnErrorCode::TXN_OK);
PendingDeleteBitmapPB pending_info;
ASSERT_TRUE(pending_info.ParseFromString(pending_val));
ASSERT_EQ(pending_info.lock_id(), lock_info.lock_id());
}

// commit txn
Expand Down Expand Up @@ -5468,6 +5473,138 @@ TEST(MetaServiceTest, DeleteBimapCommitTxnTest) {
}
}

TEST(MetaServiceTest, WrongPendingBitmapTest) {
auto meta_service = get_meta_service();
extern std::string get_instance_id(const std::shared_ptr<ResourceManager>& rc_mgr,
const std::string& cloud_unique_id);
auto instance_id = get_instance_id(meta_service->resource_mgr(), "test_cloud_unique_id");

// case: first version of rowset
{
int64_t txn_id = 56789;
int64_t table_id = 123456; // same as table_id of tmp rowset
int64_t db_id = 222;
int64_t tablet_id_base = 8113;
int64_t partition_id = 1234;
// begin txn
{
brpc::Controller cntl;
BeginTxnRequest req;
req.set_cloud_unique_id("test_cloud_unique_id");
TxnInfoPB txn_info_pb;
txn_info_pb.set_db_id(db_id);
txn_info_pb.set_label("test_label");
txn_info_pb.add_table_ids(table_id);
txn_info_pb.set_timeout_ms(36000);
req.mutable_txn_info()->CopyFrom(txn_info_pb);
BeginTxnResponse res;
meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
txn_id = res.txn_id();
}

// mock rowset and tablet
for (int i = 0; i < 5; ++i) {
create_tablet(meta_service.get(), table_id, 1235, partition_id, tablet_id_base + i);
auto tmp_rowset = create_rowset(txn_id, tablet_id_base + i);
tmp_rowset.set_partition_id(partition_id);
CreateRowsetResponse res;
commit_rowset(meta_service.get(), tmp_rowset, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}

// update delete bitmap
{
// get delete bitmap update lock
brpc::Controller cntl;
GetDeleteBitmapUpdateLockRequest get_lock_req;
GetDeleteBitmapUpdateLockResponse get_lock_res;
get_lock_req.set_cloud_unique_id("test_cloud_unique_id");
get_lock_req.set_table_id(table_id);
get_lock_req.add_partition_ids(partition_id);
get_lock_req.set_expiration(5);
get_lock_req.set_lock_id(txn_id);
get_lock_req.set_initiator(-1);
meta_service->get_delete_bitmap_update_lock(
reinterpret_cast<::google::protobuf::RpcController*>(&cntl), &get_lock_req,
&get_lock_res, nullptr);
ASSERT_EQ(get_lock_res.status().code(), MetaServiceCode::OK);

// first update delete bitmap
UpdateDeleteBitmapRequest update_delete_bitmap_req;
UpdateDeleteBitmapResponse update_delete_bitmap_res;
update_delete_bitmap_req.set_cloud_unique_id("test_cloud_unique_id");
update_delete_bitmap_req.set_table_id(table_id);
update_delete_bitmap_req.set_partition_id(partition_id);
update_delete_bitmap_req.set_lock_id(txn_id);
update_delete_bitmap_req.set_initiator(-1);
update_delete_bitmap_req.set_tablet_id(tablet_id_base);

update_delete_bitmap_req.add_rowset_ids("123");
update_delete_bitmap_req.add_segment_ids(1);
update_delete_bitmap_req.add_versions(2);
update_delete_bitmap_req.add_segment_delete_bitmaps("abc0");

meta_service->update_delete_bitmap(
reinterpret_cast<google::protobuf::RpcController*>(&cntl),
&update_delete_bitmap_req, &update_delete_bitmap_res, nullptr);
ASSERT_EQ(update_delete_bitmap_res.status().code(), MetaServiceCode::OK);
}

// check delete bitmap update lock and pending delete bitmap
{
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1});
std::string lock_val;
auto ret = txn->get(lock_key, &lock_val);
ASSERT_EQ(ret, TxnErrorCode::TXN_OK);
DeleteBitmapUpdateLockPB lock_info;
ASSERT_TRUE(lock_info.ParseFromString(lock_val));

std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id_base});
std::string pending_val;
ret = txn->get(pending_key, &pending_val);
ASSERT_EQ(ret, TxnErrorCode::TXN_OK);
PendingDeleteBitmapPB pending_info;
ASSERT_TRUE(pending_info.ParseFromString(pending_val));
ASSERT_EQ(pending_info.lock_id(), lock_info.lock_id());
}

{
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
// pending bitmap have been modified by other txn
std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id_base});
std::string pending_val;
auto ret = txn->get(pending_key, &pending_val);
ASSERT_EQ(ret, TxnErrorCode::TXN_OK);
PendingDeleteBitmapPB pending_info;
ASSERT_TRUE(pending_info.ParseFromString(pending_val));
// change pending bitmap's lock_id
pending_info.set_lock_id(pending_info.lock_id() + 1);
ASSERT_TRUE(pending_info.SerializeToString(&pending_val));
txn->put(pending_key, pending_val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}

// commit txn
{
brpc::Controller cntl;
CommitTxnRequest req;
req.set_cloud_unique_id("test_cloud_unique_id");
req.set_db_id(db_id);
req.set_txn_id(txn_id);
req.add_mow_table_ids(table_id);
CommitTxnResponse res;
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::PENDING_DELETE_BITMAP_WRONG);
}
}
}

TEST(MetaServiceTest, GetDeleteBitmapWithRetryTest1) {
auto meta_service = get_meta_service();
SyncPoint::get_instance()->enable_processing();
Expand Down
2 changes: 2 additions & 0 deletions gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1381,6 +1381,7 @@ enum MetaServiceCode {
LOCK_EXPIRED = 8001;
LOCK_CONFLICT = 8002;
ROWSETS_EXPIRED = 8003;
PENDING_DELETE_BITMAP_WRONG = 8004;

// partial update
ROWSET_META_NOT_FOUND = 9001;
Expand Down Expand Up @@ -1446,6 +1447,7 @@ message RemoveDeleteBitmapResponse {

message PendingDeleteBitmapPB {
repeated bytes delete_bitmap_keys = 1;
optional int64 lock_id = 2;
}

message DeleteBitmapUpdateLockPB {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 1 1
2 2 2
3 3 3

-- !sql --
1 999 999
2 2 2
3 3 3

Loading

0 comments on commit 1ae4d26

Please sign in to comment.