Refactor code to address clang-tidy warnings
umegane committed Feb 17, 2025
1 parent ea765ad commit b8b204f
Showing 7 changed files with 486 additions and 400 deletions.
150 changes: 33 additions & 117 deletions src/limestone/blob_file_gc_snapshot.cpp
@@ -19,176 +19,92 @@
#include "log_entry_comparator.h"
#include <algorithm>

namespace limestone {
namespace internal {


// Thread-local pointer to each thread's log_entry_container.
thread_local log_entry_container* tls_container = nullptr;


// ----------------- Implementation of blob_id_iterator -----------------

blob_id_iterator::blob_id_iterator(const log_entry_container* snapshot)
: snapshot_(snapshot), entry_index_(0), blob_index_(0)
{
// Ensure cache is initially empty.
cached_blob_ids_.clear();
advance_to_valid();
}

bool blob_id_iterator::has_next() const
{
return (snapshot_ && entry_index_ < snapshot_->size());
}

limestone::api::blob_id_type blob_id_iterator::current()
{
// The caller must have verified has_next() before calling current().
// Serve the result from the blob IDs cached for the current entry.
limestone::api::blob_id_type result = cached_blob_ids_[blob_index_];
++blob_index_;
advance_to_valid();
return result;
}

void blob_id_iterator::advance_to_valid()
{
// Loop until a valid blob ID is found or we run out of entries.
while (snapshot_ && entry_index_ < snapshot_->size()) {
// If the cache is empty, load the blob IDs from the current entry.
if (cached_blob_ids_.empty()) {
const auto& entry = *std::next(snapshot_->begin(), entry_index_);
cached_blob_ids_ = entry.get_blob_ids();
}
if (blob_index_ < cached_blob_ids_.size()) {
// Valid blob found.
return;
}
// Otherwise, move to the next entry.
++entry_index_;
blob_index_ = 0;
cached_blob_ids_.clear(); // Clear cache so it will be reloaded for the new entry.
}
// If we exit the loop, no more valid blob IDs exist.
}
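For reference, the removed blob_id_iterator followed a Java-style has_next()/current() protocol rather than C++ begin()/end(). A minimal consumption loop might have looked like the sketch below (the snapshot variable and the consumer are illustrative, not code from this repository):

    // Drain every blob ID referenced by a finalized snapshot.
    // `snapshot` is assumed to be a fully built log_entry_container.
    blob_id_iterator it(&snapshot);
    while (it.has_next()) {
        // current() returns the blob ID under the cursor and then advances.
        limestone::api::blob_id_type id = it.current();
        consume_blob(id);  // hypothetical consumer
    }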

namespace limestone::internal {


// ----------------- Implementation of blob_file_gc_snapshot methods -----------------

// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
thread_local std::shared_ptr<log_entry_container> blob_file_gc_snapshot::tls_container_ = nullptr;

blob_file_gc_snapshot::blob_file_gc_snapshot(const write_version_type& threshold)
: threshold_(threshold)
{
// The thread_containers_ vector and snapshot_ are default-constructed.
}

void blob_file_gc_snapshot::sanitize_and_add_entry(const log_entry& entry)
{

void blob_file_gc_snapshot::sanitize_and_add_entry(const log_entry& entry) {
// Only process entries of type normal_with_blob.
if (entry.type() != log_entry::entry_type::normal_with_blob) {
return;
}

// Create a modifiable copy of the entry.
// NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
log_entry modified_entry = entry;

// Clear the payload part of value_etc while keeping the header (write_version) intact.
// The header size is determined by the logical sizes of epoch_id_type and std::uint64_t.
constexpr std::size_t header_size = sizeof(limestone::api::epoch_id_type) + sizeof(std::uint64_t);
std::string& mutable_value = const_cast<std::string&>(modified_entry.value_etc());
if (mutable_value.size() > header_size) {
mutable_value.resize(header_size);
}


// Truncate unnecessary data from the value_etc field.
modified_entry.truncate_value_from_normal_entry();

// Obtain the write_version from the modified entry.
write_version_type entry_wv;
modified_entry.write_version(entry_wv);

// Keep only entries whose write_version is strictly below the threshold; skip the rest.
if (!(entry_wv < threshold_)) {
return;
}

// Obtain or create the thread-local log_entry_container.
if (!tls_container) {
tls_container = new log_entry_container();
if (!tls_container_) {
tls_container_ = std::make_shared<log_entry_container>();
{
std::lock_guard<std::mutex> lock(global_mtx_);
thread_containers_.push_back(tls_container);
thread_containers_.push_back(tls_container_);
}
}

// Append the modified entry into the thread-local container.
tls_container->append(modified_entry);
tls_container_->append(modified_entry);
}

void blob_file_gc_snapshot::finalize_local_entries()
{
tls_container->sort_descending();
tls_container_->sort_descending();
}
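Taken together, sanitize_and_add_entry and finalize_local_entries implement a sharded-writer pattern: each thread appends to its own container lock-free, and the global mutex is taken only once per thread, when that thread's container is first registered. A self-contained sketch of the same idea with simplified types (an illustration of the pattern, not the repository's code):

    #include <memory>
    #include <mutex>
    #include <string>
    #include <vector>

    class sharded_collector {
    public:
        void add(std::string item) {
            // First add() on this thread: create and register its buffer once.
            if (!tls_buf_) {
                tls_buf_ = std::make_shared<std::vector<std::string>>();
                std::lock_guard<std::mutex> lock(mtx_);
                buffers_.push_back(tls_buf_);
            }
            tls_buf_->push_back(std::move(item));  // lock-free hot path
        }

    private:
        // Static thread_local, as in blob_file_gc_snapshot: one buffer per
        // thread, shared across instances, so this assumes a single collector
        // instance per cycle, mirroring the original design.
        static thread_local std::shared_ptr<std::vector<std::string>> tls_buf_;
        std::mutex mtx_;
        std::vector<std::shared_ptr<std::vector<std::string>>> buffers_;
    };

    thread_local std::shared_ptr<std::vector<std::string>> sharded_collector::tls_buf_;

Holding the buffers as shared_ptr removes the manual delete that the earlier raw-pointer version needed, which is presumably part of what the clang-tidy cleanup targeted.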

void blob_file_gc_snapshot::finalize_snapshot()
const log_entry_container& blob_file_gc_snapshot::finalize_snapshot()
{
// Prepare a vector to collect containers from all threads.
std::vector<log_entry_container> containers_to_merge;
{
// Lock the global mutex to safely access thread_containers_.
std::lock_guard<std::mutex> lock(global_mtx_);
for (auto* container_ptr : thread_containers_) {
// Although each thread should have already finalized its local container,
// we call sort_descending() here as a defensive measure.
container_ptr->sort_descending();
// Move the container's content into the merging vector.
containers_to_merge.push_back(std::move(*container_ptr));
// Free the allocated memory.
delete container_ptr;
}
// Clear the global container list.
thread_containers_.clear();
}
log_entry_container merged = log_entry_container::merge_sorted_collections(thread_containers_);

// Merge all thread-local containers into a single container.
log_entry_container merged = log_entry_container::merge_sorted_collections(containers_to_merge);

// Remove duplicates directly into snapshot_.
// Remove duplicate entries from the merged container.
// Since the container is sorted in descending order, the first entry for a given key_sid
// is the one with the maximum write_version.
snapshot_.clear();
// last_key is initialized to an empty string.
// It is guaranteed that key_sid is never an empty string, so the first entry will always be appended.
std::string last_key;
for (auto it = merged.begin(); it != merged.end(); ++it) {
const std::string& current_key = it->key_sid();
for (const auto& entry : merged) {
const std::string& current_key = entry.key_sid();
if (last_key.empty() || current_key != last_key) {
snapshot_.append(*it);
snapshot_.append(entry);
last_key = current_key;
}
}
}


blob_id_iterator blob_file_gc_snapshot::blob_ids_iterator() const {
return blob_id_iterator(&snapshot_);
return snapshot_;
}
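The duplicate removal above relies on an ordering invariant: after merge_sorted_collections, entries are grouped by key_sid with write_version descending within each key, so keeping only the first entry of each key keeps the one with the maximum write_version. The same technique on plain pairs (illustrative types only):

    #include <string>
    #include <utility>
    #include <vector>

    // Input: (key, version) pairs grouped by key, version descending per key.
    // Output: one pair per key, carrying that key's maximum version.
    std::vector<std::pair<std::string, int>>
    dedup_keep_max(const std::vector<std::pair<std::string, int>>& sorted) {
        std::vector<std::pair<std::string, int>> out;
        std::string last_key;  // keys are assumed non-empty, as with key_sid
        for (const auto& e : sorted) {
            if (last_key.empty() || e.first != last_key) {
                out.push_back(e);  // first occurrence per key = max version
                last_key = e.first;
            }
        }
        return out;
    }

    // dedup_keep_max({{"b", 3}, {"b", 1}, {"a", 2}}) yields {{"b", 3}, {"a", 2}}.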


void blob_file_gc_snapshot::reset()
{
{
// Clean up any remaining thread-local containers.
std::lock_guard<std::mutex> lock(global_mtx_);
for (auto* container_ptr : thread_containers_) {
delete container_ptr;
}
thread_containers_.clear();
}
snapshot_.clear();
// Note: The thread_local tls_container remains set in each thread.
// Its lifetime is managed per thread; if needed, threads can reset it.
}

// These helper functions are no longer used because merging and duplicate removal are handled in finalize_snapshot.
void blob_file_gc_snapshot::merge_entries() { }
void blob_file_gc_snapshot::remove_duplicate_entries() { }

} // namespace internal
} // namespace limestone

} // namespace limestone::internal

90 changes: 17 additions & 73 deletions src/limestone/blob_file_gc_snapshot.h
@@ -20,49 +20,7 @@
#include "log_entry_container.h"
#include "limestone/api/blob_id_type.h"

namespace limestone {
namespace internal {

/*
* blob_id_iterator
*
* A simple, Java-style iterator that allows sequential access to all blob IDs
* contained in a snapshot (log_entry_container) without creating a separate list.
*
* The iterator provides:
* - has_next(): to check if there is a next blob ID.
* - current(): to retrieve the current blob ID and advance the iterator.
*
* Internally, the iterator caches the blob IDs from the current log entry to
* avoid repeated calls to get_blob_ids().
*/
class blob_id_iterator {
public:
// Constructs an iterator for the given snapshot.
// The iterator starts at the first blob ID of the first log entry.
explicit blob_id_iterator(const log_entry_container* snapshot);

// Returns true if there is a next blob ID in the snapshot.
bool has_next() const;

// Returns the current blob ID and advances the iterator.
// Caller should check has_next() before calling current().
limestone::api::blob_id_type current();

private:
// Pointer to the snapshot containing log entries.
const log_entry_container* snapshot_;
// Index of the current log entry within the snapshot.
std::size_t entry_index_;
// Index of the current blob ID within the blob ID list of the current log entry.
std::size_t blob_index_;
// Cached blob IDs from the current log entry to reduce repeated retrieval.
std::vector<limestone::api::blob_id_type> cached_blob_ids_;

// Advances internal indices (and updates the cache) to point to the next valid blob ID.
// If the current log entry has no further blob IDs, it advances to the next log entry.
void advance_to_valid();
};
namespace limestone::internal {

/*
* blob_file_gc_snapshot
@@ -84,8 +42,9 @@ class blob_file_gc_snapshot {
// Disable copy and move semantics.
blob_file_gc_snapshot(const blob_file_gc_snapshot&) = delete;
blob_file_gc_snapshot& operator=(const blob_file_gc_snapshot&) = delete;
blob_file_gc_snapshot(blob_file_gc_snapshot&&) = default;
blob_file_gc_snapshot& operator=(blob_file_gc_snapshot&&) = default;
blob_file_gc_snapshot(blob_file_gc_snapshot&&) = delete;
blob_file_gc_snapshot& operator=(blob_file_gc_snapshot&&) = delete;
~blob_file_gc_snapshot() = default;

/*
* Sanitizes and adds a log entry to the snapshot.
Expand All @@ -104,54 +63,39 @@ class blob_file_gc_snapshot {
*/
void finalize_local_entries();


/*
* Finalizes the snapshot after all threads have finished adding entries.
/**
* @brief Finalizes the snapshot after all entries have been added and returns the snapshot.
*
* This method merges all entries from the internal buffer, sorts them in descending order,
* and removes duplicate entries with the same key_sid by retaining only the entry with the maximum write_version.
*/
void finalize_snapshot();

/*
* Returns a blob_id_iterator that provides sequential access to all blob IDs in the snapshot.
* Merges thread-local containers, sorts them in descending order, and removes duplicate entries.
*
* This iterator allows clients to iterate over each blob ID without creating an additional list,
* thereby reducing memory overhead.
* @return const log_entry_container& The finalized snapshot of log entries.
*/
blob_id_iterator blob_ids_iterator() const;
const log_entry_container& finalize_snapshot();

/*
* Resets the internal state for a new garbage collection cycle.
*/
void reset();

private:
/*
* Merges entries from multiple sources (if applicable).
*/
void merge_entries();

/*
* Removes duplicate entries by retaining only the entry with the highest write_version for each key_sid.
*/
void remove_duplicate_entries();
// Thread-local pointer to each thread's log_entry_container.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
static thread_local std::shared_ptr<log_entry_container> tls_container_;

// The threshold for write_version used in garbage collection.
write_version_type threshold_;

// Internal buffer to accumulate log entries from multiple threads.
std::vector<log_entry> entries_;

// Final snapshot after merging, sorting, and duplicate removal.
log_entry_container snapshot_;

// Mutex to ensure thread-safe access to internal data.
mutable std::mutex mtx_;

std::vector<log_entry_container*> thread_containers_;
// List of thread-local containers to be merged into the final snapshot.
std::vector<std::shared_ptr<log_entry_container>> thread_containers_;

// Global mutex to safely access thread_containers_.
std::mutex global_mtx_;
};

} // namespace internal
} // namespace limestone
} // namespace limestone::internal
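With the new return-by-reference finalize_snapshot(), a full GC cycle would look roughly as follows (a sketch under the assumption that entries arrive on worker threads; the threshold value, the worker loops, and the mark step are illustrative):

    limestone::internal::blob_file_gc_snapshot snapshot(threshold);

    // On each worker thread:
    //     for (const auto& e : thread_entries) { snapshot.sanitize_and_add_entry(e); }
    //     snapshot.finalize_local_entries();  // sort this thread's container once

    // After all workers have joined, on the coordinating thread:
    const log_entry_container& merged = snapshot.finalize_snapshot();
    for (const auto& entry : merged) {
        for (auto blob_id : entry.get_blob_ids()) {
            // ... mark blob_id as still referenced, so GC keeps its blob file ...
        }
    }
    snapshot.reset();  // clear state for the next GC cycle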
20 changes: 20 additions & 0 deletions src/limestone/log_entry.h
@@ -482,6 +482,26 @@ class log_entry {
return parse_blob_ids(blob_ids_);
}

/**
* @brief Truncates the value portion of value_etc_, keeping only the write_version header,
* but only for entries of type normal_entry and normal_with_blob.
*
* For these entry types, value_etc_ contains the write_version header followed by the value.
* This method removes any data beyond the header, leaving only the write_version information.
* For other entry types, the method does nothing.
*/
void truncate_value_from_normal_entry() {
// Process only normal_entry and normal_with_blob entry types.
if (entry_type_ != entry_type::normal_entry && entry_type_ != entry_type::normal_with_blob) {
return;
}

constexpr std::size_t header_size = sizeof(epoch_id_type) + sizeof(std::uint64_t);
if (value_etc_.size() > header_size) {
value_etc_.resize(header_size);
}
}

private:
entry_type entry_type_{};
epoch_id_type epoch_id_{};
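For the header arithmetic in truncate_value_from_normal_entry(): assuming epoch_id_type is a 64-bit integer, the retained header is 16 bytes (epoch id plus the minor write version). A standalone illustration of the truncation, with std::string standing in for value_etc_ (the 8-byte sizes are an assumption, not taken from the repository):

    #include <cassert>
    #include <cstdint>
    #include <string>

    int main() {
        // write_version header: epoch id + minor version, 8 bytes each.
        constexpr std::size_t header_size = sizeof(std::uint64_t) + sizeof(std::uint64_t);

        std::string value_etc(header_size, '\0');  // serialized write_version header
        value_etc += "the actual value payload";   // payload follows the header

        // Same operation as truncate_value_from_normal_entry():
        if (value_etc.size() > header_size) {
            value_etc.resize(header_size);  // drop the payload, keep the header
        }
        assert(value_etc.size() == header_size);
        return 0;
    }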
