Shared dictionary compression using reference block
Summary:
This adds a new metablock containing a shared dictionary that is used
to compress all data blocks in the SST file. The size of the shared dictionary
is configurable in CompressionOptions and defaults to 0. It's currently only
used for zlib/lz4/lz4hc, but the block will be stored in the SST regardless of
the compression type if the user chooses a nonzero dictionary size.

During compaction, the dictionary is computed by randomly sampling the first
output file in each subcompaction. The sampling intervals are pre-computed
under the assumption that the output file will reach the maximum allowable
length; if the file ends up smaller, intervals that fall beyond end-of-file
are skipped and the dictionary comes out slightly smaller. Once the dictionary
has been generated from a subcompaction's first file, it is loaded into the
compression library before writing each block in each subsequent file of that
subcompaction.
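The offset pre-computation described above can be sketched as a small standalone function. This is an illustration with hypothetical names, not the exact RocksDB code (the real implementation uses `Random64` seeded from a fresh file number; here a standard-library generator stands in):

```cpp
#include <cstdint>
#include <random>
#include <set>

// Pick up to max_dict_bytes/64 sample start offsets, each aligned to a
// 64-byte sample window, assuming the output file reaches max_file_len
// bytes. Offsets landing past the real end-of-file are simply skipped
// later, so the dictionary may come out smaller than max_dict_bytes.
std::set<uint64_t> PrecomputeSampleOffsets(uint64_t max_dict_bytes,
                                           uint64_t max_file_len,
                                           uint64_t seed) {
  const int kSampleLenShift = 6;  // 2^6 = 64-byte samples
  std::set<uint64_t> offsets;     // std::set keeps them sorted, de-duplicated
  const uint64_t num_slots = max_file_len >> kSampleLenShift;
  if (num_slots == 0) {
    return offsets;
  }
  const uint64_t max_samples = max_dict_bytes >> kSampleLenShift;
  std::mt19937_64 gen(seed);
  std::uniform_int_distribution<uint64_t> dist(0, num_slots - 1);
  for (uint64_t i = 0; i < max_samples; ++i) {
    offsets.insert(dist(gen) << kSampleLenShift);
  }
  return offsets;
}
```

Because duplicate draws collapse in the set, the dictionary can also fall short of `max_dict_bytes` even when the file is full-length.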

On the read path, the dictionary is fetched from the metablock, if present,
and loaded into the compression library before uncompressing each block.

Test Plan: new unit test

Reviewers: yhchiang, IslamAbdelRahman, cyan, sdong

Reviewed By: sdong

Subscribers: andrewkr, yoshinorim, kradhakrishnan, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D52287
ajkr committed Apr 28, 2016
1 parent ad573b9 commit 843d2e3
Showing 30 changed files with 555 additions and 116 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
@@ -1,6 +1,7 @@
# Rocksdb Change Log
## Unreleased
### Public API Change
* Allow preset compression dictionary for improved compression of block-based tables. This is supported for zlib, zstd, and lz4. The compression dictionary's size is configurable via CompressionOptions::max_dict_bytes.
* Delete deprecated classes for creating backups (BackupableDB) and restoring from backups (RestoreBackupableDB). Now, BackupEngine should be used for creating backups, and BackupEngineReadOnly should be used for restorations. For more details, see https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F
* Expose estimate of per-level compression ratio via DB property: "rocksdb.compression-ratio-at-levelN".

5 changes: 3 additions & 2 deletions db/builder.cc
@@ -43,14 +43,15 @@ TableBuilder* NewTableBuilder(
int_tbl_prop_collector_factories,
uint32_t column_family_id, const std::string& column_family_name,
WritableFileWriter* file, const CompressionType compression_type,
const CompressionOptions& compression_opts, const bool skip_filters) {
const CompressionOptions& compression_opts,
const std::string* compression_dict, const bool skip_filters) {
assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty());
return ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, internal_comparator,
int_tbl_prop_collector_factories, compression_type,
compression_opts, skip_filters,
compression_opts, compression_dict, skip_filters,
column_family_name),
column_family_id, file);
}
3 changes: 3 additions & 0 deletions db/builder.h
@@ -36,6 +36,8 @@ class InternalIterator;
// @param column_family_name Name of the column family that is also identified
// by column_family_id, or empty string if unknown. It must outlive the
// TableBuilder returned by this function.
// @param compression_dict Data for presetting the compression library's
// dictionary, or nullptr.
TableBuilder* NewTableBuilder(
const ImmutableCFOptions& options,
const InternalKeyComparator& internal_comparator,
@@ -44,6 +46,7 @@ TableBuilder* NewTableBuilder(
uint32_t column_family_id, const std::string& column_family_name,
WritableFileWriter* file, const CompressionType compression_type,
const CompressionOptions& compression_opts,
const std::string* compression_dict = nullptr,
const bool skip_filters = false);

// Build a Table file from the contents of *iter. The generated file
6 changes: 4 additions & 2 deletions db/c.cc
@@ -1572,11 +1572,13 @@ void rocksdb_options_set_compression_per_level(rocksdb_options_t* opt,
}
}

void rocksdb_options_set_compression_options(
rocksdb_options_t* opt, int w_bits, int level, int strategy) {
void rocksdb_options_set_compression_options(rocksdb_options_t* opt, int w_bits,
int level, int strategy,
size_t max_dict_bytes) {
opt->rep.compression_opts.window_bits = w_bits;
opt->rep.compression_opts.level = level;
opt->rep.compression_opts.strategy = strategy;
opt->rep.compression_opts.max_dict_bytes = max_dict_bytes;
}

void rocksdb_options_set_prefix_extractor(
2 changes: 1 addition & 1 deletion db/c_test.c
@@ -322,7 +322,7 @@ int main(int argc, char** argv) {
rocksdb_options_set_block_based_table_factory(options, table_options);

rocksdb_options_set_compression(options, rocksdb_no_compression);
rocksdb_options_set_compression_options(options, -14, -1, 0);
rocksdb_options_set_compression_options(options, -14, -1, 0, 0);
int compression_levels[] = {rocksdb_no_compression, rocksdb_no_compression,
rocksdb_no_compression, rocksdb_no_compression};
rocksdb_options_set_compression_per_level(options, compression_levels, 4);
99 changes: 94 additions & 5 deletions db/compaction_job.cc
@@ -16,12 +16,13 @@
#include <inttypes.h>
#include <algorithm>
#include <functional>
#include <vector>
#include <memory>
#include <list>
#include <memory>
#include <random>
#include <set>
#include <thread>
#include <utility>
#include <vector>

#include "db/builder.h"
#include "db/db_iter.h"
@@ -111,6 +112,7 @@ struct CompactionJob::SubcompactionState {
uint64_t overlapped_bytes = 0;
// A flag determine whether the key has been seen in ShouldStopBefore()
bool seen_key = false;
std::string compression_dict;

SubcompactionState(Compaction* c, Slice* _start, Slice* _end,
uint64_t size = 0)
@@ -125,7 +127,8 @@
approx_size(size),
grandparent_index(0),
overlapped_bytes(0),
seen_key(false) {
seen_key(false),
compression_dict() {
assert(compaction != nullptr);
}

@@ -147,6 +150,7 @@ struct CompactionJob::SubcompactionState {
grandparent_index = std::move(o.grandparent_index);
overlapped_bytes = std::move(o.overlapped_bytes);
seen_key = std::move(o.seen_key);
compression_dict = std::move(o.compression_dict);
return *this;
}

@@ -665,6 +669,30 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
}

ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();

// To build compression dictionary, we sample the first output file, assuming
// it'll reach the maximum length, and then use the dictionary for compressing
// subsequent output files. The dictionary may be less than max_dict_bytes if
// the first output file's length is less than the maximum.
const int kSampleLenShift = 6; // 2^6 = 64-byte samples
std::set<size_t> sample_begin_offsets;
if (bottommost_level_ &&
cfd->ioptions()->compression_opts.max_dict_bytes > 0) {
const size_t kMaxSamples =
cfd->ioptions()->compression_opts.max_dict_bytes >> kSampleLenShift;
const size_t kOutFileLen =
cfd->GetCurrentMutableCFOptions()->MaxFileSizeForLevel(
compact_->compaction->output_level());
if (kOutFileLen != port::kMaxSizet) {
const size_t kOutFileNumSamples = kOutFileLen >> kSampleLenShift;
Random64 generator{versions_->NewFileNumber()};
for (size_t i = 0; i < kMaxSamples; ++i) {
sample_begin_offsets.insert(generator.Uniform(kOutFileNumSamples)
<< kSampleLenShift);
}
}
}

auto compaction_filter = cfd->ioptions()->compaction_filter;
std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
if (compaction_filter == nullptr) {
@@ -700,6 +728,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
const auto& c_iter_stats = c_iter->iter_stats();
auto sample_begin_offset_iter = sample_begin_offsets.cbegin();
// data_begin_offset and compression_dict are only valid while generating
// dictionary from the first output file.
size_t data_begin_offset = 0;
std::string compression_dict;
compression_dict.reserve(cfd->ioptions()->compression_opts.max_dict_bytes);

// TODO(noetzli): check whether we could check !shutting_down_->... only
// only occasionally (see diff D42687)
while (status.ok() && !shutting_down_->load(std::memory_order_acquire) &&
@@ -743,6 +778,55 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
key, c_iter->ikey().sequence);
sub_compact->num_output_records++;

if (sub_compact->outputs.size() == 1) { // first output file
// Check if this key/value overlaps any sample intervals; if so, appends
// overlapping portions to the dictionary.
for (const auto& data_elmt : {key, value}) {
size_t data_end_offset = data_begin_offset + data_elmt.size();
while (sample_begin_offset_iter != sample_begin_offsets.cend() &&
*sample_begin_offset_iter < data_end_offset) {
size_t sample_end_offset =
*sample_begin_offset_iter + (1 << kSampleLenShift);
// Invariant: Because we advance sample iterator while processing the
// data_elmt containing the sample's last byte, the current sample
// cannot end before the current data_elmt.
assert(data_begin_offset < sample_end_offset);

size_t data_elmt_copy_offset, data_elmt_copy_len;
if (*sample_begin_offset_iter <= data_begin_offset) {
// The sample starts before data_elmt starts, so take bytes starting
// at the beginning of data_elmt.
data_elmt_copy_offset = 0;
} else {
// data_elmt starts before the sample starts, so take bytes starting
// at the below offset into data_elmt.
data_elmt_copy_offset =
*sample_begin_offset_iter - data_begin_offset;
}
if (sample_end_offset <= data_end_offset) {
// The sample ends before data_elmt ends, so take as many bytes as
// needed.
data_elmt_copy_len =
sample_end_offset - (data_begin_offset + data_elmt_copy_offset);
} else {
// data_elmt ends before the sample ends, so take all remaining
// bytes in data_elmt.
data_elmt_copy_len =
data_end_offset - (data_begin_offset + data_elmt_copy_offset);
}
compression_dict.append(&data_elmt.data()[data_elmt_copy_offset],
data_elmt_copy_len);
if (sample_end_offset > data_end_offset) {
// Didn't finish sample. Try to finish it with the next data_elmt.
break;
}
// Next sample may require bytes from same data_elmt.
sample_begin_offset_iter++;
}
data_begin_offset = data_end_offset;
}
}

// Close output file if it is big enough
// TODO(aekmekji): determine if file should be closed earlier than this
// during subcompactions (i.e. if output size, estimated by input size, is
if (sub_compact->builder->FileSize() >=
sub_compact->compaction->max_output_file_size()) {
status = FinishCompactionOutputFile(input->status(), sub_compact);
if (sub_compact->outputs.size() == 1) {
// Use dictionary from first output file for compression of subsequent
// files.
sub_compact->compression_dict = std::move(compression_dict);
}
}

c_iter->Next();
}

@@ -1020,7 +1108,8 @@ Status CompactionJob::OpenCompactionOutputFile(
*cfd->ioptions(), cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
sub_compact->outfile.get(), sub_compact->compaction->output_compression(),
cfd->ioptions()->compression_opts, skip_filters));
cfd->ioptions()->compression_opts, &sub_compact->compression_dict,
skip_filters));
LogFlush(db_options_.info_log);
return s;
}
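The interval-overlap bookkeeping in the sampling loop of `ProcessKeyValueCompaction` can be condensed into a standalone sketch. The names here are mine, not RocksDB's, and the branching of the original is folded into `min`/`max` for brevity; the behavior is the same: walk the concatenated stream of key/value elements and append to the dictionary every byte that falls inside a 64-byte sample window.

```cpp
#include <algorithm>
#include <cstddef>
#include <set>
#include <string>
#include <vector>

std::string CollectSamples(const std::vector<std::string>& data_elmts,
                           const std::set<size_t>& sample_begin_offsets) {
  const int kSampleLenShift = 6;  // 64-byte samples, as in the diff above
  std::string dict;
  size_t data_begin_offset = 0;  // stream offset where current element starts
  auto it = sample_begin_offsets.cbegin();
  for (const auto& elmt : data_elmts) {
    const size_t data_end_offset = data_begin_offset + elmt.size();
    while (it != sample_begin_offsets.cend() && *it < data_end_offset) {
      const size_t sample_end_offset = *it + (size_t{1} << kSampleLenShift);
      // Portion of the sample window [*it, sample_end_offset) that
      // overlaps the current element:
      const size_t copy_begin = std::max(*it, data_begin_offset);
      const size_t copy_end = std::min(sample_end_offset, data_end_offset);
      dict.append(elmt, copy_begin - data_begin_offset, copy_end - copy_begin);
      if (sample_end_offset > data_end_offset) {
        break;  // sample spills into the next element; finish it there
      }
      ++it;  // the next sample may also start within this element
    }
    data_begin_offset = data_end_offset;
  }
  return dict;
}
```

For example, with elements of 40 and 60 bytes and a sample starting at offset 32, the function takes the last 8 bytes of the first element and the first 56 bytes of the second, stitching one 64-byte sample across the element boundary.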
1 change: 0 additions & 1 deletion db/db_test.cc
@@ -11,7 +11,6 @@
// in Release build.
// which is a pity, it is a good test
#include <algorithm>
#include <iostream>
#include <set>
#include <thread>
#include <unordered_set>
92 changes: 92 additions & 0 deletions db/db_test2.cc
@@ -838,6 +838,98 @@ TEST_P(PinL0IndexAndFilterBlocksTest,

INSTANTIATE_TEST_CASE_P(PinL0IndexAndFilterBlocksTest,
PinL0IndexAndFilterBlocksTest, ::testing::Bool());

TEST_F(DBTest2, PresetCompressionDict) {
const size_t kBlockSizeBytes = 4 << 10;
const size_t kL0FileBytes = 128 << 10;
const size_t kApproxPerBlockOverheadBytes = 50;
const int kNumL0Files = 5;

Options options;
options.arena_block_size = kBlockSizeBytes;
options.compaction_style = kCompactionStyleUniversal;
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.level0_file_num_compaction_trigger = kNumL0Files;
options.memtable_factory.reset(
new SpecialSkipListFactory(kL0FileBytes / kBlockSizeBytes));
options.num_levels = 2;
options.target_file_size_base = kL0FileBytes;
options.target_file_size_multiplier = 2;
options.write_buffer_size = kL0FileBytes;
BlockBasedTableOptions table_options;
table_options.block_size = kBlockSizeBytes;
std::vector<CompressionType> compression_types;
if (Zlib_Supported()) {
compression_types.push_back(kZlibCompression);
}
#if LZ4_VERSION_NUMBER >= 10400 // r124+
compression_types.push_back(kLZ4Compression);
compression_types.push_back(kLZ4HCCompression);
#endif // LZ4_VERSION_NUMBER >= 10400
#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
compression_types.push_back(kZSTDNotFinalCompression);
#endif // ZSTD_VERSION_NUMBER >= 500

for (auto compression_type : compression_types) {
options.compression = compression_type;
size_t prev_out_bytes;
for (int i = 0; i < 2; ++i) {
// First iteration: compress without preset dictionary
// Second iteration: compress with preset dictionary
// To make sure the compression dictionary was actually used, we verify
// the compressed size is smaller in the second iteration. Also in the
// second iteration, verify the data we get out is the same data we put
// in.
if (i) {
options.compression_opts.max_dict_bytes = kBlockSizeBytes;
} else {
options.compression_opts.max_dict_bytes = 0;
}

options.statistics = rocksdb::CreateDBStatistics();
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
CreateAndReopenWithCF({"pikachu"}, options);
Random rnd(301);
std::string seq_data =
RandomString(&rnd, kBlockSizeBytes - kApproxPerBlockOverheadBytes);

ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
for (int j = 0; j < kNumL0Files; ++j) {
for (size_t k = 0; k < kL0FileBytes / kBlockSizeBytes + 1; ++k) {
ASSERT_OK(Put(1, Key(static_cast<int>(
j * (kL0FileBytes / kBlockSizeBytes) + k)),
seq_data));
}
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
ASSERT_EQ(j + 1, NumTableFilesAtLevel(0, 1));
}
db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);

size_t out_bytes = 0;
std::vector<std::string> files;
GetSstFiles(dbname_, &files);
for (const auto& file : files) {
size_t curr_bytes;
env_->GetFileSize(dbname_ + "/" + file, &curr_bytes);
out_bytes += curr_bytes;
}

for (size_t j = 0; j < kNumL0Files * (kL0FileBytes / kBlockSizeBytes);
j++) {
ASSERT_EQ(seq_data, Get(1, Key(static_cast<int>(j))));
}
if (i) {
ASSERT_GT(prev_out_bytes, out_bytes);
}
prev_out_bytes = out_bytes;
DestroyAndReopen(options);
}
}
}

} // namespace rocksdb

int main(int argc, char** argv) {
2 changes: 1 addition & 1 deletion include/rocksdb/c.h
@@ -538,7 +538,7 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_max_open_files(
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_max_total_wal_size(
rocksdb_options_t* opt, uint64_t n);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_compression_options(
rocksdb_options_t*, int, int, int);
rocksdb_options_t*, int, int, int, size_t);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_prefix_extractor(
rocksdb_options_t*, rocksdb_slicetransform_t*);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_num_levels(
20 changes: 17 additions & 3 deletions include/rocksdb/options.h
@@ -137,9 +137,23 @@ struct CompressionOptions {
int window_bits;
int level;
int strategy;
CompressionOptions() : window_bits(-14), level(-1), strategy(0) {}
CompressionOptions(int wbits, int _lev, int _strategy)
: window_bits(wbits), level(_lev), strategy(_strategy) {}
// Maximum size of dictionary used to prime the compression library. Currently
// this dictionary will be constructed by sampling the first output file in a
// subcompaction when the target level is bottommost. This dictionary will be
// loaded into the compression library before compressing/uncompressing each
// data block of subsequent files in the subcompaction. Effectively, this
// improves compression ratios when there are repetitions across data blocks.
// A value of 0 indicates the feature is disabled.
// Default: 0.
uint32_t max_dict_bytes;

CompressionOptions()
: window_bits(-14), level(-1), strategy(0), max_dict_bytes(0) {}
CompressionOptions(int wbits, int _lev, int _strategy, size_t _max_dict_bytes)
: window_bits(wbits),
level(_lev),
strategy(_strategy),
max_dict_bytes(_max_dict_bytes) {}
};
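A minimal local mirror of the struct above (illustration only, not the real `rocksdb::CompressionOptions`) shows how the new field keeps the feature off by default and how a caller would opt in:

```cpp
#include <cstdint>

// Local stand-in mirroring the diff above: the three zlib-oriented knobs
// plus the new dictionary cap. max_dict_bytes == 0 (the default) disables
// dictionary compression entirely.
struct CompressionOptions {
  int window_bits;
  int level;
  int strategy;
  uint32_t max_dict_bytes;

  CompressionOptions()
      : window_bits(-14), level(-1), strategy(0), max_dict_bytes(0) {}
  CompressionOptions(int wbits, int lev, int strat, uint32_t dict_bytes)
      : window_bits(wbits),
        level(lev),
        strategy(strat),
        max_dict_bytes(dict_bytes) {}
};

// Opting in: keep the zlib defaults, allow up to a 16 KB dictionary.
const CompressionOptions kWithDict(-14, -1, 0, 16 * 1024);
```

Keeping the default at 0 means existing users see no change in SST file layout unless they explicitly request a dictionary.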

enum UpdateStatus { // Return status For inplace update callback
1 change: 1 addition & 0 deletions include/rocksdb/table_properties.h
@@ -47,6 +47,7 @@ struct TablePropertiesNames {
};

extern const std::string kPropertiesBlock;
extern const std::string kCompressionDictBlock;

enum EntryType {
kEntryPut,