From 20d6d338599a3949533f81a38031ee99c36067cb Mon Sep 17 00:00:00 2001 From: Venki Korukanti Date: Tue, 20 Jun 2023 22:26:33 -0700 Subject: [PATCH] update test and fix issues --- .../delta/kernel/data/vector/VectorUtils.java | 4 ++ .../delta/kernel/parquet/ArrayConverter.java | 36 +++++++-------- .../io/delta/kernel/parquet/MapConverter.java | 34 ++++++-------- .../kernel/parquet/ParquetConverters.java | 16 +++++++ .../parquet/TestParquetBatchReader.java | 46 ++++++++++++++++--- 5 files changed, 90 insertions(+), 46 deletions(-) diff --git a/kernel/kernel-default/src/main/java/io/delta/kernel/data/vector/VectorUtils.java b/kernel/kernel-default/src/main/java/io/delta/kernel/data/vector/VectorUtils.java index 1084bb848e..4b348a13fd 100644 --- a/kernel/kernel-default/src/main/java/io/delta/kernel/data/vector/VectorUtils.java +++ b/kernel/kernel-default/src/main/java/io/delta/kernel/data/vector/VectorUtils.java @@ -36,6 +36,10 @@ public static Object getValueAsObject(ColumnVector vector, int rowId) { // avoid the nested if-else statements. final DataType dataType = vector.getDataType(); + if (vector.isNullAt(rowId)) { + return null; + } + if (dataType instanceof BooleanType) { return vector.getBoolean(rowId); } else if (dataType instanceof ByteType) { diff --git a/kernel/kernel-default/src/main/java/io/delta/kernel/parquet/ArrayConverter.java b/kernel/kernel-default/src/main/java/io/delta/kernel/parquet/ArrayConverter.java index 73aca4daa0..2d3f79c747 100644 --- a/kernel/kernel-default/src/main/java/io/delta/kernel/parquet/ArrayConverter.java +++ b/kernel/kernel-default/src/main/java/io/delta/kernel/parquet/ArrayConverter.java @@ -17,6 +17,7 @@ package io.delta.kernel.parquet; import static io.delta.kernel.parquet.ParquetConverters.initNullabilityVector; +import static io.delta.kernel.parquet.ParquetConverters.setNullabilityToTrue; import java.util.Arrays; import java.util.Optional; import org.apache.parquet.io.api.Converter; @@ -38,6 +39,7 @@ public class ArrayConverter private int currentRowIndex; private boolean[] nullability; private int[] offsets; + private int collectorIndexAtStart; public ArrayConverter( int maxBatchSize, @@ -52,6 +54,7 @@ public ArrayConverter( innerElementType ); + // initialize working state this.nullability = initNullabilityVector(maxBatchSize); this.offsets = new int[maxBatchSize + 1]; } @@ -71,13 +74,15 @@ public Converter getConverter(int fieldIndex) @Override public void start() { - converter.start(); + collectorIndexAtStart = converter.currentEntryIndex; } @Override public void end() { - converter.end(); + int collectorIndexAtEnd = converter.currentEntryIndex; + this.nullability[currentRowIndex] = collectorIndexAtEnd == collectorIndexAtStart; + this.offsets[currentRowIndex + 1] = collectorIndexAtEnd; } @Override @@ -91,7 +96,6 @@ public ColumnVector getDataColumnVector(int batchSize) converter.getArrayVector() ); this.currentRowIndex = 0; - this.converter.currentEntryIndex = 0; this.nullability = initNullabilityVector(nullability.length); this.offsets = new int[offsets.length]; @@ -101,14 +105,10 @@ public ColumnVector getDataColumnVector(int batchSize) @Override public boolean moveToNextRow() { - boolean isNull = converter.isLastValueNull; - nullability[currentRowIndex] = isNull; - offsets[currentRowIndex + 1] = converter.currentEntryIndex; currentRowIndex++; - resizeIfNeeded(); - return isNull; + return nullability[currentRowIndex - 1]; } @Override @@ -117,6 +117,8 @@ public void resizeIfNeeded() if (nullability.length == currentRowIndex) { int newSize = nullability.length * 2; this.nullability = Arrays.copyOf(this.nullability, newSize); + setNullabilityToTrue(this.nullability, newSize / 2, newSize); + this.offsets = Arrays.copyOf(this.offsets, newSize + 1); } } @@ -128,12 +130,8 @@ public static class ArrayCollector // working state private int currentEntryIndex; - private boolean isLastValueNull; - public ArrayCollector( - int maxBatchSize, - ArrayType typeFromClient, - GroupType innerArrayType) + public ArrayCollector(int maxBatchSize, ArrayType typeFromClient, GroupType innerArrayType) { this.converter = ParquetConverters.createConverter( maxBatchSize, @@ -167,17 +165,17 @@ public void end() if (!converter.isPrimitive()) { converter.asGroupConverter().end(); } - isLastValueNull = ((ParquetConverters.BaseConverter) converter).moveToNextRow(); - - if (!isLastValueNull) { - currentEntryIndex++; - } + ((ParquetConverters.BaseConverter) converter).moveToNextRow(); + currentEntryIndex++; } public ColumnVector getArrayVector() { - return ((ParquetConverters.BaseConverter) converter) + ColumnVector vector = ((ParquetConverters.BaseConverter) converter) .getDataColumnVector(currentEntryIndex); + + currentEntryIndex = 0; + return vector; } } } diff --git a/kernel/kernel-default/src/main/java/io/delta/kernel/parquet/MapConverter.java b/kernel/kernel-default/src/main/java/io/delta/kernel/parquet/MapConverter.java index 410329a32b..21d5a1a700 100644 --- a/kernel/kernel-default/src/main/java/io/delta/kernel/parquet/MapConverter.java +++ b/kernel/kernel-default/src/main/java/io/delta/kernel/parquet/MapConverter.java @@ -17,6 +17,7 @@ package io.delta.kernel.parquet; import static io.delta.kernel.parquet.ParquetConverters.initNullabilityVector; +import static io.delta.kernel.parquet.ParquetConverters.setNullabilityToTrue; import java.util.Arrays; import java.util.Optional; import org.apache.parquet.io.api.Converter; @@ -38,6 +39,7 @@ public class MapConverter private int currentRowIndex; private boolean[] nullability; private int[] offsets; + private int collectorIndexAtStart; public MapConverter( int maxBatchSize, @@ -53,11 +55,7 @@ public MapConverter( ); // initialize working state - this.nullability = new boolean[maxBatchSize]; - // Initialize all values as null. As Parquet calls this converter only for non-null - // values, make the corresponding value to false. - Arrays.fill(this.nullability, true); - + this.nullability = initNullabilityVector(maxBatchSize); this.offsets = new int[maxBatchSize + 1]; } @@ -76,13 +74,15 @@ public Converter getConverter(int fieldIndex) @Override public void start() { - converter.asGroupConverter().start(); + collectorIndexAtStart = converter.currentEntryIndex; } @Override public void end() { - converter.asGroupConverter().end(); + int collectorIndexAtEnd = converter.currentEntryIndex; + this.nullability[currentRowIndex] = collectorIndexAtEnd == collectorIndexAtStart; + this.offsets[currentRowIndex + 1] = collectorIndexAtEnd; } @Override @@ -100,19 +100,17 @@ public ColumnVector getDataColumnVector(int batchSize) this.converter.currentEntryIndex = 0; this.nullability = initNullabilityVector(nullability.length); this.offsets = new int[offsets.length]; + return vector; } @Override public boolean moveToNextRow() { - boolean isNull = converter.isLastValueNull; - nullability[currentRowIndex] = isNull; - offsets[currentRowIndex + 1] = converter.currentEntryIndex; currentRowIndex++; resizeIfNeeded(); - return isNull; + return nullability[currentRowIndex - 1]; } @Override @@ -121,6 +119,7 @@ public void resizeIfNeeded() if (nullability.length == currentRowIndex) { int newSize = nullability.length * 2; this.nullability = Arrays.copyOf(this.nullability, newSize); + setNullabilityToTrue(this.nullability, newSize / 2, newSize); this.offsets = Arrays.copyOf(this.offsets, newSize + 1); } } @@ -132,7 +131,6 @@ public static class MapCollector // working state private int currentEntryIndex; - private boolean isLastValueNull; public MapCollector( int maxBatchSize, @@ -178,17 +176,11 @@ public void end() .filter(conv -> !conv.isPrimitive()) .forEach(conv -> ((GroupConverter) conv).end()); - long memberNullCount = Arrays.stream(converters) + Arrays.stream(converters) .map(converter -> (ParquetConverters.BaseConverter) converter) - .map(converters -> converters.moveToNextRow()) - .filter(result -> result) - .count(); + .forEach(converter -> converter.moveToNextRow()); - isLastValueNull = memberNullCount == converters.length; - - if (!isLastValueNull) { - currentEntryIndex++; - } + currentEntryIndex++; } public ColumnVector getKeyVector() diff --git a/kernel/kernel-default/src/main/java/io/delta/kernel/parquet/ParquetConverters.java b/kernel/kernel-default/src/main/java/io/delta/kernel/parquet/ParquetConverters.java index 0a688ec29a..e9863f0fd9 100644 --- a/kernel/kernel-default/src/main/java/io/delta/kernel/parquet/ParquetConverters.java +++ b/kernel/kernel-default/src/main/java/io/delta/kernel/parquet/ParquetConverters.java @@ -238,6 +238,7 @@ public void resizeIfNeeded() if (nullability.length == currentRowIndex) { int newSize = nullability.length * 2; this.nullability = Arrays.copyOf(this.nullability, newSize); + setNullabilityToTrue(this.nullability, newSize / 2, newSize); } } @@ -311,6 +312,7 @@ public void resizeIfNeeded() int newSize = values.length * 2; this.values = Arrays.copyOf(this.values, newSize); this.nullability = Arrays.copyOf(this.nullability, newSize); + setNullabilityToTrue(this.nullability, newSize / 2, newSize); } } } @@ -354,6 +356,7 @@ public void resizeIfNeeded() int newSize = values.length * 2; this.values = Arrays.copyOf(this.values, newSize); this.nullability = Arrays.copyOf(this.nullability, newSize); + setNullabilityToTrue(this.nullability, newSize / 2, newSize); } } } @@ -397,6 +400,7 @@ public void resizeIfNeeded() int newSize = values.length * 2; this.values = Arrays.copyOf(this.values, newSize); this.nullability = Arrays.copyOf(this.nullability, newSize); + setNullabilityToTrue(this.nullability, newSize / 2, newSize); } } } @@ -442,6 +446,7 @@ public void resizeIfNeeded() int newSize = values.length * 2; this.values = Arrays.copyOf(this.values, newSize); this.nullability = Arrays.copyOf(this.nullability, newSize); + setNullabilityToTrue(this.nullability, newSize / 2, newSize); } } } @@ -485,6 +490,7 @@ public void resizeIfNeeded() int newSize = values.length * 2; this.values = Arrays.copyOf(this.values, newSize); this.nullability = Arrays.copyOf(this.nullability, newSize); + setNullabilityToTrue(this.nullability, newSize / 2, newSize); } } } @@ -528,6 +534,7 @@ public void resizeIfNeeded() int newSize = values.length * 2; this.values = Arrays.copyOf(this.values, newSize); this.nullability = Arrays.copyOf(this.nullability, newSize); + setNullabilityToTrue(this.nullability, newSize / 2, newSize); } } } @@ -572,6 +579,7 @@ public void resizeIfNeeded() int newSize = values.length * 2; this.values = Arrays.copyOf(this.values, newSize); this.nullability = Arrays.copyOf(this.nullability, newSize); + setNullabilityToTrue(this.nullability, newSize / 2, newSize); } } } @@ -616,6 +624,7 @@ public void resizeIfNeeded() int newSize = values.length * 2; this.values = Arrays.copyOf(this.values, newSize); this.nullability = Arrays.copyOf(this.nullability, newSize); + setNullabilityToTrue(this.nullability, newSize / 2, newSize); } } } @@ -629,4 +638,11 @@ static boolean[] initNullabilityVector(int size) return nullability; } + + static void setNullabilityToTrue(boolean[] nullability, int start, int end) + { + // Initialize all values as null. As Parquet calls this converter only for non-null + // values, make the corresponding value to false. + Arrays.fill(nullability, start, end, true); + } } diff --git a/kernel/kernel-default/src/test/java/io/delta/kernel/parquet/TestParquetBatchReader.java b/kernel/kernel-default/src/test/java/io/delta/kernel/parquet/TestParquetBatchReader.java index 7c6bcdf178..0a221a16fb 100644 --- a/kernel/kernel-default/src/test/java/io/delta/kernel/parquet/TestParquetBatchReader.java +++ b/kernel/kernel-default/src/test/java/io/delta/kernel/parquet/TestParquetBatchReader.java @@ -19,7 +19,9 @@ import java.time.LocalDate; import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Optional; import org.apache.hadoop.conf.Configuration; import static org.junit.Assert.assertArrayEquals; @@ -99,12 +101,9 @@ public void readAllTypesOfData() List batches = readAsBatches(batchReader, ALL_TYPES_FILE, ALL_TYPES_FILE_SCHEMA); - // Verify few values, we don't have a good test infra to compare the ColumnVectors. - verifyRowFromAllTypesFile(ALL_TYPES_FILE_SCHEMA, batches, 3); - verifyRowFromAllTypesFile(ALL_TYPES_FILE_SCHEMA, batches, 47); - verifyRowFromAllTypesFile(ALL_TYPES_FILE_SCHEMA, batches, 90); - verifyRowFromAllTypesFile(ALL_TYPES_FILE_SCHEMA, batches, 93); - verifyRowFromAllTypesFile(ALL_TYPES_FILE_SCHEMA, batches, 182); + for (int rowId = 0; rowId < 200; rowId += 2) { + verifyRowFromAllTypesFile(ALL_TYPES_FILE_SCHEMA, batches, rowId); + } } @Test @@ -309,12 +308,47 @@ private static void verifyRowFromAllTypesFile( break; } case "array_of_prims": { + boolean expIsNull = rowId % 25 == 0; + if (expIsNull) { + assertTrue(vector.isNullAt(batchWithIdx._2)); + } + else { + List expArray = Arrays.asList(rowId, null, rowId + 1); + List actArray = vector.getArray(batchWithIdx._2); + assertEquals(expArray, actArray); + } break; } case "array_of_structs": { + assertFalse(vector.isNullAt(batchWithIdx._2)); + List actArray = vector.getArray(batchWithIdx._2); + assertTrue(actArray.size() == 2); + Row item0 = actArray.get(0); + assertEquals(rowId, item0.getLong(0)); + assertNull(actArray.get(1)); break; } case "map_of_prims": { + boolean expIsNull = rowId % 28 == 0; + if (expIsNull) { + assertTrue(vector.isNullAt(batchWithIdx._2)); + } + else { + Map actValue = vector.getMap(batchWithIdx._2); + assertTrue(actValue.size() == 2); + + // entry 0: key = rowId + Integer key0 = rowId; + Long actValue0 = actValue.get(key0); + Long expValue0 = (rowId % 29 == 0) ? null : (rowId + 2L); + assertEquals(expValue0, actValue0); + + // entry 1: key = if (rowId % 27 != 0) rowId + 2 else null + Integer key1 = (rowId % 27 == 0) ? null : rowId + 2; + Long actValue1 = actValue.get(key1); + Long expValue1 = rowId + 9L; + assertEquals(expValue1, actValue1); + } break; } case "map_of_complex": {