Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ban-nobuhiro committed Mar 11, 2024
1 parent 4cf3574 commit 9357180
Show file tree
Hide file tree
Showing 11 changed files with 1,258 additions and 216 deletions.
14 changes: 14 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,17 @@ target_link_libraries(limestone-impl
INTERFACE ${sort_lib}
INTERFACE nlohmann_json::nlohmann_json
)

# utils
file(GLOB DBLOGUTIL_SOURCES
"limestone/dblogutil/*.cpp"
)


add_executable(dblogutil ${DBLOGUTIL_SOURCES})
target_include_directories(dblogutil
PRIVATE .
PRIVATE ./limestone
)
target_link_libraries(dblogutil PRIVATE limestone-impl PRIVATE glog::glog gflags::gflags)
install_custom(dblogutil dblogutil)
174 changes: 8 additions & 166 deletions src/limestone/datastore_snapshot.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 Project Tsurugi.
* Copyright 2022-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,71 +25,17 @@
#include "logging_helper.h"

#include <limestone/api/datastore.h>
#include "dblog_scan.h"
#include "internal.h"
#include "log_entry.h"
#include "sortdb_wrapper.h"

namespace limestone::internal {
using namespace limestone::api;

// return max epoch in file.
std::optional<epoch_id_type> last_durable_epoch(const boost::filesystem::path& file) {
std::optional<epoch_id_type> rv;

boost::filesystem::ifstream istrm;
log_entry e;
istrm.open(file, std::ios_base::in | std::ios_base::binary);
while (e.read(istrm)) {
if (e.type() != log_entry::entry_type::marker_durable) {
LOG_LP(ERROR) << "this epoch file is broken: unexpected log_entry type: " << static_cast<int>(e.type());
throw std::runtime_error("unexpected log_entry type for epoch file");
}
if (!rv.has_value() || e.epoch_id() > rv) {
rv = e.epoch_id();
}
}
istrm.close();
return rv;
}

}

namespace limestone::api {
using namespace limestone::internal;

constexpr std::size_t write_version_size = sizeof(epoch_id_type) + sizeof(std::uint64_t);
static_assert(write_version_size == 16);

epoch_id_type datastore::last_durable_epoch_in_dir() {
auto& from_dir = location_;
// read main epoch file first
std::optional<epoch_id_type> ld_epoch = last_durable_epoch(from_dir / std::string(epoch_file_name));
if (ld_epoch.has_value()) {
return *ld_epoch;
}

// main epoch file is empty,
// read all rotated-epoch files
for (const boost::filesystem::path& p : boost::filesystem::directory_iterator(from_dir)) {
if (p.filename().string().rfind(epoch_file_name, 0) == 0) { // starts_with(epoch_file_name)
// this is epoch file (main one or rotated)
std::optional<epoch_id_type> epoch = last_durable_epoch(p);
if (!epoch.has_value()) {
continue; // file is empty
}
// ld_epoch = max(ld_epoch, epoch)
if (!ld_epoch.has_value() || *ld_epoch < *epoch) {
ld_epoch = epoch;
}
}
}
return ld_epoch.value_or(0); // 0 = minimum epoch
}

}

