diff --git a/include/limestone/api/datastore.h b/include/limestone/api/datastore.h
index a584fea..6205b33 100644
--- a/include/limestone/api/datastore.h
+++ b/include/limestone/api/datastore.h
@@ -328,6 +328,7 @@ class datastore {
         std::lock_guard lock(persistent_blob_ids_mutex_);
         return persistent_blob_ids_;
     }
+    write_version_type get_available_boundary_version_for_tests() const noexcept { return available_boundary_version_; }
 
     // These virtual methods are hooks for testing thread synchronization.
     // They allow derived classes to inject custom behavior or notifications
@@ -500,6 +501,12 @@ class datastore {
 
     std::unique_ptr blob_file_garbage_collector_;
 
+    // Boundary version for safe snapshots
+    write_version_type available_boundary_version_;
+
+    // Mutex to protect boundary version updates
+    mutable std::mutex boundary_mutex_;
+
 };
 
 } // namespace limestone::api
diff --git a/src/limestone/datastore.cpp b/src/limestone/datastore.cpp
index 7c3b947..d2f9ef1 100644
--- a/src/limestone/datastore.cpp
+++ b/src/limestone/datastore.cpp
@@ -35,6 +35,7 @@
 #include "blob_file_resolver.h"
 #include "blob_pool_impl.h"
 #include "blob_file_garbage_collector.h"
