Fix device manager load of chunks.
Chunks were getting duplicate start offsets during recovery. Add or
clean up logs for debugging. Add a test case. Add more logdevs to the
logstore test. Enable the logstore test. Run the rollback test first to
make sure there are no log entries.
sanebay committed Feb 13, 2024
1 parent c8021ab commit f1b04e9
Showing 18 changed files with 235 additions and 127 deletions.
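The heart of the fix sits in PhysicalDev::populate_chunk_info() below: each physical device now records every chunk's start offset and release-asserts if a newly populated chunk reuses one, so the recovery bug fails fast instead of silently creating overlapping chunks. A minimal standalone sketch of that guard (the type and function names here are illustrative, not Homestore's):

    #include <cassert>
    #include <cstdint>
    #include <unordered_set>

    // Hypothetical stand-in for the per-device bookkeeping added in this commit.
    struct ChunkOffsetGuard {
        std::unordered_set< uint64_t > starts;

        // Mirrors the new RELEASE_ASSERT in populate_chunk_info(): a duplicate
        // start offset means two chunks would overlap on disk.
        void track(uint64_t start_offset) {
            auto [it, inserted] = starts.insert(start_offset);
            assert(inserted && "duplicate chunk start offset");
            (void)it;
        }

        // Mirrors free_chunk_info(): forget the offset when the chunk is freed.
        void untrack(uint64_t start_offset) { starts.erase(start_offset); }
    };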
2 changes: 1 addition & 1 deletion conanfile.py
@@ -5,7 +5,7 @@
 
 class HomestoreConan(ConanFile):
     name = "homestore"
-    version = "5.1.4"
+    version = "5.1.5"
 
     homepage = "https://github.com/eBay/Homestore"
     description = "HomeStore Storage Engine"
1 change: 1 addition & 0 deletions src/lib/device/device.h
@@ -173,6 +173,7 @@ class DeviceManager {
 
     std::vector< PhysicalDev* > get_pdevs_by_dev_type(HSDevType dtype) const;
     std::vector< shared< VirtualDev > > get_vdevs() const;
+    std::vector< shared< Chunk > > get_chunks() const;
 
     uint64_t total_capacity() const;
     uint64_t total_capacity(HSDevType dtype) const;
9 changes: 9 additions & 0 deletions src/lib/device/device_manager.cpp
@@ -543,6 +543,15 @@ std::vector< shared< VirtualDev > > DeviceManager::get_vdevs() const {
     return ret_v;
 }
 
+std::vector< shared< Chunk > > DeviceManager::get_chunks() const {
+    std::unique_lock lg{m_vdev_mutex};
+    std::vector< shared< Chunk > > res;
+    for (auto& chunk : m_chunks) {
+        if (chunk) res.push_back(chunk);
+    }
+    return res;
+}
+
 // Some of the hs_super_blk details
 uint64_t hs_super_blk::vdev_super_block_size() { return (hs_super_blk::MAX_VDEVS_IN_SYSTEM * vdev_info::size); }

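For context, the new DeviceManager::get_chunks() simply snapshots the live chunk list under the vdev mutex. A hedged usage sketch (the dev_mgr variable is assumed; Chunk::to_string() appears elsewhere in this diff):

    // Illustrative only: dump every live chunk the DeviceManager knows about,
    // e.g. while debugging recovery. get_chunks() copies the shared pointers
    // under m_vdev_mutex, so the caller iterates without holding the lock.
    for (const auto& chunk : dev_mgr->get_chunks()) {
        LOGINFO("chunk: {}", chunk->to_string());
    }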
25 changes: 17 additions & 8 deletions src/lib/device/journal_vdev.cpp
@@ -231,7 +231,7 @@ off_t JournalVirtualDev::Descriptor::alloc_next_append_blk(size_t sz) {
 
     // assert that returning logical offset is in good range
     HS_DBG_ASSERT_LE(tail_off, m_end_offset);
-    LOGDEBUGMOD(journalvdev, "returned tail_off 0x{} desc {}", to_hex(tail_off), to_string());
+    LOGDEBUGMOD(journalvdev, "returned tail_off 0x{} size {} desc {}", to_hex(tail_off), sz, to_string());
     return tail_off;
 }

@@ -443,6 +443,14 @@ off_t JournalVirtualDev::Descriptor::dev_offset(off_t nbytes) const {
     return vdev_offset;
 }
 
+void JournalVirtualDev::Descriptor::update_data_start_offset(off_t offset) {
+    m_data_start_offset = offset;
+    auto data_start_offset_aligned = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size);
+    m_end_offset = data_start_offset_aligned + m_journal_chunks.size() * m_vdev.info().chunk_size;
+    LOGTRACEMOD(journalvdev, "Updated data start offset off 0x{} {}", to_hex(offset), to_string());
+    RELEASE_ASSERT_EQ(m_end_offset - data_start_offset_aligned, m_total_size, "offset size mismatch {}", to_string());
+}
+
 off_t JournalVirtualDev::Descriptor::tail_offset(bool reserve_space_include) const {
     off_t tail = static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed));
     if (reserve_space_include) { tail += m_reserved_sz; }
@@ -461,7 +469,7 @@ void JournalVirtualDev::Descriptor::update_tail_offset(off_t tail) {
     }
     lseek(tail);
 
-    LOGDEBUGMOD(journalvdev, "tail arg 0x{} desc {} ", to_hex(tail), to_string());
+    LOGDEBUGMOD(journalvdev, "Updated tail offset arg 0x{} desc {} ", to_hex(tail), to_string());
     HS_REL_ASSERT(tail_offset() == tail, "tail offset mismatch after calculation 0x{} : {}", tail_offset(), tail);
 }

@@ -598,7 +606,7 @@ bool JournalVirtualDev::Descriptor::is_alloc_accross_chunk(size_t size) const {
 
 nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const {
     nlohmann::json j;
-    j["logdev_id"] = m_logdev_id;
+    j["logdev"] = m_logdev_id;
     j["seek_cursor"] = m_seek_cursor;
     j["data_start_offset"] = m_data_start_offset;
     j["end_offset"] = m_end_offset;
@@ -613,7 +621,7 @@ nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const {
         nlohmann::json c;
         auto* private_data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
         c["chunk_id"] = chunk->chunk_id();
-        c["logdev_id"] = private_data->logdev_id;
+        c["logdev"] = private_data->logdev_id;
         c["is_head"] = private_data->is_head;
         c["end_of_chunk"] = private_data->end_of_chunk;
         c["next_chunk"] = private_data->next_chunk;
@@ -627,12 +635,13 @@ nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const {
 }
 
 std::string JournalVirtualDev::Descriptor::to_string() const {
-    std::string str{fmt::format("id={};ds=0x{};end=0x{};writesz={};tail=0x{};"
+    off_t tail =
+        static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed)) + m_reserved_sz;
+    std::string str{fmt::format("logdev={};ds=0x{};end=0x{};writesz={};tail=0x{};"
                                 "rsvdsz={};chunks={};trunc={};total={};seek=0x{} ",
                                 m_logdev_id, to_hex(m_data_start_offset), to_hex(m_end_offset),
-                                m_write_sz_in_total.load(std::memory_order_relaxed), to_hex(tail_offset()),
-                                m_reserved_sz, m_journal_chunks.size(), m_truncate_done, m_total_size,
-                                to_hex(m_seek_cursor))};
+                                m_write_sz_in_total.load(std::memory_order_relaxed), to_hex(tail), m_reserved_sz,
+                                m_journal_chunks.size(), m_truncate_done, m_total_size, to_hex(m_seek_cursor))};
     return str;
 }

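The descriptor's invariant is easiest to see with numbers. update_data_start_offset() rounds the start down to a chunk boundary and derives the end offset from the number of owned chunks; the new RELEASE_ASSERT_EQ checks that the resulting window always equals m_total_size. A worked sketch under assumed values (64 MiB chunks, 3 chunks owned):

    #include <cstdint>

    constexpr uint64_t kChunkSize = 64ull << 20; // assumed chunk size
    constexpr uint64_t kNumChunks = 3;           // assumed size of m_journal_chunks

    uint64_t round_down(uint64_t v, uint64_t align) { return v - (v % align); }

    // data_start_offset may land anywhere inside the first chunk; the end offset
    // is derived from the chunk-aligned start so the window spans whole chunks.
    // E.g. offset 0x4100000 aligns down to 0x4000000 and ends at 0x10000000, so
    // end - aligned == 3 * 64 MiB == m_total_size, which the assert enforces.
    uint64_t end_offset(uint64_t data_start_offset) {
        return round_down(data_start_offset, kChunkSize) + kNumChunks * kChunkSize;
    }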
9 changes: 3 additions & 6 deletions src/lib/device/journal_vdev.hpp
@@ -233,12 +233,7 @@ class JournalVirtualDev : public VirtualDev {
      *
      * @param offset : the start logical offset to be persisted
      */
-    void update_data_start_offset(off_t offset) {
-        m_data_start_offset = offset;
-        auto data_start_offset_aligned = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size);
-        m_end_offset = data_start_offset_aligned + m_journal_chunks.size() * m_vdev.info().chunk_size;
-        RELEASE_ASSERT_EQ(m_end_offset - data_start_offset_aligned, m_total_size, "offset size mismatch");
-    }
+    void update_data_start_offset(off_t offset);
 
     /**
      * @brief : get the logical tail offset;
@@ -309,6 +304,8 @@
      */
     nlohmann::json get_status(int log_level) const;
 
+    logdev_id_t logdev_id() const { return m_logdev_id; }
+
     std::string to_string() const;
 
 private:
10 changes: 9 additions & 1 deletion src/lib/device/physical_dev.cpp
@@ -247,12 +247,13 @@ std::vector< shared< Chunk > > PhysicalDev::create_chunks(const std::vector< uin
         auto chunk = std::make_shared< Chunk >(this, *cinfo, cslot);
         ret_chunks.push_back(chunk);
         get_stream(chunk).m_chunks_map.insert(std::pair{chunk_ids[cit], std::move(chunk)});
 
+        HS_LOG(TRACE, device, "Creating chunk {}", chunk->to_string());
         cinfo->~chunk_info();
     }
 
     m_chunk_info_slots->set_bits(b.start_bit, b.nbits);
     write_super_block(buf, chunk_info::size * b.nbits, chunk_info_offset_nth(b.start_bit));
 
     hs_utils::iobuf_free(buf, sisl::buftag::superblk);
 
     chunks_remaining -= b.nbits;
@@ -296,6 +297,7 @@ shared< Chunk > PhysicalDev::create_chunk(uint32_t chunk_id, uint32_t vdev_id, u
 
     auto bitmap_mem = m_chunk_info_slots->serialize(m_pdev_info.dev_attr.align_size);
     write_super_block(bitmap_mem->cbytes(), bitmap_mem->size(), hs_super_blk::chunk_sb_offset());
+    HS_LOG(TRACE, device, "Created chunk {}", chunk->to_string());
 
     cinfo->~chunk_info();
     hs_utils::iobuf_free(buf, sisl::buftag::superblk);
@@ -360,6 +362,8 @@ void PhysicalDev::load_chunks(std::function< bool(cshared< Chunk >&) >&& chunk_f
         cinfo->checksum = info_crc;
 
         auto chunk = std::make_shared< Chunk >(this, *cinfo, cslot);
+        m_chunk_data_area.insert(
+            ChunkInterval::right_open(cinfo->chunk_start_offset, cinfo->chunk_start_offset + cinfo->chunk_size));
         if (chunk_found_cb(chunk)) { get_stream(chunk).m_chunks_map.insert(std::pair{cinfo->chunk_id, chunk}); }
     }
     hs_utils::iobuf_free(buf, sisl::buftag::superblk);
@@ -395,6 +399,7 @@ void PhysicalDev::do_remove_chunk(cshared< Chunk >& chunk) {
     get_stream(chunk).m_chunks_map.erase(chunk->chunk_id());
     cinfo->~chunk_info();
     hs_utils::iobuf_free(buf, sisl::buftag::superblk);
+    HS_LOG(TRACE, device, "Removed chunk {}", chunk->to_string());
 }
 
 uint64_t PhysicalDev::chunk_info_offset_nth(uint32_t slot) const {
@@ -416,11 +421,14 @@ void PhysicalDev::populate_chunk_info(chunk_info* cinfo, uint32_t vdev_id, uint6
     cinfo->set_allocated();
     cinfo->set_user_private(private_data);
     cinfo->compute_checksum();
+    auto [_, inserted] = m_chunk_start.insert(cinfo->chunk_start_offset);
+    RELEASE_ASSERT(inserted, "Duplicate start offset {} for chunk {}", cinfo->chunk_start_offset, cinfo->chunk_id);
 }
 
 void PhysicalDev::free_chunk_info(chunk_info* cinfo) {
     auto ival = ChunkInterval::right_open(cinfo->chunk_start_offset, cinfo->chunk_start_offset + cinfo->chunk_size);
     m_chunk_data_area.erase(ival);
+    m_chunk_start.erase(cinfo->chunk_start_offset);
 
     cinfo->set_free();
     cinfo->checksum = 0;
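Note the load_chunks() change above: recovered chunks are now re-registered in m_chunk_data_area, mirroring what chunk creation does, so the occupied data ranges survive a restart. A sketch of the idea, assuming ChunkIntervalSet behaves like a boost::icl interval set (the actual typedef in the codebase may differ):

    #include <boost/icl/interval_set.hpp>
    #include <cstdint>

    using ChunkInterval = boost::icl::interval< uint64_t >;
    boost::icl::interval_set< uint64_t > chunk_data_area;

    // Claim [start, start + size) when a chunk is created or loaded...
    void track_chunk(uint64_t start, uint64_t size) {
        chunk_data_area.insert(ChunkInterval::right_open(start, start + size));
    }

    // ...and release the same interval when the chunk is freed.
    void untrack_chunk(uint64_t start, uint64_t size) {
        chunk_data_area.erase(ChunkInterval::right_open(start, start + size));
    }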
1 change: 1 addition & 0 deletions src/lib/device/physical_dev.hpp
@@ -136,6 +136,7 @@ class PhysicalDev {
     ChunkIntervalSet m_chunk_data_area;                 // Range of chunks data area created
     std::unique_ptr< sisl::Bitset > m_chunk_info_slots; // Slots to write the chunk info
     uint32_t m_chunk_sb_size{0};                        // Total size of the chunk sb at present
+    std::unordered_set< uint64_t > m_chunk_start;       // Store and verify start offset of all chunks for debugging.
 
 public:
     PhysicalDev(const dev_info& dinfo, int oflags, const pdev_info_header& pinfo);
21 changes: 13 additions & 8 deletions src/lib/logstore/log_dev.cpp
@@ -258,11 +258,17 @@ log_buffer LogDev::read(const logdev_key& key, serialized_log_record& return_rec
     m_vdev_jd->sync_pread(buf->bytes(), initial_read_size, key.dev_offset);
 
     auto* header = r_cast< const log_group_header* >(buf->cbytes());
-    HS_REL_ASSERT_EQ(header->magic_word(), LOG_GROUP_HDR_MAGIC, "Log header corrupted with magic mismatch!");
-    HS_REL_ASSERT_EQ(header->get_version(), log_group_header::header_version, "Log header version mismatch!");
-    HS_REL_ASSERT_LE(header->start_idx(), key.idx, "log key offset does not match with log_idx");
-    HS_REL_ASSERT_GT((header->start_idx() + header->nrecords()), key.idx, "log key offset does not match with log_idx");
-    HS_LOG_ASSERT_GE(header->total_size(), header->_inline_data_offset(), "Inconsistent size data in log group");
+    THIS_LOGDEV_LOG(TRACE, "Logdev read log group header {}", *header);
+    HS_REL_ASSERT_EQ(header->magic_word(), LOG_GROUP_HDR_MAGIC, "Log header corrupted with magic mismatch! {} {}",
+                     m_logdev_id, *header);
+    HS_REL_ASSERT_EQ(header->get_version(), log_group_header::header_version, "Log header version mismatch! {} {}",
+                     m_logdev_id, *header);
+    HS_REL_ASSERT_LE(header->start_idx(), key.idx, "log key offset does not match with log_idx {} {}", m_logdev_id,
+                     *header);
+    HS_REL_ASSERT_GT((header->start_idx() + header->nrecords()), key.idx,
+                     "log key offset does not match with log_idx {} {}", m_logdev_id, *header);
+    HS_LOG_ASSERT_GE(header->total_size(), header->_inline_data_offset(), "Inconsistent size data in log group {} {}",
+                     m_logdev_id, *header);
 
     // We can only do crc match in read if we have read all the blocks. We don't want to aggressively read more data
     // than we need to just to compare CRC for read operation. It can be done during recovery.
@@ -337,7 +343,7 @@ LogGroup* LogDev::prepare_flush(const int32_t estimated_records) {
         }
     });
 
-    lg->finish(get_prev_crc());
+    lg->finish(m_logdev_id, get_prev_crc());
     if (sisl_unlikely(flushing_upto_idx == -1)) { return nullptr; }
     lg->m_flush_log_idx_from = m_last_flush_idx + 1;
     lg->m_flush_log_idx_upto = flushing_upto_idx;
@@ -346,7 +352,6 @@ LogGroup* LogDev::prepare_flush(const int32_t estimated_records) {
     HS_DBG_ASSERT_GT(lg->header()->oob_data_offset, 0);
 
     THIS_LOGDEV_LOG(DEBUG, "Flushing upto log_idx={}", flushing_upto_idx);
-    THIS_LOGDEV_LOG(DEBUG, "Log Group: {}", *lg);
     return lg;
 }

@@ -408,7 +413,7 @@ bool LogDev::flush_if_needed(int64_t threshold_size) {
         lg->m_log_dev_offset = offset;
         HS_REL_ASSERT_NE(lg->m_log_dev_offset, INVALID_OFFSET, "log dev is full");
         THIS_LOGDEV_LOG(TRACE, "Flush prepared, flushing data size={} at offset={}", lg->actual_data_size(), offset);
-
+        THIS_LOGDEV_LOG(DEBUG, "Log Group: {}", *lg);
         do_flush(lg);
         return true;
     } else {
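With logdev_id now stamped into every log_group_header (see log_dev.hpp and log_group.cpp below), the read path can attribute a bad group to a specific logdev when an assert fires. An illustrative sketch of the checks LogDev::read() performs, using the header accessors from this diff (the free function itself is hypothetical, and logid_t is assumed to be the log index type used by logdev_key):

    // Sketch of the validation in LogDev::read(): the group must carry the
    // right magic word, and the wanted record must fall inside the group's
    // index range [start_idx, start_idx + nrecords).
    bool group_header_sane(const log_group_header* hdr, logid_t wanted_idx) {
        if (hdr->magic_word() != LOG_GROUP_HDR_MAGIC) return false; // corrupted
        return (hdr->start_idx() <= wanted_idx) &&
               (wanted_idx < hdr->start_idx() + hdr->nrecords());
    }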
8 changes: 5 additions & 3 deletions src/lib/logstore/log_dev.hpp
@@ -136,6 +136,7 @@ struct log_group_header {
     uint32_t footer_offset; // offset of where footer starts
     crc32_t prev_grp_crc;   // Checksum of the previous group that was written
     crc32_t cur_grp_crc;    // Checksum of the current group record
+    logdev_id_t logdev_id;  // Logdev id
 
     log_group_header() : magic{LOG_GROUP_HDR_MAGIC}, version{header_version} {}
     log_group_header(const log_group_header&) = delete;
@@ -195,9 +196,10 @@ struct fmt::formatter< homestore::log_group_header > {
         return fmt::format_to(
             ctx.out(),
             "magic = {} version={} n_log_records = {} start_log_idx = {} group_size = {} inline_data_offset = {} "
-            "oob_data_offset = {} prev_grp_crc = {} cur_grp_crc = {}",
+            "oob_data_offset = {} prev_grp_crc = {} cur_grp_crc = {} logdev = {}",
             header.magic, header.version, header.n_log_records, header.start_log_idx, header.group_size,
-            header.inline_data_offset, header.oob_data_offset, header.prev_grp_crc, header.cur_grp_crc);
+            header.inline_data_offset, header.oob_data_offset, header.prev_grp_crc, header.cur_grp_crc,
+            header.logdev_id);
     }
 };

@@ -253,7 +255,7 @@ class LogGroup {
     bool add_record(log_record& record, const int64_t log_idx);
     bool can_accomodate(const log_record& record) const { return (m_nrecords <= m_max_records); }
 
-    const iovec_array& finish(const crc32_t prev_crc);
+    const iovec_array& finish(logdev_id_t logdev_id, const crc32_t prev_crc);
     crc32_t compute_crc();
 
     log_group_header* header() { return reinterpret_cast< log_group_header* >(m_cur_log_buf); }
3 changes: 2 additions & 1 deletion src/lib/logstore/log_group.cpp
@@ -117,13 +117,14 @@ bool LogGroup::new_iovec_for_footer() const {
     return ((m_inline_data_pos + sizeof(log_group_footer)) >= m_cur_buf_len || m_oob_data_pos != 0);
 }
 
-const iovec_array& LogGroup::finish(const crc32_t prev_crc) {
+const iovec_array& LogGroup::finish(logdev_id_t logdev_id, const crc32_t prev_crc) {
     // add footer
     auto footer = add_and_get_footer();
 
     m_iovecs[0].iov_len = sisl::round_up(m_iovecs[0].iov_len, m_flush_multiple_size);
 
     log_group_header* hdr = new (header()) log_group_header{};
+    hdr->logdev_id = logdev_id;
     hdr->n_log_records = m_nrecords;
     hdr->prev_grp_crc = prev_crc;
     hdr->inline_data_offset = sizeof(log_group_header) + (m_max_records * sizeof(serialized_log_record));
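Besides stamping the logdev id, finish() keeps threading the previous group's CRC into the new header, so consecutive groups form a chain: each group commits to its predecessor, and recovery can stop at the first broken link. A conceptual sketch of that chaining (field names follow log_group_header; the walk itself is illustrative):

    #include <cstdint>
    #include <vector>

    using crc32_t = uint32_t;
    struct GroupCrc { crc32_t prev_grp_crc; crc32_t cur_grp_crc; };

    // Recovery walks groups in write order; group N must carry the CRC of
    // group N-1. The first mismatch marks the end of the valid log.
    size_t valid_prefix(const std::vector< GroupCrc >& groups) {
        for (size_t i = 1; i < groups.size(); ++i) {
            if (groups[i].prev_grp_crc != groups[i - 1].cur_grp_crc) return i;
        }
        return groups.size();
    }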
2 changes: 1 addition & 1 deletion src/lib/logstore/log_store.cpp
@@ -39,7 +39,7 @@ HomeLogStore::HomeLogStore(std::shared_ptr< LogDev > logdev, logstore_id_t id, b
         m_records{"HomeLogStoreRecords", start_lsn - 1},
         m_append_mode{append_mode},
         m_seq_num{start_lsn},
-        m_fq_name{fmt::format("{}.{}", logdev->get_id(), id)},
+        m_fq_name{fmt::format("{} logdev={}", id, logdev->get_id())},
         m_metrics{logstore_service().metrics()} {
     m_truncation_barriers.reserve(10000);
     m_safe_truncation_boundary.ld_key = m_logdev->get_last_flush_ld_key();
1 change: 0 additions & 1 deletion src/lib/logstore/log_store_service.cpp
@@ -115,7 +115,6 @@ logdev_id_t LogStoreService::create_new_logdev() {
     auto logdev = create_new_logdev_internal(logdev_id);
     logdev->start(true /* format */, m_logdev_vdev.get());
     COUNTER_INCREMENT(m_metrics, logdevs_count, 1);
-    LOGINFO("Created log dev id {}", logdev_id);
     return logdev_id;
 }

(remaining 6 changed files not shown)
