From 092c0d994c2bc6b880cbe33aae5bf660e60485e3 Mon Sep 17 00:00:00 2001 From: Xinye Tao Date: Mon, 21 Dec 2020 13:57:10 +0800 Subject: [PATCH] Add support for TiKV IO rate limiter Signed-off-by: Yang Zhang --- CMakeLists.txt | 1 + TARGETS | 1 + env/env_basic_test.cc | 39 ++++ env/env_inspected.cc | 399 ++++++++++++++++++++++++++++++++ include/rocksdb/env_inspected.h | 56 +++++ src.mk | 1 + 6 files changed, 497 insertions(+) create mode 100644 env/env_inspected.cc create mode 100644 include/rocksdb/env_inspected.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 952dc52d866..6f9debfb50c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -725,6 +725,7 @@ set(SOURCES env/env.cc env/env_chroot.cc env/env_encryption.cc + env/env_inspected.cc env/file_system.cc env/file_system_tracer.cc env/fs_remap.cc diff --git a/TARGETS b/TARGETS index 6f7be5c578a..8642fa9cc61 100644 --- a/TARGETS +++ b/TARGETS @@ -112,6 +112,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "env/env.cc", "env/env_chroot.cc", "env/env_encryption.cc", + "env/env_inspected.cc", "env/env_posix.cc", "env/file_system.cc", "env/file_system_tracer.cc", diff --git a/env/env_basic_test.cc b/env/env_basic_test.cc index 6a3b0390af2..3a54722035e 100644 --- a/env/env_basic_test.cc +++ b/env/env_basic_test.cc @@ -14,6 +14,7 @@ #include "rocksdb/convenience.h" #include "rocksdb/env.h" #include "rocksdb/env_encryption.h" +#include "rocksdb/env_inspected.h" #include "test_util/testharness.h" namespace ROCKSDB_NAMESPACE { @@ -81,6 +82,41 @@ static Env* GetTestFS() { return fs_env; } +class DummyFileSystemInspector : public FileSystemInspector { + public: + DummyFileSystemInspector(size_t refill_bytes = 0) + : refill_bytes_(refill_bytes) {} + + Status Read(size_t len, size_t* allowed) override { + assert(allowed); + if (refill_bytes_ == 0) { + *allowed = len; + } else { + *allowed = std::min(refill_bytes_, len); + } + return Status::OK(); + } + + Status Write(size_t len, size_t* allowed) override { + assert(allowed); + if (refill_bytes_ == 0) { + *allowed = len; + } else { + *allowed = std::min(refill_bytes_, len); + } + return Status::OK(); + } + + private: + size_t refill_bytes_; +}; + +static Env* GetInspectedEnv() { + static std::unique_ptr inspected_env(NewFileSystemInspectedEnv( + Env::Default(), std::make_shared(1))); + return inspected_env.get(); +} + } // namespace class EnvBasicTestWithParam : public testing::Test, @@ -118,6 +154,9 @@ INSTANTIATE_TEST_CASE_P(EncryptedEnv, EnvMoreTestWithParam, INSTANTIATE_TEST_CASE_P(MemEnv, EnvBasicTestWithParam, ::testing::Values(&GetMemoryEnv)); +INSTANTIATE_TEST_CASE_P(InspectedEnv, EnvBasicTestWithParam, + ::testing::Values(&GetInspectedEnv)); + namespace { // Returns a vector of 0 or 1 Env*, depending whether an Env is registered for diff --git a/env/env_inspected.cc b/env/env_inspected.cc new file mode 100644 index 00000000000..6b38ada3dc7 --- /dev/null +++ b/env/env_inspected.cc @@ -0,0 +1,399 @@ +// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. + +#include "rocksdb/env_inspected.h" + +namespace ROCKSDB_NAMESPACE { + +class InspectedSequentialFile : public SequentialFileWrapper { + public: + InspectedSequentialFile(std::unique_ptr&& target, + std::shared_ptr inspector) + : SequentialFileWrapper(target.get()), + owner_(std::move(target)), + inspector_(inspector) {} + + Status Read(size_t n, Slice* result, char* scratch) override { + assert(inspector_); + Status s; + size_t offset = 0; + size_t allowed = 0; + while (offset < n) { + s = inspector_->Read(n - offset, &allowed); + if (!s.ok()) { + return s; + } + assert(allowed <= n - offset); + if (allowed > 0) { + s = SequentialFileWrapper::Read(allowed, result, scratch + offset); + if (!s.ok()) { + break; + } + size_t actual_read = result->size(); + if (result->data() != scratch + offset) { + // Only possible when underlying file ignores or misuses user + // provided buffer. Reject this case. + memmove(scratch + offset, result->data(), actual_read); + assert(false); + } + offset += actual_read; + if (actual_read < allowed) { + break; + } + } + } + *result = Slice(scratch, offset); + return s; + } + + Status PositionedRead(uint64_t offset, size_t n, Slice* result, + char* scratch) override { + assert(inspector_); + Status s; + size_t roffset = 0; + size_t allowed = 0; + while (roffset < n) { + s = inspector_->Read(n - roffset, &allowed); + if (!s.ok()) { + return s; + } + assert(allowed <= n - roffset); + if (allowed > 0) { + s = SequentialFileWrapper::PositionedRead(offset + roffset, allowed, + result, scratch + roffset); + if (!s.ok()) { + break; + } + size_t actual_read = result->size(); + if (result->data() != scratch + roffset) { + memmove(scratch + roffset, result->data(), actual_read); + assert(false); + } + roffset += actual_read; + if (actual_read < allowed) { + break; + } + } + } + *result = Slice(scratch, roffset); + return s; + } + + private: + std::unique_ptr owner_; + std::shared_ptr inspector_; +}; + +class InspectedRandomAccessFile : public RandomAccessFileWrapper { + public: + InspectedRandomAccessFile(std::unique_ptr&& target, + std::shared_ptr inspector) + : RandomAccessFileWrapper(target.get()), + owner_(std::move(target)), + inspector_(inspector) {} + + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override { + assert(inspector_); + Status s; + size_t roffset = 0; + size_t allowed = 0; + while (roffset < n) { + s = inspector_->Read(n - roffset, &allowed); + if (!s.ok()) { + return s; + } + assert(allowed <= n - roffset); + if (allowed > 0) { + s = RandomAccessFileWrapper::Read(offset + roffset, allowed, result, + scratch + roffset); + if (!s.ok()) { + break; + } + size_t actual_read = result->size(); + if (result->data() != scratch + roffset) { + memmove(scratch + roffset, result->data(), actual_read); + assert(false); + } + roffset += actual_read; + if (actual_read < allowed) { + break; + } + } + } + *result = Slice(scratch, roffset); + return s; + } + + // TODO: support parallel MultiRead + Status MultiRead(ReadRequest* reqs, size_t num_reqs) override { + assert(reqs != nullptr); + for (size_t i = 0; i < num_reqs; ++i) { + ReadRequest& req = reqs[i]; + req.status = Read(req.offset, req.len, &req.result, req.scratch); + } + return Status::OK(); + } + + private: + std::unique_ptr owner_; + std::shared_ptr inspector_; +}; + +class InspectedWritableFile : public WritableFileWrapper { + public: + InspectedWritableFile(std::unique_ptr&& target, + std::shared_ptr inspector) + : WritableFileWrapper(target.get()), + owner_(std::move(target)), + inspector_(inspector) {} + + Status Append(const Slice& data) override { + assert(inspector_); + Status s; + size_t size = data.size(); + size_t offset = 0; + size_t allowed = 0; + while (offset < size) { + s = inspector_->Write(size - offset, &allowed); + if (!s.ok()) { + return s; + } + assert(allowed <= size - offset); + if (allowed > 0) { + s = WritableFileWrapper::Append(Slice(data.data() + offset, allowed)); + if (!s.ok()) { + break; + } + } + offset += allowed; + } + return s; + } + + Status Append(const Slice& data, + const DataVerificationInfo& verification_info) override { + assert(inspector_); + Status s; + size_t size = data.size(); + size_t offset = 0; + size_t allowed = 0; + while (offset < size) { + s = inspector_->Write(size - offset, &allowed); + if (!s.ok()) { + return s; + } + assert(allowed <= size - offset); + if (allowed > 0) { + s = WritableFileWrapper::Append(Slice(data.data() + offset, allowed), + verification_info); + if (!s.ok()) { + break; + } + } + offset += allowed; + } + return s; + } + + Status PositionedAppend(const Slice& data, uint64_t offset) override { + assert(inspector_); + Status s; + size_t size = data.size(); + size_t roffset = 0; + size_t allowed = 0; + while (roffset < size) { + s = inspector_->Write(size - roffset, &allowed); + if (!s.ok()) { + return s; + } + assert(allowed <= size - roffset); + if (allowed > 0) { + s = WritableFileWrapper::PositionedAppend( + Slice(data.data() + roffset, allowed), offset + roffset); + if (!s.ok()) { + break; + } + } + roffset += allowed; + } + return s; + } + + Status PositionedAppend( + const Slice& data, uint64_t offset, + const DataVerificationInfo& verification_info) override { + assert(inspector_); + Status s; + size_t size = data.size(); + size_t roffset = 0; + size_t allowed = 0; + while (roffset < size) { + s = inspector_->Write(size - roffset, &allowed); + if (!s.ok()) { + return s; + } + assert(allowed <= size - roffset); + if (allowed > 0) { + s = WritableFileWrapper::PositionedAppend( + Slice(data.data() + roffset, allowed), offset + roffset, + verification_info); + if (!s.ok()) { + break; + } + } + roffset += allowed; + } + return s; + } + + private: + std::unique_ptr owner_; + std::shared_ptr inspector_; +}; + +class InspectedRandomRWFile : public RandomRWFileWrapper { + public: + InspectedRandomRWFile(std::unique_ptr&& target, + std::shared_ptr inspector) + : RandomRWFileWrapper(target.get()), + owner_(std::move(target)), + inspector_(inspector) {} + + Status Write(uint64_t offset, const Slice& data) override { + assert(inspector_); + Status s; + size_t size = data.size(); + size_t roffset = 0; + size_t allowed = 0; + while (roffset < size) { + s = inspector_->Write(size - roffset, &allowed); + if (!s.ok()) { + return s; + } + assert(allowed <= size - roffset); + if (allowed > 0) { + s = RandomRWFileWrapper::Write(offset + roffset, + Slice(data.data() + roffset, allowed)); + if (!s.ok()) { + break; + } + } + roffset += allowed; + } + return s; + } + + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override { + assert(inspector_); + Status s; + size_t roffset = 0; + size_t allowed = 0; + while (roffset < n) { + s = inspector_->Read(n - roffset, &allowed); + if (!s.ok()) { + return s; + } + assert(allowed <= n - roffset); + if (allowed > 0) { + s = RandomRWFileWrapper::Read(offset + roffset, allowed, result, + scratch + roffset); + if (!s.ok()) { + return s; + } + size_t actual_read = result->size(); + if (result->data() != scratch + roffset) { + memmove(scratch + roffset, result->data(), actual_read); + assert(false); + } + roffset += actual_read; + if (actual_read < allowed) { + break; + } + } + } + *result = Slice(scratch, roffset); + return s; + } + + private: + std::unique_ptr owner_; + std::shared_ptr inspector_; +}; + +FileSystemInspectedEnv::FileSystemInspectedEnv( + Env* base_env, std::shared_ptr& inspector) + : EnvWrapper(base_env), inspector_(inspector) {} + +Status FileSystemInspectedEnv::NewSequentialFile( + const std::string& fname, std::unique_ptr* result, + const EnvOptions& options) { + auto s = EnvWrapper::NewSequentialFile(fname, result, options); + if (!s.ok()) { + return s; + } + result->reset(new InspectedSequentialFile(std::move(*result), inspector_)); + return s; +} + +Status FileSystemInspectedEnv::NewRandomAccessFile( + const std::string& fname, std::unique_ptr* result, + const EnvOptions& options) { + auto s = EnvWrapper::NewRandomAccessFile(fname, result, options); + if (!s.ok()) { + return s; + } + result->reset(new InspectedRandomAccessFile(std::move(*result), inspector_)); + return s; +} + +Status FileSystemInspectedEnv::NewWritableFile( + const std::string& fname, std::unique_ptr* result, + const EnvOptions& options) { + auto s = EnvWrapper::NewWritableFile(fname, result, options); + if (!s.ok()) { + return s; + } + result->reset(new InspectedWritableFile(std::move(*result), inspector_)); + return s; +} + +Status FileSystemInspectedEnv::ReopenWritableFile( + const std::string& fname, std::unique_ptr* result, + const EnvOptions& options) { + auto s = EnvWrapper::ReopenWritableFile(fname, result, options); + if (!s.ok()) { + return s; + } + result->reset(new InspectedWritableFile(std::move(*result), inspector_)); + return s; +} + +Status FileSystemInspectedEnv::ReuseWritableFile( + const std::string& fname, const std::string& old_fname, + std::unique_ptr* result, const EnvOptions& options) { + auto s = EnvWrapper::ReuseWritableFile(fname, old_fname, result, options); + if (!s.ok()) { + return s; + } + result->reset(new InspectedWritableFile(std::move(*result), inspector_)); + return s; +} + +Status FileSystemInspectedEnv::NewRandomRWFile( + const std::string& fname, std::unique_ptr* result, + const EnvOptions& options) { + auto s = EnvWrapper::NewRandomRWFile(fname, result, options); + if (!s.ok()) { + return s; + } + result->reset(new InspectedRandomRWFile(std::move(*result), inspector_)); + return s; +} + +Env* NewFileSystemInspectedEnv(Env* base_env, + std::shared_ptr inspector) { + return new FileSystemInspectedEnv(base_env, inspector); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/env_inspected.h b/include/rocksdb/env_inspected.h new file mode 100644 index 00000000000..bc3d0243b8f --- /dev/null +++ b/include/rocksdb/env_inspected.h @@ -0,0 +1,56 @@ +// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. + +#pragma once + +#include +#include + +#include "rocksdb/env.h" + +namespace ROCKSDB_NAMESPACE { + +// Interface to inspect storage requests. FileSystemInspectedEnv will consult +// FileSystemInspector before issuing actual disk IO. +class FileSystemInspector { + public: + virtual ~FileSystemInspector() = default; + + virtual Status Read(size_t len, size_t* allowed) = 0; + virtual Status Write(size_t len, size_t* allowed) = 0; +}; + +// An Env with underlying IO requests being inspected. It holds a reference to +// an external FileSystemInspector to consult for IO inspection. +class FileSystemInspectedEnv : public EnvWrapper { + public: + FileSystemInspectedEnv(Env* base_env, + std::shared_ptr& inspector); + + Status NewSequentialFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override; + Status NewRandomAccessFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override; + Status NewWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override; + Status ReopenWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override; + Status ReuseWritableFile(const std::string& fname, + const std::string& old_fname, + std::unique_ptr* result, + const EnvOptions& options) override; + Status NewRandomRWFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override; + + private: + const std::shared_ptr inspector_; +}; + +extern Env* NewFileSystemInspectedEnv( + Env* base_env, std::shared_ptr inspector); + +} // namespace ROCKSDB_NAMESPACE diff --git a/src.mk b/src.mk index 7e0c9f2c040..8ebf14dfba1 100644 --- a/src.mk +++ b/src.mk @@ -105,6 +105,7 @@ LIB_SOURCES = \ env/env.cc \ env/env_chroot.cc \ env/env_encryption.cc \ + env/env_inspected.cc \ env/env_posix.cc \ env/file_system.cc \ env/fs_posix.cc \