WIP implementation of WAL compaction
ban-nobuhiro committed Jun 23, 2024
1 parent a25bdb7 commit f0ed155
Showing 6 changed files with 491 additions and 71 deletions.
47 changes: 47 additions & 0 deletions docs/tglogutil-compaction-man.md
@@ -0,0 +1,47 @@
# `tglogutil-compaction` - Reorganize transaction logs to reduce disk usage

The `tglogutil compaction` command reorganizes the Tsurugi transaction log and reduces disk usage.

## SYNOPSIS

```
$ tglogutil compaction [options] <dblogdir>
```

## DESCRIPTION

Reorganizes the transaction log specified by `<dblogdir>`.
Specify the location set in the `log_location` parameter in the `[datastore]` section of the Tsurugi server configuration file (`tsurugi.ini`).

Options:
* `--thread-num=<number>`
  * Number of concurrent threads used to read log files (default `1`)
* `--epoch=<epoch>`
  * Upper-limit epoch number to be accepted as valid data (default: the value recorded in the transaction log directory)
* `--force=<bool>`
  * Do not prompt for confirmation before starting (default `false`)
* `--dry-run=<bool>` (**not implemented**)
  * Do not modify the transaction log files; the reorganized data is built in a working directory but is discarded rather than adopted (default `false`)
* `--verbose=<bool>`
  * Verbose mode (default `false`)
* `--make-backup=<bool>`
  * Whether to keep a backup of the original dblogdir contents after the reorganized data has been built in the working directory; if `false`, the contents of the original directory are deleted (default `false`)

## EXIT STATUS

* 0: No errors
  * Compaction process completed successfully
* 16: Error
  * Failed to delete a temporary directory (processing continues)
* 64 or more: Unable to handle
  * `dblogdir` does not exist
  * `dblogdir` is inaccessible
  * `dblogdir` has a file format error
  * A directory that is not a transaction log directory was specified
  * A transaction log directory of an unsupported format version was specified
  * The `epoch` file does not exist
  * Files in `dblogdir` are damaged

## PRECAUTIONS FOR USE

The compaction process involves rewriting data, so it is recommended to back up the entire directory before using this tool.
7 changes: 7 additions & 0 deletions docs/tglogutil_ja.md
@@ -2,6 +2,8 @@

* 2024-04-10 ban
* for 1.0.0-BETA4
* 2024-06-21 ban
* for 1.0.0-BETA5, compaction subcommand

## About this document

@@ -26,6 +28,7 @@
## Feature list (subcommands)

* repair: Repairs the contents of the persistent data directory.
* compaction: Reorganizes the contents of the persistent data directory and removes redundant information.

### Subcommand repair (automatic repair)

@@ -63,3 +66,7 @@ $ tglogutil repair [options] <dblogdir>

Precautions for use
* Since the process rewrites data, taking a backup of the directory beforehand is recommended.

### Subcommand compaction


190 changes: 122 additions & 68 deletions src/limestone/datastore_snapshot.cpp
@@ -30,8 +30,7 @@
#include "log_entry.h"
#include "sortdb_wrapper.h"

