Skip to content

Commit

Permalink
Add an option to disable write stall (#251)
Browse files Browse the repository at this point in the history
* add option to disable write stall

Signed-off-by: Connor1996 <[email protected]>
Signed-off-by: tabokie <[email protected]>
  • Loading branch information
Connor1996 authored and v01dstar committed Feb 27, 2024
1 parent bccf4ed commit 0acb2b9
Show file tree
Hide file tree
Showing 13 changed files with 143 additions and 14 deletions.
21 changes: 14 additions & 7 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
bool needed_delay = write_controller->NeedsDelay();

if (write_stall_condition == WriteStallCondition::kStopped &&
write_stall_cause == WriteStallCause::kMemtableLimit) {
write_stall_cause == WriteStallCause::kMemtableLimit &&
!mutable_cf_options.disable_write_stall) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
ROCKS_LOG_WARN(
Expand All @@ -965,7 +966,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
name_.c_str(), imm()->NumNotFlushed(),
mutable_cf_options.max_write_buffer_number);
} else if (write_stall_condition == WriteStallCondition::kStopped &&
write_stall_cause == WriteStallCause::kL0FileCountLimit) {
write_stall_cause == WriteStallCause::kL0FileCountLimit &&
!mutable_cf_options.disable_write_stall) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
if (compaction_picker_->IsLevel0CompactionInProgress()) {
Expand All @@ -977,7 +979,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
"[%s] Stopping writes because we have %d level-0 files",
name_.c_str(), vstorage->l0_delay_trigger_count());
} else if (write_stall_condition == WriteStallCondition::kStopped &&
write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
write_stall_cause == WriteStallCause::kPendingCompactionBytes &&
!mutable_cf_options.disable_write_stall) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(
InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
Expand All @@ -987,7 +990,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
"bytes %" PRIu64,
name_.c_str(), compaction_needed_bytes);
} else if (write_stall_condition == WriteStallCondition::kDelayed &&
write_stall_cause == WriteStallCause::kMemtableLimit) {
write_stall_cause == WriteStallCause::kMemtableLimit &&
!mutable_cf_options.disable_write_stall) {
write_controller_token_ =
SetupDelay(write_controller, compaction_needed_bytes,
prev_compaction_needed_bytes_, was_stopped,
Expand All @@ -1002,7 +1006,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
mutable_cf_options.max_write_buffer_number,
write_controller->delayed_write_rate());
} else if (write_stall_condition == WriteStallCondition::kDelayed &&
write_stall_cause == WriteStallCause::kL0FileCountLimit) {
write_stall_cause == WriteStallCause::kL0FileCountLimit &&
!mutable_cf_options.disable_write_stall) {
// L0 is the last two files from stopping.
bool near_stop = vstorage->l0_delay_trigger_count() >=
mutable_cf_options.level0_stop_writes_trigger - 2;
Expand All @@ -1022,7 +1027,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
name_.c_str(), vstorage->l0_delay_trigger_count(),
write_controller->delayed_write_rate());
} else if (write_stall_condition == WriteStallCondition::kDelayed &&
write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
write_stall_cause == WriteStallCause::kPendingCompactionBytes &&
!mutable_cf_options.disable_write_stall) {
// If the distance to hard limit is less than 1/4 of the gap between soft
// and
// hard bytes limit, we think it is near stop and speed up the slowdown.
Expand All @@ -1048,7 +1054,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
write_controller->delayed_write_rate());
} else {
assert(write_stall_condition == WriteStallCondition::kNormal);
assert(write_stall_condition == WriteStallCondition::kNormal ||
mutable_cf_options.disable_write_stall);
if (vstorage->l0_delay_trigger_count() >=
GetL0FileCountForCompactionSpeedup(
mutable_cf_options.level0_file_num_compaction_trigger,
Expand Down
11 changes: 7 additions & 4 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1316,11 +1316,14 @@ Status DBImpl::SetDBOptions(
GetBGJobLimits(mutable_db_options_.max_background_flushes,
mutable_db_options_.max_background_compactions,
mutable_db_options_.max_background_jobs,
mutable_db_options_.base_background_compactions,
/* parallelize_compactions */ true);
const BGJobLimits new_bg_job_limits =
GetBGJobLimits(new_options.max_background_flushes,
new_options.max_background_compactions,
new_options.max_background_jobs,
new_options.base_background_compactions,
/* parallelize_compactions */ true);
const BGJobLimits new_bg_job_limits = GetBGJobLimits(
new_options.max_background_flushes,
new_options.max_background_compactions,
new_options.max_background_jobs, /* parallelize_compactions */ true);

const bool max_flushes_increased =
new_bg_job_limits.max_flushes > current_bg_job_limits.max_flushes;
Expand Down
1 change: 1 addition & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,7 @@ class DBImpl : public DB {
static BGJobLimits GetBGJobLimits(int max_background_flushes,
int max_background_compactions,
int max_background_jobs,
int base_background_compactions,
bool parallelize_compactions);

// move logs pending closing from job_context to the DB queue and
Expand Down
5 changes: 4 additions & 1 deletion db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2879,12 +2879,14 @@ DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const {
return GetBGJobLimits(mutable_db_options_.max_background_flushes,
mutable_db_options_.max_background_compactions,
mutable_db_options_.max_background_jobs,
mutable_db_options_.base_background_compactions,
write_controller_.NeedSpeedupCompaction());
}

DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
int max_background_compactions,
int max_background_jobs,
int base_background_compactions,
bool parallelize_compactions) {
BGJobLimits res;
if (max_background_flushes == -1 && max_background_compactions == -1) {
Expand All @@ -2900,7 +2902,8 @@ DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
}
if (!parallelize_compactions) {
// throttle background compactions until we deem necessary
res.max_compactions = 1;
res.max_compactions =
std::max(1, std::min(base_background_compactions, res.max_compactions));
}
return res;
}
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
}
auto bg_job_limits = DBImpl::GetBGJobLimits(
result.max_background_flushes, result.max_background_compactions,
result.max_background_jobs, true /* parallelize_compactions */);
result.max_background_jobs, result.base_background_compactions,
true /* parallelize_compactions */);
result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
Env::Priority::LOW);
result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
Expand Down
96 changes: 95 additions & 1 deletion db/db_options_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,87 @@ TEST_F(DBOptionsTest, EnableAutoCompactionAndTriggerStall) {
}
}

