diff --git a/src/limestone/blob_file_garbage_collector.cpp b/src/limestone/blob_file_garbage_collector.cpp index 3e8a167..730e977 100644 --- a/src/limestone/blob_file_garbage_collector.cpp +++ b/src/limestone/blob_file_garbage_collector.cpp @@ -85,7 +85,10 @@ using limestone::api::log_entry; // Iterate recursively over the root directory. for (boost::filesystem::recursive_directory_iterator it(root), end; it != end; ++it) { - if (boost::filesystem::is_regular_file(it->path())) { + if (shutdown_requested_.load(std::memory_order_acquire)) { + break; + } + if (boost::filesystem::is_regular_file(it->path())) { const boost::filesystem::path& file_path = it->path(); // Use blob_file_resolver's function to check if this file is a valid blob file. @@ -131,6 +134,9 @@ using limestone::api::log_entry; // Calculate the difference and perform deletion operations scanned_blobs_->diff(*gc_exempt_blob_); for (const auto &id : *scanned_blobs_) { + if (shutdown_requested_.load(std::memory_order_acquire)) { + break; + } boost::filesystem::path file_path = resolver_->resolve_path(id); boost::system::error_code ec; file_ops_->remove(file_path, ec); @@ -179,6 +185,11 @@ using limestone::api::log_entry; } void blob_file_garbage_collector::shutdown() { + // Use a dedicated mutex to ensure shutdown() is executed exclusively. + std::lock_guard shutdown_lock(shutdown_mutex_); + + shutdown_requested_.store(true, std::memory_order_release); + wait_for_blob_file_scan(); wait_for_scan_snapshot(); wait_for_cleanup(); @@ -193,6 +204,8 @@ using limestone::api::log_entry; cleanup_thread_.join(); } reset(); + + shutdown_requested_.store(false, std::memory_order_release); } void blob_file_garbage_collector::scan_snapshot(const boost::filesystem::path &snapshot_file, const boost::filesystem::path &compacted_file) { @@ -209,12 +222,15 @@ using limestone::api::log_entry; try { auto cur = std::make_unique(snapshot_file, compacted_file); while (cur->next()) { - if (cur->type() == log_entry::entry_type::normal_with_blob) { - auto blob_ids = cur->blob_ids(); - for (auto id : blob_ids) { - gc_exempt_blob_->add_blob_id(id); - } - } + if (shutdown_requested_.load(std::memory_order_acquire)) { + break; + } + if (cur->type() == log_entry::entry_type::normal_with_blob) { + auto blob_ids = cur->blob_ids(); + for (auto id : blob_ids) { + gc_exempt_blob_->add_blob_id(id); + } + } } finalize_scan_and_cleanup(); } catch (const limestone_exception &e) { diff --git a/src/limestone/blob_file_garbage_collector.h b/src/limestone/blob_file_garbage_collector.h index c196470..d64da2e 100644 --- a/src/limestone/blob_file_garbage_collector.h +++ b/src/limestone/blob_file_garbage_collector.h @@ -219,6 +219,8 @@ class blob_file_garbage_collector { // --- Others --- mutable std::mutex mutex_; ///< Mutex for synchronizing access to state variables. std::unique_ptr file_ops_; ///< Pointer to the file_operations implementation. + std::mutex shutdown_mutex_; ///< Mutex to ensure shutdown() is executed exclusively. + std::atomic_bool shutdown_requested_{false}; ///< Shutdown flag indicating if shutdown has been requested. /** * @brief The background function that scans the blob_root directory for BLOB files. diff --git a/src/limestone/cursor_impl.cpp b/src/limestone/cursor_impl.cpp index 7389857..8c1a4ac 100644 --- a/src/limestone/cursor_impl.cpp +++ b/src/limestone/cursor_impl.cpp @@ -70,14 +70,6 @@ void cursor_impl::validate_and_read_stream(std::optionaleof()) { - DVLOG_LP(log_trace) << stream_name << " stream reached EOF, closing it."; - stream->close(); - stream = std::nullopt; - return; - } - // If the entry is not yet read, read it if (!log_entry) { log_entry.emplace(); // Construct a new log_entry diff --git a/test/limestone/snapshot/cursor_impl_test.cpp b/test/limestone/snapshot/cursor_impl_test.cpp index 06e652b..5733cb4 100644 --- a/test/limestone/snapshot/cursor_impl_test.cpp +++ b/test/limestone/snapshot/cursor_impl_test.cpp @@ -36,7 +36,8 @@ class cursor_impl_testable : public limestone::internal::cursor_impl { using cursor_impl::storage; using cursor_impl::key; using cursor_impl::value; - using cursor_impl::type; + using cursor_impl::type; + using cursor_impl::blob_ids; ~cursor_impl_testable() { // Ensure that the close() method is called to release resources. @@ -137,7 +138,7 @@ class cursor_impl_test : public ::testing::Test { -// Test case 1: Only Snapshot exists +// Only Snapshot exists TEST_F(cursor_impl_test, snapshot_only) { create_log_file("snapshot", entry_maker_.get_default_entries()); boost::filesystem::path snapshot_file = boost::filesystem::path(location) / "snapshot"; @@ -146,7 +147,7 @@ TEST_F(cursor_impl_test, snapshot_only) { EXPECT_TRUE(cursor.next()) << "Should be able to read the snapshot"; } -// Test case 2: Both Snapshot and Compacted files exist +// Both Snapshot and Compacted files exist TEST_F(cursor_impl_test, snapshot_and_compacted) { create_log_file("snapshot", entry_maker_.get_default_entries()); create_log_file("compacted", entry_maker_.get_default_entries()); @@ -158,7 +159,7 @@ TEST_F(cursor_impl_test, snapshot_and_compacted) { EXPECT_TRUE(cursor.next()) << "Should be able to read both snapshot and compacted files"; } -// Test case 3: Error cases +// Error cases TEST_F(cursor_impl_test, error_case) { // No files exist, should throw limestone_exception boost::filesystem::path snapshot_file = boost::filesystem::path(location) / "not_existing_snapshot"; @@ -188,7 +189,7 @@ TEST_F(cursor_impl_test, error_case) { } } -// Test Case 4: Verify the entry methods after reading from a snapshot file +// Verify the entry methods after reading from a snapshot file TEST_F(cursor_impl_test, verify_entry_methods) { // Create a snapshot file with default entries create_log_file("snapshot", entry_maker_.get_default_entries()); @@ -239,6 +240,359 @@ TEST_F(cursor_impl_test, verify_entry_methods) { EXPECT_FALSE(cursor.next()) << "No more entries should be available, next() should return false"; } +// Test create_cursor (snapshot only) with clear_storage filtering +TEST_F(cursor_impl_test, create_cursor_snapshot_only_clear_storage_filtering) { + // Create a snapshot file with default entries: + // Entry 1: storage 1, key "key1", value "value1", write_version {1, 0} + // Entry 2: storage 1, key "key2", value "value2", write_version {1, 1} + create_log_file("snapshot", entry_maker_.get_default_entries()); + boost::filesystem::path snapshot_file = boost::filesystem::path(location) / "snapshot"; + + // Set clear_storage such that entries with write_version less than {1,1} are filtered out. + // This should filter out entry "key1" and only allow entry "key2". + std::map clear_storage; + clear_storage[1] = limestone::api::write_version_type(1, 1); + + // Use the static create_cursor method to create a cursor instance with snapshot file only. + auto cursor_ptr = limestone::internal::cursor_impl::create_cursor(snapshot_file, clear_storage); + + // Expect that the cursor returns a valid entry. + ASSERT_TRUE(cursor_ptr->next()) << "Should read a valid entry after filtering."; + + // Verify that the key of the returned entry is "key2". + std::string key; + cursor_ptr->key(key); + EXPECT_EQ(key, "key2") << "Expected the remaining entry to have key 'key2'."; + + // There should be no further entries. + EXPECT_FALSE(cursor_ptr->next()) << "No more entries should be available."; +} + +// Test create_cursor (snapshot and compacted) with clear_storage filtering +TEST_F(cursor_impl_test, create_cursor_snapshot_and_compacted_clear_storage_filtering) { + // Create snapshot and compacted files with default entries. + create_log_file("snapshot", entry_maker_.get_default_entries()); + create_log_file("compacted", entry_maker_.get_default_entries()); + boost::filesystem::path snapshot_file = boost::filesystem::path(location) / "snapshot"; + boost::filesystem::path compacted_file = boost::filesystem::path(location) / "compacted"; + + // Set clear_storage: for storage 1, set threshold to {1,1} so that entry with write_version {1,0} is filtered out. + std::map clear_storage; + clear_storage[1] = limestone::api::write_version_type(1, 1); + + // Use the static create_cursor method that accepts both snapshot and compacted files. + auto cursor_ptr = limestone::internal::cursor_impl::create_cursor(snapshot_file, compacted_file, clear_storage); + + // Expect that the cursor returns a valid entry (which should be the one with key "key2"). + ASSERT_TRUE(cursor_ptr->next()) << "Should read a valid entry from combined files after filtering."; + + // Verify the key. + std::string key; + cursor_ptr->key(key); + EXPECT_EQ(key, "key2") << "Expected the remaining entry to have key 'key2'."; + + // Verify that there are no further entries. + EXPECT_FALSE(cursor_ptr->next()) << "No more entries should be available."; +} + +// Validate stream closure on EOF condition +TEST_F(cursor_impl_test, validate_stream_eof) { + // Create an empty file to simulate immediate EOF. + boost::filesystem::path empty_file = boost::filesystem::path(location) / "empty_snapshot"; + { + std::ofstream ofs(empty_file.string()); + ofs.close(); + } + + // Open the empty file into a stream. + std::optional stream; + stream.emplace(empty_file, std::ios::binary); + ASSERT_TRUE(stream->is_open()); + + // Create dummy log_entry and previous key to pass to validate_and_read_stream. + std::optional log_entry; + std::string previous_key; + + // Use cursor_impl_testable to call validate_and_read_stream. + cursor_impl_testable test_cursor(empty_file); + test_cursor.validate_and_read_stream(stream, "empty_stream", log_entry, previous_key); + + // Expect that the stream is closed and reset to nullopt due to EOF. + EXPECT_FALSE(stream.has_value()) << "stream should be closed and reset when EOF is reached"; +} + +// Validate stream closure when stream is not good +TEST_F(cursor_impl_test, validate_stream_not_good) { + // Create a file with dummy content. + boost::filesystem::path bad_file = boost::filesystem::path(location) / "bad_file"; + { + std::ofstream ofs(bad_file.string()); + ofs << "dummy data"; + ofs.close(); + } + + // Open the file into a stream. + std::optional stream; + stream.emplace(bad_file, std::ios::binary); + ASSERT_TRUE(stream->is_open()); + + // Force the stream into a bad state. + stream->setstate(std::ios::failbit); + + // Prepare dummy log_entry and previous key. + std::optional log_entry; + std::string previous_key; + + // Use cursor_impl_testable to call validate_and_read_stream. + cursor_impl_testable test_cursor(bad_file); + test_cursor.validate_and_read_stream(stream, "bad_stream", log_entry, previous_key); + + // Expect that the stream is closed and reset to nullopt due to not being in a good state. + EXPECT_FALSE(stream.has_value()) << "stream should be closed and reset when not in a good state"; +} + +// Verify while loop processes multiple entries with sorted keys +TEST_F(cursor_impl_test, while_loop_processes_multiple_entries_sorted) { + // Begin a session to write multiple entries into one file. + lc0_->begin_session(); + // Write a non-relevant entry: remove_entry with key "a" + lc0_->remove_entry(1, "a", {1, 0}); + // Write another non-relevant entry: remove_entry with key "b" + lc0_->remove_entry(1, "b", {1, 0}); + // Write a relevant entry: normal_entry with key "d" + lc0_->add_entry(1, "d", "normal_value", {1, 1}); + // Write a relevant entry: blob entry with key "e" (this produces a normal_with_blob entry) + lc0_->add_entry(1, "e", "blob_value", {1, 2}, {3001, 3002}); + // Write one more non-relevant entry: remove_entry with key "f" + lc0_->remove_entry(1, "f", {1, 0}); + lc0_->end_session(); + + // Rename the generated PWAL file to "snapshot_multi_sorted" + boost::filesystem::path pwal_file = boost::filesystem::path(location) / "pwal_0000"; + boost::filesystem::path snapshot_file = boost::filesystem::path(location) / "snapshot_multi_sorted"; + if (boost::filesystem::exists(pwal_file)) { + boost::filesystem::rename(pwal_file, snapshot_file); + } else { + FAIL() << "pwal_0000 file not found for renaming"; + } + + // Create a cursor using the snapshot file. + cursor_impl_testable test_cursor(snapshot_file); + + // The first call to next() should skip the non-relevant entries ("a" and "b") + // and return the first relevant entry: normal_entry with key "d". + ASSERT_TRUE(test_cursor.next()) << "Expected to read first relevant normal entry after skipping non-relevant entries"; + std::string key; + test_cursor.key(key); + EXPECT_EQ(key, "d") << "Expected first relevant entry key to be 'd'"; + + // The second call to next() should return the next relevant entry, + // which is the blob entry (normal_with_blob) with key "e". + ASSERT_TRUE(test_cursor.next()) << "Expected to read second relevant blob entry"; + test_cursor.key(key); + EXPECT_EQ(key, "e") << "Expected second relevant entry key to be 'e'"; + std::vector blob_ids = test_cursor.blob_ids(); + std::vector expected_blob_ids = {3001, 3002}; + EXPECT_EQ(blob_ids, expected_blob_ids) << "Expected blob IDs to match the provided values"; + + // The third call to next() should return false since no further relevant entries exist. + EXPECT_FALSE(test_cursor.next()) << "Expected no further relevant entries"; +} + +// Verify that an invalid (non-relevant) entry is reset and skipped. +TEST_F(cursor_impl_test, while_loop_resets_invalid_entry) { + // Create a snapshot file with a single normal_entry. + lc0_->begin_session(); + lc0_->add_entry(1, "irrelevant_key", "irrelevant_value", {1, 0}); + lc0_->end_session(); + + // Rename the generated PWAL file to "snapshot_invalid" + boost::filesystem::path pwal_file = boost::filesystem::path(location) / "pwal_0000"; + boost::filesystem::path snapshot_file = boost::filesystem::path(location) / "snapshot_invalid"; + if (boost::filesystem::exists(pwal_file)) { + boost::filesystem::rename(pwal_file, snapshot_file); + } else { + FAIL() << "pwal_0000 file not found for renaming"; + } + + // Create a cursor using the snapshot file. + cursor_impl_testable test_cursor(snapshot_file); + + // Set clear_storage to a threshold that makes the entry non-relevant. + // For example, if the entry's write version is {1, 0}, setting the threshold to {1, 1} + // will cause the entry to be considered outdated (non-relevant). + std::map clear_storage; + clear_storage[1] = limestone::api::write_version_type(1, 1); + test_cursor.set_clear_storage(clear_storage); + + // Call next(). The while loop in validate_and_read_stream will read the single entry, + // find it non-relevant, reset log_entry, and eventually return false since no further entries exist. + EXPECT_FALSE(test_cursor.next()) << "Expected next() to return false when only non-relevant entries are present"; +} + +// The entries are inserted in order: first by storage ID, then by key, then by write_version. +TEST_F(cursor_impl_test, skip_non_target_entries_sorted) { + // Begin a session to write multiple entries. + lc0_->begin_session(); + // For storage ID 1 (target entries): + // Write a normal_entry with key "a" + lc0_->add_entry(1, "a", "value_a", {1, 0}); + // Write a normal_with_blob entry with key "b" + lc0_->add_entry(1, "b", "value_b", {1, 1}, {2001, 2002}); + // Write a remove_entry with key "c" + lc0_->remove_entry(1, "c", {1, 2}); + // Insert non-target entry for storage ID 1: clear_storage entry via truncate_storage. + lc0_->truncate_storage(1, {1, 3}); + // For storage ID 2: non-target entry via add_storage. + lc0_->add_storage(2, {1, 4}); + // For storage ID 3: non-target entry via remove_storage. + lc0_->remove_storage(3, {1, 5}); + lc0_->end_session(); + + // Rename the generated PWAL file to "snapshot_sorted" + boost::filesystem::path pwal_file = boost::filesystem::path(location) / "pwal_0000"; + boost::filesystem::path snapshot_file = boost::filesystem::path(location) / "snapshot_sorted"; + if (boost::filesystem::exists(pwal_file)) { + boost::filesystem::rename(pwal_file, snapshot_file); + } else { + FAIL() << "pwal_0000 file not found for renaming"; + } + + // Create a cursor using the snapshot file. + cursor_impl_testable test_cursor(snapshot_file); + + // Expected target entries (from storage ID 1) in sorted order: + // "a" (normal_entry), "b" (normal_with_blob), "c" (remove_entry). + std::vector> expected_entries = { + {"a", "value_a"}, + {"b", "value_b"}, + }; + + std::vector> actual_entries; + while (test_cursor.next()) { + std::string key, value; + test_cursor.key(key); + test_cursor.value(value); + actual_entries.emplace_back(key, value); + } + + EXPECT_EQ(actual_entries, expected_entries) + << "Only target entries (normal_entry, normal_with_blob, remove_entry) should be processed"; +} + +// Test case: Both snapshot and compacted exist with snapshot key less than compacted key. +// In this case, the cursor should use the snapshot entry. +TEST_F(cursor_impl_test, both_exist_snapshot_lt_compacted) { + // Create snapshot file with a single entry with key "aaa" + { + std::vector> entries = { + {1, "aaa", "val_snapshot", {1, 0}} + }; + create_log_file("snapshot_aaa", entries); + } + // Create compacted file with a single entry with key "bbb" + { + std::vector> entries = { + {1, "bbb", "val_compacted", {1, 1}} + }; + create_log_file("compacted_bbb", entries); + } + boost::filesystem::path snapshot_file = boost::filesystem::path(location) / "snapshot_aaa"; + boost::filesystem::path compacted_file = boost::filesystem::path(location) / "compacted_bbb"; + + // Create a cursor with both snapshot and compacted files. + cursor_impl_testable test_cursor(snapshot_file, compacted_file); + // Since "aaa" < "bbb", the first call to next() should return the snapshot entry. + ASSERT_TRUE(test_cursor.next()); + std::string key; + test_cursor.key(key); + EXPECT_EQ(key, "aaa") << "Expected snapshot entry (key 'aaa') when snapshot key < compacted key"; +} + +// Test case: Both snapshot and compacted exist with snapshot key greater than compacted key. +// In this case, the cursor should use the compacted entry. +TEST_F(cursor_impl_test, both_exist_snapshot_gt_compacted) { + // Create snapshot file with a single entry with key "ccc" + { + std::vector> entries = { + {1, "ccc", "val_snapshot", {1, 0}} + }; + create_log_file("snapshot_ccc", entries); + } + // Create compacted file with a single entry with key "bbb" + { + std::vector> entries = { + {1, "bbb", "val_compacted", {1, 1}} + }; + create_log_file("compacted_bbb", entries); + } + boost::filesystem::path snapshot_file = boost::filesystem::path(location) / "snapshot_ccc"; + boost::filesystem::path compacted_file = boost::filesystem::path(location) / "compacted_bbb"; + + // Create a cursor with both files. + cursor_impl_testable test_cursor(snapshot_file, compacted_file); + // Since "ccc" > "bbb", the first call to next() should return the compacted entry. + ASSERT_TRUE(test_cursor.next()); + std::string key; + test_cursor.key(key); + EXPECT_EQ(key, "bbb") << "Expected compacted entry (key 'bbb') when snapshot key > compacted key"; +} + +// Test case: Both snapshot and compacted exist with equal keys. +// In this case, the cursor should use the snapshot entry and reset both. +TEST_F(cursor_impl_test, both_exist_equal_keys) { + // Create snapshot file with a single entry with key "ddd" + { + std::vector> entries = { + {1, "ddd", "val_snapshot", {1, 0}} + }; + create_log_file("snapshot_ddd", entries); + } + // Create compacted file with a single entry with key "ddd" + { + std::vector> entries = { + {1, "ddd", "val_compacted", {1, 1}} + }; + create_log_file("compacted_ddd", entries); + } + boost::filesystem::path snapshot_file = boost::filesystem::path(location) / "snapshot_ddd"; + boost::filesystem::path compacted_file = boost::filesystem::path(location) / "compacted_ddd"; + + // Create a cursor with both files. + cursor_impl_testable test_cursor(snapshot_file, compacted_file); + // When keys are equal, the code uses the snapshot entry and resets both. + ASSERT_TRUE(test_cursor.next()); + std::string key; + test_cursor.key(key); + EXPECT_EQ(key, "ddd") << "Expected snapshot entry (key 'ddd') when snapshot and compacted keys are equal"; +} + +// Verify that when the snapshot file yields no log entry but +// the compacted file yields a valid entry, the cursor uses the compacted entry. +TEST_F(cursor_impl_test, use_compacted_when_snapshot_empty) { + // Create an empty snapshot file to simulate no valid log entry. + boost::filesystem::path empty_snapshot = boost::filesystem::path(location) / "empty_snapshot"; + { + std::ofstream ofs(empty_snapshot.string()); + ofs.close(); + } + + // Create a compacted file with one valid log entry. + std::vector> entries = { + {1, "compacted_key", "compacted_value", {1, 0}} + }; + create_log_file("compacted_file", entries); + boost::filesystem::path compacted_file = boost::filesystem::path(location) / "compacted_file"; + + // Create a cursor using both files: an empty snapshot and a valid compacted file. + cursor_impl_testable test_cursor(empty_snapshot, compacted_file); + + // Since the snapshot stream yields no entry, the compacted stream's entry should be used. + ASSERT_TRUE(test_cursor.next()) << "Expected next() to return true when compacted file provides a valid entry"; + std::string key; + test_cursor.key(key); + EXPECT_EQ(key, "compacted_key") << "Expected the entry from the compacted file when snapshot is empty"; +} } // namespace limestone::testing