From dc21b2ee32c4e9ba0b04834d780c023c2b32a065 Mon Sep 17 00:00:00 2001 From: Deepak Majeti Date: Thu, 31 Aug 2023 18:05:38 -0700 Subject: [PATCH] Pass Connector Properties in FileSink Options (#6325) Summary: For a FileSink on S3 storage, we need the Connector Properties to create the FileSink. Pull Request resolved: https://github.com/facebookincubator/velox/pull/6325 Reviewed By: kewang1024 Differential Revision: D48859308 Pulled By: xiaoxmeng fbshipit-source-id: 336e254f0913c9c0f94c60eb8bb91862ea4ab382 --- velox/connectors/hive/HiveConnector.cpp | 6 +- velox/connectors/hive/HiveDataSink.cpp | 5 +- velox/connectors/hive/HiveDataSink.h | 6 +- velox/dwio/common/FileSink.h | 9 ++ .../dwio/dwrf/test/ColumnWriterStatsTests.cpp | 10 +- velox/dwio/dwrf/test/E2EWriterTests.cpp | 134 +++++++++--------- .../dwrf/test/FloatColumnWriterBenchmark.cpp | 2 +- velox/dwio/dwrf/test/ReaderTest.cpp | 18 +-- 8 files changed, 107 insertions(+), 83 deletions(-) diff --git a/velox/connectors/hive/HiveConnector.cpp b/velox/connectors/hive/HiveConnector.cpp index 89c470d8f0dc..0e24735e235c 100644 --- a/velox/connectors/hive/HiveConnector.cpp +++ b/velox/connectors/hive/HiveConnector.cpp @@ -105,7 +105,11 @@ std::unique_ptr HiveConnector::createDataSink( VELOX_CHECK_NOT_NULL( hiveInsertHandle, "Hive connector expecting hive write handle!"); return std::make_unique( - inputType, hiveInsertHandle, connectorQueryCtx, commitStrategy); + inputType, + hiveInsertHandle, + connectorQueryCtx, + commitStrategy, + connectorProperties()); } std::unique_ptr HivePartitionFunctionSpec::create( diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index fb186ce99c13..3b4fc25ac6b1 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -272,11 +272,13 @@ HiveDataSink::HiveDataSink( RowTypePtr inputType, std::shared_ptr insertTableHandle, const ConnectorQueryCtx* connectorQueryCtx, - CommitStrategy commitStrategy) + CommitStrategy commitStrategy, + const std::shared_ptr& connectorProperties) : inputType_(std::move(inputType)), insertTableHandle_(std::move(insertTableHandle)), connectorQueryCtx_(connectorQueryCtx), commitStrategy_(commitStrategy), + connectorProperties_(connectorProperties), maxOpenWriters_( HiveConfig::maxPartitionsPerWriters(connectorQueryCtx_->config())), partitionChannels_(getPartitionChannels(insertTableHandle_)), @@ -446,6 +448,7 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) { dwio::common::FileSink::create( writePath, {.bufferWrite = false, + .connectorProperties = connectorProperties_, .pool = connectorQueryCtx_->memoryPool(), .metricLogger = dwio::common::MetricsLog::voidLog(), .stats = ioStats_.back().get()}), diff --git a/velox/connectors/hive/HiveDataSink.h b/velox/connectors/hive/HiveDataSink.h index c48043e6b3d2..b3d526c282c7 100644 --- a/velox/connectors/hive/HiveDataSink.h +++ b/velox/connectors/hive/HiveDataSink.h @@ -390,7 +390,8 @@ class HiveDataSink : public DataSink { RowTypePtr inputType, std::shared_ptr insertTableHandle, const ConnectorQueryCtx* connectorQueryCtx, - CommitStrategy commitStrategy); + CommitStrategy commitStrategy, + const std::shared_ptr& connectorProperties); static uint32_t maxBucketCount() { static const uint32_t kMaxBucketCount = 100'000; @@ -455,8 +456,9 @@ class HiveDataSink : public DataSink { const RowTypePtr inputType_; const std::shared_ptr insertTableHandle_; - const ConnectorQueryCtx* connectorQueryCtx_; + const ConnectorQueryCtx* const connectorQueryCtx_; const CommitStrategy commitStrategy_; + const std::shared_ptr connectorProperties_; const uint32_t maxOpenWriters_; const std::vector partitionChannels_; const std::unique_ptr partitionIdGenerator_; diff --git a/velox/dwio/common/FileSink.h b/velox/dwio/common/FileSink.h index a307612f7ade..77fc53b95abd 100644 --- a/velox/dwio/common/FileSink.h +++ b/velox/dwio/common/FileSink.h @@ -24,6 +24,10 @@ #include "velox/dwio/common/IoStatistics.h" #include "velox/dwio/common/MetricsLog.h" +namespace facebook::velox { +class Config; +} + namespace facebook::velox::dwio::common { /// An abstract interface for providing a file write sink to different storage @@ -33,6 +37,9 @@ class FileSink : public Closeable { struct Options { /// If true, allows file sink to buffer data before persist to storage. bool bufferWrite{true}; + /// Connector properties are required to create a FileSink on FileSystems + /// such as S3. + const std::shared_ptr& connectorProperties{nullptr}; memory::MemoryPool* pool{nullptr}; MetricsLogPtr metricLogger{MetricsLog::voidLog()}; IoStatistics* stats{nullptr}; @@ -40,6 +47,7 @@ class FileSink : public Closeable { FileSink(std::string name, const Options& options) : name_{std::move(name)}, + connectorProperties_{options.connectorProperties}, pool_(options.pool), metricLogger_{options.metricLogger}, stats_{options.stats}, @@ -98,6 +106,7 @@ class FileSink : public Closeable { const std::function&)>& callback); const std::string name_; + const std::shared_ptr connectorProperties_; memory::MemoryPool* const pool_; const MetricsLogPtr metricLogger_; IoStatistics* const stats_; diff --git a/velox/dwio/dwrf/test/ColumnWriterStatsTests.cpp b/velox/dwio/dwrf/test/ColumnWriterStatsTests.cpp index 4f607840fb93..d1595eafd9eb 100644 --- a/velox/dwio/dwrf/test/ColumnWriterStatsTests.cpp +++ b/velox/dwio/dwrf/test/ColumnWriterStatsTests.cpp @@ -156,11 +156,13 @@ class ColumnWriterStatsTest : public ::testing::Test { dwio::common::FileSink::Options{.pool = leafPool_.get()}); auto sinkPtr = sink.get(); - auto config = std::make_shared(); - config->set(Config::ROW_INDEX_STRIDE, folly::to(batch->size())); + auto config = std::make_shared(); + config->set( + dwrf::Config::ROW_INDEX_STRIDE, folly::to(batch->size())); if (flatMapColId >= 0) { - config->set(Config::FLATTEN_MAP, true); - config->set(Config::MAP_FLAT_COLS, {folly::to(flatMapColId)}); + config->set(dwrf::Config::FLATTEN_MAP, true); + config->set( + dwrf::Config::MAP_FLAT_COLS, {folly::to(flatMapColId)}); } dwrf::WriterOptions options; options.config = config; diff --git a/velox/dwio/dwrf/test/E2EWriterTests.cpp b/velox/dwio/dwrf/test/E2EWriterTests.cpp index a029947cdf6f..69a81621fe03 100644 --- a/velox/dwio/dwrf/test/E2EWriterTests.cpp +++ b/velox/dwio/dwrf/test/E2EWriterTests.cpp @@ -74,11 +74,11 @@ class E2EWriterTests : public Test { size_t stripes = 3; // write file to memory - auto config = std::make_shared(); - config->set(Config::FLATTEN_MAP, true); + auto config = std::make_shared(); + config->set(dwrf::Config::FLATTEN_MAP, true); config->set>( - Config::MAP_FLAT_COLS, mapColumnIds); - config->set(Config::MAP_STATISTICS, true); + dwrf::Config::MAP_FLAT_COLS, mapColumnIds); + config->set(dwrf::Config::MAP_STATISTICS, true); auto sink = std::make_unique( 200 * 1024 * 1024, @@ -126,13 +126,13 @@ class E2EWriterTests : public Test { size_t stripes = 3; // write file to memory - auto config = std::make_shared(); + auto config = std::make_shared(); // Ensure we cross stride boundary - config->set(Config::ROW_INDEX_STRIDE, strideSize); - config->set(Config::FLATTEN_MAP, true); + config->set(dwrf::Config::ROW_INDEX_STRIDE, strideSize); + config->set(dwrf::Config::FLATTEN_MAP, true); config->set>( - Config::MAP_FLAT_COLS, mapColumnIds); - config->set(Config::MAP_STATISTICS, true); + dwrf::Config::MAP_FLAT_COLS, mapColumnIds); + config->set(dwrf::Config::MAP_STATISTICS, true); auto sink = std::make_unique( 400 * 1024 * 1024, @@ -263,10 +263,11 @@ TEST_F(E2EWriterTests, DISABLED_TestFileCreation) { "struct_val:struct" ">"); - auto config = std::make_shared(); - config->set(Config::FLATTEN_MAP, true); + auto config = std::make_shared(); + config->set(dwrf::Config::FLATTEN_MAP, true); config->set( - Config::MAP_FLAT_COLS, {12, 13}); /* this is the second and third map */ + dwrf::Config::MAP_FLAT_COLS, + {12, 13}); /* this is the second and third map */ std::vector batches; for (size_t i = 0; i < batchCount; ++i) { @@ -325,11 +326,12 @@ TEST_F(E2EWriterTests, E2E) { "struct_val:struct" ">"); - auto config = std::make_shared(); - config->set(Config::ROW_INDEX_STRIDE, static_cast(1000)); - config->set(Config::FLATTEN_MAP, true); + auto config = std::make_shared(); + config->set(dwrf::Config::ROW_INDEX_STRIDE, static_cast(1000)); + config->set(dwrf::Config::FLATTEN_MAP, true); config->set( - Config::MAP_FLAT_COLS, {12, 13}); /* this is the second and third map */ + dwrf::Config::MAP_FLAT_COLS, + {12, 13}); /* this is the second and third map */ std::vector batches; for (size_t i = 0; i < batchCount; ++i) { @@ -358,14 +360,14 @@ TEST_F(E2EWriterTests, FlatMapDictionaryEncoding) { "map_val:map>" ">"); - auto config = std::make_shared(); - config->set(Config::ROW_INDEX_STRIDE, static_cast(1000)); - config->set(Config::FLATTEN_MAP, true); - config->set(Config::MAP_FLAT_COLS, {0, 1, 2, 3, 4}); - config->set(Config::MAP_FLAT_DISABLE_DICT_ENCODING, false); - config->set(Config::DICTIONARY_NUMERIC_KEY_SIZE_THRESHOLD, 1.0f); - config->set(Config::DICTIONARY_STRING_KEY_SIZE_THRESHOLD, 1.0f); - config->set(Config::ENTROPY_KEY_STRING_SIZE_THRESHOLD, 0.0f); + auto config = std::make_shared(); + config->set(dwrf::Config::ROW_INDEX_STRIDE, static_cast(1000)); + config->set(dwrf::Config::FLATTEN_MAP, true); + config->set(dwrf::Config::MAP_FLAT_COLS, {0, 1, 2, 3, 4}); + config->set(dwrf::Config::MAP_FLAT_DISABLE_DICT_ENCODING, false); + config->set(dwrf::Config::DICTIONARY_NUMERIC_KEY_SIZE_THRESHOLD, 1.0f); + config->set(dwrf::Config::DICTIONARY_STRING_KEY_SIZE_THRESHOLD, 1.0f); + config->set(dwrf::Config::ENTROPY_KEY_STRING_SIZE_THRESHOLD, 0.0f); std::vector batches; std::mt19937 gen; @@ -396,10 +398,10 @@ TEST_F(E2EWriterTests, MaxFlatMapKeys) { auto batch = createRowVector(pool.get(), type, 1, b::create(*pool, b::rows{row})); - auto config = std::make_shared(); - config->set(Config::FLATTEN_MAP, true); - config->set(Config::MAP_FLAT_COLS, {0}); - config->set(Config::MAP_FLAT_MAX_KEYS, keyLimit); + auto config = std::make_shared(); + config->set(dwrf::Config::FLATTEN_MAP, true); + config->set(dwrf::Config::MAP_FLAT_COLS, {0}); + config->set(dwrf::Config::MAP_FLAT_MAX_KEYS, keyLimit); E2EWriterTestUtil::testWriter( *pool, type, E2EWriterTestUtil::generateBatches(batch), 1, 1, config); @@ -420,9 +422,9 @@ TEST_F(E2EWriterTests, PresentStreamIsSuppressedOnFlatMap) { auto batch = createRowVector(pool.get(), type, 1, b::create(*pool, b::rows{row})); - auto config = std::make_shared(); - config->set(Config::FLATTEN_MAP, true); - config->set(Config::MAP_FLAT_COLS, {0}); + auto config = std::make_shared(); + config->set(dwrf::Config::FLATTEN_MAP, true); + config->set(dwrf::Config::MAP_FLAT_COLS, {0}); auto sink = std::make_unique( 200 * 1024 * 1024, @@ -471,10 +473,10 @@ TEST_F(E2EWriterTests, TooManyFlatMapKeys) { auto batch = createRowVector(pool.get(), type, 1, b::create(*pool, b::rows{row})); - auto config = std::make_shared(); - config->set(Config::FLATTEN_MAP, true); - config->set(Config::MAP_FLAT_COLS, {0}); - config->set(Config::MAP_FLAT_MAX_KEYS, keyLimit); + auto config = std::make_shared(); + config->set(dwrf::Config::FLATTEN_MAP, true); + config->set(dwrf::Config::MAP_FLAT_COLS, {0}); + config->set(dwrf::Config::MAP_FLAT_MAX_KEYS, keyLimit); EXPECT_THROW( E2EWriterTestUtil::testWriter( @@ -527,10 +529,10 @@ TEST_F(E2EWriterTests, FlatMapBackfill) { batches.push_back(batch); // TODO: Add another batch inside last stride, to test for backfill in stride. - auto config = std::make_shared(); - config->set(Config::FLATTEN_MAP, true); - config->set(Config::MAP_FLAT_COLS, {0}); - config->set(Config::ROW_INDEX_STRIDE, strideSize); + auto config = std::make_shared(); + config->set(dwrf::Config::FLATTEN_MAP, true); + config->set(dwrf::Config::MAP_FLAT_COLS, {0}); + config->set(dwrf::Config::ROW_INDEX_STRIDE, strideSize); E2EWriterTestUtil::testWriter( *pool, @@ -572,13 +574,14 @@ void testFlatMapWithNulls( pool.get(), type, rowCount, b::create(*pool, std::move(rows))); batches.push_back(batch); - auto config = std::make_shared(); - config->set(Config::FLATTEN_MAP, true); - config->set(Config::MAP_FLAT_COLS, {0}); - config->set(Config::ROW_INDEX_STRIDE, strideSize); + auto config = std::make_shared(); + config->set(dwrf::Config::FLATTEN_MAP, true); + config->set(dwrf::Config::MAP_FLAT_COLS, {0}); + config->set(dwrf::Config::ROW_INDEX_STRIDE, strideSize); config->set( - Config::MAP_FLAT_DISABLE_DICT_ENCODING, !enableFlatmapDictionaryEncoding); - config->set(Config::MAP_FLAT_DICT_SHARE, shareDictionary); + dwrf::Config::MAP_FLAT_DISABLE_DICT_ENCODING, + !enableFlatmapDictionaryEncoding); + config->set(dwrf::Config::MAP_FLAT_DICT_SHARE, shareDictionary); E2EWriterTestUtil::testWriter( *pool, @@ -638,10 +641,10 @@ TEST_F(E2EWriterTests, FlatMapEmpty) { pool.get(), type, rowCount, b::create(*pool, std::move(rows))); batches.push_back(batch); - auto config = std::make_shared(); - config->set(Config::FLATTEN_MAP, true); - config->set(Config::MAP_FLAT_COLS, {0}); - config->set(Config::ROW_INDEX_STRIDE, strideSize); + auto config = std::make_shared(); + config->set(dwrf::Config::FLATTEN_MAP, true); + config->set(dwrf::Config::MAP_FLAT_COLS, {0}); + config->set(dwrf::Config::ROW_INDEX_STRIDE, strideSize); E2EWriterTestUtil::testWriter( *pool, @@ -759,7 +762,7 @@ TEST_F(E2EWriterTests, PartialStride) { size_t size = 1'000; - auto config = std::make_shared(); + auto config = std::make_shared(); auto sink = std::make_unique( 2 * 1024 * 1024, dwio::common::FileSink::Options{.pool = leafPool_.get()}); @@ -826,11 +829,11 @@ TEST_F(E2EWriterTests, OversizeRows) { "map_val_field_2:map>>>" ">," ">"); - auto config = std::make_shared(); - config->set(Config::DISABLE_LOW_MEMORY_MODE, true); - config->set(Config::STRIPE_SIZE, 10 * kSizeMB); + auto config = std::make_shared(); + config->set(dwrf::Config::DISABLE_LOW_MEMORY_MODE, true); + config->set(dwrf::Config::STRIPE_SIZE, 10 * kSizeMB); config->set( - Config::RAW_DATA_SIZE_PER_BATCH, folly::to(20 * 1024UL)); + dwrf::Config::RAW_DATA_SIZE_PER_BATCH, folly::to(20 * 1024UL)); // Retained bytes in vector: 44704 auto singleBatch = E2EWriterTestUtil::generateBatches( @@ -860,9 +863,9 @@ TEST_F(E2EWriterTests, OversizeBatches) { "float_val:float," "double_val:double," ">"); - auto config = std::make_shared(); - config->set(Config::DISABLE_LOW_MEMORY_MODE, true); - config->set(Config::STRIPE_SIZE, 10 * kSizeMB); + auto config = std::make_shared(); + config->set(dwrf::Config::DISABLE_LOW_MEMORY_MODE, true); + config->set(dwrf::Config::STRIPE_SIZE, 10 * kSizeMB); // Test splitting a gigantic batch. auto singleBatch = E2EWriterTestUtil::generateBatches( @@ -905,11 +908,11 @@ TEST_F(E2EWriterTests, OverflowLengthIncrements) { "struct<" "struct_val:struct" ">"); - auto config = std::make_shared(); - config->set(Config::DISABLE_LOW_MEMORY_MODE, true); - config->set(Config::STRIPE_SIZE, 10 * kSizeMB); + auto config = std::make_shared(); + config->set(dwrf::Config::DISABLE_LOW_MEMORY_MODE, true); + config->set(dwrf::Config::STRIPE_SIZE, 10 * kSizeMB); config->set( - Config::RAW_DATA_SIZE_PER_BATCH, + dwrf::Config::RAW_DATA_SIZE_PER_BATCH, folly::to(500 * 1024UL * 1024UL)); const size_t size = 1024; @@ -1238,7 +1241,8 @@ void testWriter( const std::shared_ptr& type, size_t batchCount, std::function generator, - const std::shared_ptr config = std::make_shared()) { + const std::shared_ptr config = + std::make_shared()) { std::vector batches; for (auto i = 0; i < batchCount; ++i) { batches.push_back(generator()); @@ -1418,9 +1422,9 @@ TEST_F(E2EWriterTests, fuzzFlatmap) { {"flatmap2", MAP(VARCHAR(), ARRAY(REAL()))}, {"flatmap3", MAP(INTEGER(), MAP(INTEGER(), REAL()))}, }); - auto config = std::make_shared(); - config->set(Config::FLATTEN_MAP, true); - config->set(Config::MAP_FLAT_COLS, {0, 1, 2}); + auto config = std::make_shared(); + config->set(dwrf::Config::FLATTEN_MAP, true); + config->set(dwrf::Config::MAP_FLAT_COLS, {0, 1, 2}); auto seed = folly::Random::rand32(); LOG(INFO) << "seed: " << seed; std::mt19937 rng{seed}; diff --git a/velox/dwio/dwrf/test/FloatColumnWriterBenchmark.cpp b/velox/dwio/dwrf/test/FloatColumnWriterBenchmark.cpp index 529a990fddde..3a73c4986291 100644 --- a/velox/dwio/dwrf/test/FloatColumnWriterBenchmark.cpp +++ b/velox/dwio/dwrf/test/FloatColumnWriterBenchmark.cpp @@ -75,7 +75,7 @@ void runBenchmark(int nullEvery) { braces.dismiss(); for (auto i = 0; i < kNumIterations; i++) { - auto config = std::make_shared(); + auto config = std::make_shared(); WriterContext context{ config, memory::defaultMemoryManager().addRootPool( diff --git a/velox/dwio/dwrf/test/ReaderTest.cpp b/velox/dwio/dwrf/test/ReaderTest.cpp index 85ded65269a4..bfe13b64a5a0 100644 --- a/velox/dwio/dwrf/test/ReaderTest.cpp +++ b/velox/dwio/dwrf/test/ReaderTest.cpp @@ -1448,7 +1448,7 @@ iterateVector(const VectorPtr& vector, IteraterCallback cb, size_t index = 0) { void testBufferLifeCycle( const std::shared_ptr& schema, - const std::shared_ptr& config, + const std::shared_ptr& config, std::mt19937& rng, size_t batchSize, bool hasNull) { @@ -1506,7 +1506,7 @@ void testBufferLifeCycle( void testFlatmapAsMapFieldLifeCycle( const std::shared_ptr& schema, - const std::shared_ptr& config, + const std::shared_ptr& config, std::mt19937& rng, size_t batchSize, bool hasNull) { @@ -1622,9 +1622,9 @@ TEST(TestReader, testBufferLifeCycle) { ROW({DOUBLE(), BIGINT()}), }); - auto config = std::make_shared(); - config->set(Config::FLATTEN_MAP, true); - config->set(Config::MAP_FLAT_COLS, {0, 1, 2, 3}); + auto config = std::make_shared(); + config->set(dwrf::Config::FLATTEN_MAP, true); + config->set(dwrf::Config::MAP_FLAT_COLS, {0, 1, 2, 3}); auto seed = folly::Random::rand32(); LOG(INFO) << "seed: " << seed; @@ -1642,9 +1642,9 @@ TEST(TestReader, testFlatmapAsMapFieldLifeCycle) { MAP(VARCHAR(), INTEGER()), }); - auto config = std::make_shared(); - config->set(Config::FLATTEN_MAP, true); - config->set(Config::MAP_FLAT_COLS, {0}); + auto config = std::make_shared(); + config->set(dwrf::Config::FLATTEN_MAP, true); + config->set(dwrf::Config::MAP_FLAT_COLS, {0}); auto seed = folly::Random::rand32(); LOG(INFO) << "seed: " << seed; @@ -1840,7 +1840,7 @@ createWriterReader( std::move(sink), asRowType(batches[0]->type()), batches, - std::make_shared(), + std::make_shared(), E2EWriterTestUtil::simpleFlushPolicyFactory(true)); std::string_view data(sinkPtr->data(), sinkPtr->size()); auto input = std::make_unique(