From 2d0751e5f9b73a77203127447f6ec610ff34c72c Mon Sep 17 00:00:00 2001
From: Jacob Khaliqi <jacobkhaliqi@gmail.com>
Date: Mon, 9 Sep 2024 15:23:08 -0700
Subject: [PATCH] feat(parquet): Added in rle decoder for boolean

Co-authored-by: Minhan Cao <minhan.duc.cao@gmail.com>
---
 velox/dwio/parquet/reader/PageReader.cpp      |  12 ++
 velox/dwio/parquet/reader/PageReader.h        |  18 ++-
 velox/dwio/parquet/reader/RleBooleanDecoder.h | 114 ++++++++++++++
 .../parquet/tests/examples/rleboolean.parquet | Bin 0 -> 646 bytes
 .../parquet/tests/reader/E2EFilterTest.cpp    |  14 ++
 .../tests/reader/ParquetTableScanTest.cpp     | 144 ++++++++++++++++++
 velox/dwio/parquet/writer/Writer.cpp          |   4 +
 velox/dwio/parquet/writer/Writer.h            |   2 +-
 8 files changed, 305 insertions(+), 3 deletions(-)
 create mode 100644 velox/dwio/parquet/reader/RleBooleanDecoder.h
 create mode 100644 velox/dwio/parquet/tests/examples/rleboolean.parquet

diff --git a/velox/dwio/parquet/reader/PageReader.cpp b/velox/dwio/parquet/reader/PageReader.cpp
index 81f429c59a5a0..2105e316d9a8b 100644
--- a/velox/dwio/parquet/reader/PageReader.cpp
+++ b/velox/dwio/parquet/reader/PageReader.cpp
@@ -700,6 +700,16 @@ void PageReader::makeDecoder() {
         break;
       }
       FMT_FALLTHROUGH;
+    case Encoding::RLE:
+      switch (parquetType) {
+        case thrift::Type::BOOLEAN:
+          rleBooleanDecoder_ = std::make_unique<RleBooleanDecoder>(
+              pageData_, pageData_ + encodedDataSize_, encodedDataSize_);
+          break;
+        default:
+          VELOX_UNSUPPORTED("RLE decoder only supports boolean");
+      }
+      break;
     default:
       VELOX_UNSUPPORTED("Encoding not supported yet: {}", encoding_);
   }
@@ -742,6 +752,8 @@ void PageReader::skip(int64_t numRows) {
     deltaBpDecoder_->skip(toSkip);
   } else if (deltaByteArrDecoder_) {
     deltaByteArrDecoder_->skip(toSkip);
+  } else if (rleBooleanDecoder_) {
+    rleBooleanDecoder_->skip(toSkip);
   } else {
     VELOX_FAIL("No decoder to skip");
   }
diff --git a/velox/dwio/parquet/reader/PageReader.h b/velox/dwio/parquet/reader/PageReader.h
index 879f21d226910..ddbdbd28e52ba 100644
--- a/velox/dwio/parquet/reader/PageReader.h
+++ b/velox/dwio/parquet/reader/PageReader.h
@@ -25,6 +25,7 @@
 #include "velox/dwio/parquet/reader/DeltaBpDecoder.h"
 #include "velox/dwio/parquet/reader/DeltaByteArrayDecoder.h"
 #include "velox/dwio/parquet/reader/ParquetTypeWithId.h"
+#include "velox/dwio/parquet/reader/RleBooleanDecoder.h"
 #include "velox/dwio/parquet/reader/RleBpDataDecoder.h"
 #include "velox/dwio/parquet/reader/StringDecoder.h"
 #include "velox/dwio/parquet/writer/arrow/util/RleEncodingInternal.h"
