Skip to content

Commit

Permalink
add executorBarrier to nimble parallel writer (#102)
Browse files Browse the repository at this point in the history
Summary:

adding executor barrier for the fieldwriter for parallelism

Differential Revision: D64775045
  • Loading branch information
Scott Young authored and facebook-github-bot committed Nov 25, 2024
1 parent 9c87aaf commit 0fbc21f
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 11 deletions.
55 changes: 48 additions & 7 deletions dwio/nimble/velox/FieldWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
#include "dwio/nimble/velox/SchemaBuilder.h"
#include "dwio/nimble/velox/SchemaTypes.h"
#include "folly/ScopeGuard.h"
#include "folly/concurrency/ConcurrentHashMap.h"
#include "velox/common/base/CompareFlags.h"
#include "velox/dwio/common/ExecutorBarrier.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/DictionaryVector.h"
#include "velox/vector/FlatVector.h"
Expand Down Expand Up @@ -369,6 +371,11 @@ class RowFieldWriter : public FieldWriter {
: FieldWriter{context, context.schemaBuilder.createRowTypeBuilder(type->size())},
nullsStream_{context_.createNullsStreamData<bool>(
typeBuilder_->asRow().nullsDescriptor())} {
if (context.writeExecutor) {
barrier_ = std::make_unique<velox::dwio::common::ExecutorBarrier>(
std::move(context_.writeExecutor));
}

auto rowType =
std::dynamic_pointer_cast<const velox::RowType>(type->type());

Expand Down Expand Up @@ -417,8 +424,26 @@ class RowFieldWriter : public FieldWriter {
Decoded{decoded},
[&](auto offset) { childRanges.add(offset, 1); });
}
for (auto i = 0; i < fields_.size(); ++i) {
fields_[i]->write(row->childAt(i), *childRangesPtr);

if (barrier_) {
for (auto i = 0; i < fields_.size(); ++i) {
const auto& kind = fields_[i]->typeBuilder()->kind();
if (kind == Kind::FlatMap) {
// if flatmap handle within due to fieldvaluewriter creation
fields_[i]->write(row->childAt(i), *childRangesPtr);
} else {
barrier_->add([&field = fields_[i],
&rowItem = row->childAt(i),
&childRanges = *childRangesPtr]() {
field->write(rowItem, childRanges);
});
}
}
barrier_->waitAll();
} else {
for (auto i = 0; i < fields_.size(); ++i) {
fields_[i]->write(row->childAt(i), *childRangesPtr);
}
}
}

Expand All @@ -437,6 +462,7 @@ class RowFieldWriter : public FieldWriter {
}

private:
std::unique_ptr<velox::dwio::common::ExecutorBarrier> barrier_;
std::vector<std::unique_ptr<FieldWriter>> fields_;
NullsStreamData& nullsStream_;
};
Expand Down Expand Up @@ -836,7 +862,12 @@ class FlatMapFieldWriter : public FieldWriter {
NimbleTypeTraits<K>::scalarKind)),
nullsStream_{context_.createNullsStreamData<bool>(
typeBuilder_->asFlatMap().nullsDescriptor())},
valueType_{type->childAt(1)} {}
valueType_{type->childAt(1)} {
if (context.writeExecutor) {
barrier_ = std::make_unique<velox::dwio::common::ExecutorBarrier>(
std::move(context.writeExecutor));
}
}

