Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DNM] Wal debug 6.29.tikv #356

Open
wants to merge 18 commits into
base: 6.29.tikv
Choose a base branch
from
17 changes: 15 additions & 2 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1487,13 +1487,19 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
tonyxuqqi marked this conversation as resolved.
Show resolved Hide resolved
auto& wal = *it;
assert(wal.IsSyncing());

ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Synced log %" PRIu64 " from logs_\n", wal.number);
if (logs_.size() > 1) {
if (immutable_db_options_.track_and_verify_wals_in_manifest &&
wal.GetPreSyncSize() > 0) {
synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
}
logs_to_free_.push_back(wal.ReleaseWriter());
auto writer = wal.ReleaseWriter();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"deleting log %" PRIu64
" from logs_. Last Seq number of the WAL is %" PRIu64 "\n",
wal.number, writer->GetLastSequence());
logs_to_free_.push_back(writer);
it = logs_.erase(it);
} else {
wal.FinishSync();
Expand All @@ -1507,12 +1513,19 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,

void DBImpl::MarkLogsNotSynced(uint64_t up_to) {
log_write_mutex_.AssertHeld();
uint64_t min_wal = 0;
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;
++it) {
auto& wal = *it;
if (min_wal == 0) {
min_wal = it->number;
}
wal.FinishSync();
}
log_sync_cv_.SignalAll();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"MarkLogsNotSynced from %" PRIu64 " to %" PRIu64 "\n", min_wal,
up_to);
}

SequenceNumber DBImpl::GetLatestSequenceNumber() const {
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1845,7 +1845,7 @@ class DBImpl : public DB {

IOStatus WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer,
uint64_t* log_used, uint64_t* log_size,
LogFileNumberSize& log_file_number_size);
LogFileNumberSize& log_file_number_size, int caller_id);

IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, uint64_t* log_used,
Expand Down
13 changes: 12 additions & 1 deletion db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
earliest.number);
log_recycle_files_.push_back(earliest.number);
} else {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"deleting WAL log %" PRIu64 "\n", earliest.number);
job_context->log_delete_files.push_back(earliest.number);
}
if (job_context->size_log_to_delete == 0) {
Expand All @@ -317,7 +319,12 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
// logs_ could have changed while we were waiting.
continue;
}
logs_to_free_.push_back(log.ReleaseWriter());
auto writer = log.ReleaseWriter();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"deleting log %" PRIu64
" from logs_, last seq number of WAL %" PRIu64 "\n",
log.number, writer->GetLastSequence());
logs_to_free_.push_back(writer);
logs_.pop_front();
}
// Current log cannot be obsolete.
Expand Down Expand Up @@ -491,6 +498,10 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
// Close WALs before trying to delete them.
for (const auto w : state.logs_to_free) {
// TODO: maybe check the return value of Close.
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Close log %" PRIu64
" from logs_, last Seq number in WAL %" PRIu64 "\n",
w->get_log_number(), w->GetLastSequence());
auto s = w->Close();
s.PermitUncheckedError();
}
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1751,7 +1751,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
assert(log_writer->get_log_number() == log_file_number_size.number);
impl->mutex_.AssertHeld();
s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size,
log_file_number_size);
log_file_number_size, 0);
if (s.ok()) {
// Need to fsync, otherwise it might get lost after a power reset.
s = impl->FlushWAL(false);
Expand Down
22 changes: 18 additions & 4 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1432,10 +1432,23 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
log::Writer* log_writer, uint64_t* log_used,
uint64_t* log_size,
LogFileNumberSize& log_file_number_size) {
LogFileNumberSize& log_file_number_size,
int caller_id) {
assert(log_size != nullptr);

if (log_writer->file()->GetFileSize() == 0) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Start writing to WAL: [%" PRIu64 " ]",
log_writer->get_log_number());
}
if (log_writer->get_log_number() != logs_.back().number) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tonyxuqqi : At the beginning of the write, writer will reference the last WAL only. The race condition happens, while the write is ongoing. We should check this condition after the write.

ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Not writing to latest WAL: [%" PRIu64 ", %" PRIu64 "] CallerId: %d",
log_writer->get_log_number(), logs_.back().number, caller_id);
}
Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
SequenceNumber seq = WriteBatchInternal::Sequence(&merged_batch);
log_writer->SetLastSequence(seq);
*log_size = log_entry.size();
// When two_write_queues_ WriteToWAL has to be protected from concurretn calls
// from the two queues anyway and log_write_mutex_ is already held. Otherwise
Expand Down Expand Up @@ -1488,7 +1501,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,

uint64_t log_size;
io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size,
log_file_number_size);
log_file_number_size, 1);
if (to_be_cached_state) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add a lot when the fsync happens in writeToWAL

io_s = log.writer->file()->Sync(immutable_db_options_.use_fsync);

cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
Expand Down Expand Up @@ -1550,6 +1563,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal);
RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
}

return io_s;
}

Expand Down Expand Up @@ -1589,7 +1603,7 @@ IOStatus DBImpl::ConcurrentWriteToWAL(

uint64_t log_size;
io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size,
log_file_number_size);
log_file_number_size, 2);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
Expand Down
1 change: 1 addition & 0 deletions db/log_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
char t = static_cast<char>(i);
type_crc_[i] = crc32c::Value(&t, 1);
}
last_seq_ = 0;
}

Writer::~Writer() {
Expand Down
6 changes: 6 additions & 0 deletions db/log_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "rocksdb/io_status.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -92,11 +93,16 @@ class Writer {

bool TEST_BufferIsEmpty();

void SetLastSequence(SequenceNumber seq) { last_seq_ = seq; }

SequenceNumber GetLastSequence() const { return last_seq_; }

private:
std::unique_ptr<WritableFileWriter> dest_;
size_t block_offset_; // Current offset in block
uint64_t log_number_;
bool recycle_log_files_;
SequenceNumber last_seq_;

// crc32c values for all supported record types. These are
// pre-computed to reduce the overhead of computing the crc of the
Expand Down
Loading