Skip to content

Commit

Permalink
Implement BLOB file GC for online compaction
Browse files Browse the repository at this point in the history
This commit completes the implementation of the BLOB file garbage collection (GC)
feature for online compaction. The GC process scans and removes obsolete BLOB files
that are no longer referenced after compaction.

Key changes:
- Implemented BLOB file scanning and deletion logic.
- Integrated GC into the online compaction process.
- Ensured thread safety and proper synchronization.
- Defensive shutdown logic added to prevent resource leaks.

Verification:
- The code compiles successfully without errors.
- Existing test cases pass without issues.

Note:
- Test cases for the newly implemented GC feature are not yet written.
  This will be addressed in a future commit.
  • Loading branch information
umegane committed Feb 17, 2025
1 parent fa5a0bb commit 3511001
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 10 deletions.
7 changes: 7 additions & 0 deletions include/limestone/api/datastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ class datastore {
std::lock_guard<std::mutex> lock(persistent_blob_ids_mutex_);
return persistent_blob_ids_;
}
write_version_type get_available_boundary_version_for_tests() const noexcept { return available_boundary_version_; }

// These virtual methods are hooks for testing thread synchronization.
// They allow derived classes to inject custom behavior or notifications
Expand Down Expand Up @@ -500,6 +501,12 @@ class datastore {

std::unique_ptr<limestone::internal::blob_file_garbage_collector> blob_file_garbage_collector_;

// Boundary version for safe snapshots
write_version_type available_boundary_version_;

// Mutex to protect boundary version updates
mutable std::mutex boundary_mutex_;

};

} // namespace limestone::api
40 changes: 38 additions & 2 deletions src/limestone/datastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "blob_file_resolver.h"
#include "blob_pool_impl.h"
#include "blob_file_garbage_collector.h"
#include "blob_file_gc_snapshot.h"

