Skip to content

Commit

Permalink
introduce log_entry::read_error
Browse files Browse the repository at this point in the history
  • Loading branch information
ban-nobuhiro committed Mar 12, 2024
1 parent 4cf3574 commit 03acf92
Showing 1 changed file with 81 additions and 21 deletions.
102 changes: 81 additions & 21 deletions src/limestone/log_entry.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 Project Tsurugi.
* Copyright 2022-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,6 +45,45 @@ class log_entry {
remove_entry = 5,
marker_invalidated_begin = 6,
};
class read_error {
public:
enum code {
ok = 0,
// warning
nondurable_snippet = 0x01,
// error
short_entry = 0x81,
// unknown type; eg. type 0
unknown_type = 0x82,
// unexpected type; eg. add_entry at the head of pwal file or in epoch file
unexpected_type = 0x83,
};

read_error() noexcept : value_(ok) {}
explicit read_error(code value) noexcept : value_(value) {}
read_error(code value, log_entry::entry_type entry_type) noexcept : value_(value), entry_type_(entry_type) {}

void value(code value) noexcept { value_ = value; }
[[nodiscard]] code value() const noexcept { return value_; }
void entry_type(log_entry::entry_type entry_type) noexcept { entry_type_ = entry_type; }
[[nodiscard]] log_entry::entry_type entry_type() const noexcept { return entry_type_; }

explicit operator bool() const noexcept { return value_ != 0; }

[[nodiscard]] std::string message() const {
switch (value_) {
case ok: return "no error";
case nondurable_snippet: return "found nondurable epoch snippet";
case short_entry: return "unexpected EOF";
case unknown_type: return "unknown log_entry type " + std::to_string(static_cast<int>(entry_type_));
case unexpected_type: return "unexpected log_entry type " + std::to_string(static_cast<int>(entry_type_));
}
return "unknown error code " + std::to_string(value_);
}
private:
code value_;
log_entry::entry_type entry_type_{0};
};

log_entry() = default;

Expand Down Expand Up @@ -160,44 +199,66 @@ class log_entry {

// for reader
bool read(std::istream& strm) {
strm.read(&one_char_, sizeof(char));
entry_type_ = static_cast<entry_type>(one_char_);
read_error ec{};
bool rc = read_entry_from(strm, ec);
if (ec) {
LOG_LP(ERROR) << "this log_entry is broken: " << ec.message();
throw std::runtime_error(ec.message());
}
return rc;
}

bool read_entry_from(std::istream& strm, read_error& ec) {
ec.value(read_error::ok);
ec.entry_type(entry_type::this_id_is_not_used);
char one_char{};
strm.read(&one_char, sizeof(char));
entry_type_ = static_cast<entry_type>(one_char);
if (strm.eof()) {
return false;
}

switch(entry_type_) {
case entry_type::normal_entry:
{
std::size_t key_len = read_uint32le(strm);
std::size_t value_len = read_uint32le(strm);
std::size_t key_len = read_uint32le(strm, ec);
if (ec) return false;
std::size_t value_len = read_uint32le(strm, ec);
if (ec) return false;

key_sid_.resize(key_len + sizeof(storage_id_type));
read_bytes(strm, key_sid_.data(), static_cast<std::streamsize>(key_sid_.length()));
read_bytes(strm, key_sid_.data(), static_cast<std::streamsize>(key_sid_.length()), ec);
if (ec) return false;
value_etc_.resize(value_len + sizeof(epoch_id_type) + sizeof(std::uint64_t));
read_bytes(strm, value_etc_.data(), static_cast<std::streamsize>(value_etc_.length()));
read_bytes(strm, value_etc_.data(), static_cast<std::streamsize>(value_etc_.length()), ec);
if (ec) return false;
break;
}
case entry_type::remove_entry:
{
std::size_t key_len = read_uint32le(strm);
std::size_t key_len = read_uint32le(strm, ec);
if (ec) return false;

key_sid_.resize(key_len + sizeof(storage_id_type));
read_bytes(strm, key_sid_.data(), static_cast<std::streamsize>(key_sid_.length()));
read_bytes(strm, key_sid_.data(), static_cast<std::streamsize>(key_sid_.length()), ec);
if (ec) return false;
value_etc_.resize(sizeof(epoch_id_type) + sizeof(std::uint64_t));
read_bytes(strm, value_etc_.data(), static_cast<std::streamsize>(value_etc_.length()));
read_bytes(strm, value_etc_.data(), static_cast<std::streamsize>(value_etc_.length()), ec);
if (ec) return false;
break;
}
case entry_type::marker_begin:
case entry_type::marker_end:
case entry_type::marker_durable:
case entry_type::marker_invalidated_begin:
epoch_id_ = static_cast<epoch_id_type>(read_uint64le(strm));
epoch_id_ = static_cast<epoch_id_type>(read_uint64le(strm, ec));
if (ec) return false;
break;

default:
LOG_LP(ERROR) << "this log_entry is broken: unknown type: " << static_cast<int>(entry_type_);
throw std::runtime_error("unknown log_entry type");
ec.value(read_error::unknown_type);
ec.entry_type(entry_type_);
return false;
}

return true;
Expand Down Expand Up @@ -247,7 +308,6 @@ class log_entry {
epoch_id_type epoch_id_{};
std::string key_sid_{};
std::string value_etc_{};
char one_char_{};

static void write_uint8(FILE* out, const std::uint8_t value) {
int ret = fputc(value, out);
Expand All @@ -260,18 +320,18 @@ class log_entry {
std::uint32_t buf = htole32(value);
write_bytes(out, &buf, sizeof(std::uint32_t));
}
static std::uint32_t read_uint32le(std::istream& in) {
static std::uint32_t read_uint32le(std::istream& in, read_error& ec) {
std::uint32_t buf{};
read_bytes(in, &buf, sizeof(std::uint32_t));
read_bytes(in, &buf, sizeof(std::uint32_t), ec);
return le32toh(buf);
}
static void write_uint64le(FILE* out, const std::uint64_t value) {
std::uint64_t buf = htole64(value);
write_bytes(out, &buf, sizeof(std::uint64_t));
}
static std::uint64_t read_uint64le(std::istream& in) {
static std::uint64_t read_uint64le(std::istream& in, read_error& ec) {
std::uint64_t buf{};
read_bytes(in, &buf, sizeof(std::uint64_t));
read_bytes(in, &buf, sizeof(std::uint64_t), ec);
return le64toh(buf);
}
static void write_bytes(FILE* out, const void* buf, std::size_t len) {
Expand All @@ -282,11 +342,11 @@ class log_entry {
throw std::runtime_error("I/O error");
}
}
static void read_bytes(std::istream& in, void* buf, std::streamsize len) {
static void read_bytes(std::istream& in, void* buf, std::streamsize len, read_error& ec) {
in.read(reinterpret_cast<char*>(buf), len); // NOLINT(*-reinterpret-cast)
if (in.eof()) {
LOG_LP(ERROR) << "this log entry is broken: unexpected EOF";
throw std::runtime_error("unexpected EOF");
ec.value(read_error::short_entry);
return;
}
}
};
Expand Down

0 comments on commit 03acf92

Please sign in to comment.