Skip to content

Commit

Permalink
compaction: introduce compaction_options parameter (DONE)
Browse files Browse the repository at this point in the history
  • Loading branch information
umegane committed Feb 18, 2025
1 parent 67f9eb0 commit 7273641
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 12 deletions.
96 changes: 96 additions & 0 deletions docs/internal/blob-limestone.md
Original file line number Diff line number Diff line change
Expand Up @@ -664,3 +664,99 @@ BLOBリスト付きのエントリに対応する。
log_channel.hに更新しているが、だれも参照していないpriavte fieldがあったので削除した。

write_version_type write_version_{};

## コンパクション時のGC

### blob_file_gc_snapshotクラス

* 概要
* BLOB_FILEのGCを行うための暮らす
* GC対象外のblob_idのリストを生成するために使用する。
* コンパクション時に、BLOB付きのエントリを登録する。
* エントリの追加機能はマルチスレッド対応
* メモリ上のエントリを保持する。ただし、このクラスの目的に不要なvalue部は削除して保持する。
* スレッドの処理が終了するとスナップショット作成対象のエントリをソートする。
* 全てのスレッドが終了したら、スナップショット作成対象のエントリをマージソートでマージしながらスナップショットを作成し、削除不可なエントリのセットを作成する。
* write_versionがboundary_version以上のエントリは無条件に削除不可
* write_versionがboundary_version未満のエントリは該当するエントリからスナップショットを作成し、スナップショットに含まれるエントリを削除不可とする。
* 削除不可のエントリがもつblob_idのリストから、削除不可なblob idのリストを作成する。


* コンストラクタ、パラメータとして、boundary_versionを受け取る。

```
/*
* Constructs a blob_file_gc_snapshot with the given boundary_version.
* @param boundary_version The boundary_version for garbage collection.
*/
explicit blob_file_gc_snapshot(const write_version_type& boundary_version);
```

* エントリを登録するメソッド

```
/*
* Sanitizes and adds a log entry to the snapshot.
*
* Only entries of type normal_with_blob are processed.
* The method clears the payload from the entry’s value_etc (keeping the write_version header)
* and adds the entry if its write_version is below the boundary_version.
*
* @param entry The log_entry to be processed and potentially added.
*/
void sanitize_and_add_entry(const log_entry& entry);
```

* 当該スレッドのエントリ追加が終了したことを通知するメソッド
* これが呼ばれると、当該スレッドが登録したエントリをソートする。
* ソートしたデータは内部に保持する。

```
/*
* Notifies that the add_entry operations in the current thread are complete,
* and finalizes (sorts) the local container for later merging.
*/
void finalize_local_entries();
```

* 全てのスレッドの処理が終了し、登録すべきエントリがなくなった呼び出すメソッド
* マージソートでスナップショットを作成し、削除してはいけないエントリのリストを作成する。


```
/**
* @brief Finalizes the snapshot after all entries have been added and returns the snapshot.
*
* Merges thread-local containers, sorts them in descending order, and removes duplicate entries.
*
* @return const log_entry_container& The finalized snapshot of log entries.
*/
const log_entry_container& finalize_snapshot();
```

* 内部に保持していた情報をクリアしメモリを開放する。
```
/*
* Resets the internal state for a new garbage collection cycle.
*/
void reset();
```




### オンラインコンパクション対応のための既存コードの変更点