TEST_F(DBOptionsTest, EnableAutoCompactionButDisableStall) {
const std::string kValue(1024, 'v');
Options options;
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.disable_write_stall = true;
options.write_buffer_size = 1024 * 1024 * 10;
options.compression = CompressionType::kNoCompression;
options.level0_file_num_compaction_trigger = 1;
options.level0_stop_writes_trigger = std::numeric_limits<int>::max();
options.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
options.hard_pending_compaction_bytes_limit =
std::numeric_limits<uint64_t>::max();
options.soft_pending_compaction_bytes_limit =
std::numeric_limits<uint64_t>::max();
options.env = env_;

DestroyAndReopen(options);
int i = 0;
for (; i < 1024; i++) {
Put(Key(i), kValue);
}
Flush();
for (; i < 1024 * 2; i++) {
Put(Key(i), kValue);
}
Flush();
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ(2, NumTableFilesAtLevel(0));
uint64_t l0_size = SizeAtLevel(0);

options.hard_pending_compaction_bytes_limit = l0_size;
options.soft_pending_compaction_bytes_limit = l0_size;

Reopen(options);
dbfull()->TEST_WaitForCompact();
ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay());

SyncPoint::GetInstance()->LoadDependency(
{{"DBOptionsTest::EnableAutoCompactionButDisableStall:1",
"BackgroundCallCompaction:0"},
{"DBImpl::BackgroundCompaction():BeforePickCompaction",
"DBOptionsTest::EnableAutoCompactionButDisableStall:2"},
{"DBOptionsTest::EnableAutoCompactionButDisableStall:3",
"DBImpl::BackgroundCompaction():AfterPickCompaction"}});
// Block background compaction.
SyncPoint::GetInstance()->EnableProcessing();

ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionButDisableStall:1");
// Wait for stall condition recalculate.
TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionButDisableStall:2");

ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedSpeedupCompaction());

TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionButDisableStall:3");

// Background compaction executed.
dbfull()->TEST_WaitForCompact();
ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_FALSE(dbfull()->TEST_write_controler().NeedSpeedupCompaction());
}

TEST_F(DBOptionsTest, SetOptionsDisableWriteStall) {
Options options;
options.create_if_missing = true;
options.disable_write_stall = false;
options.env = env_;
Reopen(options);
ASSERT_EQ(false, dbfull()->GetOptions().disable_write_stall);

ASSERT_OK(dbfull()->SetOptions({{"disable_write_stall", "true"}}));
ASSERT_EQ(true, dbfull()->GetOptions().disable_write_stall);
ASSERT_OK(dbfull()->SetOptions({{"disable_write_stall", "false"}}));
ASSERT_EQ(false, dbfull()->GetOptions().disable_write_stall);
}

