diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 4c0798531e7..7c253d9fa0a 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1487,13 +1487,23 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir, for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) { auto& wal = *it; assert(wal.IsSyncing()); - + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Synced log %" PRIu64 " from logs_, last seq number %" PRIu64 + "\n", + wal.number, wal.writer->GetLastSequence()); if (logs_.size() > 1) { if (immutable_db_options_.track_and_verify_wals_in_manifest && wal.GetPreSyncSize() > 0) { - synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize())); + synced_wals->AddWal( + wal.number, + WalMetadata(wal.GetPreSyncSize(), wal.writer->GetLastSequence())); } - logs_to_free_.push_back(wal.ReleaseWriter()); + auto writer = wal.ReleaseWriter(); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "deleting log %" PRIu64 + " from logs_. Last Seq number of the WAL is %" PRIu64 "\n", + wal.number, writer->GetLastSequence()); + logs_to_free_.push_back(writer); it = logs_.erase(it); } else { wal.FinishSync(); @@ -1507,12 +1517,19 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir, void DBImpl::MarkLogsNotSynced(uint64_t up_to) { log_write_mutex_.AssertHeld(); + uint64_t min_wal = 0; for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to; ++it) { auto& wal = *it; + if (min_wal == 0) { + min_wal = it->number; + } wal.FinishSync(); } log_sync_cv_.SignalAll(); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "MarkLogsNotSynced from %" PRIu64 " to %" PRIu64 "\n", min_wal, + up_to); } SequenceNumber DBImpl::GetLatestSequenceNumber() const { diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index ee5d1825215..61fccfcf6d7 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1845,7 +1845,7 @@ class DBImpl : public DB { IOStatus WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, uint64_t* log_used, uint64_t* log_size, - LogFileNumberSize& log_file_number_size); + LogFileNumberSize& log_file_number_size, int caller_id); IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group, log::Writer* log_writer, uint64_t* log_used, diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 1db50b476d2..45b3289a9a4 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -280,6 +280,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, return; } + VersionEdit synced_wals; if (!alive_log_files_.empty() && !logs_.empty()) { uint64_t min_log_number = job_context->log_number; size_t num_alive_log_files = alive_log_files_.size(); @@ -293,6 +294,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, earliest.number); log_recycle_files_.push_back(earliest.number); } else { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "deleting WAL log %" PRIu64 "\n", earliest.number); job_context->log_delete_files.push_back(earliest.number); } if (job_context->size_log_to_delete == 0) { @@ -317,7 +320,18 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, // logs_ could have changed while we were waiting. continue; } - logs_to_free_.push_back(log.ReleaseWriter()); + if (immutable_db_options_.track_and_verify_wals_in_manifest && + log.GetPreSyncSize() > 0) { + synced_wals.AddWal( + log.number, + WalMetadata(log.GetPreSyncSize(), log.writer->GetLastSequence())); + } + auto writer = log.ReleaseWriter(); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "deleting log %" PRIu64 + " from logs_, last seq number of WAL %" PRIu64 "\n", + log.number, writer->GetLastSequence()); + logs_to_free_.push_back(writer); logs_.pop_front(); } // Current log cannot be obsolete. @@ -331,6 +345,9 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, logs_to_free_.clear(); log_write_mutex_.Unlock(); mutex_.Lock(); + if (synced_wals.IsWalAddition()) { + ApplyWALToManifest(&synced_wals); + } job_context->log_recycle_files.assign(log_recycle_files_.begin(), log_recycle_files_.end()); } @@ -491,6 +508,10 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { // Close WALs before trying to delete them. for (const auto w : state.logs_to_free) { // TODO: maybe check the return value of Close. + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Close log %" PRIu64 + " from logs_, last Seq number in WAL %" PRIu64 "\n", + w->get_log_number(), w->GetLastSequence()); auto s = w->Close(); s.PermitUncheckedError(); } diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index a75a62f5b69..6dd5fc00e73 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1751,7 +1751,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, assert(log_writer->get_log_number() == log_file_number_size.number); impl->mutex_.AssertHeld(); s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size, - log_file_number_size); + log_file_number_size, 0); if (s.ok()) { // Need to fsync, otherwise it might get lost after a power reset. s = impl->FlushWAL(false); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index dd7f65f03f7..9cea7cc2b84 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1432,10 +1432,23 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, uint64_t* log_used, uint64_t* log_size, - LogFileNumberSize& log_file_number_size) { + LogFileNumberSize& log_file_number_size, + int caller_id) { assert(log_size != nullptr); - + if (log_writer->file()->GetFileSize() == 0) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Start writing to WAL: [%" PRIu64 " ]", + log_writer->get_log_number()); + } + if (log_writer->get_log_number() != logs_.back().number) { + ROCKS_LOG_INFO( + immutable_db_options_.info_log, + "Not writing to latest WAL: [%" PRIu64 ", %" PRIu64 "] CallerId: %d", + log_writer->get_log_number(), logs_.back().number, caller_id); + } Slice log_entry = WriteBatchInternal::Contents(&merged_batch); + SequenceNumber seq = WriteBatchInternal::Sequence(&merged_batch); + log_writer->SetLastSequence(seq); *log_size = log_entry.size(); // When two_write_queues_ WriteToWAL has to be protected from concurretn calls // from the two queues anyway and log_write_mutex_ is already held. Otherwise @@ -1488,7 +1501,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, uint64_t log_size; io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size, - log_file_number_size); + log_file_number_size, 1); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; @@ -1516,7 +1529,9 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, } for (auto& log : logs_) { + log.PrepareForSync(); io_s = log.writer->file()->Sync(immutable_db_options_.use_fsync); + log.FinishSync(); if (!io_s.ok()) { break; } @@ -1550,6 +1565,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal); RecordTick(stats_, WRITE_WITH_WAL, write_with_wal); } + return io_s; } @@ -1589,7 +1605,7 @@ IOStatus DBImpl::ConcurrentWriteToWAL( uint64_t log_size; io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size, - log_file_number_size); + log_file_number_size, 2); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; diff --git a/db/log_writer.cc b/db/log_writer.cc index e2e596596aa..2d6a3ba1ba4 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -29,6 +29,7 @@ Writer::Writer(std::unique_ptr&& dest, uint64_t log_number, char t = static_cast(i); type_crc_[i] = crc32c::Value(&t, 1); } + last_seq_ = 0; } Writer::~Writer() { diff --git a/db/log_writer.h b/db/log_writer.h index 1a91b21994d..20db1abc4cc 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -15,6 +15,7 @@ #include "rocksdb/io_status.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" +#include "rocksdb/types.h" namespace ROCKSDB_NAMESPACE { @@ -92,11 +93,16 @@ class Writer { bool TEST_BufferIsEmpty(); + void SetLastSequence(SequenceNumber seq) { last_seq_ = seq; } + + SequenceNumber GetLastSequence() const { return last_seq_; } + private: std::unique_ptr dest_; size_t block_offset_; // Current offset in block uint64_t log_number_; bool recycle_log_files_; + SequenceNumber last_seq_; // crc32c values for all supported record types. These are // pre-computed to reduce the overhead of computing the crc of the diff --git a/db/wal_edit.cc b/db/wal_edit.cc index 2525be610b4..ee159b08b3d 100644 --- a/db/wal_edit.cc +++ b/db/wal_edit.cc @@ -18,6 +18,8 @@ void WalAddition::EncodeTo(std::string* dst) const { PutVarint32(dst, static_cast(WalAdditionTag::kSyncedSize)); PutVarint64(dst, metadata_.GetSyncedSizeInBytes()); } + PutVarint32(dst, static_cast(WalAdditionTag::kLastSyncSeq)); + PutVarint64(dst, metadata_.GetLastSequence()); PutVarint32(dst, static_cast(WalAdditionTag::kTerminate)); } @@ -44,6 +46,14 @@ Status WalAddition::DecodeFrom(Slice* src) { metadata_.SetSyncedSizeInBytes(size); break; } + case WalAdditionTag::kLastSyncSeq: { + uint64_t lsn = 0; + if (!GetVarint64(src, &lsn)) { + return Status::Corruption(class_name, "Error decoding WAL file size"); + } + metadata_.SetLastSequence(lsn); + break; + } // TODO: process future tags such as checksum. case WalAdditionTag::kTerminate: return Status::OK(); @@ -58,13 +68,15 @@ Status WalAddition::DecodeFrom(Slice* src) { JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal) { jw << "LogNumber" << wal.GetLogNumber() << "SyncedSizeInBytes" - << wal.GetMetadata().GetSyncedSizeInBytes(); + << wal.GetMetadata().GetSyncedSizeInBytes() << "LastSeqNumber" + << wal.GetMetadata().GetLastSequence(); return jw; } std::ostream& operator<<(std::ostream& os, const WalAddition& wal) { os << "log_number: " << wal.GetLogNumber() - << " synced_size_in_bytes: " << wal.GetMetadata().GetSyncedSizeInBytes(); + << " synced_size_in_bytes: " << wal.GetMetadata().GetSyncedSizeInBytes() + << " last_seq_number: " << wal.GetMetadata().GetLastSequence(); return os; } @@ -139,6 +151,7 @@ Status WalSet::AddWal(const WalAddition& wal) { // Update synced size for the given WAL. it->second.SetSyncedSizeInBytes(wal.GetMetadata().GetSyncedSizeInBytes()); + it->second.SetLastSequence(wal.GetMetadata().GetLastSequence()); return Status::OK(); } diff --git a/db/wal_edit.h b/db/wal_edit.h index d27f74ef137..c3c2566a942 100644 --- a/db/wal_edit.h +++ b/db/wal_edit.h @@ -32,8 +32,10 @@ class WalMetadata { public: WalMetadata() = default; - explicit WalMetadata(uint64_t synced_size_bytes) - : synced_size_bytes_(synced_size_bytes) {} + explicit WalMetadata(uint64_t synced_size_bytes, + uint64_t last_sequence_number = 0) + : synced_size_bytes_(synced_size_bytes), + last_sequence_number_(last_sequence_number) {} bool HasSyncedSize() const { return synced_size_bytes_ != kUnknownWalSize; } @@ -41,6 +43,10 @@ class WalMetadata { uint64_t GetSyncedSizeInBytes() const { return synced_size_bytes_; } + uint64_t GetLastSequence() const { return last_sequence_number_; } + + void SetLastSequence(uint64_t lsn) { last_sequence_number_ = lsn; } + private: friend bool operator==(const WalMetadata& lhs, const WalMetadata& rhs); friend bool operator!=(const WalMetadata& lhs, const WalMetadata& rhs); @@ -50,6 +56,8 @@ class WalMetadata { // Size of the most recently synced WAL in bytes. uint64_t synced_size_bytes_ = kUnknownWalSize; + + uint64_t last_sequence_number_ = 0; }; inline bool operator==(const WalMetadata& lhs, const WalMetadata& rhs) { @@ -66,6 +74,8 @@ enum class WalAdditionTag : uint32_t { kTerminate = 1, // Synced Size in bytes. kSyncedSize = 2, + + kLastSyncSeq = 3, // Add tags in the future, such as checksum? };