[Kernel] Default Parquet reader implementation #1846
Conversation
Mostly left minor comments and a few questions. Two main items:
(1) I'd like to understand more about the need for the nullability array. It seems to me like it's only necessary for the GroupConverters and not the primitive ones. Also, the group converters need to be fixed for empty groups vs null values.
(2) We have multiple (primitive) converters for the same Parquet type based on our data types. What's the reasoning for converting types in the converters vs the column vectors? We currently convert between types in two places:
- in the PrimitiveConverters, and
- in the ColumnVector for BinaryColumnVector (string vs binary).
Essentially, why not have converters only for the Parquet types, and convert to our data type in the column vectors as we already do in BinaryColumnVector?
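To make the trade-off concrete, here is a minimal sketch (invented names, not the PR's classes) of the two places an INT32 column could be narrowed to short: in the converter at write time, or in the vector at read time:

```java
// Hypothetical sketch, not the PR's actual classes: parquet-mr delivers
// INT32 values as Java ints, and the narrowing to short can happen in
// either of the two places discussed above.
class ConvertInConverter {
    final short[] values = new short[16];
    int next = 0;
    void addInt(int value) { values[next++] = (short) value; } // cast once, at write time
    short getShort(int rowId) { return values[rowId]; }        // plain array read
}

class ConvertInVector {
    final int[] values = new int[16];                          // stores the wider type
    int next = 0;
    void addInt(int value) { values[next++] = value; }
    short getShort(int rowId) { return (short) values[rowId]; } // cast on every read
}
```

Both give the same results; the difference is where the cast lives and how much storage the vector needs.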
return null;
}

private static Type pruneSubfields(Type type, DataType deltaDatatype)
Seems like this and pruneSchema can use the same code if we generalize it for GroupType?
Cleaned up a bit, but it isn't clean to merge them. MessageType and GroupType need two different inputs. For now keeping them separate; it only adds an extra ~5 lines of code.
This only prunes the 2nd level of nesting right?
Can't we still factor out the shared code?
private static List<Type> pruneFields(GroupType type, StructType deltaDataType) {
    // prune fields, including nested pruning like in pruneSchema
    return deltaDataType.fields().stream()
        .map(column -> {
            Type subType = findSubFieldType(type, column);
            if (subType != null && column.getDataType() instanceof StructType) {
                return ((GroupType) subType).withNewFields(
                    pruneFields((GroupType) subType, (StructType) column.getDataType()));
            } else {
                return subType;
            }
        })
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
}
You need the MessageType for the pruneSchema.
MessageType extends GroupType though, right? So pruneSchema can just call a helper. Not a big deal though; can we either fix the pruning for nested fields or add a note that nested schema pruning only goes to the 2nd level?
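As a self-contained illustration of that point (hypothetical stand-in classes, not the parquet-mr types): if Message extends Group, one recursive helper over Group prunes every nesting level, and the message-level entry point is just a thin wrapper:

```java
import java.util.*;
import java.util.stream.*;

// Tiny stand-ins for GroupType / MessageType; not the real parquet-mr classes.
class Group {
    final String name;
    final List<Group> fields;
    Group(String name, List<Group> fields) { this.name = name; this.fields = fields; }
}
class Message extends Group {
    Message(List<Group> fields) { super("root", fields); }
}

class PruneSketch {
    // Recursive: prunes at every depth, not just the 2nd level.
    static List<Group> pruneFields(Group type, Set<String> keep) {
        return type.fields.stream()
            .filter(f -> keep.contains(f.name))
            .map(f -> new Group(f.name, pruneFields(f, keep)))
            .collect(Collectors.toList());
    }
    // Message is-a Group, so the schema entry point just delegates.
    static Message pruneSchema(Message schema, Set<String> keep) {
        return new Message(pruneFields(schema, keep));
    }
}
```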
Resolved review threads (outdated):
- kernel/kernel-default/src/main/java/io/delta/kernel/DefaultKernelUtils.java
- kernel/kernel-default/src/main/java/io/delta/kernel/client/DefaultParquetHandler.java (two threads)
- kernel/kernel-default/src/main/java/io/delta/kernel/data/vector/DefaultBinaryVector.java
- kernel/kernel-default/src/main/java/io/delta/kernel/parquet/RowConverter.java
protected int currentRowIndex;
protected boolean[] nullability;

BasePrimitiveColumnConverter(int maxBatchSize)
Just for my understanding, is maxBatchSize just more of a "suggested batch size" for initializing arrays? Here and for the complex type converters.
Yep, it is a suggested size. I started with maxBatchSize as the maximum possible size, but that became useless when it comes to complex types. Renaming it to suggestedSize.
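A sketch of why "suggested" is the better name (hypothetical class, not the PR's converter): the initial allocation uses the hint, but the working arrays can grow past it, so it is not a hard maximum:

```java
import java.util.Arrays;

// Hypothetical sketch: the suggested size only seeds the initial allocation.
class GrowableIntColumn {
    private int[] values;
    private int currentRowIndex = 0;

    GrowableIntColumn(int suggestedSize) {
        values = new int[Math.max(1, suggestedSize)];
    }

    void add(int v) {
        if (currentRowIndex == values.length) {
            // Grow past the hint; relevant for complex types whose element
            // counts are unknown up front.
            values = Arrays.copyOf(values, values.length * 2);
        }
        values[currentRowIndex++] = v;
    }

    int size() { return currentRowIndex; }
    int get(int rowId) { return values[rowId]; }
}
```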
{
    private final int size;
    private final DataType dataType;
    private final Optional<boolean[]> nullability;
Makes sense to ask this here or on any of the converters, but is this needed for any of the primitive types? Does checking the value array for a null value not suffice for the primitives (assuming they are instantiated with null values in the beginning)? Also, why do we override isNullAt for DefaultBinaryVector?
Never mind, Java primitives can't be null. Do we need to override isNullAt for DefaultBinaryVector though? Seems like it could be the same.
We need the nullability vector because the values are Java primitive type arrays, which can't hold null. For DefaultBinaryVector, we basically rely on byte[][]; the first-level array can contain nulls.
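A minimal sketch of the asymmetry being described: Java primitive arrays have no null slot, so null-ness has to be tracked in a side boolean[], while byte[][] can represent null directly in its first-level array:

```java
// Illustrative only; not the PR's vector classes.
class NullabilitySketch {
    // int[] slots can only hold 0, never null, so a side array is required.
    static boolean primitiveNeedsSideArray() {
        int[] values = new int[4];
        boolean[] nullability = new boolean[4];
        nullability[2] = true;               // null-ness tracked out of band
        return nullability[2] && values[2] == 0;
    }

    // byte[][] is an object array at the first level, so it holds null itself.
    static boolean binaryCanHoldNullDirectly() {
        byte[][] values = new byte[4][];
        values[2] = null;
        return values[2] == null;
    }
}
```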
* @return
*/
@Override
public boolean isNullAt(int rowId)
Again confused about the discrepancy for DefaultBinaryVector. Seems like !nullability.isPresent() is weird default behavior, and since DefaultBinaryVector is the only type where we don't provide a nullability array, I wonder if it should be optional at all?
To clarify, I'm mainly wondering what the expected behavior is when no nullability array is provided? The default being that all values are null in that scenario seems a little weird. If we expect any child class that omits the nullability array to override this, can we throw an error here when it isn't present?
See the previous reply.
public void end()
{
    int collectorIndexAtEnd = converter.currentEntryIndex;
    this.nullability[currentRowIndex] = collectorIndexAtEnd == collectorIndexAtStart;
I think this is wrong for empty arrays (I tried it out and they're read as a null value). Same thing for all the other group converters.
If start/end is called at all, shouldn't nullability[currentRowIndex] = false?
Nice catch. This could be a problem for empty arrays. Fixing it.
Debugging the empty-array and null-array cases: unfortunately, Parquet calls GroupConverter.start and end in both cases. Looking at the parquet-mr-provided implementation of the same for Avro conversion, it seems to have the same issue: if there is no element-set call, the value is considered null.
I added a test case for now. Once this PR is merged, I will create an issue to check the behavior in other engines and whether it is going to be a problem.
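A hedged sketch (invented names, not the PR's code) of the end() logic under discussion; because parquet-mr invokes start()/end() for both an empty array and a null array, counting elements between the two calls cannot distinguish them:

```java
// Illustrative sketch of the converter callbacks, not the PR's classes.
class ArrayConverterSketch {
    int currentEntryIndex = 0;   // elements collected so far, across rows
    int collectorIndexAtStart;
    boolean lastRowWasNull;

    void start() { collectorIndexAtStart = currentEntryIndex; }
    void addElement() { currentEntryIndex++; }

    void end() {
        // No element callbacks between start() and end() => recorded as null.
        // This conflates a genuinely empty array with a null array, because
        // both produce a bare start()/end() pair.
        lastRowWasNull = (currentEntryIndex == collectorIndexAtStart);
    }
}
```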
Resolved review thread: kernel/kernel-default/src/test/java/io/delta/kernel/parquet/TestParquetBatchReader.java
The nullability array is needed for primitive types because the values array is of a Java primitive type, which can't hold null. Also, this is optional; so in the future, if we know the values are not going to contain any nulls, we don't need to pass a nullability array.
I think there is some duplicate code here. Will clean up.
I think the reason we have different types is because we have different vectors and different access methods (getShort vs getInt). These vectors need different value vector types; that's the reason. Let me know if you think we can remove any specific converter.
I guess it's just inconsistent, since BinaryType and StringType share the same vector.
String and Binary vector contents are the same.
}

@Override
public boolean hasNext()
This isn't idempotent; but I think I'll need to update this to add the row_index anyway, so I'm fine with adding a comment to fix this and I'll address it in my PR.
You are right, this should be fixed. I will push a fix.
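One common way to make hasNext() idempotent (a sketch, not necessarily the fix that was pushed) is to fetch at most one batch ahead and cache it until next() consumes it:

```java
import java.util.*;
import java.util.function.Supplier;

// Hypothetical sketch: wraps a batch source so that repeated hasNext()
// calls never advance the underlying reader more than once per batch.
class CachingIterator<T> implements Iterator<T> {
    private final Supplier<Optional<T>> source; // stand-in for the batch reader
    private Optional<T> cached = Optional.empty();

    CachingIterator(Supplier<Optional<T>> source) { this.source = source; }

    @Override public boolean hasNext() {
        if (!cached.isPresent()) {
            cached = source.get();  // advances the source at most once
        }
        return cached.isPresent();
    }

    @Override public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        T result = cached.get();
        cached = Optional.empty();  // force a fresh fetch next time
        return result;
    }
}
```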
* @return
*/
public static Object getValueAsObject(ColumnVector vector, int rowId) {
    // TODO: may be it is better to just provide a `getObject` on the `ColumnVector` to
Subclasses can access it right?
GroupType typeFromFile)
{
    this.typeFromClient = typeFromClient;
    final GroupType innerElementType = (GroupType) typeFromFile.getType("list");
We should note this down to address. I remember there being a lot of different variations of the Parquet list layout.
protected int currentRowIndex;
protected boolean[] nullability;
I was thinking resetWorkingState() would override and reset these, and be called in implementations of getDataColumnVector. Not a big deal though; it just unifies code.
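A sketch of the suggested refactor (names hypothetical, not the PR's classes): the base class resets the shared working state in one place, and each subclass resets only its own value array when handing off a batch:

```java
// Illustrative only: shows the shared-reset pattern being suggested.
abstract class BaseConverterSketch {
    protected int currentRowIndex;
    protected boolean[] nullability;

    // Resets the state shared by all converters, in one place.
    protected void resetWorkingState(int suggestedSize) {
        currentRowIndex = 0;
        nullability = new boolean[suggestedSize];
    }
}

class IntConverterSketch extends BaseConverterSketch {
    int[] values;

    IntConverterSketch(int suggestedSize) {
        values = new int[suggestedSize];
        resetWorkingState(suggestedSize);
    }

    // Hands off the finished batch and resets for the next one.
    int[] finishBatch(int suggestedSize) {
        int[] batch = values;
        values = new int[suggestedSize];  // subclass-specific reset
        resetWorkingState(suggestedSize); // shared reset, unified in the base
        return batch;
    }
}
```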
case "decimal": {
    throw new UnsupportedOperationException("not yet implemented: " + name);
}
case "nested_struct": {
Test case for null for structs as well? (Or at least future todo?)
There are second level structs which are null.
I guess my question then would be: why do they need different value vector types? What's the difference between doing the conversion between types in the converters versus in the column vectors?
Storage space, and an unnecessary type check for every call of the getter methods (e.g. getShort vs getInt).
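To illustrate the cost being cited (invented classes, not the PR's vectors): a dedicated short[] vector reads directly, while a shared object-valued vector pays boxing, extra storage, and a type check on every getShort() call:

```java
// Illustrative sketch of the per-read cost difference.
class DedicatedShortVector {
    final short[] values;   // 2 bytes per row, no boxing
    DedicatedShortVector(short[] values) { this.values = values; }
    short getShort(int rowId) { return values[rowId]; } // direct array read
}

class SharedObjectVector {
    final Object[] values;  // boxed Short per row: object header + reference
    SharedObjectVector(Object[] values) { this.values = values; }
    short getShort(int rowId) {
        Object v = values[rowId];
        if (!(v instanceof Short)) {   // type check on every single call
            throw new ClassCastException("expected Short at row " + rowId);
        }
        return (Short) v;              // unboxing on every single call
    }
}
```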
Resolved review thread: kernel/kernel-default/src/main/java/io/delta/kernel/DefaultKernelUtils.java
…nelUtils.java Co-authored-by: Allison Portis <[email protected]>
Context
This PR is part of #1783.
Description
It implements a Parquet reader based on parquet-mr and generates the output as columnar batches, i.e. implementations of the ColumnVector and ColumnarBatch interfaces.
How was this patch tested?
Unit tests.