From 0acb2b9330215cba3711c9471a8cb322f92ffa0f Mon Sep 17 00:00:00 2001 From: Connor Date: Tue, 3 Aug 2021 15:10:40 +0800 Subject: [PATCH] Add an option to disable write stall (#251) * add option to disable write stall Signed-off-by: Connor1996 Signed-off-by: tabokie --- db/column_family.cc | 21 ++++-- db/db_impl/db_impl.cc | 11 +-- db/db_impl/db_impl.h | 1 + db/db_impl/db_impl_compaction_flush.cc | 5 +- db/db_impl/db_impl_open.cc | 3 +- db/db_options_test.cc | 96 +++++++++++++++++++++++++- include/rocksdb/options.h | 5 ++ options/cf_options.cc | 6 ++ options/cf_options.h | 3 + options/db_options.cc | 2 + options/options.cc | 2 + options/options_helper.cc | 1 + options/options_settable_test.cc | 1 + 13 files changed, 143 insertions(+), 14 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index 71c72d71a5e6..be9e1e5d578a 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -955,7 +955,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( bool needed_delay = write_controller->NeedsDelay(); if (write_stall_condition == WriteStallCondition::kStopped && - write_stall_cause == WriteStallCause::kMemtableLimit) { + write_stall_cause == WriteStallCause::kMemtableLimit && + !mutable_cf_options.disable_write_stall) { write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1); ROCKS_LOG_WARN( @@ -965,7 +966,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( name_.c_str(), imm()->NumNotFlushed(), mutable_cf_options.max_write_buffer_number); } else if (write_stall_condition == WriteStallCondition::kStopped && - write_stall_cause == WriteStallCause::kL0FileCountLimit) { + write_stall_cause == WriteStallCause::kL0FileCountLimit && + !mutable_cf_options.disable_write_stall) { write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1); if 
(compaction_picker_->IsLevel0CompactionInProgress()) { @@ -977,7 +979,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( "[%s] Stopping writes because we have %d level-0 files", name_.c_str(), vstorage->l0_delay_trigger_count()); } else if (write_stall_condition == WriteStallCondition::kStopped && - write_stall_cause == WriteStallCause::kPendingCompactionBytes) { + write_stall_cause == WriteStallCause::kPendingCompactionBytes && + !mutable_cf_options.disable_write_stall) { write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats( InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1); @@ -987,7 +990,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( "bytes %" PRIu64, name_.c_str(), compaction_needed_bytes); } else if (write_stall_condition == WriteStallCondition::kDelayed && - write_stall_cause == WriteStallCause::kMemtableLimit) { + write_stall_cause == WriteStallCause::kMemtableLimit && + !mutable_cf_options.disable_write_stall) { write_controller_token_ = SetupDelay(write_controller, compaction_needed_bytes, prev_compaction_needed_bytes_, was_stopped, @@ -1002,7 +1006,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( mutable_cf_options.max_write_buffer_number, write_controller->delayed_write_rate()); } else if (write_stall_condition == WriteStallCondition::kDelayed && - write_stall_cause == WriteStallCause::kL0FileCountLimit) { + write_stall_cause == WriteStallCause::kL0FileCountLimit && + !mutable_cf_options.disable_write_stall) { // L0 is the last two files from stopping. 
bool near_stop = vstorage->l0_delay_trigger_count() >= mutable_cf_options.level0_stop_writes_trigger - 2; @@ -1022,7 +1027,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( name_.c_str(), vstorage->l0_delay_trigger_count(), write_controller->delayed_write_rate()); } else if (write_stall_condition == WriteStallCondition::kDelayed && - write_stall_cause == WriteStallCause::kPendingCompactionBytes) { + write_stall_cause == WriteStallCause::kPendingCompactionBytes && + !mutable_cf_options.disable_write_stall) { // If the distance to hard limit is less than 1/4 of the gap between soft // and // hard bytes limit, we think it is near stop and speed up the slowdown. @@ -1048,7 +1054,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( name_.c_str(), vstorage->estimated_compaction_needed_bytes(), write_controller->delayed_write_rate()); } else { - assert(write_stall_condition == WriteStallCondition::kNormal); + assert(write_stall_condition == WriteStallCondition::kNormal || + mutable_cf_options.disable_write_stall); if (vstorage->l0_delay_trigger_count() >= GetL0FileCountForCompactionSpeedup( mutable_cf_options.level0_file_num_compaction_trigger, diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 18283cbba5e2..513dc429ace5 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1316,11 +1316,14 @@ Status DBImpl::SetDBOptions( GetBGJobLimits(mutable_db_options_.max_background_flushes, mutable_db_options_.max_background_compactions, mutable_db_options_.max_background_jobs, + mutable_db_options_.base_background_compactions, + /* parallelize_compactions */ true); + const BGJobLimits new_bg_job_limits = + GetBGJobLimits(new_options.max_background_flushes, + new_options.max_background_compactions, + new_options.max_background_jobs, + new_options.base_background_compactions, /* parallelize_compactions */ true); - const BGJobLimits new_bg_job_limits = GetBGJobLimits( - new_options.max_background_flushes, - 
new_options.max_background_compactions, - new_options.max_background_jobs, /* parallelize_compactions */ true); const bool max_flushes_increased = new_bg_job_limits.max_flushes > current_bg_job_limits.max_flushes; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 34a5f33989c4..95b5c51649fe 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -811,6 +811,7 @@ class DBImpl : public DB { static BGJobLimits GetBGJobLimits(int max_background_flushes, int max_background_compactions, int max_background_jobs, + int base_background_compactions, bool parallelize_compactions); // move logs pending closing from job_context to the DB queue and diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 71c23de95a5d..a23416866b47 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -2879,12 +2879,14 @@ DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const { return GetBGJobLimits(mutable_db_options_.max_background_flushes, mutable_db_options_.max_background_compactions, mutable_db_options_.max_background_jobs, + mutable_db_options_.base_background_compactions, write_controller_.NeedSpeedupCompaction()); } DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes, int max_background_compactions, int max_background_jobs, + int base_background_compactions, bool parallelize_compactions) { BGJobLimits res; if (max_background_flushes == -1 && max_background_compactions == -1) { @@ -2900,7 +2902,8 @@ DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes, } if (!parallelize_compactions) { // throttle background compactions until we deem necessary - res.max_compactions = 1; + res.max_compactions = + std::max(1, std::min(base_background_compactions, res.max_compactions)); } return res; } diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 074fa86214ab..40c80b03cfbe 100644 --- a/db/db_impl/db_impl_open.cc +++ 
b/db/db_impl/db_impl_open.cc @@ -75,7 +75,8 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src, } auto bg_job_limits = DBImpl::GetBGJobLimits( result.max_background_flushes, result.max_background_compactions, - result.max_background_jobs, true /* parallelize_compactions */); + result.max_background_jobs, result.base_background_compactions, + true /* parallelize_compactions */); result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions, Env::Priority::LOW); result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes, diff --git a/db/db_options_test.cc b/db/db_options_test.cc index 8f60f0051b95..6d10c3ea2ff6 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -582,6 +582,87 @@ TEST_F(DBOptionsTest, EnableAutoCompactionAndTriggerStall) { } } +TEST_F(DBOptionsTest, EnableAutoCompactionButDisableStall) { + const std::string kValue(1024, 'v'); + Options options; + options.create_if_missing = true; + options.disable_auto_compactions = true; + options.disable_write_stall = true; + options.write_buffer_size = 1024 * 1024 * 10; + options.compression = CompressionType::kNoCompression; + options.level0_file_num_compaction_trigger = 1; + options.level0_stop_writes_trigger = std::numeric_limits<int>::max(); + options.level0_slowdown_writes_trigger = std::numeric_limits<int>::max(); + options.hard_pending_compaction_bytes_limit = + std::numeric_limits<uint64_t>::max(); + options.soft_pending_compaction_bytes_limit = + std::numeric_limits<uint64_t>::max(); + options.env = env_; + + DestroyAndReopen(options); + int i = 0; + for (; i < 1024; i++) { + Put(Key(i), kValue); + } + Flush(); + for (; i < 1024 * 2; i++) { + Put(Key(i), kValue); + } + Flush(); + dbfull()->TEST_WaitForFlushMemTable(); + ASSERT_EQ(2, NumTableFilesAtLevel(0)); + uint64_t l0_size = SizeAtLevel(0); + + options.hard_pending_compaction_bytes_limit = l0_size; + options.soft_pending_compaction_bytes_limit = l0_size; + + Reopen(options); + dbfull()->TEST_WaitForCompact(); + 
ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped()); + ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay()); + + SyncPoint::GetInstance()->LoadDependency( + {{"DBOptionsTest::EnableAutoCompactionButDisableStall:1", + "BackgroundCallCompaction:0"}, + {"DBImpl::BackgroundCompaction():BeforePickCompaction", + "DBOptionsTest::EnableAutoCompactionButDisableStall:2"}, + {"DBOptionsTest::EnableAutoCompactionButDisableStall:3", + "DBImpl::BackgroundCompaction():AfterPickCompaction"}}); + // Block background compaction. + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}})); + TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionButDisableStall:1"); + // Wait for the stall condition to be recalculated. + TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionButDisableStall:2"); + + ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped()); + ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedSpeedupCompaction()); + + TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionButDisableStall:3"); + + // Background compaction executed. 
+ dbfull()->TEST_WaitForCompact(); + ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped()); + ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_FALSE(dbfull()->TEST_write_controler().NeedSpeedupCompaction()); +} + +TEST_F(DBOptionsTest, SetOptionsDisableWriteStall) { + Options options; + options.create_if_missing = true; + options.disable_write_stall = false; + options.env = env_; + Reopen(options); + ASSERT_EQ(false, dbfull()->GetOptions().disable_write_stall); + + ASSERT_OK(dbfull()->SetOptions({{"disable_write_stall", "true"}})); + ASSERT_EQ(true, dbfull()->GetOptions().disable_write_stall); + ASSERT_OK(dbfull()->SetOptions({{"disable_write_stall", "false"}})); + ASSERT_EQ(false, dbfull()->GetOptions().disable_write_stall); +} + TEST_F(DBOptionsTest, SetOptionsMayTriggerCompaction) { Options options; options.level_compaction_dynamic_level_bytes = false; @@ -611,8 +692,21 @@ TEST_F(DBOptionsTest, SetBackgroundCompactionThreads) { ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); ASSERT_OK(dbfull()->SetDBOptions({{"max_background_compactions", "3"}})); ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); - auto stop_token = dbfull()->TEST_write_controler().GetStopToken(); + { + auto stop_token = dbfull()->TEST_write_controler().GetStopToken(); + ASSERT_EQ(3, dbfull()->TEST_BGCompactionsAllowed()); + } + + ASSERT_OK(dbfull()->SetDBOptions({{"base_background_compactions", "2"}})); + ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed()); + ASSERT_OK(dbfull()->SetDBOptions({{"base_background_compactions", "5"}})); ASSERT_EQ(3, dbfull()->TEST_BGCompactionsAllowed()); + ASSERT_OK(dbfull()->SetDBOptions({{"base_background_compactions", "1"}})); + ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed()); + { + auto stop_token = dbfull()->TEST_write_controler().GetStopToken(); + ASSERT_EQ(3, dbfull()->TEST_BGCompactionsAllowed()); + } } TEST_F(DBOptionsTest, SetBackgroundFlushThreads) { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h 
index ae5ed2c2656c..1c2daed9a3ff 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -296,6 +296,11 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions { // Dynamically changeable through SetOptions() API bool disable_auto_compactions = false; + // Disable write stall mechanism. + // + // Dynamically changeable through SetOptions() API + bool disable_write_stall = false; + // This is a factory that provides TableFactory objects. // Default: a block-based table factory that provides a default // implementation of TableBuilder and TableReader with default diff --git a/options/cf_options.cc b/options/cf_options.cc index 2ca826c98f96..3a2d9894bd2a 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -262,6 +262,10 @@ static std::unordered_map<std::string, OptionTypeInfo> {offsetof(struct MutableCFOptions, disable_auto_compactions), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kMutable}}, + {"disable_write_stall", + {offsetof(struct MutableCFOptions, disable_write_stall), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kMutable}}, {"filter_deletes", {0, OptionType::kBoolean, OptionVerificationType::kDeprecated, OptionTypeFlags::kMutable}}, @@ -1069,6 +1073,8 @@ void MutableCFOptions::Dump(Logger* log) const { : prefix_extractor->GetId().c_str()); ROCKS_LOG_INFO(log, " disable_auto_compactions: %d", disable_auto_compactions); + ROCKS_LOG_INFO(log, " disable_write_stall: %d", + disable_write_stall); ROCKS_LOG_INFO(log, " soft_pending_compaction_bytes_limit: %" PRIu64, soft_pending_compaction_bytes_limit); ROCKS_LOG_INFO(log, " hard_pending_compaction_bytes_limit: %" PRIu64, diff --git a/options/cf_options.h b/options/cf_options.h index f42d6b562994..6f7f2c8571bb 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -125,6 +125,7 @@ struct MutableCFOptions { experimental_mempurge_threshold( options.experimental_mempurge_threshold), 
disable_auto_compactions(options.disable_auto_compactions), + disable_write_stall(options.disable_write_stall), soft_pending_compaction_bytes_limit( options.soft_pending_compaction_bytes_limit), hard_pending_compaction_bytes_limit( @@ -196,6 +197,7 @@ struct MutableCFOptions { prefix_extractor(nullptr), experimental_mempurge_threshold(0.0), disable_auto_compactions(false), + disable_write_stall(false), soft_pending_compaction_bytes_limit(0), hard_pending_compaction_bytes_limit(0), level0_file_num_compaction_trigger(0), @@ -280,6 +282,7 @@ struct MutableCFOptions { // Compaction related options bool disable_auto_compactions; + bool disable_write_stall; uint64_t soft_pending_compaction_bytes_limit; uint64_t hard_pending_compaction_bytes_limit; int level0_file_num_compaction_trigger; diff --git a/options/db_options.cc b/options/db_options.cc index 2d213f13f815..c4874783bdf3 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -1021,6 +1021,8 @@ MutableDBOptions::MutableDBOptions(const DBOptions& options) void MutableDBOptions::Dump(Logger* log) const { ROCKS_LOG_HEADER(log, " Options.max_background_jobs: %d", max_background_jobs); + ROCKS_LOG_HEADER(log, " Options.base_background_compactions: %d", + base_background_compactions); ROCKS_LOG_HEADER(log, " Options.max_background_compactions: %d", max_background_compactions); ROCKS_LOG_HEADER(log, " Options.max_subcompactions: %" PRIu32, diff --git a/options/options.cc b/options/options.cc index d96cf4072e72..138a1a7bcfe3 100644 --- a/options/options.cc +++ b/options/options.cc @@ -304,6 +304,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const { hard_pending_compaction_bytes_limit); ROCKS_LOG_HEADER(log, " Options.disable_auto_compactions: %d", disable_auto_compactions); + ROCKS_LOG_HEADER(log, " Options.disable_write_stall: %d", + disable_write_stall); const auto& it_compaction_style = compaction_style_to_string.find(compaction_style); diff --git a/options/options_helper.cc b/options/options_helper.cc 
index fa5d549c1813..362af2839de7 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -219,6 +219,7 @@ void UpdateColumnFamilyOptions(const MutableCFOptions& moptions, // Compaction related options cf_opts->disable_auto_compactions = moptions.disable_auto_compactions; + cf_opts->disable_write_stall = moptions.disable_write_stall; cf_opts->soft_pending_compaction_bytes_limit = moptions.soft_pending_compaction_bytes_limit; cf_opts->hard_pending_compaction_bytes_limit = diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 2f7493f32eb2..decd1c423471 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -538,6 +538,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { "compaction_pri=kMinOverlappingRatio;" "hard_pending_compaction_bytes_limit=0;" "disable_auto_compactions=false;" + "disable_write_stall=false;" "report_bg_io_stats=true;" "ttl=60;" "periodic_compaction_seconds=3600;"