namespace limestone::api {
using namespace limestone::internal;
namespace limestone::internal {

constexpr std::size_t write_version_size = sizeof(epoch_id_type) + sizeof(std::uint64_t);
static_assert(write_version_size == 16);
@@ -53,98 +52,78 @@ static int comp_twisted_key(const std::string_view& a, const std::string_view& b
return std::memcmp(b.data(), a.data(), write_version_size);
}

void datastore::create_snapshot() {
auto& from_dir = location_;
[[maybe_unused]]
static void insert_entry_or_update_to_max(sortdb_wrapper* sortdb, log_entry& e) {
bool need_write = true;
// skip older entry than already inserted
std::string value;
if (sortdb->get(e.key_sid(), &value)) {
write_version_type write_version;
e.write_version(write_version);
if (write_version < write_version_type(value.substr(1))) {
need_write = false;
}
}
if (need_write) {
std::string db_value;
db_value.append(1, static_cast<char>(e.type()));
db_value.append(e.value_etc());
sortdb->put(e.key_sid(), db_value);
}
}

[[maybe_unused]]
static void insert_twisted_entry(sortdb_wrapper* sortdb, log_entry& e) {
// key_sid: storage_id[8] key[*], value_etc: epoch[8]LE minor_version[8]LE value[*], type: type[1]
// db_key: epoch[8]BE minor_version[8]BE storage_id[8] key[*], db_value: type[1] value[*]
std::string db_key(write_version_size + e.key_sid().size(), '\0');
store_bswap64_value(&db_key[0], &e.value_etc()[0]); // NOLINT(readability-container-data-pointer)
store_bswap64_value(&db_key[8], &e.value_etc()[8]);
std::memcpy(&db_key[write_version_size], e.key_sid().data(), e.key_sid().size());
std::string db_value(1, static_cast<char>(e.type()));
db_value.append(e.value_etc().substr(write_version_size));
sortdb->put(db_key, db_value);
}

static std::pair<epoch_id_type, std::unique_ptr<sortdb_wrapper>> create_sortdb_from_wals(const boost::filesystem::path& from_dir, int num_worker) {
#if defined SORT_METHOD_PUT_ONLY
auto sortdb = std::make_unique<sortdb_wrapper>(from_dir, comp_twisted_key);
#else
auto sortdb = std::make_unique<sortdb_wrapper>(from_dir);
#endif
dblog_scan logscan{location_};
dblog_scan logscan{from_dir};

epoch_id_type ld_epoch = logscan.last_durable_epoch_in_dir();

[[maybe_unused]]
auto insert_entry_or_update_to_max = [&sortdb](log_entry& e){
bool need_write = true;

// skip older entry than already inserted
std::string value;
if (sortdb->get(e.key_sid(), &value)) {
write_version_type write_version;
e.write_version(write_version);
if (write_version < write_version_type(value.substr(1))) {
need_write = false;
}
}

if (need_write) {
std::string db_value;
db_value.append(1, static_cast<char>(e.type()));
db_value.append(e.value_etc());
sortdb->put(e.key_sid(), db_value);
}
};
[[maybe_unused]]
auto insert_twisted_entry = [&sortdb](log_entry& e){
// key_sid: storage_id[8] key[*], value_etc: epoch[8]LE minor_version[8]LE value[*], type: type[1]
// db_key: epoch[8]BE minor_version[8]BE storage_id[8] key[*], db_value: type[1] value[*]
std::string db_key(write_version_size + e.key_sid().size(), '\0');
store_bswap64_value(&db_key[0], &e.value_etc()[0]); // NOLINT(readability-container-data-pointer)
store_bswap64_value(&db_key[8], &e.value_etc()[8]);
std::memcpy(&db_key[write_version_size], e.key_sid().data(), e.key_sid().size());
std::string db_value(1, static_cast<char>(e.type()));
db_value.append(e.value_etc().substr(write_version_size));
sortdb->put(db_key, db_value);
};
#if defined SORT_METHOD_PUT_ONLY
auto add_entry = insert_twisted_entry;
auto add_entry = [&sortdb](log_entry& e){insert_twisted_entry(sortdb.get(), e);};
bool works_with_multi_thread = true;
#else
auto add_entry = insert_entry_or_update_to_max;
auto add_entry = [&sortdb](log_entry& e){insert_entry_or_update_to_max(sortdb.get(), e);};
bool works_with_multi_thread = false;
#endif

int num_worker = recover_max_parallelism_;
if (!works_with_multi_thread && num_worker > 1) {
LOG(INFO) << "/:limestone:config:datastore this sort method does not work correctly with multi-thread, so force the number of recover process thread = 1";
num_worker = 1;
}
logscan.set_thread_num(num_worker);
try {
epoch_id_type max_appeared_epoch = logscan.scan_pwal_files_throws(ld_epoch, add_entry);
epoch_id_switched_.store(max_appeared_epoch);
epoch_id_informed_.store(max_appeared_epoch);
return {max_appeared_epoch, std::move(sortdb)};
} catch (std::runtime_error& e) {
VLOG_LP(log_info) << "failed to scan pwal files: " << e.what();
LOG(ERROR) << "/:limestone recover process failed. (cause: corruption detected in transaction log data directory), "
<< "see https://github.com/project-tsurugi/tsurugidb/blob/master/docs/troubleshooting-guide.md";
LOG(ERROR) << "/:limestone dblogdir (transaction log directory): " << location_;
LOG(ERROR) << "/:limestone dblogdir (transaction log directory): " << from_dir;
throw std::runtime_error("dblogdir is corrupted");
}
}

boost::filesystem::path sub_dir = location_ / boost::filesystem::path(std::string(snapshot::subdirectory_name_));
boost::system::error_code error;
const bool result_check = boost::filesystem::exists(sub_dir, error);
if (!result_check || error) {
const bool result_mkdir = boost::filesystem::create_directory(sub_dir, error);
if (!result_mkdir || error) {
LOG_LP(ERROR) << "fail to create directory";
throw std::runtime_error("I/O error");
}
}

boost::filesystem::path snapshot_file = sub_dir / boost::filesystem::path(std::string(snapshot::file_name_));
VLOG_LP(log_info) << "generating snapshot file: " << snapshot_file;
FILE* ostrm = fopen(snapshot_file.c_str(), "w"); // NOLINT(*-owning-memory)
if (!ostrm) {
LOG_LP(ERROR) << "cannot create snapshot file (" << snapshot_file << ")";
throw std::runtime_error("I/O error");
}
setvbuf(ostrm, nullptr, _IOFBF, 128L * 1024L); // NOLINT, NB. glibc may ignore size when _IOFBF and buffer=NULL
static void sortdb_foreach(sortdb_wrapper *sortdb, std::function<void(const std::string_view key, const std::string_view value)> write_snapshot_entry) {
static_assert(sizeof(log_entry::entry_type) == 1);
#if defined SORT_METHOD_PUT_ONLY
sortdb->each([&ostrm, last_key = std::string{}](std::string_view db_key, std::string_view db_value) mutable {
sortdb->each([write_snapshot_entry, last_key = std::string{}](const std::string_view db_key, const std::string_view db_value) mutable {
// using the first entry in GROUP BY (original-)key
// NB: max versions comes first (by the custom-comparator)
std::string_view key(db_key.data() + write_version_size, db_key.size() - write_version_size);
@@ -160,7 +139,7 @@ void datastore::create_snapshot() {
store_bswap64_value(&value[0], &db_key[0]);
store_bswap64_value(&value[8], &db_key[8]);
std::memcpy(&value[write_version_size], &db_value[1], db_value.size() - 1);
log_entry::write(ostrm, key, value);
write_snapshot_entry(key, value);
break;
}
case log_entry::entry_type::remove_entry:
@@ -171,12 +150,11 @@
}
});
#else
sortdb->each([&ostrm](std::string_view db_key, std::string_view db_value) {
sortdb->each([&write_snapshot_entry](const std::string_view db_key, const std::string_view db_value) {
auto entry_type = static_cast<log_entry::entry_type>(db_value[0]);
db_value.remove_prefix(1);
switch (entry_type) {
case log_entry::entry_type::normal_entry:
log_entry::write(ostrm, db_key, db_value);
write_snapshot_entry(db_key, db_value.substr(1));
break;
case log_entry::entry_type::remove_entry:
break; // skip
@@ -186,6 +164,82 @@
}
});
#endif
}

void create_comapct_pwal(const boost::filesystem::path& from_dir, boost::filesystem::path& to_dir, int num_worker) {
auto [max_appeared_epoch, sortdb] = create_sortdb_from_wals(from_dir, num_worker);

boost::system::error_code error;
const bool result_check = boost::filesystem::exists(to_dir, error);
if (!result_check || error) {
const bool result_mkdir = boost::filesystem::create_directory(to_dir, error);
if (!result_mkdir || error) {
LOG_LP(ERROR) << "fail to create directory " << to_dir;
throw std::runtime_error("I/O error");
}
}

boost::filesystem::path snapshot_file = to_dir / boost::filesystem::path("pwal_0000.compacted");
VLOG_LP(log_info) << "generating compacted pwal file: " << snapshot_file;
FILE* ostrm = fopen(snapshot_file.c_str(), "w"); // NOLINT(*-owning-memory)
if (!ostrm) {
LOG_LP(ERROR) << "cannot create snapshot file (" << snapshot_file << ")";
throw std::runtime_error("I/O error");
}
setvbuf(ostrm, nullptr, _IOFBF, 128L * 1024L); // NOLINT, NB. glibc may ignore size when _IOFBF and buffer=NULL
bool rewind = true; // TODO: change by flag
epoch_id_type epoch = rewind ? 0 : max_appeared_epoch;
log_entry::begin_session(ostrm, epoch);
auto write_snapshot_entry = [&ostrm, &rewind](std::string_view key_stid, std::string_view value_etc) {
if (rewind) {
static std::string value{};
value = value_etc;
std::memset(value.data(), 0, 16);
log_entry::write(ostrm, key_stid, value);
} else {
log_entry::write(ostrm, key_stid, value_etc);
}
};
sortdb_foreach(sortdb.get(), write_snapshot_entry);
//log_entry::end_session(ostrm, epoch);
if (fclose(ostrm) != 0) { // NOLINT(*-owning-memory)
LOG_LP(ERROR) << "cannot close snapshot file (" << snapshot_file << "), errno = " << errno;
throw std::runtime_error("I/O error");
}
}

}

namespace limestone::api {
using namespace limestone::internal;

void datastore::create_snapshot() {
const auto& from_dir = location_;
auto [max_appeared_epoch, sortdb] = create_sortdb_from_wals(from_dir, recover_max_parallelism_);
epoch_id_switched_.store(max_appeared_epoch);
epoch_id_informed_.store(max_appeared_epoch);

boost::filesystem::path sub_dir = location_ / boost::filesystem::path(std::string(snapshot::subdirectory_name_));
boost::system::error_code error;
const bool result_check = boost::filesystem::exists(sub_dir, error);
if (!result_check || error) {
const bool result_mkdir = boost::filesystem::create_directory(sub_dir, error);
if (!result_mkdir || error) {
LOG_LP(ERROR) << "fail to create directory";
throw std::runtime_error("I/O error");
}
}

boost::filesystem::path snapshot_file = sub_dir / boost::filesystem::path(std::string(snapshot::file_name_));
VLOG_LP(log_info) << "generating snapshot file: " << snapshot_file;
FILE* ostrm = fopen(snapshot_file.c_str(), "w"); // NOLINT(*-owning-memory)
if (!ostrm) {
LOG_LP(ERROR) << "cannot create snapshot file (" << snapshot_file << ")";
throw std::runtime_error("I/O error");
}
setvbuf(ostrm, nullptr, _IOFBF, 128L * 1024L); // NOLINT, NB. glibc may ignore size when _IOFBF and buffer=NULL
auto write_snapshot_entry = [&ostrm](std::string_view key, std::string_view value){log_entry::write(ostrm, key, value);};
sortdb_foreach(sortdb.get(), write_snapshot_entry);
if (fclose(ostrm) != 0) { // NOLINT(*-owning-memory)
LOG_LP(ERROR) << "cannot close snapshot file (" << snapshot_file << "), errno = " << errno;
throw std::runtime_error("I/O error");
1 change: 1 addition & 0 deletions src/limestone/dblog_scan.h
@@ -86,6 +86,7 @@ class dblog_scan {
explicit dblog_scan(const boost::filesystem::path& logdir) : dblogdir_(logdir) { }
explicit dblog_scan(boost::filesystem::path&& logdir) : dblogdir_(std::move(logdir)) { }

const boost::filesystem::path& get_dblogdir() { return dblogdir_; }
void set_thread_num(int thread_num) noexcept { thread_num_ = thread_num; }
void set_fail_fast(bool fail_fast) noexcept { fail_fast_ = fail_fast; }
void detach_wal_files(bool skip_empty_files = true);