diff --git a/src/limestone/blob_file_gc_snapshot.cpp b/src/limestone/blob_file_gc_snapshot.cpp index 735f3c7..e849e9a 100644 --- a/src/limestone/blob_file_gc_snapshot.cpp +++ b/src/limestone/blob_file_gc_snapshot.cpp @@ -24,15 +24,17 @@ // ----------------- Implementation of blob_file_gc_snapshot methods ----------------- // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) thread_local std::shared_ptr blob_file_gc_snapshot::tls_low_container_ = nullptr; + // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) + thread_local std::shared_ptr blob_file_gc_snapshot::tls_high_container_ = nullptr; - blob_file_gc_snapshot::blob_file_gc_snapshot(const write_version_type& boundary_version) - : boundary_version_(boundary_version) { - // The thread_low_containers_ vector and snapshot_ are default-constructed. + blob_file_gc_snapshot::blob_file_gc_snapshot(const write_version_type& boundary_version) : boundary_version_(boundary_version) { + // The thread_low_containers_, thread_high_containers_ and snapshot_ are default-constructed. } blob_file_gc_snapshot::~blob_file_gc_snapshot() { - // Reset the thread-local container to avoid state leakage between tests or re-use in the same thread. + // Reset the thread-local containers to avoid state leakage between tests or re-use in the same thread. tls_low_container_.reset(); + tls_high_container_.reset(); } void blob_file_gc_snapshot::sanitize_and_add_entry(const log_entry& entry) { @@ -52,22 +54,28 @@ write_version_type entry_wv; modified_entry.write_version(entry_wv); - // Compare the obtained write_version with the threshold. - if (!(entry_wv < boundary_version_)) { - return; - } - - // Obtain or create the thread-local low log_entry_container. - if (!tls_low_container_) { - tls_low_container_ = std::make_shared(); - { - std::lock_guard lock(global_mtx_); - thread_low_containers_.push_back(tls_low_container_); + // Dispatch entry to the appropriate container based on write_version. + if (entry_wv < boundary_version_) { + // Process low container. + if (!tls_low_container_) { + tls_low_container_ = std::make_shared(); + { + std::lock_guard lock(global_mtx_); + thread_low_containers_.push_back(tls_low_container_); + } + } + tls_low_container_->append(modified_entry); + } else { + // Process high container. + if (!tls_high_container_) { + tls_high_container_ = std::make_shared(); + { + std::lock_guard lock(global_mtx_); + thread_high_containers_.push_back(tls_high_container_); + } } + tls_high_container_->append(modified_entry); } - - // Append the modified entry into the thread-local low container. - tls_low_container_->append(modified_entry); } void blob_file_gc_snapshot::finalize_local_entries() { @@ -75,36 +83,44 @@ tls_low_container_->sort_descending(); tls_low_container_.reset(); } + if (tls_high_container_) { + // For high container, no sorting is required. + tls_high_container_.reset(); + } } const log_entry_container& blob_file_gc_snapshot::finalize_snapshot() { - log_entry_container merged = log_entry_container::merge_sorted_collections(thread_low_containers_); - - // Remove duplicate entries from the merged container. - // Since the container is sorted in descending order, the first entry for a given key_sid - // is the one with the maximum write_version. - snapshot_.clear(); + // Process low containers: merge, sort, and remove duplicate entries. + log_entry_container low_merged = log_entry_container::merge_sorted_collections(thread_low_containers_); + aggregated_entries.clear(); std::string last_key; - for (const auto& entry : merged) { + for (const auto& entry : low_merged) { const std::string& current_key = entry.key_sid(); if (last_key.empty() || current_key != last_key) { - snapshot_.append(entry); + aggregated_entries.append(entry); last_key = current_key; } } - return snapshot_; + // Process high containers: append all entries directly. + for (const auto& high_container : thread_high_containers_) { + for (const auto& entry : *high_container) { + aggregated_entries.append(entry); + } + } + + return aggregated_entries; } void blob_file_gc_snapshot::reset() { { - // Clean up any remaining thread-local low containers. std::lock_guard lock(global_mtx_); thread_low_containers_.clear(); + thread_high_containers_.clear(); } - snapshot_.clear(); - // Note: The thread_local tls_low_container remains set in each thread. - // Its lifetime is managed per thread; if needed, threads can reset it. + aggregated_entries.clear(); + // Note: The thread_local containers remain set in each thread. + // Their lifetime is managed per thread; if needed, threads can reset them. } const write_version_type& blob_file_gc_snapshot::boundary_version() const { diff --git a/src/limestone/blob_file_gc_snapshot.h b/src/limestone/blob_file_gc_snapshot.h index 9faed72..4a5d6d3 100644 --- a/src/limestone/blob_file_gc_snapshot.h +++ b/src/limestone/blob_file_gc_snapshot.h @@ -28,15 +28,13 @@ namespace limestone::internal { * blob_file_gc_snapshot * * This class maintains a snapshot of log entries for blob file garbage collection. - * It provides an interface to obtain an iterator over all blob IDs contained in the snapshot. + * It collects entries from multiple threads in two separate groups: * - * Instead of returning a complete list of blob IDs (which may consume significant memory), - * the class returns a custom iterator that extracts blob IDs on-the-fly. - * - * The snapshot is composed of two groups: * - Low entries container: Contains log entries with write_version below boundary_version. * These entries will be merged, sorted, and deduplicated. - * - High entries container: Will be added later (not processed with merge/sort/deduplication). + * + * - High entries container: Contains log entries with write_version greater than or equal to boundary_version. + * These entries are not processed with merge/sort/duplicate removal and will be directly appended. */ class blob_file_gc_snapshot { public: @@ -53,7 +51,7 @@ class blob_file_gc_snapshot { blob_file_gc_snapshot& operator=(blob_file_gc_snapshot&&) = delete; /** - * @brief Destructor that resets the thread-local container. + * @brief Destructor that resets the thread-local containers. */ ~blob_file_gc_snapshot(); @@ -62,7 +60,9 @@ class blob_file_gc_snapshot { * * Only entries of type normal_with_blob are processed. * The method clears the payload from the entry’s value_etc (keeping the write_version header) - * and adds the entry if its write_version is below the boundary_version. + * and adds the entry into the appropriate container based on its write_version: + * - write_version below boundary_version goes to the low entries container. + * - write_version greater than or equal to boundary_version goes to the high entries container. * * @param entry The log_entry to be processed and potentially added. */ @@ -70,14 +70,16 @@ class blob_file_gc_snapshot { /* * Notifies that the add_entry operations in the current thread are complete, - * and finalizes (sorts) the local container for later merging. + * and finalizes (sorts) the low container for later merging. + * For the high container, no sorting is performed. */ void finalize_local_entries(); /** * @brief Finalizes the snapshot after all entries have been added and returns the snapshot. * - * Merges thread-local low entries containers, sorts them in descending order, and removes duplicate entries. + * Merges thread-local low containers, sorts them in descending order, and removes duplicate entries. + * Then, high container entries are appended directly. * * @return const log_entry_container& The finalized snapshot of log entries. */ @@ -100,20 +102,24 @@ class blob_file_gc_snapshot { // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) static thread_local std::shared_ptr tls_low_container_; + // Thread-local pointer to each thread's high log_entry_container. + // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) + static thread_local std::shared_ptr tls_high_container_; + // The boundary version for write_version used in garbage collection. write_version_type boundary_version_; - // Final snapshot after merging, sorting, and duplicate removal. - log_entry_container snapshot_; + // Final snapshot after merging low containers (with deduplication) and appending high entries. + log_entry_container aggregated_entries; - // Mutex to ensure thread-safe access to internal data. - mutable std::mutex mtx_; + // Global mutex to ensure thread-safe access to thread-local container vectors. + mutable std::mutex global_mtx_; // List of thread-local low containers to be merged into the final snapshot. std::vector> thread_low_containers_; - // Global mutex to safely access thread_low_containers_. - std::mutex global_mtx_; + // List of thread-local high containers whose entries are directly appended. + std::vector> thread_high_containers_; }; } // namespace limestone::internal