Skip to content

Commit

Permalink
Refine blob cache related code (#309)
Browse files Browse the repository at this point in the history
Signed-off-by: Connor1996 <[email protected]>
  • Loading branch information
Connor1996 authored Feb 5, 2024
1 parent e875b4c commit 55f2cf8
Show file tree
Hide file tree
Showing 13 changed files with 54 additions and 39 deletions.
4 changes: 4 additions & 0 deletions include/titan/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ struct TitanCFOptions : public ColumnFamilyOptions {
return *this;
}

MemoryAllocator* memory_allocator() const {
return blob_cache ? blob_cache->memory_allocator() : nullptr;
}

void Dump(Logger* logger) const;
void UpdateMutableOptions(const MutableTitanCFOptions& new_options);
};
Expand Down
4 changes: 2 additions & 2 deletions src/blob_file_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ BlobFileCache::BlobFileCache(const TitanDBOptions& db_options,

Status BlobFileCache::Get(const ReadOptions& options, uint64_t file_number,
uint64_t file_size, const BlobHandle& handle,
BlobRecord* record, PinnableSlice* buffer) {
BlobRecord* record, PinnableSlice* value) {
Cache::Handle* cache_handle = nullptr;
Status s = FindFile(file_number, file_size, &cache_handle);
if (!s.ok()) return s;

auto reader = reinterpret_cast<BlobFileReader*>(cache_->Value(cache_handle));
s = reader->Get(options, handle, record, buffer);
s = reader->Get(options, handle, record, value);
cache_->Release(cache_handle);
return s;
}
Expand Down
2 changes: 1 addition & 1 deletion src/blob_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class BlobFileCache {
// the buffer must be valid when the record is used.
Status Get(const ReadOptions& options, uint64_t file_number,
uint64_t file_size, const BlobHandle& handle, BlobRecord* record,
PinnableSlice* buffer);
PinnableSlice* value);

// Creates a prefetcher for the specified file number.
Status NewPrefetcher(uint64_t file_number, uint64_t file_size,
Expand Down
6 changes: 4 additions & 2 deletions src/blob_file_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ bool BlobFileIterator::Init() {

if (blob_file_header.flags & BlobFileHeader::kHasUncompressionDictionary) {
status_ = InitUncompressionDict(blob_file_footer, file_.get(),
&uncompression_dict_);
&uncompression_dict_,
titan_cf_options_.memory_allocator());
if (!status_.ok()) {
return false;
}
Expand Down Expand Up @@ -146,7 +147,8 @@ void BlobFileIterator::GetBlobRecord() {
nullptr /*aligned_buf*/, true /*for_compaction*/);
if (status_.ok()) {
status_ =
decoder_.DecodeRecord(&record_slice, &cur_blob_record_, &uncompressed_);
decoder_.DecodeRecord(&record_slice, &cur_blob_record_, &uncompressed_,
titan_cf_options_.memory_allocator());
}
if (!status_.ok()) return;

Expand Down
37 changes: 24 additions & 13 deletions src/blob_file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ Status BlobFileReader::Open(const TitanCFOptions& options,
reader->footer_ = footer;
if (header.flags & BlobFileHeader::kHasUncompressionDictionary) {
s = InitUncompressionDict(footer, reader->file_.get(),
&reader->uncompression_dict_);
&reader->uncompression_dict_,
options.memory_allocator());
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -151,7 +152,7 @@ BlobFileReader::BlobFileReader(const TitanCFOptions& options,

Status BlobFileReader::Get(const ReadOptions& /*options*/,
const BlobHandle& handle, BlobRecord* record,
PinnableSlice* buffer) {
PinnableSlice* value) {
TEST_SYNC_POINT("BlobFileReader::Get");

std::string cache_key;
Expand All @@ -162,8 +163,11 @@ Status BlobFileReader::Get(const ReadOptions& /*options*/,
if (cache_handle) {
RecordTick(statistics(stats_), TITAN_BLOB_CACHE_HIT);
auto blob = reinterpret_cast<OwnedSlice*>(cache_->Value(cache_handle));
buffer->PinSlice(*blob, UnrefCacheHandle, cache_.get(), cache_handle);
return DecodeInto(*blob, record);
Status s = DecodeInto(*blob, record);
if (!s.ok()) return s;
value->PinSlice(record->value, UnrefCacheHandle, cache_.get(),
cache_handle);
return s;
}
}
RecordTick(statistics(stats_), TITAN_BLOB_CACHE_MISS);
Expand All @@ -178,11 +182,13 @@ Status BlobFileReader::Get(const ReadOptions& /*options*/,
auto cache_value = new OwnedSlice(std::move(blob));
auto cache_size = cache_value->size() + sizeof(*cache_value);
cache_->Insert(cache_key, cache_value, cache_size,
&DeleteCacheValue<OwnedSlice>, &cache_handle);
buffer->PinSlice(*cache_value, UnrefCacheHandle, cache_.get(),
cache_handle);
&DeleteCacheValue<OwnedSlice>, &cache_handle,
Cache::Priority::BOTTOM);
value->PinSlice(record->value, UnrefCacheHandle, cache_.get(),
cache_handle);
} else {
buffer->PinSlice(blob, OwnedSlice::CleanupFunc, blob.release(), nullptr);
value->PinSlice(record->value, OwnedSlice::CleanupFunc, blob.release(),
nullptr);
}

return Status::OK();
Expand All @@ -191,7 +197,8 @@ Status BlobFileReader::Get(const ReadOptions& /*options*/,
Status BlobFileReader::ReadRecord(const BlobHandle& handle, BlobRecord* record,
OwnedSlice* buffer) {
Slice blob;
CacheAllocationPtr ubuf(new char[handle.size]);
CacheAllocationPtr ubuf =
AllocateBlock(handle.size, options_.memory_allocator());
Status s = file_->Read(IOOptions(), handle.offset, handle.size, &blob,
ubuf.get(), nullptr /*aligned_buf*/);
if (!s.ok()) {
Expand All @@ -211,7 +218,7 @@ Status BlobFileReader::ReadRecord(const BlobHandle& handle, BlobRecord* record,
return s;
}
buffer->reset(std::move(ubuf), blob);
s = decoder.DecodeRecord(&blob, record, buffer);
s = decoder.DecodeRecord(&blob, record, buffer, options_.memory_allocator());
return s;
}

Expand All @@ -237,7 +244,8 @@ Status BlobFilePrefetcher::Get(const ReadOptions& options,

Status InitUncompressionDict(
const BlobFileFooter& footer, RandomAccessFileReader* file,
std::unique_ptr<UncompressionDict>* uncompression_dict) {
std::unique_ptr<UncompressionDict>* uncompression_dict,
MemoryAllocator* memory_allocator) {
// TODO: Cache the compression dictionary in either block cache or blob cache.
#if ZSTD_VERSION_NUMBER < 10103
return Status::NotSupported("the version of libztsd is too low");
Expand All @@ -248,7 +256,9 @@ Status InitUncompressionDict(
assert(footer.meta_index_handle.size() > 0);
BlockHandle meta_index_handle = footer.meta_index_handle;
Slice blob;
CacheAllocationPtr ubuf(new char[meta_index_handle.size()]);

CacheAllocationPtr ubuf =
AllocateBlock(meta_index_handle.size(), memory_allocator);
Status s = file->Read(IOOptions(), meta_index_handle.offset(),
meta_index_handle.size(), &blob, ubuf.get(),
nullptr /*aligned_buf*/);
Expand Down Expand Up @@ -276,7 +286,8 @@ Status InitUncompressionDict(
}

Slice dict_slice;
CacheAllocationPtr dict_buf(new char[dict_block.size()]);
CacheAllocationPtr dict_buf =
AllocateBlock(dict_block.size(), memory_allocator);
s = file->Read(IOOptions(), dict_block.offset(), dict_block.size(),
&dict_slice, dict_buf.get(), nullptr /*aligned_buf*/);
if (!s.ok()) {
Expand Down
7 changes: 4 additions & 3 deletions src/blob_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ class BlobFileReader {
TitanStats* stats);

// Gets the blob record pointed by the handle in this file. The data
// of the record is stored in the provided buffer, so the buffer
// of the record is stored in the value slice underlying, so the value slice
// must be valid when the record is used.
Status Get(const ReadOptions& options, const BlobHandle& handle,
BlobRecord* record, PinnableSlice* buffer);
BlobRecord* record, PinnableSlice* value);

private:
friend class BlobFilePrefetcher;
Expand Down Expand Up @@ -78,7 +78,8 @@ class BlobFilePrefetcher : public Cleanable {
// uncompression dictionary
Status InitUncompressionDict(
const BlobFileFooter& footer, RandomAccessFileReader* file,
std::unique_ptr<UncompressionDict>* uncompression_dict);
std::unique_ptr<UncompressionDict>* uncompression_dict,
MemoryAllocator* allocator);

} // namespace titandb
} // namespace rocksdb
5 changes: 3 additions & 2 deletions src/blob_format.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ Status BlobDecoder::DecodeHeader(Slice* src) {
}

Status BlobDecoder::DecodeRecord(Slice* src, BlobRecord* record,
OwnedSlice* buffer) {
OwnedSlice* buffer,
MemoryAllocator* allocator) {
TEST_SYNC_POINT_CALLBACK("BlobDecoder::DecodeRecord", &crc_);

Slice input(src->data(), record_size_);
Expand All @@ -86,7 +87,7 @@ Status BlobDecoder::DecodeRecord(Slice* src, BlobRecord* record,
}
UncompressionContext ctx(compression_);
UncompressionInfo info(ctx, *uncompression_dict_, compression_);
Status s = Uncompress(info, input, buffer);
Status s = Uncompress(info, input, buffer, allocator);
if (!s.ok()) {
return s;
}
Expand Down
3 changes: 2 additions & 1 deletion src/blob_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ class BlobDecoder {
: BlobDecoder(&UncompressionDict::GetEmptyDict(), kNoCompression) {}

Status DecodeHeader(Slice* src);
Status DecodeRecord(Slice* src, BlobRecord* record, OwnedSlice* buffer);
Status DecodeRecord(Slice* src, BlobRecord* record, OwnedSlice* buffer,
MemoryAllocator* allocator = nullptr);

void SetUncompressionDict(const UncompressionDict* uncompression_dict) {
uncompression_dict_ = uncompression_dict;
Expand Down
4 changes: 2 additions & 2 deletions src/blob_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ namespace rocksdb {
namespace titandb {

Status BlobStorage::Get(const ReadOptions& options, const BlobIndex& index,
BlobRecord* record, PinnableSlice* buffer) {
BlobRecord* record, PinnableSlice* value) {
auto sfile = FindFile(index.file_number).lock();
if (!sfile)
return Status::Corruption("Missing blob file: " +
std::to_string(index.file_number));
return file_cache_->Get(options, sfile->file_number(), sfile->file_size(),
index.blob_handle, record, buffer);
index.blob_handle, record, value);
}

Status BlobStorage::NewPrefetcher(uint64_t file_number,
Expand Down
2 changes: 1 addition & 1 deletion src/blob_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class BlobStorage {
// buffer is used to store the record data, so the buffer must be
// valid when the record is used.
Status Get(const ReadOptions& options, const BlobIndex& index,
BlobRecord* record, PinnableSlice* buffer);
BlobRecord* record, PinnableSlice* value);

// Creates a prefetcher for the specified file number.
Status NewPrefetcher(uint64_t file_number,
Expand Down
11 changes: 3 additions & 8 deletions src/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -701,15 +701,14 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options,
assert(s.ok());
if (!s.ok()) return s;

BlobRecord record;
PinnableSlice buffer;

auto storage =
static_cast_with_check<TitanColumnFamilyHandle>(handle)->GetBlobStorage();
if (storage) {
StopWatch read_sw(env_->GetSystemClock().get(), statistics(stats_.get()),
TITAN_BLOB_FILE_READ_MICROS);
s = storage->Get(options, index, &record, &buffer);
value->Reset();
BlobRecord record;
s = storage->Get(options, index, &record, value);
RecordTick(statistics(stats_.get()), TITAN_BLOB_FILE_NUM_KEYS_READ);
RecordTick(statistics(stats_.get()), TITAN_BLOB_FILE_BYTES_READ,
index.blob_handle.size);
Expand All @@ -726,10 +725,6 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options,
options.snapshot->GetSequenceNumber(),
s.ToString().c_str());
}
if (s.ok()) {
value->Reset();
value->PinSelf(record.value);
}
return s;
}

Expand Down
6 changes: 3 additions & 3 deletions src/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ Slice Compress(const CompressionInfo& info, const Slice& input,
}

Status Uncompress(const UncompressionInfo& info, const Slice& input,
OwnedSlice* output) {
OwnedSlice* output, MemoryAllocator* allocator) {
assert(info.type() != kNoCompression);
size_t usize = 0;
CacheAllocationPtr ubuf = UncompressData(info, input.data(), input.size(),
&usize, kCompressionFormat);
CacheAllocationPtr ubuf = UncompressData(
info, input.data(), input.size(), &usize, kCompressionFormat, allocator);
if (!ubuf.get()) {
return Status::Corruption("Corrupted compressed blob");
}
Expand Down
2 changes: 1 addition & 1 deletion src/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Slice Compress(const CompressionInfo& info, const Slice& input,
// If successful, fills "*buffer" with the uncompressed data and
// points "*output" to it.
Status Uncompress(const UncompressionInfo& info, const Slice& input,
OwnedSlice* output);
OwnedSlice* output, MemoryAllocator* allocator = nullptr);

void UnrefCacheHandle(void* cache, void* handle);

Expand Down

0 comments on commit 55f2cf8

Please sign in to comment.