TEST_F(DBOptionsTest, SetOptionsMayTriggerCompaction) {
Options options;
options.level_compaction_dynamic_level_bytes = false;
Expand Down Expand Up @@ -611,8 +692,21 @@ TEST_F(DBOptionsTest, SetBackgroundCompactionThreads) {
ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
ASSERT_OK(dbfull()->SetDBOptions({{"max_background_compactions", "3"}}));
ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
auto stop_token = dbfull()->TEST_write_controler().GetStopToken();
{
auto stop_token = dbfull()->TEST_write_controler().GetStopToken();
ASSERT_EQ(3, dbfull()->TEST_BGCompactionsAllowed());
}

ASSERT_OK(dbfull()->SetDBOptions({{"base_background_compactions", "2"}}));
ASSERT_EQ(2, dbfull()->TEST_BGCompactionsAllowed());
ASSERT_OK(dbfull()->SetDBOptions({{"base_background_compactions", "5"}}));
ASSERT_EQ(3, dbfull()->TEST_BGCompactionsAllowed());
ASSERT_OK(dbfull()->SetDBOptions({{"base_background_compactions", "1"}}));
ASSERT_EQ(1, dbfull()->TEST_BGCompactionsAllowed());
{
auto stop_token = dbfull()->TEST_write_controler().GetStopToken();
ASSERT_EQ(3, dbfull()->TEST_BGCompactionsAllowed());
}
}

TEST_F(DBOptionsTest, SetBackgroundFlushThreads) {
Expand Down
5 changes: 5 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,11 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
// Dynamically changeable through SetOptions() API
bool disable_auto_compactions = false;

// Disable write stall mechanism.
//
// Dynamically changeable through SetOptions() API
bool disable_write_stall = false;

// This is a factory that provides TableFactory objects.
// Default: a block-based table factory that provides a default
// implementation of TableBuilder and TableReader with default
Expand Down
6 changes: 6 additions & 0 deletions options/cf_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct MutableCFOptions, disable_auto_compactions),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"disable_write_stall",
{offsetof(struct MutableCFOptions, disable_write_stall),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"filter_deletes",
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated,
OptionTypeFlags::kMutable}},
Expand Down Expand Up @@ -1069,6 +1073,8 @@ void MutableCFOptions::Dump(Logger* log) const {
: prefix_extractor->GetId().c_str());
ROCKS_LOG_INFO(log, " disable_auto_compactions: %d",
disable_auto_compactions);
ROCKS_LOG_INFO(log, " disable_write_stall: %d",
disable_write_stall);
ROCKS_LOG_INFO(log, " soft_pending_compaction_bytes_limit: %" PRIu64,
soft_pending_compaction_bytes_limit);
ROCKS_LOG_INFO(log, " hard_pending_compaction_bytes_limit: %" PRIu64,
Expand Down
3 changes: 3 additions & 0 deletions options/cf_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ struct MutableCFOptions {
experimental_mempurge_threshold(
options.experimental_mempurge_threshold),
disable_auto_compactions(options.disable_auto_compactions),
disable_write_stall(options.disable_write_stall),
soft_pending_compaction_bytes_limit(
options.soft_pending_compaction_bytes_limit),
hard_pending_compaction_bytes_limit(
Expand Down Expand Up @@ -196,6 +197,7 @@ struct MutableCFOptions {
prefix_extractor(nullptr),
experimental_mempurge_threshold(0.0),
disable_auto_compactions(false),
disable_write_stall(false),
soft_pending_compaction_bytes_limit(0),
hard_pending_compaction_bytes_limit(0),
level0_file_num_compaction_trigger(0),
Expand Down Expand Up @@ -280,6 +282,7 @@ struct MutableCFOptions {

// Compaction related options
bool disable_auto_compactions;
bool disable_write_stall;
uint64_t soft_pending_compaction_bytes_limit;
uint64_t hard_pending_compaction_bytes_limit;
int level0_file_num_compaction_trigger;
Expand Down
2 changes: 2 additions & 0 deletions options/db_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,8 @@ MutableDBOptions::MutableDBOptions(const DBOptions& options)
void MutableDBOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(log, " Options.max_background_jobs: %d",
max_background_jobs);
ROCKS_LOG_HEADER(log, " Options.base_background_compactions: %d",
base_background_compactions);
ROCKS_LOG_HEADER(log, " Options.max_background_compactions: %d",
max_background_compactions);
ROCKS_LOG_HEADER(log, " Options.max_subcompactions: %" PRIu32,
Expand Down
2 changes: 2 additions & 0 deletions options/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
hard_pending_compaction_bytes_limit);
ROCKS_LOG_HEADER(log, " Options.disable_auto_compactions: %d",
disable_auto_compactions);
ROCKS_LOG_HEADER(log, " Options.disable_write_stall: %d",
disable_write_stall);

const auto& it_compaction_style =
compaction_style_to_string.find(compaction_style);
Expand Down
1 change: 1 addition & 0 deletions options/options_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ void UpdateColumnFamilyOptions(const MutableCFOptions& moptions,

// Compaction related options
cf_opts->disable_auto_compactions = moptions.disable_auto_compactions;
cf_opts->disable_write_stall = moptions.disable_write_stall;
cf_opts->soft_pending_compaction_bytes_limit =
moptions.soft_pending_compaction_bytes_limit;
cf_opts->hard_pending_compaction_bytes_limit =
Expand Down
1 change: 1 addition & 0 deletions options/options_settable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"compaction_pri=kMinOverlappingRatio;"
"hard_pending_compaction_bytes_limit=0;"
"disable_auto_compactions=false;"
"disable_write_stall=false;"
"report_bg_io_stats=true;"
"ttl=60;"
"periodic_compaction_seconds=3600;"
Expand Down

0 comments on commit 0acb2b9

Please sign in to comment.