From 40efd97666ee87d8d8a556a948cfd1d2902630c7 Mon Sep 17 00:00:00 2001 From: bobhan1 <baohan@selectdb.com> Date: Tue, 31 Dec 2024 10:58:32 +0800 Subject: [PATCH] branch-3.0: [Fix](recycler) Fix potential data leak when a partial update load which has publish conflict fails #45626 (#46138) pick https://github.com/apache/doris/pull/45626 --- cloud/src/recycler/recycler.cpp | 16 +++++++ cloud/test/recycler_test.cpp | 81 +++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+) diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 84d755958ee29c..04476704bd36c7 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -1449,6 +1449,8 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou int ret = 0; // resource_id -> file_paths std::map<std::string, std::vector<std::string>> resource_file_paths; + // (resource_id, tablet_id, rowset_id) + std::vector<std::tuple<std::string, int64_t, std::string>> rowsets_delete_by_prefix; for (const auto& rs : rowsets) { { @@ -1519,6 +1521,12 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou index_format = index_info.first; index_ids = std::move(index_info.second); } + if (rs.rowset_state() == RowsetStatePB::BEGIN_PARTIAL_UPDATE) { + // if rowset state is RowsetStatePB::BEGIN_PARTIAL_UPDATE, the number of segments data + // may be larger than num_segments field in RowsetMeta, so we need to delete the rowset's data by prefix + rowsets_delete_by_prefix.emplace_back(rs.resource_id(), tablet_id, rs.rowset_id_v2()); + continue; + } for (int64_t i = 0; i < num_segments; ++i) { file_paths.push_back(segment_path(tablet_id, rowset_id, i)); if (index_format == InvertedIndexStorageFormatPB::V1) { @@ -1542,6 +1550,14 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou return accessor->delete_files(*paths); }); } + for (const auto& [resource_id, tablet_id, rowset_id] : rowsets_delete_by_prefix) { + LOG_INFO( + "delete rowset {} by prefix because it's in BEGIN_PARTIAL_UPDATE state, " + "resource_id={}, tablet_id={}, instance_id={}", + rowset_id, resource_id, tablet_id, instance_id_); + concurrent_delete_executor.add( + [&]() -> int { return delete_rowset_data(resource_id, tablet_id, rowset_id); }); + } bool finished = true; std::vector<int> rets = concurrent_delete_executor.when_all(&finished); for (int r : rets) { diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index 0bc16644a82041..e38d25aaa8420a 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -121,6 +121,24 @@ static doris::RowsetMetaCloudPB create_rowset(const std::string& resource_id, in return rowset; } +static doris::RowsetMetaCloudPB create_rowset(const std::string& resource_id, int64_t tablet_id, + int64_t index_id, int num_segments, + const doris::TabletSchemaCloudPB& schema, + RowsetStatePB rowset_state, int64_t txn_id = 0) { + doris::RowsetMetaCloudPB rowset; + rowset.set_rowset_id(0); // useless but required + rowset.set_rowset_id_v2(next_rowset_id()); + rowset.set_txn_id(txn_id); + rowset.set_num_segments(num_segments); + rowset.set_tablet_id(tablet_id); + rowset.set_index_id(index_id); + rowset.set_resource_id(resource_id); + rowset.set_schema_version(schema.schema_version()); + rowset.mutable_tablet_schema()->CopyFrom(schema); + rowset.set_rowset_state(rowset_state); + return rowset; +} + static int create_recycle_rowset(TxnKv* txn_kv, StorageVaultAccessor* accessor, const doris::RowsetMetaCloudPB& rowset, RecycleRowsetPB::Type type, bool write_schema_kv) { @@ -924,6 +942,69 @@ TEST(RecyclerTest, recycle_tmp_rowsets) { EXPECT_EQ(insert_no_inverted_index, 4); } +TEST(RecyclerTest, recycle_tmp_rowsets_partial_update) { + config::retention_seconds = 0; + auto txn_kv = std::make_shared<MemTxnKv>(); + ASSERT_EQ(txn_kv->init(), 0); + + InstanceInfoPB instance; + instance.set_instance_id(instance_id); + auto obj_info = instance.add_obj_info(); + obj_info->set_id("recycle_tmp_rowsets_partial_update"); + obj_info->set_ak(config::test_s3_ak); + obj_info->set_sk(config::test_s3_sk); + obj_info->set_endpoint(config::test_s3_endpoint); + obj_info->set_region(config::test_s3_region); + obj_info->set_bucket(config::test_s3_bucket); + obj_info->set_prefix("recycle_tmp_rowsets_partial_update"); + + InstanceRecycler recycler(txn_kv, instance, thread_group, + std::make_shared<TxnLazyCommitter>(txn_kv)); + ASSERT_EQ(recycler.init(), 0); + + doris::TabletSchemaCloudPB schema; + + auto accessor = recycler.accessor_map_.begin()->second; + int64_t tablet_id = 10015; + int64_t index_id = 1000; + int64_t txn_id_base = 293039; + for (int j = 0; j < 20; ++j) { + int64_t txn_id = txn_id_base + j; + int segment_num = 5; + if (j < 15) { + auto rowset = create_rowset("recycle_tmp_rowsets_partial_update", tablet_id, index_id, + segment_num, schema, RowsetStatePB::VISIBLE, txn_id); + create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, false); + } else { + auto rowset = + create_rowset("recycle_tmp_rowsets_partial_update", tablet_id, tablet_id, + segment_num, schema, RowsetStatePB::BEGIN_PARTIAL_UPDATE, txn_id); + create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, false); + + // partial update may write new segment to an existing tmp rowsets + // we simulate that partial update load fails after it writes a segment + // and before it updates the segments num in tmp rowset meta + int extra_segment_id = segment_num; + auto path = segment_path(rowset.tablet_id(), rowset.rowset_id_v2(), extra_segment_id); + accessor->put_file(path, path); + } + } + + ASSERT_EQ(recycler.recycle_tmp_rowsets(), 0); + // check rowset does not exist on obj store + std::unique_ptr<ListIterator> list_iter; + ASSERT_EQ(0, accessor->list_directory("data/", &list_iter)); + ASSERT_FALSE(list_iter->has_next()); + // check all tmp rowset kv have been deleted + std::unique_ptr<Transaction> txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + std::unique_ptr<RangeGetIterator> it; + auto begin_key = meta_rowset_tmp_key({instance_id, 0, 0}); + auto end_key = meta_rowset_tmp_key({instance_id, INT64_MAX, 0}); + ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK); + ASSERT_EQ(it->size(), 0); +} + TEST(RecyclerTest, recycle_tablet) { auto txn_kv = std::make_shared<MemTxnKv>(); ASSERT_EQ(txn_kv->init(), 0);