Skip to content

Commit

Permalink
fix(blob_file_gc_snapshot): include valid log entries in final snapsh…
Browse files Browse the repository at this point in the history
…ot(WIP)

- Renamed tls_container_ to tls_low_container_ and thread_containers_ to thread_low_containers_.
- Updated comments and documentation to reflect that the low entries container collects entries with write_version below boundary_version.
- This change prepares for further modifications to include high entries, addressing the bug of missing valid log entries in the final snapshot.
  • Loading branch information
umegane committed Feb 18, 2025
1 parent 7273641 commit f0a5100
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 63 deletions.
88 changes: 43 additions & 45 deletions src/limestone/blob_file_gc_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,68 +20,66 @@
#include <algorithm>

namespace limestone::internal {


// ----------------- Implementation of blob_file_gc_snapshot methods -----------------
// Definition of the static thread-local member under its earlier name (tls_container_).
// It starts out as a null shared_ptr.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
thread_local std::shared_ptr<log_entry_container> blob_file_gc_snapshot::tls_container_ = nullptr;


// ----------------- Implementation of blob_file_gc_snapshot methods -----------------
// Definition of the same static thread-local member under its new name (tls_low_container_).
// It starts out as a null shared_ptr and is created lazily by sanitize_and_add_entry().
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
thread_local std::shared_ptr<log_entry_container> blob_file_gc_snapshot::tls_low_container_ = nullptr;

// Constructs a snapshot collector whose threshold is boundary_version.
// Only boundary_version_ is initialized explicitly.
// NOTE(review): the initializer line and the body comment each appear twice because this
// view interleaves the pre-rename and post-rename revisions of the commit.
blob_file_gc_snapshot::blob_file_gc_snapshot(const write_version_type& boundary_version)
: boundary_version_(boundary_version) {
// The thread_containers_ vector and snapshot_ are default-constructed.
: boundary_version_(boundary_version) {
// The thread_low_containers_ vector and snapshot_ are default-constructed.
}

// Destructor: drops the thread-local container pointer.
// The pointer is declared thread_local, so only the copy belonging to the thread that runs
// the destructor is reset here.
// NOTE(review): the body appears twice (tls_container_ / tls_low_container_) because this
// view interleaves the pre-rename and post-rename revisions of the commit.
blob_file_gc_snapshot::~blob_file_gc_snapshot() {
// Reset the thread-local container to avoid state leakage between tests or re-use in the same thread.
tls_container_.reset();
}


// Reset the thread-local container to avoid state leakage between tests or re-use in the same thread.
tls_low_container_.reset();
}

// Filters one log entry and, if it qualifies, appends a truncated copy to the calling
// thread's container.
//
// An entry qualifies when its type is normal_with_blob and its write_version compares
// below boundary_version_. All other entries are ignored.
//
// @param entry The log entry to examine. It is copied; the argument itself is not modified.
//
// NOTE(review): the container-related lines appear in pairs (tls_container_ /
// thread_containers_ and their *_low_* counterparts) because this view interleaves the
// pre-rename and post-rename revisions of the commit.
void blob_file_gc_snapshot::sanitize_and_add_entry(const log_entry& entry) {
// Only process entries of type normal_with_blob.
if (entry.type() != log_entry::entry_type::normal_with_blob) {
return;
}

// Create a modifiable copy of the entry.
// NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
log_entry modified_entry = entry;

// Truncate unnecessary data from the value_etc field.
modified_entry.truncate_value_from_normal_entry();

// Obtain the write_version from the modified entry.
write_version_type entry_wv;
modified_entry.write_version(entry_wv);

// Compare the obtained write_version with the threshold.
// Entries whose write_version is not below boundary_version_ are dropped here.
if (!(entry_wv < boundary_version_)) {
return;
}

// Obtain or create the thread-local log_entry_container.
if (!tls_container_) {
tls_container_ = std::make_shared<log_entry_container>();
// Obtain or create the thread-local low log_entry_container.
if (!tls_low_container_) {
tls_low_container_ = std::make_shared<log_entry_container>();
{
// Register the newly created container in the shared vector while holding global_mtx_,
// so that finalize_snapshot() can merge it later.
std::lock_guard<std::mutex> lock(global_mtx_);
thread_containers_.push_back(tls_container_);
thread_low_containers_.push_back(tls_low_container_);
}
}

// Append the modified entry into the thread-local container.
tls_container_->append(modified_entry);
// Append the modified entry into the thread-local low container.
tls_low_container_->append(modified_entry);
}

// Sorts the calling thread's container in descending order, then releases the thread-local
// pointer to it. Does nothing when the calling thread has no container.
//
// The container itself stays alive through the shared_ptr that sanitize_and_add_entry()
// stored in the registered vector. A later sanitize_and_add_entry() call on this thread
// creates a fresh container.
//
// NOTE(review): the if-block appears twice (tls_container_ / tls_low_container_) because
// this view interleaves the pre-rename and post-rename revisions of the commit.
void blob_file_gc_snapshot::finalize_local_entries() {
if (tls_container_) {
tls_container_->sort_descending();
tls_container_.reset();
}
if (tls_low_container_) {
tls_low_container_->sort_descending();
tls_low_container_.reset();
}
}

const log_entry_container& blob_file_gc_snapshot::finalize_snapshot() {
log_entry_container merged = log_entry_container::merge_sorted_collections(thread_containers_);

log_entry_container merged = log_entry_container::merge_sorted_collections(thread_low_containers_);
// Remove duplicate entries from the merged container.
// Since the container is sorted in descending order, the first entry for a given key_sid
// is the one with the maximum write_version.
Expand All @@ -94,24 +92,24 @@ thread_local std::shared_ptr<log_entry_container> blob_file_gc_snapshot::tls_con
last_key = current_key;
}
}

return snapshot_;
}

