From f2b8f3c0972105ad99cd2c5e540af4a1155b9eb3 Mon Sep 17 00:00:00 2001 From: Peter Franzen Date: Mon, 23 Oct 2023 15:09:00 +0200 Subject: [PATCH] DRILL-8458: use correct size of definition level bytes slice when reading Parquet data page --- .../hadoop/ColumnChunkIncReadStore.java | 4 +- .../store/parquet/TestRepeatedColumn.java | 109 ++++++++++++++++++ .../resources/parquet/repeated_int.parquet | Bin 0 -> 651 bytes 3 files changed, 111 insertions(+), 2 deletions(-) create mode 100644 exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestRepeatedColumn.java create mode 100644 exec/java-exec/src/test/resources/parquet/repeated_int.parquet diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java index 773a861213c..7834eaa8166 100644 --- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java +++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java @@ -217,13 +217,13 @@ public DataPage readPage() { int pageBufOffset = 0; ByteBuffer bb = (ByteBuffer) pageBuf.position(pageBufOffset); BytesInput repLevelBytes = BytesInput.from( - (ByteBuffer) bb.slice().limit(pageBufOffset + repLevelSize) + (ByteBuffer) bb.slice().limit(repLevelSize) ); pageBufOffset += repLevelSize; bb = (ByteBuffer) pageBuf.position(pageBufOffset); final BytesInput defLevelBytes = BytesInput.from( - (ByteBuffer) bb.slice().limit(pageBufOffset + defLevelSize) + (ByteBuffer) bb.slice().limit(defLevelSize) ); pageBufOffset += defLevelSize; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestRepeatedColumn.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestRepeatedColumn.java new file mode 100644 index 00000000000..a151a015384 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestRepeatedColumn.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.parquet; + +import java.io.IOException; + +import org.apache.drill.categories.ParquetTest; +import org.apache.drill.categories.UnlikelyTest; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterTest; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + + +@Category({ParquetTest.class, UnlikelyTest.class}) +public class TestRepeatedColumn extends ClusterTest { + + // The parquet file used in the tests, can be generated by calling createParquetTestFile(). + private static final String REPEATED_INT_DATAFILE = "cp.`parquet/repeated_int.parquet`"; + + // Schema used to generate the parquet test file. + private static final String SCHEMA = + "message ParquetRepeated { \n" + + " required int32 rowKey; \n" + + " repeated int32 repeatedInt ( INTEGER(32,true) ) ; \n" + + "} \n"; + + + private static final int NUM_RECORDS = 100; + private static final int[] REPEATED_VALUES = {666, 1492, 4711}; + + + @BeforeClass + public static void setUp() throws Exception { + startCluster(ClusterFixture.builder(dirTestWatcher)); + } + + + @Test + public void testSelectRepeatedInt() throws Exception { + // DRILL-8458 + String query = "select repeatedInt as r from %s"; + testBuilder() + .sqlQuery(query, REPEATED_INT_DATAFILE) + .unOrdered() + .expectsNumRecords(NUM_RECORDS) + .go(); + } + + + public static void createParquetTestFile(String filePath, int numRows) throws IOException { + + MessageType messageType = MessageTypeParser.parseMessageType(SCHEMA); + GroupWriteSupport.setSchema(messageType, ParquetSimpleTestFileGenerator.conf); + SimpleGroupFactory groupFactory = new SimpleGroupFactory(messageType); + + try (ParquetWriter writer = createParquetWriter(filePath)) { + + for (int i = 0; i< numRows; i++) { + + Group g = groupFactory.newGroup(); + g.append("rowKey", i+1); + for (int r :REPEATED_VALUES) { + g.append("repeatedInt", r); + } + + writer.write(g); + } + } + } + + + private static ParquetWriter createParquetWriter(String filePath) throws IOException { + + return + ExampleParquetWriter.builder(ParquetSimpleTestFileGenerator.initFile(filePath)) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .withCompressionCodec(CompressionCodecName.GZIP) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) + .withConf(ParquetSimpleTestFileGenerator.conf) + .build(); + } +} diff --git a/exec/java-exec/src/test/resources/parquet/repeated_int.parquet b/exec/java-exec/src/test/resources/parquet/repeated_int.parquet new file mode 100644 index 0000000000000000000000000000000000000000..90dccbd0404e70237a08fe1f4726308324ac8491 GIT binary patch literal 651 zcmah{&ubGw6n?uI)+H(?HgA}P1y>>mQCgzZXiU;e@rQ)rE`(q-(Y!cVB)nf%g z)RW*viwE(jSC7&|g&sWi5D-NF0jVCvgKx6dcyk$M=Y4P9_rCYu1gDpLG_ZoLW!%;f z)K;V)033O2(yqQ+H{}U$1;F5`qi`p2C;AZ+{tWiYI7#{l$l^&EbP*>qoCY zT-TuaGyQl#g&4T2;S+5c$1KlNBIq)}rFZG4|F45%rcG1CKdyb>I6Jom5NsF$k$_d! zEx|N(5-d^qbc^r7&-Dp>^YCBT<|cN?50=e$G<>hAGljIz9SXwi_KF-W$)OBKGC@Xo z6@-xu)-K3<%3xxD_Q$)}A61WCd^6*eDV}d@mTlEs(+-xf@jCzz^L5l=0! zYBVnru4Yi0@ZYET@lD0gUlmB4Yw|~hit2_auVvkl%)E4v$ez~^Qz`bzyJ0%)Nf76U zd2{B{%vqJ$o$jFT_@2Mu)t%XBrQ1uKPAiFJ;