Skip to content

Commit 48f1b2c

Browse files
committed
Fix log store read across chunks and device manager load of chunks.
Chunks getting duplciate start offset while recovery. Add or cleanup logs for debugging. Add test case. Add more logdev for logstore test. Enable logstore test. Move rollback test to new logdev test.
1 parent 29f9d6f commit 48f1b2c

22 files changed

+746
-352
lines changed

src/include/homestore/logstore/log_store.hpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
110110
* @param cookie : Any cookie or context which will passed back in the callback
111111
* @param cb Callback upon completion which is called with the status, seq_num and cookie that was passed.
112112
*/
113-
void write_async(logstore_seq_num_t seq_num, const sisl::io_blob& b, void* cookie, const log_write_comp_cb_t& cb);
113+
void write_async(logstore_seq_num_t seq_num, const sisl::io_blob& b, void* cookie, const log_write_comp_cb_t& cb,
114+
bool flush_wait = false);
114115

115116
/**
116117
* @brief This method appends the blob into the log and it returns the generated seq number
@@ -210,6 +211,8 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
210211
return (ts == std::numeric_limits< logstore_seq_num_t >::max()) ? -1 : ts;
211212
}
212213

214+
sisl::StreamTracker< logstore_record >& log_records() { return m_records; }
215+
213216
/**
214217
* @brief iterator to get all the log buffers;
215218
*

src/include/homestore/logstore/log_store_internal.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ struct logstore_req {
125125
bool is_internal_req; // If the req is created internally by HomeLogStore itself
126126
log_req_comp_cb_t cb; // Callback upon completion of write (overridden than default)
127127
Clock::time_point start_time;
128+
bool flush_wait{false}; // Wait for the flush to happen
128129

129130
logstore_req(const logstore_req&) = delete;
130131
logstore_req& operator=(const logstore_req&) = delete;

src/lib/device/device.h

+1
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ class DeviceManager {
173173

174174
std::vector< PhysicalDev* > get_pdevs_by_dev_type(HSDevType dtype) const;
175175
std::vector< shared< VirtualDev > > get_vdevs() const;
176+
std::vector< shared< Chunk > > get_chunks() const;
176177

177178
uint64_t total_capacity() const;
178179
uint64_t total_capacity(HSDevType dtype) const;

src/lib/device/device_manager.cpp

+10
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,16 @@ std::vector< shared< VirtualDev > > DeviceManager::get_vdevs() const {
543543
return ret_v;
544544
}
545545

546+
std::vector< shared< Chunk > > DeviceManager::get_chunks() const {
547+
std::unique_lock lg{m_vdev_mutex};
548+
std::vector< shared< Chunk > > res;
549+
res.reserve(m_chunks.size());
550+
for (auto& chunk : m_chunks) {
551+
if (chunk) res.push_back(chunk);
552+
}
553+
return res;
554+
}
555+
546556
// Some of the hs_super_blk details
547557
uint64_t hs_super_blk::vdev_super_block_size() { return (hs_super_blk::MAX_VDEVS_IN_SYSTEM * vdev_info::size); }
548558

src/lib/device/hs_super_blk.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -157,11 +157,11 @@ class hs_super_blk {
157157
public:
158158
// Minium chunk size we can create in data device. Keeping this lower will increase number of chunks and thus
159159
// area for super block will be higher.
160-
static constexpr uint64_t MIN_CHUNK_SIZE_DATA_DEVICE = 16 * 1024 * 1024;
160+
static constexpr uint64_t MIN_CHUNK_SIZE_DATA_DEVICE = 8 * 1024 * 1024;
161161

162162
// Higher min chunk size than data device to ensure to limit max chunks in fast pdevs and thus lesser super block
163163
// area on more expensive fast device.
164-
static constexpr uint64_t MIN_CHUNK_SIZE_FAST_DEVICE = 32 * 1024 * 1024;
164+
static constexpr uint64_t MIN_CHUNK_SIZE_FAST_DEVICE = 8 * 1024 * 1024;
165165

166166
// Maximum number of chunks across all devices. We need to keep in mind the BlkId restriction (to address the
167167
// chunks)

src/lib/device/journal_vdev.cpp

+91-42
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ JournalVirtualDev::JournalVirtualDev(DeviceManager& dmgr, const vdev_info& vinfo
4646
HS_DYNAMIC_CONFIG(generic.journal_chunk_pool_capacity),
4747
[this]() {
4848
init_private_data->created_at = get_time_since_epoch_ms();
49+
init_private_data->end_of_chunk = m_vdev_info.chunk_size;
4950
sisl::blob private_blob{r_cast< uint8_t* >(init_private_data.get()), sizeof(JournalChunkPrivate)};
5051
return private_blob;
5152
},
@@ -84,17 +85,17 @@ void JournalVirtualDev::init() {
8485
// Create descriptor for each logdev_id
8586
auto journal_desc = std::make_shared< JournalVirtualDev::Descriptor >(*this, logdev_id);
8687
m_journal_descriptors.emplace(logdev_id, journal_desc);
87-
LOGDEBUGMOD(journalvdev, "Loading descriptor {}", logdev_id);
88+
LOGINFOMOD(journalvdev, "Loading descriptor log_dev={}", logdev_id);
8889
// Traverse the list starting from the head and add those chunks
8990
// in order to the journal descriptor. next_chunk is stored in private_data.
9091
// Last chunk will have next_chunk as 0.
9192
auto chunk_num = head.chunk_num;
9293
while (chunk_num != 0) {
9394
auto& c = chunk_map[chunk_num];
94-
RELEASE_ASSERT(c, "Invalid chunk found logdev {} chunk {}", logdev_id, chunk_num);
95+
RELEASE_ASSERT(c, "Invalid chunk found log_dev={} chunk={}", logdev_id, c->to_string());
9596
journal_desc->m_journal_chunks.push_back(c);
9697
visited_chunks.insert(chunk_num);
97-
LOGDEBUGMOD(journalvdev, "Loading chunk {} descriptor {}", chunk_num, logdev_id);
98+
LOGINFOMOD(journalvdev, "Loading log_dev={} chunk={}", logdev_id, c->to_string());
9899

99100
// Increase the the total size.
100101
journal_desc->m_total_size += c->size();
@@ -121,8 +122,8 @@ void JournalVirtualDev::init() {
121122
*data = JournalChunkPrivate{};
122123
update_chunk_private(chunk, data);
123124

124-
LOGDEBUGMOD(journalvdev, "Removing orphan chunk {} found for logdev {} next {}.", chunk_id, logdev_id,
125-
next_chunk);
125+
LOGINFOMOD(journalvdev, "Removing orphan chunk {} found for logdev {} next {}.", chunk_id, logdev_id,
126+
next_chunk);
126127
m_dmgr.remove_chunk_locked(chunk);
127128
}
128129

@@ -149,7 +150,11 @@ shared< JournalVirtualDev::Descriptor > JournalVirtualDev::open(logdev_id_t logd
149150
return journal_desc;
150151
}
151152

152-
LOGDEBUGMOD(journalvdev, "Opened log device descriptor {}", logdev_id);
153+
LOGINFOMOD(journalvdev, "Opened journal vdev descriptor log_dev={}", logdev_id);
154+
for (auto& chunk : it->second->m_journal_chunks) {
155+
LOGINFOMOD(journalvdev, " log_dev={} end_of_chunk={} chunk={}", logdev_id, get_end_of_chunk(chunk),
156+
chunk->to_string());
157+
}
153158
return it->second;
154159
}
155160

@@ -181,18 +186,19 @@ void JournalVirtualDev::Descriptor::append_chunk() {
181186
last_chunk_private->end_of_chunk = offset_in_chunk;
182187
}
183188
m_vdev.update_chunk_private(last_chunk, last_chunk_private);
184-
LOGDEBUGMOD(journalvdev, "Added chunk new {} last {} desc {}", new_chunk->chunk_id(), last_chunk->chunk_id(),
185-
to_string());
189+
LOGINFOMOD(journalvdev, "Added chunk new {} last {} desc {}", new_chunk->to_string(), last_chunk->chunk_id(),
190+
to_string());
186191

187192
} else {
188193
// If the list is empty, update the new chunk as the head. Only head chunk contains the logdev_id.
189194
auto* new_chunk_private = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(new_chunk->user_private()));
190195
new_chunk_private->is_head = true;
191196
new_chunk_private->logdev_id = m_logdev_id;
197+
new_chunk_private->end_of_chunk = m_vdev.info().chunk_size;
192198
// Append the new chunk
193199
m_journal_chunks.push_back(new_chunk);
194200
m_vdev.update_chunk_private(new_chunk, new_chunk_private);
195-
LOGDEBUGMOD(journalvdev, "Added head chunk {} desc {}", new_chunk->chunk_id(), to_string());
201+
LOGINFOMOD(journalvdev, "Added head chunk={} desc {}", new_chunk->to_string(), to_string());
196202
}
197203
}
198204

@@ -231,7 +237,8 @@ off_t JournalVirtualDev::Descriptor::alloc_next_append_blk(size_t sz) {
231237

232238
// assert that returnning logical offset is in good range
233239
HS_DBG_ASSERT_LE(tail_off, m_end_offset);
234-
LOGDEBUGMOD(journalvdev, "returned tail_off 0x{} desc {}", to_hex(tail_off), to_string());
240+
LOGDEBUGMOD(journalvdev, "returned tail_off 0x{} tail_off {} size {} desc {}", to_hex(tail_off), tail_off, sz,
241+
to_string());
235242
return tail_off;
236243
}
237244

@@ -336,30 +343,45 @@ void JournalVirtualDev::Descriptor::sync_pwritev(const iovec* iov, int iovcnt, o
336343
size_t JournalVirtualDev::Descriptor::sync_next_read(uint8_t* buf, size_t size_rd) {
337344
if (m_journal_chunks.empty()) { return 0; }
338345

346+
HS_REL_ASSERT_LE(m_seek_cursor, m_end_offset, "seek_cursor {} exceeded end_offset {}", m_seek_cursor, m_end_offset);
347+
if (m_seek_cursor >= m_end_offset) {
348+
LOGTRACEMOD(journalvdev, "sync_next_read reached end of chunks");
349+
return 0;
350+
}
351+
339352
auto [chunk, _, offset_in_chunk] = offset_to_chunk(m_seek_cursor);
340353
auto const end_of_chunk = m_vdev.get_end_of_chunk(chunk);
341354
auto const chunk_size = std::min< uint64_t >(end_of_chunk, chunk->size());
342355
bool across_chunk{false};
343356

357+
// LOGINFO("sync_next_read size_rd {} chunk {} seek_cursor {} end_of_chunk {} {}", size_rd, chunk->to_string(),
358+
// m_seek_cursor, end_of_chunk, chunk_size);
359+
344360
HS_REL_ASSERT_LE((uint64_t)end_of_chunk, chunk->size(), "Invalid end of chunk: {} detected on chunk num: {}",
345361
end_of_chunk, chunk->chunk_id());
346362
HS_REL_ASSERT_LE((uint64_t)offset_in_chunk, chunk->size(),
347363
"Invalid m_seek_cursor: {} which falls in beyond end of chunk: {}!", m_seek_cursor, end_of_chunk);
348364

349365
// if read size is larger then what's left in this chunk
350-
if (size_rd >= (chunk->size() - offset_in_chunk)) {
366+
if (size_rd >= (end_of_chunk - offset_in_chunk)) {
351367
// truncate size to what is left;
352-
size_rd = chunk->size() - offset_in_chunk;
368+
size_rd = end_of_chunk - offset_in_chunk;
353369
across_chunk = true;
354370
}
355371

372+
if (buf == nullptr) { return size_rd; }
373+
356374
auto ec = sync_pread(buf, size_rd, m_seek_cursor);
357375
// TODO: Check if we can have tolerate this error and somehow start homestore without replaying or in degraded mode?
358376
HS_REL_ASSERT(!ec, "Error in reading next stream of bytes, proceeding could cause some inconsistency, exiting");
359377

360378
// Update seek cursor after read;
361379
m_seek_cursor += size_rd;
362-
if (across_chunk) { m_seek_cursor += (chunk->size() - end_of_chunk); }
380+
if (across_chunk) {
381+
m_seek_cursor += (chunk->size() - end_of_chunk);
382+
LOGTRACEMOD(journalvdev, "Across size_rd {} chunk {} seek_cursor {} end_of_chunk {}", size_rd,
383+
chunk->to_string(), m_seek_cursor, end_of_chunk);
384+
}
363385
return size_rd;
364386
}
365387

@@ -412,37 +434,51 @@ off_t JournalVirtualDev::Descriptor::lseek(off_t offset, int whence) {
412434
break;
413435
}
414436

437+
LOGINFOMOD(journalvdev, "lseek desc {} offset 0x{} whence {} ", to_string(), to_hex(offset), whence);
415438
return m_seek_cursor;
416439
}
417440

418441
/**
419442
* @brief :- it returns the vdev offset after nbytes from start offset
420443
*/
421444
off_t JournalVirtualDev::Descriptor::dev_offset(off_t nbytes) const {
422-
if (m_journal_chunks.empty()) { return data_start_offset(); }
445+
if (nbytes == 0 || m_journal_chunks.empty()) {
446+
// If no chunks return start offset.
447+
return data_start_offset();
448+
}
423449

424450
off_t vdev_offset = data_start_offset();
425-
uint32_t dev_id{0}, chunk_id{0};
426-
off_t offset_in_chunk{0};
427-
off_t cur_read_cur{0};
428-
429-
while (cur_read_cur != nbytes) {
430-
auto [chunk, _, offset_in_chunk] = offset_to_chunk(vdev_offset);
431-
432-
auto const end_of_chunk = m_vdev.get_end_of_chunk(chunk);
433-
auto const chunk_size = std::min< uint64_t >(end_of_chunk, chunk->size());
434-
auto const remaining = nbytes - cur_read_cur;
435-
if (remaining >= (static_cast< off_t >(chunk->size()) - offset_in_chunk)) {
436-
cur_read_cur += (chunk->size() - offset_in_chunk);
437-
vdev_offset += (chunk->size() - offset_in_chunk);
438-
} else {
451+
auto chunk_size = m_vdev.info().chunk_size;
452+
uint64_t remaining = nbytes;
453+
auto start_offset = data_start_offset() % chunk_size;
454+
455+
// data_start_offset coulde be anywhere in the first chunk.
456+
// because when we truncate and data_start_offset lies in first chunk
457+
// we dont delete that first chunk. other chunks will have start_offset as 0.
458+
for (auto chunk : m_journal_chunks) {
459+
uint64_t end_of_chunk = std::min< uint64_t >(m_vdev.get_end_of_chunk(chunk), chunk_size);
460+
461+
auto num_data_bytes = end_of_chunk - start_offset;
462+
if (remaining < num_data_bytes) {
439463
vdev_offset += remaining;
440-
cur_read_cur = nbytes;
464+
break;
441465
}
466+
467+
remaining -= num_data_bytes;
468+
vdev_offset += (chunk_size - start_offset);
469+
start_offset = 0;
442470
}
443471
return vdev_offset;
444472
}
445473

474+
void JournalVirtualDev::Descriptor::update_data_start_offset(off_t offset) {
475+
m_data_start_offset = offset;
476+
auto data_start_offset_aligned = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size);
477+
m_end_offset = data_start_offset_aligned + m_journal_chunks.size() * m_vdev.info().chunk_size;
478+
LOGINFOMOD(journalvdev, "Updated data start offset off 0x{} {}", to_hex(offset), to_string());
479+
RELEASE_ASSERT_EQ(m_end_offset - data_start_offset_aligned, m_total_size, "offset size mismatch {}", to_string());
480+
}
481+
446482
off_t JournalVirtualDev::Descriptor::tail_offset(bool reserve_space_include) const {
447483
off_t tail = static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed));
448484
if (reserve_space_include) { tail += m_reserved_sz; }
@@ -456,13 +492,13 @@ void JournalVirtualDev::Descriptor::update_tail_offset(off_t tail) {
456492

457493
if (tail >= start) {
458494
m_write_sz_in_total.store(tail - start, std::memory_order_relaxed);
459-
} else {
460-
RELEASE_ASSERT(false, "tail {} less than start offset {}", tail, start);
495+
} else if (tail != 0) {
496+
LOGERROR("tail {} less than start offset {} desc {}", tail, start, to_string());
497+
RELEASE_ASSERT(false, "Invalid tail offset");
461498
}
462499
lseek(tail);
463500

464-
LOGDEBUGMOD(journalvdev, "tail arg 0x{} desc {} ", to_hex(tail), to_string());
465-
HS_REL_ASSERT(tail_offset() == tail, "tail offset mismatch after calculation 0x{} : {}", tail_offset(), tail);
501+
LOGINFOMOD(journalvdev, "Updated tail offset arg 0x{} desc {} ", to_hex(tail), to_string());
466502
}
467503

