Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for parquet_writer_version session property #10573

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
#include "velox/connectors/hive/HiveConnector.h" // @manual
#include "velox/core/QueryCtx.h"
#include "velox/dwio/parquet/RegisterParquetWriter.h" // @manual
#include "velox/dwio/parquet/reader/PageReader.h"
#include "velox/dwio/parquet/tests/ParquetTestBase.h"
#include "velox/dwio/parquet/writer/arrow/Properties.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/Cursor.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
Expand Down Expand Up @@ -74,6 +76,26 @@ class ParquetWriterTest : public ParquetTestBase {
opts);
};

facebook::velox::parquet::thrift::PageType::type getDataPageVersion(
const dwio::common::MemorySink* sinkPtr,
const facebook::velox::parquet::ColumnChunkMetaDataPtr& colChunkPtr) {
std::string_view sinkData(sinkPtr->data(), sinkPtr->size());
auto readFile = std::make_shared<InMemoryReadFile>(sinkData);
auto file = std::make_shared<ReadFileInputStream>(std::move(readFile));
auto inputStream = std::make_unique<SeekableFileInputStream>(
std::move(file),
colChunkPtr.dataPageOffset(),
150,
*leafPool_,
LogType::TEST);
auto pageReader = std::make_unique<PageReader>(
std::move(inputStream),
*leafPool_,
colChunkPtr.compression(),
colChunkPtr.totalCompressedSize());
return pageReader->readPageHeader().type;
};

inline static const std::string kHiveConnectorId = "test-hive";
};

Expand Down Expand Up @@ -143,6 +165,50 @@ TEST_F(ParquetWriterTest, compression) {
assertReadWithReaderAndExpected(schema, *rowReader, data, *leafPool_);
};

TEST_F(ParquetWriterTest, datapageVersion) {
auto schema = ROW({"c0"}, {INTEGER()});
const int64_t kRows = 1;
const auto data = makeRowVector({
makeFlatVector<int32_t>(kRows, [](auto row) { return 987; }),
});

// Set parquet datapage version and write data - then read to ensure the
// property took effect.
const auto testDataPageVersion =
[&](facebook::velox::parquet::arrow::ParquetDataPageVersion
dataPageVersion) {
// Create an in-memory writer.
auto sink = std::make_unique<MemorySink>(
200 * 1024 * 1024,
dwio::common::FileSink::Options{.pool = leafPool_.get()});
auto sinkPtr = sink.get();
facebook::velox::parquet::WriterOptions writerOptions;
writerOptions.memoryPool = leafPool_.get();
writerOptions.parquetDataPageVersion = dataPageVersion;

auto writer = std::make_unique<facebook::velox::parquet::Writer>(
std::move(sink), writerOptions, rootPool_, schema);
writer->write(data);
writer->close();

dwio::common::ReaderOptions readerOptions{leafPool_.get()};
auto reader = createReaderInMemory(*sinkPtr, readerOptions);
auto readDataPageVersion = getDataPageVersion(
sinkPtr, reader->fileMetaData().rowGroup(0).columnChunk(0));
return readDataPageVersion;
};

ASSERT_EQ(
testDataPageVersion(
facebook::velox::parquet::arrow::ParquetDataPageVersion::V1),
thrift::PageType::type::DATA_PAGE);

ASSERT_EQ(
testDataPageVersion(
facebook::velox::parquet::arrow::ParquetDataPageVersion::V2),
thrift::PageType::type::DATA_PAGE_V2);
};

DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) {
SCOPED_TESTVALUE_SET(
"facebook::velox::parquet::Writer::write",
Expand Down
18 changes: 18 additions & 0 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ std::shared_ptr<WriterProperties> getArrowParquetWriterOptions(
static_cast<int64_t>(flushPolicy->rowsInRowGroup()));
properties = properties->codec_options(options.codecOptions);
properties = properties->enable_store_decimal_as_integer();
properties = properties->data_page_version(options.parquetDataPageVersion);
return properties->build();
}

Expand Down Expand Up @@ -433,6 +434,20 @@ std::optional<std::string> getTimestampTimeZone(
return std::nullopt;
}

arrow::ParquetDataPageVersion getParquetDataPageVersion(
const config::ConfigBase& config,
const char* configKey) {
const auto version = config.get<std::string>(configKey);

if (version == "PARQUET_1_0") {
return arrow::ParquetDataPageVersion::V1;
} else if (version == "PARQUET_2_0") {
return arrow::ParquetDataPageVersion::V2;
} else {
VELOX_FAIL("Unsupported parquet datapage version {}", version.value());
}
}

} // namespace

void WriterOptions::processSessionConfigs(const config::ConfigBase& config) {
Expand All @@ -445,6 +460,9 @@ void WriterOptions::processSessionConfigs(const config::ConfigBase& config) {
parquetWriteTimestampTimeZone =
getTimestampTimeZone(config, core::QueryConfig::kSessionTimezone);
}

parquetDataPageVersion =
getParquetDataPageVersion(config, kParquetSessionDataPageVersion);
}

void WriterOptions::processHiveConnectorConfigs(
Expand Down
8 changes: 8 additions & 0 deletions velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "velox/dwio/common/Options.h"
#include "velox/dwio/common/Writer.h"
#include "velox/dwio/common/WriterFactory.h"
#include "velox/dwio/parquet/writer/arrow/Properties.h"
#include "velox/dwio/parquet/writer/arrow/Types.h"
#include "velox/dwio/parquet/writer/arrow/util/Compression.h"
#include "velox/vector/ComplexVector.h"
Expand Down Expand Up @@ -99,6 +100,10 @@ struct WriterOptions : public dwio::common::WriterOptions {

arrow::Encoding::type encoding = arrow::Encoding::PLAIN;

// Default Parquet datapage version is V2.
arrow::ParquetDataPageVersion parquetDataPageVersion =
arrow::ParquetDataPageVersion::V2;

// The default factory allows the writer to construct the default flush
// policy with the configs in its ctor.
std::function<std::unique_ptr<DefaultFlushPolicy>()> flushPolicyFactory;
Expand All @@ -122,6 +127,9 @@ struct WriterOptions : public dwio::common::WriterOptions {
static constexpr const char* kParquetHiveConnectorWriteTimestampUnit =
"hive.parquet.writer.timestamp-unit";

static constexpr const char* kParquetSessionDataPageVersion =
"parquet_writer_version";

// Process hive connector and session configs.
void processSessionConfigs(const config::ConfigBase& config) override;
void processHiveConnectorConfigs(const config::ConfigBase& config) override;
Expand Down
Loading