Skip to content

Commit

Permalink
add support for parquet_writer_version session property
Browse files Browse the repository at this point in the history
  • Loading branch information
svm1 committed Jan 29, 2025
1 parent 1807de1 commit d9fc5ec
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 16 deletions.
19 changes: 8 additions & 11 deletions velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
#include "velox/dwio/parquet/RegisterParquetWriter.h" // @manual
#include "velox/dwio/parquet/reader/PageReader.h"
#include "velox/dwio/parquet/tests/ParquetTestBase.h"
#include "velox/exec/Cursor.h"
#include "velox/dwio/parquet/writer/arrow/Properties.h"
#include "velox/exec/Cursor.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/QueryAssertions.h"
Expand Down Expand Up @@ -79,9 +79,9 @@ class ParquetWriterTest : public ParquetTestBase {
opts);
};

facebook::velox::parquet::thrift::PageType::type getDataPageVersion(
parquet::thrift::PageType::type getDataPageVersion(
const dwio::common::MemorySink* sinkPtr,
const facebook::velox::parquet::ColumnChunkMetaDataPtr& colChunkPtr) {
const parquet::ColumnChunkMetaDataPtr& colChunkPtr) {
std::string_view sinkData(sinkPtr->data(), sinkPtr->size());
auto readFile = std::make_shared<InMemoryReadFile>(sinkData);
auto file = std::make_shared<ReadFileInputStream>(std::move(readFile));
Expand Down Expand Up @@ -178,18 +178,17 @@ TEST_F(ParquetWriterTest, datapageVersion) {
// Set parquet datapage version and write data - then read to ensure the
// property took effect.
const auto testDataPageVersion =
[&](facebook::velox::parquet::arrow::ParquetDataPageVersion
dataPageVersion) {
[&](parquet::arrow::ParquetDataPageVersion dataPageVersion) {
// Create an in-memory writer.
auto sink = std::make_unique<MemorySink>(
200 * 1024 * 1024,
dwio::common::FileSink::Options{.pool = leafPool_.get()});
auto sinkPtr = sink.get();
facebook::velox::parquet::WriterOptions writerOptions;
parquet::WriterOptions writerOptions;
writerOptions.memoryPool = leafPool_.get();
writerOptions.parquetDataPageVersion = dataPageVersion;

auto writer = std::make_unique<facebook::velox::parquet::Writer>(
auto writer = std::make_unique<parquet::Writer>(
std::move(sink), writerOptions, rootPool_, schema);
writer->write(data);
writer->close();
Expand All @@ -202,13 +201,11 @@ TEST_F(ParquetWriterTest, datapageVersion) {
};

ASSERT_EQ(
testDataPageVersion(
facebook::velox::parquet::arrow::ParquetDataPageVersion::V1),
testDataPageVersion(parquet::arrow::ParquetDataPageVersion::V1),
thrift::PageType::type::DATA_PAGE);

ASSERT_EQ(
testDataPageVersion(
facebook::velox::parquet::arrow::ParquetDataPageVersion::V2),
testDataPageVersion(parquet::arrow::ParquetDataPageVersion::V2),
thrift::PageType::type::DATA_PAGE_V2);
};

Expand Down
14 changes: 9 additions & 5 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ std::shared_ptr<WriterProperties> getArrowParquetWriterOptions(
static_cast<int64_t>(flushPolicy->rowsInRowGroup()));
properties = properties->codec_options(options.codecOptions);
properties = properties->enable_store_decimal_as_integer();
properties = properties->data_page_version(options.parquetDataPageVersion.value());
properties =
properties->data_page_version(options.parquetDataPageVersion.value_or(
arrow::ParquetDataPageVersion::V1));

return properties->build();
}
Expand Down Expand Up @@ -489,10 +491,12 @@ void WriterOptions::processConfigs(
}

if (!parquetDataPageVersion) {
parquetDataPageVersion =
getParquetDataPageVersion(session, kParquetSessionDataPageVersion).has_value()
? getParquetDataPageVersion(session, kParquetSessionDataPageVersion)
: getParquetDataPageVersion(connectorConfig, kParquetSessionDataPageVersion);
parquetDataPageVersion =
getParquetDataPageVersion(session, kParquetSessionDataPageVersion)
.has_value()
? getParquetDataPageVersion(session, kParquetSessionDataPageVersion)
: getParquetDataPageVersion(
connectorConfig, kParquetHiveConnectorDataPageVersion);
}
}

Expand Down
2 changes: 2 additions & 0 deletions velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ struct WriterOptions : public dwio::common::WriterOptions {
"hive.parquet.writer.timestamp-unit";
static constexpr const char* kParquetSessionDataPageVersion =
"parquet_writer_version";
static constexpr const char* kParquetHiveConnectorDataPageVersion =
"hive.parquet.writer.version";

// Process hive connector and session configs.
void processConfigs(
Expand Down

0 comments on commit d9fc5ec

Please sign in to comment.