Skip to content

Commit

Permalink
Add CheckInRange API
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Zhang <[email protected]>
  • Loading branch information
v01dstar committed Oct 3, 2024
1 parent a2bd301 commit 3dccaa1
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 9 deletions.
2 changes: 2 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1071,6 +1071,8 @@ class DBImpl : public DB {
Status ValidateForMerge(const MergeInstanceOptions& merge_options,
DBImpl* rhs);

Status CheckInRange(const Slice* begin, const Slice* end) override;

Status MergeDisjointInstances(const MergeInstanceOptions& merge_options,
const std::vector<DB*>& instances) override;

Expand Down
33 changes: 30 additions & 3 deletions db/db_impl/db_impl_merge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,32 @@ Status DBImpl::ValidateForMerge(const MergeInstanceOptions& mopts,
return Status::OK();
}

Status DBImpl::CheckInRange(const Slice* begin, const Slice* end) {
Status s;
if (begin == nullptr && end == nullptr) {
return s;
}
for (auto cfd : *versions_->GetColumnFamilySet()) {
assert(cfd != nullptr);
auto* comparator = cfd->user_comparator();
PinnableSlice smallest, largest;
bool found = false;
s = cfd->GetUserKeyRange(&smallest, &largest, &found);
if (!s.ok()) {
return s;
}
if (!found) {
continue;
}
if (begin != nullptr && comparator->Compare(smallest, *begin) < 0) {
return Status::InvalidArgument("Has data smaller than left boundary");
} else if (end != nullptr && comparator->Compare(largest, *end) >= 0) {
return Status::InvalidArgument("Has data larger than right boundary");
}
}
return s;
}

Status DBImpl::MergeDisjointInstances(const MergeInstanceOptions& merge_options,
const std::vector<DB*>& instances) {
Status s;
Expand Down Expand Up @@ -155,7 +181,8 @@ Status DBImpl::MergeDisjointInstances(const MergeInstanceOptions& merge_options,
//
// - Acquire snapshots of table files (`SuperVersion`).
//
// - Do memtable merge if needed. We do this together with acquiring snapshot
// - Do memtable merge if needed. We do this together with acquiring
// snapshot
// to avoid the case where a memtable is flushed shortly after being
// merged, and the resulting L0 data is merged again as a table file.
assert(s.ok());
Expand All @@ -166,8 +193,8 @@ Status DBImpl::MergeDisjointInstances(const MergeInstanceOptions& merge_options,
// source data. See [A].
uint64_t max_seq_number = 0;
// RocksDB's recovery is heavily dependent on the one-on-one mapping between
// memtable and WAL (even when WAL is empty). Each memtable keeps a record of
// `next_log_number` to mark its position within a series of WALs. This
// memtable and WAL (even when WAL is empty). Each memtable keeps a record
// of `next_log_number` to mark its position within a series of WALs. This
// counter must be monotonic. We work around this issue by setting the
// counters of all involved memtables to the same maximum value. See [B].
uint64_t max_log_number = 0;
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,8 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options,
stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count);
RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);

while (writer.ConsumeOne());
while (writer.ConsumeOne())
;
MultiBatchWriteCommit(writer.request);

WriteStatusCheck(writer.status);
Expand Down
8 changes: 3 additions & 5 deletions db/db_properties_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1081,7 +1081,6 @@ TEST_F(DBPropertiesTest, EstimateCompressionRatio) {
ASSERT_GT(CompressionRatioAtLevel(1), 10.0);
}


class CountingUserTblPropCollector : public TablePropertiesCollector {
public:
const char* Name() const override { return "CountingUserTblPropCollector"; }
Expand Down Expand Up @@ -2171,7 +2170,7 @@ TEST_F(DBPropertiesTest, GetMapPropertyWriteStallStats) {
WriteStallCause::kMemtableLimit}) {
if (test_cause == WriteStallCause::kWriteBufferManagerLimit) {
options.write_buffer_manager.reset(
new WriteBufferManager(100000, nullptr, true));
new WriteBufferManager(100000, nullptr, 1.0));
} else if (test_cause == WriteStallCause::kMemtableLimit) {
options.max_write_buffer_number = 2;
options.disable_auto_compactions = true;
Expand Down Expand Up @@ -2207,13 +2206,13 @@ TEST_F(DBPropertiesTest, GetMapPropertyWriteStallStats) {
if (test_cause == WriteStallCause::kWriteBufferManagerLimit) {
ASSERT_OK(dbfull()->Put(
WriteOptions(), handles_[1], Key(1),
DummyString(options.write_buffer_manager->buffer_size())));
DummyString(options.write_buffer_manager->flush_size())));

WriteOptions wo;
wo.no_slowdown = true;
Status s = dbfull()->Put(
wo, handles_[1], Key(2),
DummyString(options.write_buffer_manager->buffer_size()));
DummyString(options.write_buffer_manager->flush_size()));
ASSERT_TRUE(s.IsIncomplete());
ASSERT_TRUE(s.ToString().find("Write stall") != std::string::npos);
} else if (test_cause == WriteStallCause::kMemtableLimit) {
Expand Down Expand Up @@ -2364,7 +2363,6 @@ TEST_F(DBPropertiesTest, TableMetaIndexKeys) {
} while (ChangeOptions());
}


} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
6 changes: 6 additions & 0 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,12 @@ class DB {
return Status::NotSupported("`MergeDisjointInstances` not implemented");
}

// Check all data written before this call is in the range [begin, end).
// Return InvalidArgument if not.
virtual Status CheckInRange(const Slice* /*begin*/, const Slice* /*end*/) {
return Status::NotSupported("`AssertInRange` not implemented");
}

virtual Status Resume() { return Status::NotSupported(); }

// Close the DB by releasing resources, closing files etc. This should be
Expand Down

0 comments on commit 3dccaa1

Please sign in to comment.