@@ -338,9 +339,21 @@ class PageReader {
     VELOX_CHECK(!isDictionary(), "BOOLEAN types are never dictionary-encoded");
     if (nulls) {
       nullsFromFastPath = false;
-      booleanDecoder_->readWithVisitor<true>(nulls, visitor);
+      switch (encoding_) {
+        case thrift::Encoding::RLE:
+          rleBooleanDecoder_->readWithVisitor<true>(nulls, visitor);
+          break;
+        default:
+          booleanDecoder_->readWithVisitor<true>(nulls, visitor);
+      }
     } else {
-      booleanDecoder_->readWithVisitor<false>(nulls, visitor);
+      switch (encoding_) {
+        case thrift::Encoding::RLE:
+          rleBooleanDecoder_->readWithVisitor<false>(nulls, visitor);
+          break;
+        default:
+          booleanDecoder_->readWithVisitor<false>(nulls, visitor);
+      }
     }
   }
 
@@ -501,6 +514,7 @@ class PageReader {
   std::unique_ptr<BooleanDecoder> booleanDecoder_;
   std::unique_ptr<DeltaBpDecoder> deltaBpDecoder_;
   std::unique_ptr<DeltaByteArrayDecoder> deltaByteArrDecoder_;
+  std::unique_ptr<RleBooleanDecoder> rleBooleanDecoder_;
   // Add decoders for other encodings here.
 };
 
