Skip to content

Commit

Permalink
feat(parquet): Added in rle decoder for boolean
Browse files Browse the repository at this point in the history
Co-authored-by: Minhan Cao <[email protected]>
  • Loading branch information
jkhaliqi and minhancao committed Jan 22, 2025
1 parent 1a803e1 commit d1d1dcd
Show file tree
Hide file tree
Showing 7 changed files with 318 additions and 2 deletions.
12 changes: 12 additions & 0 deletions velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,16 @@ void PageReader::makeDecoder() {
"DELTA_BINARY_PACKED decoder only supports INT32 and INT64");
}
break;
case Encoding::RLE:
switch (parquetType) {
case thrift::Type::BOOLEAN:
rleBooleanDecoder_ = std::make_unique<RleBooleanDecoder>(
pageData_, pageData_ + encodedDataSize_, encodedDataSize_);
break;
default:
VELOX_UNSUPPORTED("RLE decoder only supports BOOLEAN");
}
break;
case Encoding::DELTA_BYTE_ARRAY:
if (parquetType == thrift::Type::BYTE_ARRAY) {
deltaByteArrDecoder_ =
Expand Down Expand Up @@ -755,6 +765,8 @@ void PageReader::skip(int64_t numRows) {
deltaBpDecoder_->skip(toSkip);
} else if (deltaByteArrDecoder_) {
deltaByteArrDecoder_->skip(toSkip);
} else if (rleBooleanDecoder_) {
rleBooleanDecoder_->skip(toSkip);
} else {
VELOX_FAIL("No decoder to skip");
}
Expand Down
18 changes: 16 additions & 2 deletions velox/dwio/parquet/reader/PageReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "velox/dwio/parquet/reader/DeltaBpDecoder.h"
#include "velox/dwio/parquet/reader/DeltaByteArrayDecoder.h"
#include "velox/dwio/parquet/reader/ParquetTypeWithId.h"
#include "velox/dwio/parquet/reader/RleBooleanDecoder.h"
#include "velox/dwio/parquet/reader/RleBpDataDecoder.h"
#include "velox/dwio/parquet/reader/StringDecoder.h"

Expand Down Expand Up @@ -338,9 +339,21 @@ class PageReader {
VELOX_CHECK(!isDictionary(), "BOOLEAN types are never dictionary-encoded");
if (nulls) {
nullsFromFastPath = false;
booleanDecoder_->readWithVisitor<true>(nulls, visitor);
switch (encoding_) {
case thrift::Encoding::RLE:
rleBooleanDecoder_->readWithVisitor<true>(nulls, visitor);
break;
default:
booleanDecoder_->readWithVisitor<true>(nulls, visitor);
}
} else {
booleanDecoder_->readWithVisitor<false>(nulls, visitor);
switch (encoding_) {
case thrift::Encoding::RLE:
rleBooleanDecoder_->readWithVisitor<false>(nulls, visitor);
break;
default:
booleanDecoder_->readWithVisitor<false>(nulls, visitor);
}
}
}

Expand Down Expand Up @@ -499,6 +512,7 @@ class PageReader {
std::unique_ptr<BooleanDecoder> booleanDecoder_;
std::unique_ptr<DeltaBpDecoder> deltaBpDecoder_;
std::unique_ptr<DeltaByteArrayDecoder> deltaByteArrDecoder_;
std::unique_ptr<RleBooleanDecoder> rleBooleanDecoder_;
// Add decoders for other encodings here.
};

Expand Down
114 changes: 114 additions & 0 deletions velox/dwio/parquet/reader/RleBooleanDecoder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/dwio/parquet/common/BitStreamUtilsInternal.h"
#include "velox/dwio/parquet/reader/RleBpDecoder.h"

namespace facebook::velox::parquet {

class RleBooleanDecoder : public RleBpDecoder {
public:
static constexpr int32_t kLengthOffset = 4;
RleBooleanDecoder(const char* start, const char* end, int32_t len)
: RleBpDecoder{start + kLengthOffset, end, 1} {
if (len < kLengthOffset) {
VELOX_FAIL("Received invalid length : {} (corrupt data page?)", len);
}
numBytes_ =
::arrow::bit_util::FromLittleEndian(::arrow::util::SafeLoadAs<uint32_t>(
reinterpret_cast<const uint8_t*>(start)));
if (numBytes_ > static_cast<uint32_t>(len - kLengthOffset)) {
VELOX_FAIL(
"Received invalid number of bytes : " + std::to_string(numBytes_) +
" (corrupt data page?)");
}
}

void skip(uint64_t numValues) {
skip<false>(numValues, 0, nullptr);
}

template <bool hasNulls>
inline void skip(int32_t numValues, int32_t current, const uint64_t* nulls) {
if constexpr (hasNulls) {
numValues = bits::countNonNulls(nulls, current, current + numValues);
}

RleBpDecoder::skip(numValues);
}

template <bool hasNulls, typename Visitor>
void readWithVisitor(const uint64_t* nulls, Visitor visitor) {
int32_t current = visitor.start();
skip<hasNulls>(current, 0, nulls);
int32_t toSkip = 0;
bool atEnd = false;
const bool allowNulls = hasNulls && visitor.allowNulls();
bool* b = nullptr;
for (;;) {
if (hasNulls && allowNulls && bits::isBitNull(nulls, current)) {
toSkip = visitor.processNull(atEnd);
} else {
if (hasNulls && !allowNulls) {
toSkip = visitor.checkAndSkipNulls(nulls, current, atEnd);
if (!Visitor::dense) {
skip<false>(toSkip, current, nullptr);
}
if (atEnd) {
return;
}
}

// We are at a non-null value on a row to visit.
if (!remainingValues_) {
readHeader();
}
if (repeating_) {
toSkip = visitor.process(value_, atEnd);
} else {
value_ = readBitField();
toSkip = visitor.process(value_, atEnd);
}
--remainingValues_;
}
++current;
if (toSkip) {
skip<hasNulls>(toSkip, current, nulls);
current += toSkip;
}
if (atEnd) {
return;
}
}
}

private:
int64_t readBitField() {
auto value = dwio::common::safeLoadBits(
bufferStart_, bitOffset_, bitWidth_, lastSafeWord_) &
bitMask_;
bitOffset_ += bitWidth_;
bufferStart_ += bitOffset_ >> 3;
bitOffset_ &= 7;
return value;
}

uint32_t numBytes_ = 0;
};

} // namespace facebook::velox::parquet
15 changes: 15 additions & 0 deletions velox/dwio/parquet/tests/reader/E2EFilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,21 @@ TEST_F(E2EFilterTest, configurableWriteSchema) {
test(type, newType);
}

TEST_F(E2EFilterTest, booleanRle) {
options_.enableDictionary = false;
options_.encoding = facebook::velox::parquet::arrow::Encoding::RLE;
options_.parquetDataPageVersion =
facebook::velox::parquet::WriterOptions::DataPageVersion::V2;

testWithTypes(
"boolean_val:boolean,"
"boolean_null:boolean",
[&]() { makeAllNulls("boolean_null"); },
false,
{"boolean_val"},
20);
}

// Define main so that gflags get processed.
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
Expand Down
151 changes: 151 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1183,6 +1183,157 @@ TEST_F(ParquetTableScanTest, deltaByteArray) {
assertSelect({"a"}, "SELECT a from expected");
}

TEST_F(ParquetTableScanTest, rleBoolean) {
WriterOptions options;
options.enableDictionary = false;
options.encoding = facebook::velox::parquet::arrow::Encoding::RLE;
options.parquetDataPageVersion =
facebook::velox::parquet::WriterOptions::DataPageVersion::V2;

auto vector = makeRowVector(
{"c0", "c1", "c2", "c3", "c4", "c5"},
{
makeNullableFlatVector<bool>(
{true,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt}),
makeNullableFlatVector<bool>(
{false,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt}),
makeNullableFlatVector<bool>(
{std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt,
std::nullopt}),
makeFlatVector<bool>(std::vector<bool>{
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true,
true}),
makeFlatVector<bool>(std::vector<bool>{
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false}),
makeNullableFlatVector<bool>(
{false,
false,
false,
std::nullopt,
std::nullopt,
true,
true,
true,
true,
true,
false,
true,
std::nullopt,
false,
true,
std::nullopt,
std::nullopt,
false}),
});
auto schema = asRowType(vector->type());
auto file = TempFilePath::create();
writeToParquetFile(file->getPath(), {vector}, options);
loadData(file->getPath(), schema, vector);

std::shared_ptr<connector::ColumnHandle> c0 = makeColumnHandle(
"c0", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);
std::shared_ptr<connector::ColumnHandle> c1 = makeColumnHandle(
"c1", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);
std::shared_ptr<connector::ColumnHandle> c2 = makeColumnHandle(
"c2", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);
std::shared_ptr<connector::ColumnHandle> c3 = makeColumnHandle(
"c2", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);
std::shared_ptr<connector::ColumnHandle> c4 = makeColumnHandle(
"c3", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);
std::shared_ptr<connector::ColumnHandle> c5 = makeColumnHandle(
"c4", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);

assertSelect({"c0"}, "SELECT c0 FROM tmp");
assertSelect({"c1"}, "SELECT c1 FROM tmp");
assertSelect({"c2"}, "SELECT c2 FROM tmp");
assertSelect({"c3"}, "SELECT c3 FROM tmp");
assertSelect({"c4"}, "SELECT c4 FROM tmp");
assertSelect({"c5"}, "SELECT c5 FROM tmp");
}

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::Init init{&argc, &argv, false};
Expand Down
4 changes: 4 additions & 0 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ std::shared_ptr<WriterProperties> getArrowParquetWriterOptions(
static_cast<int64_t>(flushPolicy->rowsInRowGroup()));
properties = properties->codec_options(options.codecOptions);
properties = properties->enable_store_decimal_as_integer();
if (options.parquetDataPageVersion == WriterOptions::DataPageVersion::V2) {
properties = properties->data_page_version(
facebook::velox::parquet::arrow::ParquetDataPageVersion::V2);
}
return properties->build();
}

Expand Down
6 changes: 6 additions & 0 deletions velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ struct WriterOptions : public dwio::common::WriterOptions {
/// Timestamp time zone for Parquet write through Arrow bridge.
std::optional<std::string> parquetWriteTimestampTimeZone;
bool writeInt96AsTimestamp = false;
enum class DataPageVersion {
kDefault,
V1,
V2,
};
DataPageVersion parquetDataPageVersion = DataPageVersion::kDefault;

// Parsing session and hive configs.

Expand Down

0 comments on commit d1d1dcd

Please sign in to comment.