Skip to content

Commit

Permalink
fix(blob_file_gc_snapshot): include valid log entries in final snapsh…
Browse files Browse the repository at this point in the history
…ot (WIP)

- Renamed the final snapshot container to "aggregated_entries" to reflect its combined role.
- Updated finalize_snapshot to merge low entries with deduplication and directly append high entries.
- Adjusted comments and variable names accordingly.
- Confirmed that the code compiles.
  • Loading branch information
umegane committed Feb 18, 2025
1 parent f0a5100 commit 9bbad13
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 47 deletions.
78 changes: 47 additions & 31 deletions src/limestone/blob_file_gc_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@
// ----------------- Implementation of blob_file_gc_snapshot methods -----------------
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
thread_local std::shared_ptr<log_entry_container> blob_file_gc_snapshot::tls_low_container_ = nullptr;
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
thread_local std::shared_ptr<log_entry_container> blob_file_gc_snapshot::tls_high_container_ = nullptr;

blob_file_gc_snapshot::blob_file_gc_snapshot(const write_version_type& boundary_version)
: boundary_version_(boundary_version) {
// The thread_low_containers_ vector and snapshot_ are default-constructed.
blob_file_gc_snapshot::blob_file_gc_snapshot(const write_version_type& boundary_version) : boundary_version_(boundary_version) {
// The thread_low_containers_, thread_high_containers_ and snapshot_ are default-constructed.
}

blob_file_gc_snapshot::~blob_file_gc_snapshot() {
// Reset the thread-local container to avoid state leakage between tests or re-use in the same thread.
// Reset the thread-local containers to avoid state leakage between tests or re-use in the same thread.
tls_low_container_.reset();
tls_high_container_.reset();
}

void blob_file_gc_snapshot::sanitize_and_add_entry(const log_entry& entry) {
Expand All @@ -52,59 +54,73 @@
write_version_type entry_wv;
modified_entry.write_version(entry_wv);

// Compare the obtained write_version with the threshold.
if (!(entry_wv < boundary_version_)) {
return;
}

// Obtain or create the thread-local low log_entry_container.
if (!tls_low_container_) {
tls_low_container_ = std::make_shared<log_entry_container>();
{
std::lock_guard<std::mutex> lock(global_mtx_);
thread_low_containers_.push_back(tls_low_container_);
// Dispatch entry to the appropriate container based on write_version.
if (entry_wv < boundary_version_) {
// Process low container.
if (!tls_low_container_) {
tls_low_container_ = std::make_shared<log_entry_container>();
{
std::lock_guard<std::mutex> lock(global_mtx_);
thread_low_containers_.push_back(tls_low_container_);
}
}
tls_low_container_->append(modified_entry);
} else {
// Process high container.
if (!tls_high_container_) {
tls_high_container_ = std::make_shared<log_entry_container>();
{
std::lock_guard<std::mutex> lock(global_mtx_);
thread_high_containers_.push_back(tls_high_container_);
}
}
tls_high_container_->append(modified_entry);
}

// Append the modified entry into the thread-local low container.
tls_low_container_->append(modified_entry);
}

void blob_file_gc_snapshot::finalize_local_entries() {
if (tls_low_container_) {
tls_low_container_->sort_descending();
tls_low_container_.reset();
}
if (tls_high_container_) {
// For high container, no sorting is required.
tls_high_container_.reset();
}
}

const log_entry_container& blob_file_gc_snapshot::finalize_snapshot() {
log_entry_container merged = log_entry_container::merge_sorted_collections(thread_low_containers_);

// Remove duplicate entries from the merged container.
// Since the container is sorted in descending order, the first entry for a given key_sid
// is the one with the maximum write_version.
snapshot_.clear();
// Process low containers: merge, sort, and remove duplicate entries.
log_entry_container low_merged = log_entry_container::merge_sorted_collections(thread_low_containers_);
aggregated_entries.clear();
std::string last_key;
for (const auto& entry : merged) {
for (const auto& entry : low_merged) {
const std::string& current_key = entry.key_sid();
if (last_key.empty() || current_key != last_key) {
snapshot_.append(entry);
aggregated_entries.append(entry);
last_key = current_key;
}
}

return snapshot_;
// Process high containers: append all entries directly.
for (const auto& high_container : thread_high_containers_) {
for (const auto& entry : *high_container) {
aggregated_entries.append(entry);
}
}

return aggregated_entries;
}

void blob_file_gc_snapshot::reset() {
{
// Clean up any remaining thread-local low containers.
std::lock_guard<std::mutex> lock(global_mtx_);
thread_low_containers_.clear();
thread_high_containers_.clear();
}
snapshot_.clear();
// Note: The thread_local tls_low_container remains set in each thread.
// Its lifetime is managed per thread; if needed, threads can reset it.
aggregated_entries.clear();
// Note: The thread_local containers remain set in each thread.
// Their lifetime is managed per thread; if needed, threads can reset them.
}

const write_version_type& blob_file_gc_snapshot::boundary_version() const {
Expand Down
38 changes: 22 additions & 16 deletions src/limestone/blob_file_gc_snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,13 @@ namespace limestone::internal {
* blob_file_gc_snapshot
*
* This class maintains a snapshot of log entries for blob file garbage collection.
* It provides an interface to obtain an iterator over all blob IDs contained in the snapshot.
* It collects entries from multiple threads in two separate groups:
*
* Instead of returning a complete list of blob IDs (which may consume significant memory),
* the class returns a custom iterator that extracts blob IDs on-the-fly.
*
* The snapshot is composed of two groups:
* - Low entries container: Contains log entries with write_version below boundary_version.
* These entries will be merged, sorted, and deduplicated.
* - High entries container: Will be added later (not processed with merge/sort/deduplication).
*
* - High entries container: Contains log entries with write_version greater than or equal to boundary_version.
* These entries are not processed with merge/sort/duplicate removal and will be directly appended.
*/
class blob_file_gc_snapshot {
public:
Expand All @@ -53,7 +51,7 @@ class blob_file_gc_snapshot {
blob_file_gc_snapshot& operator=(blob_file_gc_snapshot&&) = delete;

/**
* @brief Destructor that resets the thread-local container.
* @brief Destructor that resets the thread-local containers.
*/
~blob_file_gc_snapshot();

Expand All @@ -62,22 +60,26 @@ class blob_file_gc_snapshot {
*
* Only entries of type normal_with_blob are processed.
* The method clears the payload from the entry’s value_etc (keeping the write_version header)
* and adds the entry if its write_version is below the boundary_version.
* and adds the entry into the appropriate container based on its write_version:
* - write_version below boundary_version goes to the low entries container.
* - write_version greater than or equal to boundary_version goes to the high entries container.
*
* @param entry The log_entry to be processed and potentially added.
*/
void sanitize_and_add_entry(const log_entry& entry);

/*
* Notifies that the add_entry operations in the current thread are complete,
* and finalizes (sorts) the local container for later merging.
* and finalizes (sorts) the low container for later merging.
* For the high container, no sorting is performed.
*/
void finalize_local_entries();

/**
* @brief Finalizes the snapshot after all entries have been added and returns the snapshot.
*
* Merges thread-local low entries containers, sorts them in descending order, and removes duplicate entries.
* Merges thread-local low containers, sorts them in descending order, and removes duplicate entries.
* Then, high container entries are appended directly.
*
* @return const log_entry_container& The finalized snapshot of log entries.
*/
Expand All @@ -100,20 +102,24 @@ class blob_file_gc_snapshot {
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
static thread_local std::shared_ptr<log_entry_container> tls_low_container_;

// Thread-local pointer to each thread's high log_entry_container.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
static thread_local std::shared_ptr<log_entry_container> tls_high_container_;

// The boundary version for write_version used in garbage collection.
write_version_type boundary_version_;

// Final snapshot after merging, sorting, and duplicate removal.
log_entry_container snapshot_;
// Final snapshot after merging low containers (with deduplication) and appending high entries.
log_entry_container aggregated_entries;

// Mutex to ensure thread-safe access to internal data.
mutable std::mutex mtx_;
// Global mutex to ensure thread-safe access to thread-local container vectors.
mutable std::mutex global_mtx_;

// List of thread-local low containers to be merged into the final snapshot.
std::vector<std::shared_ptr<log_entry_container>> thread_low_containers_;

// Global mutex to safely access thread_low_containers_.
std::mutex global_mtx_;
// List of thread-local high containers whose entries are directly appended.
std::vector<std::shared_ptr<log_entry_container>> thread_high_containers_;
};

} // namespace limestone::internal

0 comments on commit 9bbad13

Please sign in to comment.