DRILL-8458: use correct size of definition level bytes slice when reading Parquet data page
handmadecode committed Oct 23, 2023
1 parent 25d3774 commit f2b8f3c
Showing 3 changed files with 111 additions and 2 deletions.
@@ -217,13 +217,13 @@ public DataPage readPage() {
   int pageBufOffset = 0;
   ByteBuffer bb = (ByteBuffer) pageBuf.position(pageBufOffset);
   BytesInput repLevelBytes = BytesInput.from(
-      (ByteBuffer) bb.slice().limit(pageBufOffset + repLevelSize)
+      (ByteBuffer) bb.slice().limit(repLevelSize)
   );
   pageBufOffset += repLevelSize;

   bb = (ByteBuffer) pageBuf.position(pageBufOffset);
   final BytesInput defLevelBytes = BytesInput.from(
-      (ByteBuffer) bb.slice().limit(pageBufOffset + defLevelSize)
+      (ByteBuffer) bb.slice().limit(defLevelSize)
   );
   pageBufOffset += defLevelSize;
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.parquet;

import java.io.IOException;

import org.apache.drill.categories.ParquetTest;
import org.apache.drill.categories.UnlikelyTest;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;


@Category({ParquetTest.class, UnlikelyTest.class})
public class TestRepeatedColumn extends ClusterTest {

  // The Parquet file used in the tests. It can be generated by calling createParquetTestFile().
  private static final String REPEATED_INT_DATAFILE = "cp.`parquet/repeated_int.parquet`";

  // Schema used to generate the Parquet test file.
  private static final String SCHEMA =
      "message ParquetRepeated { \n" +
      " required int32 rowKey; \n" +
      " repeated int32 repeatedInt ( INTEGER(32,true) ) ; \n" +
      "} \n";


  private static final int NUM_RECORDS = 100;
  private static final int[] REPEATED_VALUES = {666, 1492, 4711};


  @BeforeClass
  public static void setUp() throws Exception {
    startCluster(ClusterFixture.builder(dirTestWatcher));
  }


  @Test
  public void testSelectRepeatedInt() throws Exception {
    // DRILL-8458
    String query = "select repeatedInt as r from %s";
    testBuilder()
      .sqlQuery(query, REPEATED_INT_DATAFILE)
      .unOrdered()
      .expectsNumRecords(NUM_RECORDS)
      .go();
  }


  public static void createParquetTestFile(String filePath, int numRows) throws IOException {

    MessageType messageType = MessageTypeParser.parseMessageType(SCHEMA);
    GroupWriteSupport.setSchema(messageType, ParquetSimpleTestFileGenerator.conf);
    SimpleGroupFactory groupFactory = new SimpleGroupFactory(messageType);

    try (ParquetWriter<Group> writer = createParquetWriter(filePath)) {

      for (int i = 0; i < numRows; i++) {

        Group g = groupFactory.newGroup();
        g.append("rowKey", i + 1);
        for (int r : REPEATED_VALUES) {
          g.append("repeatedInt", r);
        }

        writer.write(g);
      }
    }
  }


  private static ParquetWriter<Group> createParquetWriter(String filePath) throws IOException {

    return
      ExampleParquetWriter.builder(ParquetSimpleTestFileGenerator.initFile(filePath))
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .withCompressionCodec(CompressionCodecName.GZIP)
        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
        .withConf(ParquetSimpleTestFileGenerator.conf)
        .build();
  }
}
Binary file not shown. (This is the generated test data file, parquet/repeated_int.parquet.)
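
For reference, a hedged sketch of how the checked-in test data file could be regenerated with the helper above; the class name and output path here are illustrative, not part of the commit:

// Hypothetical one-off generator; the output path is illustrative.
public class RegenerateRepeatedIntFile {
  public static void main(String[] args) throws java.io.IOException {
    // 100 rows matches NUM_RECORDS in TestRepeatedColumn.
    TestRepeatedColumn.createParquetTestFile("/tmp/repeated_int.parquet", 100);
  }
}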
