Skip to content

Commit

Permalink
Titan followup changes
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Zhang <[email protected]>
  • Loading branch information
v01dstar committed Oct 5, 2024
1 parent eb7cf9a commit be2a999
Show file tree
Hide file tree
Showing 12 changed files with 354 additions and 252 deletions.
16 changes: 10 additions & 6 deletions db/compaction/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,15 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
}

if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex &&
ikey_.type != kTypeWideColumnEntity && ikey_.type != kTypeDeletion) {
ikey_.type != kTypeWideColumnEntity && ikey_.type != kTypeDeletion &&
ikey_.type != kTypeTitanBlobIndex) {
return true;
}

CompactionFilter::Decision decision =
CompactionFilter::Decision::kUndetermined;
CompactionFilter::ValueType value_type = CompactionFilter::ValueType::kValue;
if (ikey_.type == kTypeBlobIndex) {
if (ikey_.type == kTypeBlobIndex || ikey_.type == kTypeTitanBlobIndex) {
value_type = CompactionFilter::ValueType::kBlobIndex;
} else if (ikey_.type == kTypeWideColumnEntity) {
value_type = CompactionFilter::ValueType::kWideColumnEntity;
Expand All @@ -248,7 +249,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
// to get sequence number.
assert(compaction_filter_);
const Slice& filter_key =
(ikey_.type != kTypeBlobIndex ||
((ikey_.type != kTypeBlobIndex && ikey_.type != kTypeTitanBlobIndex) ||
!compaction_filter_->IsStackedBlobDbInternalCompactionFilter())
? ikey_.user_key
: key_;
Expand Down Expand Up @@ -621,7 +622,8 @@ void CompactionIterator::NextFromInput() {
// not compact out. We will keep this Put, but can drop it's data.
// (See Optimization 3, below.)
if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex &&
ikey_.type != kTypeWideColumnEntity) {
ikey_.type != kTypeWideColumnEntity &&
ikey_.type != kTypeTitanBlobIndex) {
ROCKS_LOG_FATAL(info_log_, "Unexpected key %s for compaction output",
ikey_.DebugString(allow_data_in_errors_, true).c_str());
assert(false);
Expand All @@ -635,7 +637,8 @@ void CompactionIterator::NextFromInput() {
assert(false);
}

if (ikey_.type == kTypeBlobIndex || ikey_.type == kTypeWideColumnEntity) {
if (ikey_.type == kTypeBlobIndex || ikey_.type == kTypeWideColumnEntity ||
ikey_.type == kTypeTitanBlobIndex) {
ikey_.type = kTypeValue;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
}
Expand Down Expand Up @@ -801,7 +804,8 @@ void CompactionIterator::NextFromInput() {
// happened
if (next_ikey.type != kTypeValue &&
next_ikey.type != kTypeBlobIndex &&
next_ikey.type != kTypeWideColumnEntity) {
next_ikey.type != kTypeWideColumnEntity &&
next_ikey.type != kTypeTitanBlobIndex) {
++iter_stats_.num_single_del_mismatch;
}

Expand Down
19 changes: 13 additions & 6 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
case kTypeValue:
case kTypeBlobIndex:
case kTypeWideColumnEntity:
case kTypeTitanBlobIndex:
if (!PrepareValue()) {
return false;
}
Expand All @@ -405,7 +406,8 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
!iter_.iter()->IsKeyPinned() /* copy */);
}

if (ikey_.type == kTypeBlobIndex) {
if (ikey_.type == kTypeBlobIndex ||
ikey_.type == kTypeTitanBlobIndex) {
if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
return false;
}
Expand Down Expand Up @@ -593,7 +595,8 @@ bool DBIter::MergeValuesNewToOld() {
merge_context_.PushOperand(
iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
} else if (kTypeBlobIndex == ikey.type) {
} else if (kTypeBlobIndex == ikey.type ||
kTypeTitanBlobIndex == ikey.type) {
if (expose_blob_index_) {
status_ =
Status::NotSupported("BlobDB does not support merge operator.");
Expand Down Expand Up @@ -919,6 +922,7 @@ bool DBIter::FindValueForCurrentKey() {
case kTypeValue:
case kTypeBlobIndex:
case kTypeWideColumnEntity:
case kTypeTitanBlobIndex:
if (iter_.iter()->IsValuePinned()) {
pinned_value_ = iter_.value();
} else {
Expand Down Expand Up @@ -1004,7 +1008,8 @@ bool DBIter::FindValueForCurrentKey() {
return false;
}
return true;
} else if (last_not_merge_type == kTypeBlobIndex) {
} else if (last_not_merge_type == kTypeBlobIndex ||
last_not_merge_type == kTypeTitanBlobIndex) {
if (expose_blob_index_) {
status_ =
Status::NotSupported("BlobDB does not support merge operator.");
Expand Down Expand Up @@ -1041,6 +1046,7 @@ bool DBIter::FindValueForCurrentKey() {
SetValueAndColumnsFromPlain(pinned_value_);

break;
case kTypeTitanBlobIndex:
case kTypeBlobIndex:
if (!SetBlobValueIfNeeded(saved_key_.GetUserKey(), pinned_value_)) {
return false;
Expand Down Expand Up @@ -1143,10 +1149,10 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
saved_timestamp_.assign(ts.data(), ts.size());
}
if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex ||
ikey.type == kTypeWideColumnEntity) {
ikey.type == kTypeWideColumnEntity || ikey.type == kTypeTitanBlobIndex) {
assert(iter_.iter()->IsValuePinned());
pinned_value_ = iter_.value();
if (ikey.type == kTypeBlobIndex) {
if (ikey.type == kTypeBlobIndex || ikey.type == kTypeTitanBlobIndex) {
if (!SetBlobValueIfNeeded(ikey.user_key, pinned_value_)) {
return false;
}
Expand Down Expand Up @@ -1213,7 +1219,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
merge_context_.PushOperand(
iter_.value(), iter_.iter()->IsValuePinned() /* operand_pinned */);
PERF_COUNTER_ADD(internal_merge_count, 1);
} else if (ikey.type == kTypeBlobIndex) {
} else if (ikey.type == kTypeBlobIndex ||
ikey.type == kTypeTitanBlobIndex) {
if (expose_blob_index_) {
status_ =
Status::NotSupported("BlobDB does not support merge operator.");
Expand Down
26 changes: 14 additions & 12 deletions db/dbformat.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ enum ValueType : unsigned char {
kTypeColumnFamilyValue = 0x5, // WAL only.
kTypeColumnFamilyMerge = 0x6, // WAL only.
kTypeSingleDeletion = 0x7,
kTypeColumnFamilySingleDeletion = 0x8, // WAL only.
kTypeBeginPrepareXID = 0x9, // WAL only.
kTypeEndPrepareXID = 0xA, // WAL only.
kTypeCommitXID = 0xB, // WAL only.
kTypeRollbackXID = 0xC, // WAL only.
kTypeNoop = 0xD, // WAL only.
kTypeColumnFamilyRangeDeletion = 0xE, // WAL only.
kTypeRangeDeletion = 0xF, // meta block
kTypeColumnFamilyBlobIndex = 0x10, // Blob DB only
kTypeTitanBlobIndex = 0x11, // Titan Blob DB only
kTypeColumnFamilySingleDeletion = 0x8, // WAL only.
kTypeBeginPrepareXID = 0x9, // WAL only.
kTypeEndPrepareXID = 0xA, // WAL only.
kTypeCommitXID = 0xB, // WAL only.
kTypeRollbackXID = 0xC, // WAL only.
kTypeNoop = 0xD, // WAL only.
kTypeColumnFamilyRangeDeletion = 0xE, // WAL only.
kTypeRangeDeletion = 0xF, // meta block
kTypeTitanColumnFamilyBlobIndex = 0x10, // Blob DB only
kTypeTitanBlobIndex = 0x11, // Titan Blob DB only
// When the prepared record is also persisted in db, we use a different
// record. This is to ensure that the WAL that is generated by a WritePolicy
// is not mistakenly read by another, which would result into data
Expand All @@ -68,7 +68,8 @@ enum ValueType : unsigned char {
kTypeCommitXIDAndTimestamp = 0x15, // WAL only
kTypeWideColumnEntity = 0x16,
kTypeColumnFamilyWideColumnEntity = 0x17, // WAL only
kTypeBlobIndex = 0x18, // RocksDB native Blob DB only
kTypeColumnFamilyBlobIndex = 0x18, // RocksDB native Blob DB only
kTypeBlobIndex = 0x19, // RocksDB native Blob DB only
kTypeMaxValid, // Should be after the last valid type, only used for
// validation
kMaxValue = 0x7F // Not used for storing records.
Expand All @@ -82,7 +83,8 @@ extern const ValueType kValueTypeForSeekForPrev;
// (i.e. a type used in memtable skiplist and sst file datablock).
inline bool IsValueType(ValueType t) {
return t <= kTypeMerge || kTypeSingleDeletion == t || kTypeBlobIndex == t ||
kTypeDeletionWithTimestamp == t || kTypeWideColumnEntity == t;
kTypeDeletionWithTimestamp == t || kTypeWideColumnEntity == t ||
kTypeTitanBlobIndex == t;
}

// Checks whether a type is from user operation
Expand Down
6 changes: 4 additions & 2 deletions db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -987,12 +987,14 @@ static bool SaveValue(void* arg, const char* entry) {
}

if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex ||
type == kTypeWideColumnEntity || type == kTypeDeletion ||
type == kTypeSingleDeletion || type == kTypeDeletionWithTimestamp) &&
type == kTypeTitanBlobIndex || type == kTypeWideColumnEntity ||
type == kTypeDeletion || type == kTypeSingleDeletion ||
type == kTypeDeletionWithTimestamp) &&
max_covering_tombstone_seq > seq) {
type = kTypeRangeDeletion;
}
switch (type) {
case kTypeTitanBlobIndex:
case kTypeBlobIndex: {
if (!s->do_merge) {
*(s->status) = Status::NotSupported(
Expand Down
74 changes: 74 additions & 0 deletions db/write_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ struct BatchContentClassifier : public WriteBatch::Handler {
return Status::OK();
}

Status PutTitanBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
content_flags |= ContentFlags::HAS_BLOB_INDEX;
return Status::OK();
}

Status MarkBeginPrepare(bool unprepare) override {
content_flags |= ContentFlags::HAS_BEGIN_PREPARE;
if (unprepare) {
Expand Down Expand Up @@ -413,11 +418,13 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag,
}
break;
case kTypeColumnFamilyBlobIndex:
case kTypeTitanColumnFamilyBlobIndex:
if (!GetVarint32(input, column_family)) {
return Status::Corruption("bad WriteBatch BlobIndex");
}
FALLTHROUGH_INTENDED;
case kTypeBlobIndex:
case kTypeTitanBlobIndex:
if (!GetLengthPrefixedSlice(input, key) ||
!GetLengthPrefixedSlice(input, value)) {
return Status::Corruption("bad WriteBatch BlobIndex");
Expand Down Expand Up @@ -595,6 +602,15 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
found++;
}
break;
case kTypeTitanColumnFamilyBlobIndex:
case kTypeTitanBlobIndex:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BLOB_INDEX));
s = handler->PutTitanBlobIndexCF(column_family, key, value);
if (LIKELY(s.ok())) {
found++;
}
break;
case kTypeLogData:
handler->LogData(blob);
// A batch might have nothing but LogData. It is still a batch.
Expand Down Expand Up @@ -1616,6 +1632,34 @@ Status WriteBatchInternal::PutBlobIndex(WriteBatch* b,
return save.commit();
}

Status WriteBatchInternal::PutTitanBlobIndex(WriteBatch* b,
uint32_t column_family_id,
const Slice& key,
const Slice& value) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeTitanBlobIndex));
} else {
b->rep_.push_back(static_cast<char>(kTypeTitanColumnFamilyBlobIndex));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSlice(&b->rep_, key);
PutLengthPrefixedSlice(&b->rep_, value);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_BLOB_INDEX,
std::memory_order_relaxed);
if (b->prot_info_ != nullptr) {
// See comment in first `WriteBatchInternal::Put()` overload concerning the
// `ValueType` argument passed to `ProtectKVO()`.
b->prot_info_->entries_.emplace_back(
ProtectionInfo64()
.ProtectKVO(key, value, kTypeTitanBlobIndex)
.ProtectC(column_family_id));
}
return save.commit();
}

Status WriteBatch::PutLogData(const Slice& blob) {
LocalSavePoint save(this);
rep_.push_back(static_cast<char>(kTypeLogData));
Expand Down Expand Up @@ -1736,6 +1780,10 @@ Status WriteBatch::VerifyChecksum() const {
case kTypeBlobIndex:
tag = kTypeBlobIndex;
break;
case kTypeTitanColumnFamilyBlobIndex:
case kTypeTitanBlobIndex:
tag = kTypeTitanBlobIndex;
break;
case kTypeLogData:
case kTypeBeginPrepareXID:
case kTypeEndPrepareXID:
Expand Down Expand Up @@ -2654,6 +2702,27 @@ class MemTableInserter : public WriteBatch::Handler {
return ret_status;
}

Status PutTitanBlobIndexCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
const auto* kv_prot_info = NextProtectionInfo();
Status ret_status;
if (kv_prot_info != nullptr) {
// Memtable needs seqno, doesn't need CF ID
auto mem_kv_prot_info =
kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
// Same as PutCF except for value type.
ret_status = PutCFImpl(column_family_id, key, value, kTypeTitanBlobIndex,
&mem_kv_prot_info);
} else {
ret_status = PutCFImpl(column_family_id, key, value, kTypeTitanBlobIndex,
nullptr /* kv_prot_info */);
}
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status;
}

void CheckMemtableFull() {
if (flush_scheduler_ != nullptr) {
auto* cfd = cf_mems_->current();
Expand Down Expand Up @@ -3066,6 +3135,11 @@ class ProtectionInfoUpdater : public WriteBatch::Handler {
return UpdateProtInfo(cf, key, val, kTypeBlobIndex);
}

Status PutTitanBlobIndexCF(uint32_t cf, const Slice& key,
const Slice& val) override {
return UpdateProtInfo(cf, key, val, kTypeTitanBlobIndex);
}

Status MarkBeginPrepare(bool /* unprepare */) override {
return Status::OK();
}
Expand Down
4 changes: 4 additions & 0 deletions db/write_batch_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <array>
#include <vector>

#include "db/dbformat.h"
#include "db/flush_scheduler.h"
#include "db/kv_checksum.h"
#include "db/trim_history_scheduler.h"
Expand Down Expand Up @@ -118,6 +119,9 @@ class WriteBatchInternal {
static Status PutBlobIndex(WriteBatch* batch, uint32_t column_family_id,
const Slice& key, const Slice& value);

static Status PutTitanBlobIndex(WriteBatch* batch, uint32_t column_family_id,
const Slice& key, const Slice& value);

static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid,
const bool write_after_commit = true,
const bool unprepared_batch = false);
Expand Down
6 changes: 6 additions & 0 deletions include/rocksdb/write_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,12 @@ class WriteBatch : public WriteBatchBase {
return Status::InvalidArgument("PutBlobIndexCF not implemented");
}

virtual Status PutTitanBlobIndexCF(uint32_t /*column_family_id*/,
const Slice& /*key*/,
const Slice& /*value*/) {
return Status::InvalidArgument("PutTitanBlobIndexCF not implemented");
}

// The default implementation of LogData does nothing.
virtual void LogData(const Slice& blob);

Expand Down
Loading

0 comments on commit be2a999

Please sign in to comment.