diff --git a/src/lib/device/journal_vdev.cpp b/src/lib/device/journal_vdev.cpp index 5556da1e3..87eab5f72 100644 --- a/src/lib/device/journal_vdev.cpp +++ b/src/lib/device/journal_vdev.cpp @@ -419,7 +419,7 @@ off_t JournalVirtualDev::Descriptor::lseek(off_t offset, int whence) { * @brief :- it returns the vdev offset after nbytes from start offset */ off_t JournalVirtualDev::Descriptor::dev_offset(off_t nbytes) const { - if (m_journal_chunks.empty()) { return 0; } + if (m_journal_chunks.empty()) { return data_start_offset(); } off_t vdev_offset = data_start_offset(); uint32_t dev_id{0}, chunk_id{0}; diff --git a/src/lib/device/virtual_dev.cpp b/src/lib/device/virtual_dev.cpp index c79abd73c..c534c2d71 100644 --- a/src/lib/device/virtual_dev.cpp +++ b/src/lib/device/virtual_dev.cpp @@ -125,8 +125,8 @@ void VirtualDev::add_chunk(cshared< Chunk >& chunk, bool is_fresh_chunk) { void VirtualDev::remove_chunk(cshared< Chunk >& chunk) { std::unique_lock lg{m_mgmt_mutex}; - auto iter = std::remove_if(m_all_chunks.begin(), m_all_chunks.end(), [chunk](auto c) { return c == chunk; }); - m_all_chunks.erase(iter, m_all_chunks.end()); + m_all_chunks[chunk->chunk_id()].reset(); + m_all_chunks[chunk->chunk_id()] = nullptr; m_chunk_selector->remove_chunk(chunk); } diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index a51cf228e..5c6c09e9d 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -88,8 +88,8 @@ void LogDev::start(bool format, JournalVirtualDev* vdev) { m_store_found_cb(spair.first, spair.second); } - THIS_LOGDEV_LOG(INFO, "get start vdev offset during recovery {} log indx {} ", - m_logdev_meta.get_start_dev_offset(), m_logdev_meta.get_start_log_idx()); + LOGINFO("get start vdev offset during recovery {} {} log indx {} ", m_family_id, + m_logdev_meta.get_start_dev_offset(), m_logdev_meta.get_start_log_idx()); m_vdev_jd->update_data_start_offset(m_logdev_meta.get_start_dev_offset()); m_log_idx = m_logdev_meta.get_start_log_idx(); @@ -151,26 +151,34 @@ void LogDev::stop() { void LogDev::do_load(const off_t device_cursor) { log_stream_reader lstream{device_cursor, m_vdev, m_vdev_jd, m_flush_size_multiple}; logid_t loaded_from{-1}; - - off_t group_dev_offset; + LOGINFO("LogDev::do_load start {} ", m_family_id); + off_t group_dev_offset = 0; do { const auto buf = lstream.next_group(&group_dev_offset); if (buf.size() == 0) { - assert_next_pages(lstream); + LOGINFO("{} LogDev loaded log_idx in range of[{} - {}] ", m_family_id, loaded_from, m_log_idx - 1); + // assert_next_pages(lstream); THIS_LOGDEV_LOG(INFO, "LogDev loaded log_idx in range of [{} - {}]", loaded_from, m_log_idx - 1); break; } auto* header = r_cast< const log_group_header* >(buf.bytes()); if (loaded_from == -1 && header->start_idx() < m_log_idx) { + LOGINFO("{} LogDev loaded log_idx in range of[{} - {}] ", m_family_id, loaded_from, m_log_idx - 1); + // log dev is truncated completely assert_next_pages(lstream); THIS_LOGDEV_LOG(INFO, "LogDev loaded log_idx in range of [{} - {}]", loaded_from, m_log_idx - 1); break; } + LOGINFO("{} group_dev_offset {} loaded_from {} m_log_idx {} header {} ", m_family_id, group_dev_offset, + loaded_from, m_log_idx.load(), *header); HS_REL_ASSERT_EQ(header->start_idx(), m_log_idx.load(), "log indx is not the expected one"); - if (loaded_from == -1) { loaded_from = header->start_idx(); } + if (loaded_from == -1) { + loaded_from = header->start_idx(); + LOGINFO("m_family_id {} {} ", m_family_id, header->start_idx()); + } // Loop through each record within the log group and do a callback decltype(header->nrecords()) i{0}; @@ -201,6 +209,7 @@ void LogDev::do_load(const off_t device_cursor) { } ++i; } + LOGINFO(" {} group_dev_offset {} m_log_idx {} i {} ", m_family_id, group_dev_offset, m_log_idx.load(), i); m_log_idx = header->start_idx() + i; m_last_crc = header->cur_grp_crc; } while (true); @@ -208,6 +217,7 @@ void LogDev::do_load(const off_t device_cursor) { // Update the tail offset with where we finally end up loading, so that new append entries can be written from // here. m_vdev_jd->update_tail_offset(group_dev_offset); + LOGINFO("LogDev::do_load end {} ", m_family_id); } void LogDev::assert_next_pages(log_stream_reader& lstream) { @@ -220,8 +230,8 @@ void LogDev::assert_next_pages(log_stream_reader& lstream) { auto* header = r_cast< const log_group_header* >(buf.bytes()); HS_REL_ASSERT_GT(m_log_idx.load(std::memory_order_acquire), header->start_idx(), "Found a header with future log_idx after reaching end of log. Hence rbuf which was read " - "must have been corrupted, Header: {}", - *header); + "must have been corrupted, Family {} Header: {}", + m_family_id, *header); } } } @@ -528,8 +538,8 @@ uint64_t LogDev::truncate(const logdev_key& key) { HS_DBG_ASSERT_GE(key.idx, m_last_truncate_idx); uint64_t const num_records_to_truncate = static_cast< uint64_t >(key.idx - m_last_truncate_idx); if (num_records_to_truncate > 0) { - HS_PERIODIC_LOG(INFO, logstore, "Truncating log device upto log_id={} vdev_offset={} truncated {} log records", - key.idx, key.dev_offset, num_records_to_truncate); + LOGINFO("Truncating log device upto {} log_id={} vdev_offset={} truncated {} log records", m_family_id, key.idx, + key.dev_offset, num_records_to_truncate); m_log_records->truncate(key.idx); m_vdev_jd->truncate(key.dev_offset); m_last_truncate_idx = key.idx; diff --git a/src/lib/logstore/log_store_family.cpp b/src/lib/logstore/log_store_family.cpp index 33e1e4313..1c9180959 100644 --- a/src/lib/logstore/log_store_family.cpp +++ b/src/lib/logstore/log_store_family.cpp @@ -137,10 +137,7 @@ void LogStoreFamily::device_truncate(const std::shared_ptr< truncate_req >& treq } if (done) { if (treq->cb) { treq->cb(treq->m_trunc_upto_result); } - if (treq->wait_till_done) { - std::lock_guard< std::mutex > lk{treq->mtx}; - treq->cv.notify_one(); - } + if (treq->wait_till_done) { treq->cv.notify_one(); } } m_log_dev.unlock_flush(); }); @@ -235,6 +232,7 @@ logdev_key LogStoreFamily::do_device_truncate(bool dry_run) { m_non_participating_stores.clear(); logdev_key min_safe_ld_key = logdev_key::out_of_bound_ld_key(); + LOGINFO("do_device_truncate on family {}", m_family_id); std::string dbg_str{"Format [store_id:trunc_lsn:logidx:dev_trunc_pending?:active_writes_in_trucate?] "}; { @@ -266,18 +264,16 @@ logdev_key LogStoreFamily::do_device_truncate(bool dry_run) { } if ((min_safe_ld_key == logdev_key::out_of_bound_ld_key()) || (min_safe_ld_key.idx < 0)) { - HS_PERIODIC_LOG( - INFO, logstore, - "[Family={}] No log store append on any log stores, skipping device truncation, all_logstore_info:<{}>", - m_family_id, dbg_str); + LOGINFO("[Family={}] No log store append on any log stores, skipping device truncation, all_logstore_info:<{}>", + m_family_id, dbg_str); return min_safe_ld_key; } + LOGINFO("do_device_truncate on family {}", m_family_id); // Got the safest log id to truncate and actually truncate upto the safe log idx to the log device if (!dry_run) { m_log_dev.truncate(min_safe_ld_key); } - HS_PERIODIC_LOG(INFO, logstore, - "[Family={}] LogDevice truncate, all_logstore_info:<{}> safe log dev key to truncate={}", - m_family_id, dbg_str, min_safe_ld_key); + LOGINFO("[Family={}] LogDevice truncate, all_logstore_info:<{}> safe log dev key to truncate={}", m_family_id, + dbg_str, min_safe_ld_key); // We call post device truncation only to the log stores whose prepared truncation points are fully // truncated or to stores which didn't particpate in this device truncation. diff --git a/src/lib/logstore/log_stream.cpp b/src/lib/logstore/log_stream.cpp index 67c8fe284..b766ac238 100644 --- a/src/lib/logstore/log_stream.cpp +++ b/src/lib/logstore/log_stream.cpp @@ -37,13 +37,14 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { uint64_cast(sisl::round_up(HS_DYNAMIC_CONFIG(logstore.bulk_read_size), m_read_size_multiple)); uint64_t min_needed{m_read_size_multiple}; sisl::byte_view ret_buf; + *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); read_again: if (m_cur_log_buf.size() < min_needed) { do { m_cur_log_buf = read_next_bytes(std::max(min_needed, bulk_read_size)); if (m_cur_log_buf.size() == 0) { - LOGINFOMOD(logstore, "Logdev data empty"); + LOGINFO("Logdev data empty"); return {}; } } while (m_cur_log_buf.size() < sizeof(log_group_header)); @@ -145,7 +146,9 @@ sisl::byte_view log_stream_reader::read_next_bytes(uint64_t nbytes) { if (m_cur_log_buf.size()) { memcpy(out_buf->bytes(), m_cur_log_buf.bytes(), m_cur_log_buf.size()); } const auto prev_pos = m_vdev_jd->seeked_pos(); - m_vdev_jd->sync_next_read(out_buf->bytes() + m_cur_log_buf.size(), nbytes); + auto sz_read = m_vdev_jd->sync_next_read(out_buf->bytes() + m_cur_log_buf.size(), nbytes); + if (sz_read == 0) { return {}; } + LOGINFOMOD(logstore, "LogStream read {} bytes from vdev offset {} and vdev cur offset {}", nbytes, prev_pos, m_vdev_jd->seeked_pos()); return sisl::byte_view{out_buf}; diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 023e25565..011c32342 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -111,8 +111,8 @@ if (${io_tests}) add_test(NAME MetaBlkMgr-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr) add_test(NAME DataService-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service) - #add_test(NAME SoloReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev) - # add_test(NAME HomeRaftLogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_home_raft_logstore) + add_test(NAME SoloReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev) + add_test(NAME HomeRaftLogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_home_raft_logstore) # add_test(NAME RaftReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_raft_repl_dev) endif() diff --git a/src/tests/test_solo_repl_dev.cpp b/src/tests/test_solo_repl_dev.cpp index 90b5bcca0..92822adc8 100644 --- a/src/tests/test_solo_repl_dev.cpp +++ b/src/tests/test_solo_repl_dev.cpp @@ -306,7 +306,12 @@ SISL_OPTION_GROUP(test_solo_repl_dev, int main(int argc, char* argv[]) { int parsed_argc{argc}; ::testing::InitGoogleTest(&parsed_argc, argv); + SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_solo_repl_dev, iomgr, test_common_setup); + // sisl::logging::SetModuleLogLevel("logstore", spdlog::level::level_enum::trace); + // sisl::logging::SetModuleLogLevel("device", spdlog::level::level_enum::trace); + // sisl::logging::SetModuleLogLevel("journalvdev", spdlog::level::level_enum::trace); + sisl::logging::SetLogger("test_solo_repl_dev"); spdlog::set_pattern("[%D %T%z] [%^%l%$] [%n] [%t] %v");