Skip to content

Commit

Permalink
Support ingest without pausing writes
Browse files Browse the repository at this point in the history
  • Loading branch information
hhwyt committed Jan 7, 2025
1 parent e255444 commit 7340dac
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 55 deletions.
39 changes: 31 additions & 8 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5735,6 +5735,16 @@ Status DBImpl::IngestExternalFiles(
if (args.empty()) {
return Status::InvalidArgument("ingestion arg list is empty");
}
// Supported only when all args have the consistent `allow_write` behavior, as
// `allow_write` determines whether stopping writes to the DB affects all
// args.
bool allow_write = args[0].options.allow_write;
for (const auto& arg : args) {
if (arg.options.allow_write != allow_write) {
return Status::InvalidArgument(
"Inconsistent allow_writes values across ingestion arguments");
}
}
{
std::unordered_set<ColumnFamilyHandle*> unique_cfhs;
for (const auto& arg : args) {
Expand Down Expand Up @@ -5858,6 +5868,17 @@ Status DBImpl::IngestExternalFiles(
// So wait here to ensure there is no pending write to memtable.
WaitForPendingWrites();

if (allow_write) {
// If allow_write is true, writes to the DB are resumed here,
// allowing users to write normally during the subsequent ingest process.
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
write_thread_.ExitUnbatched(&w);
}
TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:AfterPendingWrites",
nullptr);

num_running_ingest_file_ += static_cast<int>(num_cfs);
TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter");

Expand Down Expand Up @@ -5890,9 +5911,10 @@ Status DBImpl::IngestExternalFiles(
flush_opts.check_if_compaction_disabled = true;
if (immutable_db_options_.atomic_flush) {
mutex_.Unlock();
status = AtomicFlushMemTables(
flush_opts, FlushReason::kExternalFileIngestion,
{} /* provided_candidate_cfds */, true /* entered_write_thread */);
status = AtomicFlushMemTables(flush_opts,
FlushReason::kExternalFileIngestion,
{} /* provided_candidate_cfds */,
!allow_write /* entered_write_thread */);
mutex_.Lock();
} else {
for (size_t i = 0; i != num_cfs; ++i) {
Expand All @@ -5903,7 +5925,7 @@ Status DBImpl::IngestExternalFiles(
->cfd();
status = FlushMemTable(cfd, flush_opts,
FlushReason::kExternalFileIngestion,
true /* entered_write_thread */);
!allow_write /* entered_write_thread */);
mutex_.Lock();
if (!status.ok()) {
break;
Expand Down Expand Up @@ -6008,11 +6030,12 @@ Status DBImpl::IngestExternalFiles(
error_handler_.SetBGError(io_s, BackgroundErrorReason::kManifestWrite);
}

// Resume writes to the DB
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
if (!allow_write) {
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
write_thread_.ExitUnbatched(&w);
}
write_thread_.ExitUnbatched(&w);

if (status.ok()) {
for (auto& job : ingestion_jobs) {
Expand Down
Loading

0 comments on commit 7340dac

Please sign in to comment.