Merge branch '6.4.tikv' into mutex
Little-Wallace authored Apr 1, 2021
2 parents add5369 + 3ddff2e commit cf42e20
Showing 16 changed files with 341 additions and 78 deletions.
37 changes: 11 additions & 26 deletions .travis.yml
@@ -1,12 +1,7 @@
dist: focal
language: cpp
os:
- linux
- osx
compiler:
- clang
- gcc
osx_image: xcode12
os: linux
compiler: gcc
cache:
- ccache
- apt
@@ -38,25 +33,15 @@ env:
- JOB_NAME=encrypted_env # 16-18 minutes

matrix:
exclude:
- os: osx
env: TEST_GROUP=1
- os: osx
env: TEST_GROUP=2
- os: osx
env: TEST_GROUP=3
- os: osx
env: TEST_GROUP=4
- os: osx
env: JOB_NAME=lite_build
- os: osx
env: JOB_NAME=examples
- os: osx
env: JOB_NAME=encrypted_env
- os : linux
compiler: clang
- os: osx
compiler: gcc
include:
- os: linux
arch: arm64-graviton2
virt: vm
env: TEST_GROUP=platform_dependent
- os: osx
osx_image: xcode12.2
compiler: clang
env: TEST_GROUP=platform_dependent


# https://docs.travis-ci.com/user/caching/#ccache-cache
14 changes: 7 additions & 7 deletions CMakeLists.txt
@@ -217,14 +217,14 @@ if(CMAKE_SYSTEM_PROCESSOR MATCHES "ppc64le")
endif(HAS_ALTIVEC)
endif(CMAKE_SYSTEM_PROCESSOR MATCHES "ppc64le")

if(CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64|AARCH64")
if(CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64|AARCH64|arm64")
CHECK_C_COMPILER_FLAG("-march=armv8-a+crc" HAS_ARMV8_CRC)
if(HAS_ARMV8_CRC)
message(STATUS " HAS_ARMV8_CRC yes")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -march=armv8-a+crc -Wno-unused-function")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=armv8-a+crc -Wno-unused-function")
endif(HAS_ARMV8_CRC)
endif(CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64|AARCH64")
endif(CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64|AARCH64|arm64")

option(PORTABLE "build a portable binary" OFF)
option(FORCE_SSE42 "force building with SSE4.2, even when PORTABLE=ON" OFF)
@@ -408,11 +408,6 @@ if(CMAKE_SYSTEM_NAME MATCHES "Cygwin")
add_definitions(-fno-builtin-memcmp -DCYGWIN)
elseif(CMAKE_SYSTEM_NAME MATCHES "Darwin")
add_definitions(-DOS_MACOSX)
if(CMAKE_SYSTEM_PROCESSOR MATCHES arm)
add_definitions(-DIOS_CROSS_COMPILE -DROCKSDB_LITE)
# no debug info for IOS, that will make our library big
add_definitions(-DNDEBUG)
endif()
elseif(CMAKE_SYSTEM_NAME MATCHES "Linux")
add_definitions(-DOS_LINUX)
elseif(CMAKE_SYSTEM_NAME MATCHES "SunOS")
@@ -485,6 +480,11 @@ if(HAVE_SCHED_GETCPU)
add_definitions(-DROCKSDB_SCHED_GETCPU_PRESENT)
endif()

check_cxx_symbol_exists(getauxval auxv.h HAVE_AUXV_GETAUXVAL)
if(HAVE_AUXV_GETAUXVAL)
add_definitions(-DROCKSDB_AUXV_GETAUXVAL_PRESENT)
endif()

include_directories(${PROJECT_SOURCE_DIR})
include_directories(${PROJECT_SOURCE_DIR}/include)
include_directories(SYSTEM ${PROJECT_SOURCE_DIR}/third-party/gtest-1.7.0/fused-src)
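Note on the getauxval check above: HAVE_AUXV_GETAUXVAL typically feeds a runtime CPU-feature probe on AArch64. The standalone sketch below is only an illustration of that pattern, not code from this commit; it assumes Linux/glibc, where getauxval is declared in <sys/auxv.h> and HWCAP_CRC32 in <asm/hwcap.h>.

// Illustrative sketch, not part of this commit: use getauxval(AT_HWCAP) to
// decide at runtime whether the ARMv8 CRC32 instructions are available.
#include <cstdio>

#if defined(__aarch64__) && defined(ROCKSDB_AUXV_GETAUXVAL_PRESENT)
#include <asm/hwcap.h>  // HWCAP_CRC32
#include <sys/auxv.h>   // getauxval
#endif

static bool SupportsHardwareCrc32c() {
#if defined(__aarch64__) && defined(ROCKSDB_AUXV_GETAUXVAL_PRESENT)
  return (getauxval(AT_HWCAP) & HWCAP_CRC32) != 0;
#else
  return false;  // fall back to a software CRC32C implementation
#endif
}

int main() {
  std::printf("hardware crc32c: %s\n", SupportsHardwareCrc32c() ? "yes" : "no");
  return 0;
}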
144 changes: 144 additions & 0 deletions db/compact_files_test.cc
@@ -9,6 +9,7 @@
#include <string>
#include <thread>
#include <vector>
#include <unistd.h>

#include "db/db_impl/db_impl.h"
#include "port/port.h"
@@ -205,6 +206,69 @@ TEST_F(CompactFilesTest, L0ConflictsFiles) {
delete db;
}

TEST_F(CompactFilesTest, MultipleLevel) {
Options options;
options.create_if_missing = true;
options.level_compaction_dynamic_level_bytes = true;
options.num_levels = 6;
// Add listener
FlushedFileCollector* collector = new FlushedFileCollector();
options.listeners.emplace_back(collector);

DB* db = nullptr;
DestroyDB(db_name_, options);
Status s = DB::Open(options, db_name_, &db);
ASSERT_OK(s);
ASSERT_NE(db, nullptr);

// Create a couple of files in L0, L3, L4 and L5.
for (int i = 5; i > 2; --i) {
collector->ClearFlushedFiles();
ASSERT_OK(db->Put(WriteOptions(), ToString(i), ""));
ASSERT_OK(db->Flush(FlushOptions()));
auto l0_files = collector->GetFlushedFiles();
ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, i));

std::string prop;
ASSERT_TRUE(
db->GetProperty("rocksdb.num-files-at-level" + ToString(i), &prop));
ASSERT_EQ("1", prop);
}
ASSERT_OK(db->Put(WriteOptions(), ToString(0), ""));
ASSERT_OK(db->Flush(FlushOptions()));

rocksdb::ColumnFamilyMetaData meta;
db->GetColumnFamilyMetaData(&meta);
// Compact files except the file in L3
std::vector<std::string> files;
for (int i = 0; i < 6; ++i) {
if (i == 3) continue;
for (auto& file : meta.levels[i].files) {
files.push_back(file.db_path + "/" + file.name);
}
}

rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"CompactionJob::Run():Start", "CompactFilesTest.MultipleLevel:0"},
{"CompactFilesTest.MultipleLevel:1", "CompactFilesImpl:3"},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();

std::thread thread([&] {
TEST_SYNC_POINT("CompactFilesTest.MultipleLevel:0");
ASSERT_OK(db->Put(WriteOptions(), "bar", "v2"));
ASSERT_OK(db->Put(WriteOptions(), "foo", "v2"));
ASSERT_OK(db->Flush(FlushOptions()));
TEST_SYNC_POINT("CompactFilesTest.MultipleLevel:1");
});

ASSERT_OK(db->CompactFiles(rocksdb::CompactionOptions(), files, 5));
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
thread.join();

delete db;
}

TEST_F(CompactFilesTest, ObsoleteFiles) {
Options options;
// to trigger compaction more easily
@@ -490,6 +554,86 @@ TEST_F(CompactFilesTest, GetCompactionJobInfo) {
delete db;
}

TEST_F(CompactFilesTest, IsWriteStalled) {
class SlowFilter : public CompactionFilter {
public:
SlowFilter(std::atomic<bool>* would_block) { would_block_ = would_block; }

bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
std::string* /*new_value*/, bool* /*value_changed*/) const
override {
while (would_block_->load(std::memory_order_relaxed)) {
usleep(10000);
}
return false;
}

const char* Name() const override { return "SlowFilter"; }

private:
std::atomic<bool>* would_block_;
};

Options options;
options.create_if_missing = true;

ColumnFamilyOptions cf_options;
cf_options.level0_slowdown_writes_trigger = 12;
cf_options.level0_stop_writes_trigger = 20;
cf_options.write_buffer_size = 1024 * 1024;

std::atomic<bool> compaction_would_block;
compaction_would_block.store(true, std::memory_order_relaxed);
cf_options.compaction_filter = new SlowFilter(&compaction_would_block);

std::vector<ColumnFamilyDescriptor> cfds;
cfds.push_back(ColumnFamilyDescriptor("default", cf_options));

DB* db = nullptr;
std::vector<ColumnFamilyHandle*> handles;
DestroyDB(db_name_, options);

Status s = DB::Open(options, db_name_, cfds, &handles, &db);
assert(s.ok());
assert(db);

int flushed_l0_files = 0;
for (; flushed_l0_files < 100;) {
WriteBatch wb;
for (int j = 0; j < 100; ++j) {
char key[16];
bzero(key, 16);
sprintf(key, "foo%.2d", j);
ASSERT_OK(wb.Put(handles[0], key, "bar"));
}

WriteOptions wopts;
wopts.no_slowdown = true;
s = db->Write(wopts, &wb);
if (s.ok()) {
FlushOptions fopts;
fopts.allow_write_stall = true;
ASSERT_OK(db->Flush(fopts));
++flushed_l0_files;
} else {
ASSERT_EQ(s.code(), Status::Code::kIncomplete);
break;
}
}

// The write loop must have been terminated by a write stall.
ASSERT_EQ(flushed_l0_files, 12);
uint64_t stalled = 0;
db->GetIntProperty(handles[0], "rocksdb.is-write-stalled", &stalled);
ASSERT_TRUE(stalled);

compaction_would_block.store(false, std::memory_order_relaxed);
for (size_t i = 0; i < handles.size(); ++i) {
delete handles[i];
}
delete db;
}

} // namespace rocksdb

int main(int argc, char** argv) {
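A note on the sync-point calls used by the MultipleLevel test above: LoadDependency takes {predecessor, successor} pairs, and any thread that reaches a successor marker blocks until the matching predecessor marker has been passed. A stripped-down sketch of that ordering pattern follows (marker names are illustrative; it assumes a rocksdb test build with sync points compiled in, i.e. NDEBUG not defined).

// Illustrative sketch of the LoadDependency ordering pattern used above.
#include <thread>

#include "test_util/sync_point.h"

void OrderingSketch() {
  // The thread waiting at "Sketch:FlushMayProceed" is released only after
  // "Sketch:CompactionStarted" has been passed.
  rocksdb::SyncPoint::GetInstance()->LoadDependency({
      {"Sketch:CompactionStarted", "Sketch:FlushMayProceed"},
  });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  std::thread flusher([] {
    TEST_SYNC_POINT("Sketch:FlushMayProceed");
    // ... issue the concurrent flush here ...
  });

  TEST_SYNC_POINT("Sketch:CompactionStarted");
  // ... run the compaction that must overlap with the flush here ...

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
  flusher.join();
}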
12 changes: 12 additions & 0 deletions db/internal_stats.cc
@@ -253,6 +253,7 @@ static const std::string num_running_flushes = "num-running-flushes";
static const std::string actual_delayed_write_rate =
"actual-delayed-write-rate";
static const std::string is_write_stopped = "is-write-stopped";
static const std::string is_write_stalled = "is-write-stalled";
static const std::string estimate_oldest_key_time = "estimate-oldest-key-time";
static const std::string block_cache_capacity = "block-cache-capacity";
static const std::string block_cache_usage = "block-cache-usage";
@@ -337,6 +338,8 @@ const std::string DB::Properties::kActualDelayedWriteRate =
rocksdb_prefix + actual_delayed_write_rate;
const std::string DB::Properties::kIsWriteStopped =
rocksdb_prefix + is_write_stopped;
const std::string DB::Properties::kIsWriteStalled =
rocksdb_prefix + is_write_stalled;
const std::string DB::Properties::kEstimateOldestKeyTime =
rocksdb_prefix + estimate_oldest_key_time;
const std::string DB::Properties::kBlockCacheCapacity =
@@ -471,6 +474,9 @@ const std::unordered_map<std::string, DBPropertyInfo>
{DB::Properties::kIsWriteStopped,
{false, nullptr, &InternalStats::HandleIsWriteStopped, nullptr,
nullptr}},
{DB::Properties::kIsWriteStalled,
{false, nullptr, &InternalStats::HandleIsWriteStalled, nullptr,
nullptr}},
{DB::Properties::kEstimateOldestKeyTime,
{false, nullptr, &InternalStats::HandleEstimateOldestKeyTime, nullptr,
nullptr}},
@@ -874,6 +880,12 @@ bool InternalStats::HandleIsWriteStopped(uint64_t* value, DBImpl* db,
return true;
}

bool InternalStats::HandleIsWriteStalled(uint64_t* value, DBImpl* db,
Version* /*version*/) {
*value = db->write_controller().NeedsDelay() ? 1 : 0;
return true;
}

bool InternalStats::HandleEstimateOldestKeyTime(uint64_t* value, DBImpl* /*db*/,
Version* /*version*/) {
// TODO(yiwu): The property is currently available for fifo compaction
1 change: 1 addition & 0 deletions db/internal_stats.h
@@ -575,6 +575,7 @@ class InternalStats {
bool HandleActualDelayedWriteRate(uint64_t* value, DBImpl* db,
Version* version);
bool HandleIsWriteStopped(uint64_t* value, DBImpl* db, Version* version);
bool HandleIsWriteStalled(uint64_t* value, DBImpl* db, Version* version);
bool HandleEstimateOldestKeyTime(uint64_t* value, DBImpl* db,
Version* version);
bool HandleBlockCacheCapacity(uint64_t* value, DBImpl* db, Version* version);
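Taken together, the internal_stats changes register a new "rocksdb.is-write-stalled" property that reports 1 while the write controller is delaying writes and 0 otherwise. A minimal usage sketch (assuming a build of this fork; the database path below is arbitrary):

// Minimal usage sketch for the new property added in this commit.
#include <cassert>
#include <cstdio>

#include "rocksdb/db.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/is_write_stalled_demo", &db);
  assert(s.ok());

  // Reads 1 while writes are being delayed (stalled), 0 otherwise.
  uint64_t stalled = 0;
  if (db->GetIntProperty("rocksdb.is-write-stalled", &stalled)) {
    std::printf("write stalled: %s\n", stalled ? "yes" : "no");
  }

  delete db;
  return 0;
}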
14 changes: 0 additions & 14 deletions db/version_set.cc
@@ -5140,20 +5140,6 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
"[%s] compaction output being applied to a different base version from"
" input version",
c->column_family_data()->GetName().c_str());

if (vstorage->compaction_style_ == kCompactionStyleLevel &&
c->start_level() == 0 && c->num_input_levels() > 2U) {
// We are doing a L0->base_level compaction. The assumption is if
// base level is not L1, levels from L1 to base_level - 1 is empty.
// This is ensured by having one compaction from L0 going on at the
// same time in level-based compaction. So that during the time, no
// compaction/flush can put files to those levels.
for (int l = c->start_level() + 1; l < c->output_level(); l++) {
if (vstorage->NumLevelFiles(l) != 0) {
return false;
}
}
}
}

for (size_t input = 0; input < c->num_input_levels(); ++input) {
35 changes: 27 additions & 8 deletions encryption/encryption.cc
@@ -11,6 +11,7 @@

#include "file/filename.h"
#include "port/port.h"
#include "test_util/sync_point.h"

namespace rocksdb {
namespace encryption {
@@ -265,6 +266,17 @@ Status KeyManagedEncryptedEnv::NewSequentialFile(
case EncryptionMethod::kAES192_CTR:
case EncryptionMethod::kAES256_CTR:
s = encrypted_env_->NewSequentialFile(fname, result, options);
// Hack: when upgrading from TiKV <= v5.0.0-rc, the old CURRENT
// file is encrypted, but it may have been replaced with a
// plaintext CURRENT file. The check below guarantees that the
// CURRENT file is read correctly in either case.
if (s.ok() && IsCurrentFile(fname)) {
if (!IsValidCurrentFile(std::move(*result))) {
s = target()->NewSequentialFile(fname, result, options);
} else {
s = encrypted_env_->NewSequentialFile(fname, result, options);
}
}
break;
default:
s = Status::InvalidArgument("Unsupported encryption method: " +
@@ -302,7 +314,8 @@ Status KeyManagedEncryptedEnv::NewWritableFile(
const EnvOptions& options) {
FileEncryptionInfo file_info;
Status s;
bool skipped = ShouldSkipEncryption(fname);
bool skipped = IsCurrentFile(fname);
TEST_SYNC_POINT_CALLBACK("KeyManagedEncryptedEnv::NewWritableFile", &skipped);
if (!skipped) {
s = key_manager_->NewFile(fname, &file_info);
if (!s.ok()) {
@@ -433,12 +446,12 @@ Status KeyManagedEncryptedEnv::DeleteFile(const std::string& fname) {

Status KeyManagedEncryptedEnv::LinkFile(const std::string& src_fname,
const std::string& dst_fname) {
if (ShouldSkipEncryption(dst_fname)) {
assert(ShouldSkipEncryption(src_fname));
if (IsCurrentFile(dst_fname)) {
assert(IsCurrentFile(src_fname));
Status s = target()->LinkFile(src_fname, dst_fname);
return s;
} else {
assert(!ShouldSkipEncryption(src_fname));
assert(!IsCurrentFile(src_fname));
}
Status s = key_manager_->LinkFile(src_fname, dst_fname);
if (!s.ok()) {
@@ -455,11 +468,17 @@ Status KeyManagedEncryptedEnv::LinkFile(const std::string& src_fname,

Status KeyManagedEncryptedEnv::RenameFile(const std::string& src_fname,
const std::string& dst_fname) {
if (ShouldSkipEncryption(dst_fname)) {
assert(ShouldSkipEncryption(src_fname));
return target()->RenameFile(src_fname, dst_fname);
if (IsCurrentFile(dst_fname)) {
assert(IsCurrentFile(src_fname));
Status s = target()->RenameFile(src_fname, dst_fname);
// Replacing with plaintext requires deleting the info in the key manager.
// The stale current file info exists when upgrading from TiKV <= v5.0.0-rc.
Status delete_status __attribute__((__unused__)) =
key_manager_->DeleteFile(dst_fname);
assert(delete_status.ok());
return s;
} else {
assert(!ShouldSkipEncryption(src_fname));
assert(!IsCurrentFile(src_fname));
}
// Link(copy)File instead of RenameFile to avoid losing src_fname info when
// failed to rename the src_fname in the file system.
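The NewSequentialFile fallback above depends on being able to tell whether the CURRENT file reads as plaintext. The helper it calls, IsValidCurrentFile, is not shown in this diff; the sketch below is an assumption about one plausible check, namely that a valid CURRENT file names the active manifest and therefore starts with the "MANIFEST-" prefix.

// Standalone sketch (an assumption, not the helper added by this commit):
// treat a CURRENT file as valid plaintext if its body starts with "MANIFEST-".
// When the file is still encrypted, reading it this way yields garbage and the
// prefix check fails.
#include <cstring>
#include <fstream>
#include <iostream>
#include <string>

static bool LooksLikePlaintextCurrentFile(const std::string& path) {
  std::ifstream in(path, std::ios::binary);
  if (!in) {
    return false;
  }
  char prefix[9] = {};
  in.read(prefix, sizeof(prefix));
  return in.gcount() == static_cast<std::streamsize>(sizeof(prefix)) &&
         std::memcmp(prefix, "MANIFEST-", sizeof(prefix)) == 0;
}

int main(int argc, char** argv) {
  if (argc < 2) {
    std::cerr << "usage: " << argv[0] << " <path-to-CURRENT>\n";
    return 1;
  }
  std::cout << (LooksLikePlaintextCurrentFile(argv[1])
                    ? "plaintext CURRENT"
                    : "not readable as plaintext")
            << "\n";
  return 0;
}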
