diff --git a/.github/workflows/build_dependencies.yml b/.github/workflows/build_dependencies.yml index e0cbf5fb7..1b6870641 100644 --- a/.github/workflows/build_dependencies.yml +++ b/.github/workflows/build_dependencies.yml @@ -206,6 +206,13 @@ jobs: fail_on_cache_miss: true if: ${{ inputs.testing == 'True' && github.event_name != 'pull_request' && steps.restore-cache.outputs.cache-hit != 'true' }} + - uses: actions/checkout@v3 + - name: Setup tmate session + uses: mxschmitt/action-tmate@v3 + with: + limit-access-to-actor: true + detached: true + - name: Create and Test Package run: | sanitize=$([[ "${{ inputs.tooling }}" == "Sanitize" ]] && echo "True" || echo "False") diff --git a/src/lib/device/device.h b/src/lib/device/device.h index cbcdde8ea..a69e740dd 100644 --- a/src/lib/device/device.h +++ b/src/lib/device/device.h @@ -173,6 +173,7 @@ class DeviceManager { std::vector< PhysicalDev* > get_pdevs_by_dev_type(HSDevType dtype) const; std::vector< shared< VirtualDev > > get_vdevs() const; + std::vector< shared< Chunk > > get_chunks() const; uint64_t total_capacity() const; uint64_t total_capacity(HSDevType dtype) const; diff --git a/src/lib/device/device_manager.cpp b/src/lib/device/device_manager.cpp index ad450910e..6c7c59ef1 100644 --- a/src/lib/device/device_manager.cpp +++ b/src/lib/device/device_manager.cpp @@ -543,6 +543,16 @@ std::vector< shared< VirtualDev > > DeviceManager::get_vdevs() const { return ret_v; } +std::vector< shared< Chunk > > DeviceManager::get_chunks() const { + std::unique_lock lg{m_vdev_mutex}; + std::vector< shared< Chunk > > res; + res.reserve(m_chunks.size()); + for (auto& chunk : m_chunks) { + if (chunk) res.push_back(chunk); + } + return res; +} + // Some of the hs_super_blk details uint64_t hs_super_blk::vdev_super_block_size() { return (hs_super_blk::MAX_VDEVS_IN_SYSTEM * vdev_info::size); } diff --git a/src/lib/device/journal_vdev.cpp b/src/lib/device/journal_vdev.cpp index 87eab5f72..db5e0a76c 100644 --- a/src/lib/device/journal_vdev.cpp +++ b/src/lib/device/journal_vdev.cpp @@ -231,7 +231,7 @@ off_t JournalVirtualDev::Descriptor::alloc_next_append_blk(size_t sz) { // assert that returnning logical offset is in good range HS_DBG_ASSERT_LE(tail_off, m_end_offset); - LOGDEBUGMOD(journalvdev, "returned tail_off 0x{} desc {}", to_hex(tail_off), to_string()); + LOGDEBUGMOD(journalvdev, "returned tail_off 0x{} size {} desc {}", to_hex(tail_off), sz, to_string()); return tail_off; } @@ -443,6 +443,14 @@ off_t JournalVirtualDev::Descriptor::dev_offset(off_t nbytes) const { return vdev_offset; } +void JournalVirtualDev::Descriptor::update_data_start_offset(off_t offset) { + m_data_start_offset = offset; + auto data_start_offset_aligned = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size); + m_end_offset = data_start_offset_aligned + m_journal_chunks.size() * m_vdev.info().chunk_size; + LOGTRACEMOD(journalvdev, "Updated data start offset off 0x{} {}", to_hex(offset), to_string()); + RELEASE_ASSERT_EQ(m_end_offset - data_start_offset_aligned, m_total_size, "offset size mismatch {}", to_string()); +} + off_t JournalVirtualDev::Descriptor::tail_offset(bool reserve_space_include) const { off_t tail = static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed)); if (reserve_space_include) { tail += m_reserved_sz; } @@ -461,7 +469,7 @@ void JournalVirtualDev::Descriptor::update_tail_offset(off_t tail) { } lseek(tail); - LOGDEBUGMOD(journalvdev, "tail arg 0x{} desc {} ", to_hex(tail), to_string()); + LOGDEBUGMOD(journalvdev, "Updated tail offset arg 0x{} desc {} ", to_hex(tail), to_string()); HS_REL_ASSERT(tail_offset() == tail, "tail offset mismatch after calculation 0x{} : {}", tail_offset(), tail); } @@ -598,7 +606,7 @@ bool JournalVirtualDev::Descriptor::is_alloc_accross_chunk(size_t size) const { nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const { nlohmann::json j; - j["logdev_id"] = m_logdev_id; + j["logdev"] = m_logdev_id; j["seek_cursor"] = m_seek_cursor; j["data_start_offset"] = m_data_start_offset; j["end_offset"] = m_end_offset; @@ -613,7 +621,7 @@ nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const { nlohmann::json c; auto* private_data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private())); c["chunk_id"] = chunk->chunk_id(); - c["logdev_id"] = private_data->logdev_id; + c["logdev"] = private_data->logdev_id; c["is_head"] = private_data->is_head; c["end_of_chunk"] = private_data->end_of_chunk; c["next_chunk"] = private_data->next_chunk; @@ -627,12 +635,13 @@ nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const { } std::string JournalVirtualDev::Descriptor::to_string() const { - std::string str{fmt::format("id={};ds=0x{};end=0x{};writesz={};tail=0x{};" + off_t tail = + static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed)) + m_reserved_sz; + std::string str{fmt::format("logdev={};ds=0x{};end=0x{};writesz={};tail=0x{};" "rsvdsz={};chunks={};trunc={};total={};seek=0x{} ", m_logdev_id, to_hex(m_data_start_offset), to_hex(m_end_offset), - m_write_sz_in_total.load(std::memory_order_relaxed), to_hex(tail_offset()), - m_reserved_sz, m_journal_chunks.size(), m_truncate_done, m_total_size, - to_hex(m_seek_cursor))}; + m_write_sz_in_total.load(std::memory_order_relaxed), to_hex(tail), m_reserved_sz, + m_journal_chunks.size(), m_truncate_done, m_total_size, to_hex(m_seek_cursor))}; return str; } diff --git a/src/lib/device/journal_vdev.hpp b/src/lib/device/journal_vdev.hpp index 13ec18c65..ad0cca4f2 100644 --- a/src/lib/device/journal_vdev.hpp +++ b/src/lib/device/journal_vdev.hpp @@ -233,12 +233,7 @@ class JournalVirtualDev : public VirtualDev { * * @param offset : the start logical offset to be persisted */ - void update_data_start_offset(off_t offset) { - m_data_start_offset = offset; - auto data_start_offset_aligned = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size); - m_end_offset = data_start_offset_aligned + m_journal_chunks.size() * m_vdev.info().chunk_size; - RELEASE_ASSERT_EQ(m_end_offset - data_start_offset_aligned, m_total_size, "offset size mismatch"); - } + void update_data_start_offset(off_t offset); /** * @brief : get the logical tail offset; @@ -309,6 +304,8 @@ class JournalVirtualDev : public VirtualDev { */ nlohmann::json get_status(int log_level) const; + logdev_id_t logdev_id() const { return m_logdev_id; } + std::string to_string() const; private: diff --git a/src/lib/device/physical_dev.cpp b/src/lib/device/physical_dev.cpp index fe44059de..8baae8ab0 100644 --- a/src/lib/device/physical_dev.cpp +++ b/src/lib/device/physical_dev.cpp @@ -247,12 +247,13 @@ std::vector< shared< Chunk > > PhysicalDev::create_chunks(const std::vector< uin auto chunk = std::make_shared< Chunk >(this, *cinfo, cslot); ret_chunks.push_back(chunk); get_stream(chunk).m_chunks_map.insert(std::pair{chunk_ids[cit], std::move(chunk)}); - + HS_LOG(DEBUG, device, "Creating chunk {}", chunk->to_string()); cinfo->~chunk_info(); } m_chunk_info_slots->set_bits(b.start_bit, b.nbits); write_super_block(buf, chunk_info::size * b.nbits, chunk_info_offset_nth(b.start_bit)); + hs_utils::iobuf_free(buf, sisl::buftag::superblk); chunks_remaining -= b.nbits; @@ -296,6 +297,7 @@ shared< Chunk > PhysicalDev::create_chunk(uint32_t chunk_id, uint32_t vdev_id, u auto bitmap_mem = m_chunk_info_slots->serialize(m_pdev_info.dev_attr.align_size); write_super_block(bitmap_mem->cbytes(), bitmap_mem->size(), hs_super_blk::chunk_sb_offset()); + HS_LOG(TRACE, device, "Created chunk {}", chunk->to_string()); cinfo->~chunk_info(); hs_utils::iobuf_free(buf, sisl::buftag::superblk); @@ -360,6 +362,8 @@ void PhysicalDev::load_chunks(std::function< bool(cshared< Chunk >&) >&& chunk_f cinfo->checksum = info_crc; auto chunk = std::make_shared< Chunk >(this, *cinfo, cslot); + m_chunk_data_area.insert( + ChunkInterval::right_open(cinfo->chunk_start_offset, cinfo->chunk_start_offset + cinfo->chunk_size)); if (chunk_found_cb(chunk)) { get_stream(chunk).m_chunks_map.insert(std::pair{cinfo->chunk_id, chunk}); } } hs_utils::iobuf_free(buf, sisl::buftag::superblk); @@ -395,6 +399,7 @@ void PhysicalDev::do_remove_chunk(cshared< Chunk >& chunk) { get_stream(chunk).m_chunks_map.erase(chunk->chunk_id()); cinfo->~chunk_info(); hs_utils::iobuf_free(buf, sisl::buftag::superblk); + HS_LOG(TRACE, device, "Removed chunk {}", chunk->to_string()); } uint64_t PhysicalDev::chunk_info_offset_nth(uint32_t slot) const { @@ -416,11 +421,14 @@ void PhysicalDev::populate_chunk_info(chunk_info* cinfo, uint32_t vdev_id, uint6 cinfo->set_allocated(); cinfo->set_user_private(private_data); cinfo->compute_checksum(); + auto [_, inserted] = m_chunk_start.insert(cinfo->chunk_start_offset); + RELEASE_ASSERT(inserted, "Duplicate start offset {} for chunk {}", cinfo->chunk_start_offset, cinfo->chunk_id); } void PhysicalDev::free_chunk_info(chunk_info* cinfo) { auto ival = ChunkInterval::right_open(cinfo->chunk_start_offset, cinfo->chunk_start_offset + cinfo->chunk_size); m_chunk_data_area.erase(ival); + m_chunk_start.erase(cinfo->chunk_start_offset); cinfo->set_free(); cinfo->checksum = 0; diff --git a/src/lib/device/physical_dev.hpp b/src/lib/device/physical_dev.hpp index 2987fb45e..41eb9221d 100644 --- a/src/lib/device/physical_dev.hpp +++ b/src/lib/device/physical_dev.hpp @@ -136,6 +136,7 @@ class PhysicalDev { ChunkIntervalSet m_chunk_data_area; // Range of chunks data area created std::unique_ptr< sisl::Bitset > m_chunk_info_slots; // Slots to write the chunk info uint32_t m_chunk_sb_size{0}; // Total size of the chunk sb at present + std::unordered_set< uint64_t > m_chunk_start; // Store and verify start offset of all chunks for debugging. public: PhysicalDev(const dev_info& dinfo, int oflags, const pdev_info_header& pinfo); diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index 83c284c3e..310539070 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -258,11 +258,17 @@ log_buffer LogDev::read(const logdev_key& key, serialized_log_record& return_rec m_vdev_jd->sync_pread(buf->bytes(), initial_read_size, key.dev_offset); auto* header = r_cast< const log_group_header* >(buf->cbytes()); - HS_REL_ASSERT_EQ(header->magic_word(), LOG_GROUP_HDR_MAGIC, "Log header corrupted with magic mismatch!"); - HS_REL_ASSERT_EQ(header->get_version(), log_group_header::header_version, "Log header version mismatch!"); - HS_REL_ASSERT_LE(header->start_idx(), key.idx, "log key offset does not match with log_idx"); - HS_REL_ASSERT_GT((header->start_idx() + header->nrecords()), key.idx, "log key offset does not match with log_idx"); - HS_LOG_ASSERT_GE(header->total_size(), header->_inline_data_offset(), "Inconsistent size data in log group"); + THIS_LOGDEV_LOG(TRACE, "Logdev read log group header {}", *header); + HS_REL_ASSERT_EQ(header->magic_word(), LOG_GROUP_HDR_MAGIC, "Log header corrupted with magic mismatch! {} {}", + m_logdev_id, *header); + HS_REL_ASSERT_EQ(header->get_version(), log_group_header::header_version, "Log header version mismatch! {} {}", + m_logdev_id, *header); + HS_REL_ASSERT_LE(header->start_idx(), key.idx, "log key offset does not match with log_idx {} }{}", m_logdev_id, + *header); + HS_REL_ASSERT_GT((header->start_idx() + header->nrecords()), key.idx, + "log key offset does not match with log_idx {} {}", m_logdev_id, *header); + HS_LOG_ASSERT_GE(header->total_size(), header->_inline_data_offset(), "Inconsistent size data in log group {} {}", + m_logdev_id, *header); // We can only do crc match in read if we have read all the blocks. We don't want to aggressively read more data // than we need to just to compare CRC for read operation. It can be done during recovery. @@ -337,7 +343,7 @@ LogGroup* LogDev::prepare_flush(const int32_t estimated_records) { } }); - lg->finish(get_prev_crc()); + lg->finish(m_logdev_id, get_prev_crc()); if (sisl_unlikely(flushing_upto_idx == -1)) { return nullptr; } lg->m_flush_log_idx_from = m_last_flush_idx + 1; lg->m_flush_log_idx_upto = flushing_upto_idx; @@ -346,7 +352,6 @@ LogGroup* LogDev::prepare_flush(const int32_t estimated_records) { HS_DBG_ASSERT_GT(lg->header()->oob_data_offset, 0); THIS_LOGDEV_LOG(DEBUG, "Flushing upto log_idx={}", flushing_upto_idx); - THIS_LOGDEV_LOG(DEBUG, "Log Group: {}", *lg); return lg; } @@ -408,7 +413,7 @@ bool LogDev::flush_if_needed(int64_t threshold_size) { lg->m_log_dev_offset = offset; HS_REL_ASSERT_NE(lg->m_log_dev_offset, INVALID_OFFSET, "log dev is full"); THIS_LOGDEV_LOG(TRACE, "Flush prepared, flushing data size={} at offset={}", lg->actual_data_size(), offset); - + THIS_LOGDEV_LOG(DEBUG, "Log Group: {}", *lg); do_flush(lg); return true; } else { @@ -540,14 +545,14 @@ void LogDev::unlock_flush(bool do_flush) { uint64_t LogDev::truncate(const logdev_key& key) { HS_DBG_ASSERT_GE(key.idx, m_last_truncate_idx); uint64_t const num_records_to_truncate = static_cast< uint64_t >(key.idx - m_last_truncate_idx); - LOGINFO("LogDev::truncate {}", num_records_to_truncate); + THIS_LOGDEV_LOG(INFO, "LogDev::truncate {}", num_records_to_truncate); if (num_records_to_truncate > 0) { HS_PERIODIC_LOG(INFO, logstore, "Truncating log device upto logdev {} log_id={} vdev_offset={} truncated {} log records", m_logdev_id, key.idx, key.dev_offset, num_records_to_truncate); m_log_records->truncate(key.idx); m_vdev_jd->truncate(key.dev_offset); - LOGINFO("LogDev::truncate {}", key.idx); + THIS_LOGDEV_LOG(INFO, "LogDev::truncate {}", key.idx); m_last_truncate_idx = key.idx; { @@ -574,11 +579,11 @@ uint64_t LogDev::truncate(const logdev_key& key) { // We can remove the rollback records of those upto which logid is getting truncated m_logdev_meta.remove_rollback_record_upto(key.idx, false /* persist_now */); - LOGINFO("LogDev::truncate remove rollback {}", key.idx); + THIS_LOGDEV_LOG(INFO, "LogDev::truncate remove rollback {}", key.idx); m_logdev_meta.persist(); #ifdef _PRERELEASE if (garbage_collect && iomgr_flip::instance()->test_flip("logdev_abort_after_garbage")) { - LOGINFO("logdev aborting after unreserving garbage ids"); + THIS_LOGDEV_LOG(INFO, "logdev aborting after unreserving garbage ids"); raise(SIGKILL); } #endif @@ -633,7 +638,7 @@ std::shared_ptr< HomeLogStore > LogDev::create_new_log_store(bool append_mode) { HS_REL_ASSERT((it == m_id_logstore_map.end()), "store_id {}-{} already exists", m_logdev_id, store_id); m_id_logstore_map.insert(std::pair(store_id, logstore_info{.log_store = lstore, .append_mode = append_mode})); } - LOGINFO("Created log store id {}-{}", m_logdev_id, store_id); + LOGINFO("Created log store log_dev={} log_store={}", m_logdev_id, store_id); return lstore; } @@ -653,7 +658,7 @@ folly::Future< shared< HomeLogStore > > LogDev::open_log_store(logstore_id_t sto } void LogDev::remove_log_store(logstore_id_t store_id) { - LOGINFO("Removing log store id {}-{}", m_logdev_id, store_id); + LOGINFO("Removing log_dev={} log_store={}", m_logdev_id, store_id); { folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx); auto ret = m_id_logstore_map.erase(store_id); @@ -695,8 +700,8 @@ void LogDev::on_log_store_found(logstore_id_t store_id, const logstore_superblk& return; } - LOGINFO("Found a logstore store_id={}-{} with start seq_num={}, Creating a new HomeLogStore instance", m_logdev_id, - store_id, sb.m_first_seq_num); + LOGINFO("Found a logstore log_dev={} log_store={} with start seq_num={}, Creating a new HomeLogStore instance", + m_logdev_id, store_id, sb.m_first_seq_num); logstore_info& info = it->second; info.log_store = std::make_shared< HomeLogStore >(shared_from_this(), store_id, info.append_mode, sb.m_first_seq_num); @@ -1055,7 +1060,7 @@ void LogDevMetadata::remove_rollback_record_upto(logid_t upto_id, bool persist_n uint32_t n_removed{0}; for (auto i = m_rollback_sb->num_records; i > 0; --i) { auto& rec = m_rollback_sb->at(i - 1); - LOGINFO("Removing record sb {} {}", rec.idx_range.second, upto_id); + HS_LOG(TRACE, logstore, "Removing record sb {} {}", rec.idx_range.second, upto_id); if (rec.idx_range.second <= upto_id) { m_rollback_sb->remove_ith_record(i - 1); ++n_removed; @@ -1064,7 +1069,7 @@ void LogDevMetadata::remove_rollback_record_upto(logid_t upto_id, bool persist_n if (n_removed) { for (auto it = m_rollback_info.begin(); it != m_rollback_info.end();) { - LOGINFO("Removing info {} {}", it->second.second, upto_id); + HS_LOG(TRACE, logstore, "Removing info {} {}", it->second.second, upto_id); if (it->second.second <= upto_id) { it = m_rollback_info.erase(it); } else { diff --git a/src/lib/logstore/log_dev.hpp b/src/lib/logstore/log_dev.hpp index bfb2afb1a..226194cc4 100644 --- a/src/lib/logstore/log_dev.hpp +++ b/src/lib/logstore/log_dev.hpp @@ -136,6 +136,7 @@ struct log_group_header { uint32_t footer_offset; // offset of where footer starts crc32_t prev_grp_crc; // Checksum of the previous group that was written crc32_t cur_grp_crc; // Checksum of the current group record + logdev_id_t logdev_id; // Logdev id log_group_header() : magic{LOG_GROUP_HDR_MAGIC}, version{header_version} {} log_group_header(const log_group_header&) = delete; @@ -195,9 +196,10 @@ struct fmt::formatter< homestore::log_group_header > { return fmt::format_to( ctx.out(), "magic = {} version={} n_log_records = {} start_log_idx = {} group_size = {} inline_data_offset = {} " - "oob_data_offset = {} prev_grp_crc = {} cur_grp_crc = {}", + "oob_data_offset = {} prev_grp_crc = {} cur_grp_crc = {} logdev = {}", header.magic, header.version, header.n_log_records, header.start_log_idx, header.group_size, - header.inline_data_offset, header.oob_data_offset, header.prev_grp_crc, header.cur_grp_crc); + header.inline_data_offset, header.oob_data_offset, header.prev_grp_crc, header.cur_grp_crc, + header.logdev_id); } }; @@ -253,7 +255,7 @@ class LogGroup { bool add_record(log_record& record, const int64_t log_idx); bool can_accomodate(const log_record& record) const { return (m_nrecords <= m_max_records); } - const iovec_array& finish(const crc32_t prev_crc); + const iovec_array& finish(logdev_id_t logdev_id, const crc32_t prev_crc); crc32_t compute_crc(); log_group_header* header() { return reinterpret_cast< log_group_header* >(m_cur_log_buf); } diff --git a/src/lib/logstore/log_group.cpp b/src/lib/logstore/log_group.cpp index 2f54da8d8..597d97849 100644 --- a/src/lib/logstore/log_group.cpp +++ b/src/lib/logstore/log_group.cpp @@ -117,13 +117,14 @@ bool LogGroup::new_iovec_for_footer() const { return ((m_inline_data_pos + sizeof(log_group_footer)) >= m_cur_buf_len || m_oob_data_pos != 0); } -const iovec_array& LogGroup::finish(const crc32_t prev_crc) { +const iovec_array& LogGroup::finish(logdev_id_t logdev_id, const crc32_t prev_crc) { // add footer auto footer = add_and_get_footer(); m_iovecs[0].iov_len = sisl::round_up(m_iovecs[0].iov_len, m_flush_multiple_size); log_group_header* hdr = new (header()) log_group_header{}; + hdr->logdev_id = logdev_id; hdr->n_log_records = m_nrecords; hdr->prev_grp_crc = prev_crc; hdr->inline_data_offset = sizeof(log_group_header) + (m_max_records * sizeof(serialized_log_record)); diff --git a/src/lib/logstore/log_store.cpp b/src/lib/logstore/log_store.cpp index 089bb3286..eff6c39e5 100644 --- a/src/lib/logstore/log_store.cpp +++ b/src/lib/logstore/log_store.cpp @@ -39,7 +39,7 @@ HomeLogStore::HomeLogStore(std::shared_ptr< LogDev > logdev, logstore_id_t id, b m_records{"HomeLogStoreRecords", start_lsn - 1}, m_append_mode{append_mode}, m_seq_num{start_lsn}, - m_fq_name{fmt::format("{}.{}", logdev->get_id(), id)}, + m_fq_name{fmt::format("{} logdev={}", id, logdev->get_id())}, m_metrics{logstore_service().metrics()} { m_truncation_barriers.reserve(10000); m_safe_truncation_boundary.ld_key = m_logdev->get_last_flush_ld_key(); @@ -121,7 +121,7 @@ log_buffer HomeLogStore::read_sync(logstore_seq_num_t seq_num) { // If seq_num has not been flushed yet, but issued, then we flush them before reading auto const s = m_records.status(seq_num); if (s.is_out_of_range || s.is_hole) { - // THIS_LOGSTORE_LOG(DEBUG, "ld_key not valid {}", seq_num); + THIS_LOGSTORE_LOG(ERROR, "ld_key not valid {}", seq_num); throw std::out_of_range("key not valid"); } else if (!s.is_completed) { THIS_LOGSTORE_LOG(TRACE, "Reading lsn={}:{} before flushed, doing flush first", m_store_id, seq_num); diff --git a/src/lib/logstore/log_store_service.cpp b/src/lib/logstore/log_store_service.cpp index 90e35cf06..df33ebd90 100644 --- a/src/lib/logstore/log_store_service.cpp +++ b/src/lib/logstore/log_store_service.cpp @@ -115,7 +115,6 @@ logdev_id_t LogStoreService::create_new_logdev() { auto logdev = create_new_logdev_internal(logdev_id); logdev->start(true /* format */, m_logdev_vdev.get()); COUNTER_INCREMENT(m_metrics, logdevs_count, 1); - LOGINFO("Created log dev id {}", logdev_id); return logdev_id; } diff --git a/src/lib/logstore/log_stream.cpp b/src/lib/logstore/log_stream.cpp index afa7e67e9..ac2aa8647 100644 --- a/src/lib/logstore/log_stream.cpp +++ b/src/lib/logstore/log_stream.cpp @@ -44,7 +44,7 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { do { m_cur_log_buf = read_next_bytes(std::max(min_needed, bulk_read_size)); if (m_cur_log_buf.size() == 0) { - LOGINFOMOD(logstore, "Logdev data empty"); + LOGINFOMOD(logstore, "Logdev data empty logdev={}", m_vdev_jd->logdev_id()); return {}; } } while (m_cur_log_buf.size() < sizeof(log_group_header)); @@ -54,8 +54,8 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { HS_REL_ASSERT_GE(m_cur_log_buf.size(), m_read_size_multiple); const auto* header = r_cast< log_group_header const* >(m_cur_log_buf.bytes()); if (header->magic_word() != LOG_GROUP_HDR_MAGIC) { - LOGINFOMOD(logstore, "Logdev data not seeing magic at pos {}, must have come to end of logdev", - m_vdev_jd->dev_offset(m_cur_read_bytes)); + LOGINFOMOD(logstore, "Logdev data not seeing magic at pos {}, must have come to end of logdev={}", + m_vdev_jd->dev_offset(m_cur_read_bytes), m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid @@ -65,8 +65,8 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { } if (header->total_size() > m_cur_log_buf.size()) { - LOGINFOMOD(logstore, "Logstream group size {} is more than available buffer size {}, reading from store", - header->total_size(), m_cur_log_buf.size()); + LOGINFOMOD(logstore, "Logstream group size {} is more than available buffer size {}, reading from logdev={}", + header->total_size(), m_cur_log_buf.size(), m_vdev_jd->logdev_id()); // Bigger group size than needed bytes, read again min_needed = sisl::round_up(header->total_size(), m_read_size_multiple); goto read_again; @@ -74,15 +74,16 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { LOGTRACEMOD(logstore, "Logstream read log group of size={} nrecords={} m_cur_log_dev_offset {} buf size " - "remaining {} ", - header->total_size(), header->nrecords(), m_vdev_jd->dev_offset(m_cur_read_bytes), - m_cur_log_buf.size()); + "remaining {} logdev={}", + header->total_size(), header->nrecords(), m_vdev_jd->dev_offset(m_cur_read_bytes), m_cur_log_buf.size(), + m_vdev_jd->logdev_id()); // compare it with prev crc if (m_prev_crc != 0 && m_prev_crc != header->prev_grp_crc) { // we reached at the end - LOGINFOMOD(logstore, "we have reached the end. crc doesn't match with the prev crc {}", - m_vdev_jd->dev_offset(m_cur_read_bytes)); + LOGINFOMOD(logstore, + "we have reached the end. crc doesn't match offset {} prev crc {} header prev crc {} logdev={}", + m_vdev_jd->dev_offset(m_cur_read_bytes), header->prev_grp_crc, m_prev_crc, m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid @@ -95,8 +96,9 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { const auto* footer = r_cast< log_group_footer* >((uint64_t)m_cur_log_buf.bytes() + header->footer_offset); if (footer->magic != LOG_GROUP_FOOTER_MAGIC || footer->start_log_idx != header->start_log_idx) { LOGINFOMOD(logstore, - "last write is not completely written. footer magic {} footer start_log_idx {} header log indx {}", - footer->magic, footer->start_log_idx, header->start_log_idx); + "last write is not completely written. footer magic {} footer start_log_idx {} header log indx {} " + "logdev={}", + footer->magic, footer->start_log_idx, header->start_log_idx, m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid @@ -112,8 +114,9 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) { (header->total_size() - sizeof(log_group_header))); if (cur_crc != header->cur_grp_crc) { /* This is a valid entry so crc should match */ - HS_REL_ASSERT(0, "data is corrupted"); - LOGINFOMOD(logstore, "crc doesn't match {}", m_vdev_jd->dev_offset(m_cur_read_bytes)); + HS_REL_ASSERT(0, "data is corrupted {}", m_vdev_jd->logdev_id()); + LOGINFOMOD(logstore, "crc doesn't match {} logdev={}", m_vdev_jd->dev_offset(m_cur_read_bytes), + m_vdev_jd->logdev_id()); *out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes); // move it by dma boundary if header is not valid @@ -149,8 +152,8 @@ sisl::byte_view log_stream_reader::read_next_bytes(uint64_t nbytes) { auto sz_read = m_vdev_jd->sync_next_read(out_buf->bytes() + m_cur_log_buf.size(), nbytes); if (sz_read == 0) { return {}; } - LOGINFOMOD(logstore, "LogStream read {} bytes from vdev offset {} and vdev cur offset {}", nbytes, prev_pos, - m_vdev_jd->seeked_pos()); + LOGINFOMOD(logstore, "LogStream read {} bytes from vdev offset {} and vdev cur offset {} lgodev={}", nbytes, + prev_pos, m_vdev_jd->seeked_pos(), m_vdev_jd->logdev_id()); return sisl::byte_view{out_buf}; } } // namespace homestore diff --git a/src/lib/replication/log_store/home_raft_log_store.cpp b/src/lib/replication/log_store/home_raft_log_store.cpp index 7444169ee..883ffa46d 100644 --- a/src/lib/replication/log_store/home_raft_log_store.cpp +++ b/src/lib/replication/log_store/home_raft_log_store.cpp @@ -64,11 +64,11 @@ HomeRaftLogStore::HomeRaftLogStore(logdev_id_t logdev_id, logstore_id_t logstore m_log_store = logstore_service().create_new_log_store(m_logdev_id, true); if (!m_log_store) { throw std::runtime_error("Failed to create log store"); } m_logstore_id = m_log_store->get_store_id(); - LOGDEBUGMOD(replication, "Opened new home log dev = {} store id={}", m_logdev_id, m_logstore_id); + LOGDEBUGMOD(replication, "Opened new home logdev={} logstore={}", m_logdev_id, m_logstore_id); } else { m_logdev_id = logdev_id; m_logstore_id = logstore_id; - LOGDEBUGMOD(replication, "Opening existing home log dev = {} store id={}", m_logdev_id, logstore_id); + LOGDEBUGMOD(replication, "Opening existing home logdev={} logstore={}", m_logdev_id, logstore_id); logstore_service().open_logdev(m_logdev_id); logstore_service().open_log_store(m_logdev_id, logstore_id, true).thenValue([this](auto log_store) { m_log_store = std::move(log_store); diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index b7cc53b4f..e9d378ebc 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -67,9 +67,11 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk m_rd_sb.write(); } - RD_LOG(INFO, "Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={} next_dsn={}", + RD_LOG(INFO, + "Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={} next_dsn={} " + "log_dev={} log_store={}", (load_existing ? "Existing" : "New"), group_id_str(), my_replica_id_str(), m_raft_server_id, - m_commit_upto_lsn.load(), m_next_dsn.load()); + m_commit_upto_lsn.load(), m_next_dsn.load(), m_rd_sb->logdev_id, m_rd_sb->logstore_id); m_msg_mgr.bind_data_service_request(PUSH_DATA, m_group_id, bind_this(RaftReplDev::on_push_data_received, 1)); m_msg_mgr.bind_data_service_request(FETCH_DATA, m_group_id, bind_this(RaftReplDev::on_fetch_data_received, 1)); @@ -421,7 +423,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t >* rre } void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t >* rreqs) { - std::vector<::flatbuffers::Offset< RequestEntry > > entries; + std::vector< ::flatbuffers::Offset< RequestEntry > > entries; entries.reserve(rreqs->size()); shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >(); diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index e5f4ea0b2..e3b40fc90 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -85,6 +85,10 @@ if (${io_tests}) target_sources(test_log_store PRIVATE test_log_store.cpp) target_link_libraries(test_log_store hs_logdev homestore ${COMMON_TEST_DEPS} ) + add_executable(test_log_dev) + target_sources(test_log_dev PRIVATE test_log_dev.cpp) + target_link_libraries(test_log_dev hs_logdev homestore ${COMMON_TEST_DEPS} ) + set(TEST_METABLK_SOURCE_FILES test_meta_blk_mgr.cpp) add_executable(test_meta_blk_mgr ${TEST_METABLK_SOURCE_FILES}) target_link_libraries(test_meta_blk_mgr homestore ${COMMON_TEST_DEPS} GTest::gmock) @@ -108,7 +112,8 @@ if (${io_tests}) can_build_epoll_io_tests(epoll_tests) if(${epoll_tests}) - # add_test(NAME LogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store) + add_test(NAME LogDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_dev) + add_test(NAME LogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store) add_test(NAME MetaBlkMgr-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr) add_test(NAME DataService-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service) @@ -120,6 +125,7 @@ if (${io_tests}) can_build_spdk_io_tests(spdk_tests) if(${spdk_tests}) add_test(NAME LogStore-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_store -- --spdk "true") + add_test(NAME LogDev-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_log_dev -- --spdk "true") add_test(NAME MetaBlkMgr-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr -- --spdk "true") add_test(NAME DataSerice-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service -- --spdk "true") add_test(NAME SoloReplDev-Spdk COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev -- --spdk "true") diff --git a/src/tests/test_device_manager.cpp b/src/tests/test_device_manager.cpp index 13bef9d6b..6e1cdcfcc 100644 --- a/src/tests/test_device_manager.cpp +++ b/src/tests/test_device_manager.cpp @@ -174,6 +174,62 @@ TEST_F(DeviceMgrTest, StripedVDevCreation) { this->validate_striped_vdevs(); } +TEST_F(DeviceMgrTest, CreateChunk) { + // Create dynamically chunks and verify no two chunks ahve same start offset. + uint64_t avail_size{0}; + for (auto& pdev : m_pdevs) { + avail_size += pdev->data_size(); + } + + LOGINFO("Step 1: Creating test_vdev with size={}", in_bytes(avail_size)); + auto vdev = + m_dmgr->create_vdev(homestore::vdev_parameters{.vdev_name = "test_vdev", + .size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC, + .vdev_size = avail_size, + .num_chunks = 0, + .blk_size = 512, + .chunk_size = 512 * 1024, + .dev_type = HSDevType::Data, + .alloc_type = blk_allocator_type_t::none, + .chunk_sel_type = chunk_selector_type_t::NONE, + .multi_pdev_opts = vdev_multi_pdev_opts_t::ALL_PDEV_STRIPED, + .context_data = sisl::blob{}}); + + auto num_chunks = 10; + LOGINFO("Step 2: Creating {} chunks", num_chunks); + std::unordered_map< uint32_t, chunk_info > chunk_info_map; + std::unordered_set< uint64_t > chunk_start; + + for (int i = 0; i < num_chunks; i++) { + auto chunk = m_dmgr->create_chunk(HSDevType::Data, vdev->info().vdev_id, 512 * 1024, {}); + chunk_info_map[chunk->chunk_id()] = chunk->info(); + auto [_, inserted] = chunk_start.insert(chunk->info().chunk_start_offset); + ASSERT_EQ(inserted, true) << "chunk start duplicate " << chunk->info().chunk_start_offset; + } + + LOGINFO("Step 3: Restarting homestore"); + this->restart(); + + LOGINFO("Step 4: Creating additional {} chunks", num_chunks); + for (int i = 0; i < num_chunks; i++) { + auto chunk = m_dmgr->create_chunk(HSDevType::Data, vdev->info().vdev_id, 512 * 1024, {}); + chunk_info_map[chunk->chunk_id()] = chunk->info(); + auto [_, inserted] = chunk_start.insert(chunk->info().chunk_start_offset); + ASSERT_EQ(inserted, true) << "chunk start duplicate " << chunk->info().chunk_start_offset; + } + + chunk_start.clear(); + auto chunk_vec = m_dmgr->get_chunks(); + ASSERT_EQ(chunk_vec.size(), num_chunks * 2); + for (const auto& chunk : chunk_vec) { + ASSERT_EQ(chunk->info().chunk_start_offset, chunk_info_map[chunk->chunk_id()].chunk_start_offset) + << "Chunk offsets mismatch"; + auto [_, inserted] = chunk_start.insert(chunk->info().chunk_start_offset); + ASSERT_EQ(inserted, true) << "chunk start duplicate " << chunk->info().chunk_start_offset; + } + vdev.reset(); +} + int main(int argc, char* argv[]) { SISL_OPTIONS_LOAD(argc, argv, logging, test_device_manager, iomgr); ::testing::InitGoogleTest(&argc, argv); diff --git a/src/tests/test_log_dev.cpp b/src/tests/test_log_dev.cpp index b41a5f9a5..d7e716b6f 100644 --- a/src/tests/test_log_dev.cpp +++ b/src/tests/test_log_dev.cpp @@ -13,135 +13,309 @@ * specific language governing permissions and limitations under the License. * *********************************************************************************/ -#include + +#include #include #include -#include -#include #include #include -#include +#include +#include +#include +#include #include +#include +#include #include #include +#include +#include +#include +#include "common/homestore_utils.hpp" +#include "common/homestore_assert.hpp" #include "logstore/log_dev.hpp" #include "test_common/homestore_test_common.hpp" using namespace homestore; + RCU_REGISTER_INIT SISL_LOGGING_INIT(HOMESTORE_LOG_MODS) +SISL_OPTIONS_ENABLE(logging, test_log_dev, iomgr, test_common_setup) +SISL_LOGGING_DECL(test_log_dev) -std::vector< logdev_key > s_logdev_keys; -static uint64_t first_offset{~static_cast< uint64_t >(0)}; +std::vector< std::string > test_common::HSTestHelper::s_dev_names; -static void on_append_completion(const logdev_key lkey, void* const ctx) { - s_logdev_keys.push_back(lkey); - LOGINFO("Append completed with log_idx = {} offset = {}", lkey.idx, lkey.dev_offset); - if (first_offset == ~static_cast< uint64_t >(0)) { first_offset = lkey.dev_offset; } -} +struct test_log_data { + test_log_data() = default; + test_log_data(const test_log_data&) = delete; + test_log_data(test_log_data&&) noexcept = delete; + test_log_data& operator=(const test_log_data&) = delete; + test_log_data& operator=(test_log_data&&) noexcept = delete; + ~test_log_data() = default; -static void on_log_found(const logdev_key lkey, const log_buffer buf) { - s_logdev_keys.push_back(lkey); - LOGINFO("Found a log with log_idx = {} offset = {}", lkey.idx, lkey.dev_offset); -} + uint32_t size; -[[nodiscard]] static std::shared_ptr< iomgr::ioMgr > start_homestore(const uint32_t ndevices, const uint64_t dev_size, - const uint32_t nthreads) { - std::vector< dev_info > device_info; - // these should be static so that they stay in scope in the lambda in case function ends before lambda completes - static std::mutex start_mutex; - static std::condition_variable cv; - static bool inited; - - inited = false; - LOGINFO("creating {} device files with each of size {} ", ndevices, dev_size); - for (uint32_t i{0}; i < ndevices; ++i) { - const std::filesystem::path fpath{"/tmp/" + std::to_string(i + 1)}; - std::ofstream ofs{fpath.string(), std::ios::binary | std::ios::out}; - std::filesystem::resize_file(fpath, dev_size); - device_info.emplace_back(std::filesystem::canonical(fpath).string(), HSDevType::Data); + uint8_t* get_data() { return uintptr_cast(this) + sizeof(test_log_data); }; + uint8_t const* get_data_const() const { return r_cast< uint8_t const* >(this) + sizeof(test_log_data); } + const uint8_t* get_data() const { return r_cast< const uint8_t* >(this) + sizeof(test_log_data); } + uint32_t total_size() const { return sizeof(test_log_data) + size; } + std::string get_data_str() const { + return std::string(r_cast< const char* >(get_data_const()), static_cast< size_t >(size)); } +}; - LOGINFO("Creating iomgr with {} threads", nthreads); - auto iomgr_obj{std::make_shared< iomgr::ioMgr >(2, nthreads)}; - - const uint64_t cache_size{((ndevices * dev_size) * 10) / 100}; - LOGINFO("Initialize and start HomeBlks with cache_size = {}", cache_size); - - boost::uuids::string_generator gen; - init_params params; - params.open_flags = homestore::io_flag::DIRECT_IO; - params.min_virtual_page_size = 4096; - params.cache_size = cache_size; - params.devices = device_info; - params.iomgr = iomgr_obj; - params.init_done_cb = [&iomgr_obj, &tl_start_mutex = start_mutex, &tl_cv = cv, - &tl_inited = inited](std::error_condition err, const out_params& params) { - iomgr_obj->start(); - LOGINFO("HomeBlks Init completed"); - { - std::unique_lock< std::mutex > lk{tl_start_mutex}; - tl_inited = true; - } - tl_cv.notify_one(); - }; - params.vol_mounted_cb = [](const VolumePtr& vol_obj, vol_state state) {}; - params.vol_state_change_cb = [](const VolumePtr& vol, vol_state old_state, vol_state new_state) {}; - params.vol_found_cb = [](boost::uuids::uuid uuid) -> bool { return true; }; +class LogDevTest : public ::testing::Test { +public: + const std::map< uint32_t, test_common::HSTestHelper::test_params > svc_params = {}; + static constexpr uint32_t max_data_size = 1024; + uint64_t s_max_flush_multiple = 0; - test_common::set_random_http_port(); - VolInterface::init(params); + virtual void SetUp() override { start_homestore(); } - { - std::unique_lock< std::mutex > lk{start_mutex}; - cv.wait(lk, [] { return inited; }); + void start_homestore(bool restart = false, hs_before_services_starting_cb_t starting_cb = nullptr) { + auto const ndevices = SISL_OPTIONS["num_devs"].as< uint32_t >(); + auto const dev_size = SISL_OPTIONS["dev_size_mb"].as< uint64_t >() * 1024 * 1024; + test_common::HSTestHelper::start_homestore("test_journal_vdev", + { + {HS_SERVICE::META, {.size_pct = 15.0}}, + {HS_SERVICE::LOG, + {.size_pct = 50.0, + .chunk_size = 32 * 1024 * 1024, + .vdev_size_type = vdev_size_type_t::VDEV_SIZE_DYNAMIC}}, + }, + starting_cb, restart); } - return iomgr_obj; -} -SISL_OPTIONS_ENABLE(logging, test_log_dev) -SISL_OPTION_GROUP(test_log_dev, - (num_threads, "", "num_threads", "number of threads", - ::cxxopts::value< uint32_t >()->default_value("2"), "number"), - (num_devs, "", "num_devs", "number of devices to create", - ::cxxopts::value< uint32_t >()->default_value("2"), "number"), - (dev_size_mb, "", "dev_size_mb", "size of each device in MB", - ::cxxopts::value< uint64_t >()->default_value("5120"), "number")); + virtual void TearDown() override { test_common::HSTestHelper::shutdown_homestore(); } -int main(int argc, char* argv[]) { - SISL_OPTIONS_LOAD(argc, argv, logging, test_log_dev); - sisl::logging::SetLogger("test_log_dev"); - spdlog::set_pattern("[%D %T%z] [%^%l%$] [%n] [%t] %v"); + test_log_data* prepare_data(const logstore_seq_num_t lsn, bool& io_memory) { + static thread_local std::random_device rd{}; + static thread_local std::default_random_engine re{rd()}; + uint32_t sz{0}; + uint8_t* raw_buf{nullptr}; + + // Generate buffer of random size and fill with specific data + std::uniform_int_distribution< uint8_t > gen_percentage{0, 99}; + std::uniform_int_distribution< uint32_t > gen_data_size{0, max_data_size - 1}; + if (gen_percentage(re) < static_cast< uint8_t >(10)) { + // 10% of data is dma'ble aligned boundary + const auto alloc_sz = sisl::round_up(gen_data_size(re) + sizeof(test_log_data), s_max_flush_multiple); + raw_buf = iomanager.iobuf_alloc(dma_address_boundary, alloc_sz); + sz = alloc_sz - sizeof(test_log_data); + io_memory = true; + } else { + sz = gen_data_size(re); + raw_buf = static_cast< uint8_t* >(std::malloc(sizeof(test_log_data) + sz)); + io_memory = false; + } - auto iomgr_obj{start_homestore(SISL_OPTIONS["num_devs"].as< uint32_t >(), - SISL_OPTIONS["dev_size_mb"].as< uint64_t >() * 1024 * 1024, - SISL_OPTIONS["num_threads"].as< uint32_t >())}; + test_log_data* d = new (raw_buf) test_log_data(); + d->size = sz; - std::array< std::string, 1024 > s; - auto ld{LogDev::instance()}; - ld->register_append_cb(on_append_completion); - ld->register_logfound_cb(on_log_found); + assert(uintptr_cast(d) == raw_buf); - for ( - size_t i{0}; (i < std::min< size_t >(195, s.size()); ++i) { - s[i] = std::to_string(i); - ld->append_async(0, 0, {reinterpret_cast< const uint8_t* >(s[i].c_str()), s[i].size() + 1}, nullptr); + const char c = static_cast< char >((lsn % 94) + 33); + std::memset(voidptr_cast(d->get_data()), c, static_cast< size_t >(d->size)); + return d; } - size_t i{0}; - for (const auto& lk : s_logdev_keys) { - const auto b{ld->read(lk)}; - const auto exp_val{std::to_string(i)}; - const auto actual_val{reinterpret_cast< const char* >(b.data()), static_cast< size_t >(b.size())}; - if (actual_val != exp_val) { - LOGERROR("Error in reading value for log_idx {} actual_val={} expected_val={}", i, actual_val, exp_val); + void validate_data(std::shared_ptr< HomeLogStore > log_store, const test_log_data* d, + const logstore_seq_num_t lsn) { + const char c = static_cast< char >((lsn % 94) + 33); + const std::string actual = d->get_data_str(); + const std::string expected(static_cast< size_t >(d->size), + c); // needs to be () because of same reason as vector + ASSERT_EQ(actual, expected) << "Data mismatch for LSN=" << log_store->get_store_id() << ":" << lsn + << " size=" << d->size; + } + + void insert_sync(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t lsn) { + bool io_memory{false}; + auto* d = prepare_data(lsn, io_memory); + const bool succ = log_store->write_sync(lsn, {uintptr_cast(d), d->total_size(), false}); + EXPECT_TRUE(succ); + LOGINFO("Written sync data for LSN -> {}:{}", log_store->get_store_id(), lsn); + if (io_memory) { + iomanager.iobuf_free(uintptr_cast(d)); } else { - LOGINFO("Read value {} for log_idx {}", actual_val, i); + std::free(voidptr_cast(d)); + } + } + + void kickstart_inserts(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t& cur_lsn, int64_t batch) { + auto last = cur_lsn + batch; + for (; cur_lsn < last; cur_lsn++) { + insert_sync(log_store, cur_lsn); + } + } + + void read_verify(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t lsn) { + auto b = log_store->read_sync(lsn); + auto* d = r_cast< test_log_data const* >(b.bytes()); + ASSERT_EQ(d->total_size(), b.size()) << "Size Mismatch for lsn=" << log_store->get_store_id() << ":" << lsn; + validate_data(log_store, d, lsn); + } + + void read_all_verify(std::shared_ptr< HomeLogStore > log_store) { + const auto trunc_upto = log_store->truncated_upto(); + const auto upto = log_store->get_contiguous_completed_seq_num(-1); + + for (std::remove_const_t< decltype(trunc_upto) > i{0}; i <= trunc_upto; ++i) { + ASSERT_THROW(log_store->read_sync(i), std::out_of_range) + << "Expected std::out_of_range exception for lsn=" << log_store->get_store_id() << ":" << i + << " but not thrown"; + } + + for (auto lsn = trunc_upto + 1; lsn < upto; lsn++) { + read_verify(log_store, lsn); + } + } + + void rollback_validate(std::shared_ptr< HomeLogStore > log_store, logstore_seq_num_t& cur_lsn, + uint32_t num_lsns_to_rollback) { + std::mutex mtx; + std::condition_variable cv; + bool rollback_done = false; + cur_lsn -= num_lsns_to_rollback; + auto const upto_lsn = cur_lsn - 1; + log_store->rollback_async(upto_lsn, [&](logstore_seq_num_t) { + ASSERT_EQ(log_store->get_contiguous_completed_seq_num(-1), upto_lsn) + << "Last completed seq num is not reset after rollback"; + ASSERT_EQ(log_store->get_contiguous_issued_seq_num(-1), upto_lsn) + << "Last issued seq num is not reset after rollback"; + read_all_verify(log_store); + { + std::unique_lock lock(mtx); + rollback_done = true; + } + cv.notify_one(); + }); + + // We wait till async rollback is finished as we do validation. + std::unique_lock lock(mtx); + cv.wait(lock, [&rollback_done] { return rollback_done == true; }); + } + + void truncate_validate(std::shared_ptr< HomeLogStore > log_store) { + auto upto = log_store->get_contiguous_completed_seq_num(-1); + LOGINFO("truncate_validate upto {}", upto); + log_store->truncate(upto); + read_all_verify(log_store); + logstore_service().device_truncate(nullptr /* cb */, true /* wait_till_done */); + } + + void rollback_records_validate(std::shared_ptr< HomeLogStore > log_store, uint32_t expected_count) { + auto actual_count = log_store->get_logdev()->log_dev_meta().num_rollback_records(log_store->get_store_id()); + ASSERT_EQ(actual_count, expected_count); + } +}; + +TEST_F(LogDevTest, WriteSyncThenRead) { + const auto iterations = SISL_OPTIONS["iterations"].as< uint32_t >(); + + for (uint32_t iteration{0}; iteration < iterations; ++iteration) { + LOGINFO("Iteration {}", iteration); + auto logdev_id = logstore_service().create_new_logdev(); + s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); + auto log_store = logstore_service().create_new_log_store(logdev_id, false); + const auto store_id = log_store->get_store_id(); + LOGINFO("Created new log store -> id {}", store_id); + const unsigned count{10}; + for (unsigned i{0}; i < count; ++i) { + // Insert new entry. + insert_sync(log_store, i); + // Verify the entry. + read_verify(log_store, i); } - ++i; + + logstore_service().remove_log_store(logdev_id, store_id); + LOGINFO("Remove logstore -> i {}", store_id); } +} + +TEST_F(LogDevTest, Rollback) { + LOGINFO("Step 1: Create a single logstore to start rollback test"); + auto logdev_id = logstore_service().create_new_logdev(); + s_max_flush_multiple = logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); + auto log_store = logstore_service().create_new_log_store(logdev_id, false); + auto store_id = log_store->get_store_id(); + + auto restart = [&]() { + std::promise< bool > p; + auto starting_cb = [&]() { + logstore_service().open_logdev(logdev_id); + logstore_service().open_log_store(logdev_id, store_id, false /* append_mode */).thenValue([&](auto store) { + log_store = store; + p.set_value(true); + }); + }; + start_homestore(true /* retart*/, starting_cb); + p.get_future().get(); + }; + + LOGINFO("Step 2: Issue sequential inserts with q depth of 10"); + logstore_seq_num_t cur_lsn = 0; + kickstart_inserts(log_store, cur_lsn, 500); + + LOGINFO("Step 3: Rollback last 50 entries and validate if pre-rollback entries are intact"); + rollback_validate(log_store, cur_lsn, 50); // Last entry = 450 + + LOGINFO("Step 4: Append 25 entries after rollback is completed"); + kickstart_inserts(log_store, cur_lsn, 25); // Last entry = 475 + + LOGINFO("Step 5: Rollback again for 75 entries even before previous rollback entry"); + rollback_validate(log_store, cur_lsn, 75); // Last entry = 400 + + LOGINFO("Step 6: Append 25 entries after second rollback is completed"); + kickstart_inserts(log_store, cur_lsn, 25); // Last entry = 425 + + LOGINFO("Step 7: Restart homestore and ensure all rollbacks are effectively validated"); + restart(); + + LOGINFO("Step 8: Post recovery, append another 25 entries"); + kickstart_inserts(log_store, cur_lsn, 25); // Last entry = 450 + + LOGINFO("Step 9: Rollback again for 75 entries even before previous rollback entry"); + rollback_validate(log_store, cur_lsn, 75); // Last entry = 375 + + LOGINFO("Step 10: After 3rd rollback, append another 25 entries"); + kickstart_inserts(log_store, cur_lsn, 25); // Last entry = 400 + + LOGINFO("Step 11: Truncate all entries"); + truncate_validate(log_store); + + LOGINFO("Step 12: Restart homestore and ensure all truncations after rollbacks are effectively validated"); + restart(); + + LOGINFO("Step 13: Append 25 entries after truncation is completed"); + kickstart_inserts(log_store, cur_lsn, 25); // Last entry = 425 + + LOGINFO("Step 14: Do another truncation to effectively truncate previous records"); + truncate_validate(log_store); + + LOGINFO("Step 15: Validate if there are no rollback records"); + rollback_records_validate(log_store, 0 /* expected_count */); +} + +SISL_OPTION_GROUP(test_log_dev, + (num_logdevs, "", "num_logdevs", "number of log devs", + ::cxxopts::value< uint32_t >()->default_value("4"), "number"), + (num_logstores, "", "num_logstores", "number of log stores", + ::cxxopts::value< uint32_t >()->default_value("16"), "number"), + (num_records, "", "num_records", "number of record to test", + ::cxxopts::value< uint32_t >()->default_value("10000"), "number"), + (iterations, "", "iterations", "Iterations", ::cxxopts::value< uint32_t >()->default_value("1"), + "the number of iterations to run each test")); + +int main(int argc, char* argv[]) { + int parsed_argc = argc; + ::testing::InitGoogleTest(&parsed_argc, argv); + SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_log_dev, iomgr, test_common_setup); + sisl::logging::SetLogger("test_log_dev"); + spdlog::set_pattern("[%D %T%z] [%^%l%$] [%t] %v"); - ld->load(first_offset); + const int ret = RUN_ALL_TESTS(); + return ret; } diff --git a/src/tests/test_log_store.cpp b/src/tests/test_log_store.cpp index 262270c88..1880e9043 100644 --- a/src/tests/test_log_store.cpp +++ b/src/tests/test_log_store.cpp @@ -438,7 +438,13 @@ class SampleDB { } void start_homestore(bool restart = false) { + auto n_log_devs = SISL_OPTIONS["num_logdevs"].as< uint32_t >(); auto n_log_stores = SISL_OPTIONS["num_logstores"].as< uint32_t >(); + if (n_log_devs < 1u) { + LOGINFO("Log store test needs minimum 1 log dev for testing, setting them to 1"); + n_log_devs = 1u; + } + if (n_log_stores < 4u) { LOGINFO("Log store test needs minimum 4 log stores for testing, setting them to 4"); n_log_stores = 4u; @@ -462,13 +468,18 @@ class SampleDB { restart); if (!restart) { - auto logdev_id = logstore_service().create_new_logdev(); + std::vector< logdev_id_t > logdev_id_vec; + for (uint32_t i{0}; i < n_log_devs; ++i) { + logdev_id_vec.push_back(logstore_service().create_new_logdev()); + } + for (uint32_t i{0}; i < n_log_stores; ++i) { + auto logdev_id = logdev_id_vec[rand() % logdev_id_vec.size()]; m_log_store_clients.push_back(std::make_unique< SampleLogStoreClient >( logdev_id, bind_this(SampleDB::on_log_insert_completion, 3))); } SampleLogStoreClient::s_max_flush_multiple = - logstore_service().get_logdev(logdev_id)->get_flush_size_multiple(); + logstore_service().get_logdev(logdev_id_vec[0])->get_flush_size_multiple(); } } @@ -619,14 +630,13 @@ class LogStoreTest : public ::testing::Test { EXPECT_EQ(expected_num_records, rec_count); } - void dump_validate_filter(logstore_id_t id, logstore_seq_num_t start_seq, logstore_seq_num_t end_seq, - bool print_content = false) { + void dump_validate_filter(bool print_content = false) { for (const auto& lsc : SampleDB::instance().m_log_store_clients) { - if (lsc->m_log_store->get_store_id() != id) { continue; } - - log_dump_req dump_req; + logstore_id_t id = lsc->m_log_store->get_store_id(); const auto fid = lsc->m_logdev_id; - + logstore_seq_num_t start_seq = lsc->m_log_store->truncated_upto() + 1; + logstore_seq_num_t end_seq = lsc->m_log_store->get_contiguous_completed_seq_num(-1); + log_dump_req dump_req; if (print_content) dump_req.verbosity_level = log_dump_verbosity::CONTENT; dump_req.log_store = lsc->m_log_store; dump_req.start_seq_num = start_seq; @@ -648,9 +658,8 @@ class LogStoreTest : public ::testing::Test { } else { EXPECT_FALSE(true); } - - return; } + return; } int find_garbage_upto(logdev_id_t logdev_id, logid_t idx) { @@ -791,7 +800,8 @@ class LogStoreTest : public ::testing::Test { } } ASSERT_EQ(actual_valid_ids, SampleDB::instance().m_log_store_clients.size()); - ASSERT_EQ(actual_garbage_ids, exp_garbage_store_count); + // Becasue we randomly assign logstore to logdev, some logdev will be empty. + // ASSERT_EQ(actual_garbage_ids, exp_garbage_store_count); } void delete_validate(uint32_t idx) { @@ -890,7 +900,7 @@ TEST_F(LogStoreTest, BurstRandInsertThenTruncate) { this->dump_validate(num_records); LOGINFO("Step 4.2: Read some specific interval/filter of seq number in one logstore and dump it into json"); - this->dump_validate_filter(0, 10, 100, true); + this->dump_validate_filter(true); } LOGINFO("Step 5: Truncate all of the inserts one log store at a time and validate log dev truncation is marked " @@ -1142,68 +1152,6 @@ TEST_F(LogStoreTest, FlushSync) { #endif } -TEST_F(LogStoreTest, Rollback) { - LOGINFO("Step 1: Reinit the 500 records on a single logstore to start rollback test"); - this->init(500, {std::make_pair(1ull, 100)}); // Last entry = 500 - - LOGINFO("Step 2: Issue sequential inserts with q depth of 10"); - this->kickstart_inserts(1, 10); - - LOGINFO("Step 3: Wait for the Inserts to complete"); - this->wait_for_inserts(); - - LOGINFO("Step 4: Rollback last 50 entries and validate if pre-rollback entries are intact"); - this->rollback_validate(50); // Last entry = 450 - - LOGINFO("Step 5: Append 25 entries after rollback is completed"); - this->init(25, {std::make_pair(1ull, 100)}); - this->kickstart_inserts(1, 10); - this->wait_for_inserts(); // Last entry = 475 - - LOGINFO("Step 7: Rollback again for 75 entries even before previous rollback entry"); - this->rollback_validate(75); // Last entry = 400 - - LOGINFO("Step 8: Append 25 entries after second rollback is completed"); - this->init(25, {std::make_pair(1ull, 100)}); - this->kickstart_inserts(1, 10); - this->wait_for_inserts(); // Last entry = 425 - - LOGINFO("Step 9: Restart homestore and ensure all rollbacks are effectively validated"); - SampleDB::instance().start_homestore(true /* restart */); - this->recovery_validate(); - - LOGINFO("Step 10: Post recovery, append another 25 entries"); - this->init(25, {std::make_pair(1ull, 100)}); - this->kickstart_inserts(1, 5); - this->wait_for_inserts(); // Last entry = 450 - - LOGINFO("Step 11: Rollback again for 75 entries even before previous rollback entry"); - this->rollback_validate(75); // Last entry = 375 - - LOGINFO("Step 12: After 3rd rollback, append another 25 entries"); - this->init(25, {std::make_pair(1ull, 100)}); - this->kickstart_inserts(1, 5); - this->wait_for_inserts(); // Last entry = 400 - - LOGINFO("Step 13: Truncate all entries"); - this->truncate_validate(); - - LOGINFO("Step 14: Restart homestore and ensure all truncations after rollbacks are effectively validated"); - SampleDB::instance().start_homestore(true /* restart */); - this->recovery_validate(); - - LOGINFO("Step 15: Append 25 entries after truncation is completed"); - this->init(25, {std::make_pair(1ull, 100)}); - this->kickstart_inserts(1, 10); - this->wait_for_inserts(); // Last entry = 425 - - LOGINFO("Step 16: Do another truncation to effectively truncate previous records"); - this->truncate_validate(); - - LOGINFO("Step 17: Validate if there are no rollback records"); - this->post_truncate_rollback_validate(); -} - TEST_F(LogStoreTest, DeleteMultipleLogStores) { const auto nrecords = (SISL_OPTIONS["num_records"].as< uint32_t >() * 5) / 100; @@ -1282,8 +1230,10 @@ TEST_F(LogStoreTest, WriteSyncThenRead) { SISL_OPTIONS_ENABLE(logging, test_log_store, iomgr, test_common_setup) SISL_OPTION_GROUP(test_log_store, - (num_logstores, "", "num_logstores", "number of log stores", + (num_logdevs, "", "num_logdevs", "number of log devs", ::cxxopts::value< uint32_t >()->default_value("4"), "number"), + (num_logstores, "", "num_logstores", "number of log stores", + ::cxxopts::value< uint32_t >()->default_value("16"), "number"), (num_records, "", "num_records", "number of record to test", ::cxxopts::value< uint32_t >()->default_value("10000"), "number"), (iterations, "", "iterations", "Iterations", ::cxxopts::value< uint32_t >()->default_value("1"),