Skip to content

Commit

Permalink
Raftstore v2 (#389)
Browse files Browse the repository at this point in the history
 

Signed-off-by: Spade A <[email protected]>
Signed-off-by: Yang Zhang <[email protected]>
Signed-off-by: SpadeA-Tang <[email protected]>

Co-authored-by: Spade  A <[email protected]>
  • Loading branch information
v01dstar and SpadeA-Tang authored Oct 8, 2024
1 parent 528b66b commit 405de0e
Show file tree
Hide file tree
Showing 61 changed files with 2,808 additions and 525 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ set(SOURCES
db/db_impl/db_impl_experimental.cc
db/db_impl/db_impl_readonly.cc
db/db_impl/db_impl_secondary.cc
db/db_impl/db_impl_merge.cc
db/db_info_dumper.cc
db/db_iter.cc
db/dbformat.cc
Expand Down Expand Up @@ -1327,6 +1328,7 @@ if(WITH_TESTS)
db/db_memtable_test.cc
db/db_merge_operator_test.cc
db/db_merge_operand_test.cc
db/db_merge_test.cc
db/db_options_test.cc
db/db_properties_test.cc
db/db_range_del_test.cc
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1511,6 +1511,9 @@ db_merge_operator_test: $(OBJ_DIR)/db/db_merge_operator_test.o $(TEST_LIBRARY) $
db_merge_operand_test: $(OBJ_DIR)/db/db_merge_operand_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

db_merge_test: $(OBJ_DIR)/db/db_merge_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

db_options_test: $(OBJ_DIR)/db/db_options_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

Expand Down
7 changes: 7 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/db_impl/db_impl_debug.cc",
"db/db_impl/db_impl_experimental.cc",
"db/db_impl/db_impl_files.cc",
"db/db_impl/db_impl_merge.cc",
"db/db_impl/db_impl_open.cc",
"db/db_impl/db_impl_readonly.cc",
"db/db_impl/db_impl_secondary.cc",
Expand Down Expand Up @@ -4862,6 +4863,12 @@ cpp_unittest_wrapper(name="db_merge_operator_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="db_merge_test",
srcs=["db/db_merge_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="db_options_test",
srcs=["db/db_options_test.cc"],
deps=[":rocksdb_test_lib"],
Expand Down
18 changes: 3 additions & 15 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4944,11 +4944,6 @@ bool rocksdb_write_buffer_manager_enabled(rocksdb_write_buffer_manager_t* wbm) {
return wbm->rep->enabled();
}

bool rocksdb_write_buffer_manager_cost_to_cache(
rocksdb_write_buffer_manager_t* wbm) {
return wbm->rep->cost_to_cache();
}

size_t rocksdb_write_buffer_manager_memory_usage(
rocksdb_write_buffer_manager_t* wbm) {
return wbm->rep->memory_usage();
Expand All @@ -4963,17 +4958,10 @@ size_t rocksdb_write_buffer_manager_dummy_entries_in_cache_usage(
rocksdb_write_buffer_manager_t* wbm) {
return wbm->rep->dummy_entries_in_cache_usage();
}
size_t rocksdb_write_buffer_manager_buffer_size(

size_t rocksdb_write_buffer_manager_flush_size(
rocksdb_write_buffer_manager_t* wbm) {
return wbm->rep->buffer_size();
}
void rocksdb_write_buffer_manager_set_buffer_size(
rocksdb_write_buffer_manager_t* wbm, size_t new_size) {
wbm->rep->SetBufferSize(new_size);
}
ROCKSDB_LIBRARY_API void rocksdb_write_buffer_manager_set_allow_stall(
rocksdb_write_buffer_manager_t* wbm, bool new_allow_stall) {
wbm->rep->SetAllowStall(new_allow_stall);
return wbm->rep->flush_size();
}

rocksdb_dbpath_t* rocksdb_dbpath_create(const char* path,
Expand Down
8 changes: 1 addition & 7 deletions db/c_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -3792,14 +3792,8 @@ int main(int argc, char** argv) {

CheckCondition(true ==
rocksdb_write_buffer_manager_enabled(write_buffer_manager));
CheckCondition(true == rocksdb_write_buffer_manager_cost_to_cache(
write_buffer_manager));
CheckCondition(
200 == rocksdb_write_buffer_manager_buffer_size(write_buffer_manager));

rocksdb_write_buffer_manager_set_buffer_size(write_buffer_manager, 300);
CheckCondition(
300 == rocksdb_write_buffer_manager_buffer_size(write_buffer_manager));
200 == rocksdb_write_buffer_manager_flush_size(write_buffer_manager));

rocksdb_write_buffer_manager_destroy(write_buffer_manager);
rocksdb_cache_destroy(lru);
Expand Down
107 changes: 106 additions & 1 deletion db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
for (auto& listener : cfd_->ioptions()->listeners) {
listener->OnColumnFamilyHandleDeletionStarted(this);
}
if (cfd_->write_buffer_mgr()) {
cfd_->write_buffer_mgr()->UnregisterColumnFamily(this);
}
// Job id == 0 means that this is not our background process, but rather
// user thread
// Need to hold some shared pointers owned by the initial_cf_options
Expand Down Expand Up @@ -1246,6 +1249,105 @@ Status ColumnFamilyData::RangesOverlapWithMemtables(
return status;
}

// Compute the user-key range spanned by this column family's memtables
// (the active memtable plus all immutable memtables), taking range
// tombstones into account.
//
// On input, if *found is true, [*smallest, *largest] is treated as an
// existing range to be widened; otherwise the bounds are overwritten.
// On output, *found is true iff the memtables (or a previous caller)
// contributed any bound.
Status ColumnFamilyData::GetMemtablesUserKeyRange(PinnableSlice* smallest,
                                                  PinnableSlice* largest,
                                                  bool* found) {
  assert(smallest && largest && found);
  // Nothing below can fail; the Status exists only to match the interface.
  Status s;
  auto* ucmp = user_comparator();
  Arena arena;
  ReadOptions read_opts;
  read_opts.total_order_seek = true;

  // Point entries: merge the iterators of all memtables and take the
  // first and last user keys as candidate bounds.
  MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
  merge_iter_builder.AddIterator(mem_->NewIterator(read_opts, &arena));
  imm_.current()->AddIterators(read_opts, &merge_iter_builder, false);
  ScopedArenaIterator mem_iter(merge_iter_builder.Finish());
  mem_iter->SeekToFirst();
  if (mem_iter->Valid()) {
    auto ukey = mem_iter->user_key();
    if (!(*found) || ucmp->Compare(ukey, *smallest) < 0) {
      smallest->PinSelf(ukey);
    }
    mem_iter->SeekToLast();
    assert(mem_iter->Valid());
    ukey = mem_iter->user_key();
    if (!(*found) || ucmp->Compare(*largest, ukey) < 0) {
      largest->PinSelf(ukey);
    }
    *found = true;
  }

  // Range tombstones may extend beyond the point-entry range, so widen
  // the bounds with each memtable's tombstone span as well.
  autovector<MemTable*> memtables{mem_};
  imm_.ExportMemtables(&memtables);
  for (auto* mem : memtables) {
    auto* iter =
        mem->NewRangeTombstoneIterator(read_opts, kMaxSequenceNumber, false);
    if (iter != nullptr) {
      iter->SeekToFirst();
      if (iter->Valid()) {
        // start_key() is already a user key.
        auto ukey = iter->start_key();
        if (!(*found) || ucmp->Compare(ukey, *smallest) < 0) {
          smallest->PinSelf(ukey);
        }
        iter->SeekToLast();
        assert(iter->Valid());
        // Use end_key() so the range covers the whole tombstone.
        ukey = iter->end_key();
        if (!(*found) || ucmp->Compare(*largest, ukey) < 0) {
          largest->PinSelf(ukey);
        }
        *found = true;
      }
      // The caller owns the iterator returned by NewRangeTombstoneIterator;
      // the previous version of this code leaked it on every call.
      delete iter;
    }
  }

  return s;
}

// Return the user-key range covered by all data in this column family:
// memtables (tombstones included), level-0 files, and every non-empty
// lower level. *found reports whether any data was seen; *smallest and
// *largest are meaningful only when *found is true. Only supported with
// level compaction, where files within a level (>= 1) are sorted and
// non-overlapping.
Status ColumnFamilyData::GetUserKeyRange(PinnableSlice* smallest,
                                         PinnableSlice* largest, bool* found) {
  assert(smallest && largest && found);
  if (ioptions_.compaction_style != CompactionStyle::kCompactionStyleLevel) {
    return Status::NotSupported("Unexpected compaction style");
  }
  Status s = GetMemtablesUserKeyRange(smallest, largest, found);
  if (!s.ok()) {
    return s;
  }

  auto* ucmp = user_comparator();
  // Widen [*smallest, *largest] to also cover the user-key span
  // [start, end], and record that some data was found.
  auto extend = [&](const Slice& start, const Slice& end) {
    if (!(*found) || ucmp->Compare(start, *smallest) < 0) {
      smallest->PinSelf(start);
    }
    if (!(*found) || ucmp->Compare(*largest, end) < 0) {
      largest->PinSelf(end);
    }
    *found = true;
  };

  VersionStorageInfo& vsi = *current()->storage_info();
  // L0 files may overlap each other, so every file contributes its own span.
  for (const auto& f : vsi.LevelFiles(0)) {
    extend(f->smallest.user_key(), f->largest.user_key());
  }
  // Levels >= 1 keep files sorted and disjoint: the level's span runs from
  // the first file's smallest key to the last file's largest key.
  for (int level = 1; level < vsi.num_levels(); ++level) {
    const auto& files = vsi.LevelFiles(level);
    if (!files.empty()) {
      extend(files.front()->smallest.user_key(),
             files.back()->largest.user_key());
    }
  }
  return s;
}

const int ColumnFamilyData::kCompactAllLevels = -1;
const int ColumnFamilyData::kCompactToBaseLevel = -2;

Expand Down Expand Up @@ -1733,8 +1835,11 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
const std::string& name, uint32_t id, Version* dummy_versions,
const ColumnFamilyOptions& options) {
assert(column_families_.find(name) == column_families_.end());
auto* write_buffer_manager = options.cf_write_buffer_manager != nullptr
? options.cf_write_buffer_manager.get()
: write_buffer_manager_;
ColumnFamilyData* new_cfd = new ColumnFamilyData(
id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
id, name, dummy_versions, table_cache_, write_buffer_manager, options,
*db_options_, &file_options_, this, block_cache_tracer_, io_tracer_,
db_id_, db_session_id_);
column_families_.insert({name, id});
Expand Down
8 changes: 8 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,14 @@ class ColumnFamilyData {
SuperVersion* super_version,
bool allow_data_in_errors, bool* overlap);

// Get user key range of memtables. Tombstones are counted.
Status GetMemtablesUserKeyRange(PinnableSlice* smallest,
PinnableSlice* largest, bool* found);

// Get user key range of all data. Tombstones are counted.
Status GetUserKeyRange(PinnableSlice* smallest, PinnableSlice* largest,
bool* found);

// A flag to tell a manual compaction is to compact all levels together
// instead of a specific level.
static const int kCompactAllLevels;
Expand Down
15 changes: 9 additions & 6 deletions db/compaction/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,17 +229,20 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
}

if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex &&
ikey_.type != kTypeWideColumnEntity) {
ikey_.type != kTypeWideColumnEntity && ikey_.type != kTypeDeletion) {
return true;
}

CompactionFilter::Decision decision =
CompactionFilter::Decision::kUndetermined;
CompactionFilter::ValueType value_type =
ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
: ikey_.type == kTypeBlobIndex
? CompactionFilter::ValueType::kBlobIndex
: CompactionFilter::ValueType::kWideColumnEntity;
CompactionFilter::ValueType value_type = CompactionFilter::ValueType::kValue;
if (ikey_.type == kTypeBlobIndex) {
value_type = CompactionFilter::ValueType::kBlobIndex;
} else if (ikey_.type == kTypeWideColumnEntity) {
value_type = CompactionFilter::ValueType::kWideColumnEntity;
} else if (ikey_.type == kTypeDeletion) {
value_type = CompactionFilter::ValueType::kDeletion;
}

// Hack: pass internal key to BlobIndexCompactionFilter since it needs
// to get sequence number.
Expand Down
34 changes: 34 additions & 0 deletions db/compaction/compaction_iterator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,40 @@ TEST_P(CompactionIteratorTest, SingleMergeOperand) {
ASSERT_EQ("cv1cv2", c_iter_->value().ToString());
}

// A compaction filter that reacts to kTypeDeletion entries with
// kRemoveAndSkipUntil drops the deletion itself and skips everything up to
// the next user key. Verify that all four input entries are removed from
// the output and that the filter was invoked for both deletions.
TEST_P(CompactionIteratorTest, RemoveAllSingleDeletes) {
  struct DeletionSkipFilter : public CompactionFilter {
    Decision UnsafeFilter(int /*level*/, const Slice& key, ValueType t,
                          const Slice& /*existing_value*/,
                          std::string* /*new_value*/,
                          std::string* skip_until) const override {
      if (t != ValueType::kDeletion) {
        return Decision::kKeep;
      }
      // Skip to just past this user key so the shadowed entries go too.
      *skip_until = key.ToString();
      skip_until->back() += 1;
      ++filtered;
      return Decision::kRemoveAndSkipUntil;
    }

    const char* Name() const override {
      return "CompactionIteratorTest.SingleDelete::Filter";
    }
    // Number of deletions this filter has removed.
    mutable size_t filtered = 0;
  };

  DeletionSkipFilter filter;
  InitIterators(
      {test::KeyStr("a", 70, kTypeDeletion), test::KeyStr("a", 50, kTypeValue),
       test::KeyStr("c", 70, kTypeDeletion),
       test::KeyStr("c", 50, kTypeDeletion)},
      {"", "a", "", ""}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
      nullptr, &filter);

  c_iter_->SeekToFirst();
  ASSERT_FALSE(c_iter_->Valid());
  ASSERT_EQ(filter.filtered, 2);
}

// In bottommost level, values earlier than earliest snapshot can be output
// with sequence = 0.
TEST_P(CompactionIteratorTest, ZeroOutSequenceAtBottomLevel) {
Expand Down
6 changes: 4 additions & 2 deletions db/db_filesnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
// (found in the LICENSE.Apache file in the root directory).
//


#include <algorithm>
#include <cstdint>
#include <memory>
Expand All @@ -29,7 +28,10 @@
namespace ROCKSDB_NAMESPACE {

// Flush all column families before collecting live files.
// allow_write_stall is set so the flush does not wait out a write stall,
// and check_if_compaction_disabled makes the flush bail out early when
// compactions have been disabled.
Status DBImpl::FlushForGetLiveFiles() {
  FlushOptions opts;
  opts.allow_write_stall = true;
  opts.check_if_compaction_disabled = true;
  return DBImpl::FlushAllColumnFamilies(opts, FlushReason::kGetLiveFiles);
}

Expand Down
32 changes: 31 additions & 1 deletion db/db_flush_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -740,9 +740,11 @@ class TestFlushListener : public EventListener {
DBFlushTest* test_;
};

// Disabled because of
// https://github.com/tikv/rocksdb/pull/389/commits/cc433939ed937a82d0a0ccad1280d5907b048654
TEST_F(
DBFlushTest,
FixUnrecoverableWriteDuringAtomicFlushWaitUntilFlushWouldNotStallWrites) {
DISABLED_FixUnrecoverableWriteDuringAtomicFlushWaitUntilFlushWouldNotStallWrites) {
Options options = CurrentOptions();
options.atomic_flush = true;

Expand Down Expand Up @@ -2012,6 +2014,13 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {
}
}

// Listener hook invoked when a flush job begins. Sanity-checks the
// sequence-number range reported for the flush: it must be ordered, and
// its upper bound must match one of the recorded sequence numbers
// (seq1/seq2 are members set elsewhere in this listener — presumably the
// sequence numbers of the two test writes; confirm against the full test).
void OnFlushBegin(DB* /*db*/, const FlushJobInfo& info) override {
  ASSERT_LE(info.smallest_seqno, info.largest_seqno);
  // If this flush does not end at seq1, it must end exactly at seq2.
  if (info.largest_seqno != seq1) {
    ASSERT_EQ(info.largest_seqno, seq2);
  }
}

void CheckFlushResultCommitted(DB* db, SequenceNumber seq) {
DBImpl* db_impl = static_cast_with_check<DBImpl>(db);
InstrumentedMutex* mutex = db_impl->mutex();
Expand Down Expand Up @@ -3189,6 +3198,27 @@ TEST_P(DBAtomicFlushTest, NoWaitWhenWritesStopped) {
SyncPoint::GetInstance()->DisableProcessing();
}

// Verify that Flush() with check_if_compaction_disabled set returns
// Status::Incomplete() while manual compaction is disabled, and succeeds
// again once manual compaction is re-enabled. Runs for both atomic and
// non-atomic flush via the test parameter.
TEST_P(DBAtomicFlushTest, DisableManualCompaction) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = GetParam();  // exercise both flush modes
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_EQ(2, handles_.size());
  // Pause background work so the writes below stay in the memtables until
  // the explicit Flush() calls.
  ASSERT_OK(dbfull()->PauseBackgroundWork());
  ASSERT_OK(Put(0, "key00", "value00"));
  ASSERT_OK(Put(1, "key10", "value10"));
  dbfull()->DisableManualCompaction();
  FlushOptions flush_opts;
  flush_opts.wait = true;
  flush_opts.check_if_compaction_disabled = true;
  // With manual compaction disabled, the flush must bail out as Incomplete.
  ASSERT_TRUE(dbfull()->Flush(flush_opts, handles_).IsIncomplete());
  ASSERT_OK(Put(0, "key01", "value01"));
  ASSERT_OK(db_->ContinueBackgroundWork());
  dbfull()->EnableManualCompaction();
  // Once re-enabled, the same flush request succeeds.
  ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
  Close();
}

INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
testing::Bool());

Expand Down
5 changes: 3 additions & 2 deletions db/db_impl/compacted_db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ class CompactedDBImpl : public DBImpl {
const Slice& /*key*/) override {
return Status::NotSupported("Not supported in compacted db mode.");
}
virtual Status Write(const WriteOptions& /*options*/,
WriteBatch* /*updates*/) override {
using DBImpl::Write;
virtual Status Write(const WriteOptions& /*options*/, WriteBatch* /*updates*/,
PostWriteCallback* /*callback*/) override {
return Status::NotSupported("Not supported in compacted db mode.");
}
using DBImpl::CompactRange;
Expand Down
Loading

0 comments on commit 405de0e

Please sign in to comment.