From 35b11dc537e2e4db0f5a4950658355b93803f921 Mon Sep 17 00:00:00 2001 From: Sanal P Date: Mon, 12 Feb 2024 17:15:20 -0700 Subject: [PATCH] Fix device manager load of chunks. Chunks getting duplicate start offset while recovery. Add or cleanup logs for debugging. Add test case. Add more logdev for logstore test. Enable logstore test. Move rollback test to new logdev test. --- .github/workflows/build_dependencies.yml | 7 + src/lib/device/device.h | 1 + src/lib/device/device_manager.cpp | 10 + src/lib/device/journal_vdev.cpp | 25 +- src/lib/device/journal_vdev.hpp | 9 +- src/lib/device/physical_dev.cpp | 10 +- src/lib/device/physical_dev.hpp | 1 + src/lib/logstore/log_dev.cpp | 41 +- src/lib/logstore/log_dev.hpp | 8 +- src/lib/logstore/log_group.cpp | 3 +- src/lib/logstore/log_store.cpp | 4 +- src/lib/logstore/log_store_service.cpp | 1 - src/lib/logstore/log_stream.cpp | 35 +- .../log_store/home_raft_log_store.cpp | 4 +- .../replication/repl_dev/raft_repl_dev.cpp | 8 +- src/tests/CMakeLists.txt | 8 +- src/tests/test_device_manager.cpp | 56 +++ src/tests/test_log_dev.cpp | 366 +++++++++++++----- src/tests/test_log_store.cpp | 100 ++--- 19 files changed, 464 insertions(+), 233 deletions(-) diff --git a/.github/workflows/build_dependencies.yml b/.github/workflows/build_dependencies.yml index e0cbf5fb7..1b6870641 100644 --- a/.github/workflows/build_dependencies.yml +++ b/.github/workflows/build_dependencies.yml @@ -206,6 +206,13 @@ jobs: fail_on_cache_miss: true if: ${{ inputs.testing == 'True' && github.event_name != 'pull_request' && steps.restore-cache.outputs.cache-hit != 'true' }} + - uses: actions/checkout@v3 + - name: Setup tmate session + uses: mxschmitt/action-tmate@v3 + with: + limit-access-to-actor: true + detached: true + - name: Create and Test Package run: | sanitize=$([[ "${{ inputs.tooling }}" == "Sanitize" ]] && echo "True" || echo "False") diff --git a/src/lib/device/device.h b/src/lib/device/device.h index cbcdde8ea..a69e740dd 100644 --- 
a/src/lib/device/device.h +++ b/src/lib/device/device.h @@ -173,6 +173,7 @@ class DeviceManager { std::vector< PhysicalDev* > get_pdevs_by_dev_type(HSDevType dtype) const; std::vector< shared< VirtualDev > > get_vdevs() const; + std::vector< shared< Chunk > > get_chunks() const; uint64_t total_capacity() const; uint64_t total_capacity(HSDevType dtype) const; diff --git a/src/lib/device/device_manager.cpp b/src/lib/device/device_manager.cpp index ad450910e..6c7c59ef1 100644 --- a/src/lib/device/device_manager.cpp +++ b/src/lib/device/device_manager.cpp @@ -543,6 +543,16 @@ std::vector< shared< VirtualDev > > DeviceManager::get_vdevs() const { return ret_v; } +std::vector< shared< Chunk > > DeviceManager::get_chunks() const { + std::unique_lock lg{m_vdev_mutex}; + std::vector< shared< Chunk > > res; + res.reserve(m_chunks.size()); + for (auto& chunk : m_chunks) { + if (chunk) res.push_back(chunk); + } + return res; +} + // Some of the hs_super_blk details uint64_t hs_super_blk::vdev_super_block_size() { return (hs_super_blk::MAX_VDEVS_IN_SYSTEM * vdev_info::size); } diff --git a/src/lib/device/journal_vdev.cpp b/src/lib/device/journal_vdev.cpp index 87eab5f72..db5e0a76c 100644 --- a/src/lib/device/journal_vdev.cpp +++ b/src/lib/device/journal_vdev.cpp @@ -231,7 +231,7 @@ off_t JournalVirtualDev::Descriptor::alloc_next_append_blk(size_t sz) { // assert that returnning logical offset is in good range HS_DBG_ASSERT_LE(tail_off, m_end_offset); - LOGDEBUGMOD(journalvdev, "returned tail_off 0x{} desc {}", to_hex(tail_off), to_string()); + LOGDEBUGMOD(journalvdev, "returned tail_off 0x{} size {} desc {}", to_hex(tail_off), sz, to_string()); return tail_off; } @@ -443,6 +443,14 @@ off_t JournalVirtualDev::Descriptor::dev_offset(off_t nbytes) const { return vdev_offset; } +void JournalVirtualDev::Descriptor::update_data_start_offset(off_t offset) { + m_data_start_offset = offset; + auto data_start_offset_aligned = sisl::round_down(m_data_start_offset, 
m_vdev.info().chunk_size); + m_end_offset = data_start_offset_aligned + m_journal_chunks.size() * m_vdev.info().chunk_size; + LOGTRACEMOD(journalvdev, "Updated data start offset off 0x{} {}", to_hex(offset), to_string()); + RELEASE_ASSERT_EQ(m_end_offset - data_start_offset_aligned, m_total_size, "offset size mismatch {}", to_string()); +} + off_t JournalVirtualDev::Descriptor::tail_offset(bool reserve_space_include) const { off_t tail = static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed)); if (reserve_space_include) { tail += m_reserved_sz; } @@ -461,7 +469,7 @@ void JournalVirtualDev::Descriptor::update_tail_offset(off_t tail) { } lseek(tail); - LOGDEBUGMOD(journalvdev, "tail arg 0x{} desc {} ", to_hex(tail), to_string()); + LOGDEBUGMOD(journalvdev, "Updated tail offset arg 0x{} desc {} ", to_hex(tail), to_string()); HS_REL_ASSERT(tail_offset() == tail, "tail offset mismatch after calculation 0x{} : {}", tail_offset(), tail); } @@ -598,7 +606,7 @@ bool JournalVirtualDev::Descriptor::is_alloc_accross_chunk(size_t size) const { nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const { nlohmann::json j; - j["logdev_id"] = m_logdev_id; + j["logdev"] = m_logdev_id; j["seek_cursor"] = m_seek_cursor; j["data_start_offset"] = m_data_start_offset; j["end_offset"] = m_end_offset; @@ -613,7 +621,7 @@ nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const { nlohmann::json c; auto* private_data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private())); c["chunk_id"] = chunk->chunk_id(); - c["logdev_id"] = private_data->logdev_id; + c["logdev"] = private_data->logdev_id; c["is_head"] = private_data->is_head; c["end_of_chunk"] = private_data->end_of_chunk; c["next_chunk"] = private_data->next_chunk; @@ -627,12 +635,13 @@ nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const { } std::string JournalVirtualDev::Descriptor::to_string() const { - 
std::string str{fmt::format("id={};ds=0x{};end=0x{};writesz={};tail=0x{};" + off_t tail = + static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed)) + m_reserved_sz; + std::string str{fmt::format("logdev={};ds=0x{};end=0x{};writesz={};tail=0x{};" "rsvdsz={};chunks={};trunc={};total={};seek=0x{} ", m_logdev_id, to_hex(m_data_start_offset), to_hex(m_end_offset), - m_write_sz_in_total.load(std::memory_order_relaxed), to_hex(tail_offset()), - m_reserved_sz, m_journal_chunks.size(), m_truncate_done, m_total_size, - to_hex(m_seek_cursor))}; + m_write_sz_in_total.load(std::memory_order_relaxed), to_hex(tail), m_reserved_sz, + m_journal_chunks.size(), m_truncate_done, m_total_size, to_hex(m_seek_cursor))}; return str; } diff --git a/src/lib/device/journal_vdev.hpp b/src/lib/device/journal_vdev.hpp index 13ec18c65..ad0cca4f2 100644 --- a/src/lib/device/journal_vdev.hpp +++ b/src/lib/device/journal_vdev.hpp @@ -233,12 +233,7 @@ class JournalVirtualDev : public VirtualDev { * * @param offset : the start logical offset to be persisted */ - void update_data_start_offset(off_t offset) { - m_data_start_offset = offset; - auto data_start_offset_aligned = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size); - m_end_offset = data_start_offset_aligned + m_journal_chunks.size() * m_vdev.info().chunk_size; - RELEASE_ASSERT_EQ(m_end_offset - data_start_offset_aligned, m_total_size, "offset size mismatch"); - } + void update_data_start_offset(off_t offset); /** * @brief : get the logical tail offset; @@ -309,6 +304,8 @@ class JournalVirtualDev : public VirtualDev { */ nlohmann::json get_status(int log_level) const; + logdev_id_t logdev_id() const { return m_logdev_id; } + std::string to_string() const; private: diff --git a/src/lib/device/physical_dev.cpp b/src/lib/device/physical_dev.cpp index fe44059de..8baae8ab0 100644 --- a/src/lib/device/physical_dev.cpp +++ b/src/lib/device/physical_dev.cpp @@ -247,12 +247,13 @@ std::vector< 
shared< Chunk > > PhysicalDev::create_chunks(const std::vector< uin auto chunk = std::make_shared< Chunk >(this, *cinfo, cslot); ret_chunks.push_back(chunk); get_stream(chunk).m_chunks_map.insert(std::pair{chunk_ids[cit], std::move(chunk)}); - + HS_LOG(DEBUG, device, "Creating chunk {}", ret_chunks.back()->to_string()); cinfo->~chunk_info(); } m_chunk_info_slots->set_bits(b.start_bit, b.nbits); write_super_block(buf, chunk_info::size * b.nbits, chunk_info_offset_nth(b.start_bit)); + hs_utils::iobuf_free(buf, sisl::buftag::superblk); chunks_remaining -= b.nbits; @@ -296,6 +297,7 @@ shared< Chunk > PhysicalDev::create_chunk(uint32_t chunk_id, uint32_t vdev_id, u auto bitmap_mem = m_chunk_info_slots->serialize(m_pdev_info.dev_attr.align_size); write_super_block(bitmap_mem->cbytes(), bitmap_mem->size(), hs_super_blk::chunk_sb_offset()); + HS_LOG(TRACE, device, "Created chunk {}", chunk->to_string()); cinfo->~chunk_info(); hs_utils::iobuf_free(buf, sisl::buftag::superblk); @@ -360,6 +362,8 @@ void PhysicalDev::load_chunks(std::function< bool(cshared< Chunk >&) >&& chunk_f cinfo->checksum = info_crc; auto chunk = std::make_shared< Chunk >(this, *cinfo, cslot); + m_chunk_data_area.insert( + ChunkInterval::right_open(cinfo->chunk_start_offset, cinfo->chunk_start_offset + cinfo->chunk_size)); if (chunk_found_cb(chunk)) { get_stream(chunk).m_chunks_map.insert(std::pair{cinfo->chunk_id, chunk}); } } hs_utils::iobuf_free(buf, sisl::buftag::superblk); @@ -395,6 +399,7 @@ void PhysicalDev::do_remove_chunk(cshared< Chunk >& chunk) { get_stream(chunk).m_chunks_map.erase(chunk->chunk_id()); cinfo->~chunk_info(); hs_utils::iobuf_free(buf, sisl::buftag::superblk); + HS_LOG(TRACE, device, "Removed chunk {}", chunk->to_string()); } uint64_t PhysicalDev::chunk_info_offset_nth(uint32_t slot) const { @@ -416,11 +421,14 @@ void PhysicalDev::populate_chunk_info(chunk_info* cinfo, uint32_t vdev_id, uint6 cinfo->set_allocated(); cinfo->set_user_private(private_data); cinfo->compute_checksum(); + auto [_, 
inserted] = m_chunk_start.insert(cinfo->chunk_start_offset); + RELEASE_ASSERT(inserted, "Duplicate start offset {} for chunk {}", cinfo->chunk_start_offset, cinfo->chunk_id); } void PhysicalDev::free_chunk_info(chunk_info* cinfo) { auto ival = ChunkInterval::right_open(cinfo->chunk_start_offset, cinfo->chunk_start_offset + cinfo->chunk_size); m_chunk_data_area.erase(ival); + m_chunk_start.erase(cinfo->chunk_start_offset); cinfo->set_free(); cinfo->checksum = 0; diff --git a/src/lib/device/physical_dev.hpp b/src/lib/device/physical_dev.hpp index 2987fb45e..41eb9221d 100644 --- a/src/lib/device/physical_dev.hpp +++ b/src/lib/device/physical_dev.hpp @@ -136,6 +136,7 @@ class PhysicalDev { ChunkIntervalSet m_chunk_data_area; // Range of chunks data area created std::unique_ptr< sisl::Bitset > m_chunk_info_slots; // Slots to write the chunk info uint32_t m_chunk_sb_size{0}; // Total size of the chunk sb at present + std::unordered_set< uint64_t > m_chunk_start; // Store and verify start offset of all chunks for debugging. 
public: PhysicalDev(const dev_info& dinfo, int oflags, const pdev_info_header& pinfo); diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index 83c284c3e..310539070 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -258,11 +258,17 @@ log_buffer LogDev::read(const logdev_key& key, serialized_log_record& return_rec m_vdev_jd->sync_pread(buf->bytes(), initial_read_size, key.dev_offset); auto* header = r_cast< const log_group_header* >(buf->cbytes()); - HS_REL_ASSERT_EQ(header->magic_word(), LOG_GROUP_HDR_MAGIC, "Log header corrupted with magic mismatch!"); - HS_REL_ASSERT_EQ(header->get_version(), log_group_header::header_version, "Log header version mismatch!"); - HS_REL_ASSERT_LE(header->start_idx(), key.idx, "log key offset does not match with log_idx"); - HS_REL_ASSERT_GT((header->start_idx() + header->nrecords()), key.idx, "log key offset does not match with log_idx"); - HS_LOG_ASSERT_GE(header->total_size(), header->_inline_data_offset(), "Inconsistent size data in log group"); + THIS_LOGDEV_LOG(TRACE, "Logdev read log group header {}", *header); + HS_REL_ASSERT_EQ(header->magic_word(), LOG_GROUP_HDR_MAGIC, "Log header corrupted with magic mismatch! {} {}", + m_logdev_id, *header); + HS_REL_ASSERT_EQ(header->get_version(), log_group_header::header_version, "Log header version mismatch! {} {}", + m_logdev_id, *header); + HS_REL_ASSERT_LE(header->start_idx(), key.idx, "log key offset does not match with log_idx {} {}", m_logdev_id, + *header); + HS_REL_ASSERT_GT((header->start_idx() + header->nrecords()), key.idx, + "log key offset does not match with log_idx {} {}", m_logdev_id, *header); + HS_LOG_ASSERT_GE(header->total_size(), header->_inline_data_offset(), "Inconsistent size data in log group {} {}", + m_logdev_id, *header); // We can only do crc match in read if we have read all the blocks. We don't want to aggressively read more data // than we need to just to compare CRC for read operation. 
It can be done during recovery. @@ -337,7 +343,7 @@ LogGroup* LogDev::prepare_flush(const int32_t estimated_records) { } }); - lg->finish(get_prev_crc()); + lg->finish(m_logdev_id, get_prev_crc()); if (sisl_unlikely(flushing_upto_idx == -1)) { return nullptr; } lg->m_flush_log_idx_from = m_last_flush_idx + 1; lg->m_flush_log_idx_upto = flushing_upto_idx; @@ -346,7 +352,6 @@ LogGroup* LogDev::prepare_flush(const int32_t estimated_records) { HS_DBG_ASSERT_GT(lg->header()->oob_data_offset, 0); THIS_LOGDEV_LOG(DEBUG, "Flushing upto log_idx={}", flushing_upto_idx); - THIS_LOGDEV_LOG(DEBUG, "Log Group: {}", *lg); return lg; } @@ -408,7 +413,7 @@ bool LogDev::flush_if_needed(int64_t threshold_size) { lg->m_log_dev_offset = offset; HS_REL_ASSERT_NE(lg->m_log_dev_offset, INVALID_OFFSET, "log dev is full"); THIS_LOGDEV_LOG(TRACE, "Flush prepared, flushing data size={} at offset={}", lg->actual_data_size(), offset); - + THIS_LOGDEV_LOG(DEBUG, "Log Group: {}", *lg); do_flush(lg); return true; } else { @@ -540,14 +545,14 @@ void LogDev::unlock_flush(bool do_flush) { uint64_t LogDev::truncate(const logdev_key& key) { HS_DBG_ASSERT_GE(key.idx, m_last_truncate_idx); uint64_t const num_records_to_truncate = static_cast< uint64_t >(key.idx - m_last_truncate_idx); - LOGINFO("LogDev::truncate {}", num_records_to_truncate); + THIS_LOGDEV_LOG(INFO, "LogDev::truncate {}", num_records_to_truncate); if (num_records_to_truncate > 0) { HS_PERIODIC_LOG(INFO, logstore, "Truncating log device upto logdev {} log_id={} vdev_offset={} truncated {} log records", m_logdev_id, key.idx, key.dev_offset, num_records_to_truncate); m_log_records->truncate(key.idx); m_vdev_jd->truncate(key.dev_offset); - LOGINFO("LogDev::truncate {}", key.idx); + THIS_LOGDEV_LOG(INFO, "LogDev::truncate {}", key.idx); m_last_truncate_idx = key.idx; { @@ -574,11 +579,11 @@ uint64_t LogDev::truncate(const logdev_key& key) { // We can remove the rollback records of those upto which logid is getting truncated 
m_logdev_meta.remove_rollback_record_upto(key.idx, false /* persist_now */); - LOGINFO("LogDev::truncate remove rollback {}", key.idx); + THIS_LOGDEV_LOG(INFO, "LogDev::truncate remove rollback {}", key.idx); m_logdev_meta.persist(); #ifdef _PRERELEASE if (garbage_collect && iomgr_flip::instance()->test_flip("logdev_abort_after_garbage")) { - LOGINFO("logdev aborting after unreserving garbage ids"); + THIS_LOGDEV_LOG(INFO, "logdev aborting after unreserving garbage ids"); raise(SIGKILL); } #endif @@ -633,7 +638,7 @@ std::shared_ptr< HomeLogStore > LogDev::create_new_log_store(bool append_mode) { HS_REL_ASSERT((it == m_id_logstore_map.end()), "store_id {}-{} already exists", m_logdev_id, store_id); m_id_logstore_map.insert(std::pair(store_id, logstore_info{.log_store = lstore, .append_mode = append_mode})); } - LOGINFO("Created log store id {}-{}", m_logdev_id, store_id); + LOGINFO("Created log store log_dev={} log_store={}", m_logdev_id, store_id); return lstore; } @@ -653,7 +658,7 @@ folly::Future< shared< HomeLogStore > > LogDev::open_log_store(logstore_id_t sto } void LogDev::remove_log_store(logstore_id_t store_id) { - LOGINFO("Removing log store id {}-{}", m_logdev_id, store_id); + LOGINFO("Removing log_dev={} log_store={}", m_logdev_id, store_id); { folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); auto ret = m_id_logstore_map.erase(store_id); @@ -695,8 +700,8 @@ void LogDev::on_log_store_found(logstore_id_t store_id, const logstore_superblk& return; } - LOGINFO("Found a logstore store_id={}-{} with start seq_num={}, Creating a new HomeLogStore instance", m_logdev_id, - store_id, sb.m_first_seq_num); + LOGINFO("Found a logstore log_dev={} log_store={} with start seq_num={}, Creating a new HomeLogStore instance", + m_logdev_id, store_id, sb.m_first_seq_num); logstore_info& info = it->second; info.log_store = std::make_shared< HomeLogStore >(shared_from_this(), store_id, info.append_mode, sb.m_first_seq_num); @@ -1055,7 +1060,7 @@ void 
LogDevMetadata::remove_rollback_record_upto(logid_t upto_id, bool persist_n uint32_t n_removed{0}; for (auto i = m_rollback_sb->num_records; i > 0; --i) { auto& rec = m_rollback_sb->at(i - 1); - LOGINFO("Removing record sb {} {}", rec.idx_range.second, upto_id); + HS_LOG(TRACE, logstore, "Removing record sb {} {}", rec.idx_range.second, upto_id); if (rec.idx_range.second <= upto_id) { m_rollback_sb->remove_ith_record(i - 1); ++n_removed; @@ -1064,7 +1069,7 @@ void LogDevMetadata::remove_rollback_record_upto(logid_t upto_id, bool persist_n if (n_removed) { for (auto it = m_rollback_info.begin(); it != m_rollback_info.end();) { - LOGINFO("Removing info {} {}", it->second.second, upto_id); + HS_LOG(TRACE, logstore, "Removing info {} {}", it->second.second, upto_id); if (it->second.second <= upto_id) { it = m_rollback_info.erase(it); } else { diff --git a/src/lib/logstore/log_dev.hpp b/src/lib/logstore/log_dev.hpp index bfb2afb1a..226194cc4 100644 --- a/src/lib/logstore/log_dev.hpp +++ b/src/lib/logstore/log_dev.hpp @@ -136,6 +136,7 @@ struct log_group_header { uint32_t footer_offset; // offset of where footer starts crc32_t prev_grp_crc; // Checksum of the previous group that was written crc32_t cur_grp_crc; // Checksum of the current group record + logdev_id_t logdev_id; // Logdev id log_group_header() : magic{LOG_GROUP_HDR_MAGIC}, version{header_version} {} log_group_header(const log_group_header&) = delete; @@ -195,9 +196,10 @@ struct fmt::formatter< homestore::log_group_header > { return fmt::format_to( ctx.out(), "magic = {} version={} n_log_records = {} start_log_idx = {} group_size = {} inline_data_offset = {} " - "oob_data_offset = {} prev_grp_crc = {} cur_grp_crc = {}", + "oob_data_offset = {} prev_grp_crc = {} cur_grp_crc = {} logdev = {}", header.magic, header.version, header.n_log_records, header.start_log_idx, header.group_size, - header.inline_data_offset, header.oob_data_offset, header.prev_grp_crc, header.cur_grp_crc); + header.inline_data_offset, 
header.oob_data_offset, header.prev_grp_crc, header.cur_grp_crc, + header.logdev_id); } }; @@ -253,7 +255,7 @@ class LogGroup { bool add_record(log_record& record, const int64_t log_idx); bool can_accomodate(const log_record& record) const { return (m_nrecords <= m_max_records); } - const iovec_array& finish(const crc32_t prev_crc); + const iovec_array& finish(logdev_id_t logdev_id, const crc32_t prev_crc); crc32_t compute_crc(); log_group_header* header() { return reinterpret_cast< log_group_header* >(m_cur_log_buf); } diff --git a/src/lib/logstore/log_group.cpp b/src/lib/logstore/log_group.cpp index 2f54da8d8..597d97849 100644 --- a/src/lib/logstore/log_group.cpp +++ b/src/lib/logstore/log_group.cpp @@ -117,13 +117,14 @@ bool LogGroup::new_iovec_for_footer() const { return ((m_inline_data_pos + sizeof(log_group_footer)) >= m_cur_buf_len || m_oob_data_pos != 0); } -const iovec_array& LogGroup::finish(const crc32_t prev_crc) { +const iovec_array& LogGroup::finish(logdev_id_t logdev_id, const crc32_t prev_crc) { // add footer auto footer = add_and_get_footer(); m_iovecs[0].iov_len = sisl::round_up(m_iovecs[0].iov_len, m_flush_multiple_size); log_group_header* hdr = new (header()) log_group_header{}; + hdr->logdev_id = logdev_id; hdr->n_log_records = m_nrecords; hdr->prev_grp_crc = prev_crc; hdr->inline_data_offset = sizeof(log_group_header) + (m_max_records * sizeof(serialized_log_record)); diff --git a/src/lib/logstore/log_store.cpp b/src/lib/logstore/log_store.cpp index 089bb3286..eff6c39e5 100644 --- a/src/lib/logstore/log_store.cpp +++ b/src/lib/logstore/log_store.cpp @@ -39,7 +39,7 @@ HomeLogStore::HomeLogStore(std::shared_ptr< LogDev > logdev, logstore_id_t id, b m_records{"HomeLogStoreRecords", start_lsn - 1}, m_append_mode{append_mode}, m_seq_num{start_lsn}, - m_fq_name{fmt::format("{}.{}", logdev->get_id(), id)}, + m_fq_name{fmt::format("{} logdev={}", id, logdev->get_id())}, m_metrics{logstore_service().metrics()} { m_truncation_barriers.reserve(10000); 
m_safe_truncation_boundary.ld_key = m_logdev->get_last_flush_ld_key(); @@ -121,7 +121,7 @@ log_buffer HomeLogStore::read_sync(logstore_seq_num_t seq_num) { // If seq_num has not been flushed yet, but issued, then we flush them before reading auto const s = m_records.status(seq_num); if (s.is_out_of_range || s.is_hole) { - // THIS_LOGSTORE_LOG(DEBUG, "ld_key not valid {}", seq_num); + THIS_LOGSTORE_LOG(ERROR, "ld_key not valid {}", seq_num); throw std::out_of_range("key not valid"); } else if (!s.is_completed) { THIS_LOGSTORE_LOG(TRACE, "Reading lsn={}:{} before flushed, doing flush first", m_store_id, seq_num); diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp index 90e35cf06..df33ebd90 100644 --- a/src/lib/logstore/log_store_service.cpp +++ b/src/lib/logstore/log_store_service.cpp @@ -115,7 +115,6 @@ logdev_id_t LogStoreService::create_new_logdev() { auto logdev = create_new_logdev_internal(logdev_id); logdev->start(true /* format */, m_logdev_vdev.get()); COUNTER_INCREMENT(m_metrics, logdevs_count, 1); - LOGINFO("Created log dev id {}", logdev_id); return logdev_id; } diff --git a/src/lib/logstore/log_stream.cpp b/src/lib/logstore/log_stream.cpp index afa7e67e9..ac2aa8647 100644 --- a/src/lib/logstore/log_stream.cpp +++ b/src/lib/logstore/log_stream.cpp @@ -44,7 +44,7 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { do { m_cur_log_buf = read_next_bytes(std::max(min_needed, bulk_read_size)); if (m_cur_log_buf.size() == 0) { - LOGINFOMOD(logstore, "Logdev data empty"); + LOGINFOMOD(logstore, "Logdev data empty logdev={}", m_vdev_jd->logdev_id()); return {}; } } while (m_cur_log_buf.size() < sizeof(log_group_header)); @@ -54,8 +54,8 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { HS_REL_ASSERT_GE(m_cur_log_buf.size(), m_read_size_multiple); const auto* header = r_cast< log_group_header const* >(m_cur_log_buf.bytes()); if (header->magic_word() != LOG_GROUP_HDR_MAGIC) { - 
LOGINFOMOD(logstore, "Logdev data not seeing magic at pos {}, must have come to end of logdev", - m_vdev_jd->dev_offset(m_cur_read_bytes)); + LOGINFOMOD(logstore, "Logdev data not seeing magic at pos {}, must have come to end of logdev={}", + m_vdev_jd->dev_offset(m_cur_read_bytes), m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid @@ -65,8 +65,8 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { } if (header->total_size() > m_cur_log_buf.size()) { - LOGINFOMOD(logstore, "Logstream group size {} is more than available buffer size {}, reading from store", - header->total_size(), m_cur_log_buf.size()); + LOGINFOMOD(logstore, "Logstream group size {} is more than available buffer size {}, reading from logdev={}", + header->total_size(), m_cur_log_buf.size(), m_vdev_jd->logdev_id()); // Bigger group size than needed bytes, read again min_needed = sisl::round_up(header->total_size(), m_read_size_multiple); goto read_again; @@ -74,15 +74,16 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { LOGTRACEMOD(logstore, "Logstream read log group of size={} nrecords={} m_cur_log_dev_offset {} buf size " - "remaining {} ", - header->total_size(), header->nrecords(), m_vdev_jd->dev_offset(m_cur_read_bytes), - m_cur_log_buf.size()); + "remaining {} logdev={}", + header->total_size(), header->nrecords(), m_vdev_jd->dev_offset(m_cur_read_bytes), m_cur_log_buf.size(), + m_vdev_jd->logdev_id()); // compare it with prev crc if (m_prev_crc != 0 && m_prev_crc != header->prev_grp_crc) { // we reached at the end - LOGINFOMOD(logstore, "we have reached the end. crc doesn't match with the prev crc {}", - m_vdev_jd->dev_offset(m_cur_read_bytes)); + LOGINFOMOD(logstore, + "we have reached the end. 
crc doesn't match offset {} prev crc {} header prev crc {} logdev={}", + m_vdev_jd->dev_offset(m_cur_read_bytes), header->prev_grp_crc, m_prev_crc, m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid @@ -95,8 +96,9 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { const auto* footer = r_cast< log_group_footer* >((uint64_t)m_cur_log_buf.bytes() + header->footer_offset); if (footer->magic != LOG_GROUP_FOOTER_MAGIC || footer->start_log_idx != header->start_log_idx) { LOGINFOMOD(logstore, - "last write is not completely written. footer magic {} footer start_log_idx {} header log indx {}", - footer->magic, footer->start_log_idx, header->start_log_idx); + "last write is not completely written. footer magic {} footer start_log_idx {} header log indx {} " + "logdev={}", + footer->magic, footer->start_log_idx, header->start_log_idx, m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid @@ -112,8 +114,9 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { (header->total_size() - sizeof(log_group_header))); if (cur_crc != header->cur_grp_crc) { /* This is a valid entry so crc should match */ - HS_REL_ASSERT(0, "data is corrupted"); - LOGINFOMOD(logstore, "crc doesn't match {}", m_vdev_jd->dev_offset(m_cur_read_bytes)); + HS_REL_ASSERT(0, "data is corrupted {}", m_vdev_jd->logdev_id()); + LOGINFOMOD(logstore, "crc doesn't match {} logdev={}", m_vdev_jd->dev_offset(m_cur_read_bytes), + m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid @@ -149,8 +152,8 @@ sisl::byte_view log_stream_reader::read_next_bytes(uint64_t nbytes) { auto sz_read = m_vdev_jd->sync_next_read(out_buf->bytes() + m_cur_log_buf.size(), nbytes); if (sz_read == 0) { return {}; } - LOGINFOMOD(logstore, "LogStream read {} bytes 
from vdev offset {} and vdev cur offset {}", nbytes, prev_pos, - m_vdev_jd->seeked_pos()); + LOGINFOMOD(logstore, "LogStream read {} bytes from vdev offset {} and vdev cur offset {} logdev={}", nbytes, + prev_pos, m_vdev_jd->seeked_pos(), m_vdev_jd->logdev_id()); return sisl::byte_view{out_buf}; } } // namespace homestore diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index 7444169ee..883ffa46d 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -64,11 +64,11 @@ HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore m_log_store = logstore_service().create_new_log_store(m_logdev_id, true); if (!m_log_store) { throw std::runtime_error("Failed to create log store"); } m_logstore_id = m_log_store->get_store_id(); - LOGDEBUGMOD(replication, "Opened new home log dev = {} store id={}", m_logdev_id, m_logstore_id); + LOGDEBUGMOD(replication, "Opened new home logdev={} logstore={}", m_logdev_id, m_logstore_id); } else { m_logdev_id = logdev_id; m_logstore_id = logstore_id; - LOGDEBUGMOD(replication, "Opening existing home log dev = {} store id={}", m_logdev_id, logstore_id); + LOGDEBUGMOD(replication, "Opening existing home logdev={} logstore={}", m_logdev_id, logstore_id); logstore_service().open_logdev(m_logdev_id); logstore_service().open_log_store(m_logdev_id, logstore_id, true).thenValue([this](auto log_store) { m_log_store = std::move(log_store); diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index b7cc53b4f..e9d378ebc 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -67,9 +67,11 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk m_rd_sb.write(); } - RD_LOG(INFO, "Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} 
commited_lsn={} next_dsn={}", + RD_LOG(INFO, + "Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={} next_dsn={} " + "log_dev={} log_store={}", (load_existing ? "Existing" : "New"), group_id_str(), my_replica_id_str(), m_raft_server_id, - m_commit_upto_lsn.load(), m_next_dsn.load()); + m_commit_upto_lsn.load(), m_next_dsn.load(), m_rd_sb->logdev_id, m_rd_sb->logstore_id); m_msg_mgr.bind_data_service_request(PUSH_DATA, m_group_id, bind_this(RaftReplDev::on_push_data_received, 1)); m_msg_mgr.bind_data_service_request(FETCH_DATA, m_group_id, bind_this(RaftReplDev::on_fetch_data_received, 1)); @@ -421,7 +423,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rre } void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) { - std::vector<::flatbuffers::Offset< RequestEntry > > entries; + std::vector< ::flatbuffers::Offset< RequestEntry > > entries; entries.reserve(rreqs->size()); shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >(); diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index e5f4ea0b2..e3b40fc90 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -85,6 +85,10 @@ if (${io_tests}) target_sources(test_log_store PRIVATE test_log_store.cpp) target_link_libraries(test_log_store hs_logdev homestore ${COMMON_TEST_DEPS} ) + add_executable(test_log_dev) + target_sources(test_log_dev PRIVATE test_log_dev.cpp) + target_link_libraries(test_log_dev hs_logdev homestore ${COMMON_TEST_DEPS} ) + set(TEST_METABLK_SOURCE_FILES test_meta_blk_mgr.cpp) add_executable(test_meta_blk_mgr ${TEST_METABLK_SOURCE_FILES}) target_link_libraries(test_meta_blk_mgr homestore ${COMMON_TEST_DEPS} GTest::gmock) @@ -108,7 +112,8 @@ if (${io_tests}) can_build_epoll_io_tests(epoll_tests) if(${epoll_tests}) - # add_test(NAME LogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store) + add_test(NAME LogDev-Epoll COMMAND 
${CMAKE_BINARY_DIR}/bin/test_log_dev) + add_test(NAME LogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store) add_test(NAME MetaBlkMgr-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr) add_test(NAME DataService-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service) @@ -120,6 +125,7 @@ if (${io_tests}) can_build_spdk_io_tests(spdk_tests) if(${spdk_tests}) add_test(NAME LogStore-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store -- --spdk "true") + add_test(NAME LogDev-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_dev -- --spdk "true") add_test(NAME MetaBlkMgr-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr -- --spdk "true") add_test(NAME DataSerice-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service -- --spdk "true") add_test(NAME SoloReplDev-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev -- --spdk "true") diff --git a/src/tests/test_device_manager.cpp b/src/tests/test_device_manager.cpp index 13bef9d6b..6e1cdcfcc 100644 --- a/src/tests/test_device_manager.cpp +++ b/src/tests/test_device_manager.cpp @@ -174,6 +174,62 @@ TEST_F(DeviceMgrTest, StripedVDevCreation) { this->validate_striped_vdevs(); } +TEST_F(DeviceMgrTest, CreateChunk) { + // Dynamically create chunks and verify no two chunks have the same start offset. 
+ uint64_t avail_size{0}; + for (auto& pdev : m_pdevs) { + avail_size += pdev->data_size(); + } + + LOGINFO("Step 1: Creating test_vdev with size={}", in_bytes(avail_size)); + auto vdev = + m_dmgr->create_vdev(homestore::vdev_parameters{.vdev_name = "test_vdev", + .size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC, + .vdev_size = avail_size, + .num_chunks = 0, + .blk_size = 512, + .chunk_size = 512 * 1024, + .dev_type = HSDevType::Data, + .alloc_type = blk_allocator_type_t::none, + .chunk_sel_type = chunk_selector_type_t::NONE, + .multi_pdev_opts = vdev_multi_pdev_opts_t::ALL_PDEV_STRIPED, + .context_data = sisl::blob{}}); + + auto num_chunks = 10; + LOGINFO("Step 2: Creating {} chunks", num_chunks); + std::unordered_map< uint32_t, chunk_info > chunk_info_map; + std::unordered_set< uint64_t > chunk_start; + + for (int i = 0; i < num_chunks; i++) { + auto chunk = m_dmgr->create_chunk(HSDevType::Data, vdev->info().vdev_id, 512 * 1024, {}); + chunk_info_map[chunk->chunk_id()] = chunk->info(); + auto [_, inserted] = chunk_start.insert(chunk->info().chunk_start_offset); + ASSERT_EQ(inserted, true) << "chunk start duplicate " << chunk->info().chunk_start_offset; + } + + LOGINFO("Step 3: Restarting homestore"); + this->restart(); + + LOGINFO("Step 4: Creating additional {} chunks", num_chunks); + for (int i = 0; i < num_chunks; i++) { + auto chunk = m_dmgr->create_chunk(HSDevType::Data, vdev->info().vdev_id, 512 * 1024, {}); + chunk_info_map[chunk->chunk_id()] = chunk->info(); + auto [_, inserted] = chunk_start.insert(chunk->info().chunk_start_offset); + ASSERT_EQ(inserted, true) << "chunk start duplicate " << chunk->info().chunk_start_offset; + } + + chunk_start.clear(); + auto chunk_vec = m_dmgr->get_chunks(); + ASSERT_EQ(chunk_vec.size(), num_chunks * 2); + for (const auto& chunk : chunk_vec) { + ASSERT_EQ(chunk->info().chunk_start_offset, chunk_info_map[chunk->chunk_id()].chunk_start_offset) + << "Chunk offsets mismatch"; + auto [_, inserted] = 
chunk_start.insert(chunk->info().chunk_start_offset); + ASSERT_EQ(inserted, true) << "chunk start duplicate " << chunk->info().chunk_start_offset; + } + vdev.reset(); +} + int main(int argc, char* argv[]) { SISL_OPTIONS_LOAD(argc, argv, logging, test_device_manager, iomgr); ::testing::InitGoogleTest(&argc, argv); diff --git a/src/tests/test_log_dev.cpp b/src/tests/test_log_dev.cpp index b41a5f9a5..d7e716b6f 100644 --- a/src/tests/test_log_dev.cpp +++ b/src/tests/test_log_dev.cpp @@ -13,135 +13,309 @@ * specific language governing permissions and limitations under the License. * *********************************************************************************/ -#include + +#include #include #include -#include -#include #include #include -#include +#include +#include +#include +#include #include +#include +#include #include #include +#include +#include +#include +#include "common/homestore_utils.hpp" +#include "common/homestore_assert.hpp" #include "logstore/log_dev.hpp" #include "test_common/homestore_test_common.hpp" using namespace homestore; + RCU_REGISTER_INIT SISL_LOGGING_INIT(HOMESTORE_LOG_MODS) +SISL_OPTIONS_ENABLE(logging, test_log_dev, iomgr, test_common_setup) +SISL_LOGGING_DECL(test_log_dev) -std::vector< logdev_key > s_logdev_keys; -static uint64_t first_offset{~static_cast< uint64_t >(0)}; +std::vector< std::string > test_common::HSTestHelper::s_dev_names; -static void on_append_completion(const logdev_key lkey, void* const ctx) { - s_logdev_keys.push_back(lkey); - LOGINFO("Append completed with log_idx = {} offset = {}", lkey.idx, lkey.dev_offset); - if (first_offset == ~static_cast< uint64_t >(0)) { first_offset = lkey.dev_offset; } -} +struct test_log_data { + test_log_data() = default; + test_log_data(const test_log_data&) = delete; + test_log_data(test_log_data&&) noexcept = delete; + test_log_data& operator=(const test_log_data&) = delete; + test_log_data& operator=(test_log_data&&) noexcept = delete; + ~test_log_data() = default; -static void 
on_log_found(const logdev_key lkey, const log_buffer buf) { - s_logdev_keys.push_back(lkey); - LOGINFO("Found a log with log_idx = {} offset = {}", lkey.idx, lkey.dev_offset); -} + uint32_t size; -[[nodiscard]] static std::shared_ptr< iomgr::ioMgr > start_homestore(const uint32_t ndevices, const uint64_t dev_size, - const uint32_t nthreads) { - std::vector< dev_info > device_info; - // these should be static so that they stay in scope in the lambda in case function ends before lambda completes - static std::mutex start_mutex; - static std::condition_variable cv; - static bool inited; - - inited = false; - LOGINFO("creating {} device files with each of size {} ", ndevices, dev_size); - for (uint32_t i{0}; i < ndevices; ++i) { - const std::filesystem::path fpath{"/tmp/" + std::to_string(i + 1)}; - std::ofstream ofs{fpath.string(), std::ios::binary | std::ios::out}; - std::filesystem::resize_file(fpath, dev_size); - device_info.emplace_back(std::filesystem::canonical(fpath).string(), HSDevType::Data); + uint8_t* get_data() { return uintptr_cast(this) + sizeof(test_log_data); }; + uint8_t const* get_data_const() const { return r_cast< uint8_t const* >(this) + sizeof(test_log_data); } + const uint8_t* get_data() const { return r_cast< const uint8_t* >(this) + sizeof(test_log_data); } + uint32_t total_size() const { return sizeof(test_log_data) + size; } + std::string get_data_str() const { + return std::string(r_cast< const char* >(get_data_const()), static_cast< size_t >(size)); } +}; - LOGINFO("Creating iomgr with {} threads", nthreads); - auto iomgr_obj{std::make_shared< iomgr::ioMgr >(2, nthreads)}; - - const uint64_t cache_size{((ndevices * dev_size) * 10) / 100}; - LOGINFO("Initialize and start HomeBlks with cache_size = {}", cache_size); - - boost::uuids::string_generator gen; - init_params params; - params.open_flags = homestore::io_flag::DIRECT_IO; - params.min_virtual_page_size = 4096; - params.cache_size = cache_size; - params.devices = device_info; - 
params.iomgr = iomgr_obj; - params.init_done_cb = [&iomgr_obj, &tl_start_mutex = start_mutex, &tl_cv = cv, - &tl_inited = inited](std::error_condition err, const out_params& params) { - iomgr_obj->start(); - LOGINFO("HomeBlks Init completed"); - { - std::unique_lock< std::mutex > lk{tl_start_mutex}; - tl_inited = true; - } - tl_cv.notify_one(); - }; - params.vol_mounted_cb = [](const VolumePtr& vol_obj, vol_state state) {}; - params.vol_state_change_cb = [](const VolumePtr& vol, vol_state old_state, vol_state new_state) {}; - params.vol_found_cb = [](boost::uuids::uuid uuid) -> bool { return true; }; +class LogDevTest : public ::testing::Test { +public: + const std::map< uint32_t, test_common::HSTestHelper::test_params > svc_params = {}; + static constexpr uint32_t max_data_size = 1024; + uint64_t s_max_flush_multiple = 0; - test_common::set_random_http_port(); - VolInterface::init(params); + virtual void SetUp() override { start_homestore(); } - { - std::unique_lock< std::mutex > lk{start_mutex}; - cv.wait(lk, [] { return inited; }); + void start_homestore(bool restart = false, hs_before_services_starting_cb_t starting_cb = nullptr) { + auto const ndevices = SISL_OPTIONS["num_devs"].as< uint32_t >(); + auto const dev_size = SISL_OPTIONS["dev_size_mb"].as< uint64_t >() * 1024 * 1024; + test_common::HSTestHelper::start_homestore("test_journal_vdev", + { + {HS_SERVICE::META, {.size_pct = 15.0}}, + {HS_SERVICE::LOG, + {.size_pct = 50.0, + .chunk_size = 32 * 1024 * 1024, + .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}, + }, + starting_cb, restart); } - return iomgr_obj; -} -SISL_OPTIONS_ENABLE(logging, test_log_dev) -SISL_OPTION_GROUP(test_log_dev, - (num_threads, "", "num_threads", "number of threads", - ::cxxopts::value< uint32_t >()->default_value("2"), "number"), - (num_devs, "", "num_devs", "number of devices to create", - ::cxxopts::value< uint32_t >()->default_value("2"), "number"), - (dev_size_mb, "", "dev_size_mb", "size of each device in MB", - 
::cxxopts::value< uint64_t >()->default_value("5120"), "number")); + virtual void TearDown() override { test_common::HSTestHelper::shutdown_homestore(); } -int main(int argc, char* argv[]) { - SISL_OPTIONS_LOAD(argc, argv, logging, test_log_dev); - sisl::logging::SetLogger("test_log_dev"); - spdlog::set_pattern("[%D %T%z] [%^%l%$] [%n] [%t] %v"); + test_log_data* prepare_data(const logstore_seq_num_t lsn, bool& io_memory) { + static thread_local std::random_device rd{}; + static thread_local std::default_random_engine re{rd()}; + uint32_t sz{0}; + uint8_t* raw_buf{nullptr}; + + // Generate buffer of random size and fill with specific data + std::uniform_int_distribution< uint8_t > gen_percentage{0, 99}; + std::uniform_int_distribution< uint32_t > gen_data_size{0, max_data_size - 1}; + if (gen_percentage(re) < static_cast< uint8_t >(10)) { + // 10% of data is dma'ble aligned boundary + const auto alloc_sz = sisl::round_up(gen_data_size(re) + sizeof(test_log_data), s_max_flush_multiple); + raw_buf = iomanager.iobuf_alloc(dma_address_boundary, alloc_sz); + sz = alloc_sz - sizeof(test_log_data); + io_memory = true; + } else { + sz = gen_data_size(re); + raw_buf = static_cast< uint8_t* >(std::malloc(sizeof(test_log_data) + sz)); + io_memory = false; + } - auto iomgr_obj{start_homestore(SISL_OPTIONS["num_devs"].as< uint32_t >(), - SISL_OPTIONS["dev_size_mb"].as< uint64_t >() * 1024 * 1024, - SISL_OPTIONS["num_threads"].as< uint32_t >())}; + test_log_data* d = new (raw_buf) test_log_data(); + d->size = sz; - std::array< std::string, 1024 > s; - auto ld{LogDev::instance()}; - ld->register_append_cb(on_append_completion); - ld->register_logfound_cb(on_log_found); + assert(uintptr_cast(d) == raw_buf); - for ( - size_t i{0}; (i < std::min< size_t >(195, s.size()); ++i) { - s[i] = std::to_string(i); - ld->append_async(0, 0, {reinterpret_cast< const uint8_t* >(s[i].c_str()), s[i].size() + 1}, nullptr); + const char c = static_cast< char >((lsn % 94) + 33); + 
std::memset(voidptr_cast(d->get_data()), c, static_cast< size_t >(d->size)); + return d; } - size_t i{0}; - for (const auto& lk : s_logdev_keys) { - const auto b{ld->read(lk)}; - const auto exp_val{std::to_string(i)}; - const auto actual_val{reinterpret_cast< const char* >(b.data()), static_cast< size_t >(b.size())}; - if (actual_val != exp_val) { - LOGERROR("Error in reading value for log_idx {} actual_val={} expected_val={}", i, actual_val, exp_val); + void validate_data(std::shared_ptr< HomeLogStore > log_store, const test_log_data* d, + const logstore_seq_num_t lsn) { + const char c = static_cast< char >((lsn % 94) + 33); + const std::string actual = d->get_data_str(); + const std::string expected(static_cast< size_t >(d->size), + c); // needs to be () because of same reason as vector + ASSERT_EQ(actual, expected) << "Data mismatch for LSN=" << log_store->get_store_id() << ":" << lsn + << " size=" << d->size; + } + + void insert_sync(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t lsn) { + bool io_memory{false}; + auto* d = prepare_data(lsn, io_memory); + const bool succ = log_store->write_sync(lsn, {uintptr_cast(d), d->total_size(), false}); + EXPECT_TRUE(succ); + LOGINFO("Written sync data for LSN -> {}:{}", log_store->get_store_id(), lsn); + if (io_memory) { + iomanager.iobuf_free(uintptr_cast(d)); } else { - LOGINFO("Read value {} for log_idx {}", actual_val, i); + std::free(voidptr_cast(d)); + } + } + + void kickstart_inserts(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t& cur_lsn, int64_t batch) { + auto last = cur_lsn + batch; + for (; cur_lsn < last; cur_lsn++) { + insert_sync(log_store, cur_lsn); + } + } + + void read_verify(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t lsn) { + auto b = log_store->read_sync(lsn); + auto* d = r_cast< test_log_data const* >(b.bytes()); + ASSERT_EQ(d->total_size(), b.size()) << "Size Mismatch for lsn=" << log_store->get_store_id() << ":" << lsn; + validate_data(log_store, d, 
lsn); + } + + void read_all_verify(std::shared_ptr< HomeLogStore > log_store) { + const auto trunc_upto = log_store->truncated_upto(); + const auto upto = log_store->get_contiguous_completed_seq_num(-1); + + for (std::remove_const_t< decltype(trunc_upto) > i{0}; i <= trunc_upto; ++i) { + ASSERT_THROW(log_store->read_sync(i), std::out_of_range) + << "Expected std::out_of_range exception for lsn=" << log_store->get_store_id() << ":" << i + << " but not thrown"; + } + + for (auto lsn = trunc_upto + 1; lsn < upto; lsn++) { + read_verify(log_store, lsn); + } + } + + void rollback_validate(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t& cur_lsn, + uint32_t num_lsns_to_rollback) { + std::mutex mtx; + std::condition_variable cv; + bool rollback_done = false; + cur_lsn -= num_lsns_to_rollback; + auto const upto_lsn = cur_lsn - 1; + log_store->rollback_async(upto_lsn, [&](logstore_seq_num_t) { + ASSERT_EQ(log_store->get_contiguous_completed_seq_num(-1), upto_lsn) + << "Last completed seq num is not reset after rollback"; + ASSERT_EQ(log_store->get_contiguous_issued_seq_num(-1), upto_lsn) + << "Last issued seq num is not reset after rollback"; + read_all_verify(log_store); + { + std::unique_lock lock(mtx); + rollback_done = true; + } + cv.notify_one(); + }); + + // We wait till async rollback is finished as we do validation. 
+ std::unique_lock lock(mtx); + cv.wait(lock, [&rollback_done] { return rollback_done == true; }); + } + + void truncate_validate(std::shared_ptr< HomeLogStore > log_store) { + auto upto = log_store->get_contiguous_completed_seq_num(-1); + LOGINFO("truncate_validate upto {}", upto); + log_store->truncate(upto); + read_all_verify(log_store); + logstore_service().device_truncate(nullptr /* cb */, true /* wait_till_done */); + } + + void rollback_records_validate(std::shared_ptr< HomeLogStore > log_store, uint32_t expected_count) { + auto actual_count = log_store->get_logdev()->log_dev_meta().num_rollback_records(log_store->get_store_id()); + ASSERT_EQ(actual_count, expected_count); + } +}; + +TEST_F(LogDevTest, WriteSyncThenRead) { + const auto iterations = SISL_OPTIONS["iterations"].as< uint32_t >(); + + for (uint32_t iteration{0}; iteration < iterations; ++iteration) { + LOGINFO("Iteration {}", iteration); + auto logdev_id = logstore_service().create_new_logdev(); + s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); + auto log_store = logstore_service().create_new_log_store(logdev_id, false); + const auto store_id = log_store->get_store_id(); + LOGINFO("Created new log store -> id {}", store_id); + const unsigned count{10}; + for (unsigned i{0}; i < count; ++i) { + // Insert new entry. + insert_sync(log_store, i); + // Verify the entry. 
+ read_verify(log_store, i); } - ++i; + + logstore_service().remove_log_store(logdev_id, store_id); + LOGINFO("Remove logstore -> i {}", store_id); } +} + +TEST_F(LogDevTest, Rollback) { + LOGINFO("Step 1: Create a single logstore to start rollback test"); + auto logdev_id = logstore_service().create_new_logdev(); + s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); + auto log_store = logstore_service().create_new_log_store(logdev_id, false); + auto store_id = log_store->get_store_id(); + + auto restart = [&]() { + std::promise< bool > p; + auto starting_cb = [&]() { + logstore_service().open_logdev(logdev_id); + logstore_service().open_log_store(logdev_id, store_id, false /* append_mode */).thenValue([&](auto store) { + log_store = store; + p.set_value(true); + }); + }; + start_homestore(true /* retart*/, starting_cb); + p.get_future().get(); + }; + + LOGINFO("Step 2: Issue sequential inserts with q depth of 10"); + logstore_seq_num_t cur_lsn = 0; + kickstart_inserts(log_store, cur_lsn, 500); + + LOGINFO("Step 3: Rollback last 50 entries and validate if pre-rollback entries are intact"); + rollback_validate(log_store, cur_lsn, 50); // Last entry = 450 + + LOGINFO("Step 4: Append 25 entries after rollback is completed"); + kickstart_inserts(log_store, cur_lsn, 25); // Last entry = 475 + + LOGINFO("Step 5: Rollback again for 75 entries even before previous rollback entry"); + rollback_validate(log_store, cur_lsn, 75); // Last entry = 400 + + LOGINFO("Step 6: Append 25 entries after second rollback is completed"); + kickstart_inserts(log_store, cur_lsn, 25); // Last entry = 425 + + LOGINFO("Step 7: Restart homestore and ensure all rollbacks are effectively validated"); + restart(); + + LOGINFO("Step 8: Post recovery, append another 25 entries"); + kickstart_inserts(log_store, cur_lsn, 25); // Last entry = 450 + + LOGINFO("Step 9: Rollback again for 75 entries even before previous rollback entry"); + 
rollback_validate(log_store, cur_lsn, 75); // Last entry = 375 + + LOGINFO("Step 10: After 3rd rollback, append another 25 entries"); + kickstart_inserts(log_store, cur_lsn, 25); // Last entry = 400 + + LOGINFO("Step 11: Truncate all entries"); + truncate_validate(log_store); + + LOGINFO("Step 12: Restart homestore and ensure all truncations after rollbacks are effectively validated"); + restart(); + + LOGINFO("Step 13: Append 25 entries after truncation is completed"); + kickstart_inserts(log_store, cur_lsn, 25); // Last entry = 425 + + LOGINFO("Step 14: Do another truncation to effectively truncate previous records"); + truncate_validate(log_store); + + LOGINFO("Step 15: Validate if there are no rollback records"); + rollback_records_validate(log_store, 0 /* expected_count */); +} + +SISL_OPTION_GROUP(test_log_dev, + (num_logdevs, "", "num_logdevs", "number of log devs", + ::cxxopts::value< uint32_t >()->default_value("4"), "number"), + (num_logstores, "", "num_logstores", "number of log stores", + ::cxxopts::value< uint32_t >()->default_value("16"), "number"), + (num_records, "", "num_records", "number of record to test", + ::cxxopts::value< uint32_t >()->default_value("10000"), "number"), + (iterations, "", "iterations", "Iterations", ::cxxopts::value< uint32_t >()->default_value("1"), + "the number of iterations to run each test")); + +int main(int argc, char* argv[]) { + int parsed_argc = argc; + ::testing::InitGoogleTest(&parsed_argc, argv); + SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_log_dev, iomgr, test_common_setup); + sisl::logging::SetLogger("test_log_dev"); + spdlog::set_pattern("[%D %T%z] [%^%l%$] [%t] %v"); - ld->load(first_offset); + const int ret = RUN_ALL_TESTS(); + return ret; } diff --git a/src/tests/test_log_store.cpp b/src/tests/test_log_store.cpp index 262270c88..1880e9043 100644 --- a/src/tests/test_log_store.cpp +++ b/src/tests/test_log_store.cpp @@ -438,7 +438,13 @@ class SampleDB { } void start_homestore(bool restart = false) { + 
auto n_log_devs = SISL_OPTIONS["num_logdevs"].as< uint32_t >(); auto n_log_stores = SISL_OPTIONS["num_logstores"].as< uint32_t >(); + if (n_log_devs < 1u) { + LOGINFO("Log store test needs minimum 1 log dev for testing, setting them to 1"); + n_log_devs = 1u; + } + if (n_log_stores < 4u) { LOGINFO("Log store test needs minimum 4 log stores for testing, setting them to 4"); n_log_stores = 4u; @@ -462,13 +468,18 @@ class SampleDB { restart); if (!restart) { - auto logdev_id = logstore_service().create_new_logdev(); + std::vector< logdev_id_t > logdev_id_vec; + for (uint32_t i{0}; i < n_log_devs; ++i) { + logdev_id_vec.push_back(logstore_service().create_new_logdev()); + } + for (uint32_t i{0}; i < n_log_stores; ++i) { + auto logdev_id = logdev_id_vec[rand() % logdev_id_vec.size()]; m_log_store_clients.push_back(std::make_unique< SampleLogStoreClient >( logdev_id, bind_this(SampleDB::on_log_insert_completion, 3))); } SampleLogStoreClient::s_max_flush_multiple = - logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); + logstore_service().get_logdev(logdev_id_vec[0])->get_flush_size_multiple(); } } @@ -619,14 +630,13 @@ class LogStoreTest : public ::testing::Test { EXPECT_EQ(expected_num_records, rec_count); } - void dump_validate_filter(logstore_id_t id, logstore_seq_num_t start_seq, logstore_seq_num_t end_seq, - bool print_content = false) { + void dump_validate_filter(bool print_content = false) { for (const auto& lsc : SampleDB::instance().m_log_store_clients) { - if (lsc->m_log_store->get_store_id() != id) { continue; } - - log_dump_req dump_req; + logstore_id_t id = lsc->m_log_store->get_store_id(); const auto fid = lsc->m_logdev_id; - + logstore_seq_num_t start_seq = lsc->m_log_store->truncated_upto() + 1; + logstore_seq_num_t end_seq = lsc->m_log_store->get_contiguous_completed_seq_num(-1); + log_dump_req dump_req; if (print_content) dump_req.verbosity_level = log_dump_verbosity::CONTENT; dump_req.log_store = lsc->m_log_store; 
dump_req.start_seq_num = start_seq; @@ -648,9 +658,8 @@ class LogStoreTest : public ::testing::Test { } else { EXPECT_FALSE(true); } - - return; } + return; } int find_garbage_upto(logdev_id_t logdev_id, logid_t idx) { @@ -791,7 +800,8 @@ class LogStoreTest : public ::testing::Test { } } ASSERT_EQ(actual_valid_ids, SampleDB::instance().m_log_store_clients.size()); - ASSERT_EQ(actual_garbage_ids, exp_garbage_store_count); + // Becasue we randomly assign logstore to logdev, some logdev will be empty. + // ASSERT_EQ(actual_garbage_ids, exp_garbage_store_count); } void delete_validate(uint32_t idx) { @@ -890,7 +900,7 @@ TEST_F(LogStoreTest, BurstRandInsertThenTruncate) { this->dump_validate(num_records); LOGINFO("Step 4.2: Read some specific interval/filter of seq number in one logstore and dump it into json"); - this->dump_validate_filter(0, 10, 100, true); + this->dump_validate_filter(true); } LOGINFO("Step 5: Truncate all of the inserts one log store at a time and validate log dev truncation is marked " @@ -1142,68 +1152,6 @@ TEST_F(LogStoreTest, FlushSync) { #endif } -TEST_F(LogStoreTest, Rollback) { - LOGINFO("Step 1: Reinit the 500 records on a single logstore to start rollback test"); - this->init(500, {std::make_pair(1ull, 100)}); // Last entry = 500 - - LOGINFO("Step 2: Issue sequential inserts with q depth of 10"); - this->kickstart_inserts(1, 10); - - LOGINFO("Step 3: Wait for the Inserts to complete"); - this->wait_for_inserts(); - - LOGINFO("Step 4: Rollback last 50 entries and validate if pre-rollback entries are intact"); - this->rollback_validate(50); // Last entry = 450 - - LOGINFO("Step 5: Append 25 entries after rollback is completed"); - this->init(25, {std::make_pair(1ull, 100)}); - this->kickstart_inserts(1, 10); - this->wait_for_inserts(); // Last entry = 475 - - LOGINFO("Step 7: Rollback again for 75 entries even before previous rollback entry"); - this->rollback_validate(75); // Last entry = 400 - - LOGINFO("Step 8: Append 25 entries after 
second rollback is completed"); - this->init(25, {std::make_pair(1ull, 100)}); - this->kickstart_inserts(1, 10); - this->wait_for_inserts(); // Last entry = 425 - - LOGINFO("Step 9: Restart homestore and ensure all rollbacks are effectively validated"); - SampleDB::instance().start_homestore(true /* restart */); - this->recovery_validate(); - - LOGINFO("Step 10: Post recovery, append another 25 entries"); - this->init(25, {std::make_pair(1ull, 100)}); - this->kickstart_inserts(1, 5); - this->wait_for_inserts(); // Last entry = 450 - - LOGINFO("Step 11: Rollback again for 75 entries even before previous rollback entry"); - this->rollback_validate(75); // Last entry = 375 - - LOGINFO("Step 12: After 3rd rollback, append another 25 entries"); - this->init(25, {std::make_pair(1ull, 100)}); - this->kickstart_inserts(1, 5); - this->wait_for_inserts(); // Last entry = 400 - - LOGINFO("Step 13: Truncate all entries"); - this->truncate_validate(); - - LOGINFO("Step 14: Restart homestore and ensure all truncations after rollbacks are effectively validated"); - SampleDB::instance().start_homestore(true /* restart */); - this->recovery_validate(); - - LOGINFO("Step 15: Append 25 entries after truncation is completed"); - this->init(25, {std::make_pair(1ull, 100)}); - this->kickstart_inserts(1, 10); - this->wait_for_inserts(); // Last entry = 425 - - LOGINFO("Step 16: Do another truncation to effectively truncate previous records"); - this->truncate_validate(); - - LOGINFO("Step 17: Validate if there are no rollback records"); - this->post_truncate_rollback_validate(); -} - TEST_F(LogStoreTest, DeleteMultipleLogStores) { const auto nrecords = (SISL_OPTIONS["num_records"].as< uint32_t >() * 5) / 100; @@ -1282,8 +1230,10 @@ TEST_F(LogStoreTest, WriteSyncThenRead) { SISL_OPTIONS_ENABLE(logging, test_log_store, iomgr, test_common_setup) SISL_OPTION_GROUP(test_log_store, - (num_logstores, "", "num_logstores", "number of log stores", + (num_logdevs, "", "num_logdevs", "number of log 
devs", ::cxxopts::value< uint32_t >()->default_value("4"), "number"), + (num_logstores, "", "num_logstores", "number of log stores", + ::cxxopts::value< uint32_t >()->default_value("16"), "number"), (num_records, "", "num_records", "number of record to test", ::cxxopts::value< uint32_t >()->default_value("10000"), "number"), (iterations, "", "iterations", "Iterations", ::cxxopts::value< uint32_t >()->default_value("1"),