From 56fb1ab13602b68234d779538619a29c12ce1439 Mon Sep 17 00:00:00 2001 From: Yang Zhang Date: Sun, 29 Sep 2024 23:58:36 -0700 Subject: [PATCH] Add multi-batches write implementation (#387) Signed-off-by: Yang Zhang Co-authored-by: glorv --- db/column_family.cc | 7 + db/db_impl/db_impl.h | 14 +- db/db_impl/db_impl_open.cc | 13 +- db/db_impl/db_impl_secondary.cc | 2 +- db/db_impl/db_impl_write.cc | 297 ++++++++++++++++++++++++++++--- db/db_kv_checksum_test.cc | 17 +- db/db_properties_test.cc | 3 + db/db_test_util.cc | 6 + db/db_test_util.h | 1 + db/db_write_test.cc | 82 ++++++++- db/external_sst_file_test.cc | 30 ++++ db/write_batch.cc | 25 ++- db/write_batch_internal.h | 13 +- db/write_callback_test.cc | 6 + db/write_thread.cc | 92 ++++++++-- db/write_thread.h | 151 +++++++++++++++- include/rocksdb/db.h | 5 + include/rocksdb/options.h | 16 ++ options/db_options.cc | 7 + options/db_options.h | 1 + options/options_helper.cc | 2 + options/options_settable_test.cc | 1 + tools/db_bench_tool.cc | 77 +++++++- 23 files changed, 798 insertions(+), 70 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index f6af0c8f9de..76cfc018985 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -1515,6 +1515,13 @@ Status ColumnFamilyData::ValidateOptions( } } } + + if (db_options.enable_multi_batch_write && + cf_options.max_successive_merges > 0) { + return Status::NotSupported( + "Multi thread write is only supported with no successive merges"); + } + return s; } diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 34a5f33989c..ed771324827 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -232,6 +232,10 @@ class DBImpl : public DB { virtual Status Write(const WriteOptions& options, WriteBatch* updates) override; + using DB::MultiBatchWrite; + virtual Status MultiBatchWrite(const WriteOptions& options, + std::vector&& updates) override; + using DB::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, 
const Slice& key, @@ -1475,6 +1479,13 @@ class DBImpl : public DB { PreReleaseCallback* pre_release_callback = nullptr, PostMemTableCallback* post_memtable_callback = nullptr); + Status MultiBatchWriteImpl(const WriteOptions& write_options, + std::vector&& my_batch, + WriteCallback* callback = nullptr, + uint64_t* log_used = nullptr, uint64_t log_ref = 0, + uint64_t* seq_used = nullptr); + void MultiBatchWriteCommit(CommitRequest* request); + Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates, WriteCallback* callback = nullptr, uint64_t* log_used = nullptr, uint64_t log_ref = 0, @@ -2005,7 +2016,8 @@ class DBImpl : public DB { mutex_.Lock(); } - if (!immutable_db_options_.unordered_write) { + if (!immutable_db_options_.unordered_write && + !immutable_db_options_.enable_multi_batch_write) { // Then the writes are finished before the next write group starts return; } diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 074fa86214a..086e014e581 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -151,6 +151,16 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src, result.avoid_flush_during_recovery = false; } + // Multi-batch write does not support two_write_queues or 2PC (allow_2pc) + if (result.two_write_queues || result.allow_2pc) { + result.enable_multi_batch_write = false; + } + + if (result.enable_multi_batch_write) { + result.enable_pipelined_write = false; + result.allow_concurrent_memtable_write = true; + } + ImmutableDBOptions immutable_db_options(result); if (!immutable_db_options.IsWalDirSameAsDBPath()) { // Either the WAL dir and db_paths[0]/db_name are not the same, or we @@ -1290,7 +1300,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, bool has_valid_writes = false; status = WriteBatchInternal::InsertInto( batch_to_use, column_family_memtables_.get(), &flush_scheduler_, - &trim_history_scheduler_, true, wal_number, this, + &trim_history_scheduler_,
true, wal_number, 0, this, false /* concurrent_memtable_writes */, next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_); MaybeIgnoreError(&status); @@ -2230,7 +2240,6 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, impl->immutable_db_options_.db_paths[0].path); } - if (s.ok()) { ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p", impl); diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 235a528ba08..1ee6e9df0dd 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -285,7 +285,7 @@ Status DBImplSecondary::RecoverLogFiles( status = WriteBatchInternal::InsertInto( &batch, column_family_memtables_.get(), nullptr /* flush_scheduler */, nullptr /* trim_history_scheduler*/, - true, log_number, this, false /* concurrent_memtable_writes */, + true, log_number, 0, this, false /* concurrent_memtable_writes */, next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_); } // If column family was not found, it might mean that the WAL write diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index cbe6e8fbd74..536c514a2ec 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -174,6 +174,229 @@ Status DBImpl::WriteWithCallback(const WriteOptions& write_options, return s; } +void DBImpl::MultiBatchWriteCommit(CommitRequest* request) { + write_thread_.ExitWaitSequenceCommit(request, &versions_->last_sequence_); + size_t pending_cnt = pending_memtable_writes_.fetch_sub(1) - 1; + if (pending_cnt == 0) { + // switch_cv_ waits until pending_memtable_writes_ = 0. Locking its mutex + // before notify ensures that cv is in waiting state when it is notified + // thus not missing the update to pending_memtable_writes_ even though it + // is not modified under the mutex. 
+ std::lock_guard lck(switch_mutex_); + switch_cv_.notify_all(); + } +} + +Status DBImpl::MultiBatchWrite(const WriteOptions& options, + std::vector&& updates) { + if (immutable_db_options_.enable_multi_batch_write) { + return MultiBatchWriteImpl(options, std::move(updates), nullptr, nullptr); + } else { + return Status::NotSupported(); + } +} + +// In this way, RocksDB will apply WriteBatch to memtable out of order but +// commit +// them in order. (We borrow the idea from: +// https://github.com/cockroachdb/pebble/blob/master/docs/rocksdb.md#commit-pipeline. +// On this basis, we split the WriteBatch into smaller-grained WriteBatch +// vector, +// and when the WriteBatch sizes of multiple writers are not balanced, writers +// that finish first need to help the front writer finish writing the remaining +// WriteBatch to increase CPU usage and reduce overall latency) +// +// More details: +// +// Request Queue WriteBatchVec +// +--------------+ +---------------------+ +// | Front Writer | -> | WB1 | WB2 | WB3|... | +// +--------------+ +-----+ +---------------------+ +// | Writer 2 | -> | WB1 | +// +--------------+ +-----+ +-----------+ +// | Writer 3 | -> | WB1 | WB2 | +// +--------------+ +---+ +-----------+ +// | ... | -> |...| +// +--------------+ +---+ +// +// 1. Multiple writers enter the `Request queue` to determine the commit order. +// 2. Then all writers write to the memtable in parallel (Each thread iterates +// over +// its own write batch vector). +// 3.1. If the Front Writer finishes writing and enters the commit phase first, +// it will +// pop itself from the `Request queue`, then this function will return to +// its caller, +// and the Writer 2 becomes the new front. +// 3.2.
If the Writer 2 or 3 finishes writing and enters the commit phase first, +// it will +// help the front writer complete its pending WBs one by one until all done +// and wake +// up the Front Writer, then the Front Writer will traverse and pop +// completed writers, +// the first unfinished writer encountered will become the new front. +// +Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, + std::vector&& my_batch, + WriteCallback* callback, uint64_t* log_used, + uint64_t log_ref, uint64_t* seq_used) { + PERF_TIMER_GUARD(write_pre_and_post_process_time); + StopWatch write_sw(immutable_db_options_.clock, + immutable_db_options_.statistics.get(), DB_WRITE); + WriteThread::Writer writer(write_options, std::move(my_batch), callback, + log_ref, false /*disable_memtable*/); + CommitRequest request(&writer); + writer.request = &request; + write_thread_.JoinBatchGroup(&writer); + + WriteContext write_context; + if (writer.state == WriteThread::STATE_GROUP_LEADER) { + WriteThread::WriteGroup wal_write_group; + if (writer.callback && !writer.callback->AllowWriteBatching()) { + WaitForPendingWrites(); + } + LogContext log_context(!write_options.disableWAL && write_options.sync); + PERF_TIMER_STOP(write_pre_and_post_process_time); + writer.status = + PreprocessWrite(write_options, &log_context, &write_context); + PERF_TIMER_START(write_pre_and_post_process_time); + + // This can set non-OK status if callback fail. 
+ last_batch_group_size_ = + write_thread_.EnterAsBatchGroupLeader(&writer, &wal_write_group); + const SequenceNumber current_sequence = + write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1; + size_t total_count = 0; + size_t total_byte_size = 0; + auto stats = default_cf_internal_stats_; + size_t memtable_write_cnt = 0; + IOStatus io_s; + io_s.PermitUncheckedError(); // Allow io_s to be uninitialized + if (writer.status.ok()) { + SequenceNumber next_sequence = current_sequence; + for (auto w : wal_write_group) { + if (w->CheckCallback(this)) { + if (w->ShouldWriteToMemtable()) { + w->sequence = next_sequence; + size_t count = WriteBatchInternal::Count(w->multi_batch.batches); + if (count > 0) { + auto sequence = w->sequence; + for (auto b : w->multi_batch.batches) { + WriteBatchInternal::SetSequence(b, sequence); + sequence += WriteBatchInternal::Count(b); + } + w->multi_batch.SetContext( + versions_->GetColumnFamilySet(), &flush_scheduler_, + &trim_history_scheduler_, + write_options.ignore_missing_column_families, this); + w->request->commit_lsn = next_sequence + count - 1; + write_thread_.EnterCommitQueue(w->request); + next_sequence += count; + total_count += count; + memtable_write_cnt++; + } + } + total_byte_size = WriteBatchInternal::AppendedByteSize( + total_byte_size, + WriteBatchInternal::ByteSize(w->multi_batch.batches)); + } + } + if (writer.disable_wal) { + has_unpersisted_data_.store(true, std::memory_order_relaxed); + } + write_thread_.UpdateLastSequence(current_sequence + total_count - 1); + stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count); + RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); + stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size); + RecordTick(stats_, BYTES_WRITTEN, total_byte_size); + RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size); + + PERF_TIMER_STOP(write_pre_and_post_process_time); + if (!write_options.disableWAL) { + PERF_TIMER_GUARD(write_wal_time); + 
stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1); + RecordTick(stats_, WRITE_DONE_BY_SELF, 1); + if (wal_write_group.size > 1) { + stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther, + wal_write_group.size - 1); + + RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1); + } + assert(log_context.log_file_number_size); + LogFileNumberSize& log_file_number_size = + *(log_context.log_file_number_size); + io_s = + WriteToWAL(wal_write_group, log_context.writer, log_used, + log_context.need_log_sync, log_context.need_log_dir_sync, + current_sequence, log_file_number_size); + writer.status = io_s; + } + } + if (!io_s.ok()) { + // Check WriteToWAL status + IOStatusCheck(io_s); + } else if (!writer.CallbackFailed()) { + WriteStatusCheck(writer.status); + } + + VersionEdit synced_wals; + if (log_context.need_log_sync) { + InstrumentedMutexLock l(&log_write_mutex_); + if (writer.status.ok()) { + MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync, + &synced_wals); + } else { + MarkLogsNotSynced(logfile_number_); + } + } + if (writer.status.ok() && synced_wals.IsWalAddition()) { + InstrumentedMutexLock l(&mutex_); + const ReadOptions read_options; + writer.status = ApplyWALToManifest(read_options, &synced_wals); + } + if (writer.status.ok()) { + pending_memtable_writes_ += memtable_write_cnt; + } else { + // The `pending_wb_cnt` must be reset to avoid other writers helping + // the front writer write its WBs after it failed to write the WAL. 
+ writer.ResetPendingWBCnt(); + } + write_thread_.ExitAsBatchGroupLeader(wal_write_group, writer.status); + } + + if (seq_used != nullptr) { + *seq_used = writer.sequence; + } + TEST_SYNC_POINT("DBImpl::WriteImpl:CommitAfterWriteWAL"); + + if (writer.request->commit_lsn != 0 && writer.status.ok()) { + TEST_SYNC_POINT("DBImpl::WriteImpl:BeforePipelineWriteMemtable"); + PERF_TIMER_GUARD(write_memtable_time); + size_t total_count = WriteBatchInternal::Count(my_batch); + InternalStats* stats = default_cf_internal_stats_; + stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count); + RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); + + while (writer.ConsumeOne()) + ; + MultiBatchWriteCommit(writer.request); + + WriteStatusCheck(writer.status); + if (!writer.FinalStatus().ok()) { + writer.status = writer.FinalStatus(); + } + } else if (writer.request->commit_lsn != 0) { + // When the leader fails to write WAL, all writers in the group need to + // cancel + // the write to memtable. + writer.ResetPendingWBCnt(); + MultiBatchWriteCommit(writer.request); + } else { + writer.ResetPendingWBCnt(); + } + return writer.status; +} + // The main write queue. This is the only write queue that updates LastSequence. // When using one write queue, the same sequence also indicates the last // published sequence. 
@@ -240,6 +463,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, return Status::NotSupported( "pipelined_writes is not compatible with concurrent prepares"); } + if (two_write_queues_ && immutable_db_options_.enable_multi_batch_write) { + return Status::NotSupported( + "pipelined_writes is not compatible with concurrent prepares"); + } if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) { // TODO(yiwu): update pipeline write with seq_per_batch and batch_cnt return Status::NotSupported( @@ -308,6 +535,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, return status; } + if (immutable_db_options_.enable_multi_batch_write && !disable_memtable) { + std::vector updates(1); + updates[0] = my_batch; + return MultiBatchWriteImpl(write_options, std::move(updates), callback, + log_used, log_ref, seq_used); + } + if (immutable_db_options_.enable_pipelined_write) { return PipelinedWriteImpl(write_options, my_batch, callback, log_used, log_ref, disable_memtable, seq_used); @@ -436,10 +670,12 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (writer->CheckCallback(this)) { valid_batches += writer->batch_cnt; if (writer->ShouldWriteToMemtable()) { - total_count += WriteBatchInternal::Count(writer->batch); + total_count += + WriteBatchInternal::Count(writer->multi_batch.batches[0]); total_byte_size = WriteBatchInternal::AppendedByteSize( - total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); - parallel = parallel && !writer->batch->HasMerge(); + total_byte_size, + WriteBatchInternal::ByteSize(writer->multi_batch.batches[0])); + parallel = parallel && !writer->multi_batch.batches[0]->HasMerge(); } if (writer->pre_release_callback) { pre_release_callback_cnt++; @@ -456,7 +692,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, continue; } // TODO: maybe handle the tracing status? 
- tracer_->Write(writer->batch).PermitUncheckedError(); + tracer_->Write(writer->multi_batch.batches[0]).PermitUncheckedError(); } } } @@ -551,7 +787,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, assert(writer->batch_cnt); next_sequence += writer->batch_cnt; } else if (writer->ShouldWriteToMemtable()) { - next_sequence += WriteBatchInternal::Count(writer->batch); + next_sequence += + WriteBatchInternal::Count(writer->multi_batch.batches[0]); } } } @@ -709,7 +946,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, if (tracer_ != nullptr && tracer_->IsWriteOrderPreserved()) { for (auto* writer : wal_write_group) { // TODO: maybe handle the tracing status? - tracer_->Write(writer->batch).PermitUncheckedError(); + tracer_->Write(writer->multi_batch.batches[0]) + .PermitUncheckedError(); } } } @@ -719,9 +957,11 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, if (writer->CheckCallback(this)) { if (writer->ShouldWriteToMemtable()) { writer->sequence = next_sequence; - size_t count = WriteBatchInternal::Count(writer->batch); + size_t count = + WriteBatchInternal::Count(writer->multi_batch.batches[0]); total_byte_size = WriteBatchInternal::AppendedByteSize( - total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); + total_byte_size, + WriteBatchInternal::ByteSize(writer->multi_batch.batches[0])); next_sequence += count; total_count += count; } @@ -964,7 +1204,7 @@ Status DBImpl::WriteImplWALOnly( if (tracer_ != nullptr && tracer_->IsWriteOrderPreserved()) { for (auto* writer : write_group) { // TODO: maybe handle the tracing status? 
- tracer_->Write(writer->batch).PermitUncheckedError(); + tracer_->Write(writer->multi_batch.batches[0]).PermitUncheckedError(); } } } @@ -975,7 +1215,8 @@ Status DBImpl::WriteImplWALOnly( assert(writer); if (writer->CheckCallback(this)) { total_byte_size = WriteBatchInternal::AppendedByteSize( - total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); + total_byte_size, + WriteBatchInternal::ByteSize(writer->multi_batch.batches[0])); if (writer->pre_release_callback) { pre_release_callback_cnt++; } @@ -1281,11 +1522,12 @@ Status DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, auto* leader = write_group.leader; assert(!leader->disable_wal); // Same holds for all in the batch group if (write_group.size == 1 && !leader->CallbackFailed() && - leader->batch->GetWalTerminationPoint().is_cleared()) { + leader->multi_batch.batches.size() == 1 && + leader->multi_batch.batches[0]->GetWalTerminationPoint().is_cleared()) { // we simply write the first WriteBatch to WAL if the group only // contains one batch, that batch should be written to the WAL, // and the batch is not wanting to be truncated - *merged_batch = leader->batch; + *merged_batch = leader->multi_batch.batches[0]; if (WriteBatchInternal::IsLatestPersistentState(*merged_batch)) { *to_be_cached_state = *merged_batch; } @@ -1297,17 +1539,19 @@ Status DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, *merged_batch = tmp_batch; for (auto writer : write_group) { if (!writer->CallbackFailed()) { - Status s = WriteBatchInternal::Append(*merged_batch, writer->batch, - /*WAL_only*/ true); - if (!s.ok()) { - tmp_batch->Clear(); - return s; - } - if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) { - // We only need to cache the last of such write batch - *to_be_cached_state = writer->batch; + for (auto b : writer->multi_batch.batches) { + Status s = WriteBatchInternal::Append(*merged_batch, b, + /*WAL_only*/ true); + if (!s.ok()) { + tmp_batch->Clear(); + return s; + } + if 
(WriteBatchInternal::IsLatestPersistentState(b)) { + // We only need to cache the last of such write batch + *to_be_cached_state = b; + } + (*write_with_wal)++; } - (*write_with_wal)++; } } } @@ -1381,7 +1625,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, return io_s; } - if (merged_batch == write_group.leader->batch) { + if (merged_batch == write_group.leader->multi_batch.batches[0]) { write_group.leader->log_used = logfile_number_; } else if (write_with_wal > 1) { for (auto writer : write_group) { @@ -1481,7 +1725,7 @@ IOStatus DBImpl::ConcurrentWriteToWAL( // We need to lock log_write_mutex_ since logs_ and alive_log_files might be // pushed back concurrently log_write_mutex_.Lock(); - if (merged_batch == write_group.leader->batch) { + if (merged_batch == write_group.leader->multi_batch.batches[0]) { write_group.leader->log_used = logfile_number_; } else if (write_with_wal > 1) { for (auto writer : write_group) { @@ -1538,8 +1782,9 @@ Status DBImpl::WriteRecoverableState() { auto status = WriteBatchInternal::InsertInto( &cached_recoverable_state_, column_family_memtables_.get(), &flush_scheduler_, &trim_history_scheduler_, true, - 0 /*recovery_log_number*/, this, false /* concurrent_memtable_writes */, - &next_seq, &dont_care_bool, seq_per_batch_); + 0 /*recovery_log_number*/, 0 /*log_ref*/, this, + false /* concurrent_memtable_writes */, &next_seq, &dont_care_bool, + seq_per_batch_); auto last_seq = next_seq - 1; if (two_write_queues_) { versions_->FetchAddLastAllocatedSequence(last_seq - seq); diff --git a/db/db_kv_checksum_test.cc b/db/db_kv_checksum_test.cc index 614399243e5..8d39d72f89f 100644 --- a/db/db_kv_checksum_test.cc +++ b/db/db_kv_checksum_test.cc @@ -449,7 +449,7 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALCorrupted) { // this writer joins the write group ASSERT_NE(follower->state, WriteThread::STATE_GROUP_LEADER); if (corrupt_byte_offset >= leader_batch_size) { - Slice batch_content = follower->batch->Data(); + 
Slice batch_content = follower->multi_batch.batches[0]->Data(); CorruptWriteBatch(&batch_content, corrupt_byte_offset - leader_batch_size, corrupt_byte_addend_); @@ -473,9 +473,10 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALCorrupted) { .IsCorruption()); }); - ASSERT_EQ(leader->batch->GetDataSize(), leader_batch_size); + ASSERT_EQ(leader->multi_batch.batches[0]->GetDataSize(), + leader_batch_size); if (corrupt_byte_offset < leader_batch_size) { - Slice batch_content = leader->batch->Data(); + Slice batch_content = leader->multi_batch.batches[0]->Data(); CorruptWriteBatch(&batch_content, corrupt_byte_offset, corrupt_byte_addend_); } @@ -561,8 +562,8 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALWithColumnFamilyCorrupted) { // this writer joins the write group ASSERT_NE(follower->state, WriteThread::STATE_GROUP_LEADER); if (corrupt_byte_offset >= leader_batch_size) { - Slice batch_content = - WriteBatchInternal::Contents(follower->batch); + Slice batch_content = WriteBatchInternal::Contents( + follower->multi_batch.batches[0]); CorruptWriteBatch(&batch_content, corrupt_byte_offset - leader_batch_size, corrupt_byte_addend_); @@ -585,9 +586,11 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALWithColumnFamilyCorrupted) { .IsCorruption()); }); - ASSERT_EQ(leader->batch->GetDataSize(), leader_batch_size); + ASSERT_EQ(leader->multi_batch.batches[0]->GetDataSize(), + leader_batch_size); if (corrupt_byte_offset < leader_batch_size) { - Slice batch_content = WriteBatchInternal::Contents(leader->batch); + Slice batch_content = + WriteBatchInternal::Contents(leader->multi_batch.batches[0]); CorruptWriteBatch(&batch_content, corrupt_byte_offset, corrupt_byte_addend_); } diff --git a/db/db_properties_test.cc b/db/db_properties_test.cc index e761f96d9ce..337eadb7328 100644 --- a/db/db_properties_test.cc +++ b/db/db_properties_test.cc @@ -63,6 +63,9 @@ TEST_F(DBPropertiesTest, Empty) { options.write_buffer_size = 100000; // Small write buffer 
options.allow_concurrent_memtable_write = false; options = CurrentOptions(options); + if (options.enable_multi_batch_write) { + continue; + } CreateAndReopenWithCF({"pikachu"}, options); std::string num; diff --git a/db/db_test_util.cc b/db/db_test_util.cc index f2fbb0f2f47..28d67527fe9 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -550,6 +550,12 @@ Options DBTestBase::GetOptions( options.enable_pipelined_write = true; break; } + case kMultiBatchWrite: { + options.enable_multi_batch_write = true; + options.enable_pipelined_write = false; + options.two_write_queues = false; + break; + } case kConcurrentWALWrites: { // This options optimize 2PC commit path options.two_write_queues = true; diff --git a/db/db_test_util.h b/db/db_test_util.h index 1bf71320603..dc34352dc2e 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -999,6 +999,7 @@ class DBTestBase : public testing::Test { kConcurrentSkipList = 27, kPipelinedWrite = 28, kConcurrentWALWrites = 29, + kMultiBatchWrite = 30, kDirectIO, kLevelSubcompactions, kBlockBasedTableWithIndexRestartInterval, diff --git a/db/db_write_test.cc b/db/db_write_test.cc index 59c26eaaaf5..0c6fdf849c5 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -39,8 +39,25 @@ class DBWriteTestUnparameterized : public DBTestBase { : DBTestBase("pipelined_write_test", /*env_do_fsync=*/false) {} }; +TEST_P(DBWriteTest, WriteEmptyBatch) { + Options options = GetOptions(); + options.write_buffer_size = 65536; + Reopen(options); + WriteOptions write_options; + WriteBatch batch; + Random rnd(301); + // Trigger a flush so that we will enter `WaitForPendingWrites`. + for (auto i = 0; i < 10; i++) { + batch.Clear(); + ASSERT_OK(dbfull()->Write(write_options, &batch)); + ASSERT_OK(batch.Put(std::to_string(i), rnd.RandomString(10240))); + ASSERT_OK(dbfull()->Write(write_options, &batch)); + } +} + // It is invalid to do sync write while disabling WAL. 
TEST_P(DBWriteTest, SyncAndDisableWAL) { + Reopen(GetOptions()); WriteOptions write_options; write_options.sync = true; write_options.disableWAL = true; @@ -780,10 +797,73 @@ TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) { ASSERT_LE(bytes_num, 1024 * 100); } +TEST_P(DBWriteTest, MultiThreadWrite) { + Options options = GetOptions(); + std::unique_ptr mock_env( + new FaultInjectionTestEnv(env_)); + if (!options.enable_multi_batch_write) { + return; + } + constexpr int kNumThreads = 4; + constexpr int kNumWrite = 4; + constexpr int kNumBatch = 8; + constexpr int kBatchSize = 16; + options.env = mock_env.get(); + options.write_buffer_size = 1024 * 128; + Reopen(options); + std::vector threads; + for (int t = 0; t < kNumThreads; t++) { + threads.push_back(port::Thread( + [&](int index) { + WriteOptions opt; + std::vector data(kNumBatch); + for (int j = 0; j < kNumWrite; j++) { + std::vector batches; + for (int i = 0; i < kNumBatch; i++) { + WriteBatch* batch = &data[i]; + batch->Clear(); + for (int k = 0; k < kBatchSize; k++) { + batch->Put("key_" + std::to_string(index) + "_" + + std::to_string(j) + "_" + std::to_string(i) + + "_" + std::to_string(k), + "value" + std::to_string(k)); + } + batches.push_back(batch); + } + dbfull()->MultiBatchWrite(opt, std::move(batches)); + } + }, + t)); + } + for (int i = 0; i < kNumThreads; i++) { + threads[i].join(); + } + ReadOptions opt; + for (int t = 0; t < kNumThreads; t++) { + std::string value; + for (int i = 0; i < kNumWrite; i++) { + for (int j = 0; j < kNumBatch; j++) { + for (int k = 0; k < kBatchSize; k++) { + ASSERT_OK(dbfull()->Get( + opt, + "key_" + std::to_string(t) + "_" + std::to_string(i) + "_" + + std::to_string(j) + "_" + std::to_string(k), + &value)); + std::string expected_value = "value" + std::to_string(k); + ASSERT_EQ(expected_value, value); + } + } + } + } + + Close(); +} + INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest, testing::Values(DBTestBase::kDefault, DBTestBase::kConcurrentWALWrites, - 
DBTestBase::kPipelinedWrite)); + DBTestBase::kPipelinedWrite, + DBTestBase::kMultiBatchWrite)); } // namespace ROCKSDB_NAMESPACE diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index ef4ab7fa58a..250807baf42 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -1735,6 +1735,36 @@ TEST_F(ExternalSSTFileTest, WithUnorderedWrite) { SyncPoint::GetInstance()->ClearAllCallBacks(); } +TEST_F(ExternalSSTFileTest, WithMultiBatchWrite) { + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::WriteImpl:CommitAfterWriteWAL", + "ExternalSSTFileTest::WithMultiBatchWrite:WaitWriteWAL"}, + {"DBImpl::WaitForPendingWrites:BeforeBlock", + "DBImpl::WriteImpl:BeforePipelineWriteMemtable"}}); + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::IngestExternalFile:NeedFlush", [&](void* need_flush) { + ASSERT_TRUE(*reinterpret_cast(need_flush)); + }); + + Options options = CurrentOptions(); + options.unordered_write = false; + options.enable_multi_batch_write = true; + DestroyAndReopen(options); + Put("foo", "v1"); + SyncPoint::GetInstance()->EnableProcessing(); + port::Thread writer([&]() { Put("bar", "v2"); }); + + TEST_SYNC_POINT("ExternalSSTFileTest::WithMultiBatchWrite:WaitWriteWAL"); + ASSERT_OK(GenerateAndAddExternalFile(options, {{"bar", "v3"}}, -1, + true /* allow_global_seqno */)); + ASSERT_EQ(Get("bar"), "v3"); + + writer.join(); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) TEST_P(ExternalSSTFileTest, IngestFileWithGlobalSeqnoRandomized) { env_->skip_fsync_ = true; diff --git a/db/write_batch.cc b/db/write_batch.cc index e734bb3f617..0b55cb4aae5 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -732,6 +732,14 @@ uint32_t WriteBatchInternal::Count(const WriteBatch* b) { return DecodeFixed32(b->rep_.data() + 8); } +uint32_t 
WriteBatchInternal::Count(const std::vector b) { + uint32_t count = 0; + for (auto w : b) { + count += DecodeFixed32(w->rep_.data() + 8); + } + return count; +} + void WriteBatchInternal::SetCount(WriteBatch* b, uint32_t n) { EncodeFixed32(&b->rep_[8], n); } @@ -2946,10 +2954,10 @@ Status WriteBatchInternal::InsertInto( inserter.MaybeAdvanceSeq(true); continue; } - SetSequence(w->batch, inserter.sequence()); + SetSequence(w->multi_batch.batches[0], inserter.sequence()); inserter.set_log_number_ref(w->log_ref); - inserter.set_prot_info(w->batch->prot_info_.get()); - w->status = w->batch->Iterate(&inserter); + inserter.set_prot_info(w->multi_batch.batches[0]->prot_info_.get()); + w->status = w->multi_batch.batches[0]->Iterate(&inserter); if (!w->status.ok()) { return w->status; } @@ -2976,10 +2984,10 @@ Status WriteBatchInternal::InsertInto( concurrent_memtable_writes, nullptr /* prot_info */, nullptr /*has_valid_writes*/, seq_per_batch, batch_per_txn, hint_per_batch); - SetSequence(writer->batch, sequence); + SetSequence(writer->multi_batch.batches[0], sequence); inserter.set_log_number_ref(writer->log_ref); - inserter.set_prot_info(writer->batch->prot_info_.get()); - Status s = writer->batch->Iterate(&inserter); + inserter.set_prot_info(writer->multi_batch.batches[0]->prot_info_.get()); + Status s = writer->multi_batch.batches[0]->Iterate(&inserter); assert(!seq_per_batch || batch_cnt != 0); assert(!seq_per_batch || inserter.sequence() - sequence == batch_cnt); if (concurrent_memtable_writes) { @@ -2992,14 +3000,15 @@ Status WriteBatchInternal::InsertInto( const WriteBatch* batch, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, TrimHistoryScheduler* trim_history_scheduler, - bool ignore_missing_column_families, uint64_t log_number, DB* db, - bool concurrent_memtable_writes, SequenceNumber* next_seq, + bool ignore_missing_column_families, uint64_t log_number, uint64_t log_ref, + DB* db, bool concurrent_memtable_writes, SequenceNumber* next_seq, 
bool* has_valid_writes, bool seq_per_batch, bool batch_per_txn) { MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler, trim_history_scheduler, ignore_missing_column_families, log_number, db, concurrent_memtable_writes, batch->prot_info_.get(), has_valid_writes, seq_per_batch, batch_per_txn); + inserter.set_log_number_ref(log_ref); Status s = batch->Iterate(&inserter); if (next_seq != nullptr) { *next_seq = inserter.sequence(); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 36e7f71f4c1..fcae19f0cb2 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -134,6 +134,8 @@ class WriteBatchInternal { // Return the number of entries in the batch. static uint32_t Count(const WriteBatch* batch); + static uint32_t Count(const std::vector batch); + // Set the count for the number of entries in the batch. static void SetCount(WriteBatch* batch, uint32_t n); @@ -152,6 +154,14 @@ class WriteBatchInternal { static size_t ByteSize(const WriteBatch* batch) { return batch->rep_.size(); } + static size_t ByteSize(const std::vector batch) { + size_t count = 0; + for (auto w : batch) { + count += w->rep_.size(); + } + return count; + } + static Status SetContents(WriteBatch* batch, const Slice& contents); static Status CheckSlicePartsLength(const SliceParts& key, @@ -189,7 +199,8 @@ class WriteBatchInternal { FlushScheduler* flush_scheduler, TrimHistoryScheduler* trim_history_scheduler, bool ignore_missing_column_families = false, uint64_t log_number = 0, - DB* db = nullptr, bool concurrent_memtable_writes = false, + uint64_t log_ref = 0, DB* db = nullptr, + bool concurrent_memtable_writes = false, SequenceNumber* next_seq = nullptr, bool* has_valid_writes = nullptr, bool seq_per_batch = false, bool batch_per_txn = true); diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc index 7709257f0cb..6f4e108e0bd 100644 --- a/db/write_callback_test.cc +++ b/db/write_callback_test.cc @@ -160,12 +160,18 @@ 
TEST_P(WriteCallbackPTest, WriteWithCallbackTest) { if (options.enable_pipelined_write && options.two_write_queues) { continue; } + if (options.enable_multi_batch_write && options.two_write_queues) { + continue; + } if (options.unordered_write && !options.allow_concurrent_memtable_write) { continue; } if (options.unordered_write && options.enable_pipelined_write) { continue; } + if (options.unordered_write && options.enable_multi_batch_write) { + continue; + } ReadOptions read_options; DB* db; diff --git a/db/write_thread.cc b/db/write_thread.cc index 79870077523..b24d3667af8 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -400,7 +400,7 @@ void WriteThread::WaitForStallEndedCount(uint64_t stall_count) { static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup"); void WriteThread::JoinBatchGroup(Writer* w) { TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w); - assert(w->batch != nullptr); + assert(!w->multi_batch.batches.empty()); bool linked_as_leader = LinkOne(w, &newest_writer_); @@ -437,10 +437,10 @@ void WriteThread::JoinBatchGroup(Writer* w) { size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader, WriteGroup* write_group) { assert(leader->link_older == nullptr); - assert(leader->batch != nullptr); + assert(!leader->multi_batch.batches.empty()); assert(write_group != nullptr); - size_t size = WriteBatchInternal::ByteSize(leader->batch); + size_t size = WriteBatchInternal::ByteSize(leader->multi_batch.batches); // Allow the group to grow up to a maximum size, but if the // original write is small, limit the growth so we do not slow @@ -498,7 +498,7 @@ size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader, break; } - if (w->batch == nullptr) { + if (w->multi_batch.batches.empty()) { // Do not include those writes with nullptr batch. Those are not writes, // those are something else. 
They want to be alone break; @@ -509,7 +509,7 @@ size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader, break; } - auto batch_size = WriteBatchInternal::ByteSize(w->batch); + auto batch_size = WriteBatchInternal::ByteSize(w->multi_batch.batches); if (size + batch_size > max_size) { // Do not make batch too big break; @@ -520,6 +520,7 @@ size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader, write_group->last_writer = w; write_group->size++; } + TEST_SYNC_POINT_CALLBACK("WriteThread::EnterAsBatchGroupLeader:End", w); return size; } @@ -528,10 +529,10 @@ void WriteThread::EnterAsMemTableWriter(Writer* leader, WriteGroup* write_group) { assert(leader != nullptr); assert(leader->link_older == nullptr); - assert(leader->batch != nullptr); + assert(!leader->multi_batch.batches.empty()); assert(write_group != nullptr); - size_t size = WriteBatchInternal::ByteSize(leader->batch); + size_t size = WriteBatchInternal::ByteSize(leader->multi_batch.batches); // Allow the group to grow up to a maximum size, but if the // original write is small, limit the growth so we do not slow @@ -547,7 +548,8 @@ void WriteThread::EnterAsMemTableWriter(Writer* leader, write_group->size = 1; Writer* last_writer = leader; - if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) { + if (!allow_concurrent_memtable_write_ || + !leader->multi_batch.batches[0]->HasMerge()) { Writer* newest_writer = newest_memtable_writer_.load(); CreateMissingNewerLinks(newest_writer); @@ -556,16 +558,16 @@ void WriteThread::EnterAsMemTableWriter(Writer* leader, assert(w->link_newer); w = w->link_newer; - if (w->batch == nullptr) { + if (w->multi_batch.batches.empty()) { break; } - if (w->batch->HasMerge()) { + if (w->multi_batch.batches[0]->HasMerge()) { break; } if (!allow_concurrent_memtable_write_) { - auto batch_size = WriteBatchInternal::ByteSize(w->batch); + auto batch_size = WriteBatchInternal::ByteSize(w->multi_batch.batches); if (size + batch_size > max_size) { // Do not make batch 
too big break; @@ -581,7 +583,8 @@ void WriteThread::EnterAsMemTableWriter(Writer* leader, write_group->last_writer = last_writer; write_group->last_sequence = - last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1; + last_writer->sequence + + WriteBatchInternal::Count(last_writer->multi_batch.batches) - 1; } void WriteThread::ExitAsMemTableWriter(Writer* /*self*/, @@ -802,7 +805,7 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group, static WriteThread::AdaptationContext eu_ctx("EnterUnbatched"); void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) { - assert(w != nullptr && w->batch == nullptr); + assert(w != nullptr && w->multi_batch.batches.empty()); mu->Unlock(); bool linked_as_leader = LinkOne(w, &newest_writer_); if (!linked_as_leader) { @@ -841,4 +844,67 @@ void WriteThread::WaitForMemTableWriters() { newest_memtable_writer_.store(nullptr); } +RequestQueue::RequestQueue() {} + +RequestQueue::~RequestQueue() {} + +void RequestQueue::Enter(CommitRequest* req) { + std::unique_lock guard(commit_mu_); + requests_.push_back(req); +} + +void RequestQueue::CommitSequenceAwait(CommitRequest* req, + std::atomic* commit_sequence) { + std::unique_lock guard(commit_mu_); + while (!requests_.empty() && requests_.front() != req && !req->committed) { + // When the subsequent commit finds that the front writer has not yet + // submitted, it will help the front writer to perform some tasks + auto front = requests_.front()->writer; + if (front->ConsumableOnOtherThreads()) { + auto claimed = front->Claim(); + if (claimed < front->multi_batch.batches.size()) { + guard.unlock(); + front->ConsumeOne(claimed); + guard.lock(); + continue; + } + } else { + // The front writer may be waiting for this helper writer + commit_cv_.notify_all(); + } + commit_cv_.wait(guard); + } + if (req->committed) { + return; + } else if (requests_.front() == req) { + // As the front writer, some write tasks can be stolen by other writers. 
+ // Wait for them to finish. + while (req->writer->HasPendingWB()) { + commit_cv_.wait(guard); + } + while (!requests_.empty() && !requests_.front()->writer->HasPendingWB()) { + CommitRequest* current = requests_.front(); + commit_sequence->store(current->commit_lsn, std::memory_order_release); + current->committed = true; + requests_.pop_front(); + } + commit_cv_.notify_all(); + } +} + +void WriteThread::Writer::ConsumeOne(size_t claimed) { + assert(claimed < multi_batch.batches.size()); + ColumnFamilyMemTablesImpl memtables(multi_batch.version_set); + Status s = WriteBatchInternal::InsertInto( + multi_batch.batches[claimed], &memtables, multi_batch.flush_scheduler, + multi_batch.trim_history_scheduler, + multi_batch.ignore_missing_column_families, 0, this->log_ref, + multi_batch.db, true); + if (!s.ok()) { + std::lock_guard guard(this->status_lock); + this->status = s; + } + multi_batch.pending_wb_cnt.fetch_sub(1, std::memory_order_acq_rel); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/write_thread.h b/db/write_thread.h index 6e5805e3764..b0c8fb5c435 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -17,6 +18,7 @@ #include "db/dbformat.h" #include "db/post_memtable_callback.h" #include "db/pre_release_callback.h" +#include "db/trim_history_scheduler.h" #include "db/write_callback.h" #include "monitoring/instrumented_mutex.h" #include "rocksdb/options.h" @@ -24,9 +26,29 @@ #include "rocksdb/types.h" #include "rocksdb/write_batch.h" #include "util/autovector.h" +#include "util/mutexlock.h" namespace ROCKSDB_NAMESPACE { +struct CommitRequest; + +class ColumnFamilySet; +class FlushScheduler; + +class RequestQueue { + public: + RequestQueue(); + ~RequestQueue(); + void Enter(CommitRequest* req); + void CommitSequenceAwait(CommitRequest* req, + std::atomic* commit_sequence); + + private: + std::mutex commit_mu_; + std::condition_variable commit_cv_; + std::deque 
requests_; +}; + class WriteThread { public: enum State : uint8_t { @@ -112,9 +134,49 @@ class WriteThread { Iterator end() const { return Iterator(nullptr, nullptr); } }; + struct MultiBatch { + std::vector batches; + std::atomic claimed_cnt; + std::atomic pending_wb_cnt; + ColumnFamilySet* version_set; + FlushScheduler* flush_scheduler; + TrimHistoryScheduler* trim_history_scheduler; + bool ignore_missing_column_families; + DB* db; + + MultiBatch() + : claimed_cnt(0), + pending_wb_cnt(0), + version_set(nullptr), + flush_scheduler(nullptr), + trim_history_scheduler(nullptr), + ignore_missing_column_families(false), + db(nullptr) {} + + explicit MultiBatch(std::vector&& _batch) + : batches(_batch), + claimed_cnt(0), + pending_wb_cnt(_batch.size()), + version_set(nullptr), + flush_scheduler(nullptr), + trim_history_scheduler(nullptr), + ignore_missing_column_families(false), + db(nullptr) {} + + void SetContext(ColumnFamilySet* _version_set, + FlushScheduler* _flush_scheduler, + TrimHistoryScheduler* _trim_history_scheduler, + bool _ignore_missing_column_families, DB* _db) { + version_set = _version_set; + flush_scheduler = _flush_scheduler; + trim_history_scheduler = _trim_history_scheduler; + ignore_missing_column_families = _ignore_missing_column_families; + db = _db; + } + }; + // Information kept for every waiting writer. struct Writer { - WriteBatch* batch; bool sync; bool no_slowdown; bool disable_wal; @@ -130,8 +192,10 @@ class WriteThread { bool made_waitable; // records lazy construction of mutex and cv std::atomic state; // write under StateMutex() or pre-link WriteGroup* write_group; + CommitRequest* request; SequenceNumber sequence; // the sequence number to use for the first key - Status status; + Status status; // write protected by status_lock in multi batch write. 
+ SpinMutex status_lock; Status callback_status; // status returned by callback->Callback() std::aligned_storage::type state_mutex_bytes; @@ -139,9 +203,10 @@ class WriteThread { Writer* link_older; // read/write only before linking, or as leader Writer* link_newer; // lazy, read/write only before linking, or as leader + MultiBatch multi_batch; + Writer() - : batch(nullptr), - sync(false), + : sync(false), no_slowdown(false), disable_wal(false), rate_limiter_priority(Env::IOPriority::IO_TOTAL), @@ -156,6 +221,7 @@ class WriteThread { made_waitable(false), state(STATE_INIT), write_group(nullptr), + request(nullptr), sequence(kMaxSequenceNumber), link_older(nullptr), link_newer(nullptr) {} @@ -165,8 +231,7 @@ class WriteThread { size_t _batch_cnt = 0, PreReleaseCallback* _pre_release_callback = nullptr, PostMemTableCallback* _post_memtable_callback = nullptr) - : batch(_batch), - sync(write_options.sync), + : sync(write_options.sync), no_slowdown(write_options.no_slowdown), disable_wal(write_options.disableWAL), rate_limiter_priority(write_options.rate_limiter_priority), @@ -181,9 +246,37 @@ class WriteThread { made_waitable(false), state(STATE_INIT), write_group(nullptr), + request(nullptr), sequence(kMaxSequenceNumber), link_older(nullptr), - link_newer(nullptr) {} + link_newer(nullptr) { + multi_batch.batches.push_back(_batch); + multi_batch.pending_wb_cnt.fetch_add(1, std::memory_order_acq_rel); + } + + Writer(const WriteOptions& write_options, std::vector&& _batch, + WriteCallback* _callback, uint64_t _log_ref, bool _disable_memtable, + PreReleaseCallback* _pre_release_callback = nullptr, + PostMemTableCallback* _post_memtable_callback = nullptr) + : sync(write_options.sync), + no_slowdown(write_options.no_slowdown), + disable_wal(write_options.disableWAL), + rate_limiter_priority(write_options.rate_limiter_priority), + disable_memtable(_disable_memtable), + batch_cnt(0), + pre_release_callback(_pre_release_callback), + 
post_memtable_callback(_post_memtable_callback), + log_used(0), + log_ref(_log_ref), + callback(_callback), + made_waitable(false), + state(STATE_INIT), + write_group(nullptr), + request(nullptr), + sequence(kMaxSequenceNumber), + link_older(nullptr), + link_newer(nullptr), + multi_batch(std::move(_batch)) {} ~Writer() { if (made_waitable) { @@ -256,6 +349,33 @@ class WriteThread { return *static_cast( static_cast(&state_cv_bytes)); } + + bool ConsumableOnOtherThreads() { + return multi_batch.pending_wb_cnt.load(std::memory_order_acquire) > 1; + } + + size_t Claim() { + return multi_batch.claimed_cnt.fetch_add(1, std::memory_order_acq_rel); + } + + bool HasPendingWB() { + return multi_batch.pending_wb_cnt.load(std::memory_order_acquire) > 0; + } + + void ResetPendingWBCnt() { + multi_batch.pending_wb_cnt.store(0, std::memory_order_release); + } + + bool ConsumeOne() { + auto claimed = Claim(); + if (claimed < multi_batch.batches.size()) { + ConsumeOne(claimed); + return true; + } + return false; + } + + void ConsumeOne(size_t claimed); }; struct AdaptationContext { @@ -374,6 +494,13 @@ class WriteThread { // (Does not require db mutex held) void WaitForStallEndedCount(uint64_t stall_count); + void EnterCommitQueue(CommitRequest* req) { return commit_queue_.Enter(req); } + + void ExitWaitSequenceCommit(CommitRequest* req, + std::atomic* commit_sequence) { + commit_queue_.CommitSequenceAwait(req, commit_sequence); + } + private: // See AwaitState. const uint64_t max_yield_usec_; @@ -406,6 +533,7 @@ class WriteThread { // at the tail of the writer queue by the leader, so newer writers can just // check for this and bail Writer write_stall_dummy_; + RequestQueue commit_queue_; // Mutex and condvar for writers to block on a write stall. 
During a write // stall, writers with no_slowdown set to false will wait on this rather @@ -461,4 +589,13 @@ class WriteThread { void CompleteFollower(Writer* w, WriteGroup& write_group); }; +struct CommitRequest { + WriteThread::Writer* writer; + uint64_t commit_lsn; + // protected by RequestQueue::commit_mu_ + bool committed; + CommitRequest(WriteThread::Writer* w) + : writer(w), commit_lsn(0), committed(false) {} +}; + } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 5ae73182b60..662522976b2 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -542,6 +542,11 @@ class DB { // Note: consider setting options.sync = true. virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0; + virtual Status MultiBatchWrite(const WriteOptions& /*options*/, + std::vector&& /*updates*/) { + return Status::NotSupported(); + } + // If the column family specified by "column_family" contains an entry for // "key", return the corresponding value in "*value". If the entry is a plain // key-value, return the value as-is; if it is a wide-column entity, return diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 1c2daed9a3f..a33f8eea4bb 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1129,6 +1129,22 @@ struct DBOptions { // // Default: false bool unordered_write = false; + // By default, a single write thread queue is maintained. The thread gets + // to the head of the queue becomes write batch group leader and responsible + // for writing to WAL. + // + // If enable_multi_batch_write is true, RocksDB will apply WriteBatch to + // memtable out of order but commit them in order. (We borrow the idea from + // https://github.com/cockroachdb/pebble/blob/master/docs/rocksdb.md#commit-pipeline. 
+ // On this basis, we split the WriteBatch into smaller-grained WriteBatch + // vector, + // and when the WriteBatch sizes of multiple writers are not balanced, writers + // that finish first need to help the front writer finish writing the + // remaining + // WriteBatch to increase cpu usage and reduce overall latency). + // + // Default: false + bool enable_multi_batch_write = false; // If true, allow multi-writers to update mem tables in parallel. // Only some memtable_factory-s support concurrent writes; currently it diff --git a/options/db_options.cc b/options/db_options.cc index 2d213f13f81..2c03fb37826 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -327,6 +327,10 @@ static std::unordered_map {offsetof(struct ImmutableDBOptions, enable_pipelined_write), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"enable_multi_batch_write", + {offsetof(struct ImmutableDBOptions, enable_multi_batch_write), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, {"unordered_write", {offsetof(struct ImmutableDBOptions, unordered_write), OptionType::kBoolean, OptionVerificationType::kNormal, @@ -731,6 +735,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) enable_thread_tracking(options.enable_thread_tracking), enable_pipelined_write(options.enable_pipelined_write), unordered_write(options.unordered_write), + enable_multi_batch_write(options.enable_multi_batch_write), allow_concurrent_memtable_write(options.allow_concurrent_memtable_write), enable_write_thread_adaptive_yield( options.enable_write_thread_adaptive_yield), @@ -882,6 +887,8 @@ void ImmutableDBOptions::Dump(Logger* log) const { enable_pipelined_write); ROCKS_LOG_HEADER(log, " Options.unordered_write: %d", unordered_write); + ROCKS_LOG_HEADER(log, " Options.enable_multi_batch_write: %d", + enable_multi_batch_write); ROCKS_LOG_HEADER(log, " Options.allow_concurrent_memtable_write: %d", 
allow_concurrent_memtable_write); ROCKS_LOG_HEADER(log, " Options.enable_write_thread_adaptive_yield: %d", diff --git a/options/db_options.h b/options/db_options.h index 701a83febb0..86e17e9670a 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -68,6 +68,7 @@ struct ImmutableDBOptions { bool enable_thread_tracking; bool enable_pipelined_write; bool unordered_write; + bool enable_multi_batch_write; bool allow_concurrent_memtable_write; bool enable_write_thread_adaptive_yield; uint64_t write_thread_max_yield_usec; diff --git a/options/options_helper.cc b/options/options_helper.cc index 362af2839de..a9583dd1e98 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -133,6 +133,8 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, options.enable_thread_tracking = immutable_db_options.enable_thread_tracking; options.delayed_write_rate = mutable_db_options.delayed_write_rate; options.enable_pipelined_write = immutable_db_options.enable_pipelined_write; + options.enable_multi_batch_write = + immutable_db_options.enable_multi_batch_write; options.unordered_write = immutable_db_options.unordered_write; options.allow_concurrent_memtable_write = immutable_db_options.allow_concurrent_memtable_write; diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index decd1c42347..ced8597a9d6 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -338,6 +338,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "advise_random_on_open=true;" "fail_if_options_file_error=false;" "enable_pipelined_write=false;" + "enable_multi_batch_write=false;" "unordered_write=false;" "allow_concurrent_memtable_write=true;" "wal_recovery_mode=kPointInTimeRecovery;" diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index e0915bf7bdb..fed12e50f11 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1005,6 +1005,9 @@ DEFINE_uint64(fifo_age_for_warm, 0, 
"age_for_warm for FIFO compaction."); // Stacked BlobDB Options DEFINE_bool(use_blob_db, false, "[Stacked BlobDB] Open a BlobDB instance."); +DEFINE_bool(use_multi_thread_write, false, + "Open a RocksDB with multi thread write pool"); + DEFINE_bool( blob_db_enable_gc, ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().enable_garbage_collection, @@ -1802,6 +1805,60 @@ static Status CreateMemTableRepFactory( return s; } +class WriteBatchVec { + public: + explicit WriteBatchVec(uint32_t max_batch_size) + : max_batch_size_(max_batch_size), current_(0) {} + ~WriteBatchVec() { + for (auto w : batches_) { + delete w; + } + } + void Clear() { + for (size_t i = 0; i <= current_ && i < batches_.size(); i++) { + batches_[i]->Clear(); + } + current_ = 0; + } + + Status Put(const Slice& key, const Slice& value) { + if (current_ < batches_.size() && + batches_[current_]->Count() < max_batch_size_) { + return batches_[current_]->Put(key, value); + } else if (current_ + 1 >= batches_.size()) { + batches_.push_back(new WriteBatch); + } + if (current_ + 1 < batches_.size()) { + current_ += 1; + } + return batches_[current_]->Put(key, value); + } + + std::vector GetWriteBatch() const { + std::vector batches; + for (size_t i = 0; i < batches_.size(); i++) { + if (i > current_) { + break; + } + batches.push_back(batches_[i]); + } + return batches; + } + + uint32_t Count() const { + uint32_t count = 0; + for (size_t i = 0; i <= current_ && i < batches_.size(); i++) { + count += batches_[i]->Count(); + } + return count; + } + + private: + uint32_t max_batch_size_; + size_t current_; + std::vector batches_; +}; + } // namespace enum DistributionType : unsigned char { kFixed = 0, kUniform, kNormal }; @@ -2733,6 +2790,7 @@ class Benchmark { bool use_blob_db_; // Stacked BlobDB bool read_operands_; // read via GetMergeOperands() std::vector keys_; + bool use_multi_write_; class ErrorHandlerListener : public EventListener { public: @@ -3205,7 +3263,8 @@ class Benchmark { merge_keys_(FLAGS_merge_keys 
< 0 ? FLAGS_num : FLAGS_merge_keys), report_file_operations_(FLAGS_report_file_operations), use_blob_db_(FLAGS_use_blob_db), // Stacked BlobDB - read_operands_(false) { + read_operands_(false), + use_multi_write_(FLAGS_use_multi_thread_write) { // use simcache instead of cache if (FLAGS_simcache_size >= 0) { if (FLAGS_cache_numshardbits >= 1) { @@ -4844,6 +4903,9 @@ class Benchmark { DBWithColumnFamilies* db) { uint64_t open_start = FLAGS_report_open_timing ? FLAGS_env->NowNanos() : 0; Status s; + if (use_multi_write_) { + options.enable_multi_batch_write = true; + } // Open with column families if necessary. if (FLAGS_num_column_families > 1) { size_t num_hot = FLAGS_num_column_families; @@ -5107,6 +5169,7 @@ class Benchmark { WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0, FLAGS_write_batch_protection_bytes_per_key, user_timestamp_size_); + WriteBatchVec batches(32); Status s; int64_t bytes = 0; @@ -5241,6 +5304,7 @@ class Benchmark { DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(id); batch.Clear(); + batches.Clear(); int64_t batch_bytes = 0; for (int64_t j = 0; j < entries_per_batch_; j++) { @@ -5356,7 +5420,9 @@ class Benchmark { } else { val = gen.Generate(); } - if (use_blob_db_) { + if (use_multi_write_) { + batches.Put(key, val); + } else if (use_blob_db_) { // Stacked BlobDB blob_db::BlobDB* blobdb = static_cast(db_with_cfh->db); @@ -5419,6 +5485,7 @@ class Benchmark { batch.Delete(db_with_cfh->GetCfh(rand_num), expanded_keys[offset]); } + assert(!use_multi_write_); } } else { GenerateKeyFromInt(begin_num, FLAGS_num, &begin_key); @@ -5435,6 +5502,7 @@ class Benchmark { batch.DeleteRange(db_with_cfh->GetCfh(rand_num), begin_key, end_key); } + assert(!use_multi_write_); } } } @@ -5457,7 +5525,10 @@ class Benchmark { ErrorExit(); } } - if (!use_blob_db_) { + if (use_multi_write_) { + s = db_with_cfh->db->MultiBatchWrite(write_options_, + batches.GetWriteBatch()); + } else if (!use_blob_db_) { // Not stacked BlobDB s = 
db_with_cfh->db->Write(write_options_, &batch); }