Skip to content

Commit

Permalink
Pass Connector Properties in FileSink Options (facebookincubator#6325)
Browse files Browse the repository at this point in the history
Summary:
For a FileSink on S3 storage, we need the Connector Properties to create the FileSink.

Pull Request resolved: facebookincubator#6325

Reviewed By: kewang1024

Differential Revision: D48859308

Pulled By: xiaoxmeng

fbshipit-source-id: 336e254f0913c9c0f94c60eb8bb91862ea4ab382
  • Loading branch information
majetideepak authored and facebook-github-bot committed Sep 1, 2023
1 parent 53c3a26 commit dc21b2e
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 83 deletions.
6 changes: 5 additions & 1 deletion velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,11 @@ std::unique_ptr<DataSink> HiveConnector::createDataSink(
VELOX_CHECK_NOT_NULL(
hiveInsertHandle, "Hive connector expecting hive write handle!");
return std::make_unique<HiveDataSink>(
inputType, hiveInsertHandle, connectorQueryCtx, commitStrategy);
inputType,
hiveInsertHandle,
connectorQueryCtx,
commitStrategy,
connectorProperties());
}

std::unique_ptr<core::PartitionFunction> HivePartitionFunctionSpec::create(
Expand Down
5 changes: 4 additions & 1 deletion velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,11 +272,13 @@ HiveDataSink::HiveDataSink(
RowTypePtr inputType,
std::shared_ptr<const HiveInsertTableHandle> insertTableHandle,
const ConnectorQueryCtx* connectorQueryCtx,
CommitStrategy commitStrategy)
CommitStrategy commitStrategy,
const std::shared_ptr<const Config>& connectorProperties)
: inputType_(std::move(inputType)),
insertTableHandle_(std::move(insertTableHandle)),
connectorQueryCtx_(connectorQueryCtx),
commitStrategy_(commitStrategy),
connectorProperties_(connectorProperties),
maxOpenWriters_(
HiveConfig::maxPartitionsPerWriters(connectorQueryCtx_->config())),
partitionChannels_(getPartitionChannels(insertTableHandle_)),
Expand Down Expand Up @@ -446,6 +448,7 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
dwio::common::FileSink::create(
writePath,
{.bufferWrite = false,
.connectorProperties = connectorProperties_,
.pool = connectorQueryCtx_->memoryPool(),
.metricLogger = dwio::common::MetricsLog::voidLog(),
.stats = ioStats_.back().get()}),
Expand Down
6 changes: 4 additions & 2 deletions velox/connectors/hive/HiveDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,8 @@ class HiveDataSink : public DataSink {
RowTypePtr inputType,
std::shared_ptr<const HiveInsertTableHandle> insertTableHandle,
const ConnectorQueryCtx* connectorQueryCtx,
CommitStrategy commitStrategy);
CommitStrategy commitStrategy,
const std::shared_ptr<const Config>& connectorProperties);

