Skip to content

Commit

Permalink
Introduce SstFileManager::SetMaxAllowedSpaceUsage() to cap disk space…
Browse files Browse the repository at this point in the history
… usage

Summary:
Introduce SstFileManager::SetMaxAllowedSpaceUsage(), which can be used to limit the maximum space usage allowed for RocksDB.
When this limit is exceeded, WriteImpl() will fail and return Status::Aborted().

Test Plan: unit testing

Reviewers: yhchiang, anthony, andrewkr, sdong

Reviewed By: sdong

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D53763
  • Loading branch information
IslamAbdelRahman committed Feb 17, 2016
1 parent 3943d16 commit df9ba6d
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 34 deletions.
41 changes: 29 additions & 12 deletions db/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ CompactionJob::CompactionJob(
const EnvOptions& env_options, VersionSet* versions,
std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
Directory* db_directory, Directory* output_directory, Statistics* stats,
InstrumentedMutex* db_mutex, Status* db_bg_error,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
Expand All @@ -231,6 +232,8 @@ CompactionJob::CompactionJob(
db_directory_(db_directory),
output_directory_(output_directory),
stats_(stats),
db_mutex_(db_mutex),
db_bg_error_(db_bg_error),
existing_snapshots_(std::move(existing_snapshots)),
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
table_cache_(std::move(table_cache)),
Expand Down Expand Up @@ -499,16 +502,11 @@ Status CompactionJob::Run() {
}

TablePropertiesCollection tp;
auto sfm =
static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
for (const auto& state : compact_->sub_compact_states) {
for (const auto& output : state.outputs) {
auto fn = TableFileName(db_options_.db_paths, output.meta.fd.GetNumber(),
output.meta.fd.GetPathId());
tp[fn] = output.table_properties;
if (sfm && output.meta.fd.GetPathId() == 0) {
sfm->OnAddFile(fn);
}
}
}
compact_->compaction->SetOutputTableProperties(std::move(tp));
Expand All @@ -524,18 +522,17 @@ Status CompactionJob::Run() {
return status;
}

Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options,
InstrumentedMutex* db_mutex) {
Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_INSTALL);
db_mutex->AssertHeld();
db_mutex_->AssertHeld();
Status status = compact_->status;
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
cfd->internal_stats()->AddCompactionStats(
compact_->compaction->output_level(), compaction_stats_);

if (status.ok()) {
status = InstallCompactionResults(mutable_cf_options, db_mutex);
status = InstallCompactionResults(mutable_cf_options);
}
VersionStorageInfo::LevelSummaryStorage tmp;
auto vstorage = cfd->current()->storage_info();
Expand Down Expand Up @@ -861,13 +858,33 @@ Status CompactionJob::FinishCompactionOutputFile(
event_logger_, cfd->ioptions()->listeners, meta->fd, info);
}
}

// Report new file to SstFileManagerImpl
auto sfm =
static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
if (sfm && meta->fd.GetPathId() == 0) {
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
auto fn = TableFileName(cfd->ioptions()->db_paths, meta->fd.GetNumber(),
meta->fd.GetPathId());
sfm->OnAddFile(fn);
if (sfm->IsMaxAllowedSpaceReached()) {
InstrumentedMutexLock l(db_mutex_);
if (db_bg_error_->ok()) {
s = Status::IOError("Max allowed space was reached");
*db_bg_error_ = s;
TEST_SYNC_POINT(
"CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached");
}
}
}

sub_compact->builder.reset();
return s;
}

Status CompactionJob::InstallCompactionResults(
const MutableCFOptions& mutable_cf_options, InstrumentedMutex* db_mutex) {
db_mutex->AssertHeld();
const MutableCFOptions& mutable_cf_options) {
db_mutex_->AssertHeld();

auto* compaction = compact_->compaction;
// paranoia: verify that the files that we started with
Expand Down Expand Up @@ -902,7 +919,7 @@ Status CompactionJob::InstallCompactionResults(
}
return versions_->LogAndApply(compaction->column_family_data(),
mutable_cf_options, compaction->edit(),
db_mutex, db_directory_);
db_mutex_, db_directory_);
}

