Skip to content

Commit

Permalink
Fix log dev load issue. (#299)
Browse files Browse the repository at this point in the history
In UTs, the logdev may not be used and may not have a chunk.
Return an empty buffer and don't check for corruption in those cases.
  • Loading branch information
sanebay authored Feb 2, 2024
1 parent 0b99aee commit 4bbea89
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 13 deletions.
3 changes: 2 additions & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "5.1.1"
version = "5.1.2"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
topics = ("ebay", "nublox")
Expand Down
2 changes: 1 addition & 1 deletion src/include/homestore/homestore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class HomeStore {

HS_SERVICE m_services; // Services homestore is starting with
hs_before_services_starting_cb_t m_before_services_starting_cb{nullptr};
bool m_init_done{false};
std::atomic< bool > m_init_done{false};

public:
HomeStore() = default;
Expand Down
2 changes: 1 addition & 1 deletion src/lib/device/journal_vdev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ off_t JournalVirtualDev::Descriptor::lseek(off_t offset, int whence) {
* @brief :- it returns the vdev offset after nbytes from start offset
*/
off_t JournalVirtualDev::Descriptor::dev_offset(off_t nbytes) const {
if (m_journal_chunks.empty()) { return 0; }
if (m_journal_chunks.empty()) { return data_start_offset(); }

off_t vdev_offset = data_start_offset();
uint32_t dev_id{0}, chunk_id{0};
Expand Down
4 changes: 2 additions & 2 deletions src/lib/device/virtual_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ void VirtualDev::add_chunk(cshared< Chunk >& chunk, bool is_fresh_chunk) {

void VirtualDev::remove_chunk(cshared< Chunk >& chunk) {
std::unique_lock lg{m_mgmt_mutex};
auto iter = std::remove_if(m_all_chunks.begin(), m_all_chunks.end(), [chunk](auto c) { return c == chunk; });
m_all_chunks.erase(iter, m_all_chunks.end());
m_all_chunks[chunk->chunk_id()].reset();
m_all_chunks[chunk->chunk_id()] = nullptr;
m_chunk_selector->remove_chunk(chunk);
}

Expand Down
17 changes: 11 additions & 6 deletions src/lib/logstore/log_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ void LogDev::start(bool format, JournalVirtualDev* vdev) {
}

void LogDev::stop() {
THIS_LOGDEV_LOG(INFO, "Logdev stopping family {}", m_family_id);
HS_LOG_ASSERT((m_pending_flush_size == 0), "LogDev stop attempted while writes to logdev are pending completion");
const bool locked_now = run_under_flush_lock([this]() {
{
Expand Down Expand Up @@ -144,19 +145,20 @@ void LogDev::stop() {
m_log_group_pool[i].stop();
}

THIS_LOGDEV_LOG(INFO, "LogDev stopped successfully");
THIS_LOGDEV_LOG(INFO, "LogDev stopped successfully family {}", m_family_id);
m_hs.reset();
}

void LogDev::do_load(const off_t device_cursor) {
log_stream_reader lstream{device_cursor, m_vdev, m_vdev_jd, m_flush_size_multiple};
logid_t loaded_from{-1};
off_t group_dev_offset = 0;

THIS_LOGDEV_LOG(TRACE, "LogDev::do_load start {} ", m_family_id);

off_t group_dev_offset;
do {
const auto buf = lstream.next_group(&group_dev_offset);
if (buf.size() == 0) {
assert_next_pages(lstream);
THIS_LOGDEV_LOG(INFO, "LogDev loaded log_idx in range of [{} - {}]", loaded_from, m_log_idx - 1);
break;
}
Expand Down Expand Up @@ -201,13 +203,15 @@ void LogDev::do_load(const off_t device_cursor) {
}
++i;
}

m_log_idx = header->start_idx() + i;
m_last_crc = header->cur_grp_crc;
} while (true);

// Update the tail offset with where we finally end up loading, so that new append entries can be written from
// here.
m_vdev_jd->update_tail_offset(group_dev_offset);
THIS_LOGDEV_LOG(TRACE, "LogDev::do_load end {} ", m_family_id);
}

void LogDev::assert_next_pages(log_stream_reader& lstream) {
Expand All @@ -220,8 +224,8 @@ void LogDev::assert_next_pages(log_stream_reader& lstream) {
auto* header = r_cast< const log_group_header* >(buf.bytes());
HS_REL_ASSERT_GT(m_log_idx.load(std::memory_order_acquire), header->start_idx(),
"Found a header with future log_idx after reaching end of log. Hence rbuf which was read "
"must have been corrupted, Header: {}",
*header);
"must have been corrupted, Family {} Header: {}",
m_family_id, *header);
}
}
}
Expand Down Expand Up @@ -528,7 +532,8 @@ uint64_t LogDev::truncate(const logdev_key& key) {
HS_DBG_ASSERT_GE(key.idx, m_last_truncate_idx);
uint64_t const num_records_to_truncate = static_cast< uint64_t >(key.idx - m_last_truncate_idx);
if (num_records_to_truncate > 0) {
HS_PERIODIC_LOG(INFO, logstore, "Truncating log device upto log_id={} vdev_offset={} truncated {} log records",
HS_PERIODIC_LOG(INFO, logstore,
"Truncating log device upto {} log_id={} vdev_offset={} truncated {} log records", m_family_id,
key.idx, key.dev_offset, num_records_to_truncate);
m_log_records->truncate(key.idx);
m_vdev_jd->truncate(key.dev_offset);
Expand Down
5 changes: 4 additions & 1 deletion src/lib/logstore/log_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) {
uint64_cast(sisl::round_up(HS_DYNAMIC_CONFIG(logstore.bulk_read_size), m_read_size_multiple));
uint64_t min_needed{m_read_size_multiple};
sisl::byte_view ret_buf;
*out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes);

read_again:
if (m_cur_log_buf.size() < min_needed) {
Expand Down Expand Up @@ -145,7 +146,9 @@ sisl::byte_view log_stream_reader::read_next_bytes(uint64_t nbytes) {
if (m_cur_log_buf.size()) { memcpy(out_buf->bytes(), m_cur_log_buf.bytes(), m_cur_log_buf.size()); }

const auto prev_pos = m_vdev_jd->seeked_pos();
m_vdev_jd->sync_next_read(out_buf->bytes() + m_cur_log_buf.size(), nbytes);
auto sz_read = m_vdev_jd->sync_next_read(out_buf->bytes() + m_cur_log_buf.size(), nbytes);
if (sz_read == 0) { return {}; }

LOGINFOMOD(logstore, "LogStream read {} bytes from vdev offset {} and vdev cur offset {}", nbytes, prev_pos,
m_vdev_jd->seeked_pos());
return sisl::byte_view{out_buf};
Expand Down
2 changes: 1 addition & 1 deletion src/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ if (${io_tests})
add_test(NAME MetaBlkMgr-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr)
add_test(NAME DataService-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service)

#add_test(NAME SoloReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev)
add_test(NAME SoloReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev)
# add_test(NAME HomeRaftLogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_home_raft_logstore)
# add_test(NAME RaftReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_raft_repl_dev)
endif()
Expand Down

0 comments on commit 4bbea89

Please sign in to comment.