// Clears the list of registered per-thread containers (under global_mtx_) and then clears
// snapshot_. The thread-local pointers are not touched by this function.
//
// NOTE(review): comment and clear() lines appear in pairs because this view interleaves the
// pre-rename and post-rename revisions of the commit.
void blob_file_gc_snapshot::reset() {
{
// Clean up any remaining thread-local containers.
// Clean up any remaining thread-local low containers.
std::lock_guard<std::mutex> lock(global_mtx_);
thread_containers_.clear();
thread_low_containers_.clear();
}
// snapshot_ is cleared after the lock scope above has ended.
snapshot_.clear();
// Note: The thread_local tls_container remains set in each thread.
// Note: The thread_local tls_low_container remains set in each thread.
// Its lifetime is managed per thread; if needed, threads can reset it.
}

// Returns a const reference to the boundary version that was passed to the constructor.
// NOTE(review): the definition appears twice because this view interleaves the old and new
// revisions of the commit; the two copies show the same text here.
const write_version_type& blob_file_gc_snapshot::boundary_version() const {
return boundary_version_;
}

const write_version_type& blob_file_gc_snapshot::boundary_version() const {
return boundary_version_;
}
} // namespace limestone::internal

43 changes: 25 additions & 18 deletions src/limestone/blob_file_gc_snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include <cstddef>
#include <vector>
#include <memory>
#include <mutex>
#include "log_entry_container.h"
#include "limestone/api/blob_id_type.h"

Expand All @@ -30,6 +32,11 @@ namespace limestone::internal {
*
* Instead of returning a complete list of blob IDs (which may consume significant memory),
* the class returns a custom iterator that extracts blob IDs on-the-fly.
*
* The snapshot is composed of two groups:
* - Low entries container: Contains log entries with write_version below boundary_version.
* These entries will be merged, sorted, and deduplicated.
* - High entries container: Will be added later (not processed with merge/sort/deduplication).
*/
class blob_file_gc_snapshot {
public:
Expand All @@ -45,32 +52,32 @@ class blob_file_gc_snapshot {
blob_file_gc_snapshot(blob_file_gc_snapshot&&) = delete;
blob_file_gc_snapshot& operator=(blob_file_gc_snapshot&&) = delete;

/**
/**
* @brief Destructor that resets the thread-local container.
*/
~blob_file_gc_snapshot();

/**
* Sanitizes and adds a log entry to the snapshot.
*
* Only entries of type normal_with_blob are processed.
* The method clears the payload from the entry’s value_etc (keeping the write_version header)
* and adds the entry if its write_version is below the boundary_version.
*
* @param entry The log_entry to be processed and potentially added.
*/
* Sanitizes and adds a log entry to the snapshot.
*
* Only entries of type normal_with_blob are processed.
* The method clears the payload from the entry’s value_etc (keeping the write_version header)
* and adds the entry if its write_version is below the boundary_version.
*
* @param entry The log_entry to be processed and potentially added.
*/
void sanitize_and_add_entry(const log_entry& entry);

/**
* Notifies that the sanitize_and_add_entry operations in the current thread are complete,
* and finalizes (sorts) the local container for later merging.
*/
* Notifies that the sanitize_and_add_entry operations in the current thread are complete,
* and finalizes (sorts) the local container for later merging.
*/
void finalize_local_entries();

/**
* @brief Finalizes the snapshot after all entries have been added and returns the snapshot.
*
* Merges thread-local containers, sorts them in descending order, and removes duplicate entries.
* Merges thread-local low entries containers, sorts them in descending order, and removes duplicate entries.
*
* @return const log_entry_container& The finalized snapshot of log entries.
*/
Expand All @@ -89,9 +96,9 @@ class blob_file_gc_snapshot {
const write_version_type& boundary_version() const;

private:
// Thread-local pointer to each thread's log_entry_container.
// Thread-local pointer to each thread's low log_entry_container.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
static thread_local std::shared_ptr<log_entry_container> tls_container_;
static thread_local std::shared_ptr<log_entry_container> tls_low_container_;

// The boundary version for write_version used in garbage collection.
write_version_type boundary_version_;
Expand All @@ -102,10 +109,10 @@ class blob_file_gc_snapshot {
// Mutex to ensure thread-safe access to internal data.
mutable std::mutex mtx_;

// List of thread-local containers to be merged into the final snapshot.
std::vector<std::shared_ptr<log_entry_container>> thread_containers_;
// List of thread-local low containers to be merged into the final snapshot.
std::vector<std::shared_ptr<log_entry_container>> thread_low_containers_;

// Global mutex to safely access thread_containers_.
// Global mutex to safely access thread_low_containers_.
std::mutex global_mtx_;
};

Expand Down

0 comments on commit f0a5100

Please sign in to comment.