Skip to content

Commit

Permalink
fix(blob_file_gc_snapshot): preserve required log entries for online compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
umegane committed Feb 18, 2025
1 parent d282cf2 commit 29bb7ec
Showing 1 changed file with 34 additions and 14 deletions.
48 changes: 34 additions & 14 deletions src/limestone/datastore_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static int comp_twisted_key(const std::string_view& a, const std::string_view& b
}

[[maybe_unused]]
static void insert_entry_or_update_to_max(sortdb_wrapper* sortdb, const log_entry& e) {
static void insert_entry_or_update_to_max(compaction_options &options, sortdb_wrapper* sortdb, const log_entry& e) {
bool need_write = true;
// skip an entry that is older than the one already inserted
std::string value;
Expand All @@ -69,6 +69,14 @@ static void insert_entry_or_update_to_max(sortdb_wrapper* sortdb, const log_entr
need_write = false;
}
}
if (e.type() == log_entry::entry_type::normal_with_blob && options.is_gc_enabled()) {
write_version_type write_version;
e.write_version(write_version);
if (options.get_gc_snapshot().boundary_version() <= write_version) {
need_write = true;
}
}

if (need_write) {
std::string db_value;
db_value.append(1, static_cast<char>(e.type()));
Expand All @@ -87,7 +95,7 @@ static void insert_entry_or_update_to_max(sortdb_wrapper* sortdb, const log_entr


[[maybe_unused]]
static void insert_twisted_entry(sortdb_wrapper* sortdb, const log_entry& e) {
static void insert_twisted_entry([[maybe_unused]]compaction_options &options, sortdb_wrapper* sortdb, const log_entry& e) {
// key_sid: storage_id[8] key[*], value_etc: epoch[8]LE minor_version[8]LE value[*], type: type[1]
// db_key: epoch[8]BE minor_version[8]BE storage_id[8] key[*], db_value: type[1] value[*]
std::string db_key(write_version_size + e.key_sid().size(), '\0');
Expand Down Expand Up @@ -135,11 +143,11 @@ static std::pair<epoch_id_type, sorting_context> create_sorted_from_wals(compact
if (options.is_gc_enabled()) {
options.get_gc_snapshot().sanitize_and_add_entry(e);
}
add_entry_to_point(sctx.get_sortdb(), e);
add_entry_to_point(options, sctx.get_sortdb(), e);
break;
case log_entry::entry_type::normal_entry:
case log_entry::entry_type::remove_entry:
add_entry_to_point(sctx.get_sortdb(), e);
add_entry_to_point(options,sctx.get_sortdb(), e);
break;
case log_entry::entry_type::clear_storage:
case log_entry::entry_type::remove_storage: { // remove_storage is treated as clear_storage
Expand Down Expand Up @@ -221,6 +229,7 @@ static std::pair<std::string, std::string_view> split_db_value_and_blob_ids(cons


static void sortdb_foreach(
[[maybe_unused]] compaction_options &options,
sorting_context& sctx,
const std::function<void(
const log_entry::entry_type entry_type,
Expand All @@ -230,12 +239,19 @@ static void sortdb_foreach(
)>& write_snapshot_entry) {
static_assert(sizeof(log_entry::entry_type) == 1);
#if defined SORT_METHOD_PUT_ONLY
sctx.get_sortdb()->each([&sctx, &write_snapshot_entry, last_key = std::string{}](const std::string_view db_key, const std::string_view db_value) mutable {
sctx.get_sortdb()->each([&sctx, &write_snapshot_entry, &options, last_key = std::string{}](const std::string_view db_key, const std::string_view db_value) mutable {
auto entry_type = static_cast<log_entry::entry_type>(db_value[0]);
write_version_type boundary_version = {UINT64_MAX, UINT64_MAX};
if (entry_type == log_entry::entry_type::normal_with_blob && options.is_gc_enabled()) {
boundary_version = options.get_gc_snapshot().boundary_version();
}
write_version_type write_version = extract_write_version(db_key);

// using the first entry in GROUP BY (original-)key
// NB: the max version comes first (by the custom comparator)
std::string_view key(db_key.data() + write_version_size, db_key.size() - write_version_size);
if (key == last_key) { // same (original-)key with prev
return; // skip
if (key == last_key && write_version < boundary_version) { // same (original-)key with prev
return; // first skip
}
last_key.assign(key);
storage_id_type st_bytes{};
Expand All @@ -245,12 +261,11 @@ static void sortdb_foreach(
if (auto ret = sctx.clear_storage_find(st); ret) {
// check range delete
write_version_type range_ver = ret.value();
if (extract_write_version(db_key) < range_ver) {
if (write_version < range_ver && write_version < boundary_version) {
return; // skip
}
}

auto entry_type = static_cast<log_entry::entry_type>(db_value[0]);
switch (entry_type) {
case log_entry::entry_type::normal_entry:
case log_entry::entry_type::remove_entry:
Expand All @@ -268,19 +283,24 @@ static void sortdb_foreach(
}
});
#else
sctx.get_sortdb()->each([&sctx, &write_snapshot_entry](const std::string_view db_key, const std::string_view db_value) {
sctx.get_sortdb()->each([&sctx, &write_snapshot_entry, &options](const std::string_view db_key, const std::string_view db_value) {
auto entry_type = static_cast<log_entry::entry_type>(db_value[0]);
write_version_type boundary_version = {UINT64_MAX, UINT64_MAX};
if (entry_type == log_entry::entry_type::normal_with_blob && options.is_gc_enabled()) {
boundary_version = options.get_gc_snapshot().boundary_version();
}

storage_id_type st_bytes{};
memcpy(static_cast<void*>(&st_bytes), db_key.data(), sizeof(storage_id_type));
storage_id_type st = le64toh(st_bytes);
if (auto ret = sctx.clear_storage_find(st); ret) {
// check range delete
write_version_type range_ver = ret.value();
write_version_type point_ver{db_value.substr(1)};
if (point_ver < range_ver) {
if (point_ver < range_ver && point_ver < boundary_version) {
return; // skip
}
}
auto entry_type = static_cast<log_entry::entry_type>(db_value[0]);
switch (entry_type) {
case log_entry::entry_type::normal_entry:
case log_entry::entry_type::remove_entry:
Expand Down Expand Up @@ -340,7 +360,7 @@ blob_id_type create_compact_pwal_and_get_max_blob_id(compaction_options &options
};


sortdb_foreach(sctx, write_snapshot_entry);
sortdb_foreach(options, sctx, write_snapshot_entry);
//log_entry::end_session(ostrm, epoch);
if (fclose(ostrm) != 0) { // NOLINT(*-owning-memory)
LOG_AND_THROW_IO_EXCEPTION("cannot close snapshot file (" + snapshot_file.string() + ")", errno);
Expand Down Expand Up @@ -480,7 +500,7 @@ blob_id_type datastore::create_snapshot_and_get_max_blob_id() {
}
};

sortdb_foreach(sctx, write_snapshot_entry);
sortdb_foreach(options, sctx, write_snapshot_entry);
if (fclose(ostrm) != 0) { // NOLINT(*-owning-memory)
LOG_AND_THROW_IO_EXCEPTION("cannot close snapshot file (" + snapshot_file.string() + ")", errno);
}
Expand Down

0 comments on commit 29bb7ec

Please sign in to comment.