diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
index 773a861213c..7834eaa8166 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -217,13 +217,13 @@ public DataPage readPage() {
             int pageBufOffset = 0;
             ByteBuffer bb = (ByteBuffer) pageBuf.position(pageBufOffset);
             BytesInput repLevelBytes = BytesInput.from(
-              (ByteBuffer) bb.slice().limit(pageBufOffset + repLevelSize)
+              (ByteBuffer) bb.slice().limit(repLevelSize)
             );
             pageBufOffset += repLevelSize;

             bb = (ByteBuffer) pageBuf.position(pageBufOffset);
             final BytesInput defLevelBytes = BytesInput.from(
-              (ByteBuffer) bb.slice().limit(pageBufOffset + defLevelSize)
+              (ByteBuffer) bb.slice().limit(defLevelSize)
             );
             pageBufOffset += defLevelSize;

diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestRepeatedColumn.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestRepeatedColumn.java
new file mode 100644
index 00000000000..a151a015384
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestRepeatedColumn.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import java.io.IOException;
+
+import org.apache.drill.categories.ParquetTest;
+import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+@Category({ParquetTest.class, UnlikelyTest.class})
+public class TestRepeatedColumn extends ClusterTest {
+
+  // The parquet file used in the tests; it can be generated by calling createParquetTestFile().
+  private static final String REPEATED_INT_DATAFILE = "cp.`parquet/repeated_int.parquet`";
+
+  // Schema used to generate the parquet test file.
+  private static final String SCHEMA =
+      "message ParquetRepeated { \n" +
+      "  required int32 rowKey; \n" +
+      "  repeated int32 repeatedInt ( INTEGER(32,true) ) ; \n" +
+      "} \n";
+
+
+  private static final int NUM_RECORDS = 100;
+  private static final int[] REPEATED_VALUES = {666, 1492, 4711};
+
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+  }
+
+
+  @Test
+  public void testSelectRepeatedInt() throws Exception {
+    // DRILL-8458
+    String query = "select repeatedInt as r from %s";
+    testBuilder()
+      .sqlQuery(query, REPEATED_INT_DATAFILE)
+      .unOrdered()
+      .expectsNumRecords(NUM_RECORDS)
+      .go();
+  }
+
+
+  public static void createParquetTestFile(String filePath, int numRows) throws IOException {
+
+    MessageType messageType = MessageTypeParser.parseMessageType(SCHEMA);
+    GroupWriteSupport.setSchema(messageType, ParquetSimpleTestFileGenerator.conf);
+    SimpleGroupFactory groupFactory = new SimpleGroupFactory(messageType);
+
+    try (ParquetWriter<Group> writer = createParquetWriter(filePath)) {
+
+      for (int i = 0; i < numRows; i++) {
+
+        Group g = groupFactory.newGroup();
+        g.append("rowKey", i + 1);
+        for (int r : REPEATED_VALUES) {
+          g.append("repeatedInt", r);
+        }
+
+        writer.write(g);
+      }
+    }
+  }
+
+
+  private static ParquetWriter<Group> createParquetWriter(String filePath) throws IOException {
+
+    return
+      ExampleParquetWriter.builder(ParquetSimpleTestFileGenerator.initFile(filePath))
+        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+        .withCompressionCodec(CompressionCodecName.GZIP)
+        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
+        .withConf(ParquetSimpleTestFileGenerator.conf)
+        .build();
+  }
+}
diff --git a/exec/java-exec/src/test/resources/parquet/repeated_int.parquet b/exec/java-exec/src/test/resources/parquet/repeated_int.parquet
new file mode 100644
index 00000000000..90dccbd0404
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/repeated_int.parquet differ
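Note on the first hunk: ByteBuffer.slice() starts a new view at the parent buffer's current position with its own position reset to 0, so the slice's limit must be the size of the level data alone. The old code added pageBufOffset to the limit again; that was harmless for the repetition levels (where pageBufOffset is still 0) but extended the definition-level view by repLevelSize bytes into the value data. A minimal standalone sketch of that behaviour follows; it is not part of the patch, and the class name, buffer size, and level sizes are made-up illustration values.

import java.nio.ByteBuffer;

public class SliceLimitDemo {
  public static void main(String[] args) {
    // Made-up page layout: 4 bytes of repetition levels, then 4 bytes of
    // definition levels, then encoded values.
    ByteBuffer pageBuf = ByteBuffer.allocate(16);
    int repLevelSize = 4;
    int defLevelSize = 4;

    // Repetition levels already consumed; position at the definition levels.
    int pageBufOffset = repLevelSize;
    ByteBuffer bb = (ByteBuffer) pageBuf.position(pageBufOffset);

    // Old form: the slice already begins at pageBufOffset, so adding the
    // offset to the limit again makes the view repLevelSize bytes too long.
    ByteBuffer tooLong = (ByteBuffer) bb.slice().limit(pageBufOffset + defLevelSize);

    // Fixed form: limit the slice to the definition-level byte count only.
    ByteBuffer exact = (ByteBuffer) bb.slice().limit(defLevelSize);

    System.out.println(tooLong.remaining()); // 8 -- spills into the value bytes
    System.out.println(exact.remaining());   // 4
  }
}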