Skip to content

Commit

Permalink
Support writes during ingestion (#400)
Browse files Browse the repository at this point in the history
 

Signed-off-by: hhwyt <[email protected]>
  • Loading branch information
hhwyt authored Jan 20, 2025
1 parent f65a78f commit f86a90f
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 65 deletions.
53 changes: 35 additions & 18 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5735,6 +5735,16 @@ Status DBImpl::IngestExternalFiles(
if (args.empty()) {
return Status::InvalidArgument("ingestion arg list is empty");
}
// All args must share the same `allow_write` setting, because that setting
// decides whether writes to the DB are stopped for the whole ingestion, which
// affects every arg.
bool allow_write = args[0].options.allow_write;
for (const auto& arg : args) {
if (arg.options.allow_write != allow_write) {
return Status::InvalidArgument(
"Inconsistent allow_writes values across ingestion arguments");
}
}
{
std::unordered_set<ColumnFamilyHandle*> unique_cfhs;
for (const auto& arg : args) {
Expand Down Expand Up @@ -5843,20 +5853,25 @@ Status DBImpl::IngestExternalFiles(
InstrumentedMutexLock l(&mutex_);
TEST_SYNC_POINT("DBImpl::AddFile:MutexLock");

// Stop writes to the DB by entering both write threads
WriteThread::Writer w;
write_thread_.EnterUnbatched(&w, &mutex_);
WriteThread::Writer nonmem_w;
if (two_write_queues_) {
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
if (!allow_write) {
// Stop writes to the DB by entering both write threads.
write_thread_.EnterUnbatched(&w, &mutex_);
if (two_write_queues_) {
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
}

// When unordered_write is enabled, keys are written to the memtable in an
// unordered way. If the ingestion job checks the memtable key range before
// a key has landed in the memtable, the ingestion job may skip a necessary
// memtable flush.
// So wait here to ensure there are no pending writes to the memtable.
WaitForPendingWrites();
}

// When unordered_write is enabled, the keys are writing to memtable in an
// unordered way. If the ingestion job checks memtable key range before the
// key landing in memtable, the ingestion job may skip the necessary
// memtable flush.
// So wait here to ensure there is no pending write to memtable.
WaitForPendingWrites();
TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:AfterAllowWriteCheck",
nullptr);

num_running_ingest_file_ += static_cast<int>(num_cfs);
TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter");
Expand Down Expand Up @@ -5890,9 +5905,10 @@ Status DBImpl::IngestExternalFiles(
flush_opts.check_if_compaction_disabled = true;
if (immutable_db_options_.atomic_flush) {
mutex_.Unlock();
status = AtomicFlushMemTables(
flush_opts, FlushReason::kExternalFileIngestion,
{} /* provided_candidate_cfds */, true /* entered_write_thread */);
status = AtomicFlushMemTables(flush_opts,
FlushReason::kExternalFileIngestion,
{} /* provided_candidate_cfds */,
!allow_write /* entered_write_thread */);
mutex_.Lock();
} else {
for (size_t i = 0; i != num_cfs; ++i) {
Expand All @@ -5903,7 +5919,7 @@ Status DBImpl::IngestExternalFiles(
->cfd();
status = FlushMemTable(cfd, flush_opts,
FlushReason::kExternalFileIngestion,
true /* entered_write_thread */);
!allow_write /* entered_write_thread */);
mutex_.Lock();
if (!status.ok()) {
break;
Expand Down Expand Up @@ -6008,11 +6024,12 @@ Status DBImpl::IngestExternalFiles(
error_handler_.SetBGError(io_s, BackgroundErrorReason::kManifestWrite);
}

// Resume writes to the DB
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
if (!allow_write) {
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
write_thread_.ExitUnbatched(&w);
}
write_thread_.ExitUnbatched(&w);

if (status.ok()) {
for (auto& job : ingestion_jobs) {
Expand Down
Loading

0 comments on commit f86a90f

Please sign in to comment.