From 9357180b37d40778e829188a4a1e39a672d70866 Mon Sep 17 00:00:00 2001 From: Nobuhiro Ban Date: Tue, 5 Mar 2024 05:34:57 +0900 Subject: [PATCH] wip --- src/CMakeLists.txt | 14 + src/limestone/datastore_snapshot.cpp | 174 +----------- src/limestone/dblog_scan.cpp | 207 ++++++++++++++ src/limestone/dblog_scan.h | 164 +++++++++++ src/limestone/dblogutil/dblogutil.cpp | 118 ++++++++ src/limestone/internal.h | 12 +- src/limestone/log_entry.h | 104 +++++-- src/limestone/parse_wal_file.cpp | 379 +++++++++++++++++++++++++ test/limestone/log/dblog_scan_test.cpp | 209 ++++++++++++++ test/limestone/log/durable_test.cpp | 26 +- test/limestone/log/log_dir_test.cpp | 67 +++-- 11 files changed, 1258 insertions(+), 216 deletions(-) create mode 100644 src/limestone/dblog_scan.cpp create mode 100644 src/limestone/dblog_scan.h create mode 100644 src/limestone/dblogutil/dblogutil.cpp create mode 100644 src/limestone/parse_wal_file.cpp create mode 100644 test/limestone/log/dblog_scan_test.cpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index bf4dd48f..c9b76259 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -58,3 +58,17 @@ target_link_libraries(limestone-impl INTERFACE ${sort_lib} INTERFACE nlohmann_json::nlohmann_json ) + +# utils +file(GLOB DBLOGUTIL_SOURCES + "limestone/dblogutil/*.cpp" +) + + +add_executable(dblogutil ${DBLOGUTIL_SOURCES}) +target_include_directories(dblogutil + PRIVATE . + PRIVATE ./limestone + ) +target_link_libraries(dblogutil PRIVATE limestone-impl PRIVATE glog::glog gflags::gflags) +install_custom(dblogutil dblogutil) diff --git a/src/limestone/datastore_snapshot.cpp b/src/limestone/datastore_snapshot.cpp index b4e499e2..53dfa0dc 100644 --- a/src/limestone/datastore_snapshot.cpp +++ b/src/limestone/datastore_snapshot.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 Project Tsurugi. + * Copyright 2022-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,71 +25,17 @@ #include "logging_helper.h" #include +#include "dblog_scan.h" +#include "internal.h" #include "log_entry.h" #include "sortdb_wrapper.h" -namespace limestone::internal { -using namespace limestone::api; - -// return max epoch in file. -std::optional last_durable_epoch(const boost::filesystem::path& file) { - std::optional rv; - - boost::filesystem::ifstream istrm; - log_entry e; - istrm.open(file, std::ios_base::in | std::ios_base::binary); - while (e.read(istrm)) { - if (e.type() != log_entry::entry_type::marker_durable) { - LOG_LP(ERROR) << "this epoch file is broken: unexpected log_entry type: " << static_cast(e.type()); - throw std::runtime_error("unexpected log_entry type for epoch file"); - } - if (!rv.has_value() || e.epoch_id() > rv) { - rv = e.epoch_id(); - } - } - istrm.close(); - return rv; -} - -} - namespace limestone::api { using namespace limestone::internal; constexpr std::size_t write_version_size = sizeof(epoch_id_type) + sizeof(std::uint64_t); static_assert(write_version_size == 16); -epoch_id_type datastore::last_durable_epoch_in_dir() { - auto& from_dir = location_; - // read main epoch file first - std::optional ld_epoch = last_durable_epoch(from_dir / std::string(epoch_file_name)); - if (ld_epoch.has_value()) { - return *ld_epoch; - } - - // main epoch file is empty, - // read all rotated-epoch files - for (const boost::filesystem::path& p : boost::filesystem::directory_iterator(from_dir)) { - if (p.filename().string().rfind(epoch_file_name, 0) == 0) { // starts_with(epoch_file_name) - // this is epoch file (main one or rotated) - std::optional epoch = last_durable_epoch(p); - if (!epoch.has_value()) { - continue; // file is empty - } - // ld_epoch = max(ld_epoch, epoch) - if (!ld_epoch.has_value() || *ld_epoch < *epoch) { - ld_epoch = epoch; - } - } - } - return ld_epoch.value_or(0); // 0 = minimum epoch -} - -} - -namespace limestone::internal { -using namespace limestone::api; - [[maybe_unused]] static void store_bswap64_value(void *dest, const void *src) { auto* p64_dest = reinterpret_cast(dest); // NOLINT(*-reinterpret-cast) @@ -107,112 +53,6 @@ static int comp_twisted_key(const std::string_view& a, const std::string_view& b return std::memcmp(b.data(), a.data(), write_version_size); } -epoch_id_type scan_one_pwal_file(const boost::filesystem::path& p, epoch_id_type ld_epoch, const std::function& add_entry) { - VLOG_LP(log_info) << "processing pwal file: " << p.filename().string(); - log_entry e; - epoch_id_type current_epoch{UINT64_MAX}; - epoch_id_type max_epoch_of_file{0}; - - boost::filesystem::fstream strm; - strm.open(p, std::ios_base::in | std::ios_base::out | std::ios_base::binary); - bool skipping = false; // scanning in the invalidated epoch snippet - while (e.read(strm)) { - switch (e.type()) { - case log_entry::entry_type::marker_begin: { - current_epoch = e.epoch_id(); - max_epoch_of_file = std::max(max_epoch_of_file, current_epoch); - if (current_epoch <= ld_epoch) { - skipping = false; - } else { - auto pos = strm.tellg(); - strm.seekp(-9, std::ios::cur); // size of marker_begin entry - char buf = static_cast(log_entry::entry_type::marker_invalidated_begin); - strm.write(&buf, sizeof(char)); - strm.flush(); - strm.seekg(pos, std::ios::beg); // restore position - skipping = true; - } - break; - } - case log_entry::entry_type::marker_invalidated_begin: { - max_epoch_of_file = std::max(max_epoch_of_file, e.epoch_id()); - skipping = true; - break; - } - case log_entry::entry_type::normal_entry: - case log_entry::entry_type::remove_entry: { - if (!skipping) { - add_entry(e); - } - break; - } - default: - break; - } - } - strm.close(); - return max_epoch_of_file; -} - -epoch_id_type scan_pwal_files_in_dir(const boost::filesystem::path& from_dir, int num_worker, - const std::function& is_wal, - epoch_id_type ld_epoch, const std::function& add_entry) { - std::atomic max_appeared_epoch{ld_epoch}; - auto process_file = [&](const boost::filesystem::path& p) { - if (is_wal(p)) { - epoch_id_type max_epoch_of_file = scan_one_pwal_file(p, ld_epoch, add_entry); - epoch_id_type t = max_appeared_epoch.load(); - while (t < max_epoch_of_file - && !max_appeared_epoch.compare_exchange_weak(t, max_epoch_of_file)) { - /* nop */ - } - } - }; - std::mutex dir_mtx; - auto dir_begin = boost::filesystem::directory_iterator(from_dir); - auto dir_end = boost::filesystem::directory_iterator(); - std::vector workers; - std::mutex ex_mtx; - std::exception_ptr ex_ptr{}; - workers.reserve(num_worker); - for (int i = 0; i < num_worker; i++) { - workers.emplace_back(std::thread([&](){ - for (;;) { - boost::filesystem::path p; - { - std::lock_guard g{dir_mtx}; - if (dir_begin == dir_end) break; - p = *dir_begin++; - } - try { - process_file(p); - } catch (std::runtime_error& ex) { - VLOG(log_info) << "/:limestone catch runtime_error(" << ex.what() << ")"; - std::lock_guard g2{ex_mtx}; - if (!ex_ptr) { // only save one - ex_ptr = std::current_exception(); - } - std::lock_guard g{dir_mtx}; - dir_begin = dir_end; // skip all unprocessed files - break; - } - } - })); - } - for (int i = 0; i < num_worker; i++) { - workers[i].join(); - } - if (ex_ptr) { - std::rethrow_exception(ex_ptr); - } - return max_appeared_epoch; -} - -} - -namespace limestone::api { -using namespace limestone::internal; - void datastore::create_snapshot() { auto& from_dir = location_; #if defined SORT_METHOD_PUT_ONLY @@ -220,8 +60,9 @@ void datastore::create_snapshot() { #else auto sortdb = std::make_unique(from_dir); #endif + dblog_scan logscan{location_}; - epoch_id_type ld_epoch = last_durable_epoch_in_dir(); + epoch_id_type ld_epoch = logscan.last_durable_epoch_in_dir(); epoch_id_switched_.store(ld_epoch + 1); // ?? [[maybe_unused]] @@ -270,8 +111,9 @@ void datastore::create_snapshot() { LOG(INFO) << "/:limestone:config:datastore this sort method does not work correctly with multi-thread, so force the number of recover process thread = 1"; num_worker = 1; } - auto is_wal = [](const boost::filesystem::path& p){ return p.filename().string().substr(0, log_channel::prefix.length()) == log_channel::prefix; }; - epoch_id_type max_appeared_epoch = scan_pwal_files_in_dir(from_dir, num_worker, is_wal, ld_epoch, add_entry); + logscan.set_thread_num(num_worker); + logscan.set_process_at_nondurable_epoch_snippet(dblog_scan::process_at_nondurable::repair_by_mark); + epoch_id_type max_appeared_epoch = logscan.scan_pwal_files_throws(ld_epoch, add_entry); epoch_id_informed_.store(max_appeared_epoch); boost::filesystem::path sub_dir = location_ / boost::filesystem::path(std::string(snapshot::subdirectory_name_)); diff --git a/src/limestone/dblog_scan.cpp b/src/limestone/dblog_scan.cpp new file mode 100644 index 00000000..60168d56 --- /dev/null +++ b/src/limestone/dblog_scan.cpp @@ -0,0 +1,207 @@ +/* + * Copyright 2024-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include "logging_helper.h" + +#include +#include "internal.h" +#include "dblog_scan.h" +#include "log_entry.h" +#include "sortdb_wrapper.h" + +namespace limestone::internal { +using namespace limestone::api; + +// return max epoch in file. +std::optional last_durable_epoch(const boost::filesystem::path& file) { + std::optional rv; + + boost::filesystem::ifstream istrm; + log_entry e; + istrm.open(file, std::ios_base::in | std::ios_base::binary); + while (e.read(istrm)) { + if (e.type() != log_entry::entry_type::marker_durable) { + LOG_LP(ERROR) << "this epoch file is broken: unexpected log_entry type: " << static_cast(e.type()); + throw std::runtime_error("unexpected log_entry type for epoch file"); + } + if (!rv.has_value() || e.epoch_id() > rv) { + rv = e.epoch_id(); + } + } + istrm.close(); + return rv; +} + +epoch_id_type dblog_scan::last_durable_epoch_in_dir() { + auto& from_dir = dblogdir_; + // read main epoch file first + std::optional ld_epoch = last_durable_epoch(from_dir / std::string(epoch_file_name)); + if (ld_epoch.has_value()) { + return *ld_epoch; + } + + // main epoch file is empty, + // read all rotated-epoch files + for (const boost::filesystem::path& p : boost::filesystem::directory_iterator(from_dir)) { + if (p.filename().string().rfind(epoch_file_name, 0) == 0) { // starts_with(epoch_file_name) + // this is epoch file (main one or rotated) + std::optional epoch = last_durable_epoch(p); + if (!epoch.has_value()) { + continue; // file is empty + } + // ld_epoch = max(ld_epoch, epoch) + if (!ld_epoch.has_value() || *ld_epoch < *epoch) { + ld_epoch = epoch; + } + } + } + return ld_epoch.value_or(0); // 0 = minimum epoch +} + +// deprecated, remove +epoch_id_type scan_one_pwal_file(const boost::filesystem::path& p, epoch_id_type ld_epoch, const std::function& add_entry) { + dblog_scan ds{""}; // dummy + dblog_scan::parse_error ec; + ds.set_fail_fast(true); + ds.set_process_at_nondurable_epoch_snippet(dblog_scan::process_at_nondurable::repair_by_mark); + auto rc = ds.scan_one_pwal_file(p, ld_epoch, add_entry, [](log_entry::read_error& e) -> bool { + LOG_LP(ERROR) << "this pwal file is broken: " << e.message(); + throw std::runtime_error("pwal file read error"); + }, ec); + return rc; +} + +void dblog_scan::detach_wal_files(bool skip_empty_files) { + // rotate_attached_wal_files + std::vector attached_files; + for (const boost::filesystem::path& p : boost::filesystem::directory_iterator(dblogdir_)) { + if (is_wal(p) && !is_detached_wal(p)) { + if (skip_empty_files && boost::filesystem::is_empty(p)) + continue; + attached_files.emplace_back(p); + } + } + for (const boost::filesystem::path& p : attached_files) { + // XXX: dup with log_channel::do_rotate_file + std::stringstream ss; + auto unix_epoch = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + ss << p.string() << "." << std::setw(14) << std::setfill('0') << unix_epoch << ".0"; + boost::filesystem::path new_file{ss.str()}; + boost::filesystem::rename(p, new_file); + VLOG_LP(50) << "rename " << p << " to " << new_file; + } +} + +epoch_id_type dblog_scan::scan_pwal_files(epoch_id_type ld_epoch, const std::function& add_entry, + const error_report_func_t& report_error, int* max_parse_error_value) { + std::atomic max_appeared_epoch{ld_epoch}; + if (max_parse_error_value) *max_parse_error_value = -1; + std::atomic max_error_value{0}; + auto process_file = [&](const boost::filesystem::path& p) { + if (is_wal(p)) { + parse_error ec; + auto rc = scan_one_pwal_file(p, ld_epoch, add_entry, report_error, ec); + epoch_id_type max_epoch_of_file = rc; + int ec_value = ec.value(); + switch (ec_value) { + case parse_error::ok: + VLOG(30) << "OK: " << p; + break; + case parse_error::repaired: + VLOG(30) << "REPAIRED: " << p; + break; + case parse_error::broken_after_marked: + VLOG(30) << "MARKED BUT TAIL IS BROKEN: " << p; + if (!is_detached_wal(p)) { + if (fail_fast_) + throw std::runtime_error("the end of non-detached file is broken"); + } + break; + case parse_error::broken_after: + case parse_error::unexpected: + case parse_error::failed: + default: + VLOG(30) << "ERROR: " << p; + if (fail_fast_) + throw std::runtime_error(ec.message()); + } + int tmp = max_error_value.load(); + while (tmp < ec.value() + && !max_error_value.compare_exchange_weak(tmp, ec.value())) { + /* nop */ + } + epoch_id_type t = max_appeared_epoch.load(); + while (t < max_epoch_of_file + && !max_appeared_epoch.compare_exchange_weak(t, max_epoch_of_file)) { + /* nop */ + } + } + }; + std::mutex dir_mtx; + auto dir_begin = boost::filesystem::directory_iterator(dblogdir_); + auto dir_end = boost::filesystem::directory_iterator(); + std::vector workers; + std::mutex ex_mtx; + std::exception_ptr ex_ptr{}; + workers.reserve(thread_num_); + for (int i = 0; i < thread_num_; i++) { + workers.emplace_back(std::thread([&](){ + for (;;) { + boost::filesystem::path p; + { + std::lock_guard g{dir_mtx}; + if (dir_begin == dir_end) break; + p = *dir_begin++; + } + try { + process_file(p); + } catch (std::runtime_error& ex) { + VLOG(log_info) << "/:limestone catch runtime_error(" << ex.what() << ")"; + std::lock_guard g2{ex_mtx}; + if (!ex_ptr) { // only save one + ex_ptr = std::current_exception(); + } + std::lock_guard g{dir_mtx}; + dir_begin = dir_end; // skip all unprocessed files + break; + } + } + })); + } + for (int i = 0; i < thread_num_; i++) { + workers[i].join(); + } + if (ex_ptr) { + std::rethrow_exception(ex_ptr); + } + if (max_parse_error_value) *max_parse_error_value = max_error_value; + return max_appeared_epoch; +} + +epoch_id_type dblog_scan::scan_pwal_files_throws(epoch_id_type ld_epoch, const std::function& add_entry) { + set_fail_fast(true); + return scan_pwal_files(ld_epoch, add_entry, [](log_entry::read_error& e) -> bool { + LOG_LP(ERROR) << "this pwal file is broken: " << e.message(); + throw std::runtime_error("pwal file read error"); + }); +} + +} diff --git a/src/limestone/dblog_scan.h b/src/limestone/dblog_scan.h new file mode 100644 index 00000000..f2a88f2c --- /dev/null +++ b/src/limestone/dblog_scan.h @@ -0,0 +1,164 @@ +/* + * Copyright 2024-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include "internal.h" +#include "log_entry.h" + +namespace limestone::internal { +// accessing dblogdir before db start +class dblog_scan { + + // XXX: copied from datastore.h, resolve dup + /** + * @brief name of a file to record durable epoch + */ + static constexpr const std::string_view epoch_file_name = "epoch"; /* datastore::epoch_file_name */ + + // XXX: copied from log_channel.h, resolve dup + /** + * @brief prefix of pwal file name + */ + static constexpr const std::string_view pwal_prefix = "pwal_"; /* log_channel::prefix */ + +public: // TODO private + using error_report_func_t = std::function; + class parse_error { + public: + static constexpr int ok = 0; + static constexpr int repaired = 1; + // warning; pending repair (cut) (inner-code, do not expose out of parse-func) + static constexpr int broken_after_tobe_cut = 0x8; + // warning; repaired, but tail is still broken, so do not append to this file + static constexpr int broken_after_marked = 0x11; + // error; tail is broken, not repaired + static constexpr int broken_after = 0x40; + // error; unexpected (formal) entry, maybe logic error + static constexpr int unexpected = 0x81; + static constexpr int failed = 0xff; + + parse_error() noexcept : value_(0) {} + explicit parse_error(int value) noexcept : value_(value) {} + parse_error(int value, std::streamoff fpos) noexcept : value_(value), fpos_(fpos) {} + + void value(int value) noexcept { value_ = value; } + [[nodiscard]] int value() const noexcept { return value_; } + explicit operator bool() const noexcept { return value_ != 0; } + + [[nodiscard]] std::streamoff fpos() const noexcept { return fpos_; } + + [[nodiscard]] std::string message() const { + switch (value_) { + case ok: return "no error"; + case repaired: return "file is repaired"; + case broken_after_tobe_cut: return "file is broken after ofset " + std::to_string(fpos_) + " , and pending to cut"; + case broken_after_marked: return "file is broken after ofset " + std::to_string(fpos_) + " , and marked invalid snippet"; + case broken_after: return "file is broken after ofset " + std::to_string(fpos_) + ", need to be repair"; + case unexpected: return "unexpected log entry order"; + case failed: return "parse failed"; + default: return "unknown error code " + std::to_string(value_); + } + } + private: + int value_; + std::streamoff fpos_{-1}; + }; + +public: + dblog_scan(const boost::filesystem::path& logdir) : dblogdir_(logdir) { } + dblog_scan(boost::filesystem::path&& logdir) : dblogdir_(std::move(logdir)) { } + + void set_thread_num(int thread_num) noexcept { thread_num_ = thread_num; } + void set_fail_fast(bool fail_fast) noexcept { fail_fast_ = fail_fast; } + void detach_wal_files(bool skip_empty_files = true); + + enum class process_at_nondurable { + ignore, + report, + repair_by_mark, // mark invalidated epoch snippet header of non-durable well-fromed epoch snippet + //repair_by_cut, // cut the snippet from the file + }; + void set_process_at_nondurable_epoch_snippet(process_at_nondurable p) noexcept { + process_at_nondurable_ = p; + } + + enum class process_at_truncated { + ignore, + report, + repair_by_mark, // mark invalidated to epoch snippet header of incomplete (truncated) epoch snippet + repair_by_cut, // truncate the incomplete snippet from the file + }; + void set_process_at_truncated_epoch_snippet(process_at_truncated p) noexcept { + process_at_truncated_ = p; + } + + enum class process_at_damaged { + ignore, + report, + repair_by_mark, // mark invalidated to epoch snippet header of broken epoch snippet (contains log entry which type is unknown) + repair_by_cut, // remove the damaged snippet from snippet header to the end of file + }; + void set_process_at_damaged_epoch_snippet(process_at_damaged p) noexcept { + process_at_damaged_ = p; + } + + epoch_id_type last_durable_epoch_in_dir(); + + /** + * @returns max epoch value in directory + * @throws exception on error + */ + epoch_id_type scan_pwal_files_throws(epoch_id_type ld_epoch, const std::function& add_entry); + /** + * @returns max epoch value in directory + */ + epoch_id_type scan_pwal_files(epoch_id_type ld_epoch, const std::function& add_entry, const error_report_func_t& report_error, int* = nullptr); + + epoch_id_type scan_one_pwal_file(const boost::filesystem::path& p, epoch_id_type ld_epoch, + const std::function& add_entry, + const error_report_func_t& report_error, + parse_error& pe); + + static bool is_wal(const boost::filesystem::path& p) { return p.filename().string().rfind(pwal_prefix, 0) == 0; } + static bool is_detached_wal(const boost::filesystem::path& p) { + auto filename = p.filename().string(); + return (filename.length() > 9 && filename.rfind(pwal_prefix, 0) == 0); + } + +private: + boost::filesystem::path dblogdir_; + int thread_num_; + bool fail_fast_{false}; + + // repair-nondurable-epoch-snippet + // (implemented in 1.0.0 BETA2) + // repair: non-durable well-fromed epoch snippet + process_at_nondurable process_at_nondurable_ = process_at_nondurable::report; + + // repair-truncated-epoch-snippet + // (implemented in 1.0.0 BETA4) + // repair: incomplete (truncated) epoch snippet + process_at_truncated process_at_truncated_ = process_at_truncated::report; + + // repair-damaged-epoch-snippet + // (implemented in 1.0.0 BETA4) + // damaged epoch snippet (contains log entry which type is unknown), e.g. zero-filled + process_at_damaged process_at_damaged_ = process_at_damaged::report; +}; + +} diff --git a/src/limestone/dblogutil/dblogutil.cpp b/src/limestone/dblogutil/dblogutil.cpp new file mode 100644 index 00000000..bce6b9a4 --- /dev/null +++ b/src/limestone/dblogutil/dblogutil.cpp @@ -0,0 +1,118 @@ +/* + * Copyright 2024-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include "logging_helper.h" + +#include "limestone/api/datastore.h" +#include "dblog_scan.h" +#include "internal.h" +#include "log_entry.h" + +using namespace limestone::api; +using namespace limestone::internal; + +DEFINE_string(epoch, "", "specify valid epoch upper limit"); +DEFINE_int32(thread_num, 1, "specify thread num of scanning wal file"); +DEFINE_bool(cut, false, "repair by cutting for error-truncate and error-broken"); +DEFINE_string(rotate, "all", "rotate files"); + +namespace limestone { + +void inspect(dblog_scan &ds, std::optional epoch) { + epoch_id_type ld_epoch = ds.last_durable_epoch_in_dir(); + LOG(INFO) << "durable-epoch: " << ld_epoch; + std::atomic_size_t count_normal_entry = 0; + std::atomic_size_t count_remove_entry = 0; + ds.set_process_at_nondurable_epoch_snippet(dblog_scan::process_at_nondurable::report); + ds.set_process_at_truncated_epoch_snippet(dblog_scan::process_at_truncated::report); + ds.set_process_at_damaged_epoch_snippet(dblog_scan::process_at_damaged::report); + ds.set_fail_fast(false); + int max_ec; + epoch_id_type max_appeared_epoch = ds.scan_pwal_files(epoch.value_or(ld_epoch), [&](log_entry& e){ + if (e.type() == log_entry::entry_type::normal_entry) { + VLOG(50) << "normal"; + count_normal_entry++; + } else if (e.type() == log_entry::entry_type::remove_entry) { + VLOG(50) << "remove"; + count_remove_entry++; + } else { + LOG(ERROR) << static_cast(e.type()); + } + }, [](log_entry::read_error& ec){ + VLOG(30) << "ERROR " << ec.value() << " : " << ec.message(); + return false; + }, &max_ec); + LOG(INFO) << "max-appeared-epoch: " << max_appeared_epoch; + LOG(INFO) << "count-normal: " << count_normal_entry; + LOG(INFO) << "count-remove: " << count_remove_entry; + LOG(INFO) << "status: " << max_ec; +} + +void repair(dblog_scan &ds, std::optional epoch) { + epoch_id_type ld_epoch; + if (epoch.has_value()) { + ld_epoch = epoch.value(); + } else { + ld_epoch = ds.last_durable_epoch_in_dir(); + LOG(INFO) << "durable-epoch: " << ld_epoch; + } + ds.set_process_at_nondurable_epoch_snippet(dblog_scan::process_at_nondurable::repair_by_mark); + ds.set_process_at_truncated_epoch_snippet(FLAGS_cut ? dblog_scan::process_at_truncated::repair_by_cut : dblog_scan::process_at_truncated::repair_by_mark); + ds.set_process_at_damaged_epoch_snippet(FLAGS_cut ? dblog_scan::process_at_damaged::repair_by_cut : dblog_scan::process_at_damaged::repair_by_mark); + ds.set_fail_fast(false); + ds.detach_wal_files(); + [[maybe_unused]] + epoch_id_type max_appeared_epoch = ds.scan_pwal_files(ld_epoch, [&](log_entry& e){ + (void)e; + }, [](log_entry::read_error& e) -> bool { + LOG_LP(ERROR) << "this pwal file is broken: " << e.message(); + throw std::runtime_error("pwal file read error"); + }); +} + +int main(int argc, char *argv[]) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + if (argc < 3) abort(); + std::optional opt_epoch; + if (FLAGS_epoch.empty()) { + opt_epoch = std::nullopt; + } else { + opt_epoch = std::stoul(FLAGS_epoch); + } + enum { inspect, repair } mode; + if (strcmp(argv[1], "inspect") == 0) { // NOLINT(*-pointer-arithmetic) + mode = inspect; + } else if (strcmp(argv[1], "repair") == 0) { // NOLINT(*-pointer-arithmetic) + mode = repair; + } else { + LOG(FATAL) << "unknown subcommand"; + } + boost::filesystem::path p(argv[2]); // NOLINT(*-pointer-arithmetic) + LOG(INFO) << p; + dblog_scan ds(p); + ds.set_thread_num(FLAGS_thread_num); + if (mode == inspect) limestone::inspect(ds, opt_epoch); + if (mode == repair) limestone::repair(ds, opt_epoch); + return 0; +} + +} + +int main(int argc, char *argv[]) { + return limestone::main(argc, argv); +} diff --git a/src/limestone/internal.h b/src/limestone/internal.h index b8f0a82f..5e3cb132 100644 --- a/src/limestone/internal.h +++ b/src/limestone/internal.h @@ -1,5 +1,5 @@ /* - * Copyright 2023-2023 Project Tsurugi. + * Copyright 2023-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,11 +14,14 @@ * limitations under the License. */ +#pragma once + #include #include #include +#include "log_entry.h" namespace limestone::internal { using namespace limestone::api; @@ -30,13 +33,6 @@ std::optional last_durable_epoch(const boost::filesystem::path& f epoch_id_type scan_one_pwal_file(const boost::filesystem::path& pwal, epoch_id_type ld_epoch, const std::function& add_entry); -/** - * @returns max epoch value in directory - */ -epoch_id_type scan_pwal_files_in_dir(const boost::filesystem::path& from_dir, int num_worker, - const std::function& is_wal, - epoch_id_type ld_epoch, const std::function& add_entry); - // from datastore_format.cpp inline constexpr const std::string_view manifest_file_name = "limestone-manifest.json"; diff --git a/src/limestone/log_entry.h b/src/limestone/log_entry.h index ce4ee4e3..e14e782f 100644 --- a/src/limestone/log_entry.h +++ b/src/limestone/log_entry.h @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 Project Tsurugi. + * Copyright 2022-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,6 +45,43 @@ class log_entry { remove_entry = 5, marker_invalidated_begin = 6, }; + class read_error { + public: + static constexpr int ok = 0; + // warning + static constexpr int nondurable_snippet = 0x01; + // error + static constexpr int short_entry = 0x81; + // unknown type; eg. type 0 + static constexpr int unknown_type = 0x82; + // unexpected type; eg. add_entry at the head of pwal file or in epoch file + static constexpr int unexpected_type = 0x83; + + read_error() noexcept { value_ = 0; } + read_error(int value) noexcept : value_(value) {} + read_error(int value, log_entry::entry_type entry_type) noexcept : value_(value), entry_type_(entry_type) {} + + void value(int value) noexcept { value_ = value; } + [[nodiscard]]int value() const noexcept { return value_; } + void entry_type(log_entry::entry_type entry_type) noexcept { entry_type_ = entry_type; } + [[nodiscard]] log_entry::entry_type entry_type() const noexcept { return entry_type_; } + + explicit operator bool() const noexcept { return value_ != 0; } + + [[nodiscard]] std::string message() const { + switch (value_) { + case ok: return "no error"; + case nondurable_snippet: return "found nondurable epoch snippet"; + case short_entry: return "unexpected EOF"; + case unknown_type: return "unknown log_entry type " + std::to_string(static_cast(entry_type_)); + case unexpected_type: return "unexpected log_entry type " + std::to_string(static_cast(entry_type_)); + } + return "unknown error code " + std::to_string(value_); + } + private: + int value_; + log_entry::entry_type entry_type_; + }; log_entry() = default; @@ -160,8 +197,21 @@ class log_entry { // for reader bool read(std::istream& strm) { - strm.read(&one_char_, sizeof(char)); - entry_type_ = static_cast(one_char_); + read_error ec(0); + bool rc = read_entry_from(strm, ec); + if (ec) { + LOG_LP(ERROR) << "this log_entry is broken: " << ec.message(); + throw std::runtime_error(ec.message()); + } + return rc; + } + + bool read_entry_from(std::istream& strm, read_error& ec) { + ec.value(0); + ec.entry_type(entry_type::this_id_is_not_used); + char one_char{}; + strm.read(&one_char, sizeof(char)); + entry_type_ = static_cast(one_char); if (strm.eof()) { return false; } @@ -169,35 +219,44 @@ class log_entry { switch(entry_type_) { case entry_type::normal_entry: { - std::size_t key_len = read_uint32le(strm); - std::size_t value_len = read_uint32le(strm); + std::size_t key_len = read_uint32le(strm, ec); + if (ec) return false; + std::size_t value_len = read_uint32le(strm, ec); + if (ec) return false; key_sid_.resize(key_len + sizeof(storage_id_type)); - read_bytes(strm, key_sid_.data(), static_cast(key_sid_.length())); + read_bytes(strm, key_sid_.data(), static_cast(key_sid_.length()), ec); + if (ec) return false; value_etc_.resize(value_len + sizeof(epoch_id_type) + sizeof(std::uint64_t)); - read_bytes(strm, value_etc_.data(), static_cast(value_etc_.length())); + read_bytes(strm, value_etc_.data(), static_cast(value_etc_.length()), ec); + if (ec) return false; break; } case entry_type::remove_entry: { - std::size_t key_len = read_uint32le(strm); + std::size_t key_len = read_uint32le(strm, ec); + if (ec) return false; key_sid_.resize(key_len + sizeof(storage_id_type)); - read_bytes(strm, key_sid_.data(), static_cast(key_sid_.length())); + read_bytes(strm, key_sid_.data(), static_cast(key_sid_.length()), ec); + if (ec) return false; value_etc_.resize(sizeof(epoch_id_type) + sizeof(std::uint64_t)); - read_bytes(strm, value_etc_.data(), static_cast(value_etc_.length())); + read_bytes(strm, value_etc_.data(), static_cast(value_etc_.length()), ec); + if (ec) return false; break; } case entry_type::marker_begin: case entry_type::marker_end: case entry_type::marker_durable: case entry_type::marker_invalidated_begin: - epoch_id_ = static_cast(read_uint64le(strm)); + epoch_id_ = static_cast(read_uint64le(strm, ec)); + if (ec) return false; break; default: - LOG_LP(ERROR) << "this log_entry is broken: unknown type: " << static_cast(entry_type_); - throw std::runtime_error("unknown log_entry type"); + ec.value(read_error::unknown_type); + ec.entry_type(entry_type_); + return false; } return true; @@ -206,7 +265,7 @@ class log_entry { void write_version(write_version_type& buf) { memcpy(static_cast(&buf), value_etc_.data(), sizeof(epoch_id_type) + sizeof(std::uint64_t)); } - storage_id_type storage() { + [[nodiscard]] storage_id_type storage() const { storage_id_type storage_id{}; memcpy(static_cast(&storage_id), key_sid_.data(), sizeof(storage_id_type)); return storage_id; @@ -217,7 +276,7 @@ class log_entry { void key(std::string& buf) { buf = key_sid_.substr(sizeof(storage_id_type)); } - entry_type type() { + [[nodiscard]] entry_type type() const { return entry_type_; } [[nodiscard]] epoch_id_type epoch_id() const { @@ -247,7 +306,6 @@ class log_entry { epoch_id_type epoch_id_{}; std::string key_sid_{}; std::string value_etc_{}; - char one_char_{}; static void write_uint8(FILE* out, const std::uint8_t value) { int ret = fputc(value, out); @@ -260,18 +318,18 @@ class log_entry { std::uint32_t buf = htole32(value); write_bytes(out, &buf, sizeof(std::uint32_t)); } - static std::uint32_t read_uint32le(std::istream& in) { + static std::uint32_t read_uint32le(std::istream& in, read_error& ec) { std::uint32_t buf{}; - read_bytes(in, &buf, sizeof(std::uint32_t)); + read_bytes(in, &buf, sizeof(std::uint32_t), ec); return le32toh(buf); } static void write_uint64le(FILE* out, const std::uint64_t value) { std::uint64_t buf = htole64(value); write_bytes(out, &buf, sizeof(std::uint64_t)); } - static std::uint64_t read_uint64le(std::istream& in) { + static std::uint64_t read_uint64le(std::istream& in, read_error& ec) { std::uint64_t buf{}; - read_bytes(in, &buf, sizeof(std::uint64_t)); + read_bytes(in, &buf, sizeof(std::uint64_t), ec); return le64toh(buf); } static void write_bytes(FILE* out, const void* buf, std::size_t len) { @@ -282,11 +340,11 @@ class log_entry { throw std::runtime_error("I/O error"); } } - static void read_bytes(std::istream& in, void* buf, std::streamsize len) { + static void read_bytes(std::istream& in, void* buf, std::streamsize len, read_error& ec) { in.read(reinterpret_cast(buf), len); // NOLINT(*-reinterpret-cast) if (in.eof()) { - LOG_LP(ERROR) << "this log entry is broken: unexpected EOF"; - throw std::runtime_error("unexpected EOF"); + ec.value(read_error::short_entry); + return; } } }; diff --git a/src/limestone/parse_wal_file.cpp b/src/limestone/parse_wal_file.cpp new file mode 100644 index 00000000..27135c2a --- /dev/null +++ b/src/limestone/parse_wal_file.cpp @@ -0,0 +1,379 @@ +/* + * Copyright 2024-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include "logging_helper.h" + +#include +#include "dblog_scan.h" +#include "log_entry.h" + +namespace limestone::internal { +using namespace limestone::api; + +void invalidate_epoch_snippet(boost::filesystem::fstream& strm) { + auto pos = strm.tellg(); + strm.seekp(-9, std::ios::cur); // size of marker_begin entry + char buf = static_cast(log_entry::entry_type::marker_invalidated_begin); + strm.write(&buf, sizeof(char)); + strm.flush(); + strm.seekg(pos, std::ios::beg); // restore position +} + +void invalidate_epoch_snippet(boost::filesystem::fstream& strm, std::streampos fpos_head_of_epoch_snippet) { + auto pos = strm.tellg(); + strm.seekp(fpos_head_of_epoch_snippet, std::ios::beg); + char buf = static_cast(log_entry::entry_type::marker_invalidated_begin); + strm.write(&buf, sizeof(char)); + strm.flush(); + // TODO fsync + strm.seekg(pos, std::ios::beg); // restore position +} + +// LOGFORMAT_v1 pWAL syntax + +// parser rule (naive, base idea) +// pwal_file = wal_header epoch_snippets (EOF) +// wal_header = (empty) +// epoch_snippets = epoch_snippet epoch_snippets +// | (empty) +// epoch_snippet = snippet_header log_entries snippet_footer +// snippet_header = marker_begin +// | marker_invalidated_begin +// log_entries = log_entry log_entries +// | (empty) +// log_entry = normal_entry +// | remove_entry +// snippet_footer = (empty) +// +// parser rule (with error-handle) +// pwal_file = wal_header epoch_snippets (EOF) +// wal_header = (empty) +// epoch_snippets = epoch_snippet epoch_snippets +// | (empty) +// epoch_snippet = { head_pos := ... } snippet_header log_entries snippet_footer +// snippet_header = marker_begin { max-epoch := max(...); if (epoch <= ld) { valid := true } else { valid := false, error-nondurable } } +// | marker_invalidated_begin { max-epoch := max(...); valid := false } +// | SHORT_marker_begin { error-truncated } // TAIL +// | SHORT_marker_inv_begin { } // TAIL +// | UNKNOWN_TYPE_entry { if (valid) error-broken-snippet-header } // TAIL // use previous 'valid' +// log_entries = log_entry log_entries +// | (empty) +// log_entry = normal_entry { if (valid) process-entry } +// | remove_entry { if (valid) process-entry } +// | SHORT_normal_entry { if (valid) error-truncated } // TAIL +// | SHORT_remove_entry { if (valid) error-truncated } // TAIL +// | UNKNOWN_TYPE_entry { if (valid) error-damaged-entry } // TAIL +// snippet_footer = (empty) + +// lexer rule (see log_entry.h) +// marker_begin = 0x02 epoch +// marker_invalidated_begin = 0x06 epoch +// normal_entry = 0x01 key_length value_length storage_id key(key_length) write_version_major write_version_minor value(value_length) +// remove_entry = 0x05 key_length storage_id key(key_length) writer_version_major writer_version_minor +// marker_durable = 0x04 epoch +// marker_end = 0x03 epoch +// epoch = int64le +// key_length = int32le +// value_length = int32le +// storage_id = int64le +// write_version_major = int64le +// write_version_minor = int64le +// SHORT_marker_begin = 0x02 byte(0-7) +// SHORT_marker_inv_begin = 0x06 byte(0-7) +// SHORT_normal_entry = 0x01 key_length value_length storage_id key(key_length) write_version_major write_version_minor value( END +// marker_begin : { head_pos := ...; max-epoch := max(...); if (epoch <= ld) { valid := true } else { valid := false, error-nondurable } } -> loop +// marker_invalidated_begin : { head_pos := ...; max-epoch := max(...); valid := false } -> loop +// SHORT_marker_begin : { head_pos := ...; error-truncated } -> END +// SHORT_marker_inv_begin : { head_pos := ... } -> END +// UNKNOWN_TYPE_entry : { error-broken-snippet-header } -> END +// else : { err_unexpected } -> END +// loop: +// normal_entry : { if (valid) process-entry } -> loop +// remove_entry : { if (valid) process-entry } -> loop +// eof : {} -> END +// marker_begin : { head_pos := ...; max-epoch := max(...); if (epoch <= ld) { valid := true } else { valid := false, error-nondurable } } -> loop +// marker_invalidated_begin : { head_pos := ...; max-epoch := max(...); valid := false } -> loop +// SHORT_normal_entry : { if (valid) error-truncated } -> END +// SHORT_remove_entry : { if (valid) error-truncated } -> END +// SHORT_marker_begin : { head_pos := ...; error-truncated } -> END +// SHORT_marker_inv_begin : { head_pos := ... } -> END +// UNKNOWN_TYPE_entry : { if (valid) error-damaged-entry } -> END + + +// scan the file, and check max epoch number in this file +epoch_id_type dblog_scan::scan_one_pwal_file( // NOLINT(readability-function-cognitive-complexity) + const boost::filesystem::path& p, epoch_id_type ld_epoch, + const std::function& add_entry, + const error_report_func_t& report_error, + parse_error& pe) { + VLOG_LP(log_info) << "processing pwal file: " << p.filename().string(); + epoch_id_type current_epoch{UINT64_MAX}; + epoch_id_type max_epoch_of_file{0}; + log_entry::read_error ec{0}; + + log_entry e; + auto err_unexpected = [&](){ + ec.value(log_entry::read_error::unexpected_type); + ec.entry_type(e.type()); + report_error(ec); + }; + boost::filesystem::fstream strm; + strm.open(p, std::ios_base::in | std::ios_base::out | std::ios_base::binary); + bool valid = true; // scanning in the normal (not-invalidated) epoch snippet + bool invalidated_wrote = true; + bool first = true; + ec.value(0); + std::streampos fpos_epoch_snippet; + while (true) { + auto fpos_before_read_entry = strm.tellg(); + bool data_remains = e.read_entry_from(strm, ec); + VLOG_LP(45) << "read: { ec:" << ec.value() << " : " << ec.message() << ", data_remains:" << data_remains << ", e:" << static_cast(e.type()) << "}"; + lex_token tok{ec, data_remains, e}; + VLOG_LP(45) << "token: " << static_cast(tok.value()); + bool aborted = false; + switch (tok.value()) { + case lex_token::token_type::normal_entry: + case lex_token::token_type::remove_entry: +// normal_entry | remove_entry : (not 1st) { if (valid) process-entry } -> loop + if (!first) { + if (valid) { + add_entry(e); + } + } else { + err_unexpected(); + if (fail_fast_) aborted = true; + } + break; + case lex_token::token_type::eof: + aborted = true; + break; + case lex_token::token_type::marker_begin: { +// marker_begin : { head_pos := ...; max-epoch := max(...); if (epoch <= ld) { valid := true } else { valid := false, error-nondurable } } -> loop + fpos_epoch_snippet = fpos_before_read_entry; + current_epoch = e.epoch_id(); + max_epoch_of_file = std::max(max_epoch_of_file, current_epoch); + if (current_epoch <= ld_epoch) { + valid = true; + VLOG_LP(45) << "valid: true"; + } else { + // exists-epoch-snippet-after-durable-epoch + switch (process_at_nondurable_) { + case process_at_nondurable::ignore: + invalidated_wrote = false; + break; + case process_at_nondurable::repair_by_mark: + invalidate_epoch_snippet(strm); + invalidated_wrote = true; + break; + // case process_at_nondurable::repair_by_cut: + // throw std::runtime_error("unimplemented repair method"); + case process_at_nondurable::report: + invalidated_wrote = false; + log_entry::read_error nondurable(log_entry::read_error::nondurable_snippet); + report_error(nondurable); + } + valid = false; + VLOG_LP(45) << "valid: false"; + } + break; + } + case lex_token::token_type::marker_invalidated_begin: { +// marker_invalidated_begin : { head_pos := ...; max-epoch := max(...); valid := false } -> loop + fpos_epoch_snippet = fpos_before_read_entry; + max_epoch_of_file = std::max(max_epoch_of_file, e.epoch_id()); + invalidated_wrote = true; + valid = false; + VLOG_LP(45) << "valid: false"; + break; + } + case lex_token::token_type::SHORT_normal_entry: + case lex_token::token_type::SHORT_remove_entry: { +// SHORT_normal_entry | SHORT_remove_entry : (not 1st) { if (valid) error-truncated } -> END + if (first) { + err_unexpected(); + } else { + switch (process_at_truncated_) { + case process_at_truncated::ignore: + break; + case process_at_truncated::repair_by_mark: + invalidate_epoch_snippet(strm, fpos_epoch_snippet); + pe = parse_error(parse_error::broken_after_marked, fpos_epoch_snippet); + break; + case process_at_truncated::repair_by_cut: + pe = parse_error(parse_error::broken_after_tobe_cut, fpos_epoch_snippet); + break; + case process_at_truncated::report: + if (valid) { // ignore short in invalidated blocks + report_error(ec); + } else { + if (invalidated_wrote) { + // + } else { + report_error(ec); + pe = parse_error(parse_error::broken_after, fpos_epoch_snippet); + } + } + } + } + aborted = true; + break; + } + case lex_token::token_type::SHORT_marker_begin: { +// SHORT_marker_begin : { head_pos := ...; error-truncated } -> END + fpos_epoch_snippet = fpos_before_read_entry; + switch (process_at_truncated_) { + case process_at_truncated::ignore: + break; + case process_at_truncated::repair_by_mark: + invalidate_epoch_snippet(strm, fpos_epoch_snippet); + pe = parse_error(parse_error::broken_after_marked, fpos_epoch_snippet); + break; + case process_at_truncated::repair_by_cut: + pe = parse_error(parse_error::broken_after_tobe_cut, fpos_epoch_snippet); + break; + case process_at_truncated::report: + report_error(ec); + pe = parse_error(parse_error::broken_after, fpos_epoch_snippet); + } + aborted = true; + break; + } + case lex_token::token_type::SHORT_marker_inv_begin: { +// SHORT_marker_inv_begin : { head_pos := ... } -> END + fpos_epoch_snippet = fpos_before_read_entry; + // ignore short in invalidated blocks + aborted = true; + break; + } + case lex_token::token_type::UNKNOWN_TYPE_entry: { +// UNKNOWN_TYPE_entry : (not 1st) { if (valid) error-damaged-entry } -> END +// UNKNOWN_TYPE_entry : (1st) { error-broken-snippet-header } -> END + if (first) { + err_unexpected(); // FIXME: error type + } else { + switch (process_at_damaged_) { + case process_at_damaged::ignore: + break; + case process_at_damaged::repair_by_mark: + invalidate_epoch_snippet(strm, fpos_epoch_snippet); + pe = parse_error(parse_error::broken_after_marked, fpos_epoch_snippet); + break; + case process_at_damaged::repair_by_cut: + pe = parse_error(parse_error::broken_after_tobe_cut, fpos_epoch_snippet); + break; + case process_at_damaged::report: + if (valid) { + report_error(ec); + pe = parse_error(parse_error::broken_after, fpos_epoch_snippet); + } else { + pe = parse_error(parse_error::broken_after_marked, fpos_epoch_snippet); + } + } + } + aborted = true; + break; + } + default: + // unexpected log_entry; may be logical error of program, not by disk damage + err_unexpected(); + if (tok.value() >= lex_token::token_type::SHORT_normal_entry || fail_fast_) { + aborted = true; + } + pe = parse_error(parse_error::unexpected, fpos_before_read_entry); // point to this log_entry + } + if (aborted) break; + first = false; + } + strm.close(); + if (pe.value() == parse_error::broken_after_tobe_cut) { + // DO trim + // TODO: check byte at fpos is 0x02 or 0x06 + boost::filesystem::resize_file(p, pe.fpos()); + VLOG(40) << "trimmed at " << pe.fpos(); + pe.value(parse_error::repaired); + } + return max_epoch_of_file; +} + +} diff --git a/test/limestone/log/dblog_scan_test.cpp b/test/limestone/log/dblog_scan_test.cpp new file mode 100644 index 00000000..8ffcfd15 --- /dev/null +++ b/test/limestone/log/dblog_scan_test.cpp @@ -0,0 +1,209 @@ + +#include +#include +#include + +#include + +#include "dblog_scan.h" +#include "internal.h" +#include "log_entry.h" + +#include "test_root.h" + +namespace limestone::testing { + +using namespace std::literals; +using namespace limestone::api; +using namespace limestone::internal; + +extern void create_file(boost::filesystem::path path, std::string_view content); + +class dblog_scan_test : public ::testing::Test { +public: +static constexpr const char* location = "/tmp/dblog_scan_test"; + + void SetUp() { + boost::filesystem::remove_all(location); + if (!boost::filesystem::create_directory(location)) { + std::cerr << "cannot make directory" << std::endl; + } + } + + void TearDown() { + boost::filesystem::remove_all(location); + } + + void set_inspect_mode(dblog_scan& ds) { + ds.set_process_at_nondurable_epoch_snippet(dblog_scan::process_at_nondurable::report); + ds.set_process_at_truncated_epoch_snippet(dblog_scan::process_at_truncated::report); + ds.set_process_at_damaged_epoch_snippet(dblog_scan::process_at_damaged::report); + ds.set_fail_fast(false); + } + bool starts_with(std::string a, std::string b) { return a.substr(0, b.length()) == b; } + + std::vector list_dir() { + std::vector ret; + for (const boost::filesystem::path& p : boost::filesystem::directory_iterator(boost::filesystem::path(location))) { + if (dblog_scan::is_wal(p)) { + ret.emplace_back(p); + } + } + return ret; + } + +}; + +// unit-test scan_one_pwal_file +// inspect the file filled zero +TEST_F(dblog_scan_test, scan_one_pwal_file_zero) { + auto p = boost::filesystem::path(location) / "pwal_0000"; + create_file(p, + "\x02\xff\x00\x00\x00\x00\x00\x00\x00" // marker_begin 0xff + // XXX: epoch footer... + "\x02\x01\x01\x00\x00\x00\x00\x00\x00" // marker_begin 0x101 + // XXX: epoch footer... + "\x00\x00\x00\x00\x00\x00\x00\x00\x00" // UNKNOWN_TYPE_entry + // XXX: epoch footer... + ""sv); + + dblog_scan ds{boost::filesystem::path(location)}; + ds.set_thread_num(1); + set_inspect_mode(ds); + dblog_scan::parse_error pe; + std::vector errors; + + EXPECT_EQ(ds.scan_one_pwal_file(p, 0x100, [](log_entry& e){ + VLOG(30) << static_cast(e.type()); + }, [&errors](log_entry::read_error& re){ + VLOG(30) << re.message(); + errors.emplace_back(re); + return false; + }, pe), 0x101); + EXPECT_EQ(errors.size(), 1); + EXPECT_EQ(pe.value(), dblog_scan::parse_error::broken_after_marked); + EXPECT_EQ(pe.fpos(), 9); +} + +// unit-test scan_one_pwal_file +// inspect the file truncated on log_entries +TEST_F(dblog_scan_test, scan_one_pwal_file_trunc_entry) { + auto p = boost::filesystem::path(location) / "pwal_0000"; + create_file(p, + "\x02\xff\x00\x00\x00\x00\x00\x00\x00" // marker_begin 0xff + // XXX: epoch footer... + "\x02\x01\x01\x00\x00\x00\x00\x00\x00" // marker_begin 0x101 + "\x01\x04\x00\x00\x00\x04\x00\x00\x00" // SHORT_normal_entry + // XXX: epoch footer... + ""sv); + + dblog_scan ds{boost::filesystem::path(location)}; + ds.set_thread_num(1); + set_inspect_mode(ds); + dblog_scan::parse_error pe; + std::vector errors; + + EXPECT_EQ(ds.scan_one_pwal_file(p, 0x100, [](log_entry& e){ + VLOG(30) << static_cast(e.type()); + }, [&errors](log_entry::read_error& re){ + VLOG(30) << re.message(); + errors.emplace_back(re); + return false; + }, pe), 0x101); + EXPECT_EQ(errors.size(), 2); // nondurable, short + EXPECT_EQ(pe.value(), dblog_scan::parse_error::broken_after); + EXPECT_EQ(pe.fpos(), 9); +} + +// unit-test scan_one_pwal_file +// inspect the file truncated on epoch_snippet_header +TEST_F(dblog_scan_test, scan_one_pwal_file_trunc_epoch_header) { + auto p = boost::filesystem::path(location) / "pwal_0000"; + create_file(p, + "\x02\xff\x00\x00\x00\x00\x00\x00\x00" // marker_begin 0xff + "\x01\x04\x00\x00\x00\x04\x00\x00\x00" "storage1" "1234" "vermajor" "verminor" "1234" // normal_entry + // XXX: epoch footer... + // offset 50 + "\x02\x01\x01\x00\x00\x00\x00\x00" // SHORT_marker_begin + ""sv); + + dblog_scan ds{boost::filesystem::path(location)}; + ds.set_thread_num(1); + set_inspect_mode(ds); + + dblog_scan::parse_error pe; + std::vector errors; + EXPECT_EQ(ds.scan_one_pwal_file(p, 0x100, [](const log_entry& e){ + VLOG(30) << static_cast(e.type()); + }, [&errors](log_entry::read_error& re){ + VLOG(30) << re.message(); + errors.emplace_back(re); + return false; + }, pe), 0xff); + EXPECT_EQ(errors.size(), 1); // truncated + EXPECT_EQ(pe.value(), dblog_scan::parse_error::broken_after); + EXPECT_EQ(pe.fpos(), 50); // after correct epoch snippet +} + +// unit-test detach_wal_files; normal non-detached pwal files are renamed (rotated) +TEST_F(dblog_scan_test, detach_wal_files_renamne_pwal_0000) { + auto p0_attached = boost::filesystem::path(location) / "pwal_0000"; + create_file(p0_attached, + "\x02\xff\x00\x00\x00\x00\x00\x00\x00" // marger_begin 0xff + // XXX: epoch footer... + ""sv); + { + auto wal_files = list_dir(); + ASSERT_EQ(wal_files.size(), 1); + ASSERT_EQ(wal_files.at(0), p0_attached); + } + dblog_scan ds{boost::filesystem::path(location)}; + ds.detach_wal_files(); + { // rotated + auto wal_files = list_dir(); + EXPECT_EQ(wal_files.size(), 1); + EXPECT_NE(wal_files.at(0), p0_attached); + EXPECT_GT(wal_files.at(0).filename().string().length(), 10); + } +} + +// unit-test detach_wal_files; empty pwal files are skipped +TEST_F(dblog_scan_test, detach_wal_files_skip_rename_empty_pwal) { + auto p0_attached_empty = boost::filesystem::path(location) / "pwal_0000"; + create_file(p0_attached_empty, ""sv); + { + auto wal_files = list_dir(); + ASSERT_EQ(wal_files.size(), 1); + ASSERT_EQ(wal_files.at(0), p0_attached_empty); + } + dblog_scan ds{boost::filesystem::path(location)}; + ds.detach_wal_files(); + { // no change + auto wal_files = list_dir(); + EXPECT_EQ(wal_files.size(), 1); + EXPECT_EQ(wal_files.at(0), p0_attached_empty); + } +} + +// unit-test detach_wal_files; detached (rotated) pwal files are skipped +TEST_F(dblog_scan_test, detach_wal_files_skip_rename_pwal_0000_somewhat) { + auto p0_detached = boost::filesystem::path(location) / "pwal_0000.somewhat"; + create_file(p0_detached, + "\x02\xff\x00\x00\x00\x00\x00\x00\x00" // marger_begin 0xff + // XXX: epoch footer... + ""sv); + { + auto wal_files = list_dir(); + ASSERT_EQ(wal_files.size(), 1); + ASSERT_EQ(wal_files.at(0), p0_detached); + } + dblog_scan ds{boost::filesystem::path(location)}; + ds.detach_wal_files(); + { // no change + auto wal_files = list_dir(); + EXPECT_EQ(wal_files.size(), 1); + EXPECT_EQ(wal_files.at(0), p0_detached); + } +} + +} // namespace limestone::testing diff --git a/test/limestone/log/durable_test.cpp b/test/limestone/log/durable_test.cpp index 6c633bc0..f2b5e0b1 100644 --- a/test/limestone/log/durable_test.cpp +++ b/test/limestone/log/durable_test.cpp @@ -188,7 +188,8 @@ TEST_F(durable_test, ut_scan_one_pwal_file_nondurable_entry) { } } -TEST_F(durable_test, ut_scan_one_pwal_file_broken_entry_trimmed) { +// broken entry in non durable epoch is ignored +TEST_F(durable_test, ut_scan_one_pwal_file_broken_entry_nondurable_trimmed) { using namespace limestone::api; boost::filesystem::path pwal(location); @@ -205,8 +206,29 @@ TEST_F(durable_test, ut_scan_one_pwal_file_broken_entry_trimmed) { } auto add_entry = [](log_entry&){ /* nop */ }; + EXPECT_EQ(limestone::internal::scan_one_pwal_file(pwal, 42, add_entry), 43); +} + +// broken entry in durable epoch is error +TEST_F(durable_test, ut_scan_one_pwal_file_broken_entry_trimmed) { + using namespace limestone::api; + + boost::filesystem::path pwal(location); + pwal /= "pwal"; + { // make pwal file for test + FILE *f = fopen(pwal.c_str(), "w"); + log_entry::begin_session(f, 42); + log_entry::write(f, 1, "k1", "v1", {42, 1}); + log_entry::begin_session(f, 43); + // make broken entry + fputc(static_cast(log_entry::entry_type::normal_entry), f); + fputc(99, f); // the end of file is missing + fclose(f); + } + auto add_entry = [](log_entry&){ /* nop */ }; + EXPECT_THROW({ - limestone::internal::scan_one_pwal_file(pwal, 42, add_entry); + limestone::internal::scan_one_pwal_file(pwal, 43, add_entry); }, std::exception); } diff --git a/test/limestone/log/log_dir_test.cpp b/test/limestone/log/log_dir_test.cpp index d7436c80..a2cc0868 100644 --- a/test/limestone/log/log_dir_test.cpp +++ b/test/limestone/log/log_dir_test.cpp @@ -3,15 +3,19 @@ #include +#include "dblog_scan.h" #include "internal.h" #include "log_entry.h" #include "test_root.h" using namespace std::literals; +using dblog_scan = limestone::internal::dblog_scan; namespace limestone::testing { +extern void create_file(boost::filesystem::path path, std::string_view content); + class log_dir_test : public ::testing::Test { public: static constexpr const char* location = "/tmp/log_dir_test"; @@ -44,11 +48,19 @@ static_assert(epoch_0_str.length() == 9); static bool is_pwal(const boost::filesystem::path& p) { return starts_with(p.filename().string(), "pwal"); } static void ignore_entry(limestone::api::log_entry&) {} + void create_mainfest_file(int persistent_format_version = 1) { + create_file(manifest_path, "{ \"format_version\": \"1.0\", \"persistent_format_version\": " + std::to_string(persistent_format_version) + " }"); + } + protected: std::unique_ptr datastore_{}; }; -extern void create_file(boost::filesystem::path path, std::string_view content); +void create_file_2(boost::filesystem::path& p, std::function make_file) { + FILE *strm = fopen(p.c_str(), "w"); + make_file(strm); + fclose(strm); +} TEST_F(log_dir_test, newly_created_directory_contains_manifest_file) { gen_datastore(); @@ -88,21 +100,21 @@ TEST_F(log_dir_test, reject_directory_only_broken_manifest_file2) { TEST_F(log_dir_test, accept_directory_with_correct_manifest_file) { create_file(boost::filesystem::path(location) / "epoch", epoch_0_str); - create_file(manifest_path, "{ \"format_version\": \"1.0\", \"persistent_format_version\": 1 }"); + create_mainfest_file(); gen_datastore(); limestone::internal::check_logdir_format(location); // success } TEST_F(log_dir_test, accept_directory_only_correct_manifest_file) { - create_file(manifest_path, "{ \"format_version\": \"1.0\", \"persistent_format_version\": 1 }"); + create_mainfest_file(); gen_datastore(); limestone::internal::check_logdir_format(location); // success } TEST_F(log_dir_test, reject_directory_of_different_version) { - create_file(manifest_path, "{ \"format_version\": \"1.0\", \"persistent_format_version\": 222 }"); + create_mainfest_file(222); gen_datastore(); EXPECT_THROW({ limestone::internal::check_logdir_format(location); }, std::exception); @@ -240,7 +252,7 @@ TEST_F(log_dir_test, rotate_prusik_rejects_corrupted_dir) { } TEST_F(log_dir_test, scan_pwal_files_in_dir_returns_max_epoch_normal) { - create_file(manifest_path, "{ \"format_version\": \"1.0\", \"persistent_format_version\": 2 }"); + create_mainfest_file(); // not used create_file(boost::filesystem::path(location) / "epoch", "\x04\x00\x01\x00\x00\x00\x00\x00\x00"sv); // not used create_file(boost::filesystem::path(location) / "pwal_0000", "\x02\xff\x00\x00\x00\x00\x00\x00\x00" @@ -249,12 +261,16 @@ TEST_F(log_dir_test, scan_pwal_files_in_dir_returns_max_epoch_normal) { // XXX: epoch footer... ""sv); - gen_datastore(); - EXPECT_EQ(limestone::internal::scan_pwal_files_in_dir(location, 2, is_pwal, 0x100, ignore_entry), 0x100); + // gen_datastore(); + // EXPECT_EQ(limestone::internal::scan_pwal_files_in_dir(location, 2, is_pwal, 0x100, ignore_entry), 0x100); + limestone::internal::dblog_scan ds{boost::filesystem::path(location)}; + ds.set_thread_num(2); + ds.set_process_at_nondurable_epoch_snippet(dblog_scan::process_at_nondurable::report); + EXPECT_EQ(ds.scan_pwal_files_throws(0x100, ignore_entry), 0x100); } TEST_F(log_dir_test, scan_pwal_files_in_dir_returns_max_epoch_nondurable) { - create_file(manifest_path, "{ \"format_version\": \"1.0\", \"persistent_format_version\": 2 }"); + create_mainfest_file(); // not used create_file(boost::filesystem::path(location) / "epoch", "\x04\x00\x01\x00\x00\x00\x00\x00\x00"sv); // not used create_file(boost::filesystem::path(location) / "pwal_0000", "\x02\xff\x00\x00\x00\x00\x00\x00\x00" @@ -263,12 +279,17 @@ TEST_F(log_dir_test, scan_pwal_files_in_dir_returns_max_epoch_nondurable) { // XXX: epoch footer... ""sv); - gen_datastore(); - EXPECT_EQ(limestone::internal::scan_pwal_files_in_dir(location, 2, is_pwal, 0x100, ignore_entry), 0x101); + // gen_datastore(); + // EXPECT_EQ(limestone::internal::scan_pwal_files_in_dir(location, 2, is_pwal, 0x100, ignore_entry), 0x101); + limestone::internal::dblog_scan ds{boost::filesystem::path(location)}; + ds.set_thread_num(2); + ds.set_process_at_nondurable_epoch_snippet(dblog_scan::process_at_nondurable::report); + ds.set_fail_fast(false); + EXPECT_EQ(ds.scan_pwal_files(0x100, ignore_entry, [](limestone::api::log_entry::read_error&){return false;}), 0x101); } TEST_F(log_dir_test, scan_pwal_files_in_dir_rejects_unexpected_EOF) { - create_file(manifest_path, "{ \"format_version\": \"1.0\", \"persistent_format_version\": 2 }"); + create_mainfest_file(); // not used create_file(boost::filesystem::path(location) / "epoch", "\x04\x00\x01\x00\x00\x00\x00\x00\x00"sv); // not used create_file(boost::filesystem::path(location) / "pwal_0000", "\x02\xff\x00\x00\x00\x00\x00\x00\x00" @@ -276,14 +297,20 @@ TEST_F(log_dir_test, scan_pwal_files_in_dir_rejects_unexpected_EOF) { "\x02\x01\x01\x00\x00\x00" ""sv); - gen_datastore(); + // gen_datastore(); + // EXPECT_THROW({ + // limestone::internal::scan_pwal_files_in_dir(location, 2, is_pwal, 0x100, ignore_entry); + // }, std::exception); + limestone::internal::dblog_scan ds{boost::filesystem::path(location)}; + ds.set_thread_num(2); + ds.set_process_at_nondurable_epoch_snippet(dblog_scan::process_at_nondurable::report); EXPECT_THROW({ - limestone::internal::scan_pwal_files_in_dir(location, 2, is_pwal, 0x100, ignore_entry); + ds.scan_pwal_files_throws(0x100, ignore_entry); }, std::exception); } TEST_F(log_dir_test, scan_pwal_files_in_dir_rejects_unexpeced_zeros) { - create_file(manifest_path, "{ \"format_version\": \"1.0\", \"persistent_format_version\": 2 }"); + create_mainfest_file(); // not used create_file(boost::filesystem::path(location) / "epoch", "\x04\x00\x01\x00\x00\x00\x00\x00\x00"sv); // not used create_file(boost::filesystem::path(location) / "pwal_0000", "\x02\xff\x00\x00\x00\x00\x00\x00\x00" @@ -291,14 +318,20 @@ TEST_F(log_dir_test, scan_pwal_files_in_dir_rejects_unexpeced_zeros) { "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" ""sv); - gen_datastore(); + // gen_datastore(); + // EXPECT_THROW({ + // limestone::internal::scan_pwal_files_in_dir(location, 2, is_pwal, 0x100, ignore_entry); + // }, std::exception); + limestone::internal::dblog_scan ds{boost::filesystem::path(location)}; + ds.set_thread_num(2); + ds.set_process_at_nondurable_epoch_snippet(dblog_scan::process_at_nondurable::report); EXPECT_THROW({ - limestone::internal::scan_pwal_files_in_dir(location, 2, is_pwal, 0x100, ignore_entry); + ds.scan_pwal_files_throws(0x100, ignore_entry); }, std::exception); } TEST_F(log_dir_test, ut_purge_dir_ok_file1) { - create_file(manifest_path, "{ \"format_version\": \"1.0\", \"persistent_format_version\": 2 }"); + create_mainfest_file(); // not used ASSERT_FALSE(boost::filesystem::is_empty(location)); ASSERT_EQ(internal::purge_dir(location), status::ok);