Skip to content

Commit

Permalink
Add size and range in compaction partitioner context
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Zhang <[email protected]>
  • Loading branch information
v01dstar committed Sep 29, 2024
1 parent 33c8c2c commit 6a3eeab
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 1 deletion.
50 changes: 50 additions & 0 deletions db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "db/compaction/compaction.h"

#include <algorithm>
#include <cinttypes>
#include <vector>

Expand Down Expand Up @@ -842,6 +843,52 @@ std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
context);
}

std::pair<std::vector<Slice>, std::vector<uint64_t>>
Compaction::CreateSegmentsForLevel(int level) const {
// So... the below files should be adjacently sorted.
// For now, this is only for creating the next-of-output level info, so it
// makes sense for not supporting L0.
assert(level != 0);

// Some of test cases may not initialize the version...
if (input_version_ == nullptr) {
return std::make_pair(std::vector<Slice>(), std::vector<uint64_t>());
}

const auto vsi = input_version_->storage_info();
if (level >= vsi->num_non_empty_levels()) {
// The level shall be empty.
return std::make_pair(std::vector<Slice>(), std::vector<uint64_t>());
}
const auto& files = vsi->LevelFilesBrief(level);
// The file metadata hold internal keys, however the compaction is bounded by
// user keys.
const auto user_cmp = immutable_options()->user_comparator;
const auto end = files.files + files.num_files;
const auto start = std::lower_bound(
files.files, end, smallest_user_key_,
[user_cmp](FdWithKeyRange& fd, const Slice& slice) {
return user_cmp->Compare(ExtractUserKey(fd.largest_key), slice) < 0;
});

if (start == end) {
// There is no overlapping of next level.
return std::make_pair(std::vector<Slice>(), std::vector<uint64_t>());
}
std::vector<Slice> ranges;
std::vector<uint64_t> sizes;
ranges.push_back(ExtractUserKey(start->smallest_key));
for (const FdWithKeyRange* iter = start; iter < end; iter++) {
if (user_cmp->Compare(ExtractUserKey(iter->smallest_key),
largest_user_key_) > 0) {
break;
}
ranges.push_back(ExtractUserKey(iter->largest_key));
sizes.push_back(iter->fd.GetFileSize());
}
return std::make_pair(ranges, sizes);
}