static uint32_t maxBucketCount() {
static const uint32_t kMaxBucketCount = 100'000;
Expand Down Expand Up @@ -455,8 +456,9 @@ class HiveDataSink : public DataSink {

const RowTypePtr inputType_;
const std::shared_ptr<const HiveInsertTableHandle> insertTableHandle_;
const ConnectorQueryCtx* connectorQueryCtx_;
const ConnectorQueryCtx* const connectorQueryCtx_;
const CommitStrategy commitStrategy_;
const std::shared_ptr<const Config> connectorProperties_;
const uint32_t maxOpenWriters_;
const std::vector<column_index_t> partitionChannels_;
const std::unique_ptr<PartitionIdGenerator> partitionIdGenerator_;
Expand Down
9 changes: 9 additions & 0 deletions velox/dwio/common/FileSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
#include "velox/dwio/common/IoStatistics.h"
#include "velox/dwio/common/MetricsLog.h"

namespace facebook::velox {
class Config;
}

namespace facebook::velox::dwio::common {

/// An abstract interface for providing a file write sink to different storage
Expand All @@ -33,13 +37,17 @@ class FileSink : public Closeable {
struct Options {
/// If true, allows file sink to buffer data before persist to storage.
bool bufferWrite{true};
/// Connector properties are required to create a FileSink on FileSystems
/// such as S3.
const std::shared_ptr<const Config>& connectorProperties{nullptr};
memory::MemoryPool* pool{nullptr};
MetricsLogPtr metricLogger{MetricsLog::voidLog()};
IoStatistics* stats{nullptr};
};

FileSink(std::string name, const Options& options)
: name_{std::move(name)},
connectorProperties_{options.connectorProperties},
pool_(options.pool),
metricLogger_{options.metricLogger},
stats_{options.stats},
Expand Down Expand Up @@ -98,6 +106,7 @@ class FileSink : public Closeable {
const std::function<uint64_t(const DataBuffer<char>&)>& callback);

const std::string name_;
const std::shared_ptr<const Config> connectorProperties_;
memory::MemoryPool* const pool_;
const MetricsLogPtr metricLogger_;
IoStatistics* const stats_;
Expand Down
10 changes: 6 additions & 4 deletions velox/dwio/dwrf/test/ColumnWriterStatsTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,13 @@ class ColumnWriterStatsTest : public ::testing::Test {
dwio::common::FileSink::Options{.pool = leafPool_.get()});
auto sinkPtr = sink.get();

auto config = std::make_shared<Config>();
config->set(Config::ROW_INDEX_STRIDE, folly::to<uint32_t>(batch->size()));
auto config = std::make_shared<dwrf::Config>();
config->set(
dwrf::Config::ROW_INDEX_STRIDE, folly::to<uint32_t>(batch->size()));
if (flatMapColId >= 0) {
config->set(Config::FLATTEN_MAP, true);
config->set(Config::MAP_FLAT_COLS, {folly::to<uint32_t>(flatMapColId)});
config->set(dwrf::Config::FLATTEN_MAP, true);
config->set(
dwrf::Config::MAP_FLAT_COLS, {folly::to<uint32_t>(flatMapColId)});
}
dwrf::WriterOptions options;
options.config = config;
Expand Down
134 changes: 69 additions & 65 deletions velox/dwio/dwrf/test/E2EWriterTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ class E2EWriterTests : public Test {
size_t stripes = 3;

// write file to memory
auto config = std::make_shared<Config>();
config->set(Config::FLATTEN_MAP, true);
auto config = std::make_shared<dwrf::Config>();
config->set(dwrf::Config::FLATTEN_MAP, true);
config->set<const std::vector<uint32_t>>(
Config::MAP_FLAT_COLS, mapColumnIds);
config->set(Config::MAP_STATISTICS, true);
dwrf::Config::MAP_FLAT_COLS, mapColumnIds);
config->set(dwrf::Config::MAP_STATISTICS, true);

auto sink = std::make_unique<MemorySink>(
200 * 1024 * 1024,
Expand Down Expand Up @@ -126,13 +126,13 @@ class E2EWriterTests : public Test {
size_t stripes = 3;

// write file to memory
auto config = std::make_shared<Config>();
auto config = std::make_shared<dwrf::Config>();
// Ensure we cross stride boundary
config->set(Config::ROW_INDEX_STRIDE, strideSize);
config->set(Config::FLATTEN_MAP, true);
config->set(dwrf::Config::ROW_INDEX_STRIDE, strideSize);
config->set(dwrf::Config::FLATTEN_MAP, true);
config->set<const std::vector<uint32_t>>(
Config::MAP_FLAT_COLS, mapColumnIds);
config->set(Config::MAP_STATISTICS, true);
dwrf::Config::MAP_FLAT_COLS, mapColumnIds);
config->set(dwrf::Config::MAP_STATISTICS, true);

auto sink = std::make_unique<MemorySink>(
400 * 1024 * 1024,
Expand Down Expand Up @@ -263,10 +263,11 @@ TEST_F(E2EWriterTests, DISABLED_TestFileCreation) {
"struct_val:struct<a:float,b:double>"
">");

auto config = std::make_shared<Config>();
config->set(Config::FLATTEN_MAP, true);
auto config = std::make_shared<dwrf::Config>();
config->set(dwrf::Config::FLATTEN_MAP, true);
config->set(
Config::MAP_FLAT_COLS, {12, 13}); /* this is the second and third map */
dwrf::Config::MAP_FLAT_COLS,
{12, 13}); /* this is the second and third map */

std::vector<VectorPtr> batches;
for (size_t i = 0; i < batchCount; ++i) {
Expand Down Expand Up @@ -325,11 +326,12 @@ TEST_F(E2EWriterTests, E2E) {
"struct_val:struct<a:float,b:double>"
">");

auto config = std::make_shared<Config>();
config->set(Config::ROW_INDEX_STRIDE, static_cast<uint32_t>(1000));
config->set(Config::FLATTEN_MAP, true);
auto config = std::make_shared<dwrf::Config>();
config->set(dwrf::Config::ROW_INDEX_STRIDE, static_cast<uint32_t>(1000));
config->set(dwrf::Config::FLATTEN_MAP, true);
config->set(
Config::MAP_FLAT_COLS, {12, 13}); /* this is the second and third map */
dwrf::Config::MAP_FLAT_COLS,
{12, 13}); /* this is the second and third map */

std::vector<VectorPtr> batches;
for (size_t i = 0; i < batchCount; ++i) {
Expand Down Expand Up @@ -358,14 +360,14 @@ TEST_F(E2EWriterTests, FlatMapDictionaryEncoding) {
"map_val:map<bigint,map<int, string>>"
">");

auto config = std::make_shared<Config>();
config->set(Config::ROW_INDEX_STRIDE, static_cast<uint32_t>(1000));
config->set(Config::FLATTEN_MAP, true);
config->set(Config::MAP_FLAT_COLS, {0, 1, 2, 3, 4});
config->set(Config::MAP_FLAT_DISABLE_DICT_ENCODING, false);
config->set(Config::DICTIONARY_NUMERIC_KEY_SIZE_THRESHOLD, 1.0f);
config->set(Config::DICTIONARY_STRING_KEY_SIZE_THRESHOLD, 1.0f);
config->set(Config::ENTROPY_KEY_STRING_SIZE_THRESHOLD, 0.0f);
auto config = std::make_shared<dwrf::Config>();
config->set(dwrf::Config::ROW_INDEX_STRIDE, static_cast<uint32_t>(1000));
config->set(dwrf::Config::FLATTEN_MAP, true);
config->set(dwrf::Config::MAP_FLAT_COLS, {0, 1, 2, 3, 4});
config->set(dwrf::Config::MAP_FLAT_DISABLE_DICT_ENCODING, false);
config->set(dwrf::Config::DICTIONARY_NUMERIC_KEY_SIZE_THRESHOLD, 1.0f);
config->set(dwrf::Config::DICTIONARY_STRING_KEY_SIZE_THRESHOLD, 1.0f);
config->set(dwrf::Config::ENTROPY_KEY_STRING_SIZE_THRESHOLD, 0.0f);

std::vector<VectorPtr> batches;
std::mt19937 gen;
Expand Down Expand Up @@ -396,10 +398,10 @@ TEST_F(E2EWriterTests, MaxFlatMapKeys) {
auto batch =
createRowVector(pool.get(), type, 1, b::create(*pool, b::rows{row}));

auto config = std::make_shared<Config>();
config->set(Config::FLATTEN_MAP, true);
config->set(Config::MAP_FLAT_COLS, {0});
config->set(Config::MAP_FLAT_MAX_KEYS, keyLimit);
auto config = std::make_shared<dwrf::Config>();
config->set(dwrf::Config::FLATTEN_MAP, true);
config->set(dwrf::Config::MAP_FLAT_COLS, {0});
config->set(dwrf::Config::MAP_FLAT_MAX_KEYS, keyLimit);

E2EWriterTestUtil::testWriter(
*pool, type, E2EWriterTestUtil::generateBatches(batch), 1, 1, config);
Expand All @@ -420,9 +422,9 @@ TEST_F(E2EWriterTests, PresentStreamIsSuppressedOnFlatMap) {
auto batch =
createRowVector(pool.get(), type, 1, b::create(*pool, b::rows{row}));

auto config = std::make_shared<Config>();
config->set(Config::FLATTEN_MAP, true);
config->set(Config::MAP_FLAT_COLS, {0});
auto config = std::make_shared<dwrf::Config>();
config->set(dwrf::Config::FLATTEN_MAP, true);
config->set(dwrf::Config::MAP_FLAT_COLS, {0});

auto sink = std::make_unique<MemorySink>(
200 * 1024 * 1024,
Expand Down Expand Up @@ -471,10 +473,10 @@ TEST_F(E2EWriterTests, TooManyFlatMapKeys) {
auto batch =
createRowVector(pool.get(), type, 1, b::create(*pool, b::rows{row}));

auto config = std::make_shared<Config>();
config->set(Config::FLATTEN_MAP, true);
config->set(Config::MAP_FLAT_COLS, {0});
config->set(Config::MAP_FLAT_MAX_KEYS, keyLimit);
auto config = std::make_shared<dwrf::Config>();
config->set(dwrf::Config::FLATTEN_MAP, true);
config->set(dwrf::Config::MAP_FLAT_COLS, {0});
config->set(dwrf::Config::MAP_FLAT_MAX_KEYS, keyLimit);

EXPECT_THROW(
E2EWriterTestUtil::testWriter(
Expand Down Expand Up @@ -527,10 +529,10 @@ TEST_F(E2EWriterTests, FlatMapBackfill) {
batches.push_back(batch);
// TODO: Add another batch inside last stride, to test for backfill in stride.

auto config = std::make_shared<Config>();
config->set(Config::FLATTEN_MAP, true);
config->set(Config::MAP_FLAT_COLS, {0});
config->set(Config::ROW_INDEX_STRIDE, strideSize);
auto config = std::make_shared<dwrf::Config>();
config->set(dwrf::Config::FLATTEN_MAP, true);
config->set(dwrf::Config::MAP_FLAT_COLS, {0});
config->set(dwrf::Config::ROW_INDEX_STRIDE, strideSize);

E2EWriterTestUtil::testWriter(
*pool,
Expand Down Expand Up @@ -572,13 +574,14 @@ void testFlatMapWithNulls(
pool.get(), type, rowCount, b::create(*pool, std::move(rows)));
batches.push_back(batch);

auto config = std::make_shared<Config>();
config->set(Config::FLATTEN_MAP, true);
config->set(Config::MAP_FLAT_COLS, {0});
config->set(Config::ROW_INDEX_STRIDE, strideSize);
auto config = std::make_shared<dwrf::Config>();
config->set(dwrf::Config::FLATTEN_MAP, true);
config->set(dwrf::Config::MAP_FLAT_COLS, {0});
config->set(dwrf::Config::ROW_INDEX_STRIDE, strideSize);
config->set(
Config::MAP_FLAT_DISABLE_DICT_ENCODING, !enableFlatmapDictionaryEncoding);
config->set(Config::MAP_FLAT_DICT_SHARE, shareDictionary);
dwrf::Config::MAP_FLAT_DISABLE_DICT_ENCODING,
!enableFlatmapDictionaryEncoding);
config->set(dwrf::Config::MAP_FLAT_DICT_SHARE, shareDictionary);

E2EWriterTestUtil::testWriter(
*pool,
Expand Down Expand Up @@ -638,10 +641,10 @@ TEST_F(E2EWriterTests, FlatMapEmpty) {
pool.get(), type, rowCount, b::create(*pool, std::move(rows)));
batches.push_back(batch);

auto config = std::make_shared<Config>();
config->set(Config::FLATTEN_MAP, true);
config->set(Config::MAP_FLAT_COLS, {0});
config->set(Config::ROW_INDEX_STRIDE, strideSize);
auto config = std::make_shared<dwrf::Config>();
config->set(dwrf::Config::FLATTEN_MAP, true);
config->set(dwrf::Config::MAP_FLAT_COLS, {0});
config->set(dwrf::Config::ROW_INDEX_STRIDE, strideSize);

E2EWriterTestUtil::testWriter(
*pool,
Expand Down Expand Up @@ -759,7 +762,7 @@ TEST_F(E2EWriterTests, PartialStride) {

size_t size = 1'000;

auto config = std::make_shared<Config>();
auto config = std::make_shared<dwrf::Config>();
auto sink = std::make_unique<MemorySink>(
2 * 1024 * 1024,
dwio::common::FileSink::Options{.pool = leafPool_.get()});
Expand Down Expand Up @@ -826,11 +829,11 @@ TEST_F(E2EWriterTests, OversizeRows) {
"map_val_field_2:map<string, map<string, map<string, map<string, string>>>>"
">,"
">");
auto config = std::make_shared<Config>();
config->set(Config::DISABLE_LOW_MEMORY_MODE, true);
config->set(Config::STRIPE_SIZE, 10 * kSizeMB);
auto config = std::make_shared<dwrf::Config>();
config->set(dwrf::Config::DISABLE_LOW_MEMORY_MODE, true);
config->set(dwrf::Config::STRIPE_SIZE, 10 * kSizeMB);
config->set(
Config::RAW_DATA_SIZE_PER_BATCH, folly::to<uint64_t>(20 * 1024UL));
dwrf::Config::RAW_DATA_SIZE_PER_BATCH, folly::to<uint64_t>(20 * 1024UL));

// Retained bytes in vector: 44704
auto singleBatch = E2EWriterTestUtil::generateBatches(
Expand Down Expand Up @@ -860,9 +863,9 @@ TEST_F(E2EWriterTests, OversizeBatches) {
"float_val:float,"
"double_val:double,"
">");
auto config = std::make_shared<Config>();
config->set(Config::DISABLE_LOW_MEMORY_MODE, true);
config->set(Config::STRIPE_SIZE, 10 * kSizeMB);
auto config = std::make_shared<dwrf::Config>();
config->set(dwrf::Config::DISABLE_LOW_MEMORY_MODE, true);
config->set(dwrf::Config::STRIPE_SIZE, 10 * kSizeMB);

// Test splitting a gigantic batch.
auto singleBatch = E2EWriterTestUtil::generateBatches(
Expand Down Expand Up @@ -905,11 +908,11 @@ TEST_F(E2EWriterTests, OverflowLengthIncrements) {
"struct<"
"struct_val:struct<bigint_val:bigint>"
">");
auto config = std::make_shared<Config>();
config->set(Config::DISABLE_LOW_MEMORY_MODE, true);
config->set(Config::STRIPE_SIZE, 10 * kSizeMB);
auto config = std::make_shared<dwrf::Config>();
config->set(dwrf::Config::DISABLE_LOW_MEMORY_MODE, true);
config->set(dwrf::Config::STRIPE_SIZE, 10 * kSizeMB);
config->set(
Config::RAW_DATA_SIZE_PER_BATCH,
dwrf::Config::RAW_DATA_SIZE_PER_BATCH,
folly::to<uint64_t>(500 * 1024UL * 1024UL));

const size_t size = 1024;
Expand Down Expand Up @@ -1238,7 +1241,8 @@ void testWriter(
const std::shared_ptr<const Type>& type,
size_t batchCount,
std::function<VectorPtr()> generator,
const std::shared_ptr<Config> config = std::make_shared<Config>()) {
const std::shared_ptr<dwrf::Config> config =
std::make_shared<dwrf::Config>()) {
std::vector<VectorPtr> batches;
for (auto i = 0; i < batchCount; ++i) {
batches.push_back(generator());
Expand Down Expand Up @@ -1418,9 +1422,9 @@ TEST_F(E2EWriterTests, fuzzFlatmap) {
{"flatmap2", MAP(VARCHAR(), ARRAY(REAL()))},
{"flatmap3", MAP(INTEGER(), MAP(INTEGER(), REAL()))},
});
auto config = std::make_shared<Config>();
config->set(Config::FLATTEN_MAP, true);
config->set(Config::MAP_FLAT_COLS, {0, 1, 2});
auto config = std::make_shared<dwrf::Config>();
config->set(dwrf::Config::FLATTEN_MAP, true);
config->set(dwrf::Config::MAP_FLAT_COLS, {0, 1, 2});
auto seed = folly::Random::rand32();
LOG(INFO) << "seed: " << seed;
std::mt19937 rng{seed};
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/test/FloatColumnWriterBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void runBenchmark(int nullEvery) {
braces.dismiss();

for (auto i = 0; i < kNumIterations; i++) {
auto config = std::make_shared<Config>();
auto config = std::make_shared<dwrf::Config>();
WriterContext context{
config,
memory::defaultMemoryManager().addRootPool(
Expand Down
Loading

0 comments on commit dc21b2e

Please sign in to comment.