+#include "blob_file_gc_snapshot.h"
 
 namespace limestone::api {
 using namespace limestone::internal;
@@ -206,6 +207,7 @@ void datastore::ready() {
     online_compaction_worker_future_ = std::async(std::launch::async, &datastore::online_compaction_worker, this);
     if (epoch_id_switched_.load() != 0) {
         write_epoch_callback_(epoch_id_informed_.load());
+        available_boundary_version_ = write_version_type{epoch_id_informed_.load(), 0};
     }
     cleanup_rotated_epoch_files(location_);
     state_ = state::ready;
@@ -683,9 +685,18 @@ void datastore::compact_with_online() {
     boost::filesystem::path compaction_temp_dir = location_ / compaction_catalog::get_compaction_temp_dirname();
     ensure_directory_exists(compaction_temp_dir);
 
-    // create a compacted file
-    blob_id_type max_blob_id = create_compact_pwal_and_get_max_blob_id(location_, compaction_temp_dir, recover_max_parallelism_, need_compaction_filenames);
+    // create a compacted file and snapshot for blob file garbage collection
+    write_version_type boundary_version_copy;
+    {
+        std::lock_guard lock(boundary_mutex_);
+        boundary_version_copy = available_boundary_version_;
+    }
+    blob_file_gc_snapshot gc_snapshot(boundary_version_copy);
+
+    blob_id_type max_blob_id = create_compact_pwal_and_get_max_blob_id(location_, compaction_temp_dir, recover_max_parallelism_, need_compaction_filenames, gc_snapshot);
 
+
+
     // handle existing compacted file
     handle_existing_compacted_file(location_);
 
@@ -723,6 +734,19 @@ void datastore::compact_with_online() {
     remove_file_safely(location_ / compaction_catalog::get_compacted_backup_filename());
 
     LOG_LP(INFO) << "compaction finished";
+
+    // blob files garbage collection
+    blob_file_garbage_collector garb_collector(*blob_file_resolver_);
+    garb_collector.scan_blob_files(boundary_version_copy.get_major());
+    log_entry_container log_entries = gc_snapshot.finalize_snapshot();
+    for(const auto& entry : log_entries) {
+        for(const auto& blob_id : entry.get_blob_ids()) {
+            garb_collector.add_gc_exempt_blob_id(blob_id);
+        }
+    }
+    garb_collector.finalize_scan_and_cleanup();
+
+    LOG_LP(INFO) << "blob files garbage collection finished";
     TRACE_END;
 }
 
@@ -772,7 +796,21 @@ blob_file datastore::get_blob_file(blob_id_type reference) {
 }
 
 void datastore::switch_available_boundary_version([[maybe_unused]] write_version_type version) {
-    LOG_FIRST_N(ERROR, 1) << "not implemented";
+    TRACE_FINE_START << "version=" << version.get_major() << "." << version.get_minor();
+    {
+        // Validate and publish under a single lock: compact_with_online() copies
+        // available_boundary_version_ while holding boundary_mutex_.
+        std::lock_guard lock(boundary_mutex_);
+        if (version < available_boundary_version_) {
+            LOG_LP(ERROR) << "The new boundary version (" << version.get_major() << ", "
+                          << version.get_minor() << ") is smaller than the current boundary version ("
+                          << available_boundary_version_.get_major() << ", "
+                          << available_boundary_version_.get_minor() << ")";
+            return;
+        }
+        available_boundary_version_ = version;
+    }
+    TRACE_FINE_END;
 }
 
 void datastore::add_persistent_blob_ids(const std::vector& blob_ids) {
diff --git a/src/limestone/datastore_snapshot.cpp b/src/limestone/datastore_snapshot.cpp
index 0075ff1..f2b255f 100644
--- a/src/limestone/datastore_snapshot.cpp
+++ b/src/limestone/datastore_snapshot.cpp
@@ -112,7 +112,8 @@ static void insert_twisted_entry(sortdb_wrapper* sortdb, const log_entry& e) {
 static std::pair create_sorted_from_wals(
     const boost::filesystem::path& from_dir,
     int num_worker,
-    const std::set& file_names = std::set()) {
+    const std::set& file_names = std::set(),
+    std::optional> gc_snapshot_ref = std::nullopt) {
 #if defined SORT_METHOD_PUT_ONLY
     sorting_context sctx{std::make_unique(from_dir, comp_twisted_key)};
 #else
@@ -129,9 +130,14 @@ static std::pair create_sorted_from_wals(
     const auto add_entry_to_point = insert_entry_or_update_to_max;
     bool works_with_multi_thread = false;
 #endif
-    auto add_entry = [&sctx, &add_entry_to_point](const log_entry& e){
+    auto add_entry = [&sctx, &add_entry_to_point, &gc_snapshot_ref](const log_entry& e){
         switch (e.type()) {
         case log_entry::entry_type::normal_with_blob:
+            if(gc_snapshot_ref) {
+                gc_snapshot_ref.value().get().sanitize_and_add_entry(e);
+            }
+            add_entry_to_point(sctx.get_sortdb(), e);
+            break;
         case log_entry::entry_type::normal_entry:
         case log_entry::entry_type::remove_entry:
             add_entry_to_point(sctx.get_sortdb(), e);
@@ -302,7 +308,7 @@ blob_id_type create_compact_pwal_and_get_max_blob_id(
     const std::set& file_names,
     std::optional> gc_snapshot_ref)
 {
-    auto [max_appeared_epoch, sctx] = create_sorted_from_wals(from_dir, num_worker, file_names);
+    auto [max_appeared_epoch, sctx] = create_sorted_from_wals(from_dir, num_worker, file_names, gc_snapshot_ref);
 
     boost::system::error_code error;
     const bool result_check = boost::filesystem::exists(to_dir, error);
@@ -322,7 +328,7 @@ blob_id_type create_compact_pwal_and_get_max_blob_id(
     setvbuf(ostrm, nullptr, _IOFBF, 128L * 1024L); // NOLINT, NB. glibc may ignore size when _IOFBF and buffer=NULL
     log_entry::begin_session(ostrm, 0);
 
-    auto write_snapshot_entry = [&ostrm, &gc_snapshot_ref](log_entry::entry_type entry_type, std::string_view key_sid, std::string_view value_etc,
+    auto write_snapshot_entry = [&ostrm](log_entry::entry_type entry_type, std::string_view key_sid, std::string_view value_etc,
                                          std::string_view blob_ids) {
         switch (entry_type) {
         case log_entry::entry_type::normal_entry:
@@ -330,10 +336,6 @@ blob_id_type create_compact_pwal_and_get_max_blob_id(
             break;
         case log_entry::entry_type::normal_with_blob:
             log_entry::write_with_blob(ostrm, key_sid, value_etc, blob_ids);
-            if (gc_snapshot_ref.has_value()) {
-                log_entry entry = log_entry::make_normal_with_blob_log_entry(key_sid, value_etc, blob_ids);
-                gc_snapshot_ref->get().sanitize_and_add_entry(entry);
-            }
             break;
         case log_entry::entry_type::remove_entry:
             break;
diff --git a/test/limestone/blob/datastore_blob_test.cpp b/test/limestone/blob/datastore_blob_test.cpp
index 1f88caf..34eb45c 100644
--- a/test/limestone/blob/datastore_blob_test.cpp
+++ b/test/limestone/blob/datastore_blob_test.cpp
@@ -12,6 +12,7 @@ namespace limestone::testing {
 
 using limestone::api::log_channel;
 using limestone::api::blob_id_type;
+using limestone::api::write_version_type;
 
 constexpr const char* data_location = "/tmp/datastore_blob_test/data_location";
 constexpr const char* metadata_location = "/tmp/datastore_blob_test/metadata_location";
@@ -385,4 +386,67 @@ TEST_F(datastore_blob_test, next_blob_id) {
     }
 }
 
+TEST_F(datastore_blob_test, switch_available_boundary_version_basic) {
+    write_version_type initial_version(0, 0);
+    write_version_type version1(1, 0);
+    write_version_type version2(2, 5);
+    write_version_type invalid_version(0, 5); // Boundary version must be monotonically increasing
+
+    // Check initial version
+    EXPECT_EQ(datastore_->get_available_boundary_version(), initial_version);
+
+    // Set valid versions
+    datastore_->switch_available_boundary_version(version1);
+    EXPECT_EQ(datastore_->get_available_boundary_version(), version1);
+
+    datastore_->switch_available_boundary_version(version2);
+    EXPECT_EQ(datastore_->get_available_boundary_version(), version2);
+
+    // Attempting to set an invalid version (smaller value) results in an error (version remains unchanged)
+    datastore_->switch_available_boundary_version(invalid_version);
+    EXPECT_EQ(datastore_->get_available_boundary_version(), version2);
+}
+
+TEST_F(datastore_blob_test, available_boundary_version_after_reboot) {
+    // Check the initial value after ready() execution
+    write_version_type expected_version(0, 0);
+    if (datastore_->last_epoch() != 0) {
+        expected_version = write_version_type(datastore_->last_epoch(), 0);
+    }
+    EXPECT_EQ(datastore_->get_available_boundary_version(), expected_version);
+
+    // --- Step 1: Write data ---
+    datastore_->switch_epoch(115);
+    lc0_->begin_session();
+    lc0_->add_entry(101, "test_key", "test_value", {115, 52});
+    lc0_->end_session();
+    datastore_->switch_epoch(116);
+
+    // --- Step 2: Shutdown ---
+    datastore_->shutdown();
+    datastore_ = nullptr;
+
+    // --- Step 3: Restart ---
+    gen_datastore();
+
+    // --- Step 4: Check after restart ---
+    // Verify that available_boundary_version_ is properly restored after restart
+
+    EXPECT_EQ(datastore_->get_available_boundary_version().get_major(), 115);
+    EXPECT_EQ(datastore_->get_available_boundary_version().get_minor(), 0);
+
+    // Verify data consistency
+    auto cursor = datastore_->get_snapshot()->get_cursor();
+    EXPECT_TRUE(cursor->next());
+
+    std::string key;
+    std::string value;
+    cursor->key(key);
+    cursor->value(value);
+    EXPECT_EQ(key, "test_key");
+    EXPECT_EQ(value, "test_value");
+
+    EXPECT_FALSE(cursor->next());
+}
+
 } // namespace limestone::testing
\ No newline at end of file
diff --git a/test/test_root.h b/test/test_root.h
index 2e2c6f9..6b20938 100644
--- a/test/test_root.h
+++ b/test/test_root.h
@@ -38,6 +38,7 @@ class datastore_test : public datastore {
     void rotate_epoch_file() { rotate_epoch_file_for_tests(); }
     void set_next_blob_id(blob_id_type next_blob_id) noexcept { set_next_blob_id_for_tests(next_blob_id); }
     std::set get_persistent_blob_ids() noexcept { return get_persistent_blob_ids_for_tests(); }
+    write_version_type get_available_boundary_version() const noexcept { return get_available_boundary_version_for_tests(); }
 
 protected:
     inline void execute_callback(const std::function& callback) noexcept {