void CompactionJob::RecordCompactionIOStats() {
Expand Down
11 changes: 6 additions & 5 deletions db/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ class CompactionJob {
const EnvOptions& env_options, VersionSet* versions,
std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
Directory* db_directory, Directory* output_directory,
Statistics* stats,
Statistics* stats, InstrumentedMutex* db_mutex,
Status* db_bg_error,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
Expand All @@ -77,8 +78,7 @@ class CompactionJob {
Status Run();

// REQUIRED: mutex held
Status Install(const MutableCFOptions& mutable_cf_options,
InstrumentedMutex* db_mutex);
Status Install(const MutableCFOptions& mutable_cf_options);

private:
struct SubcompactionState;
Expand All @@ -95,8 +95,7 @@ class CompactionJob {

Status FinishCompactionOutputFile(const Status& input_status,
SubcompactionState* sub_compact);
Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options,
InstrumentedMutex* db_mutex);
Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);
void RecordCompactionIOStats();
Status OpenCompactionOutputFile(SubcompactionState* sub_compact);
void CleanupCompaction();
Expand Down Expand Up @@ -130,6 +129,8 @@ class CompactionJob {
Directory* db_directory_;
Directory* output_directory_;
Statistics* stats_;
InstrumentedMutex* db_mutex_;
Status* db_bg_error_;
// If there were two snapshots with seq numbers s1 and
// s2 and s1 < s2, and if we find two instances of a key k1 then lies
// entirely within s1 and s2, then the earlier version of k1 can be safely
Expand Down
10 changes: 5 additions & 5 deletions db/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,9 @@ class CompactionJobTest : public testing::Test {
EventLogger event_logger(db_options_.info_log.get());
CompactionJob compaction_job(
0, &compaction, db_options_, env_options_, versions_.get(),
&shutting_down_, &log_buffer, nullptr, nullptr, nullptr, snapshots,
earliest_write_conflict_snapshot, table_cache_, &event_logger, false,
false, dbname_, &compaction_job_stats_);
&shutting_down_, &log_buffer, nullptr, nullptr, nullptr, &mutex_,
&bg_error_, snapshots, earliest_write_conflict_snapshot, table_cache_,
&event_logger, false, false, dbname_, &compaction_job_stats_);

VerifyInitializationOfCompactionJobStats(compaction_job_stats_);

Expand All @@ -262,8 +262,7 @@ class CompactionJobTest : public testing::Test {
s = compaction_job.Run();
ASSERT_OK(s);
mutex_.Lock();
ASSERT_OK(compaction_job.Install(*cfd->GetLatestMutableCFOptions(),
&mutex_));
ASSERT_OK(compaction_job.Install(*cfd->GetLatestMutableCFOptions()));
mutex_.Unlock();

if (expected_results.size() == 0) {
Expand Down Expand Up @@ -295,6 +294,7 @@ class CompactionJobTest : public testing::Test {
ColumnFamilyData* cfd_;
std::unique_ptr<CompactionFilter> compaction_filter_;
std::shared_ptr<MergeOperator> merge_op_;
Status bg_error_;
};

TEST_F(CompactionJobTest, Simple) {
Expand Down
24 changes: 15 additions & 9 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1510,21 +1510,26 @@ Status DBImpl::FlushMemTableToOutputFile(
bg_error_ = s;
}
RecordFlushIOStats();
#ifndef ROCKSDB_LITE
if (s.ok()) {
#ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex.
NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options,
job_context->job_id, flush_job.GetTableProperties());
#endif // ROCKSDB_LITE
auto sfm =
static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
if (sfm) {
// Notify sst_file_manager that a new file was added
std::string file_path = MakeTableFileName(db_options_.db_paths[0].path,
file_meta.fd.GetNumber());
sfm->OnAddFile(file_path);
if (sfm->IsMaxAllowedSpaceReached() && bg_error_.ok()) {
bg_error_ = Status::IOError("Max allowed space was reached");
TEST_SYNC_POINT(
"DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached");
}
}
}
#endif // ROCKSDB_LITE
return s;
}

Expand Down Expand Up @@ -1818,9 +1823,9 @@ Status DBImpl::CompactFilesImpl(
CompactionJob compaction_job(
job_context->job_id, c.get(), db_options_, env_options_, versions_.get(),
&shutting_down_, log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->output_path_id()), stats_, snapshot_seqs,
earliest_write_conflict_snapshot, table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, &bg_error_,
snapshot_seqs, earliest_write_conflict_snapshot, table_cache_,
&event_logger_, c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->compaction_measure_io_stats, dbname_,
nullptr); // Here we pass a nullptr for CompactionJobStats because
// CompactFiles does not trigger OnCompactionCompleted(),
Expand All @@ -1843,7 +1848,7 @@ Status DBImpl::CompactFilesImpl(
compaction_job.Run();
mutex_.Lock();

Status status = compaction_job.Install(*c->mutable_cf_options(), &mutex_);
Status status = compaction_job.Install(*c->mutable_cf_options());
if (status.ok()) {
InstallSuperVersionAndScheduleWorkWrapper(
c->column_family_data(), job_context, *c->mutable_cf_options());
Expand Down Expand Up @@ -2994,8 +2999,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
CompactionJob compaction_job(
job_context->job_id, c.get(), db_options_, env_options_,
versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->output_path_id()), stats_, snapshot_seqs,
earliest_write_conflict_snapshot, table_cache_, &event_logger_,
directories_.GetDataDir(c->output_path_id()), stats_, &mutex_,
&bg_error_, snapshot_seqs, earliest_write_conflict_snapshot,
table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->compaction_measure_io_stats, dbname_,
&compaction_job_stats);
Expand All @@ -3006,7 +3012,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
mutex_.Lock();

status = compaction_job.Install(*c->mutable_cf_options(), &mutex_);
status = compaction_job.Install(*c->mutable_cf_options());
if (status.ok()) {
InstallSuperVersionAndScheduleWorkWrapper(
c->column_family_data(), job_context, *c->mutable_cf_options());
Expand Down
98 changes: 97 additions & 1 deletion db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8636,7 +8636,6 @@ TEST_F(DBTest, DeletingOldWalAfterDrop) {
EXPECT_GT(lognum2, lognum1);
}

#ifndef ROCKSDB_LITE
TEST_F(DBTest, DBWithSstFileManager) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
Expand Down Expand Up @@ -8701,6 +8700,7 @@ TEST_F(DBTest, DBWithSstFileManager) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

#ifndef ROCKSDB_LITE
TEST_F(DBTest, RateLimitedDelete) {
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"DBTest::RateLimitedDelete:1", "DeleteScheduler::BackgroundEmptyTrash"},
Expand Down Expand Up @@ -8873,6 +8873,102 @@ TEST_F(DBTest, DestroyDBWithRateLimitedDelete) {
}
#endif // ROCKSDB_LITE

// Verifies that a flush fails after the SstFileManager's maximum allowed
// space usage has been set to just above the size of the SST data that is
// already on disk.
TEST_F(DBTest, DBWithMaxSpaceAllowed) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());

Options options = CurrentOptions();
options.sst_file_manager = sst_file_manager;
// Auto compactions are disabled, so the only new SST files in this test
// should come from the explicit Flush() calls below.
options.disable_auto_compactions = true;
DestroyAndReopen(options);

Random rnd(301);

// Generate a file containing 100 keys.
for (int i = 0; i < 100; i++) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, 50)));
}
ASSERT_OK(Flush());

// The total size tracked by the manager must match the summed on-disk size
// of the SST files reported by GetAllSSTFiles().
// NOTE(review): files_in_db is never read afterwards; the call is made only
// for the total size it writes into first_file_size.
uint64_t first_file_size = 0;
auto files_in_db = GetAllSSTFiles(&first_file_size);
ASSERT_EQ(sfm->GetTotalSize(), first_file_size);

// Set the maximum allowed space usage to one byte above the current total
// size, so that adding one more SST file is expected to exceed the limit.
sfm->SetMaxAllowedSpaceUsage(first_file_size + 1);

ASSERT_OK(Put("key1", "val1"));
// This flush is expected to set bg_error_ and therefore fail.
ASSERT_NOK(Flush());
}

