Skip to content

Commit

Permalink
update test and fix issues
Browse files Browse the repository at this point in the history
  • Loading branch information
vkorukanti committed Jun 21, 2023
1 parent 01acec2 commit 20d6d33
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ public static Object getValueAsObject(ColumnVector vector, int rowId) {
// avoid the nested if-else statements.
final DataType dataType = vector.getDataType();

if (vector.isNullAt(rowId)) {
return null;
}

if (dataType instanceof BooleanType) {
return vector.getBoolean(rowId);
} else if (dataType instanceof ByteType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.delta.kernel.parquet;

import static io.delta.kernel.parquet.ParquetConverters.initNullabilityVector;
import static io.delta.kernel.parquet.ParquetConverters.setNullabilityToTrue;
import java.util.Arrays;
import java.util.Optional;
import org.apache.parquet.io.api.Converter;
Expand All @@ -38,6 +39,7 @@ public class ArrayConverter
private int currentRowIndex;
private boolean[] nullability;
private int[] offsets;
private int collectorIndexAtStart;

public ArrayConverter(
int maxBatchSize,
Expand All @@ -52,6 +54,7 @@ public ArrayConverter(
innerElementType
);

// initialize working state
this.nullability = initNullabilityVector(maxBatchSize);
this.offsets = new int[maxBatchSize + 1];
}
Expand All @@ -71,13 +74,15 @@ public Converter getConverter(int fieldIndex)
@Override
public void start()
{
converter.start();
collectorIndexAtStart = converter.currentEntryIndex;
}

@Override
public void end()
{
converter.end();
int collectorIndexAtEnd = converter.currentEntryIndex;
this.nullability[currentRowIndex] = collectorIndexAtEnd == collectorIndexAtStart;
this.offsets[currentRowIndex + 1] = collectorIndexAtEnd;
}

@Override
Expand All @@ -91,7 +96,6 @@ public ColumnVector getDataColumnVector(int batchSize)
converter.getArrayVector()
);
this.currentRowIndex = 0;
this.converter.currentEntryIndex = 0;
this.nullability = initNullabilityVector(nullability.length);
this.offsets = new int[offsets.length];

Expand All @@ -101,14 +105,10 @@ public ColumnVector getDataColumnVector(int batchSize)
// NOTE(review): this span is a unified-diff hunk whose +/- markers were stripped,
// so it superimposes the pre-change and post-change bodies of moveToNextRow().
// The two consecutive `return` statements below cannot both belong to one
// compilable method; the first appears to be the removed (old) code path and the
// second the added (new) one -- confirm against the underlying commit (20d6d33).
@Override
public boolean moveToNextRow()
{
// old path (appears removed): nullability/offsets were decided here from the
// collector's isLastValueNull flag.
boolean isNull = converter.isLastValueNull;
nullability[currentRowIndex] = isNull;
offsets[currentRowIndex + 1] = converter.currentEntryIndex;
currentRowIndex++;

resizeIfNeeded();

// old return (appears removed by the commit):
return isNull;
// new return: nullability is now written in end() (see the end() hunk above),
// so this just reports the row that was finished.
return nullability[currentRowIndex - 1];
}

@Override
Expand All @@ -117,6 +117,8 @@ public void resizeIfNeeded()
if (nullability.length == currentRowIndex) {
int newSize = nullability.length * 2;
this.nullability = Arrays.copyOf(this.nullability, newSize);
setNullabilityToTrue(this.nullability, newSize / 2, newSize);

this.offsets = Arrays.copyOf(this.offsets, newSize + 1);
}
}
Expand All @@ -128,12 +130,8 @@ public static class ArrayCollector

// working state
private int currentEntryIndex;
private boolean isLastValueNull;