namespace limestone::api {
using namespace limestone::internal;
Expand Down Expand Up @@ -206,6 +207,7 @@ void datastore::ready() {
online_compaction_worker_future_ = std::async(std::launch::async, &datastore::online_compaction_worker, this);
if (epoch_id_switched_.load() != 0) {
write_epoch_callback_(epoch_id_informed_.load());
available_boundary_version_ = write_version_type{epoch_id_informed_.load(), 0};
}
cleanup_rotated_epoch_files(location_);
state_ = state::ready;
Expand Down Expand Up @@ -683,9 +685,18 @@ void datastore::compact_with_online() {
boost::filesystem::path compaction_temp_dir = location_ / compaction_catalog::get_compaction_temp_dirname();
ensure_directory_exists(compaction_temp_dir);

// create a compacted file
// create a compacted file and snapshot for blob file garbage collection
write_version_type boundary_version_copy;
{
std::lock_guard<std::mutex> lock(boundary_mutex_);
boundary_version_copy = available_boundary_version_;
}
blob_file_gc_snapshot gc_snapshot(boundary_version_copy);

blob_id_type max_blob_id = create_compact_pwal_and_get_max_blob_id(location_, compaction_temp_dir, recover_max_parallelism_, need_compaction_filenames);



// handle existing compacted file
handle_existing_compacted_file(location_);

Expand Down Expand Up @@ -723,6 +734,19 @@ void datastore::compact_with_online() {
remove_file_safely(location_ / compaction_catalog::get_compacted_backup_filename());

LOG_LP(INFO) << "compaction finished";

// blob files garbage collection
blob_file_garbage_collector garb_collector(*blob_file_resolver_);
garb_collector.scan_blob_files(boundary_version_copy.get_major());
log_entry_container log_entries = gc_snapshot.finalize_snapshot();
for(const auto& entry : log_entries) {
for(const auto& blob_id : entry.get_blob_ids()) {
garb_collector.add_gc_exempt_blob_id(blob_id);
}
}
garb_collector.finalize_scan_and_cleanup();

LOG_LP(INFO) << "blob files garbage collection finished";
TRACE_END;
}

Expand Down Expand Up @@ -772,7 +796,19 @@ blob_file datastore::get_blob_file(blob_id_type reference) {
}

void datastore::switch_available_boundary_version([[maybe_unused]] write_version_type version) {
LOG_FIRST_N(ERROR, 1) << "not implemented";
TRACE_FINE_START << "version=" << version.get_major() << "." << version.get_minor();
{
std::lock_guard<std::mutex> lock(boundary_mutex_);
if (version < available_boundary_version_) {
LOG_LP(ERROR) << "The new boundary version (" << version.get_major() << ", "
<< version.get_minor() << ") is smaller than the current boundary version ("
<< available_boundary_version_.get_major() << ", "
<< available_boundary_version_.get_minor() << ")";
return;
}
}
available_boundary_version_ = version;
TRACE_FINE_END;
}

void datastore::add_persistent_blob_ids(const std::vector<blob_id_type>& blob_ids) {
Expand Down
18 changes: 10 additions & 8 deletions src/limestone/datastore_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ static void insert_twisted_entry(sortdb_wrapper* sortdb, const log_entry& e) {
static std::pair<epoch_id_type, sorting_context> create_sorted_from_wals(
const boost::filesystem::path& from_dir,
int num_worker,
const std::set<std::string>& file_names = std::set<std::string>()) {
const std::set<std::string>& file_names = std::set<std::string>(),
std::optional<std::reference_wrapper<blob_file_gc_snapshot>> gc_snapshot_ref = std::nullopt) {
#if defined SORT_METHOD_PUT_ONLY
sorting_context sctx{std::make_unique<sortdb_wrapper>(from_dir, comp_twisted_key)};
#else
Expand All @@ -129,9 +130,14 @@ static std::pair<epoch_id_type, sorting_context> create_sorted_from_wals(
const auto add_entry_to_point = insert_entry_or_update_to_max;
bool works_with_multi_thread = false;
#endif
auto add_entry = [&sctx, &add_entry_to_point](const log_entry& e){
auto add_entry = [&sctx, &add_entry_to_point, &gc_snapshot_ref](const log_entry& e){
switch (e.type()) {
case log_entry::entry_type::normal_with_blob:
if(gc_snapshot_ref) {
gc_snapshot_ref.value().get().sanitize_and_add_entry(e);
}
add_entry_to_point(sctx.get_sortdb(), e);
break;
case log_entry::entry_type::normal_entry:
case log_entry::entry_type::remove_entry:
add_entry_to_point(sctx.get_sortdb(), e);
Expand Down Expand Up @@ -302,7 +308,7 @@ blob_id_type create_compact_pwal_and_get_max_blob_id(
const std::set<std::string>& file_names,
std::optional<std::reference_wrapper<blob_file_gc_snapshot>> gc_snapshot_ref) {

auto [max_appeared_epoch, sctx] = create_sorted_from_wals(from_dir, num_worker, file_names);
auto [max_appeared_epoch, sctx] = create_sorted_from_wals(from_dir, num_worker, file_names, gc_snapshot_ref);

boost::system::error_code error;
const bool result_check = boost::filesystem::exists(to_dir, error);
Expand All @@ -322,18 +328,14 @@ blob_id_type create_compact_pwal_and_get_max_blob_id(
setvbuf(ostrm, nullptr, _IOFBF, 128L * 1024L); // NOLINT, NB. glibc may ignore size when _IOFBF and buffer=NULL
log_entry::begin_session(ostrm, 0);

auto write_snapshot_entry = [&ostrm, &gc_snapshot_ref](log_entry::entry_type entry_type, std::string_view key_sid, std::string_view value_etc,
auto write_snapshot_entry = [&ostrm](log_entry::entry_type entry_type, std::string_view key_sid, std::string_view value_etc,
std::string_view blob_ids) {
switch (entry_type) {
case log_entry::entry_type::normal_entry:
log_entry::write(ostrm, key_sid, value_etc);
break;
case log_entry::entry_type::normal_with_blob:
log_entry::write_with_blob(ostrm, key_sid, value_etc, blob_ids);
if (gc_snapshot_ref.has_value()) {
log_entry entry = log_entry::make_normal_with_blob_log_entry(key_sid, value_etc, blob_ids);
gc_snapshot_ref->get().sanitize_and_add_entry(entry);
}
break;
case log_entry::entry_type::remove_entry:
break;
Expand Down
64 changes: 64 additions & 0 deletions test/limestone/blob/datastore_blob_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace limestone::testing {

using limestone::api::log_channel;
using limestone::api::blob_id_type;
using limestone::api::write_version_type;

constexpr const char* data_location = "/tmp/datastore_blob_test/data_location";
constexpr const char* metadata_location = "/tmp/datastore_blob_test/metadata_location";
Expand Down Expand Up @@ -385,4 +386,67 @@ TEST_F(datastore_blob_test, next_blob_id) {
}
}

TEST_F(datastore_blob_test, switch_available_boundary_version_basic) {
write_version_type initial_version(0, 0);
write_version_type version1(1, 0);
write_version_type version2(2, 5);
write_version_type invalid_version(0, 5); // Boundary version must be monotonically increasing

// Check initial version
EXPECT_EQ(datastore_->get_available_boundary_version(), initial_version);

// Set valid versions
datastore_->switch_available_boundary_version(version1);
EXPECT_EQ(datastore_->get_available_boundary_version(), version1);

datastore_->switch_available_boundary_version(version2);
EXPECT_EQ(datastore_->get_available_boundary_version(), version2);

// Attempting to set an invalid version (smaller value) results in an error (version remains unchanged)
datastore_->switch_available_boundary_version(invalid_version);
EXPECT_EQ(datastore_->get_available_boundary_version(), version2);
}

TEST_F(datastore_blob_test, available_boundary_version_after_reboot) {
// Check the initial value after ready() execution
write_version_type expected_version(0, 0);
if (datastore_->last_epoch() != 0) {
expected_version = write_version_type(datastore_->last_epoch(), 0);
}
EXPECT_EQ(datastore_->get_available_boundary_version(), expected_version);

// --- Step 1: Write data ---
datastore_->switch_epoch(115);
lc0_->begin_session();
lc0_->add_entry(101, "test_key", "test_value", {115, 52});
lc0_->end_session();
datastore_->switch_epoch(116);

// --- Step 2: Shutdown ---
datastore_->shutdown();
datastore_ = nullptr;

// --- Step 3: Restart ---
gen_datastore();

// --- Step 4: Check after restart ---
// Verify that available_boundary_version_ is properly restored after restart

EXPECT_EQ(datastore_->get_available_boundary_version().get_major(), 115);
EXPECT_EQ(datastore_->get_available_boundary_version().get_minor(), 0);

// Verify data consistency
auto cursor = datastore_->get_snapshot()->get_cursor();
EXPECT_TRUE(cursor->next());

std::string key;
std::string value;
cursor->key(key);
cursor->value(value);
EXPECT_EQ(key, "test_key");
EXPECT_EQ(value, "test_value");

EXPECT_FALSE(cursor->next());
}

} // namespace limestone::testing
1 change: 1 addition & 0 deletions test/test_root.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class datastore_test : public datastore {
void rotate_epoch_file() { rotate_epoch_file_for_tests(); }
void set_next_blob_id(blob_id_type next_blob_id) noexcept { set_next_blob_id_for_tests(next_blob_id); }
std::set<blob_id_type> get_persistent_blob_ids() noexcept { return get_persistent_blob_ids_for_tests(); }
write_version_type get_available_boundary_version() const noexcept { return get_available_boundary_version_for_tests(); }

protected:
inline void execute_callback(const std::function<void()>& callback) noexcept {
Expand Down

0 comments on commit 3511001

Please sign in to comment.