TEST_F(DBTest, DBWithMaxSpaceAllowedRandomized) {
// This test will set a maximum allowed space for the DB, then it will
// keep filling the DB until the limit is reached and bg_error_ is set.
// When bg_error_ is set we will verify that the total SST file size is
// greater than or equal to the limit.

// Limits to exercise, in megabytes.
std::vector<int> max_space_limits_mbs = {1, 2, 4, 8, 10};

// State written by the sync point callbacks below. bg_error_set and
// total_sst_files_size are reset for every limit; the two counters
// accumulate across all limits.
// NOTE(review): presumably these callbacks can fire on background threads
// while the main thread is writing; confirm whether these plain variables
// need synchronization.
bool bg_error_set = false;
uint64_t total_sst_files_size = 0;

int reached_max_space_on_flush = 0;
int reached_max_space_on_compaction = 0;
// Flush path: record that the limit was hit and capture the total size of
// the SST files at that moment.
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
[&](void* arg) {
bg_error_set = true;
GetAllSSTFiles(&total_sst_files_size);
reached_max_space_on_flush++;
});

// Compaction path: same bookkeeping as the flush callback above, but counted
// separately.
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached",
[&](void* arg) {
bg_error_set = true;
GetAllSSTFiles(&total_sst_files_size);
reached_max_space_on_compaction++;
});