void write(const velox::VectorPtr& vector, const OrderedRanges& ranges)
override {
Expand Down Expand Up @@ -999,8 +1030,16 @@ class FlatMapFieldWriter : public FieldWriter {
// Now actually ingest the map values
if (nonNullCount > 0) {
auto& values = map->mapValues();
for (auto& pair : currentValueFields_) {
pair.second->write(values, nonNullCount);

if (barrier_) {
for (auto& pair : currentValueFields_) {
barrier_->add([&]() { pair.second->write(values, nonNullCount); });
}
barrier_->waitAll();
} else {
for (auto& pair : currentValueFields_) {
pair.second->write(values, nonNullCount);
}
}
}
nonNullCount_ += nonNullCount;
Expand Down Expand Up @@ -1037,6 +1076,7 @@ class FlatMapFieldWriter : public FieldWriter {
}

private:
std::unique_ptr<velox::dwio::common::ExecutorBarrier> barrier_;
FlatMapValueFieldWriter* getValueFieldWriter(KeyType key, uint32_t size) {
auto it = currentValueFields_.find(key);
if (it != currentValueFields_.end()) {
Expand Down Expand Up @@ -1075,7 +1115,8 @@ class FlatMapFieldWriter : public FieldWriter {

NullsStreamData& nullsStream_;
// This map store the FlatMapValue fields used in current flush unit.
folly::F14FastMap<KeyType, FlatMapValueFieldWriter*> currentValueFields_;
folly::ConcurrentHashMap<KeyType, FlatMapValueFieldWriter*>
currentValueFields_;

// This map stores the FlatMapPassthrough fields.
folly::F14FastMap<
Expand All @@ -1086,7 +1127,7 @@ class FlatMapFieldWriter : public FieldWriter {
uint64_t nonNullCount_ = 0;
// This map store all FlatMapValue fields encountered by the VeloxWriter
// across the whole file.
folly::F14FastMap<KeyType, std::unique_ptr<FlatMapValueFieldWriter>>
folly::ConcurrentHashMap<KeyType, std::unique_ptr<FlatMapValueFieldWriter>>
allValueFields_;
};

Expand Down
3 changes: 3 additions & 0 deletions dwio/nimble/velox/FieldWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,14 @@ class DecodingContextPool {
struct FieldWriterContext {
explicit FieldWriterContext(
velox::memory::MemoryPool& memoryPool,
std::shared_ptr<folly::Executor> writeExecutor = nullptr,
std::unique_ptr<velox::memory::MemoryReclaimer> reclaimer = nullptr,
std::function<void(void)> vectorDecoderVisitor = []() {})
: bufferMemoryPool{memoryPool.addLeafChild(
"field_writer_buffer",
true,
std::move(reclaimer))},
writeExecutor{std::move(writeExecutor)},
inputBufferGrowthPolicy{
DefaultInputBufferGrowthPolicy::withDefaultRanges()},
decodingContextPool_{std::make_unique<DecodingContextPool>(
Expand All @@ -107,6 +109,7 @@ struct FieldWriterContext {
}

std::shared_ptr<velox::memory::MemoryPool> bufferMemoryPool;
std::shared_ptr<folly::Executor> writeExecutor;
SchemaBuilder schemaBuilder;

folly::F14FastSet<uint32_t> flatMapNodeIds;
Expand Down
7 changes: 6 additions & 1 deletion dwio/nimble/velox/VeloxWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@ class WriterContext : public FieldWriterContext {
WriterContext(
velox::memory::MemoryPool& memoryPool,
VeloxWriterOptions options)
: FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor},
: FieldWriterContext{
memoryPool,
options.writeExecutor,
options.reclaimerFactory(),
options.vectorDecoderVisitor
},
options{std::move(options)},
logger{this->options.metricsLogger} {
flushPolicy = this->options.flushPolicyFactory();
Expand Down
2 changes: 2 additions & 0 deletions dwio/nimble/velox/VeloxWriterOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ struct VeloxWriterOptions {
// If provided, internal encoding operations will happen in parallel using
// this executor.
std::shared_ptr<folly::Executor> encodingExecutor;
// If provided, internal write operations will happen in parallel
std::shared_ptr<folly::Executor> writeExecutor;

bool enableChunking = false;

Expand Down
11 changes: 8 additions & 3 deletions dwio/nimble/velox/tests/VeloxReaderTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2433,6 +2433,8 @@ TEST_F(VeloxReaderTests, FuzzSimple) {
if (parallelismFactor > 0) {
writerOptions.encodingExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
writerOptions.writeExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
}

for (auto i = 0; i < iterations; ++i) {
Expand Down Expand Up @@ -2519,9 +2521,12 @@ TEST_F(VeloxReaderTests, FuzzComplex) {
for (auto parallelismFactor :
{0U, 1U, 2U, std::thread::hardware_concurrency()}) {
LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
writerOptions.encodingExecutor = parallelismFactor > 0
? std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor)
: nullptr;
if (parallelismFactor > 0) {
writerOptions.encodingExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
writerOptions.writeExecutor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
}

for (auto i = 0; i < iterations; ++i) {
writeAndVerify(
Expand Down

0 comments on commit 0fbc21f

Please sign in to comment.