Skip to content

Commit

Permalink
fix(blob_file_gc_snapshot): preserve required log entries for online compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
umegane committed Feb 18, 2025
1 parent d282cf2 commit 488c19f
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 33 deletions.
17 changes: 10 additions & 7 deletions src/limestone/cursor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,16 @@ void cursor_impl::validate_and_read_stream(std::optional<boost::filesystem::ifst
// If the entry is not yet read, read it
if (!log_entry) {
log_entry.emplace(); // Construct a new log_entry
if (!log_entry->read(*stream)) {
// If reading fails, close the stream and reset the log_entry
stream->close();
stream = std::nullopt;
log_entry = std::nullopt;
return;
}
do {
if (!log_entry->read(*stream)) {
// If reading fails, close the stream and reset the log_entry
stream->close();
stream = std::nullopt;
log_entry = std::nullopt;
return;
}
} while (log_entry->type() != log_entry::entry_type::normal_entry &&
log_entry->type() != log_entry::entry_type::normal_with_blob);
// Check if the key_sid is in ascending order
// TODO: Key order violation is detected here and the process is aborted.
// However, this check should be moved to an earlier point, and if the key order is invalid,
Expand Down
72 changes: 56 additions & 16 deletions src/limestone/datastore_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static int comp_twisted_key(const std::string_view& a, const std::string_view& b
}

[[maybe_unused]]
static void insert_entry_or_update_to_max(sortdb_wrapper* sortdb, const log_entry& e) {
static void insert_entry_or_update_to_max(compaction_options &options, sortdb_wrapper* sortdb, const log_entry& e) {
bool need_write = true;
// skip older entry than already inserted
std::string value;
Expand All @@ -69,6 +69,14 @@ static void insert_entry_or_update_to_max(sortdb_wrapper* sortdb, const log_entr
need_write = false;
}
}
if (e.type() == log_entry::entry_type::normal_with_blob && options.is_gc_enabled()) {
write_version_type write_version;
e.write_version(write_version);
if (options.get_gc_snapshot().boundary_version() <= write_version) {
need_write = true;
}
}

if (need_write) {
std::string db_value;
db_value.append(1, static_cast<char>(e.type()));
Expand All @@ -87,7 +95,7 @@ static void insert_entry_or_update_to_max(sortdb_wrapper* sortdb, const log_entr


[[maybe_unused]]
static void insert_twisted_entry(sortdb_wrapper* sortdb, const log_entry& e) {
static void insert_twisted_entry([[maybe_unused]]compaction_options &options, sortdb_wrapper* sortdb, const log_entry& e) {
// key_sid: storage_id[8] key[*], value_etc: epoch[8]LE minor_version[8]LE value[*], type: type[1]
// db_key: epoch[8]BE minor_version[8]BE storage_id[8] key[*], db_value: type[1] value[*]
std::string db_key(write_version_size + e.key_sid().size(), '\0');
Expand Down Expand Up @@ -135,18 +143,21 @@ static std::pair<epoch_id_type, sorting_context> create_sorted_from_wals(compact
if (options.is_gc_enabled()) {
options.get_gc_snapshot().sanitize_and_add_entry(e);
}
add_entry_to_point(sctx.get_sortdb(), e);
add_entry_to_point(options, sctx.get_sortdb(), e);
break;
case log_entry::entry_type::normal_entry:
case log_entry::entry_type::remove_entry:
add_entry_to_point(sctx.get_sortdb(), e);
add_entry_to_point(options,sctx.get_sortdb(), e);
break;
case log_entry::entry_type::clear_storage:
case log_entry::entry_type::remove_storage: { // remove_storage is treated as clear_storage
// clear_storage[st] = max(clear_storage[st], wv)
write_version_type wv;
e.write_version(wv);
sctx.clear_storage_update(e.storage(), wv);
if (options.is_gc_enabled() && options.get_gc_snapshot().boundary_version() <= wv) {
add_entry_to_point(options, sctx.get_sortdb(), e);
}
return;
}
case log_entry::entry_type::add_storage:
Expand Down Expand Up @@ -221,6 +232,7 @@ static std::pair<std::string, std::string_view> split_db_value_and_blob_ids(cons


static void sortdb_foreach(
[[maybe_unused]] compaction_options &options,
sorting_context& sctx,
const std::function<void(
const log_entry::entry_type entry_type,
Expand All @@ -230,12 +242,19 @@ static void sortdb_foreach(
)>& write_snapshot_entry) {
static_assert(sizeof(log_entry::entry_type) == 1);
#if defined SORT_METHOD_PUT_ONLY
sctx.get_sortdb()->each([&sctx, &write_snapshot_entry, last_key = std::string{}](const std::string_view db_key, const std::string_view db_value) mutable {
sctx.get_sortdb()->each([&sctx, &write_snapshot_entry, &options, last_key = std::string{}](const std::string_view db_key, const std::string_view db_value) mutable {
auto entry_type = static_cast<log_entry::entry_type>(db_value[0]);
write_version_type boundary_version = {UINT64_MAX, UINT64_MAX};
if (entry_type == log_entry::entry_type::normal_with_blob && options.is_gc_enabled()) {
boundary_version = options.get_gc_snapshot().boundary_version();
}
write_version_type write_version = extract_write_version(db_key);

// using the first entry in GROUP BY (original-)key
// NB: max versions comes first (by the custom-comparator)
std::string_view key(db_key.data() + write_version_size, db_key.size() - write_version_size);
if (key == last_key) { // same (original-)key with prev
return; // skip
if (key == last_key && write_version < boundary_version) { // same (original-)key with prev
return; // first skip
}
last_key.assign(key);
storage_id_type st_bytes{};
Expand All @@ -245,12 +264,11 @@ static void sortdb_foreach(
if (auto ret = sctx.clear_storage_find(st); ret) {
// check range delete
write_version_type range_ver = ret.value();
if (extract_write_version(db_key) < range_ver) {
if (write_version < range_ver && write_version < boundary_version) {
return; // skip
}
}

auto entry_type = static_cast<log_entry::entry_type>(db_value[0]);
switch (entry_type) {
case log_entry::entry_type::normal_entry:
case log_entry::entry_type::remove_entry:
Expand All @@ -262,25 +280,34 @@ static void sortdb_foreach(
sctx.update_max_blob_id(log_entry::parse_blob_ids(blob_ids));
break;
}
default:
case log_entry::entry_type::clear_storage:
case log_entry::entry_type::remove_storage:
write_snapshot_entry(entry_type, db_key, db_value.substr(1), {});
break;
default:
LOG(ERROR) << "never reach " << static_cast<int>(entry_type);
std::abort();
}
});
#else
sctx.get_sortdb()->each([&sctx, &write_snapshot_entry](const std::string_view db_key, const std::string_view db_value) {
sctx.get_sortdb()->each([&sctx, &write_snapshot_entry, &options](const std::string_view db_key, const std::string_view db_value) {
auto entry_type = static_cast<log_entry::entry_type>(db_value[0]);
write_version_type boundary_version = {UINT64_MAX, UINT64_MAX};
if (entry_type == log_entry::entry_type::normal_with_blob && options.is_gc_enabled()) {
boundary_version = options.get_gc_snapshot().boundary_version();
}

storage_id_type st_bytes{};
memcpy(static_cast<void*>(&st_bytes), db_key.data(), sizeof(storage_id_type));
storage_id_type st = le64toh(st_bytes);
if (auto ret = sctx.clear_storage_find(st); ret) {
// check range delete
write_version_type range_ver = ret.value();
write_version_type point_ver{db_value.substr(1)};
if (point_ver < range_ver) {
if (point_ver < range_ver && point_ver < boundary_version) {
return; // skip
}
}
auto entry_type = static_cast<log_entry::entry_type>(db_value[0]);
switch (entry_type) {
case log_entry::entry_type::normal_entry:
case log_entry::entry_type::remove_entry:
Expand All @@ -292,6 +319,10 @@ static void sortdb_foreach(
sctx.update_max_blob_id(log_entry::parse_blob_ids(blob_ids));
break;
} break;
case log_entry::entry_type::clear_storage:
case log_entry::entry_type::remove_storage:
write_snapshot_entry(entry_type, db_key, db_value.substr(1), {});
break;
default:
LOG(ERROR) << "never reach " << static_cast<int>(entry_type);
std::abort();
Expand Down Expand Up @@ -322,7 +353,7 @@ blob_id_type create_compact_pwal_and_get_max_blob_id(compaction_options &options
setvbuf(ostrm, nullptr, _IOFBF, 128L * 1024L); // NOLINT, NB. glibc may ignore size when _IOFBF and buffer=NULL
log_entry::begin_session(ostrm, 0);

auto write_snapshot_entry = [&ostrm](log_entry::entry_type entry_type, std::string_view key_sid, std::string_view value_etc,
auto write_snapshot_entry = [&ostrm, &options](log_entry::entry_type entry_type, std::string_view key_sid, std::string_view value_etc,
std::string_view blob_ids) {
switch (entry_type) {
case log_entry::entry_type::normal_entry:
Expand All @@ -332,6 +363,15 @@ blob_id_type create_compact_pwal_and_get_max_blob_id(compaction_options &options
log_entry::write_with_blob(ostrm, key_sid, value_etc, blob_ids);
break;
case log_entry::entry_type::remove_entry:
if (options.is_gc_enabled()) {
log_entry::write_remove(ostrm, key_sid, value_etc);
}
break;
case log_entry::entry_type::clear_storage:
log_entry::write_clear_storage(ostrm, key_sid, value_etc);
break;
case log_entry::entry_type::remove_storage:
log_entry::write_remove_storage(ostrm, key_sid, value_etc);
break;
default:
LOG(ERROR) << "Unexpected entry type: " << static_cast<int>(entry_type);
Expand All @@ -340,7 +380,7 @@ blob_id_type create_compact_pwal_and_get_max_blob_id(compaction_options &options
};


sortdb_foreach(sctx, write_snapshot_entry);
sortdb_foreach(options, sctx, write_snapshot_entry);
//log_entry::end_session(ostrm, epoch);
if (fclose(ostrm) != 0) { // NOLINT(*-owning-memory)
LOG_AND_THROW_IO_EXCEPTION("cannot close snapshot file (" + snapshot_file.string() + ")", errno);
Expand Down Expand Up @@ -480,7 +520,7 @@ blob_id_type datastore::create_snapshot_and_get_max_blob_id() {
}
};

sortdb_foreach(sctx, write_snapshot_entry);
sortdb_foreach(options, sctx, write_snapshot_entry);
if (fclose(ostrm) != 0) { // NOLINT(*-owning-memory)
LOG_AND_THROW_IO_EXCEPTION("cannot close snapshot file (" + snapshot_file.string() + ")", errno);
}
Expand Down
26 changes: 16 additions & 10 deletions test/limestone/compaction/compaction_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1084,9 +1084,11 @@ TEST_F(compaction_test, scenario03) {
ASSERT_PRED_FORMAT2(ContainsString, pwals, "pwal_0000.compacted");

log_entries = read_log_file("pwal_0000.compacted", location);
ASSERT_EQ(log_entries.size(), 2); // Ensure that there are log entries
EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key3", "value3", 1, 3, {}, log_entry::entry_type::normal_entry));
EXPECT_TRUE(AssertLogEntry(log_entries[1], 2, "key2", "value2", 1, 0, {}, log_entry::entry_type::normal_entry));
ASSERT_EQ(log_entries.size(), 4); // Ensure that there are log entries
EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key1", std::nullopt, 1, 1, {}, log_entry::entry_type::remove_entry));
EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key3", "value3", 1, 3, {}, log_entry::entry_type::normal_entry));
EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "key4", std::nullopt, 1, 0, {}, log_entry::entry_type::remove_entry));
EXPECT_TRUE(AssertLogEntry(log_entries[3], 2, "key2", "value2", 1, 0, {}, log_entry::entry_type::normal_entry));

// 3. Add/Update PWALs (include remove_entry again)

Expand Down Expand Up @@ -1142,9 +1144,11 @@ TEST_F(compaction_test, scenario03) {

// 5. check the compacted file and snapshot creating at the boot time
log_entries = read_log_file("pwal_0000.compacted", location);
ASSERT_EQ(log_entries.size(), 2); // Ensure that there are log entries
EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key3", "value3", 1, 3, {}, log_entry::entry_type::normal_entry));
EXPECT_TRUE(AssertLogEntry(log_entries[1], 2, "key2", "value2", 1, 0, {}, log_entry::entry_type::normal_entry));
ASSERT_EQ(log_entries.size(), 4); // Ensure that there are log entries
EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key1", std::nullopt, 1, 1, {}, log_entry::entry_type::remove_entry));
EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key3", "value3", 1, 3, {}, log_entry::entry_type::normal_entry));
EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "key4", std::nullopt, 1, 0, {}, log_entry::entry_type::remove_entry));
EXPECT_TRUE(AssertLogEntry(log_entries[3], 2, "key2", "value2", 1, 0, {}, log_entry::entry_type::normal_entry));

log_entries = read_log_file("data/snapshot", location);
ASSERT_EQ(log_entries.size(), 4); // Ensure that there are log entries
Expand Down Expand Up @@ -1248,13 +1252,14 @@ TEST_F(compaction_test, scenario04) {
ASSERT_PRED_FORMAT3(ContainsPrefix, pwals, "pwal_0002.", 1);

log_entries = read_log_file("pwal_0000.compacted", location);
ASSERT_EQ(log_entries.size(), 6);
ASSERT_EQ(log_entries.size(), 7);
EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key1", "value1", 1, 0, {}, log_entry::entry_type::normal_entry));
EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key2", "value2", 1, 1, {}, log_entry::entry_type::normal_entry));
EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "key5", "value5", 1, 2, {}, log_entry::entry_type::normal_entry));
EXPECT_TRUE(AssertLogEntry(log_entries[3], 1, "key6", "value6", 1, 3, {}, log_entry::entry_type::normal_entry));
EXPECT_TRUE(AssertLogEntry(log_entries[4], 1, "key7", "value7", 3, 0, {}, log_entry::entry_type::normal_entry));
EXPECT_TRUE(AssertLogEntry(log_entries[5], 2, "key8", "value8", 3, 0, {}, log_entry::entry_type::normal_entry));
EXPECT_TRUE(AssertLogEntry(log_entries[5], 2, "", "", 2, 0, {}, log_entry::entry_type::remove_storage));

Check failure on line 1261 in test/limestone/compaction/compaction_test.cpp

View workflow job for this annotation

GitHub Actions / CTest (ubuntu-22.04)

compaction_test.scenario04

Value of: AssertLogEntry(log_entries[5], 2, "", "", 2, 0, {}, log_entry::entry_type::remove_storage) Actual: false (Expected storage ID: 2, but got: 144115188075855872) Expected: true

Check failure on line 1261 in test/limestone/compaction/compaction_test.cpp

View workflow job for this annotation

GitHub Actions / CTest (ubuntu-24.04)

compaction_test.scenario04

Value of: AssertLogEntry(log_entries[5], 2, "", "", 2, 0, {}, log_entry::entry_type::remove_storage) Actual: false (Expected storage ID: 2, but got: 144115188075855872) Expected: true
EXPECT_TRUE(AssertLogEntry(log_entries[6], 2, "key8", "value8", 3, 0, {}, log_entry::entry_type::normal_entry));

// Storage ID 1: Add normal entries
lc0_->begin_session();
Expand Down Expand Up @@ -1330,14 +1335,15 @@ TEST_F(compaction_test, scenario04) {

// check the compacted file and snapshot creating at the boot time
log_entries = read_log_file("pwal_0000.compacted", location);
ASSERT_EQ(log_entries.size(), 6);
ASSERT_EQ(log_entries.size(), 7);

EXPECT_TRUE(AssertLogEntry(log_entries[0], 1, "key1", "value1", 1, 0, {}, log_entry::entry_type::normal_entry));
EXPECT_TRUE(AssertLogEntry(log_entries[1], 1, "key2", "value2", 1, 1, {}, log_entry::entry_type::normal_entry));
EXPECT_TRUE(AssertLogEntry(log_entries[2], 1, "key5", "value5", 1, 2, {}, log_entry::entry_type::normal_entry));
EXPECT_TRUE(AssertLogEntry(log_entries[3], 1, "key6", "value6", 1, 3, {}, log_entry::entry_type::normal_entry));
EXPECT_TRUE(AssertLogEntry(log_entries[4], 1, "key7", "value7", 3, 0, {}, log_entry::entry_type::normal_entry));
EXPECT_TRUE(AssertLogEntry(log_entries[5], 2, "key8", "value8", 3, 0, {}, log_entry::entry_type::normal_entry));
EXPECT_TRUE(AssertLogEntry(log_entries[5], 2, "", "", 2, 0, {}, log_entry::entry_type::remove_storage));

Check failure on line 1345 in test/limestone/compaction/compaction_test.cpp

View workflow job for this annotation

GitHub Actions / CTest (ubuntu-22.04)

compaction_test.scenario04

Value of: AssertLogEntry(log_entries[5], 2, "", "", 2, 0, {}, log_entry::entry_type::remove_storage) Actual: false (Expected storage ID: 2, but got: 144115188075855872) Expected: true

Check failure on line 1345 in test/limestone/compaction/compaction_test.cpp

View workflow job for this annotation

GitHub Actions / CTest (ubuntu-24.04)

compaction_test.scenario04

Value of: AssertLogEntry(log_entries[5], 2, "", "", 2, 0, {}, log_entry::entry_type::remove_storage) Actual: false (Expected storage ID: 2, but got: 144115188075855872) Expected: true
EXPECT_TRUE(AssertLogEntry(log_entries[6], 2, "key8", "value8", 3, 0, {}, log_entry::entry_type::normal_entry));

log_entries = read_log_file("data/snapshot", location);
ASSERT_EQ(log_entries.size(), 4);
Expand Down

0 comments on commit 488c19f

Please sign in to comment.