468504
void JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) {
@@ -511,7 +547,11 @@ void JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) {
511547
m_vdev.update_chunk_private(chunk, data);
512548

513549
m_vdev.m_chunk_pool->enqueue(chunk);
514-
HS_PERIODIC_LOG(TRACE, journalvdev, "adding chunk {} back to pool desc {}", chunk->chunk_id(), to_string());
550+
#if 0
551+
// Format and add back to pool.
552+
chunk->physical_dev_mutable()->async_write_zero(chunk->size(), chunk->start_offset());
553+
LOGINFOMOD(journalvdev, "Format and adding chunk {} back to pool desc {}", chunk->chunk_id(), to_string());
554+
#endif
515555
}
516556

517557
// Update our start offset, to keep track of actual size
@@ -561,7 +601,8 @@ uint64_t JournalVirtualDev::Descriptor::logical_to_dev_offset(off_t log_offset,
561601
}
562602
#endif
563603

564-
std::tuple< shared< Chunk >, uint32_t, off_t > JournalVirtualDev::Descriptor::offset_to_chunk(off_t log_offset) const {
604+
std::tuple< shared< Chunk >, uint32_t, off_t > JournalVirtualDev::Descriptor::offset_to_chunk(off_t log_offset,
605+
bool check) const {
565606
uint64_t chunk_aligned_offset = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size);
566607
uint64_t off_l{static_cast< uint64_t >(log_offset) - chunk_aligned_offset};
567608
uint32_t index = 0;
@@ -574,10 +615,17 @@ std::tuple< shared< Chunk >, uint32_t, off_t > JournalVirtualDev::Descriptor::of
574615
}
575616
}
576617

