From 8c12b1b88de32d93956b592597e0af30208f7634 Mon Sep 17 00:00:00 2001 From: Shinichi Umegane Date: Thu, 20 Feb 2025 19:42:52 +0900 Subject: [PATCH] fix: Resolve timing-dependent bug in blob file GC --- include/limestone/api/datastore.h | 2 +- src/limestone/blob_file_garbage_collector.cpp | 15 ++++++++ src/limestone/blob_file_garbage_collector.h | 10 +++++ src/limestone/compaction_options.h | 38 ++++++++++--------- src/limestone/datastore.cpp | 19 ++++------ src/limestone/datastore_snapshot.cpp | 1 + .../blob/compaction_blob_gc_test.cpp | 10 +++-- .../compaction/compaction_options_test.cpp | 7 ++-- .../compaction/compaction_test_fixture.h | 2 + 9 files changed, 67 insertions(+), 37 deletions(-) diff --git a/include/limestone/api/datastore.h b/include/limestone/api/datastore.h index 6ec7dba..d1d336c 100644 --- a/include/limestone/api/datastore.h +++ b/include/limestone/api/datastore.h @@ -323,7 +323,7 @@ class datastore { auto next_blob_id_for_tests() const noexcept { return next_blob_id_.load(); } auto& files_for_tests() const noexcept { return files_; } void rotate_epoch_file_for_tests() { rotate_epoch_file(); } - void set_next_blob_id_for_tests(blob_id_type next_blob_id) noexcept { next_blob_id_ = next_blob_id; } + void set_next_blob_id_for_tests(blob_id_type next_blob_id) noexcept { next_blob_id_.store(next_blob_id); } std::set get_persistent_blob_ids_for_tests() noexcept { std::lock_guard lock(persistent_blob_ids_mutex_); return persistent_blob_ids_; diff --git a/src/limestone/blob_file_garbage_collector.cpp b/src/limestone/blob_file_garbage_collector.cpp index 680fd9b..5459b62 100644 --- a/src/limestone/blob_file_garbage_collector.cpp +++ b/src/limestone/blob_file_garbage_collector.cpp @@ -116,6 +116,7 @@ using limestone::api::log_entry; } void blob_file_garbage_collector::add_gc_exempt_blob_id(blob_id_type id) { + VLOG_LP(log_trace_fine) << "Adding blob id to gc_exempt_blob_: " << id; gc_exempt_blob_->add_blob_id(id); } @@ -135,7 +136,10 @@ using limestone::api::log_entry; this->wait_for_blob_file_scan(); // Calculate the difference and perform deletion operations + VLOG_LP(100) << "Scanned blobs before diff: " << scanned_blobs_->debug_string(); + VLOG_LP(100) << "GC exempt blobs: " << gc_exempt_blob_->debug_string(); scanned_blobs_->diff(*gc_exempt_blob_); + VLOG_LP(100) << "Scanned blobs after: " << scanned_blobs_->debug_string(); for (const auto &id : *scanned_blobs_) { if (shutdown_requested_.load(std::memory_order_acquire)) { @@ -282,7 +286,18 @@ void blob_file_garbage_collector::scan_snapshot(const boost::filesystem::path &s std::lock_guard lock(mutex_); scanned_blobs_ = std::make_unique(); gc_exempt_blob_ = std::make_unique(); + blob_file_scan_started_ = false; + blob_file_scan_waited_ = false; + snapshot_scan_started_ = false; + snapshot_scan_waited_ = false; + cleanup_started_ = false; + cleanup_waited_ = false; } + bool blob_file_garbage_collector::is_active() const { + std::lock_guard lock(mutex_); + return !(blob_file_scan_complete_ && snapshot_scan_complete_ && cleanup_complete_); +} + } // namespace limestone::internal \ No newline at end of file diff --git a/src/limestone/blob_file_garbage_collector.h b/src/limestone/blob_file_garbage_collector.h index 526da7e..c1b8e83 100644 --- a/src/limestone/blob_file_garbage_collector.h +++ b/src/limestone/blob_file_garbage_collector.h @@ -140,6 +140,16 @@ class blob_file_garbage_collector { */ void shutdown(); + /** + * Determines if any blob file operations are currently in progress. + * + * Returns true if blob file scanning, snapshot scanning, or cleanup has been started. + * This indicates that the blob file is actively undergoing some form of scanning or cleanup. + * + * @return True if any corresponding operation is active; false otherwise. + */ + bool is_active() const; + /** * @brief Blocks the current thread until all worker threads have completed execution. * diff --git a/src/limestone/compaction_options.h b/src/limestone/compaction_options.h index 0a1468e..05797c9 100644 --- a/src/limestone/compaction_options.h +++ b/src/limestone/compaction_options.h @@ -40,7 +40,7 @@ num_worker_(workers), file_names_(std::move(file_names)), has_file_set_(true), - gc_snapshot_(std::nullopt) + gc_snapshot_(nullptr) {} // Constructor: to_dir provided, GC disabled, no file set. @@ -53,7 +53,7 @@ to_dir_(std::move(to)), num_worker_(workers), has_file_set_(false), - gc_snapshot_(std::nullopt) + gc_snapshot_(nullptr) {} // Constructor: to_dir provided, file set available, GC disabled. @@ -68,7 +68,7 @@ num_worker_(workers), file_names_(std::move(file_names)), has_file_set_(true), - gc_snapshot_(std::nullopt) + gc_snapshot_(nullptr) {} // Constructor: to_dir provided, file set available, GC enabled. @@ -78,14 +78,14 @@ boost::filesystem::path to, int workers, std::set file_names, - blob_file_gc_snapshot& gc_snapshot + std::unique_ptr gc_snapshot ) : from_dir_(std::move(from)), to_dir_(std::move(to)), num_worker_(workers), file_names_(std::move(file_names)), has_file_set_(true), - gc_snapshot_(std::ref(gc_snapshot)) + gc_snapshot_(std::move(gc_snapshot)) {} // Getter for from_dir. @@ -102,27 +102,31 @@ // Returns true if a file set is configured. [[nodiscard]] bool has_file_set() const { return has_file_set_; } - + // Check if GC is enabled. - [[nodiscard]] bool is_gc_enabled() const { return gc_snapshot_.has_value(); } - + [[nodiscard]] bool is_gc_enabled() const { return static_cast(gc_snapshot_); } + // Getter for gc_snapshot. // It is caller's responsibility to ensure GC is enabled before calling. - [[nodiscard]] blob_file_gc_snapshot& get_gc_snapshot() const { return gc_snapshot_.value().get(); } - - private: + [[nodiscard]] blob_file_gc_snapshot& get_gc_snapshot() const { + if (!gc_snapshot_) { + throw std::logic_error("GC is not enabled"); + } + return *gc_snapshot_; + } + + private: // Basic compaction settings. boost::filesystem::path from_dir_; boost::filesystem::path to_dir_; int num_worker_; - + // File set for compaction. std::set file_names_; bool has_file_set_; - + // Garbage collection settings. - std::optional> gc_snapshot_; + std::unique_ptr gc_snapshot_; }; - - } // namespace limestone::internal - \ No newline at end of file + + } // namespace limestone::internal diff --git a/src/limestone/datastore.cpp b/src/limestone/datastore.cpp index ba942ee..a6b2d8e 100644 --- a/src/limestone/datastore.cpp +++ b/src/limestone/datastore.cpp @@ -670,13 +670,9 @@ void datastore::compact_with_online() { // check blob file garbage collection runnable bool blob_file_gc_runnable = false; - if (boundary_version_copy.get_major() > compaction_catalog_->get_max_epoch_id()) { + if (boundary_version_copy.get_major() > compaction_catalog_->get_max_epoch_id() && !blob_file_garbage_collector_->is_active()) { blob_file_gc_runnable = true; } - std::cerr << "boundary_version_copy = " << boundary_version_copy.get_major() << "." << boundary_version_copy.get_minor() << std::endl; - std::cerr << "max_epoch_id = " << compaction_catalog_->get_max_epoch_id() << std::endl; - std::cerr << "blob_file_gc_runnable = " << blob_file_gc_runnable << std::endl; - // rotate first rotation_result result = rotate_log_files(); @@ -710,13 +706,12 @@ void datastore::compact_with_online() { // Set the appropriate options based on whether blob file GC is executable. compaction_options options = [&]() -> compaction_options { if (blob_file_gc_runnable) { - blob_file_gc_snapshot gc_snapshot(boundary_version_copy); - return compaction_options{location_, compaction_temp_dir, recover_max_parallelism_, need_compaction_filenames, gc_snapshot}; + auto gc_snapshot = std::make_unique(boundary_version_copy); + return compaction_options{location_, compaction_temp_dir, recover_max_parallelism_, need_compaction_filenames, std::move(gc_snapshot)}; } return compaction_options{location_, compaction_temp_dir, recover_max_parallelism_, need_compaction_filenames}; }(); - std::cerr << "is_gc_enabled = " << options.is_gc_enabled() << std::endl; // create a compacted file blob_id_type max_blob_id = create_compact_pwal_and_get_max_blob_id(options); @@ -762,15 +757,15 @@ void datastore::compact_with_online() { // blob files garbage collection if (options.is_gc_enabled()) { LOG_LP(INFO) << "start blob files garbage collection"; - blob_file_garbage_collector garb_collector(*blob_file_resolver_); - garb_collector.scan_blob_files(next_blob_id_copy); + blob_file_garbage_collector_->wait_for_all_threads(); + blob_file_garbage_collector_->scan_blob_files(next_blob_id_copy); log_entry_container log_entries = options.get_gc_snapshot().finalize_snapshot(); for (const auto& entry : log_entries) { for (const auto& blob_id : entry.get_blob_ids()) { - garb_collector.add_gc_exempt_blob_id(blob_id); + blob_file_garbage_collector_->add_gc_exempt_blob_id(blob_id); } } - garb_collector.finalize_scan_and_cleanup(); + blob_file_garbage_collector_->finalize_scan_and_cleanup(); LOG_LP(INFO) << "blob files garbage collection finished"; } diff --git a/src/limestone/datastore_snapshot.cpp b/src/limestone/datastore_snapshot.cpp index 2134c9e..23cbe7b 100644 --- a/src/limestone/datastore_snapshot.cpp +++ b/src/limestone/datastore_snapshot.cpp @@ -148,6 +148,7 @@ static std::pair create_sorted_from_wals(compact switch (e.type()) { case log_entry::entry_type::normal_with_blob: if (options.is_gc_enabled()) { + std::cerr << "write version = " << options.get_gc_snapshot().boundary_version().get_major() << "." << options.get_gc_snapshot().boundary_version().get_minor() << std::endl; options.get_gc_snapshot().sanitize_and_add_entry(e); } add_entry_to_point(sctx.get_sortdb(), e); diff --git a/test/limestone/blob/compaction_blob_gc_test.cpp b/test/limestone/blob/compaction_blob_gc_test.cpp index f775f8d..4deed87 100644 --- a/test/limestone/blob/compaction_blob_gc_test.cpp +++ b/test/limestone/blob/compaction_blob_gc_test.cpp @@ -55,7 +55,7 @@ TEST_F(compaction_test, basic_blob_gc_test) { auto path1003 = create_dummy_blob_files(1003); auto path2001 = create_dummy_blob_files(2001); auto path2002 = create_dummy_blob_files(2002); - + datastore_->set_next_blob_id(2003); // Verify PWAL content before compaction. // Here, we assume that "pwal_0000" aggregates entries from both epoch 1 and epoch 2. @@ -109,7 +109,7 @@ TEST_F(compaction_test, basic_blob_gc_test) { lc0_->begin_session(); - lc0_->add_entry(1, "blob_key5", "value5", {1, 1}); + lc0_->add_entry(1, "noblob_key5", "noblob_value5", {1, 1}); lc0_->end_session(); datastore_->switch_epoch(4); @@ -117,7 +117,9 @@ TEST_F(compaction_test, basic_blob_gc_test) { datastore_->switch_available_boundary_version({3,0}); // Perform compaction in epoch 5. + FLAGS_v = 100; run_compact_with_epoch_switch(5); + FLAGS_v = 30; // Verify the existence of the compacted blob files. EXPECT_FALSE(boost::filesystem::exists(path1001)); @@ -128,7 +130,7 @@ TEST_F(compaction_test, basic_blob_gc_test) { // Restart datastore and verify snapshot content. std::vector> kv_list = restart_datastore_and_read_snapshot(); - ASSERT_EQ(kv_list.size(), 4); + ASSERT_EQ(kv_list.size(), 5); EXPECT_EQ(kv_list[0].first, "blob_key1"); EXPECT_EQ(kv_list[0].second, "blob_value1_epoch2"); EXPECT_EQ(kv_list[1].first, "blob_key2"); @@ -137,6 +139,8 @@ TEST_F(compaction_test, basic_blob_gc_test) { EXPECT_EQ(kv_list[2].second, "noblob_value1_epoch2"); EXPECT_EQ(kv_list[3].first, "noblob_key2"); EXPECT_EQ(kv_list[3].second, "noblob_value2"); + EXPECT_EQ(kv_list[4].first, "noblob_key5"); + EXPECT_EQ(kv_list[4].second, "noblob_value5"); // Verify that no snapshot PWAL file exists. log_entries = read_log_file("data/snapshot", location); diff --git a/test/limestone/compaction/compaction_options_test.cpp b/test/limestone/compaction/compaction_options_test.cpp index c1a1095..bcf532b 100644 --- a/test/limestone/compaction/compaction_options_test.cpp +++ b/test/limestone/compaction/compaction_options_test.cpp @@ -54,9 +54,8 @@ TEST_F(compaction_options_test, construct_with_file_set_and_gc) { std::set file_names = {"file1", "file2"}; write_version_type boundary_version(42, 5); - - blob_file_gc_snapshot gc_snapshot(boundary_version); - compaction_options options(from_dir_, to_dir_, num_workers_, file_names, gc_snapshot); + auto gc_snapshot = std::make_unique(boundary_version); + compaction_options options(from_dir_, to_dir_, num_workers_, file_names, std::move(gc_snapshot)); EXPECT_EQ(options.get_from_dir(), from_dir_); EXPECT_EQ(options.get_to_dir(), to_dir_); @@ -70,7 +69,7 @@ compaction_options options(from_dir_, to_dir_, num_workers_); EXPECT_FALSE(options.is_gc_enabled()); - EXPECT_THROW((void)options.get_gc_snapshot(), std::bad_optional_access); + EXPECT_THROW((void)options.get_gc_snapshot(), std::logic_error); } // New test for the constructor without to_dir (pre-compaction phase) diff --git a/test/limestone/compaction/compaction_test_fixture.h b/test/limestone/compaction/compaction_test_fixture.h index de30b6e..eaf20c7 100644 --- a/test/limestone/compaction/compaction_test_fixture.h +++ b/test/limestone/compaction/compaction_test_fixture.h @@ -58,6 +58,7 @@ class compaction_test : public ::testing::Test { lc2_ = &datastore_->create_channel(location); datastore_->ready(); + datastore_->wait_for_blob_file_garbace_collector(); } void TearDown() { @@ -152,6 +153,7 @@ class compaction_test : public ::testing::Test { // Wait for the compact operation to finish future.get(); // Will rethrow any exception from compact_with_online + datastore_->wait_for_blob_file_garbace_collector(); } catch (const std::exception& e) { std::cerr << "Error: " << e.what() << std::endl; throw; // Re-throw the exception for further handling