public ArrayCollector(
int maxBatchSize,
ArrayType typeFromClient,
GroupType innerArrayType)
public ArrayCollector(int maxBatchSize, ArrayType typeFromClient, GroupType innerArrayType)
{
this.converter = ParquetConverters.createConverter(
maxBatchSize,
Expand Down Expand Up @@ -167,17 +165,17 @@ public void end()
if (!converter.isPrimitive()) {
converter.asGroupConverter().end();
}
isLastValueNull = ((ParquetConverters.BaseConverter) converter).moveToNextRow();

if (!isLastValueNull) {
currentEntryIndex++;
}
((ParquetConverters.BaseConverter) converter).moveToNextRow();
currentEntryIndex++;
}

public ColumnVector getArrayVector()
{
return ((ParquetConverters.BaseConverter) converter)
ColumnVector vector = ((ParquetConverters.BaseConverter) converter)
.getDataColumnVector(currentEntryIndex);

currentEntryIndex = 0;
return vector;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.delta.kernel.parquet;

import static io.delta.kernel.parquet.ParquetConverters.initNullabilityVector;
import static io.delta.kernel.parquet.ParquetConverters.setNullabilityToTrue;
import java.util.Arrays;
import java.util.Optional;
import org.apache.parquet.io.api.Converter;
Expand All @@ -38,6 +39,7 @@ public class MapConverter
private int currentRowIndex;
private boolean[] nullability;
private int[] offsets;
private int collectorIndexAtStart;

public MapConverter(
int maxBatchSize,
Expand All @@ -53,11 +55,7 @@ public MapConverter(
);

// initialize working state
this.nullability = new boolean[maxBatchSize];
// Initialize all values as null. As Parquet calls this converter only for non-null
// values, make the corresponding value to false.
Arrays.fill(this.nullability, true);

this.nullability = initNullabilityVector(maxBatchSize);
this.offsets = new int[maxBatchSize + 1];
}

Expand All @@ -76,13 +74,15 @@ public Converter getConverter(int fieldIndex)
@Override
public void start()
{
converter.asGroupConverter().start();
collectorIndexAtStart = converter.currentEntryIndex;
}

@Override
public void end()
{
converter.asGroupConverter().end();
int collectorIndexAtEnd = converter.currentEntryIndex;
this.nullability[currentRowIndex] = collectorIndexAtEnd == collectorIndexAtStart;
this.offsets[currentRowIndex + 1] = collectorIndexAtEnd;
}

@Override
Expand All @@ -100,19 +100,17 @@ public ColumnVector getDataColumnVector(int batchSize)
this.converter.currentEntryIndex = 0;
this.nullability = initNullabilityVector(nullability.length);
this.offsets = new int[offsets.length];

return vector;
}

// NOTE(review): like the ArrayConverter hunk, this is a diff hunk with +/-
// markers stripped -- it superimposes the old and new bodies of
// MapConverter.moveToNextRow(); two consecutive `return` statements cannot
// both be live. The first appears removed, the second added -- confirm
// against the underlying commit (20d6d33).
@Override
public boolean moveToNextRow()
{
// old path (appears removed): nullability/offsets derived from isLastValueNull.
boolean isNull = converter.isLastValueNull;
nullability[currentRowIndex] = isNull;
offsets[currentRowIndex + 1] = converter.currentEntryIndex;
currentRowIndex++;
resizeIfNeeded();

// old return (appears removed):
return isNull;
// new return: nullability is now populated in end(), so report the finished row.
return nullability[currentRowIndex - 1];
}

@Override
Expand All @@ -121,6 +119,7 @@ public void resizeIfNeeded()
if (nullability.length == currentRowIndex) {
int newSize = nullability.length * 2;
this.nullability = Arrays.copyOf(this.nullability, newSize);
setNullabilityToTrue(this.nullability, newSize / 2, newSize);
this.offsets = Arrays.copyOf(this.offsets, newSize + 1);
}
}
Expand All @@ -132,7 +131,6 @@ public static class MapCollector

// working state
private int currentEntryIndex;
private boolean isLastValueNull;

public MapCollector(
int maxBatchSize,
Expand Down Expand Up @@ -178,17 +176,11 @@ public void end()
.filter(conv -> !conv.isPrimitive())
.forEach(conv -> ((GroupConverter) conv).end());

long memberNullCount = Arrays.stream(converters)
Arrays.stream(converters)
.map(converter -> (ParquetConverters.BaseConverter) converter)
.map(converters -> converters.moveToNextRow())
.filter(result -> result)
.count();
.forEach(converter -> converter.moveToNextRow());

isLastValueNull = memberNullCount == converters.length;

if (!isLastValueNull) {
currentEntryIndex++;
}
currentEntryIndex++;
}

public ColumnVector getKeyVector()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ public void resizeIfNeeded()
if (nullability.length == currentRowIndex) {
int newSize = nullability.length * 2;
this.nullability = Arrays.copyOf(this.nullability, newSize);
setNullabilityToTrue(this.nullability, newSize / 2, newSize);
}
}