for (auto limit_mb : max_space_limits_mbs) {
// Start every limit from a clean state with a fresh SstFileManager and DB.
bg_error_set = false;
total_sst_files_size = 0;
rocksdb::SyncPoint::GetInstance()->ClearTrace();
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());

Options options = CurrentOptions();
options.sst_file_manager = sst_file_manager;
options.write_buffer_size = 1024 * 512; // 512 KB
DestroyAndReopen(options);
Random rnd(301);

// Convert the limit from megabytes to bytes.
sfm->SetMaxAllowedSpaceUsage(limit_mb * 1024 * 1024);

int keys_written = 0;
uint64_t estimated_db_size = 0;
// Keep writing until a Put() fails.
while (true) {
auto s = Put(RandomString(&rnd, 10), RandomString(&rnd, 50));
if (!s.ok()) {
break;
}
keys_written++;
// Check the estimated db size vs the db limit just to make sure we
// don't run into an infinite loop: the test fails once the estimate
// reaches twice the limit.
estimated_db_size = keys_written * 60; // ~60 bytes per key
ASSERT_LT(estimated_db_size, limit_mb * 1024 * 1024 * 2);
}
// A failed write must have been preceded by one of the sync points firing,
// and the SST size captured there must be at least the configured limit.
ASSERT_TRUE(bg_error_set);
ASSERT_GE(total_sst_files_size, limit_mb * 1024 * 1024);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

// Across all limits, both the flush and the compaction path must have hit
// the limit at least once.
ASSERT_GT(reached_max_space_on_flush, 0);
ASSERT_GT(reached_max_space_on_compaction, 0);
}

TEST_F(DBTest, UnsupportedManualSync) {
DestroyAndReopen(CurrentOptions());
env_->is_wal_sync_thread_safe_.store(false);
Expand Down
9 changes: 8 additions & 1 deletion db/db_test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1015,9 +1015,13 @@ void DBTestBase::CopyFile(const std::string& source,
ASSERT_OK(destfile->Close());
}

std::unordered_map<std::string, uint64_t> DBTestBase::GetAllSSTFiles() {
std::unordered_map<std::string, uint64_t> DBTestBase::GetAllSSTFiles(
uint64_t* total_size) {
std::unordered_map<std::string, uint64_t> res;

if (total_size) {
*total_size = 0;
}
std::vector<std::string> files;
env_->GetChildren(dbname_, &files);
for (auto& file_name : files) {
Expand All @@ -1028,6 +1032,9 @@ std::unordered_map<std::string, uint64_t> DBTestBase::GetAllSSTFiles() {
uint64_t file_size = 0;
env_->GetFileSize(file_path, &file_size);
res[file_path] = file_size;
if (total_size) {
*total_size += file_size;
}
}
}
return res;
Expand Down
3 changes: 2 additions & 1 deletion db/db_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,8 @@ class DBTestBase : public testing::Test {
void CopyFile(const std::string& source, const std::string& destination,
uint64_t size = 0);

std::unordered_map<std::string, uint64_t> GetAllSSTFiles();
std::unordered_map<std::string, uint64_t> GetAllSSTFiles(
uint64_t* total_size = nullptr);
};

} // namespace rocksdb
Loading

0 comments on commit df9ba6d

Please sign in to comment.