From f1b04e91877855fd99aa52bd7827855d0a0b113e Mon Sep 17 00:00:00 2001 From: Sanal P Date: Mon, 12 Feb 2024 17:15:20 -0700 Subject: [PATCH] Fix device manager load of chunks. Chunks were getting duplicate start offsets during recovery. Add or clean up logs for debugging. Add a test case. Add more logdevs for the logstore test. Enable the logstore test. Run the rollback test first to make sure there are no log entries. --- conanfile.py | 2 +- src/lib/device/device.h | 1 + src/lib/device/device_manager.cpp | 9 + src/lib/device/journal_vdev.cpp | 25 ++- src/lib/device/journal_vdev.hpp | 9 +- src/lib/device/physical_dev.cpp | 10 +- src/lib/device/physical_dev.hpp | 1 + src/lib/logstore/log_dev.cpp | 21 ++- src/lib/logstore/log_dev.hpp | 8 +- src/lib/logstore/log_group.cpp | 3 +- src/lib/logstore/log_store.cpp | 2 +- src/lib/logstore/log_store_service.cpp | 1 - src/lib/logstore/log_stream.cpp | 35 ++-- .../log_store/home_raft_log_store.cpp | 4 +- .../replication/repl_dev/raft_repl_dev.cpp | 8 +- src/tests/CMakeLists.txt | 2 +- src/tests/test_device_manager.cpp | 56 ++++++ src/tests/test_log_store.cpp | 165 ++++++++++-------- 18 files changed, 235 insertions(+), 127 deletions(-) diff --git a/conanfile.py b/conanfile.py index 38bb6fb8d..7b79c1796 100644 --- a/conanfile.py +++ b/conanfile.py @@ -5,7 +5,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "5.1.4" + version = "5.1.5" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/lib/device/device.h b/src/lib/device/device.h index cbcdde8ea..a69e740dd 100644 --- a/src/lib/device/device.h +++ b/src/lib/device/device.h @@ -173,6 +173,7 @@ class DeviceManager { std::vector< PhysicalDev* > get_pdevs_by_dev_type(HSDevType dtype) const; std::vector< shared< VirtualDev > > get_vdevs() const; + std::vector< shared< Chunk > > get_chunks() const; uint64_t total_capacity() const; uint64_t total_capacity(HSDevType dtype) const; diff --git a/src/lib/device/device_manager.cpp b/src/lib/device/device_manager.cpp index ad450910e..33d8cbad6 100644 --- a/src/lib/device/device_manager.cpp +++ b/src/lib/device/device_manager.cpp @@ -543,6 +543,15 @@ std::vector< shared< VirtualDev > > DeviceManager::get_vdevs() const { return ret_v; } +std::vector< shared< Chunk > > DeviceManager::get_chunks() const { + std::unique_lock lg{m_vdev_mutex}; + std::vector< shared< Chunk > > res; + for (auto& chunk : m_chunks) { + if (chunk) res.push_back(chunk); + } + return res; +} + // Some of the hs_super_blk details uint64_t hs_super_blk::vdev_super_block_size() { return (hs_super_blk::MAX_VDEVS_IN_SYSTEM * vdev_info::size); } diff --git a/src/lib/device/journal_vdev.cpp b/src/lib/device/journal_vdev.cpp index 87eab5f72..db5e0a76c 100644 --- a/src/lib/device/journal_vdev.cpp +++ b/src/lib/device/journal_vdev.cpp @@ -231,7 +231,7 @@ off_t JournalVirtualDev::Descriptor::alloc_next_append_blk(size_t sz) { // assert that returnning logical offset is in good range HS_DBG_ASSERT_LE(tail_off, m_end_offset); - LOGDEBUGMOD(journalvdev, "returned tail_off 0x{} desc {}", to_hex(tail_off), to_string()); + LOGDEBUGMOD(journalvdev, "returned tail_off 0x{} size {} desc {}", to_hex(tail_off), sz, to_string()); return tail_off; } @@ -443,6 +443,14 @@ off_t JournalVirtualDev::Descriptor::dev_offset(off_t nbytes) const { return vdev_offset; } +void JournalVirtualDev::Descriptor::update_data_start_offset(off_t offset) { + m_data_start_offset = offset; + auto data_start_offset_aligned = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size); +
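// The usable window spans every chunk owned by this descriptor, measured from the chunk-aligned data start. +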
m_end_offset = data_start_offset_aligned + m_journal_chunks.size() * m_vdev.info().chunk_size; + LOGTRACEMOD(journalvdev, "Updated data start offset off 0x{} {}", to_hex(offset), to_string()); + RELEASE_ASSERT_EQ(m_end_offset - data_start_offset_aligned, m_total_size, "offset size mismatch {}", to_string()); +} + off_t JournalVirtualDev::Descriptor::tail_offset(bool reserve_space_include) const { off_t tail = static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed)); if (reserve_space_include) { tail += m_reserved_sz; } @@ -461,7 +469,7 @@ void JournalVirtualDev::Descriptor::update_tail_offset(off_t tail) { } lseek(tail); - LOGDEBUGMOD(journalvdev, "tail arg 0x{} desc {} ", to_hex(tail), to_string()); + LOGDEBUGMOD(journalvdev, "Updated tail offset arg 0x{} desc {} ", to_hex(tail), to_string()); HS_REL_ASSERT(tail_offset() == tail, "tail offset mismatch after calculation 0x{} : {}", tail_offset(), tail); } @@ -598,7 +606,7 @@ bool JournalVirtualDev::Descriptor::is_alloc_accross_chunk(size_t size) const { nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const { nlohmann::json j; - j["logdev_id"] = m_logdev_id; + j["logdev"] = m_logdev_id; j["seek_cursor"] = m_seek_cursor; j["data_start_offset"] = m_data_start_offset; j["end_offset"] = m_end_offset; @@ -613,7 +621,7 @@ nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const { nlohmann::json c; auto* private_data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private())); c["chunk_id"] = chunk->chunk_id(); - c["logdev_id"] = private_data->logdev_id; + c["logdev"] = private_data->logdev_id; c["is_head"] = private_data->is_head; c["end_of_chunk"] = private_data->end_of_chunk; c["next_chunk"] = private_data->next_chunk; @@ -627,12 +635,13 @@ nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const { } std::string JournalVirtualDev::Descriptor::to_string() const { - std::string str{fmt::format("id={};ds=0x{};end=0x{};writesz={};tail=0x{};" + off_t tail = + static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed)) + m_reserved_sz; + std::string str{fmt::format("logdev={};ds=0x{};end=0x{};writesz={};tail=0x{};" "rsvdsz={};chunks={};trunc={};total={};seek=0x{} ", m_logdev_id, to_hex(m_data_start_offset), to_hex(m_end_offset), - m_write_sz_in_total.load(std::memory_order_relaxed), to_hex(tail_offset()), - m_reserved_sz, m_journal_chunks.size(), m_truncate_done, m_total_size, - to_hex(m_seek_cursor))}; + m_write_sz_in_total.load(std::memory_order_relaxed), to_hex(tail), m_reserved_sz, + m_journal_chunks.size(), m_truncate_done, m_total_size, to_hex(m_seek_cursor))}; return str; } diff --git a/src/lib/device/journal_vdev.hpp b/src/lib/device/journal_vdev.hpp index 13ec18c65..ad0cca4f2 100644 --- a/src/lib/device/journal_vdev.hpp +++ b/src/lib/device/journal_vdev.hpp @@ -233,12 +233,7 @@ class JournalVirtualDev : public VirtualDev { * * @param offset : the start logical offset to be persisted */ - void update_data_start_offset(off_t offset) { - m_data_start_offset = offset; - auto data_start_offset_aligned = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size); - m_end_offset = data_start_offset_aligned + m_journal_chunks.size() * m_vdev.info().chunk_size; - RELEASE_ASSERT_EQ(m_end_offset - data_start_offset_aligned, m_total_size, "offset size mismatch"); - } + void update_data_start_offset(off_t offset); /** * @brief : get the logical tail offset; @@ -309,6 +304,8 @@ class 
JournalVirtualDev : public VirtualDev { */ nlohmann::json get_status(int log_level) const; + logdev_id_t logdev_id() const { return m_logdev_id; } + std::string to_string() const; private: diff --git a/src/lib/device/physical_dev.cpp b/src/lib/device/physical_dev.cpp index fe44059de..6c7b4d478 100644 --- a/src/lib/device/physical_dev.cpp +++ b/src/lib/device/physical_dev.cpp @@ -247,12 +247,13 @@ std::vector< shared< Chunk > > PhysicalDev::create_chunks(const std::vector< uin auto chunk = std::make_shared< Chunk >(this, *cinfo, cslot); ret_chunks.push_back(chunk); get_stream(chunk).m_chunks_map.insert(std::pair{chunk_ids[cit], std::move(chunk)}); - + HS_LOG(TRACE, device, "Creating chunk {}", chunk->to_string()); cinfo->~chunk_info(); } m_chunk_info_slots->set_bits(b.start_bit, b.nbits); write_super_block(buf, chunk_info::size * b.nbits, chunk_info_offset_nth(b.start_bit)); + hs_utils::iobuf_free(buf, sisl::buftag::superblk); chunks_remaining -= b.nbits; @@ -296,6 +297,7 @@ shared< Chunk > PhysicalDev::create_chunk(uint32_t chunk_id, uint32_t vdev_id, u auto bitmap_mem = m_chunk_info_slots->serialize(m_pdev_info.dev_attr.align_size); write_super_block(bitmap_mem->cbytes(), bitmap_mem->size(), hs_super_blk::chunk_sb_offset()); + HS_LOG(TRACE, device, "Created chunk {}", chunk->to_string()); cinfo->~chunk_info(); hs_utils::iobuf_free(buf, sisl::buftag::superblk); @@ -360,6 +362,8 @@ void PhysicalDev::load_chunks(std::function< bool(cshared< Chunk >&) >&& chunk_f cinfo->checksum = info_crc; auto chunk = std::make_shared< Chunk >(this, *cinfo, cslot); + m_chunk_data_area.insert( + ChunkInterval::right_open(cinfo->chunk_start_offset, cinfo->chunk_start_offset + cinfo->chunk_size)); if (chunk_found_cb(chunk)) { get_stream(chunk).m_chunks_map.insert(std::pair{cinfo->chunk_id, chunk}); } } hs_utils::iobuf_free(buf, sisl::buftag::superblk); @@ -395,6 +399,7 @@ void PhysicalDev::do_remove_chunk(cshared< Chunk >& chunk) { get_stream(chunk).m_chunks_map.erase(chunk->chunk_id()); cinfo->~chunk_info(); hs_utils::iobuf_free(buf, sisl::buftag::superblk); + HS_LOG(TRACE, device, "Removed chunk {}", chunk->to_string()); } uint64_t PhysicalDev::chunk_info_offset_nth(uint32_t slot) const { @@ -416,11 +421,14 @@ void PhysicalDev::populate_chunk_info(chunk_info* cinfo, uint32_t vdev_id, uint6 cinfo->set_allocated(); cinfo->set_user_private(private_data); cinfo->compute_checksum(); + auto [_, inserted] = m_chunk_start.insert(cinfo->chunk_start_offset); + RELEASE_ASSERT(inserted, "Duplicate start offset {} for chunk {}", cinfo->chunk_start_offset, cinfo->chunk_id); } void PhysicalDev::free_chunk_info(chunk_info* cinfo) { auto ival = ChunkInterval::right_open(cinfo->chunk_start_offset, cinfo->chunk_start_offset + cinfo->chunk_size); m_chunk_data_area.erase(ival); + m_chunk_start.erase(cinfo->chunk_start_offset); cinfo->set_free(); cinfo->checksum = 0; diff --git a/src/lib/device/physical_dev.hpp b/src/lib/device/physical_dev.hpp index 2987fb45e..41eb9221d 100644 --- a/src/lib/device/physical_dev.hpp +++ b/src/lib/device/physical_dev.hpp @@ -136,6 +136,7 @@ class PhysicalDev { ChunkIntervalSet m_chunk_data_area; // Range of chunks data area created std::unique_ptr< sisl::Bitset > m_chunk_info_slots; // Slots to write the chunk info uint32_t m_chunk_sb_size{0}; // Total size of the chunk sb at present + std::unordered_set< uint64_t > m_chunk_start; // Store and verify start offset of all chunks for debugging. 
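+ // Inserted by populate_chunk_info() and erased by free_chunk_info(); a duplicate insert trips a release assert.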
public: PhysicalDev(const dev_info& dinfo, int oflags, const pdev_info_header& pinfo); diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index 83c284c3e..15134b7f0 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -258,11 +258,17 @@ log_buffer LogDev::read(const logdev_key& key, serialized_log_record& return_rec m_vdev_jd->sync_pread(buf->bytes(), initial_read_size, key.dev_offset); auto* header = r_cast< const log_group_header* >(buf->cbytes()); - HS_REL_ASSERT_EQ(header->magic_word(), LOG_GROUP_HDR_MAGIC, "Log header corrupted with magic mismatch!"); - HS_REL_ASSERT_EQ(header->get_version(), log_group_header::header_version, "Log header version mismatch!"); - HS_REL_ASSERT_LE(header->start_idx(), key.idx, "log key offset does not match with log_idx"); - HS_REL_ASSERT_GT((header->start_idx() + header->nrecords()), key.idx, "log key offset does not match with log_idx"); - HS_LOG_ASSERT_GE(header->total_size(), header->_inline_data_offset(), "Inconsistent size data in log group"); + THIS_LOGDEV_LOG(TRACE, "Logdev read log group header {}", *header); + HS_REL_ASSERT_EQ(header->magic_word(), LOG_GROUP_HDR_MAGIC, "Log header corrupted with magic mismatch! {} {}", + m_logdev_id, *header); + HS_REL_ASSERT_EQ(header->get_version(), log_group_header::header_version, "Log header version mismatch! {} {}", + m_logdev_id, *header); + HS_REL_ASSERT_LE(header->start_idx(), key.idx, "log key offset does not match with log_idx {} {}", m_logdev_id, + *header); + HS_REL_ASSERT_GT((header->start_idx() + header->nrecords()), key.idx, + "log key offset does not match with log_idx {} {}", m_logdev_id, *header); + HS_LOG_ASSERT_GE(header->total_size(), header->_inline_data_offset(), "Inconsistent size data in log group {} {}", + m_logdev_id, *header); // We can only do crc match in read if we have read all the blocks. We don't want to aggressively read more data // than we need to just to compare CRC for read operation. It can be done during recovery.
@@ -337,7 +343,7 @@ LogGroup* LogDev::prepare_flush(const int32_t estimated_records) { } }); - lg->finish(get_prev_crc()); + lg->finish(m_logdev_id, get_prev_crc()); if (sisl_unlikely(flushing_upto_idx == -1)) { return nullptr; } lg->m_flush_log_idx_from = m_last_flush_idx + 1; lg->m_flush_log_idx_upto = flushing_upto_idx; @@ -346,7 +352,6 @@ LogGroup* LogDev::prepare_flush(const int32_t estimated_records) { HS_DBG_ASSERT_GT(lg->header()->oob_data_offset, 0); THIS_LOGDEV_LOG(DEBUG, "Flushing upto log_idx={}", flushing_upto_idx); - THIS_LOGDEV_LOG(DEBUG, "Log Group: {}", *lg); return lg; } @@ -408,7 +413,7 @@ bool LogDev::flush_if_needed(int64_t threshold_size) { lg->m_log_dev_offset = offset; HS_REL_ASSERT_NE(lg->m_log_dev_offset, INVALID_OFFSET, "log dev is full"); THIS_LOGDEV_LOG(TRACE, "Flush prepared, flushing data size={} at offset={}", lg->actual_data_size(), offset); - + THIS_LOGDEV_LOG(DEBUG, "Log Group: {}", *lg); do_flush(lg); return true; } else { diff --git a/src/lib/logstore/log_dev.hpp b/src/lib/logstore/log_dev.hpp index bfb2afb1a..226194cc4 100644 --- a/src/lib/logstore/log_dev.hpp +++ b/src/lib/logstore/log_dev.hpp @@ -136,6 +136,7 @@ struct log_group_header { uint32_t footer_offset; // offset of where footer starts crc32_t prev_grp_crc; // Checksum of the previous group that was written crc32_t cur_grp_crc; // Checksum of the current group record + logdev_id_t logdev_id; // Logdev id log_group_header() : magic{LOG_GROUP_HDR_MAGIC}, version{header_version} {} log_group_header(const log_group_header&) = delete; @@ -195,9 +196,10 @@ struct fmt::formatter< homestore::log_group_header > { return fmt::format_to( ctx.out(), "magic = {} version={} n_log_records = {} start_log_idx = {} group_size = {} inline_data_offset = {} " - "oob_data_offset = {} prev_grp_crc = {} cur_grp_crc = {}", + "oob_data_offset = {} prev_grp_crc = {} cur_grp_crc = {} logdev = {}", header.magic, header.version, header.n_log_records, header.start_log_idx, header.group_size, - header.inline_data_offset, header.oob_data_offset, header.prev_grp_crc, header.cur_grp_crc); + header.inline_data_offset, header.oob_data_offset, header.prev_grp_crc, header.cur_grp_crc, + header.logdev_id); } }; @@ -253,7 +255,7 @@ class LogGroup { bool add_record(log_record& record, const int64_t log_idx); bool can_accomodate(const log_record& record) const { return (m_nrecords <= m_max_records); } - const iovec_array& finish(const crc32_t prev_crc); + const iovec_array& finish(logdev_id_t logdev_id, const crc32_t prev_crc); crc32_t compute_crc(); log_group_header* header() { return reinterpret_cast< log_group_header* >(m_cur_log_buf); } diff --git a/src/lib/logstore/log_group.cpp b/src/lib/logstore/log_group.cpp index 2f54da8d8..597d97849 100644 --- a/src/lib/logstore/log_group.cpp +++ b/src/lib/logstore/log_group.cpp @@ -117,13 +117,14 @@ bool LogGroup::new_iovec_for_footer() const { return ((m_inline_data_pos + sizeof(log_group_footer)) >= m_cur_buf_len || m_oob_data_pos != 0); } -const iovec_array& LogGroup::finish(const crc32_t prev_crc) { +const iovec_array& LogGroup::finish(logdev_id_t logdev_id, const crc32_t prev_crc) { // add footer auto footer = add_and_get_footer(); m_iovecs[0].iov_len = sisl::round_up(m_iovecs[0].iov_len, m_flush_multiple_size); log_group_header* hdr = new (header()) log_group_header{}; + hdr->logdev_id = logdev_id; hdr->n_log_records = m_nrecords; hdr->prev_grp_crc = prev_crc; hdr->inline_data_offset = sizeof(log_group_header) + (m_max_records * sizeof(serialized_log_record)); diff --git 
a/src/lib/logstore/log_store.cpp b/src/lib/logstore/log_store.cpp index 089bb3286..c75fbae6e 100644 --- a/src/lib/logstore/log_store.cpp +++ b/src/lib/logstore/log_store.cpp @@ -39,7 +39,7 @@ HomeLogStore::HomeLogStore(std::shared_ptr< LogDev > logdev, logstore_id_t id, b m_records{"HomeLogStoreRecords", start_lsn - 1}, m_append_mode{append_mode}, m_seq_num{start_lsn}, - m_fq_name{fmt::format("{}.{}", logdev->get_id(), id)}, + m_fq_name{fmt::format("{} logdev={}", id, logdev->get_id())}, m_metrics{logstore_service().metrics()} { m_truncation_barriers.reserve(10000); m_safe_truncation_boundary.ld_key = m_logdev->get_last_flush_ld_key(); diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp index 90e35cf06..df33ebd90 100644 --- a/src/lib/logstore/log_store_service.cpp +++ b/src/lib/logstore/log_store_service.cpp @@ -115,7 +115,6 @@ logdev_id_t LogStoreService::create_new_logdev() { auto logdev = create_new_logdev_internal(logdev_id); logdev->start(true /* format */, m_logdev_vdev.get()); COUNTER_INCREMENT(m_metrics, logdevs_count, 1); - LOGINFO("Created log dev id {}", logdev_id); return logdev_id; } diff --git a/src/lib/logstore/log_stream.cpp b/src/lib/logstore/log_stream.cpp index afa7e67e9..ac2aa8647 100644 --- a/src/lib/logstore/log_stream.cpp +++ b/src/lib/logstore/log_stream.cpp @@ -44,7 +44,7 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { do { m_cur_log_buf = read_next_bytes(std::max(min_needed, bulk_read_size)); if (m_cur_log_buf.size() == 0) { - LOGINFOMOD(logstore, "Logdev data empty"); + LOGINFOMOD(logstore, "Logdev data empty logdev={}", m_vdev_jd->logdev_id()); return {}; } } while (m_cur_log_buf.size() < sizeof(log_group_header)); @@ -54,8 +54,8 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { HS_REL_ASSERT_GE(m_cur_log_buf.size(), m_read_size_multiple); const auto* header = r_cast< log_group_header const* >(m_cur_log_buf.bytes()); if (header->magic_word() != LOG_GROUP_HDR_MAGIC) { - LOGINFOMOD(logstore, "Logdev data not seeing magic at pos {}, must have come to end of logdev", - m_vdev_jd->dev_offset(m_cur_read_bytes)); + LOGINFOMOD(logstore, "Logdev data not seeing magic at pos {}, must have come to end of logdev={}", + m_vdev_jd->dev_offset(m_cur_read_bytes), m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid @@ -65,8 +65,8 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { } if (header->total_size() > m_cur_log_buf.size()) { - LOGINFOMOD(logstore, "Logstream group size {} is more than available buffer size {}, reading from store", - header->total_size(), m_cur_log_buf.size()); + LOGINFOMOD(logstore, "Logstream group size {} is more than available buffer size {}, reading from logdev={}", + header->total_size(), m_cur_log_buf.size(), m_vdev_jd->logdev_id()); // Bigger group size than needed bytes, read again min_needed = sisl::round_up(header->total_size(), m_read_size_multiple); goto read_again; @@ -74,15 +74,16 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { LOGTRACEMOD(logstore, "Logstream read log group of size={} nrecords={} m_cur_log_dev_offset {} buf size " - "remaining {} ", - header->total_size(), header->nrecords(), m_vdev_jd->dev_offset(m_cur_read_bytes), - m_cur_log_buf.size()); + "remaining {} logdev={}", + header->total_size(), header->nrecords(), m_vdev_jd->dev_offset(m_cur_read_bytes), m_cur_log_buf.size(), + m_vdev_jd->logdev_id()); // 
compare it with prev crc if (m_prev_crc != 0 && m_prev_crc != header->prev_grp_crc) { // we reached at the end - LOGINFOMOD(logstore, "we have reached the end. crc doesn't match with the prev crc {}", - m_vdev_jd->dev_offset(m_cur_read_bytes)); + LOGINFOMOD(logstore, + "we have reached the end. crc doesn't match offset {} prev crc {} header prev crc {} logdev={}", + m_vdev_jd->dev_offset(m_cur_read_bytes), m_prev_crc, header->prev_grp_crc, m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid @@ -95,8 +96,9 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { const auto* footer = r_cast< log_group_footer* >((uint64_t)m_cur_log_buf.bytes() + header->footer_offset); if (footer->magic != LOG_GROUP_FOOTER_MAGIC || footer->start_log_idx != header->start_log_idx) { LOGINFOMOD(logstore, - "last write is not completely written. footer magic {} footer start_log_idx {} header log indx {}", - footer->magic, footer->start_log_idx, header->start_log_idx); + "last write is not completely written. footer magic {} footer start_log_idx {} header log idx {} " + "logdev={}", + footer->magic, footer->start_log_idx, header->start_log_idx, m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid @@ -112,8 +114,9 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { (header->total_size() - sizeof(log_group_header))); if (cur_crc != header->cur_grp_crc) { /* This is a valid entry so crc should match */ - HS_REL_ASSERT(0, "data is corrupted"); - LOGINFOMOD(logstore, "crc doesn't match {}", m_vdev_jd->dev_offset(m_cur_read_bytes)); + HS_REL_ASSERT(0, "data is corrupted {}", m_vdev_jd->logdev_id()); + LOGINFOMOD(logstore, "crc doesn't match {} logdev={}", m_vdev_jd->dev_offset(m_cur_read_bytes), + m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid @@ -149,8 +152,8 @@ sisl::byte_view log_stream_reader::read_next_bytes(uint64_t nbytes) { auto sz_read = m_vdev_jd->sync_next_read(out_buf->bytes() + m_cur_log_buf.size(), nbytes); if (sz_read == 0) { return {}; } - LOGINFOMOD(logstore, "LogStream read {} bytes from vdev offset {} and vdev cur offset {}", nbytes, prev_pos, - m_vdev_jd->seeked_pos()); + LOGINFOMOD(logstore, "LogStream read {} bytes from vdev offset {} and vdev cur offset {} logdev={}", nbytes, + prev_pos, m_vdev_jd->seeked_pos(), m_vdev_jd->logdev_id()); return sisl::byte_view{out_buf}; } } // namespace homestore diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index 7444169ee..883ffa46d 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -64,11 +64,11 @@ HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore m_log_store = logstore_service().create_new_log_store(m_logdev_id, true); if (!m_log_store) { throw std::runtime_error("Failed to create log store"); } m_logstore_id = m_log_store->get_store_id(); - LOGDEBUGMOD(replication, "Opened new home log dev = {} store id={}", m_logdev_id, m_logstore_id); + LOGDEBUGMOD(replication, "Opened new home logdev={} logstore={}", m_logdev_id, m_logstore_id); } else { m_logdev_id = logdev_id; m_logstore_id = logstore_id; - LOGDEBUGMOD(replication, "Opening existing home log dev = {} store id={}", m_logdev_id,
logstore_id); + LOGDEBUGMOD(replication, "Opening existing home logdev={} logstore={}", m_logdev_id, logstore_id); logstore_service().open_logdev(m_logdev_id); logstore_service().open_log_store(m_logdev_id, logstore_id, true).thenValue([this](auto log_store) { m_log_store = std::move(log_store); diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 91b85ca4d..344571003 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -66,9 +66,11 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk m_rd_sb.write(); } - RD_LOG(INFO, "Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={} next_dsn={}", + RD_LOG(INFO, + "Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} committed_lsn={} next_dsn={} " + "log_dev={} log_store={}", (load_existing ? "Existing" : "New"), group_id_str(), my_replica_id_str(), m_raft_server_id, - m_commit_upto_lsn.load(), m_next_dsn.load()); + m_commit_upto_lsn.load(), m_next_dsn.load(), m_rd_sb->logdev_id, m_rd_sb->logstore_id); m_msg_mgr.bind_data_service_request(PUSH_DATA, m_group_id, bind_this(RaftReplDev::on_push_data_received, 1)); m_msg_mgr.bind_data_service_request(FETCH_DATA, m_group_id, bind_this(RaftReplDev::on_fetch_data_received, 1)); @@ -420,7 +422,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rre } void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) { - std::vector<::flatbuffers::Offset< RequestEntry > > entries; + std::vector< ::flatbuffers::Offset< RequestEntry > > entries; entries.reserve(rreqs->size()); shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >(); diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index e5f4ea0b2..67bcdb6b8 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -108,7 +108,7 @@ if (${io_tests}) can_build_epoll_io_tests(epoll_tests) if(${epoll_tests}) - # add_test(NAME LogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store) + add_test(NAME LogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store) add_test(NAME MetaBlkMgr-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr) add_test(NAME DataService-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service) diff --git a/src/tests/test_device_manager.cpp b/src/tests/test_device_manager.cpp index 13bef9d6b..6e1cdcfcc 100644 --- a/src/tests/test_device_manager.cpp +++ b/src/tests/test_device_manager.cpp @@ -174,6 +174,62 @@ TEST_F(DeviceMgrTest, StripedVDevCreation) { this->validate_striped_vdevs(); } +TEST_F(DeviceMgrTest, CreateChunk) { + // Dynamically create chunks and verify that no two chunks have the same start offset.
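+ // A duplicate start offset after the restart would reproduce the recovery bug this patch fixes.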
+ uint64_t avail_size{0}; + for (auto& pdev : m_pdevs) { + avail_size += pdev->data_size(); + } + + LOGINFO("Step 1: Creating test_vdev with size={}", in_bytes(avail_size)); + auto vdev = + m_dmgr->create_vdev(homestore::vdev_parameters{.vdev_name = "test_vdev", + .size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC, + .vdev_size = avail_size, + .num_chunks = 0, + .blk_size = 512, + .chunk_size = 512 * 1024, + .dev_type = HSDevType::Data, + .alloc_type = blk_allocator_type_t::none, + .chunk_sel_type = chunk_selector_type_t::NONE, + .multi_pdev_opts = vdev_multi_pdev_opts_t::ALL_PDEV_STRIPED, + .context_data = sisl::blob{}}); + + auto num_chunks = 10; + LOGINFO("Step 2: Creating {} chunks", num_chunks); + std::unordered_map< uint32_t, chunk_info > chunk_info_map; + std::unordered_set< uint64_t > chunk_start; + + for (int i = 0; i < num_chunks; i++) { + auto chunk = m_dmgr->create_chunk(HSDevType::Data, vdev->info().vdev_id, 512 * 1024, {}); + chunk_info_map[chunk->chunk_id()] = chunk->info(); + auto [_, inserted] = chunk_start.insert(chunk->info().chunk_start_offset); + ASSERT_EQ(inserted, true) << "chunk start duplicate " << chunk->info().chunk_start_offset; + } + + LOGINFO("Step 3: Restarting homestore"); + this->restart(); + + LOGINFO("Step 4: Creating additional {} chunks", num_chunks); + for (int i = 0; i < num_chunks; i++) { + auto chunk = m_dmgr->create_chunk(HSDevType::Data, vdev->info().vdev_id, 512 * 1024, {}); + chunk_info_map[chunk->chunk_id()] = chunk->info(); + auto [_, inserted] = chunk_start.insert(chunk->info().chunk_start_offset); + ASSERT_EQ(inserted, true) << "chunk start duplicate " << chunk->info().chunk_start_offset; + } + + chunk_start.clear(); + auto chunk_vec = m_dmgr->get_chunks(); + ASSERT_EQ(chunk_vec.size(), num_chunks * 2); + for (const auto& chunk : chunk_vec) { + ASSERT_EQ(chunk->info().chunk_start_offset, chunk_info_map[chunk->chunk_id()].chunk_start_offset) + << "Chunk offsets mismatch"; + auto [_, inserted] = chunk_start.insert(chunk->info().chunk_start_offset); + ASSERT_EQ(inserted, true) << "chunk start duplicate " << chunk->info().chunk_start_offset; + } + vdev.reset(); +} + int main(int argc, char* argv[]) { SISL_OPTIONS_LOAD(argc, argv, logging, test_device_manager, iomgr); ::testing::InitGoogleTest(&argc, argv); diff --git a/src/tests/test_log_store.cpp b/src/tests/test_log_store.cpp index 262270c88..fc28a6ece 100644 --- a/src/tests/test_log_store.cpp +++ b/src/tests/test_log_store.cpp @@ -438,7 +438,13 @@ class SampleDB { } void start_homestore(bool restart = false) { + auto n_log_devs = SISL_OPTIONS["num_logdevs"].as< uint32_t >(); auto n_log_stores = SISL_OPTIONS["num_logstores"].as< uint32_t >(); + if (n_log_devs < 1u) { + LOGINFO("Log store test needs a minimum of 1 logdev for testing, setting it to 1"); + n_log_devs = 1u; + } + if (n_log_stores < 4u) { LOGINFO("Log store test needs minimum 4 log stores for testing, setting them to 4"); n_log_stores = 4u; @@ -462,13 +468,18 @@ class SampleDB { restart); if (!restart) { - auto logdev_id = logstore_service().create_new_logdev(); + std::vector< logdev_id_t > logdev_id_vec; + for (uint32_t i{0}; i < n_log_devs; ++i) { + logdev_id_vec.push_back(logstore_service().create_new_logdev()); + } + for (uint32_t i{0}; i < n_log_stores; ++i) { + auto logdev_id = logdev_id_vec[rand() % logdev_id_vec.size()]; m_log_store_clients.push_back(std::make_unique< SampleLogStoreClient >( logdev_id, bind_this(SampleDB::on_log_insert_completion, 3))); } SampleLogStoreClient::s_max_flush_multiple = -
logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); + logstore_service().get_logdev(logdev_id_vec[0])->get_flush_size_multiple(); } } @@ -619,14 +630,13 @@ class LogStoreTest : public ::testing::Test { EXPECT_EQ(expected_num_records, rec_count); } - void dump_validate_filter(logstore_id_t id, logstore_seq_num_t start_seq, logstore_seq_num_t end_seq, - bool print_content = false) { + void dump_validate_filter(bool print_content = false) { for (const auto& lsc : SampleDB::instance().m_log_store_clients) { - if (lsc->m_log_store->get_store_id() != id) { continue; } - - log_dump_req dump_req; + logstore_id_t id = lsc->m_log_store->get_store_id(); const auto fid = lsc->m_logdev_id; - + logstore_seq_num_t start_seq = lsc->m_log_store->truncated_upto() + 1; + logstore_seq_num_t end_seq = lsc->m_log_store->get_contiguous_completed_seq_num(-1); + log_dump_req dump_req; if (print_content) dump_req.verbosity_level = log_dump_verbosity::CONTENT; dump_req.log_store = lsc->m_log_store; dump_req.start_seq_num = start_seq; @@ -648,9 +658,8 @@ class LogStoreTest : public ::testing::Test { } else { EXPECT_FALSE(true); } - - return; } + return; } int find_garbage_upto(logdev_id_t logdev_id, logid_t idx) { @@ -791,7 +800,8 @@ class LogStoreTest : public ::testing::Test { } } ASSERT_EQ(actual_valid_ids, SampleDB::instance().m_log_store_clients.size()); - ASSERT_EQ(actual_garbage_ids, exp_garbage_store_count); + // Because we randomly assign logstores to logdevs, some logdevs will be empty. + // ASSERT_EQ(actual_garbage_ids, exp_garbage_store_count); } void delete_validate(uint32_t idx) { @@ -863,6 +873,71 @@ class LogStoreTest : public ::testing::Test { uint32_t m_holes_per_batch{0}; }; +// The rollback test runs first so that there are no log entries on the other +// logstores; a logdev truncate would otherwise truncate all logstore entries, +// including the rollback entries.
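+// Keeping it first also lets Step 17 verify that no rollback records survive the final truncation.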
+TEST_F(LogStoreTest, Rollback) { + LOGINFO("Step 1: Reinit the 500 records on a single logstore to start rollback test"); + this->init(500, {std::make_pair(1ull, 100)}); // Last entry = 500 + + LOGINFO("Step 2: Issue sequential inserts with q depth of 10"); + this->kickstart_inserts(1, 10); + + LOGINFO("Step 3: Wait for the Inserts to complete"); + this->wait_for_inserts(); + + LOGINFO("Step 4: Rollback last 50 entries and validate if pre-rollback entries are intact"); + this->rollback_validate(50); // Last entry = 450 + + LOGINFO("Step 5: Append 25 entries after rollback is completed"); + this->init(25, {std::make_pair(1ull, 100)}); + this->kickstart_inserts(1, 10); + this->wait_for_inserts(); // Last entry = 475 + + LOGINFO("Step 7: Rollback again for 75 entries even before previous rollback entry"); + this->rollback_validate(75); // Last entry = 400 + + LOGINFO("Step 8: Append 25 entries after second rollback is completed"); + this->init(25, {std::make_pair(1ull, 100)}); + this->kickstart_inserts(1, 10); + this->wait_for_inserts(); // Last entry = 425 + + LOGINFO("Step 9: Restart homestore and ensure all rollbacks are effectively validated"); + SampleDB::instance().start_homestore(true /* restart */); + this->recovery_validate(); + + LOGINFO("Step 10: Post recovery, append another 25 entries"); + this->init(25, {std::make_pair(1ull, 100)}); + this->kickstart_inserts(1, 5); + this->wait_for_inserts(); // Last entry = 450 + + LOGINFO("Step 11: Rollback again for 75 entries even before previous rollback entry"); + this->rollback_validate(75); // Last entry = 375 + + LOGINFO("Step 12: After 3rd rollback, append another 25 entries"); + this->init(25, {std::make_pair(1ull, 100)}); + this->kickstart_inserts(1, 5); + this->wait_for_inserts(); // Last entry = 400 + + LOGINFO("Step 13: Truncate all entries"); + this->truncate_validate(); + + LOGINFO("Step 14: Restart homestore and ensure all truncations after rollbacks are effectively validated"); + SampleDB::instance().start_homestore(true /* restart */); + this->recovery_validate(); + + LOGINFO("Step 15: Append 25 entries after truncation is completed"); + this->init(25, {std::make_pair(1ull, 100)}); + this->kickstart_inserts(1, 10); + this->wait_for_inserts(); // Last entry = 425 + + LOGINFO("Step 16: Do another truncation to effectively truncate previous records"); + this->truncate_validate(); + + LOGINFO("Step 17: Validate if there are no rollback records"); + this->post_truncate_rollback_validate(); +} + TEST_F(LogStoreTest, BurstRandInsertThenTruncate) { const auto num_records = SISL_OPTIONS["num_records"].as< uint32_t >(); const auto iterations = SISL_OPTIONS["iterations"].as< uint32_t >(); @@ -890,7 +965,7 @@ TEST_F(LogStoreTest, BurstRandInsertThenTruncate) { this->dump_validate(num_records); LOGINFO("Step 4.2: Read some specific interval/filter of seq number in one logstore and dump it into json"); - this->dump_validate_filter(0, 10, 100, true); + this->dump_validate_filter(true); } LOGINFO("Step 5: Truncate all of the inserts one log store at a time and validate log dev truncation is marked " @@ -1142,68 +1217,6 @@ TEST_F(LogStoreTest, FlushSync) { #endif } -TEST_F(LogStoreTest, Rollback) { - LOGINFO("Step 1: Reinit the 500 records on a single logstore to start rollback test"); - this->init(500, {std::make_pair(1ull, 100)}); // Last entry = 500 - - LOGINFO("Step 2: Issue sequential inserts with q depth of 10"); - this->kickstart_inserts(1, 10); - - LOGINFO("Step 3: Wait for the Inserts to complete"); - this->wait_for_inserts(); - - 
LOGINFO("Step 4: Rollback last 50 entries and validate if pre-rollback entries are intact"); - this->rollback_validate(50); // Last entry = 450 - - LOGINFO("Step 5: Append 25 entries after rollback is completed"); - this->init(25, {std::make_pair(1ull, 100)}); - this->kickstart_inserts(1, 10); - this->wait_for_inserts(); // Last entry = 475 - - LOGINFO("Step 7: Rollback again for 75 entries even before previous rollback entry"); - this->rollback_validate(75); // Last entry = 400 - - LOGINFO("Step 8: Append 25 entries after second rollback is completed"); - this->init(25, {std::make_pair(1ull, 100)}); - this->kickstart_inserts(1, 10); - this->wait_for_inserts(); // Last entry = 425 - - LOGINFO("Step 9: Restart homestore and ensure all rollbacks are effectively validated"); - SampleDB::instance().start_homestore(true /* restart */); - this->recovery_validate(); - - LOGINFO("Step 10: Post recovery, append another 25 entries"); - this->init(25, {std::make_pair(1ull, 100)}); - this->kickstart_inserts(1, 5); - this->wait_for_inserts(); // Last entry = 450 - - LOGINFO("Step 11: Rollback again for 75 entries even before previous rollback entry"); - this->rollback_validate(75); // Last entry = 375 - - LOGINFO("Step 12: After 3rd rollback, append another 25 entries"); - this->init(25, {std::make_pair(1ull, 100)}); - this->kickstart_inserts(1, 5); - this->wait_for_inserts(); // Last entry = 400 - - LOGINFO("Step 13: Truncate all entries"); - this->truncate_validate(); - - LOGINFO("Step 14: Restart homestore and ensure all truncations after rollbacks are effectively validated"); - SampleDB::instance().start_homestore(true /* restart */); - this->recovery_validate(); - - LOGINFO("Step 15: Append 25 entries after truncation is completed"); - this->init(25, {std::make_pair(1ull, 100)}); - this->kickstart_inserts(1, 10); - this->wait_for_inserts(); // Last entry = 425 - - LOGINFO("Step 16: Do another truncation to effectively truncate previous records"); - this->truncate_validate(); - - LOGINFO("Step 17: Validate if there are no rollback records"); - this->post_truncate_rollback_validate(); -} - TEST_F(LogStoreTest, DeleteMultipleLogStores) { const auto nrecords = (SISL_OPTIONS["num_records"].as< uint32_t >() * 5) / 100; @@ -1282,8 +1295,10 @@ TEST_F(LogStoreTest, WriteSyncThenRead) { SISL_OPTIONS_ENABLE(logging, test_log_store, iomgr, test_common_setup) SISL_OPTION_GROUP(test_log_store, - (num_logstores, "", "num_logstores", "number of log stores", + (num_logdevs, "", "num_logdevs", "number of log devs", ::cxxopts::value< uint32_t >()->default_value("4"), "number"), + (num_logstores, "", "num_logstores", "number of log stores", + ::cxxopts::value< uint32_t >()->default_value("16"), "number"), (num_records, "", "num_records", "number of record to test", ::cxxopts::value< uint32_t >()->default_value("10000"), "number"), (iterations, "", "iterations", "Iterations", ::cxxopts::value< uint32_t >()->default_value("1"),