Expand Down Expand Up @@ -311,6 +312,7 @@ public void resizeIfNeeded()
int newSize = values.length * 2;
this.values = Arrays.copyOf(this.values, newSize);
this.nullability = Arrays.copyOf(this.nullability, newSize);
setNullabilityToTrue(this.nullability, newSize / 2, newSize);
}
}
}
Expand Down Expand Up @@ -354,6 +356,7 @@ public void resizeIfNeeded()
int newSize = values.length * 2;
this.values = Arrays.copyOf(this.values, newSize);
this.nullability = Arrays.copyOf(this.nullability, newSize);
setNullabilityToTrue(this.nullability, newSize / 2, newSize);
}
}
}
Expand Down Expand Up @@ -397,6 +400,7 @@ public void resizeIfNeeded()
int newSize = values.length * 2;
this.values = Arrays.copyOf(this.values, newSize);
this.nullability = Arrays.copyOf(this.nullability, newSize);
setNullabilityToTrue(this.nullability, newSize / 2, newSize);
}
}
}
Expand Down Expand Up @@ -442,6 +446,7 @@ public void resizeIfNeeded()
int newSize = values.length * 2;
this.values = Arrays.copyOf(this.values, newSize);
this.nullability = Arrays.copyOf(this.nullability, newSize);
setNullabilityToTrue(this.nullability, newSize / 2, newSize);
}
}
}
Expand Down Expand Up @@ -485,6 +490,7 @@ public void resizeIfNeeded()
int newSize = values.length * 2;
this.values = Arrays.copyOf(this.values, newSize);
this.nullability = Arrays.copyOf(this.nullability, newSize);
setNullabilityToTrue(this.nullability, newSize / 2, newSize);
}
}
}
Expand Down Expand Up @@ -528,6 +534,7 @@ public void resizeIfNeeded()
int newSize = values.length * 2;
this.values = Arrays.copyOf(this.values, newSize);
this.nullability = Arrays.copyOf(this.nullability, newSize);
setNullabilityToTrue(this.nullability, newSize / 2, newSize);
}
}
}
Expand Down Expand Up @@ -572,6 +579,7 @@ public void resizeIfNeeded()
int newSize = values.length * 2;
this.values = Arrays.copyOf(this.values, newSize);
this.nullability = Arrays.copyOf(this.nullability, newSize);
setNullabilityToTrue(this.nullability, newSize / 2, newSize);
}
}
}
Expand Down Expand Up @@ -616,6 +624,7 @@ public void resizeIfNeeded()
int newSize = values.length * 2;
this.values = Arrays.copyOf(this.values, newSize);
this.nullability = Arrays.copyOf(this.nullability, newSize);
setNullabilityToTrue(this.nullability, newSize / 2, newSize);
}
}
}
Expand All @@ -629,4 +638,11 @@ static boolean[] initNullabilityVector(int size)

return nullability;
}

/**
 * Marks the nullability slots in the half-open range {@code [start, end)} as
 * null (i.e. {@code true}). Parquet invokes converters only for non-null
 * values, so freshly allocated (or freshly grown) slots must start out as
 * "null" and get flipped to {@code false} only when a value actually arrives.
 *
 * @param nullability the per-row null flags to update in place
 * @param start       first index to set (inclusive)
 * @param end         last index to set (exclusive)
 */
static void setNullabilityToTrue(boolean[] nullability, int start, int end)
{
    for (int i = start; i < end; i++) {
        nullability[i] = true;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import static org.junit.Assert.assertArrayEquals;
Expand Down Expand Up @@ -99,12 +101,9 @@ public void readAllTypesOfData()
List<ColumnarBatch> batches =
readAsBatches(batchReader, ALL_TYPES_FILE, ALL_TYPES_FILE_SCHEMA);

// Verify few values, we don't have a good test infra to compare the ColumnVectors.
verifyRowFromAllTypesFile(ALL_TYPES_FILE_SCHEMA, batches, 3);
verifyRowFromAllTypesFile(ALL_TYPES_FILE_SCHEMA, batches, 47);
verifyRowFromAllTypesFile(ALL_TYPES_FILE_SCHEMA, batches, 90);
verifyRowFromAllTypesFile(ALL_TYPES_FILE_SCHEMA, batches, 93);
verifyRowFromAllTypesFile(ALL_TYPES_FILE_SCHEMA, batches, 182);
for (int rowId = 0; rowId < 200; rowId += 2) {
verifyRowFromAllTypesFile(ALL_TYPES_FILE_SCHEMA, batches, rowId);
}
}

@Test
Expand Down Expand Up @@ -309,12 +308,47 @@ private static void verifyRowFromAllTypesFile(
break;
}
case "array_of_prims": {
boolean expIsNull = rowId % 25 == 0;
if (expIsNull) {
assertTrue(vector.isNullAt(batchWithIdx._2));
}
else {
List<Integer> expArray = Arrays.asList(rowId, null, rowId + 1);
List<Integer> actArray = vector.getArray(batchWithIdx._2);
assertEquals(expArray, actArray);
}
break;
}
case "array_of_structs": {
assertFalse(vector.isNullAt(batchWithIdx._2));
List<Row> actArray = vector.getArray(batchWithIdx._2);
assertTrue(actArray.size() == 2);
Row item0 = actArray.get(0);
assertEquals(rowId, item0.getLong(0));
assertNull(actArray.get(1));
break;
}
case "map_of_prims": {
boolean expIsNull = rowId % 28 == 0;
if (expIsNull) {
assertTrue(vector.isNullAt(batchWithIdx._2));
}
else {
Map<Integer, Long> actValue = vector.getMap(batchWithIdx._2);
assertTrue(actValue.size() == 2);

// entry 0: key = rowId
Integer key0 = rowId;
Long actValue0 = actValue.get(key0);
Long expValue0 = (rowId % 29 == 0) ? null : (rowId + 2L);
assertEquals(expValue0, actValue0);

// entry 1: key = if (rowId % 27 != 0) rowId + 2 else null
Integer key1 = (rowId % 27 == 0) ? null : rowId + 2;
Long actValue1 = actValue.get(key1);
Long expValue1 = rowId + 9L;
assertEquals(expValue1, actValue1);
}
break;
}
case "map_of_complex": {
Expand Down

0 comments on commit 20d6d33

Please sign in to comment.