Skip to content

Commit

Permalink
fix: Resolve timing-dependent bug in blob file GC
Browse files Browse the repository at this point in the history
  • Loading branch information
umegane committed Feb 20, 2025
1 parent 18309d7 commit 8c12b1b
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 37 deletions.
2 changes: 1 addition & 1 deletion include/limestone/api/datastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ class datastore {
auto next_blob_id_for_tests() const noexcept { return next_blob_id_.load(); }
auto& files_for_tests() const noexcept { return files_; }
void rotate_epoch_file_for_tests() { rotate_epoch_file(); }
void set_next_blob_id_for_tests(blob_id_type next_blob_id) noexcept { next_blob_id_ = next_blob_id; }
void set_next_blob_id_for_tests(blob_id_type next_blob_id) noexcept { next_blob_id_.store(next_blob_id); }
std::set<blob_id_type> get_persistent_blob_ids_for_tests() noexcept {
std::lock_guard<std::mutex> lock(persistent_blob_ids_mutex_);
return persistent_blob_ids_;
Expand Down
15 changes: 15 additions & 0 deletions src/limestone/blob_file_garbage_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ using limestone::api::log_entry;
}

void blob_file_garbage_collector::add_gc_exempt_blob_id(blob_id_type id) {
VLOG_LP(log_trace_fine) << "Adding blob id to gc_exempt_blob_: " << id;
gc_exempt_blob_->add_blob_id(id);
}

Expand All @@ -135,7 +136,10 @@ using limestone::api::log_entry;
this->wait_for_blob_file_scan();

// Calculate the difference and perform deletion operations
VLOG_LP(100) << "Scanned blobs before diff: " << scanned_blobs_->debug_string();
VLOG_LP(100) << "GC exempt blobs: " << gc_exempt_blob_->debug_string();
scanned_blobs_->diff(*gc_exempt_blob_);
VLOG_LP(100) << "Scanned blobs after: " << scanned_blobs_->debug_string();

for (const auto &id : *scanned_blobs_) {
if (shutdown_requested_.load(std::memory_order_acquire)) {
Expand Down Expand Up @@ -282,7 +286,18 @@ void blob_file_garbage_collector::scan_snapshot(const boost::filesystem::path &s
std::lock_guard<std::mutex> lock(mutex_);
scanned_blobs_ = std::make_unique<blob_id_container>();
gc_exempt_blob_ = std::make_unique<blob_id_container>();
blob_file_scan_started_ = false;
blob_file_scan_waited_ = false;
snapshot_scan_started_ = false;
snapshot_scan_waited_ = false;
cleanup_started_ = false;
cleanup_waited_ = false;
}

bool blob_file_garbage_collector::is_active() const {
std::lock_guard<std::mutex> lock(mutex_);
return !(blob_file_scan_complete_ && snapshot_scan_complete_ && cleanup_complete_);
}

} // namespace limestone::internal

10 changes: 10 additions & 0 deletions src/limestone/blob_file_garbage_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,16 @@ class blob_file_garbage_collector {
*/
void shutdown();

/**
* Determines if any blob file operations are currently in progress.
*
* Returns true if blob file scanning, snapshot scanning, or cleanup has been started.
* This indicates that the blob file is actively undergoing some form of scanning or cleanup.
*
* @return True if any corresponding operation is active; false otherwise.
*/
bool is_active() const;

/**
* @brief Blocks the current thread until all worker threads have completed execution.
*
Expand Down
38 changes: 21 additions & 17 deletions src/limestone/compaction_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
num_worker_(workers),
file_names_(std::move(file_names)),
has_file_set_(true),
gc_snapshot_(std::nullopt)
gc_snapshot_(nullptr)
{}

// Constructor: to_dir provided, GC disabled, no file set.
Expand All @@ -53,7 +53,7 @@
to_dir_(std::move(to)),
num_worker_(workers),
has_file_set_(false),
gc_snapshot_(std::nullopt)
gc_snapshot_(nullptr)
{}

// Constructor: to_dir provided, file set available, GC disabled.
Expand All @@ -68,7 +68,7 @@
num_worker_(workers),
file_names_(std::move(file_names)),
has_file_set_(true),
gc_snapshot_(std::nullopt)
gc_snapshot_(nullptr)
{}

// Constructor: to_dir provided, file set available, GC enabled.
Expand All @@ -78,14 +78,14 @@
boost::filesystem::path to,
int workers,
std::set<std::string> file_names,
blob_file_gc_snapshot& gc_snapshot
std::unique_ptr<blob_file_gc_snapshot> gc_snapshot
)
: from_dir_(std::move(from)),
to_dir_(std::move(to)),
num_worker_(workers),
file_names_(std::move(file_names)),
has_file_set_(true),
gc_snapshot_(std::ref(gc_snapshot))
gc_snapshot_(std::move(gc_snapshot))
{}

// Getter for from_dir.
Expand All @@ -102,27 +102,31 @@

// Returns true if a file set is configured.
[[nodiscard]] bool has_file_set() const { return has_file_set_; }

// Check if GC is enabled.
[[nodiscard]] bool is_gc_enabled() const { return gc_snapshot_.has_value(); }
[[nodiscard]] bool is_gc_enabled() const { return static_cast<bool>(gc_snapshot_); }

// Getter for gc_snapshot.
// It is caller's responsibility to ensure GC is enabled before calling.
[[nodiscard]] blob_file_gc_snapshot& get_gc_snapshot() const { return gc_snapshot_.value().get(); }

private:
[[nodiscard]] blob_file_gc_snapshot& get_gc_snapshot() const {
if (!gc_snapshot_) {
throw std::logic_error("GC is not enabled");
}
return *gc_snapshot_;
}

private:
// Basic compaction settings.
boost::filesystem::path from_dir_;
boost::filesystem::path to_dir_;
int num_worker_;

// File set for compaction.
std::set<std::string> file_names_;
bool has_file_set_;

// Garbage collection settings.
std::optional<std::reference_wrapper<blob_file_gc_snapshot>> gc_snapshot_;
std::unique_ptr<blob_file_gc_snapshot> gc_snapshot_;
};

} // namespace limestone::internal


} // namespace limestone::internal
19 changes: 7 additions & 12 deletions src/limestone/datastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -670,13 +670,9 @@ void datastore::compact_with_online() {

// check blob file garbage collection runnable
bool blob_file_gc_runnable = false;
if (boundary_version_copy.get_major() > compaction_catalog_->get_max_epoch_id()) {
if (boundary_version_copy.get_major() > compaction_catalog_->get_max_epoch_id() && !blob_file_garbage_collector_->is_active()) {
blob_file_gc_runnable = true;
}
std::cerr << "boundary_version_copy = " << boundary_version_copy.get_major() << "." << boundary_version_copy.get_minor() << std::endl;
std::cerr << "max_epoch_id = " << compaction_catalog_->get_max_epoch_id() << std::endl;
std::cerr << "blob_file_gc_runnable = " << blob_file_gc_runnable << std::endl;


// rotate first
rotation_result result = rotate_log_files();
Expand Down Expand Up @@ -710,13 +706,12 @@ void datastore::compact_with_online() {
// Set the appropriate options based on whether blob file GC is executable.
compaction_options options = [&]() -> compaction_options {
if (blob_file_gc_runnable) {
blob_file_gc_snapshot gc_snapshot(boundary_version_copy);
return compaction_options{location_, compaction_temp_dir, recover_max_parallelism_, need_compaction_filenames, gc_snapshot};
auto gc_snapshot = std::make_unique<blob_file_gc_snapshot>(boundary_version_copy);
return compaction_options{location_, compaction_temp_dir, recover_max_parallelism_, need_compaction_filenames, std::move(gc_snapshot)};
}
return compaction_options{location_, compaction_temp_dir, recover_max_parallelism_, need_compaction_filenames};
}();

std::cerr << "is_gc_enabled = " << options.is_gc_enabled() << std::endl;
// create a compacted file
blob_id_type max_blob_id = create_compact_pwal_and_get_max_blob_id(options);

Expand Down Expand Up @@ -762,15 +757,15 @@ void datastore::compact_with_online() {
// blob files garbage collection
if (options.is_gc_enabled()) {
LOG_LP(INFO) << "start blob files garbage collection";
blob_file_garbage_collector garb_collector(*blob_file_resolver_);
garb_collector.scan_blob_files(next_blob_id_copy);
blob_file_garbage_collector_->wait_for_all_threads();
blob_file_garbage_collector_->scan_blob_files(next_blob_id_copy);
log_entry_container log_entries = options.get_gc_snapshot().finalize_snapshot();
for (const auto& entry : log_entries) {
for (const auto& blob_id : entry.get_blob_ids()) {
garb_collector.add_gc_exempt_blob_id(blob_id);
blob_file_garbage_collector_->add_gc_exempt_blob_id(blob_id);
}
}
garb_collector.finalize_scan_and_cleanup();
blob_file_garbage_collector_->finalize_scan_and_cleanup();
LOG_LP(INFO) << "blob files garbage collection finished";
}

Expand Down
1 change: 1 addition & 0 deletions src/limestone/datastore_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ static std::pair<epoch_id_type, sorting_context> create_sorted_from_wals(compact
switch (e.type()) {
case log_entry::entry_type::normal_with_blob:
if (options.is_gc_enabled()) {
std::cerr << "write version = " << options.get_gc_snapshot().boundary_version().get_major() << "." << options.get_gc_snapshot().boundary_version().get_minor() << std::endl;
options.get_gc_snapshot().sanitize_and_add_entry(e);
}
add_entry_to_point(sctx.get_sortdb(), e);
Expand Down
10 changes: 7 additions & 3 deletions test/limestone/blob/compaction_blob_gc_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ TEST_F(compaction_test, basic_blob_gc_test) {
auto path1003 = create_dummy_blob_files(1003);
auto path2001 = create_dummy_blob_files(2001);
auto path2002 = create_dummy_blob_files(2002);

datastore_->set_next_blob_id(2003);

// Verify PWAL content before compaction.
// Here, we assume that "pwal_0000" aggregates entries from both epoch 1 and epoch 2.
Expand Down Expand Up @@ -109,15 +109,17 @@ TEST_F(compaction_test, basic_blob_gc_test) {


lc0_->begin_session();
lc0_->add_entry(1, "blob_key5", "value5", {1, 1});
lc0_->add_entry(1, "noblob_key5", "noblob_value5", {1, 1});
lc0_->end_session();

datastore_->switch_epoch(4);

datastore_->switch_available_boundary_version({3,0});

// Perform compaction in epoch 5.
FLAGS_v = 100;
run_compact_with_epoch_switch(5);
FLAGS_v = 30;

// Verify the existence of the compacted blob files.
EXPECT_FALSE(boost::filesystem::exists(path1001));
Expand All @@ -128,7 +130,7 @@ TEST_F(compaction_test, basic_blob_gc_test) {

// Restart datastore and verify snapshot content.
std::vector<std::pair<std::string, std::string>> kv_list = restart_datastore_and_read_snapshot();
ASSERT_EQ(kv_list.size(), 4);
ASSERT_EQ(kv_list.size(), 5);
EXPECT_EQ(kv_list[0].first, "blob_key1");
EXPECT_EQ(kv_list[0].second, "blob_value1_epoch2");
EXPECT_EQ(kv_list[1].first, "blob_key2");
Expand All @@ -137,6 +139,8 @@ TEST_F(compaction_test, basic_blob_gc_test) {
EXPECT_EQ(kv_list[2].second, "noblob_value1_epoch2");
EXPECT_EQ(kv_list[3].first, "noblob_key2");
EXPECT_EQ(kv_list[3].second, "noblob_value2");
EXPECT_EQ(kv_list[4].first, "noblob_key5");
EXPECT_EQ(kv_list[4].second, "noblob_value5");

// Verify that no snapshot PWAL file exists.
log_entries = read_log_file("data/snapshot", location);
Expand Down
7 changes: 3 additions & 4 deletions test/limestone/compaction/compaction_options_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@
TEST_F(compaction_options_test, construct_with_file_set_and_gc) {
std::set<std::string> file_names = {"file1", "file2"};
write_version_type boundary_version(42, 5);

blob_file_gc_snapshot gc_snapshot(boundary_version);
compaction_options options(from_dir_, to_dir_, num_workers_, file_names, gc_snapshot);
auto gc_snapshot = std::make_unique<blob_file_gc_snapshot>(boundary_version);
compaction_options options(from_dir_, to_dir_, num_workers_, file_names, std::move(gc_snapshot));

EXPECT_EQ(options.get_from_dir(), from_dir_);
EXPECT_EQ(options.get_to_dir(), to_dir_);
Expand All @@ -70,7 +69,7 @@
compaction_options options(from_dir_, to_dir_, num_workers_);

EXPECT_FALSE(options.is_gc_enabled());
EXPECT_THROW((void)options.get_gc_snapshot(), std::bad_optional_access);
EXPECT_THROW((void)options.get_gc_snapshot(), std::logic_error);
}

// New test for the constructor without to_dir (pre-compaction phase)
Expand Down
2 changes: 2 additions & 0 deletions test/limestone/compaction/compaction_test_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class compaction_test : public ::testing::Test {
lc2_ = &datastore_->create_channel(location);

datastore_->ready();
datastore_->wait_for_blob_file_garbace_collector();
}

void TearDown() {
Expand Down Expand Up @@ -152,6 +153,7 @@ class compaction_test : public ::testing::Test {

// Wait for the compact operation to finish
future.get(); // Will rethrow any exception from compact_with_online
datastore_->wait_for_blob_file_garbace_collector();
} catch (const std::exception& e) {
std::cerr << "Error: " << e.what() << std::endl;
throw; // Re-throw the exception for further handling
Expand Down

0 comments on commit 8c12b1b

Please sign in to comment.