Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine blob cache related code #309

Merged
merged 4 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/titan/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ struct TitanCFOptions : public ColumnFamilyOptions {
return *this;
}

// Returns the memory allocator reported by blob_cache, or nullptr when
// blob_cache is not set.
MemoryAllocator* memory_allocator() const {
  if (!blob_cache) {
    return nullptr;
  }
  return blob_cache->memory_allocator();
}

void Dump(Logger* logger) const;
void UpdateMutableOptions(const MutableTitanCFOptions& new_options);
};
Expand Down
4 changes: 2 additions & 2 deletions src/blob_file_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ BlobFileCache::BlobFileCache(const TitanDBOptions& db_options,

// Reads the blob record addressed by `handle` from blob file `file_number`.
//
// The reader for the file is obtained through FindFile(), which hands back a
// cache handle; the read itself is delegated to BlobFileReader::Get(), and the
// cache handle is released before returning. Any error from FindFile() or from
// the reader is returned unchanged.
//
// NOTE(review): this text is a rendered diff, so the `buffer` / `value` line
// pairs below are the before/after versions of the same line (the parameter
// was renamed from `buffer` to `value`).
Status BlobFileCache::Get(const ReadOptions& options, uint64_t file_number,
uint64_t file_size, const BlobHandle& handle,
BlobRecord* record, PinnableSlice* buffer) {
BlobRecord* record, PinnableSlice* value) {
Cache::Handle* cache_handle = nullptr;
// Look up the reader for this file; on failure there is nothing to release.
Status s = FindFile(file_number, file_size, &cache_handle);
if (!s.ok()) return s;

// The cached value is the BlobFileReader for this file.
auto reader = reinterpret_cast<BlobFileReader*>(cache_->Value(cache_handle));
s = reader->Get(options, handle, record, buffer);
s = reader->Get(options, handle, record, value);
// Release the handle obtained from FindFile() regardless of the read status.
cache_->Release(cache_handle);
return s;
}
Expand Down
2 changes: 1 addition & 1 deletion src/blob_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class BlobFileCache {
// the buffer must be valid when the record is used.
Status Get(const ReadOptions& options, uint64_t file_number,
uint64_t file_size, const BlobHandle& handle, BlobRecord* record,
PinnableSlice* buffer);
PinnableSlice* value);

// Creates a prefetcher for the specified file number.
Status NewPrefetcher(uint64_t file_number, uint64_t file_size,
Expand Down
6 changes: 4 additions & 2 deletions src/blob_file_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ bool BlobFileIterator::Init() {

if (blob_file_header.flags & BlobFileHeader::kHasUncompressionDictionary) {
status_ = InitUncompressionDict(blob_file_footer, file_.get(),
&uncompression_dict_);
&uncompression_dict_,
titan_cf_options_.memory_allocator());
if (!status_.ok()) {
return false;
}
Expand Down Expand Up @@ -146,7 +147,8 @@ void BlobFileIterator::GetBlobRecord() {
nullptr /*aligned_buf*/, true /*for_compaction*/);
if (status_.ok()) {
status_ =
decoder_.DecodeRecord(&record_slice, &cur_blob_record_, &uncompressed_);
decoder_.DecodeRecord(&record_slice, &cur_blob_record_, &uncompressed_,
titan_cf_options_.memory_allocator());
}
if (!status_.ok()) return;

Expand Down
37 changes: 24 additions & 13 deletions src/blob_file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ Status BlobFileReader::Open(const TitanCFOptions& options,
reader->footer_ = footer;
if (header.flags & BlobFileHeader::kHasUncompressionDictionary) {
s = InitUncompressionDict(footer, reader->file_.get(),
&reader->uncompression_dict_);
&reader->uncompression_dict_,
options.memory_allocator());
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -151,7 +152,7 @@ BlobFileReader::BlobFileReader(const TitanCFOptions& options,

Status BlobFileReader::Get(const ReadOptions& /*options*/,
const BlobHandle& handle, BlobRecord* record,
PinnableSlice* buffer) {
PinnableSlice* value) {
TEST_SYNC_POINT("BlobFileReader::Get");

std::string cache_key;
Expand All @@ -162,8 +163,11 @@ Status BlobFileReader::Get(const ReadOptions& /*options*/,
if (cache_handle) {
RecordTick(statistics(stats_), TITAN_BLOB_CACHE_HIT);
auto blob = reinterpret_cast<OwnedSlice*>(cache_->Value(cache_handle));
buffer->PinSlice(*blob, UnrefCacheHandle, cache_.get(), cache_handle);
return DecodeInto(*blob, record);
Status s = DecodeInto(*blob, record);
if (!s.ok()) return s;
value->PinSlice(record->value, UnrefCacheHandle, cache_.get(),
cache_handle);
return s;
}
}
RecordTick(statistics(stats_), TITAN_BLOB_CACHE_MISS);
Expand All @@ -178,11 +182,13 @@ Status BlobFileReader::Get(const ReadOptions& /*options*/,
auto cache_value = new OwnedSlice(std::move(blob));
auto cache_size = cache_value->size() + sizeof(*cache_value);
cache_->Insert(cache_key, cache_value, cache_size,
&DeleteCacheValue<OwnedSlice>, &cache_handle);
buffer->PinSlice(*cache_value, UnrefCacheHandle, cache_.get(),
cache_handle);
&DeleteCacheValue<OwnedSlice>, &cache_handle,
Cache::Priority::BOTTOM);
value->PinSlice(record->value, UnrefCacheHandle, cache_.get(),
cache_handle);
} else {
buffer->PinSlice(blob, OwnedSlice::CleanupFunc, blob.release(), nullptr);
value->PinSlice(record->value, OwnedSlice::CleanupFunc, blob.release(),
Copy link
Contributor

@tonyxuqqi tonyxuqqi Jan 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the new logic returns only the value, whereas the old code returned the whole key-value pair?
If so, should the callers of Get() be made aware of this change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not feel right. BlobRecord::DecodeFrom() assumes the buffer contains both key and value. However, I guess this PR wants to optimize cache utilization by storing the value only. In addition to changing the PR description and function comment, we would also need to change the decoding logic of BlobRecord accordingly.

Copy link
Member Author

@Connor1996 Connor1996 Feb 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the new logic will return the value only and the old code returns the whole kv value pair?

The old code reads the key-value pair through the record fields, and the new logic can still read the key-value pair through the record fields. The record is just a set of slices referring to the buffer underlying the pinnable slice. The value pinnable slice holds the underlying buffer, but as a slice it only exposes the value instead of the whole buffer.

Should the caller of Get() be aware of this change?

Previously, callers wouldn't use the buffer slice directly because they didn't know its contents; the key and value were read through the record fields. So callers don't need to change, and I just renamed the parameter from buffer to value.

We also need to change the decoding logic of BlobRecord accordingly.

No need. When calling BlobRecord::DecodeFrom(), the buffer doesn't contain both key and value.

nullptr);
}

return Status::OK();
Expand All @@ -191,7 +197,8 @@ Status BlobFileReader::Get(const ReadOptions& /*options*/,
Status BlobFileReader::ReadRecord(const BlobHandle& handle, BlobRecord* record,
OwnedSlice* buffer) {
Slice blob;
CacheAllocationPtr ubuf(new char[handle.size]);
CacheAllocationPtr ubuf =
AllocateBlock(handle.size, options_.memory_allocator());
Status s = file_->Read(IOOptions(), handle.offset, handle.size, &blob,
ubuf.get(), nullptr /*aligned_buf*/);
if (!s.ok()) {
Expand All @@ -211,7 +218,7 @@ Status BlobFileReader::ReadRecord(const BlobHandle& handle, BlobRecord* record,
return s;
}
buffer->reset(std::move(ubuf), blob);
s = decoder.DecodeRecord(&blob, record, buffer);
s = decoder.DecodeRecord(&blob, record, buffer, options_.memory_allocator());
return s;
}

Expand All @@ -237,7 +244,8 @@ Status BlobFilePrefetcher::Get(const ReadOptions& options,

Status InitUncompressionDict(
const BlobFileFooter& footer, RandomAccessFileReader* file,
std::unique_ptr<UncompressionDict>* uncompression_dict) {
std::unique_ptr<UncompressionDict>* uncompression_dict,
MemoryAllocator* memory_allocator) {
// TODO: Cache the compression dictionary in either block cache or blob cache.
#if ZSTD_VERSION_NUMBER < 10103
return Status::NotSupported("the version of libztsd is too low");
Expand All @@ -248,7 +256,9 @@ Status InitUncompressionDict(
assert(footer.meta_index_handle.size() > 0);
BlockHandle meta_index_handle = footer.meta_index_handle;
Slice blob;
CacheAllocationPtr ubuf(new char[meta_index_handle.size()]);

CacheAllocationPtr ubuf =
AllocateBlock(meta_index_handle.size(), memory_allocator);
Status s = file->Read(IOOptions(), meta_index_handle.offset(),
meta_index_handle.size(), &blob, ubuf.get(),
nullptr /*aligned_buf*/);
Expand Down Expand Up @@ -276,7 +286,8 @@ Status InitUncompressionDict(
}

Slice dict_slice;
CacheAllocationPtr dict_buf(new char[dict_block.size()]);
CacheAllocationPtr dict_buf =
AllocateBlock(dict_block.size(), memory_allocator);
s = file->Read(IOOptions(), dict_block.offset(), dict_block.size(),
&dict_slice, dict_buf.get(), nullptr /*aligned_buf*/);
if (!s.ok()) {
Expand Down
7 changes: 4 additions & 3 deletions src/blob_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ class BlobFileReader {
TitanStats* stats);

// Gets the blob record pointed by the handle in this file. The data
// of the record is stored in the provided buffer, so the buffer
// of the record is stored in the buffer underlying the value slice, so the value slice
// must be valid when the record is used.
Status Get(const ReadOptions& options, const BlobHandle& handle,
BlobRecord* record, PinnableSlice* buffer);
BlobRecord* record, PinnableSlice* value);

private:
friend class BlobFilePrefetcher;
Expand Down Expand Up @@ -78,7 +78,8 @@ class BlobFilePrefetcher : public Cleanable {
// uncompression dictionary
Status InitUncompressionDict(
const BlobFileFooter& footer, RandomAccessFileReader* file,
std::unique_ptr<UncompressionDict>* uncompression_dict);
std::unique_ptr<UncompressionDict>* uncompression_dict,
MemoryAllocator* allocator);

} // namespace titandb
} // namespace rocksdb
5 changes: 3 additions & 2 deletions src/blob_format.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ Status BlobDecoder::DecodeHeader(Slice* src) {
}

Status BlobDecoder::DecodeRecord(Slice* src, BlobRecord* record,
OwnedSlice* buffer) {
OwnedSlice* buffer,
MemoryAllocator* allocator) {
TEST_SYNC_POINT_CALLBACK("BlobDecoder::DecodeRecord", &crc_);

Slice input(src->data(), record_size_);
Expand All @@ -86,7 +87,7 @@ Status BlobDecoder::DecodeRecord(Slice* src, BlobRecord* record,
}
UncompressionContext ctx(compression_);
UncompressionInfo info(ctx, *uncompression_dict_, compression_);
Status s = Uncompress(info, input, buffer);
Status s = Uncompress(info, input, buffer, allocator);
if (!s.ok()) {
return s;
}
Expand Down
3 changes: 2 additions & 1 deletion src/blob_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ class BlobDecoder {
: BlobDecoder(&UncompressionDict::GetEmptyDict(), kNoCompression) {}

Status DecodeHeader(Slice* src);
Status DecodeRecord(Slice* src, BlobRecord* record, OwnedSlice* buffer);
Status DecodeRecord(Slice* src, BlobRecord* record, OwnedSlice* buffer,
MemoryAllocator* allocator = nullptr);

void SetUncompressionDict(const UncompressionDict* uncompression_dict) {
uncompression_dict_ = uncompression_dict;
Expand Down
4 changes: 2 additions & 2 deletions src/blob_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ namespace rocksdb {
namespace titandb {

// Reads the blob record referenced by `index`.
//
// The blob file is resolved from index.file_number via FindFile(); when that
// lookup yields nothing, Status::Corruption("Missing blob file: <number>") is
// returned. Otherwise the read is delegated to file_cache_->Get() using the
// file's number and size and index.blob_handle, and its status is returned.
//
// NOTE(review): this text is a rendered diff, so the `buffer` / `value` line
// pairs below are the before/after versions of the same line (the parameter
// was renamed from `buffer` to `value`).
Status BlobStorage::Get(const ReadOptions& options, const BlobIndex& index,
BlobRecord* record, PinnableSlice* buffer) {
BlobRecord* record, PinnableSlice* value) {
// presumably FindFile() returns a weak reference and lock() promotes it to an
// owning one for the duration of the read — confirm against FindFile().
auto sfile = FindFile(index.file_number).lock();
if (!sfile)
return Status::Corruption("Missing blob file: " +
std::to_string(index.file_number));
return file_cache_->Get(options, sfile->file_number(), sfile->file_size(),
index.blob_handle, record, buffer);
index.blob_handle, record, value);
}

Status BlobStorage::NewPrefetcher(uint64_t file_number,
Expand Down
2 changes: 1 addition & 1 deletion src/blob_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class BlobStorage {
// buffer is used to store the record data, so the buffer must be
// valid when the record is used.
Status Get(const ReadOptions& options, const BlobIndex& index,
BlobRecord* record, PinnableSlice* buffer);
BlobRecord* record, PinnableSlice* value);

// Creates a prefetcher for the specified file number.
Status NewPrefetcher(uint64_t file_number,
Expand Down
11 changes: 3 additions & 8 deletions src/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -701,15 +701,14 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options,
assert(s.ok());
if (!s.ok()) return s;

BlobRecord record;
PinnableSlice buffer;

auto storage =
static_cast_with_check<TitanColumnFamilyHandle>(handle)->GetBlobStorage();
if (storage) {
StopWatch read_sw(env_->GetSystemClock().get(), statistics(stats_.get()),
TITAN_BLOB_FILE_READ_MICROS);
s = storage->Get(options, index, &record, &buffer);
value->Reset();
BlobRecord record;
s = storage->Get(options, index, &record, value);
RecordTick(statistics(stats_.get()), TITAN_BLOB_FILE_NUM_KEYS_READ);
RecordTick(statistics(stats_.get()), TITAN_BLOB_FILE_BYTES_READ,
index.blob_handle.size);
Expand All @@ -726,10 +725,6 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options,
options.snapshot->GetSequenceNumber(),
s.ToString().c_str());
}
if (s.ok()) {
value->Reset();
value->PinSelf(record.value);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code introduces one memory copy.

}
return s;
}

Expand Down
6 changes: 3 additions & 3 deletions src/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ Slice Compress(const CompressionInfo& info, const Slice& input,
}

Status Uncompress(const UncompressionInfo& info, const Slice& input,
OwnedSlice* output) {
OwnedSlice* output, MemoryAllocator* allocator) {
assert(info.type() != kNoCompression);
size_t usize = 0;
CacheAllocationPtr ubuf = UncompressData(info, input.data(), input.size(),
&usize, kCompressionFormat);
CacheAllocationPtr ubuf = UncompressData(
info, input.data(), input.size(), &usize, kCompressionFormat, allocator);
if (!ubuf.get()) {
return Status::Corruption("Corrupted compressed blob");
}
Expand Down
2 changes: 1 addition & 1 deletion src/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Slice Compress(const CompressionInfo& info, const Slice& input,
// If successful, fills "*buffer" with the uncompressed data and
// points "*output" to it.
Status Uncompress(const UncompressionInfo& info, const Slice& input,
OwnedSlice* output);
OwnedSlice* output, MemoryAllocator* allocator = nullptr);

void UnrefCacheHandle(void* cache, void* handle);

Expand Down