diff --git a/include/titan/options.h b/include/titan/options.h index b03aa5f99..ce143f987 100644 --- a/include/titan/options.h +++ b/include/titan/options.h @@ -172,6 +172,10 @@ struct TitanCFOptions : public ColumnFamilyOptions { return *this; } + MemoryAllocator* memory_allocator() const { + return blob_cache ? blob_cache->memory_allocator() : nullptr; + } + void Dump(Logger* logger) const; void UpdateMutableOptions(const MutableTitanCFOptions& new_options); }; diff --git a/src/blob_file_cache.cc b/src/blob_file_cache.cc index 66a1d7f74..6b0bf1a4c 100644 --- a/src/blob_file_cache.cc +++ b/src/blob_file_cache.cc @@ -27,13 +27,13 @@ BlobFileCache::BlobFileCache(const TitanDBOptions& db_options, Status BlobFileCache::Get(const ReadOptions& options, uint64_t file_number, uint64_t file_size, const BlobHandle& handle, - BlobRecord* record, PinnableSlice* buffer) { + BlobRecord* record, PinnableSlice* value) { Cache::Handle* cache_handle = nullptr; Status s = FindFile(file_number, file_size, &cache_handle); if (!s.ok()) return s; auto reader = reinterpret_cast(cache_->Value(cache_handle)); - s = reader->Get(options, handle, record, buffer); + s = reader->Get(options, handle, record, value); cache_->Release(cache_handle); return s; } diff --git a/src/blob_file_cache.h b/src/blob_file_cache.h index ec9d29f5b..2a493021f 100644 --- a/src/blob_file_cache.h +++ b/src/blob_file_cache.h @@ -23,7 +23,7 @@ class BlobFileCache { // the buffer must be valid when the record is used. Status Get(const ReadOptions& options, uint64_t file_number, uint64_t file_size, const BlobHandle& handle, BlobRecord* record, - PinnableSlice* buffer); + PinnableSlice* value); // Creates a prefetcher for the specified file number. Status NewPrefetcher(uint64_t file_number, uint64_t file_size, diff --git a/src/blob_file_iterator.cc b/src/blob_file_iterator.cc index 4b126fdeb..50af8c901 100644 --- a/src/blob_file_iterator.cc +++ b/src/blob_file_iterator.cc @@ -56,7 +56,8 @@ bool BlobFileIterator::Init() { if (blob_file_header.flags & BlobFileHeader::kHasUncompressionDictionary) { status_ = InitUncompressionDict(blob_file_footer, file_.get(), - &uncompression_dict_); + &uncompression_dict_, + titan_cf_options_.memory_allocator()); if (!status_.ok()) { return false; } @@ -146,7 +147,8 @@ void BlobFileIterator::GetBlobRecord() { nullptr /*aligned_buf*/, true /*for_compaction*/); if (status_.ok()) { status_ = - decoder_.DecodeRecord(&record_slice, &cur_blob_record_, &uncompressed_); + decoder_.DecodeRecord(&record_slice, &cur_blob_record_, &uncompressed_, + titan_cf_options_.memory_allocator()); } if (!status_.ok()) return; diff --git a/src/blob_file_reader.cc b/src/blob_file_reader.cc index 6e1f38ee6..62c5ebeef 100644 --- a/src/blob_file_reader.cc +++ b/src/blob_file_reader.cc @@ -116,7 +116,8 @@ Status BlobFileReader::Open(const TitanCFOptions& options, reader->footer_ = footer; if (header.flags & BlobFileHeader::kHasUncompressionDictionary) { s = InitUncompressionDict(footer, reader->file_.get(), - &reader->uncompression_dict_); + &reader->uncompression_dict_, + options.memory_allocator()); if (!s.ok()) { return s; } @@ -151,7 +152,7 @@ BlobFileReader::BlobFileReader(const TitanCFOptions& options, Status BlobFileReader::Get(const ReadOptions& /*options*/, const BlobHandle& handle, BlobRecord* record, - PinnableSlice* buffer) { + PinnableSlice* value) { TEST_SYNC_POINT("BlobFileReader::Get"); std::string cache_key; @@ -162,8 +163,11 @@ Status BlobFileReader::Get(const ReadOptions& /*options*/, if (cache_handle) { RecordTick(statistics(stats_), TITAN_BLOB_CACHE_HIT); auto blob = reinterpret_cast(cache_->Value(cache_handle)); - buffer->PinSlice(*blob, UnrefCacheHandle, cache_.get(), cache_handle); - return DecodeInto(*blob, record); + Status s = DecodeInto(*blob, record); + if (!s.ok()) return s; + value->PinSlice(record->value, UnrefCacheHandle, cache_.get(), + cache_handle); + return s; } } RecordTick(statistics(stats_), TITAN_BLOB_CACHE_MISS); @@ -178,11 +182,13 @@ Status BlobFileReader::Get(const ReadOptions& /*options*/, auto cache_value = new OwnedSlice(std::move(blob)); auto cache_size = cache_value->size() + sizeof(*cache_value); cache_->Insert(cache_key, cache_value, cache_size, - &DeleteCacheValue, &cache_handle); - buffer->PinSlice(*cache_value, UnrefCacheHandle, cache_.get(), - cache_handle); + &DeleteCacheValue, &cache_handle, + Cache::Priority::BOTTOM); + value->PinSlice(record->value, UnrefCacheHandle, cache_.get(), + cache_handle); } else { - buffer->PinSlice(blob, OwnedSlice::CleanupFunc, blob.release(), nullptr); + value->PinSlice(record->value, OwnedSlice::CleanupFunc, blob.release(), + nullptr); } return Status::OK(); @@ -191,7 +197,8 @@ Status BlobFileReader::Get(const ReadOptions& /*options*/, Status BlobFileReader::ReadRecord(const BlobHandle& handle, BlobRecord* record, OwnedSlice* buffer) { Slice blob; - CacheAllocationPtr ubuf(new char[handle.size]); + CacheAllocationPtr ubuf = + AllocateBlock(handle.size, options_.memory_allocator()); Status s = file_->Read(IOOptions(), handle.offset, handle.size, &blob, ubuf.get(), nullptr /*aligned_buf*/); if (!s.ok()) { @@ -211,7 +218,7 @@ Status BlobFileReader::ReadRecord(const BlobHandle& handle, BlobRecord* record, return s; } buffer->reset(std::move(ubuf), blob); - s = decoder.DecodeRecord(&blob, record, buffer); + s = decoder.DecodeRecord(&blob, record, buffer, options_.memory_allocator()); return s; } @@ -237,7 +244,8 @@ Status BlobFilePrefetcher::Get(const ReadOptions& options, Status InitUncompressionDict( const BlobFileFooter& footer, RandomAccessFileReader* file, - std::unique_ptr* uncompression_dict) { + std::unique_ptr* uncompression_dict, + MemoryAllocator* memory_allocator) { // TODO: Cache the compression dictionary in either block cache or blob cache. #if ZSTD_VERSION_NUMBER < 10103 return Status::NotSupported("the version of libztsd is too low"); @@ -248,7 +256,9 @@ Status InitUncompressionDict( assert(footer.meta_index_handle.size() > 0); BlockHandle meta_index_handle = footer.meta_index_handle; Slice blob; - CacheAllocationPtr ubuf(new char[meta_index_handle.size()]); + + CacheAllocationPtr ubuf = + AllocateBlock(meta_index_handle.size(), memory_allocator); Status s = file->Read(IOOptions(), meta_index_handle.offset(), meta_index_handle.size(), &blob, ubuf.get(), nullptr /*aligned_buf*/); @@ -276,7 +286,8 @@ Status InitUncompressionDict( } Slice dict_slice; - CacheAllocationPtr dict_buf(new char[dict_block.size()]); + CacheAllocationPtr dict_buf = + AllocateBlock(dict_block.size(), memory_allocator); s = file->Read(IOOptions(), dict_block.offset(), dict_block.size(), &dict_slice, dict_buf.get(), nullptr /*aligned_buf*/); if (!s.ok()) { diff --git a/src/blob_file_reader.h b/src/blob_file_reader.h index 7856c6ea7..821614028 100644 --- a/src/blob_file_reader.h +++ b/src/blob_file_reader.h @@ -25,10 +25,10 @@ class BlobFileReader { TitanStats* stats); // Gets the blob record pointed by the handle in this file. The data - // of the record is stored in the provided buffer, so the buffer + // of the record is stored in the value slice underlying, so the value slice // must be valid when the record is used. Status Get(const ReadOptions& options, const BlobHandle& handle, - BlobRecord* record, PinnableSlice* buffer); + BlobRecord* record, PinnableSlice* value); private: friend class BlobFilePrefetcher; @@ -78,7 +78,8 @@ class BlobFilePrefetcher : public Cleanable { // uncompression dictionary Status InitUncompressionDict( const BlobFileFooter& footer, RandomAccessFileReader* file, - std::unique_ptr* uncompression_dict); + std::unique_ptr* uncompression_dict, + MemoryAllocator* allocator); } // namespace titandb } // namespace rocksdb diff --git a/src/blob_format.cc b/src/blob_format.cc index f11252b0b..7cb5e4591 100644 --- a/src/blob_format.cc +++ b/src/blob_format.cc @@ -71,7 +71,8 @@ Status BlobDecoder::DecodeHeader(Slice* src) { } Status BlobDecoder::DecodeRecord(Slice* src, BlobRecord* record, - OwnedSlice* buffer) { + OwnedSlice* buffer, + MemoryAllocator* allocator) { TEST_SYNC_POINT_CALLBACK("BlobDecoder::DecodeRecord", &crc_); Slice input(src->data(), record_size_); @@ -86,7 +87,7 @@ Status BlobDecoder::DecodeRecord(Slice* src, BlobRecord* record, } UncompressionContext ctx(compression_); UncompressionInfo info(ctx, *uncompression_dict_, compression_); - Status s = Uncompress(info, input, buffer); + Status s = Uncompress(info, input, buffer, allocator); if (!s.ok()) { return s; } diff --git a/src/blob_format.h b/src/blob_format.h index 809f2e2c3..3e3d23d8c 100644 --- a/src/blob_format.h +++ b/src/blob_format.h @@ -114,7 +114,8 @@ class BlobDecoder { : BlobDecoder(&UncompressionDict::GetEmptyDict(), kNoCompression) {} Status DecodeHeader(Slice* src); - Status DecodeRecord(Slice* src, BlobRecord* record, OwnedSlice* buffer); + Status DecodeRecord(Slice* src, BlobRecord* record, OwnedSlice* buffer, + MemoryAllocator* allocator = nullptr); void SetUncompressionDict(const UncompressionDict* uncompression_dict) { uncompression_dict_ = uncompression_dict; diff --git a/src/blob_storage.cc b/src/blob_storage.cc index dc12ec474..35ac582e0 100644 --- a/src/blob_storage.cc +++ b/src/blob_storage.cc @@ -7,13 +7,13 @@ namespace rocksdb { namespace titandb { Status BlobStorage::Get(const ReadOptions& options, const BlobIndex& index, - BlobRecord* record, PinnableSlice* buffer) { + BlobRecord* record, PinnableSlice* value) { auto sfile = FindFile(index.file_number).lock(); if (!sfile) return Status::Corruption("Missing blob file: " + std::to_string(index.file_number)); return file_cache_->Get(options, sfile->file_number(), sfile->file_size(), - index.blob_handle, record, buffer); + index.blob_handle, record, value); } Status BlobStorage::NewPrefetcher(uint64_t file_number, diff --git a/src/blob_storage.h b/src/blob_storage.h index 4808213fe..8231f57c9 100644 --- a/src/blob_storage.h +++ b/src/blob_storage.h @@ -67,7 +67,7 @@ class BlobStorage { // buffer is used to store the record data, so the buffer must be // valid when the record is used. Status Get(const ReadOptions& options, const BlobIndex& index, - BlobRecord* record, PinnableSlice* buffer); + BlobRecord* record, PinnableSlice* value); // Creates a prefetcher for the specified file number. Status NewPrefetcher(uint64_t file_number, diff --git a/src/db_impl.cc b/src/db_impl.cc index 393948217..69a6a9192 100644 --- a/src/db_impl.cc +++ b/src/db_impl.cc @@ -701,15 +701,14 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options, assert(s.ok()); if (!s.ok()) return s; - BlobRecord record; - PinnableSlice buffer; - auto storage = static_cast_with_check(handle)->GetBlobStorage(); if (storage) { StopWatch read_sw(env_->GetSystemClock().get(), statistics(stats_.get()), TITAN_BLOB_FILE_READ_MICROS); - s = storage->Get(options, index, &record, &buffer); + value->Reset(); + BlobRecord record; + s = storage->Get(options, index, &record, value); RecordTick(statistics(stats_.get()), TITAN_BLOB_FILE_NUM_KEYS_READ); RecordTick(statistics(stats_.get()), TITAN_BLOB_FILE_BYTES_READ, index.blob_handle.size); @@ -726,10 +725,6 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options, options.snapshot->GetSequenceNumber(), s.ToString().c_str()); } - if (s.ok()) { - value->Reset(); - value->PinSelf(record.value); - } return s; } diff --git a/src/util.cc b/src/util.cc index 37af347c8..de19468f1 100644 --- a/src/util.cc +++ b/src/util.cc @@ -30,11 +30,11 @@ Slice Compress(const CompressionInfo& info, const Slice& input, } Status Uncompress(const UncompressionInfo& info, const Slice& input, - OwnedSlice* output) { + OwnedSlice* output, MemoryAllocator* allocator) { assert(info.type() != kNoCompression); size_t usize = 0; - CacheAllocationPtr ubuf = UncompressData(info, input.data(), input.size(), - &usize, kCompressionFormat); + CacheAllocationPtr ubuf = UncompressData( + info, input.data(), input.size(), &usize, kCompressionFormat, allocator); if (!ubuf.get()) { return Status::Corruption("Corrupted compressed blob"); } diff --git a/src/util.h b/src/util.h index 318040eb5..cef2e5f8b 100644 --- a/src/util.h +++ b/src/util.h @@ -66,7 +66,7 @@ Slice Compress(const CompressionInfo& info, const Slice& input, // If successful, fills "*buffer" with the uncompressed data and // points "*output" to it. Status Uncompress(const UncompressionInfo& info, const Slice& input, - OwnedSlice* output); + OwnedSlice* output, MemoryAllocator* allocator = nullptr); void UnrefCacheHandle(void* cache, void* handle);