diff --git a/CMakeLists.txt b/CMakeLists.txt index 23a4014bc08f..3a6c9bba6ade 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -912,6 +912,7 @@ set(SOURCES utilities/persistent_cache/block_cache_tier_metadata.cc utilities/persistent_cache/persistent_cache_tier.cc utilities/persistent_cache/volatile_tier_impl.cc + utilities/rate_limiters/write_amp_based_rate_limiter.cc utilities/simulator_cache/cache_simulator.cc utilities/simulator_cache/sim_cache.cc utilities/table_properties_collectors/compact_on_deletion_collector.cc @@ -1450,6 +1451,7 @@ if(WITH_TESTS) utilities/options/options_util_test.cc utilities/persistent_cache/hash_table_test.cc utilities/persistent_cache/persistent_cache_test.cc + utilities/rate_limiters/write_amp_based_rate_limiter_test.cc utilities/simulator_cache/cache_simulator_test.cc utilities/simulator_cache/sim_cache_test.cc utilities/table_properties_collectors/compact_on_deletion_collector_test.cc diff --git a/Makefile b/Makefile index c942d148dc1f..d14c6a8837a9 100644 --- a/Makefile +++ b/Makefile @@ -1990,6 +1990,9 @@ wide_column_serialization_test: $(OBJ_DIR)/db/wide/wide_column_serialization_tes wide_columns_helper_test: $(OBJ_DIR)/db/wide/wide_columns_helper_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +write_amp_based_rate_limiter_test: $(OBJ_DIR)/utilities/rate_limiters/write_amp_based_rate_limiter_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + #------------------------------------------------- # make install related stuff PREFIX ?= /usr/local diff --git a/TARGETS b/TARGETS index e8aaf325d464..394fe6e636df 100644 --- a/TARGETS +++ b/TARGETS @@ -310,6 +310,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "utilities/persistent_cache/block_cache_tier_metadata.cc", "utilities/persistent_cache/persistent_cache_tier.cc", "utilities/persistent_cache/volatile_tier_impl.cc", + "utilities/rate_limiters/write_amp_based_rate_limiter.cc", "utilities/simulator_cache/cache_simulator.cc", "utilities/simulator_cache/sim_cache.cc", "utilities/table_properties_collectors/compact_on_deletion_collector.cc", @@ -5574,6 +5575,12 @@ cpp_unittest_wrapper(name="work_queue_test", extra_compiler_flags=[]) +cpp_unittest_wrapper(name="write_amp_based_rate_limiter_test", + srcs=["utilities/rate_limiters/write_amp_based_rate_limiter_test.cc"], + deps=[":rocksdb_test_lib"], + extra_compiler_flags=[]) + + cpp_unittest_wrapper(name="write_batch_test", srcs=["db/write_batch_test.cc"], deps=[":rocksdb_test_lib"], diff --git a/db/c.cc b/db/c.cc index 5555ae198752..92095dbc7002 100644 --- a/db/c.cc +++ b/db/c.cc @@ -47,6 +47,7 @@ #include "rocksdb/write_batch.h" #include "rocksdb/write_buffer_manager.h" #include "utilities/merge_operators.h" +#include "utilities/rate_limiters/write_amp_based_rate_limiter.h" using ROCKSDB_NAMESPACE::BackupEngine; using ROCKSDB_NAMESPACE::BackupEngineOptions; @@ -95,6 +96,7 @@ using ROCKSDB_NAMESPACE::NewCompactOnDeletionCollectorFactory; using ROCKSDB_NAMESPACE::NewGenericRateLimiter; using ROCKSDB_NAMESPACE::NewLRUCache; using ROCKSDB_NAMESPACE::NewRibbonFilterPolicy; +using ROCKSDB_NAMESPACE::NewWriteAmpBasedRateLimiter; using ROCKSDB_NAMESPACE::OptimisticTransactionDB; using ROCKSDB_NAMESPACE::OptimisticTransactionOptions; using ROCKSDB_NAMESPACE::Options; @@ -3997,6 +3999,14 @@ rocksdb_ratelimiter_t* rocksdb_ratelimiter_create_auto_tuned( return rate_limiter; } +rocksdb_ratelimiter_t* rocksdb_writeampbasedratelimiter_create( + int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness) { + rocksdb_ratelimiter_t* rate_limiter = new rocksdb_ratelimiter_t; + rate_limiter->rep.reset(NewWriteAmpBasedRateLimiter( + rate_bytes_per_sec, refill_period_us, fairness)); + return rate_limiter; +} + void rocksdb_ratelimiter_destroy(rocksdb_ratelimiter_t* limiter) { delete limiter; } diff --git a/db/column_family.cc b/db/column_family.cc index 9782cd31a706..9b5e6590c77c 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -622,7 +622,8 @@ ColumnFamilyData::ColumnFamilyData( } } - RecalculateWriteStallConditions(mutable_cf_options_); + RecalculateWriteStallConditions(mutable_cf_options_, + ioptions_.rate_limiter.get()); if (cf_options.table_factory->IsInstanceOf( TableFactory::kBlockBasedTableName()) && @@ -935,7 +936,7 @@ ColumnFamilyData::GetWriteStallConditionAndCause( } WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( - const MutableCFOptions& mutable_cf_options) { + const MutableCFOptions& mutable_cf_options, RateLimiter* rate_limiter) { auto write_stall_condition = WriteStallCondition::kNormal; if (current_ != nullptr) { auto* vstorage = current_->storage_info(); @@ -1064,6 +1065,9 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( // compaction. write_controller_token_ = write_controller->GetCompactionPressureToken(); + if (rate_limiter) { + rate_limiter->PaceUp(false /*critical*/); + } } else if (vstorage->estimated_compaction_needed_bytes() >= GetPendingCompactionBytesForCompactionSpeedup( mutable_cf_options, vstorage)) { @@ -1093,6 +1097,16 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( 4); } } + if (rate_limiter) { + // pace up limiter when close to write stall + if (write_stall_condition != WriteStallCondition::kNormal || + vstorage->l0_delay_trigger_count() >= + 0.8 * mutable_cf_options.level0_slowdown_writes_trigger || + vstorage->estimated_compaction_needed_bytes() >= + 0.5 * mutable_cf_options.soft_pending_compaction_bytes_limit) { + rate_limiter->PaceUp(true /*critical*/); + } + } prev_compaction_needed_bytes_ = compaction_needed_bytes; } return write_stall_condition; @@ -1320,8 +1334,8 @@ void ColumnFamilyData::InstallSuperVersion( // Should not recalculate slow down condition if nothing has changed, since // currently RecalculateWriteStallConditions() treats it as further slowing // down is needed. - super_version_->write_stall_condition = - RecalculateWriteStallConditions(mutable_cf_options); + super_version_->write_stall_condition = RecalculateWriteStallConditions( + mutable_cf_options, ioptions_.rate_limiter.get()); } else { super_version_->write_stall_condition = old_superversion->write_stall_condition; diff --git a/db/column_family.h b/db/column_family.h index 2a38feb73107..067b419dbba2 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -478,7 +478,8 @@ class ColumnFamilyData { // Recalculate some stall conditions, which are changed only during // compaction, adding new memtable and/or recalculation of compaction score. WriteStallCondition RecalculateWriteStallConditions( - const MutableCFOptions& mutable_cf_options); + const MutableCFOptions& mutable_cf_options, + RateLimiter* rate_limiter = nullptr); void set_initialized() { initialized_.store(true); } diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 8a26585fe738..f98406dddec9 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -1681,6 +1681,10 @@ extern ROCKSDB_LIBRARY_API rocksdb_ratelimiter_t* rocksdb_ratelimiter_create_auto_tuned(int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness); +extern ROCKSDB_LIBRARY_API rocksdb_ratelimiter_t* +rocksdb_writeampbasedratelimiter_create(int64_t rate_bytes_per_sec, + int64_t refill_period_us, + int32_t fairness); extern ROCKSDB_LIBRARY_API void rocksdb_ratelimiter_destroy( rocksdb_ratelimiter_t*); diff --git a/include/rocksdb/rate_limiter.h b/include/rocksdb/rate_limiter.h index 3515b1e953ba..1939843d6daf 100644 --- a/include/rocksdb/rate_limiter.h +++ b/include/rocksdb/rate_limiter.h @@ -48,6 +48,8 @@ class RateLimiter { virtual Status SetSingleBurstBytes(int64_t /* single_burst_bytes */) { return Status::NotSupported(); } + // Dynamically change rate limiter's auto_tuned mode. + virtual void SetAutoTuned(bool /*auto_tuned*/) {} // Deprecated. New RateLimiter derived classes should override // Request(const int64_t, const Env::IOPriority, Statistics*) or @@ -120,6 +122,8 @@ class RateLimiter { virtual int64_t GetBytesPerSecond() const = 0; + virtual bool GetAutoTuned() const { return false; } + virtual bool IsRateLimited(OpType op_type) { if ((mode_ == RateLimiter::Mode::kWritesOnly && op_type == RateLimiter::OpType::kRead) || @@ -130,6 +134,8 @@ class RateLimiter { return true; } + virtual void PaceUp(bool /*critical*/) {} + protected: Mode GetMode() { return mode_; } @@ -165,4 +171,11 @@ extern RateLimiter* NewGenericRateLimiter( RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly, bool auto_tuned = false); +extern RateLimiter* NewWriteAmpBasedRateLimiter( + int64_t rate_bytes_per_sec, int64_t refill_period_us = 100 * 1000, + int32_t fairness = 10, + RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly, + bool auto_tuned = false, int tune_per_sec = 1, + size_t smooth_window_size = 300, size_t recent_window_size = 30); + } // namespace ROCKSDB_NAMESPACE diff --git a/src.mk b/src.mk index a03a476ff151..11be8b271b34 100644 --- a/src.mk +++ b/src.mk @@ -298,6 +298,7 @@ LIB_SOURCES = \ utilities/persistent_cache/block_cache_tier_metadata.cc \ utilities/persistent_cache/persistent_cache_tier.cc \ utilities/persistent_cache/volatile_tier_impl.cc \ + utilities/rate_limiters/write_amp_based_rate_limiter.cc \ utilities/simulator_cache/cache_simulator.cc \ utilities/simulator_cache/sim_cache.cc \ utilities/table_properties_collectors/compact_on_deletion_collector.cc \ @@ -619,6 +620,7 @@ TEST_MAIN_SOURCES = \ utilities/options/options_util_test.cc \ utilities/persistent_cache/hash_table_test.cc \ utilities/persistent_cache/persistent_cache_test.cc \ + utilities/rate_limiters/write_amp_based_rate_limiter_test.cc \ utilities/simulator_cache/cache_simulator_test.cc \ utilities/simulator_cache/sim_cache_test.cc \ utilities/table_properties_collectors/compact_on_deletion_collector_test.cc \ diff --git a/utilities/rate_limiters/write_amp_based_rate_limiter.cc b/utilities/rate_limiters/write_amp_based_rate_limiter.cc new file mode 100644 index 000000000000..3b0479d63e60 --- /dev/null +++ b/utilities/rate_limiters/write_amp_based_rate_limiter.cc @@ -0,0 +1,433 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "utilities/rate_limiters/write_amp_based_rate_limiter.h" + +#include "monitoring/statistics_impl.h" +#include "port/port.h" +#include "rocksdb/env.h" +#include "test_util/sync_point.h" +#include "util/aligned_buffer.h" + +namespace ROCKSDB_NAMESPACE { + +// Pending request +struct WriteAmpBasedRateLimiter::Req { + explicit Req(int64_t _bytes, port::Mutex* _mu) + : request_bytes(_bytes), bytes(_bytes), cv(_mu), granted(false) {} + int64_t request_bytes; + int64_t bytes; + port::CondVar cv; + bool granted; +}; + +namespace { +// Due to the execution model of compaction, large waves of pending compactions +// could possibly be hidden behind a constant rate of I/O requests. It's then +// wise to raise the threshold slightly above estimation to ensure those +// pending compactions can contribute to the convergence of a new alternative +// threshold. +// Padding is calculated through hyperbola based on empirical percentage of 10% +// and special care for low-pressure domain. E.g. coordinates (5M, 18M) and +// (10M, 16M) are on this curve. +int64_t CalculatePadding(int64_t base) { + return base / 10 + 577464606419583ll / (base + 26225305); +} +} // unnamed namespace + +WriteAmpBasedRateLimiter::WriteAmpBasedRateLimiter( + int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness, + RateLimiter::Mode mode, Env* env, bool auto_tuned, int secs_per_tune, + size_t smooth_window_size, size_t recent_window_size) + : RateLimiter(mode), + refill_period_us_(refill_period_us), + rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2 + : rate_bytes_per_sec), + refill_bytes_per_period_( + CalculateRefillBytesPerPeriod(rate_bytes_per_sec_)), + env_(env), + stop_(false), + exit_cv_(&request_mutex_), + requests_to_wait_(0), + available_bytes_(0), + next_refill_us_(NowMicrosMonotonic(env_)), + fairness_(fairness > 100 ? 100 : fairness), + rnd_((uint32_t)time(nullptr)), + leader_(nullptr), + auto_tuned_(auto_tuned), + secs_per_tune_(secs_per_tune == 0 ? 1 : secs_per_tune), + max_bytes_per_sec_(rate_bytes_per_sec), + tuned_time_(NowMicrosMonotonic(env_)), + duration_highpri_bytes_through_(0), + duration_bytes_through_(0), + bytes_sampler_(smooth_window_size, recent_window_size), + highpri_bytes_sampler_(smooth_window_size, recent_window_size), + limit_bytes_sampler_(recent_window_size, recent_window_size), + critical_pace_up_(false), + normal_pace_up_(false), + percent_delta_(0) { + std::fill(total_requests_, total_requests_ + Env::IO_TOTAL, 0); + std::fill(total_bytes_through_, total_bytes_through_ + Env::IO_TOTAL, 0); +} + +WriteAmpBasedRateLimiter::~WriteAmpBasedRateLimiter() { + MutexLock g(&request_mutex_); + stop_ = true; + requests_to_wait_ = static_cast(queue_[Env::IO_LOW].size() + + queue_[Env::IO_HIGH].size()); + for (auto& r : queue_[Env::IO_HIGH]) { + r->cv.Signal(); + } + for (auto& r : queue_[Env::IO_LOW]) { + r->cv.Signal(); + } + while (requests_to_wait_ > 0) { + exit_cv_.Wait(); + } +} + +void WriteAmpBasedRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) { + assert(bytes_per_second > 0); + if (auto_tuned_.load(std::memory_order_acquire)) { + max_bytes_per_sec_.store(bytes_per_second, std::memory_order_relaxed); + } else { + SetActualBytesPerSecond(bytes_per_second); + } +} + +void WriteAmpBasedRateLimiter::SetAutoTuned(bool auto_tuned) { + MutexLock g(&auto_tuned_mutex_); + if (auto_tuned_.load(std::memory_order_acquire) != auto_tuned) { + if (auto_tuned) { + max_bytes_per_sec_.store(rate_bytes_per_sec_, std::memory_order_relaxed); + refill_bytes_per_period_.store( + CalculateRefillBytesPerPeriod(rate_bytes_per_sec_), + std::memory_order_relaxed); + } else { + // must hold this lock to avoid tuner changing `rate_bytes_per_sec_` + MutexLock g2(&request_mutex_); + rate_bytes_per_sec_ = max_bytes_per_sec_.load(std::memory_order_relaxed); + refill_bytes_per_period_.store( + CalculateRefillBytesPerPeriod(rate_bytes_per_sec_), + std::memory_order_relaxed); + } + auto_tuned_.store(auto_tuned, std::memory_order_release); + } +} + +void WriteAmpBasedRateLimiter::SetActualBytesPerSecond( + int64_t bytes_per_second) { + rate_bytes_per_sec_ = bytes_per_second; + refill_bytes_per_period_.store( + CalculateRefillBytesPerPeriod(bytes_per_second), + std::memory_order_relaxed); +} + +void WriteAmpBasedRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, + Statistics* stats) { + TEST_SYNC_POINT("WriteAmpBasedRateLimiter::Request"); + TEST_SYNC_POINT_CALLBACK("WriteAmpBasedRateLimiter::Request:1", + &rate_bytes_per_sec_); + if (auto_tuned_.load(std::memory_order_acquire) && pri == Env::IO_HIGH && + duration_highpri_bytes_through_ + duration_bytes_through_ + bytes <= + max_bytes_per_sec_.load(std::memory_order_relaxed) * secs_per_tune_) { + // In the case where low-priority request is absent, actual time elapsed + // will be larger than secs_per_tune_, making the limit even tighter. + total_bytes_through_[Env::IO_HIGH] += bytes; + ++total_requests_[Env::IO_HIGH]; + duration_highpri_bytes_through_ += bytes; + return; + } + assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed)); + MutexLock g(&request_mutex_); + + if (auto_tuned_.load(std::memory_order_acquire)) { + std::chrono::microseconds now(NowMicrosMonotonic(env_)); + auto micros_per_tune = 1000 * 1000 * secs_per_tune_; + if (now - tuned_time_ >= std::chrono::microseconds(micros_per_tune)) { + Tune(); + } + } + + if (stop_) { + return; + } + + ++total_requests_[pri]; + + if (available_bytes_ >= bytes) { + // Refill thread assigns quota and notifies requests waiting on + // the queue under mutex. So if we get here, that means nobody + // is waiting? + available_bytes_ -= bytes; + total_bytes_through_[pri] += bytes; + duration_bytes_through_ += bytes; + return; + } + + // Request cannot be satisfied at this moment, enqueue + Req r(bytes, &request_mutex_); + queue_[pri].push_back(&r); + + do { + bool timedout = false; + // Leader election, candidates can be: + // (1) a new incoming request, + // (2) a previous leader, whose quota has not been not assigned yet due + // to lower priority + // (3) a previous waiter at the front of queue, who got notified by + // previous leader + if (leader_ == nullptr && + ((!queue_[Env::IO_HIGH].empty() && + &r == queue_[Env::IO_HIGH].front()) || + (!queue_[Env::IO_LOW].empty() && &r == queue_[Env::IO_LOW].front()))) { + leader_ = &r; + int64_t delta = next_refill_us_ - NowMicrosMonotonic(env_); + delta = delta > 0 ? delta : 0; + if (delta == 0) { + timedout = true; + } else { + int64_t wait_until = env_->NowMicros() + delta; + RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS); + timedout = r.cv.TimedWait(wait_until); + } + } else { + // Not at the front of queue or an leader has already been elected + r.cv.Wait(); + } + + // request_mutex_ is held from now on + if (stop_) { + --requests_to_wait_; + exit_cv_.Signal(); + return; + } + + // Make sure the waken up request is always the header of its queue + assert( + r.granted || + (!queue_[Env::IO_HIGH].empty() && &r == queue_[Env::IO_HIGH].front()) || + (!queue_[Env::IO_LOW].empty() && &r == queue_[Env::IO_LOW].front())); + assert(leader_ == nullptr || + (!queue_[Env::IO_HIGH].empty() && + leader_ == queue_[Env::IO_HIGH].front()) || + (!queue_[Env::IO_LOW].empty() && + leader_ == queue_[Env::IO_LOW].front())); + + if (leader_ == &r) { + // Waken up from TimedWait() + if (timedout) { + // Time to do refill! + Refill(); + + // Re-elect a new leader regardless. This is to simplify the + // election handling. + leader_ = nullptr; + + // Notify the header of queue if current leader is going away + if (r.granted) { + // Current leader already got granted with quota. Notify header + // of waiting queue to participate next round of election. + assert((queue_[Env::IO_HIGH].empty() || + &r != queue_[Env::IO_HIGH].front()) && + (queue_[Env::IO_LOW].empty() || + &r != queue_[Env::IO_LOW].front())); + if (!queue_[Env::IO_HIGH].empty()) { + queue_[Env::IO_HIGH].front()->cv.Signal(); + } else if (!queue_[Env::IO_LOW].empty()) { + queue_[Env::IO_LOW].front()->cv.Signal(); + } + // Done + break; + } + } else { + // Spontaneous wake up, need to continue to wait + assert(!r.granted); + leader_ = nullptr; + } + } else { + // Waken up by previous leader: + // (1) if requested quota is granted, it is done. + // (2) if requested quota is not granted, this means current thread + // was picked as a new leader candidate (previous leader got quota). + // It needs to participate leader election because a new request may + // come in before this thread gets waken up. So it may actually need + // to do Wait() again. + assert(!timedout); + } + } while (!r.granted); +} + +void WriteAmpBasedRateLimiter::Refill() { + TEST_SYNC_POINT("WriteAmpBasedRateLimiter::Refill"); + next_refill_us_ = NowMicrosMonotonic(env_) + refill_period_us_; + // Carry over the left over quota from the last period + auto refill_bytes_per_period = + refill_bytes_per_period_.load(std::memory_order_relaxed); + available_bytes_ = refill_bytes_per_period; + + int use_low_pri_first = rnd_.OneIn(fairness_) ? 0 : 1; + for (int q = 0; q < 2; ++q) { + auto use_pri = (use_low_pri_first == q) ? Env::IO_LOW : Env::IO_HIGH; + auto* queue = &queue_[use_pri]; + while (!queue->empty()) { + auto* next_req = queue->front(); + if (available_bytes_ < next_req->request_bytes) { + // avoid starvation + next_req->request_bytes -= available_bytes_; + available_bytes_ = 0; + break; + } + available_bytes_ -= next_req->request_bytes; + next_req->request_bytes = 0; + total_bytes_through_[use_pri] += next_req->bytes; + duration_bytes_through_ += next_req->bytes; + queue->pop_front(); + + next_req->granted = true; + if (next_req != leader_) { + // Quota granted, signal the thread + next_req->cv.Signal(); + } + } + } +} + +int64_t WriteAmpBasedRateLimiter::CalculateRefillBytesPerPeriod( + int64_t rate_bytes_per_sec) { + if (std::numeric_limits::max() / rate_bytes_per_sec < + refill_period_us_) { + // Avoid unexpected result in the overflow case. The result now is still + // inaccurate but is a number that is large enough. + return std::numeric_limits::max() / 1000000; + } else { + return std::max(kMinRefillBytesPerPeriod, + rate_bytes_per_sec * refill_period_us_ / 1000000); + } +} + +// The core function used to dynamically adjust the compaction rate limit, +// called **at most** once every `secs_per_tune`. +// I/O throughput threshold is automatically tuned based on history samples of +// compaction and flush flow. This algorithm excels by taking into account the +// limiter's inability to estimate the pressure of pending compactions, and the +// possibility of foreground write fluctuation. +Status WriteAmpBasedRateLimiter::Tune() { + // computed rate limit will be larger than 10MB/s + const int64_t kMinBytesPerSec = 10 << 20; + // high-priority bytes are padded to 8MB + const int64_t kHighBytesLower = 8 << 20; + // lower bound for write amplification estimation + const int kRatioLower = 10; + const int kPercentDeltaMax = 6; + + std::chrono::microseconds prev_tuned_time = tuned_time_; + tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic(env_)); + auto duration = tuned_time_ - prev_tuned_time; + auto duration_ms = + std::chrono::duration_cast(duration).count(); + + int64_t prev_bytes_per_sec = GetBytesPerSecond(); + + // This function can be called less frequent than we anticipate when + // compaction rate is low. Loop through the actual time slice to correct + // the estimation. + auto millis_per_tune = 1000 * secs_per_tune_; + for (uint32_t i = 0; i < duration_ms / millis_per_tune; i++) { + bytes_sampler_.AddSample(duration_bytes_through_ * 1000 / duration_ms); + highpri_bytes_sampler_.AddSample(duration_highpri_bytes_through_ * 1000 / + duration_ms); + limit_bytes_sampler_.AddSample(prev_bytes_per_sec); + } + int64_t new_bytes_per_sec = bytes_sampler_.GetFullValue(); + int32_t ratio = std::max( + kRatioLower, + static_cast( + bytes_sampler_.GetFullValue() * 10 / + std::max(highpri_bytes_sampler_.GetFullValue(), kHighBytesLower))); + // Only adjust threshold when foreground write (flush) flow increases, + // because decreasement could also be caused by manual flow control at + // application level to alleviate background pressure. + new_bytes_per_sec = std::max( + new_bytes_per_sec, + ratio * + std::max(highpri_bytes_sampler_.GetRecentValue(), kHighBytesLower) / + 10); + // Set the threshold higher to avoid write stalls caused by pending + // compactions. + int64_t padding = CalculatePadding(new_bytes_per_sec); + // Adjustment based on utilization. + int64_t util = bytes_sampler_.GetRecentValue() * 1000 / + limit_bytes_sampler_.GetRecentValue(); + if (util >= 995) { + if (percent_delta_ < kPercentDeltaMax) { + percent_delta_ += 1; + } + } else if (percent_delta_ > 0) { + percent_delta_ -= 1; + } + // React to pace-up requests when LSM is out of shape. + if (critical_pace_up_.load(std::memory_order_relaxed)) { + percent_delta_ = 150; + critical_pace_up_.store(false, std::memory_order_relaxed); + } else if (normal_pace_up_.load(std::memory_order_relaxed)) { + percent_delta_ = + std::max(percent_delta_, + static_cast(padding * 150 / new_bytes_per_sec)); + normal_pace_up_.store(false, std::memory_order_relaxed); + } + new_bytes_per_sec += padding + new_bytes_per_sec * percent_delta_ / 100; + new_bytes_per_sec = + std::max(kMinBytesPerSec, + std::min(new_bytes_per_sec, + max_bytes_per_sec_.load(std::memory_order_relaxed) - + highpri_bytes_sampler_.GetRecentValue())); + if (new_bytes_per_sec != prev_bytes_per_sec) { + SetActualBytesPerSecond(new_bytes_per_sec); + } + + duration_bytes_through_ = 0; + duration_highpri_bytes_through_ = 0; + return Status::OK(); +} + +void WriteAmpBasedRateLimiter::PaceUp(bool critical) { + if (auto_tuned_.load(std::memory_order_acquire)) { + if (critical) { + critical_pace_up_.store(true, std::memory_order_relaxed); + } else { + normal_pace_up_.store(true, std::memory_order_relaxed); + } + } +} + +RateLimiter* NewWriteAmpBasedRateLimiter( + int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */, + int32_t fairness /* = 10 */, + RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */, + bool auto_tuned /* = false */, int tune_per_sec /* = 1 */, + size_t smooth_window_size /* = 300 */, + size_t recent_window_size /* = 30 */) { + assert(rate_bytes_per_sec > 0); + assert(refill_period_us > 0); + assert(fairness > 0); + assert(tune_per_sec >= 0); + assert(smooth_window_size >= recent_window_size); + if (smooth_window_size == 0) { + smooth_window_size = 300; + } + if (recent_window_size == 0) { + recent_window_size = 30; + } + return new WriteAmpBasedRateLimiter( + rate_bytes_per_sec, refill_period_us, fairness, mode, Env::Default(), + auto_tuned, tune_per_sec, smooth_window_size, recent_window_size); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/utilities/rate_limiters/write_amp_based_rate_limiter.h b/utilities/rate_limiters/write_amp_based_rate_limiter.h new file mode 100644 index 000000000000..5f19d72da34e --- /dev/null +++ b/utilities/rate_limiters/write_amp_based_rate_limiter.h @@ -0,0 +1,166 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include +#include +#include +#include + +#include "port/port.h" +#include "rocksdb/env.h" +#include "rocksdb/rate_limiter.h" +#include "util/mutexlock.h" +#include "util/random.h" + +namespace ROCKSDB_NAMESPACE { + +class WriteAmpBasedRateLimiter : public RateLimiter { + public: + WriteAmpBasedRateLimiter(int64_t refill_bytes, int64_t refill_period_us, + int32_t fairness, RateLimiter::Mode mode, Env* env, + bool auto_tuned, int secs_per_tune, + size_t auto_tune_smooth_window, + size_t auto_tune_recent_window); + + virtual ~WriteAmpBasedRateLimiter(); + + // This API allows user to dynamically change rate limiter's bytes per second. + // When auto-tuned is on, this sets rate limit's upper bound instead. + virtual void SetBytesPerSecond(int64_t bytes_per_second) override; + + // Dynamically change rate limiter's auto_tuned mode. + virtual void SetAutoTuned(bool auto_tuned) override; + + // Request for token to write bytes. If this request can not be satisfied, + // the call is blocked. Caller is responsible to make sure + // bytes <= GetSingleBurstBytes() + using RateLimiter::Request; + virtual void Request(const int64_t bytes, const Env::IOPriority pri, + Statistics* stats) override; + + virtual int64_t GetSingleBurstBytes() const override { + return refill_bytes_per_period_.load(std::memory_order_relaxed); + } + + virtual int64_t GetTotalBytesThrough( + const Env::IOPriority pri = Env::IO_TOTAL) const override { + MutexLock g(&request_mutex_); + if (pri == Env::IO_TOTAL) { + return total_bytes_through_[Env::IO_LOW] + + total_bytes_through_[Env::IO_HIGH]; + } + return total_bytes_through_[pri]; + } + + virtual int64_t GetTotalRequests( + const Env::IOPriority pri = Env::IO_TOTAL) const override { + MutexLock g(&request_mutex_); + if (pri == Env::IO_TOTAL) { + return total_requests_[Env::IO_LOW] + total_requests_[Env::IO_HIGH]; + } + return total_requests_[pri]; + } + + virtual int64_t GetBytesPerSecond() const override { + return rate_bytes_per_sec_; + } + + virtual bool GetAutoTuned() const override { + return auto_tuned_.load(std::memory_order_acquire); + } + + virtual void PaceUp(bool critical) override; + + private: + void Refill(); + int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec); + void SetActualBytesPerSecond(int64_t bytes_per_second); + Status Tune(); + + uint64_t NowMicrosMonotonic(Env* env) { + return env->NowNanos() / std::milli::den; + } + + // This mutex guard all internal states + mutable port::Mutex request_mutex_; + + const int64_t kMinRefillBytesPerPeriod = 100; + + const int64_t refill_period_us_; + + int64_t rate_bytes_per_sec_; + // This variable can be changed dynamically. + std::atomic refill_bytes_per_period_; + Env* const env_; + + bool stop_; + port::CondVar exit_cv_; + int32_t requests_to_wait_; + + int64_t total_requests_[Env::IO_TOTAL]; + int64_t total_bytes_through_[Env::IO_TOTAL]; + int64_t available_bytes_; + int64_t next_refill_us_; + + int32_t fairness_; + Random rnd_; + + struct Req; + Req* leader_; + std::deque queue_[Env::IO_TOTAL]; + + // only used to synchronize auto_tuned setters + port::Mutex auto_tuned_mutex_; + + std::atomic auto_tuned_; + int secs_per_tune_; + std::atomic max_bytes_per_sec_; + std::chrono::microseconds tuned_time_; + int64_t duration_highpri_bytes_through_; + int64_t duration_bytes_through_; + + class WindowSmoother { + public: + WindowSmoother(size_t smooth_window_size, size_t recent_window_size) + : smooth_window_size_(smooth_window_size), + recent_window_size_(recent_window_size), + data_(smooth_window_size, 0) {} + void AddSample(int64_t v) { + auto recent_cursor = + (cursor_ + 1 + smooth_window_size_ - recent_window_size_) % + smooth_window_size_; + cursor_ = (cursor_ + 1) % smooth_window_size_; + full_sum_ += v - data_[cursor_]; + recent_sum_ += v - data_[recent_cursor]; + data_[cursor_] = v; + } + int64_t GetFullValue() { return full_sum_ / smooth_window_size_; } + int64_t GetRecentValue() { return recent_sum_ / recent_window_size_; } + bool AtTimePoint() const { return cursor_ == 0; } + + private: + uint32_t cursor_{0}; // point to the most recent sample + size_t smooth_window_size_; + size_t recent_window_size_; + std::vector data_; + int64_t full_sum_{0}; + int64_t recent_sum_{0}; + }; + + WindowSmoother bytes_sampler_; + WindowSmoother highpri_bytes_sampler_; + WindowSmoother limit_bytes_sampler_; + std::atomic critical_pace_up_; + std::atomic normal_pace_up_; + uint32_t percent_delta_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/utilities/rate_limiters/write_amp_based_rate_limiter_test.cc b/utilities/rate_limiters/write_amp_based_rate_limiter_test.cc new file mode 100644 index 000000000000..281d0e0697b8 --- /dev/null +++ b/utilities/rate_limiters/write_amp_based_rate_limiter_test.cc @@ -0,0 +1,209 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "utilities/rate_limiters/write_amp_based_rate_limiter.h" + +#include +#include +#include + +#include "db/db_test_util.h" +#include "rocksdb/env.h" +#include "rocksdb/rate_limiter.h" +#include "test_util/sync_point.h" +#include "test_util/testharness.h" +#include "util/random.h" + +namespace ROCKSDB_NAMESPACE { + +// TODO(yhchiang): the rate will not be accurate when we run test in parallel. +class WriteAmpBasedRateLimiterTest : public testing::Test {}; + +TEST_F(WriteAmpBasedRateLimiterTest, OverflowRate) { + WriteAmpBasedRateLimiter limiter(std::numeric_limits::max(), 1000, + 10, RateLimiter::Mode::kWritesOnly, + Env::Default(), false /* auto_tuned */, 1, + 100, 10); + ASSERT_GT(limiter.GetSingleBurstBytes(), 1000000000ll); +} + +TEST_F(WriteAmpBasedRateLimiterTest, StartStop) { + std::unique_ptr limiter( + NewWriteAmpBasedRateLimiter(100, 100, 10)); +} + +TEST_F(WriteAmpBasedRateLimiterTest, Modes) { + for (auto mode : {RateLimiter::Mode::kWritesOnly, + RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) { + WriteAmpBasedRateLimiter limiter( + 2000 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */, + 10 /* fairness */, mode, Env::Default(), false /* auto_tuned */, + 1 /* secs_per_tune */, 100 /* smooth_window */, 10 /* recent_window */); + limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */, + RateLimiter::OpType::kRead); + if (mode == RateLimiter::Mode::kWritesOnly) { + ASSERT_EQ(0, limiter.GetTotalBytesThrough(Env::IO_HIGH)); + } else { + ASSERT_EQ(1000, limiter.GetTotalBytesThrough(Env::IO_HIGH)); + } + + limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */, + RateLimiter::OpType::kWrite); + if (mode == RateLimiter::Mode::kAllIo) { + ASSERT_EQ(2000, limiter.GetTotalBytesThrough(Env::IO_HIGH)); + } else { + ASSERT_EQ(1000, limiter.GetTotalBytesThrough(Env::IO_HIGH)); + } + } +} + +TEST_F(WriteAmpBasedRateLimiterTest, AutoTune) { + auto* thread_env = Env::Default(); + WriteAmpBasedRateLimiter limiter( + 10000 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kAllIo, Env::Default(), + true /* auto_tuned */, 1 /* secs_per_tune */, 100 /* smooth_window */, + 10 /* recent_window */); + limiter.Request(8000 /* bytes */, Env::IO_HIGH, nullptr /* stats */, + RateLimiter::OpType::kWrite); + ASSERT_EQ(8000, limiter.GetTotalBytesThrough(Env::IO_HIGH)); + + thread_env->SleepForMicroseconds(1000 * 1000); + // request from low io can trigger auto tune. + limiter.Request(1000 /* bytes */, Env::IO_LOW, nullptr /* stats */, + RateLimiter::OpType::kWrite); + ASSERT_EQ(10485760, limiter.GetBytesPerSecond()); + // TODO: add more logic for auto-tune +} + +#if !(defined(TRAVIS) && defined(OS_MACOSX)) +TEST_F(WriteAmpBasedRateLimiterTest, Rate) { + auto* env = Env::Default(); + struct Arg { + Arg(int32_t _target_rate, int _burst) + : limiter(NewWriteAmpBasedRateLimiter(_target_rate, 100 * 1000, 10)), + request_size(_target_rate / 10), + burst(_burst) {} + std::unique_ptr limiter; + int32_t request_size; + int burst; + }; + + auto writer = [](void* p) { + auto* thread_env = Env::Default(); + auto* arg = static_cast(p); + // Test for 2 seconds + auto until = thread_env->NowMicros() + 2 * 1000000; + Random r((uint32_t)(thread_env->NowNanos() % + std::numeric_limits::max())); + while (thread_env->NowMicros() < until) { + for (int i = 0; i < static_cast(r.Skewed(arg->burst) + 1); ++i) { + arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, + Env::IO_HIGH, nullptr /* stats */, + RateLimiter::OpType::kWrite); + } + arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_LOW, + nullptr /* stats */, RateLimiter::OpType::kWrite); + } + }; + + for (int i = 1; i <= 16; i *= 2) { + int32_t target = i * 1024 * 10; + Arg arg(target, i / 4 + 1); + int64_t old_total_bytes_through = 0; + for (int iter = 1; iter <= 2; ++iter) { + // second iteration changes the target dynamically + if (iter == 2) { + target *= 2; + arg.limiter->SetBytesPerSecond(target); + } + auto start = env->NowMicros(); + for (int t = 0; t < i; ++t) { + env->StartThread(writer, &arg); + } + env->WaitForJoin(); + + auto elapsed = env->NowMicros() - start; + double rate = + (arg.limiter->GetTotalBytesThrough() - old_total_bytes_through) * + 1000000.0 / elapsed; + old_total_bytes_through = arg.limiter->GetTotalBytesThrough(); + fprintf(stderr, + "request size [1 - %" PRIi32 "], limit %" PRIi32 + " KB/sec, actual rate: %lf KB/sec, elapsed %.2lf seconds\n", + arg.request_size - 1, target / 1024, rate / 1024, + elapsed / 1000000.0); + + ASSERT_GE(rate / target, 0.75); + ASSERT_LE(rate / target, 1.25); + } + } +} +#endif + +TEST_F(WriteAmpBasedRateLimiterTest, LimitChangeTest) { + // starvation test when limit changes to a smaller value + int64_t refill_period = 1000 * 1000; + auto* env = Env::Default(); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + struct Arg { + Arg(int32_t _request_size, Env::IOPriority _pri, + std::shared_ptr _limiter) + : request_size(_request_size), pri(_pri), limiter(_limiter) {} + int32_t request_size; + Env::IOPriority pri; + std::shared_ptr limiter; + }; + + auto writer = [](void* p) { + auto* arg = static_cast(p); + arg->limiter->Request(arg->request_size, arg->pri, nullptr /* stats */, + RateLimiter::OpType::kWrite); + }; + + for (uint32_t i = 1; i <= 16; i <<= 1) { + int32_t target = i * 1024 * 10; + // refill per second + for (int iter = 0; iter < 2; iter++) { + std::shared_ptr limiter = + std::make_shared( + target, refill_period, 10, RateLimiter::Mode::kWritesOnly, + Env::Default(), false /* auto_tuned */, 1, 300, 30); + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"WriteAmpBasedRateLimiter::Request", + "WriteAmpBasedRateLimiterTest::LimitChangeTest:changeLimitStart"}, + {"WriteAmpBasedRateLimiterTest::LimitChangeTest:changeLimitEnd", + "WriteAmpBasedRateLimiter::Refill"}}); + Arg arg(target, Env::IO_HIGH, limiter); + // The idea behind is to start a request first, then before it refills, + // update limit to a different value (2X/0.5X). No starvation should + // be guaranteed under any situation + // TODO(lightmark): more test cases are welcome. + env->StartThread(writer, &arg); + int32_t new_limit = (target << 1) >> (iter << 1); + TEST_SYNC_POINT( + "WriteAmpBasedRateLimiterTest::LimitChangeTest:changeLimitStart"); + arg.limiter->SetBytesPerSecond(new_limit); + TEST_SYNC_POINT( + "WriteAmpBasedRateLimiterTest::LimitChangeTest:changeLimitEnd"); + env->WaitForJoin(); + fprintf(stderr, + "[COMPLETE] request size %" PRIi32 " KB, new limit %" PRIi32 + "KB/sec, refill period %" PRIi64 " ms\n", + target / 1024, new_limit / 1024, refill_period / 1000); + } + } +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}