diff --git a/velox/dwio/parquet/reader/RleBooleanDecoder.h b/velox/dwio/parquet/reader/RleBooleanDecoder.h
new file mode 100644
index 0000000000000..e4ebf638a5fe3
--- /dev/null
+++ b/velox/dwio/parquet/reader/RleBooleanDecoder.h
@@ -0,0 +1,114 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/dwio/parquet/reader/RleBpDecoder.h"
+#include "velox/dwio/parquet/writer/arrow/util/BitStreamUtilsInternal.h"
+
+namespace facebook::velox::parquet {
+
+class RleBooleanDecoder : public RleBpDecoder {
+ public:
+  static constexpr int32_t kLengthOffset = 4;
+  RleBooleanDecoder(const char* start, const char* end, int32_t len)
+      : RleBpDecoder{start + kLengthOffset, end, 1} {
+    if (len < kLengthOffset) {
+      VELOX_FAIL("Received invalid length : {} (corrupt data page?)", len);
+    }
+    numBytes_ =
+        ::arrow::bit_util::FromLittleEndian(::arrow::util::SafeLoadAs<uint32_t>(
+            reinterpret_cast<const uint8_t*>(start)));
+    if (numBytes_ > static_cast<uint32_t>(len - kLengthOffset)) {
+      VELOX_FAIL(
+          "Received invalid number of bytes : " + std::to_string(numBytes_) +
+          " (corrupt data page?)");
+    }
+  }
+
+  void skip(uint64_t numValues) {
+    skip<false>(numValues, 0, nullptr);
+  }
+
+  template <bool hasNulls>
+  inline void skip(int32_t numValues, int32_t current, const uint64_t* nulls) {
+    if constexpr (hasNulls) {
+      numValues = bits::countNonNulls(nulls, current, current + numValues);
+    }
+
+    RleBpDecoder::skip(numValues);
+  }
+
+  template <bool hasNulls, typename Visitor>
+  void readWithVisitor(const uint64_t* nulls, Visitor visitor) {
+    int32_t current = visitor.start();
+    skip<hasNulls>(current, 0, nulls);
+    int32_t toSkip = 0;
+    bool atEnd = false;
+    const bool allowNulls = hasNulls && visitor.allowNulls();
+    bool* b = nullptr;
+    for (;;) {
+      if (hasNulls && allowNulls && bits::isBitNull(nulls, current)) {
+        toSkip = visitor.processNull(atEnd);
+      } else {
+        if (hasNulls && !allowNulls) {
+          toSkip = visitor.checkAndSkipNulls(nulls, current, atEnd);
+          if (!Visitor::dense) {
+            skip<false>(toSkip, current, nullptr);
+          }
+          if (atEnd) {
+            return;
+          }
+        }
+
+        // We are at a non-null value on a row to visit.
+        if (!remainingValues_) {
+          readHeader();
+        }
+        if (repeating_) {
+          toSkip = visitor.process(value_, atEnd);
+        } else {
+          value_ = readBitField();
+          toSkip = visitor.process(value_, atEnd);
+        }
+        --remainingValues_;
+      }
+      ++current;
+      if (toSkip) {
+        skip<hasNulls>(toSkip, current, nulls);
+        current += toSkip;
+      }
+      if (atEnd) {
+        return;
+      }
+    }
+  }
+
+ private:
+  int64_t readBitField() {
+    auto value = dwio::common::safeLoadBits(
+                     bufferStart_, bitOffset_, bitWidth_, lastSafeWord_) &
+        bitMask_;
+    bitOffset_ += bitWidth_;
+    bufferStart_ += bitOffset_ >> 3;
+    bitOffset_ &= 7;
+    return value;
+  }
+
+  uint32_t numBytes_ = 0;
+};
+
+} // namespace facebook::velox::parquet
diff --git a/velox/dwio/parquet/tests/examples/rleboolean.parquet b/velox/dwio/parquet/tests/examples/rleboolean.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..3f2feef35cd4e33a143be142c9d74aadecd5cb1d
GIT binary patch
literal 646
zcmWG=3^EjD6O|Bkh!Is0RRUr*Q7%y?1_l)Y23AG}`ECvd2uMy!NJvOyVfdS@uz(FH
zM1m#;hU{SV>rhR6qGlkQK#)z81#FWFvNb?k7#L*kyC||iED;j50;y*}R?mp)tbjxo
zh8Y*1RzYnbN(Tc2-@`||s2<@0+ra^L6Z`YC%t*GR0iD%5ft8_mkM?eMki(cHYdm-h
zic*V9^5ct>Gg5OCMR^!R8C01hn34@(3_}>h2*xmmF-%|#QwA{=Ng2r;H3k_`21!vi
zNss}eEMh8RPGVVV95xb+j6fu&qyeUZRwFAkgexopDr5j6kU|iRrqBqi&_XOh?GB@j
znF^Y$F<90{EJbY(6U;&etQMNURW4wL*$7mL#YR)O(^^;{W-^Fzfz1TDP3#%ahYSqv
KK!*kZQy>5-EL5%l

literal 0
HcmV?d00001

diff --git a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp
index 2845659deb80c..74fbd524c380a 100644
--- a/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp
+++ b/velox/dwio/parquet/tests/reader/E2EFilterTest.cpp
@@ -746,6 +746,20 @@ TEST_F(E2EFilterTest, configurableWriteSchema) {
   test(type, newType);
 }
 
+TEST_F(E2EFilterTest, booleanRle) {
+  options_.enableDictionary = false;
+  options_.encoding = facebook::velox::parquet::arrow::Encoding::RLE;
+  options_.parquetDataPageVersion = "V2";
+
+  testWithTypes(
+      "boolean_val:boolean,"
+      "boolean_null:boolean",
+      [&]() { makeAllNulls("boolean_null"); },
+      false,
+      {"boolean_val"},
+      20);
+}
+
 // Define main so that gflags get processed.
 int main(int argc, char** argv) {
   testing::InitGoogleTest(&argc, argv);
diff --git a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
index 2c239e14256ba..5e4c69e69c991 100644
--- a/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
+++ b/velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
@@ -1123,6 +1123,150 @@ TEST_F(ParquetTableScanTest, deltaByteArray) {
   assertSelect({"a"}, "SELECT a from expected");
 }
 
+TEST_F(ParquetTableScanTest, rleBoolean) {
+  loadData(
+      getExampleFilePath("rleboolean.parquet"),
+      ROW({"c0", "c1", "c2", "c3", "c4", "c5"},
+          {BOOLEAN(), BOOLEAN(), BOOLEAN(), BOOLEAN(), BOOLEAN(), BOOLEAN()}),
+      makeRowVector(
+          {"c0", "c1", "c2", "c3", "c4", "c5"},
+          {
+              makeNullableFlatVector<bool>(
+                  {true,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt}),
+              makeNullableFlatVector<bool>(
+                  {false,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt}),
+              makeNullableFlatVector<bool>(
+                  {std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt,
+                   std::nullopt}),
+              makeFlatVector<bool>(std::vector<bool>{
+                  true,
+                  true,
+                  true,
+                  true,
+                  true,
+                  true,
+                  true,
+                  true,
+                  true,
+                  true,
+                  true,
+                  true,
+                  true,
+                  true,
+                  true,
+                  true,
+                  true,
+                  true}),
+              makeFlatVector<bool>(std::vector<bool>{
+                  false,
+                  false,
+                  false,
+                  false,
+                  false,
+                  false,
+                  false,
+                  false,
+                  false,
+                  false,
+                  false,
+                  false,
+                  false,
+                  false,
+                  false,
+                  false,
+                  false,
+                  false}),
+              makeNullableFlatVector<bool>(
+                  {false,
+                   false,
+                   false,
+                   std::nullopt,
+                   std::nullopt,
+                   true,
+                   true,
+                   true,
+                   true,
+                   true,
+                   false,
+                   true,
+                   std::nullopt,
+                   false,
+                   true,
+                   std::nullopt,
+                   std::nullopt,
+                   false}),
+          }));
+  std::shared_ptr<connector::ColumnHandle> c0 = makeColumnHandle(
+      "c0", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);
+  std::shared_ptr<connector::ColumnHandle> c1 = makeColumnHandle(
+      "c1", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);
+  std::shared_ptr<connector::ColumnHandle> c2 = makeColumnHandle(
+      "c2", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);
+  std::shared_ptr<connector::ColumnHandle> c3 = makeColumnHandle(
+      "c2", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);
+  std::shared_ptr<connector::ColumnHandle> c4 = makeColumnHandle(
+      "c3", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);
+  std::shared_ptr<connector::ColumnHandle> c5 = makeColumnHandle(
+      "c4", BOOLEAN(), BOOLEAN(), {}, HiveColumnHandle::ColumnType::kRegular);
+
+  assertSelect({"c0"}, "SELECT c0 FROM tmp");
+  assertSelect({"c1"}, "SELECT c1 FROM tmp");
+  assertSelect({"c2"}, "SELECT c2 FROM tmp");
+  assertSelect({"c3"}, "SELECT c3 FROM tmp");
+  assertSelect({"c4"}, "SELECT c4 FROM tmp");
+  assertSelect({"c5"}, "SELECT c5 FROM tmp");
+}
+
 int main(int argc, char** argv) {
   testing::InitGoogleTest(&argc, argv);
   folly::Init init{&argc, &argv, false};
diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp
index bce6919a7cc63..ec868975ac766 100644
--- a/velox/dwio/parquet/writer/Writer.cpp
+++ b/velox/dwio/parquet/writer/Writer.cpp
@@ -147,6 +147,10 @@ std::shared_ptr<WriterProperties> getArrowParquetWriterOptions(
       static_cast<int64_t>(flushPolicy->rowsInRowGroup()));
   properties = properties->codec_options(options.codecOptions);
   properties = properties->enable_store_decimal_as_integer();
+  if (options.parquetDataPageVersion == "V2") {
+    properties = properties->data_page_version(
+        facebook::velox::parquet::arrow::ParquetDataPageVersion::V2);
+  }
   return properties->build();
 }
 
diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h
index 26969a59d52fb..7adc3ed8dd7ca 100644
--- a/velox/dwio/parquet/writer/Writer.h
+++ b/velox/dwio/parquet/writer/Writer.h
@@ -108,7 +108,7 @@ struct WriterOptions : public dwio::common::WriterOptions {
   /// Timestamp time zone for Parquet write through Arrow bridge.
   std::optional<std::string> parquetWriteTimestampTimeZone;
   bool writeInt96AsTimestamp = false;
-
+  std::optional<std::string> parquetDataPageVersion = std::nullopt;
   // Parsing session and hive configs.
 
   // This isn't a typo; session and hive connector config names are different