577-
HS_DBG_ASSERT(false, "Input log_offset is invalid: {}", log_offset);
618+
if (check) { HS_DBG_ASSERT(false, "Input log_offset is invalid: {} {}", log_offset, to_string()); }
578619
return {nullptr, 0L, 0L};
579620
}
580621

622+
bool JournalVirtualDev::Descriptor::is_offset_at_last_chunk(off_t bytes_offset) {
623+
auto [chunk, chunk_index, _] = offset_to_chunk(bytes_offset, false);
624+
if (chunk == nullptr) return true;
625+
if (chunk_index == m_journal_chunks.size() - 1) { return true; }
626+
return false;
627+
}
628+
581629
void JournalVirtualDev::Descriptor::high_watermark_check() {
582630
if (resource_mgr().check_journal_size(used_size(), size())) {
583631
COUNTER_INCREMENT(m_vdev.m_metrics, vdev_high_watermark_count, 1);
@@ -598,7 +646,7 @@ bool JournalVirtualDev::Descriptor::is_alloc_accross_chunk(size_t size) const {
598646

599647
nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const {
600648
nlohmann::json j;
601-
j["logdev_id"] = m_logdev_id;
649+
j["logdev"] = m_logdev_id;
602650
j["seek_cursor"] = m_seek_cursor;
603651
j["data_start_offset"] = m_data_start_offset;
604652
j["end_offset"] = m_end_offset;
@@ -613,7 +661,7 @@ nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const {
613661
nlohmann::json c;
614662
auto* private_data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
615663
c["chunk_id"] = chunk->chunk_id();
616-
c["logdev_id"] = private_data->logdev_id;
664+
c["logdev"] = private_data->logdev_id;
617665
c["is_head"] = private_data->is_head;
618666
c["end_of_chunk"] = private_data->end_of_chunk;
619667
c["next_chunk"] = private_data->next_chunk;
@@ -627,12 +675,13 @@ nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const {
627675
}
628676

629677
std::string JournalVirtualDev::Descriptor::to_string() const {
630-
std::string str{fmt::format("id={};ds=0x{};end=0x{};writesz={};tail=0x{};"
678+
off_t tail =
679+
static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed)) + m_reserved_sz;
680+
std::string str{fmt::format("log_dev={};ds=0x{};end=0x{};writesz={};tail=0x{};"
631681
"rsvdsz={};chunks={};trunc={};total={};seek=0x{} ",
632682
m_logdev_id, to_hex(m_data_start_offset), to_hex(m_end_offset),
633-
m_write_sz_in_total.load(std::memory_order_relaxed), to_hex(tail_offset()),
634-
m_reserved_sz, m_journal_chunks.size(), m_truncate_done, m_total_size,
635-
to_hex(m_seek_cursor))};
683+
m_write_sz_in_total.load(std::memory_order_relaxed), to_hex(tail), m_reserved_sz,
684+
m_journal_chunks.size(), m_truncate_done, m_total_size, to_hex(m_seek_cursor))};
636685
return str;
637686
}
638687

0 commit comments

Comments
 (0)