Skip to content

Commit

Permalink
[Fix](recycler) Fix potential data leak when a partial update load which has a publish conflict fails (#45626)
Browse files Browse the repository at this point in the history

### What problem does this PR solve?

When recycling tmp rowsets, if a rowset's state is
`RowsetStatePB::BEGIN_PARTIAL_UPDATE`, the `num_segments` field in
`RowsetMetaCloudPB` may not reflect the actual number of segments. (This can
happen if a partial update load writes a new segment to an existing tmp
rowset during the publish phase due to a conflict, and then fails before it
successfully updates the segment count in `RowsetMetaCloudPB` in the MS.)
So in this case we need to delete the rowset's data by prefix rather than by exact path.

related case: #45795
  • Loading branch information
bobhan1 authored Dec 27, 2024
1 parent ac2b060 commit a70673f
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 0 deletions.
16 changes: 16 additions & 0 deletions cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,8 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
int ret = 0;
// resource_id -> file_paths
std::map<std::string, std::vector<std::string>> resource_file_paths;
// (resource_id, tablet_id, rowset_id)
std::vector<std::tuple<std::string, int64_t, std::string>> rowsets_delete_by_prefix;

for (const auto& rs : rowsets) {
{
Expand Down Expand Up @@ -1519,6 +1521,12 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
index_format = index_info.first;
index_ids = std::move(index_info.second);
}
if (rs.rowset_state() == RowsetStatePB::BEGIN_PARTIAL_UPDATE) {
// if rowset state is RowsetStatePB::BEGIN_PARTIAL_UPDATE, the number of segments data
// may be larger than num_segments field in RowsetMeta, so we need to delete the rowset's data by prefix
rowsets_delete_by_prefix.emplace_back(rs.resource_id(), tablet_id, rs.rowset_id_v2());
continue;
}
for (int64_t i = 0; i < num_segments; ++i) {
file_paths.push_back(segment_path(tablet_id, rowset_id, i));
if (index_format == InvertedIndexStorageFormatPB::V1) {
Expand All @@ -1542,6 +1550,14 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
return accessor->delete_files(*paths);
});
}
for (const auto& [resource_id, tablet_id, rowset_id] : rowsets_delete_by_prefix) {
LOG_INFO(
"delete rowset {} by prefix because it's in BEGIN_PARTIAL_UPDATE state, "
"resource_id={}, tablet_id={}, instance_id={}",
rowset_id, resource_id, tablet_id, instance_id_);
concurrent_delete_executor.add(
[&]() -> int { return delete_rowset_data(resource_id, tablet_id, rowset_id); });
}
bool finished = true;
std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
for (int r : rets) {
Expand Down
81 changes: 81 additions & 0 deletions cloud/test/recycler_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,24 @@ static doris::RowsetMetaCloudPB create_rowset(const std::string& resource_id, in
return rowset;
}

// Build a tmp-rowset meta for tests, letting the caller pick the rowset state
// (e.g. RowsetStatePB::BEGIN_PARTIAL_UPDATE). Mirrors the state-less overload
// above; `txn_id` defaults to 0 for callers that don't care about it.
static doris::RowsetMetaCloudPB create_rowset(const std::string& resource_id, int64_t tablet_id,
                                              int64_t index_id, int num_segments,
                                              const doris::TabletSchemaCloudPB& schema,
                                              RowsetStatePB rowset_state, int64_t txn_id = 0) {
    doris::RowsetMetaCloudPB meta;
    meta.set_rowset_id(0); // legacy v1 id: unused by the recycler, but required by the proto
    meta.set_rowset_id_v2(next_rowset_id());
    meta.set_tablet_id(tablet_id);
    meta.set_index_id(index_id);
    meta.set_txn_id(txn_id);
    meta.set_num_segments(num_segments);
    meta.set_resource_id(resource_id);
    meta.set_rowset_state(rowset_state);
    // Keep the schema version in sync with the embedded schema copy.
    meta.set_schema_version(schema.schema_version());
    meta.mutable_tablet_schema()->CopyFrom(schema);
    return meta;
}

static int create_recycle_rowset(TxnKv* txn_kv, StorageVaultAccessor* accessor,
const doris::RowsetMetaCloudPB& rowset, RecycleRowsetPB::Type type,
bool write_schema_kv) {
Expand Down Expand Up @@ -924,6 +942,69 @@ TEST(RecyclerTest, recycle_tmp_rowsets) {
EXPECT_EQ(insert_no_inverted_index, 4);
}

// Verifies that tmp rowsets in BEGIN_PARTIAL_UPDATE state are recycled by
// prefix, so segment files beyond num_segments (written by a conflicting
// partial update that failed before updating the meta) are also deleted.
TEST(RecyclerTest, recycle_tmp_rowsets_partial_update) {
    config::retention_seconds = 0;
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    auto obj_info = instance.add_obj_info();
    obj_info->set_id("recycle_tmp_rowsets_partial_update");
    obj_info->set_ak(config::test_s3_ak);
    obj_info->set_sk(config::test_s3_sk);
    obj_info->set_endpoint(config::test_s3_endpoint);
    obj_info->set_region(config::test_s3_region);
    obj_info->set_bucket(config::test_s3_bucket);
    obj_info->set_prefix("recycle_tmp_rowsets_partial_update");

    InstanceRecycler recycler(txn_kv, instance, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    ASSERT_EQ(recycler.init(), 0);

    doris::TabletSchemaCloudPB schema;

    auto accessor = recycler.accessor_map_.begin()->second;
    int64_t tablet_id = 10015;
    int64_t index_id = 1000;
    int64_t txn_id_base = 293039;
    for (int j = 0; j < 20; ++j) {
        int64_t txn_id = txn_id_base + j;
        int segment_num = 5;
        if (j < 15) {
            // Normal tmp rowsets: num_segments matches the data on object store.
            auto rowset = create_rowset("recycle_tmp_rowsets_partial_update", tablet_id, index_id,
                                        segment_num, schema, RowsetStatePB::VISIBLE, txn_id);
            create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, false);
        } else {
            // BUGFIX: was `tablet_id, tablet_id` — the third argument is index_id,
            // matching the VISIBLE branch above.
            auto rowset =
                    create_rowset("recycle_tmp_rowsets_partial_update", tablet_id, index_id,
                                  segment_num, schema, RowsetStatePB::BEGIN_PARTIAL_UPDATE, txn_id);
            create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, false);

            // A partial update may write a new segment to an existing tmp rowset.
            // Simulate a load that fails after writing the segment but before
            // updating num_segments in the tmp rowset meta.
            int extra_segment_id = segment_num;
            auto path = segment_path(rowset.tablet_id(), rowset.rowset_id_v2(), extra_segment_id);
            accessor->put_file(path, path);
        }
    }

    ASSERT_EQ(recycler.recycle_tmp_rowsets(), 0);
    // Check that no rowset data remains on the object store — including the
    // extra segment, which only prefix-deletion can remove.
    std::unique_ptr<ListIterator> list_iter;
    ASSERT_EQ(0, accessor->list_directory("data/", &list_iter));
    ASSERT_FALSE(list_iter->has_next());
    // Check that all tmp rowset KVs have been deleted.
    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
    std::unique_ptr<RangeGetIterator> it;
    auto begin_key = meta_rowset_tmp_key({instance_id, 0, 0});
    auto end_key = meta_rowset_tmp_key({instance_id, INT64_MAX, 0});
    ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
    ASSERT_EQ(it->size(), 0);
}

TEST(RecyclerTest, recycle_tablet) {
auto txn_kv = std::make_shared<MemTxnKv>();
ASSERT_EQ(txn_kv->init(), 0);
Expand Down

0 comments on commit a70673f

Please sign in to comment.