* `switch_available_boundary_version(write_version_type version)`で通知された、boundary_versionを覚えておく
* オンラインコンパクション時は、write_versionがboundary_version以上のエントリは、削除しないで残しておく
* SORT_METHOD_PUT_ONLYの有無によって、実装が切り替わるが、両方のパターンに対応する。
* SORT_METHOD_PUT_ONLYの有無によって、ロジックを変える必要がある。
* こうしないと、まだ参照されているBLOBを含むエントリを削除してしまう可能性がある。
* 対象となるのはBLOBを含むエントリのみ
* コンパクション時に、新たに作成したPWALファイルと、既存のコンパクション済みファイルを読み取るが、そのときに、
BLOB付きのエントリをblob_file_gc_snapshotに追加する。
* コンパクション処理が終了 = 全てのエントリ追加完了なので、blob_file_gc_snapshot::finalize_snapshot()を呼び出し、
削除不可なエントリのリストを作成する。
* 削除不可なエントリのリストから、削除不可なblob_idのリストを作成する。
* 後は起動時のコンパクションと同様に、blob_fileをスキャンして作成したblob_idのリストと、削除不可なblob_idのリストを比較し、
削除すべきblob_idのリストを作成、blob_fileを削除する。
17 changes: 10 additions & 7 deletions src/limestone/datastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,14 @@ void datastore::compact_with_online() {
TRACE_START;
check_after_ready(static_cast<const char*>(__func__));

// get a copy of next_blob_id and boundary_version before rotation
blob_id_type next_blob_id_copy = next_blob_id_.load(std::memory_order_acquire);
write_version_type boundary_version_copy;
{
std::lock_guard<std::mutex> lock(boundary_mutex_);
boundary_version_copy = available_boundary_version_;
}

// rotate first
rotation_result result = rotate_log_files();

Expand Down Expand Up @@ -687,11 +695,6 @@ void datastore::compact_with_online() {
ensure_directory_exists(compaction_temp_dir);

// create a compacted file and snapshot for blob file garbage collection
write_version_type boundary_version_copy;
{
std::lock_guard<std::mutex> lock(boundary_mutex_);
boundary_version_copy = available_boundary_version_;
}
blob_file_gc_snapshot gc_snapshot(boundary_version_copy);
compaction_options options{location_, compaction_temp_dir, recover_max_parallelism_, need_compaction_filenames, gc_snapshot};
blob_id_type max_blob_id = create_compact_pwal_and_get_max_blob_id(options);
Expand Down Expand Up @@ -737,7 +740,7 @@ void datastore::compact_with_online() {

// blob files garbage collection
blob_file_garbage_collector garb_collector(*blob_file_resolver_);
garb_collector.scan_blob_files(boundary_version_copy.get_major());
garb_collector.scan_blob_files(next_blob_id_copy);
log_entry_container log_entries = gc_snapshot.finalize_snapshot();
for(const auto& entry : log_entries) {
for(const auto& blob_id : entry.get_blob_ids()) {
Expand Down Expand Up @@ -795,7 +798,7 @@ blob_file datastore::get_blob_file(blob_id_type reference) {
return blob_file(path, available);
}

void datastore::switch_available_boundary_version([[maybe_unused]] write_version_type version) {
void datastore::switch_available_boundary_version(write_version_type version) {
TRACE_FINE_START << "version=" << version.get_major() << "." << version.get_minor();
{
std::lock_guard<std::mutex> lock(boundary_mutex_);
Expand Down
6 changes: 6 additions & 0 deletions src/limestone/dblog_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ epoch_id_type dblog_scan::scan_pwal_files( // NOLINT(readability-function-cogni

try {
process_file(p);
if (options_.has_value()) {
compaction_options &opts = options_.value().get();
if (opts.is_gc_enabled()) {
opts.get_gc_snapshot().finalize_local_entries();
}
}
} catch (limestone_exception& ex) {
VLOG(log_info) << "/:limestone catch runtime_error(" << ex.what() << ")";
{
Expand Down
12 changes: 7 additions & 5 deletions src/limestone/dblog_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,12 @@ class dblog_scan {
* @param logdir The path to the directory containing the files to be processed.
* @param file_names The set of file names within `logdir` to be processed.
*/
explicit dblog_scan(boost::filesystem::path logdir, const compaction_options &options) : dblogdir_(std::move(logdir)) {
for (const auto &file_name : options.get_file_names()) {
path_list_.emplace_back(dblogdir_ / file_name);
}
}
explicit dblog_scan(boost::filesystem::path logdir, compaction_options& options)
: dblogdir_(std::move(logdir)), options_(options) {
for (const auto& file_name : options.get_file_names()) {
path_list_.emplace_back(dblogdir_ / file_name);
}
}

const boost::filesystem::path& get_dblogdir() { return dblogdir_; }
void set_thread_num(int thread_num) noexcept { thread_num_ = thread_num; }
Expand Down Expand Up @@ -172,6 +173,7 @@ class dblog_scan {

private:
boost::filesystem::path dblogdir_;
std::optional<std::reference_wrapper<compaction_options>> options_;
std::list<boost::filesystem::path> path_list_;
int thread_num_{1};
bool fail_fast_{false};
Expand Down

0 comments on commit 7273641

Please sign in to comment.