namespace limestone::internal {
using namespace limestone::api;

[[maybe_unused]]
static void store_bswap64_value(void *dest, const void *src) {
auto* p64_dest = reinterpret_cast<std::uint64_t*>(dest); // NOLINT(*-reinterpret-cast)
Expand All @@ -107,121 +53,16 @@ static int comp_twisted_key(const std::string_view& a, const std::string_view& b
return std::memcmp(b.data(), a.data(), write_version_size);
}

epoch_id_type scan_one_pwal_file(const boost::filesystem::path& p, epoch_id_type ld_epoch, const std::function<void(log_entry&)>& add_entry) {
VLOG_LP(log_info) << "processing pwal file: " << p.filename().string();
log_entry e;
epoch_id_type current_epoch{UINT64_MAX};
epoch_id_type max_epoch_of_file{0};

boost::filesystem::fstream strm;
strm.open(p, std::ios_base::in | std::ios_base::out | std::ios_base::binary);
bool skipping = false; // scanning in the invalidated epoch snippet
while (e.read(strm)) {
switch (e.type()) {
case log_entry::entry_type::marker_begin: {
current_epoch = e.epoch_id();
max_epoch_of_file = std::max(max_epoch_of_file, current_epoch);
if (current_epoch <= ld_epoch) {
skipping = false;
} else {
auto pos = strm.tellg();
strm.seekp(-9, std::ios::cur); // size of marker_begin entry
char buf = static_cast<char>(log_entry::entry_type::marker_invalidated_begin);
strm.write(&buf, sizeof(char));
strm.flush();
strm.seekg(pos, std::ios::beg); // restore position
skipping = true;
}
break;
}
case log_entry::entry_type::marker_invalidated_begin: {
max_epoch_of_file = std::max(max_epoch_of_file, e.epoch_id());
skipping = true;
break;
}
case log_entry::entry_type::normal_entry:
case log_entry::entry_type::remove_entry: {
if (!skipping) {
add_entry(e);
}
break;
}
default:
break;
}
}
strm.close();
return max_epoch_of_file;
}

epoch_id_type scan_pwal_files_in_dir(const boost::filesystem::path& from_dir, int num_worker,
const std::function<bool(const boost::filesystem::path&)>& is_wal,
epoch_id_type ld_epoch, const std::function<void(log_entry&)>& add_entry) {
std::atomic<epoch_id_type> max_appeared_epoch{ld_epoch};
auto process_file = [&](const boost::filesystem::path& p) {
if (is_wal(p)) {
epoch_id_type max_epoch_of_file = scan_one_pwal_file(p, ld_epoch, add_entry);
epoch_id_type t = max_appeared_epoch.load();
while (t < max_epoch_of_file
&& !max_appeared_epoch.compare_exchange_weak(t, max_epoch_of_file)) {
/* nop */
}
}
};
std::mutex dir_mtx;
auto dir_begin = boost::filesystem::directory_iterator(from_dir);
auto dir_end = boost::filesystem::directory_iterator();
std::vector<std::thread> workers;
std::mutex ex_mtx;
std::exception_ptr ex_ptr{};
workers.reserve(num_worker);
for (int i = 0; i < num_worker; i++) {
workers.emplace_back(std::thread([&](){
for (;;) {
boost::filesystem::path p;
{
std::lock_guard<std::mutex> g{dir_mtx};
if (dir_begin == dir_end) break;
p = *dir_begin++;
}
try {
process_file(p);
} catch (std::runtime_error& ex) {
VLOG(log_info) << "/:limestone catch runtime_error(" << ex.what() << ")";
std::lock_guard<std::mutex> g2{ex_mtx};
if (!ex_ptr) { // only save one
ex_ptr = std::current_exception();
}
std::lock_guard<std::mutex> g{dir_mtx};
dir_begin = dir_end; // skip all unprocessed files
break;
}
}
}));
}
for (int i = 0; i < num_worker; i++) {
workers[i].join();
}
if (ex_ptr) {
std::rethrow_exception(ex_ptr);
}
return max_appeared_epoch;
}

}

namespace limestone::api {
using namespace limestone::internal;

void datastore::create_snapshot() {
auto& from_dir = location_;
#if defined SORT_METHOD_PUT_ONLY
auto sortdb = std::make_unique<sortdb_wrapper>(from_dir, comp_twisted_key);
#else
auto sortdb = std::make_unique<sortdb_wrapper>(from_dir);
#endif
dblog_scan logscan{location_};

epoch_id_type ld_epoch = last_durable_epoch_in_dir();
epoch_id_type ld_epoch = logscan.last_durable_epoch_in_dir();
epoch_id_switched_.store(ld_epoch + 1); // ??

[[maybe_unused]]
Expand Down Expand Up @@ -270,8 +111,9 @@ void datastore::create_snapshot() {
LOG(INFO) << "/:limestone:config:datastore this sort method does not work correctly with multi-thread, so force the number of recover process thread = 1";
num_worker = 1;
}
auto is_wal = [](const boost::filesystem::path& p){ return p.filename().string().substr(0, log_channel::prefix.length()) == log_channel::prefix; };
epoch_id_type max_appeared_epoch = scan_pwal_files_in_dir(from_dir, num_worker, is_wal, ld_epoch, add_entry);
logscan.set_thread_num(num_worker);
logscan.set_process_at_nondurable_epoch_snippet(dblog_scan::process_at_nondurable::repair_by_mark);
epoch_id_type max_appeared_epoch = logscan.scan_pwal_files_throws(ld_epoch, add_entry);
epoch_id_informed_.store(max_appeared_epoch);

boost::filesystem::path sub_dir = location_ / boost::filesystem::path(std::string(snapshot::subdirectory_name_));
Expand Down
Loading

0 comments on commit 9357180

Please sign in to comment.