std::unique_ptr<SstPartitioner> Compaction::CreateSstPartitioner() const {
if (!immutable_options_.sst_partitioner_factory) {
return nullptr;
Expand All @@ -853,6 +900,9 @@ std::unique_ptr<SstPartitioner> Compaction::CreateSstPartitioner() const {
context.output_level = output_level_;
context.smallest_user_key = smallest_user_key_;
context.largest_user_key = largest_user_key_;
std::tie(context.output_next_level_boundaries,
context.output_next_level_size) =
CreateSegmentsForLevel(output_level_ + 1);
return immutable_options_.sst_partitioner_factory->CreatePartitioner(context);
}

Expand Down
3 changes: 3 additions & 0 deletions db/compaction/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,9 @@ class Compaction {
static bool IsFullCompaction(VersionStorageInfo* vstorage,
const std::vector<CompactionInputFiles>& inputs);

std::pair<std::vector<Slice>, std::vector<uint64_t>> CreateSegmentsForLevel(
int in_level) const;

VersionStorageInfo* input_vstorage_;

const int start_level_; // the lowest level to be compacted
Expand Down
105 changes: 104 additions & 1 deletion db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ class RoundRobinSubcompactionsAgainstPressureToken
}
bool grab_pressure_token_;
};

class RoundRobinSubcompactionsAgainstResources
: public DBCompactionTest,
public ::testing::WithParamInterface<std::tuple<int, int>> {
Expand All @@ -239,6 +238,35 @@ class RoundRobinSubcompactionsAgainstResources
};

namespace {
class SplitAllPartitioner : public SstPartitioner {
public:
const char* Name() const override { return "SplitAllPartitioner"; }

PartitionerResult ShouldPartition(
const PartitionerRequest& /*request*/) override {
return PartitionerResult::kRequired;
}

bool CanDoTrivialMove(const Slice&, const Slice&) override { return true; }
};

class SplitAllPatitionerFactory : public SstPartitionerFactory {
public:
std::function<void(const SstPartitioner::Context&)> on_create_;

SplitAllPatitionerFactory(
std::function<void(const SstPartitioner::Context&)> on_create)
: on_create_(on_create) {}

std::unique_ptr<SstPartitioner> CreatePartitioner(
const SstPartitioner::Context& context) const override {
on_create_(context);
return std::unique_ptr<SstPartitioner>(new SplitAllPartitioner());
}

const char* Name() const override { return "SplitAllPartitionerFactory"; }
};

class FlushedFileCollector : public EventListener {
public:
FlushedFileCollector() {}
Expand Down Expand Up @@ -1164,6 +1192,81 @@ TEST_F(DBCompactionTest, CompactionSstPartitionerNonTrivial) {
ASSERT_EQ("B", Get("bbbb1"));
}

TEST_F(DBCompactionTest, CompactionSstPartitionerNextLevel) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleLevel;
options.level0_file_num_compaction_trigger = 1;
options.max_bytes_for_level_base = 10;
options.max_bytes_for_level_multiplier = 2;
options.sst_partitioner_factory = std::unique_ptr<SstPartitionerFactory>(
new SplitAllPatitionerFactory([this](const SstPartitioner::Context& cx) {
if (!cx.output_next_level_boundaries.empty()) {
std::vector<LiveFileMetaData> files;
// We are holding the mutex in this context...
// Perhaps we'd better make a `TEST_GetVersion` for fetching.
dbfull()->TEST_UnlockMutex();
dbfull()->GetLiveFilesMetaData(&files);
dbfull()->TEST_LockMutex();
std::vector<LiveFileMetaData> overlapped_files;
std::copy_if(
files.begin(), files.end(), std::back_inserter(overlapped_files),
[&](const LiveFileMetaData& ld) {
return Slice(ld.smallestkey).compare(cx.largest_user_key) < 0 &&
Slice(ld.largestkey).compare(cx.smallest_user_key) > 0 &&
ld.level == cx.output_level + 1;
});
std::sort(overlapped_files.begin(), overlapped_files.end(),
[](LiveFileMetaData& x, LiveFileMetaData& y) {
return x.largestkey < y.largestkey;
});
auto next_level_overlap_files = overlapped_files.size();
ASSERT_EQ(next_level_overlap_files + 1,
cx.output_next_level_boundaries.size());
ASSERT_EQ(next_level_overlap_files, cx.output_next_level_size.size());
ASSERT_EQ(next_level_overlap_files, cx.OutputNextLevelSegmentCount());
for (size_t i = 0; i < overlapped_files.size(); i++) {
Slice next_level_lower, next_level_upper;
uint64_t next_level_size;
cx.OutputNextLevelSegment(i, &next_level_lower, &next_level_upper,
&next_level_size);

if (i == 0) {
ASSERT_EQ(overlapped_files[i].smallestkey, next_level_lower);
}
ASSERT_EQ(overlapped_files[i].largestkey, next_level_upper);
ASSERT_EQ(overlapped_files[i].size, next_level_size);
}
}
}));
DestroyAndReopen(options);

ASSERT_OK(Put("A", "there are more than 10 bytes."));
ASSERT_OK(Put("B", "yet another key."));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_OK(Put("A1", "the new challenger..."));
ASSERT_OK(Put("B1", "and his buddy."));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_OK(Put("A1P", "the new challenger... Changed."));
ASSERT_OK(Put("B1P", "and his buddy. Changed too."));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_OK(Put(InternalKey("A", 0, ValueType::kTypeDeletion).Encode(),
"And a tricker: he pretends to be A, but not A."));
ASSERT_OK(Put(InternalKey("B", 0, ValueType::kTypeDeletion).Encode(),
"Yeah, another tricker."));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_OK(dbfull()->TEST_WaitForCompact());

std::vector<LiveFileMetaData> files;
dbfull()->GetLiveFilesMetaData(&files);
ASSERT_EQ(8, files.size());
}

TEST_F(DBCompactionTest, ZeroSeqIdCompaction) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleLevel;
Expand Down
41 changes: 41 additions & 0 deletions include/rocksdb/sst_partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ class SstPartitioner {
virtual bool CanDoTrivialMove(const Slice& smallest_user_key,
const Slice& largest_user_key) = 0;

struct Segment {
Segment(uint64_t size_diff, Slice until)
: size_in_this_segment(size_diff), segment_until_user_key(until) {}
uint64_t size_in_this_segment;
Slice segment_until_user_key;
};

// Context information of a compaction run
struct Context {
// Does this compaction run include all data files
Expand All @@ -75,6 +82,40 @@ class SstPartitioner {
Slice smallest_user_key;
// Largest key for compaction
Slice largest_user_key;

// The segments consist with the next level of target level.
// This will be useful while deciding whether to partition
// files to finer parts for avoiding possible huge compactions.

// The boundaries of the next level of output level.
// For example, when the next level contains files with range ("001",
// "002"), ("003", "004"), The boundaries will be ["001", "002", "004"];
std::vector<Slice> output_next_level_boundaries;
// The size of each segment, for example, when
// `output_next_level_boundaries` is ["001", "002", "004"], this might be
// [42, 96], which means range ["001", "002") contains 42 bytes of data,
// ["002", "004") contains 96 bytes of data.
std::vector<uint64_t> output_next_level_size;

// Helper function to fetch the count of next level segments.
size_t OutputNextLevelSegmentCount() const {
return output_next_level_size.size();
}

// Helper function to fetch the n-th segment of the next level of the output
// level. `index` shall less than `OutputNextLevelSegmentCount`.
void OutputNextLevelSegment(size_t index, Slice* smallest_key,
Slice* largest_key, uint64_t* size) const {
if (smallest_key != nullptr) {
*smallest_key = output_next_level_boundaries[index];
}
if (largest_key != nullptr) {
*largest_key = output_next_level_boundaries[index + 1];
}
if (size != nullptr) {
*size = output_next_level_size[index];
}
}
};
};

Expand Down

0 comments on commit 6a3eeab

Please sign in to comment.