
[Kernel] Default Parquet reader implementation #1846

Closed
wants to merge 9 commits

Conversation

Collaborator

@vkorukanti vkorukanti commented Jun 20, 2023

Context

This PR is part of #1783.

Description

It implements a Parquet reader based on parquet-mr and produces the output as columnar batches, using implementations of the ColumnVector and ColumnarBatch interfaces.
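For orientation, a minimal sketch of how such a reader is meant to be consumed, assuming the ColumnVector and ColumnarBatch accessors from the Kernel interfaces; the driver loop itself is illustrative and not part of this PR:

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.utils.CloseableIterator;

// Illustrative consumer: the iterator of batches would come from the Parquet
// reader added in this PR; here we just print the first column (assumed LONG).
static void printFirstColumn(CloseableIterator<ColumnarBatch> batches) throws Exception {
    try (CloseableIterator<ColumnarBatch> it = batches) {
        while (it.hasNext()) {
            ColumnarBatch batch = it.next();
            ColumnVector col = batch.getColumnVector(0);
            for (int row = 0; row < batch.getSize(); row++) {
                if (col.isNullAt(row)) {
                    System.out.println("null");
                } else {
                    System.out.println(col.getLong(row));
                }
            }
        }
    }
}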

How was this patch tested?

UTs

Collaborator

@allisonport-db allisonport-db left a comment


Mostly left minor comments and a few questions. Two main items/questions

(1) I'd like to understand more about the need for the nullability array? It seems to me like it's only necessary for the GroupConverters and not the primitive ones. Also, the group converters need to be fixed for empty groups vs null values.

(2) We have multiple (primitive) converters for the same parquet type based on our datatypes. What's the reasoning for converting types in the converters vs the column vectors? We kind of convert between types in two places

  • Converters for types in the PrimitiveConverters
  • In the ColumnVector for BinaryColumnVector (string vs binary)

Essentially why not have converters only for the parquet types -- and convert to our data type in the column vectors as we already are doing in BinaryColumnVector?

return null;
}

private static Type pruneSubfields(Type type, DataType deltaDatatype)
Collaborator

Seems like this and pruneSchema can use the same code if we generalize it for GroupType?

Collaborator Author

Cleaned up a bit, but it isn't clean enough to merge them. MessageType and GroupType need two different inputs. For now keeping them separate; it only adds an extra 5 lines of code.

Collaborator

This only prunes the 2nd level of nesting right?

Collaborator

Can't we still factor out the shared code?

private static List<Type> pruneFields(GroupType type, StructType deltaDataType) {
    // prune fields, including nested pruning like in pruneSchema
    return deltaDataType.fields().stream()
        .map(column -> {
            Type subType = findSubFieldType(type, column);
            if (subType != null && column.getDataType() instanceof StructType) {
                return subType.asGroupType().withNewFields(
                    pruneFields(subType.asGroupType(), (StructType) column.getDataType()));
            } else {
                return subType;
            }
        })
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
}

Collaborator Author

You need the MessageType for the pruneSchema.

Collaborator

MessageType extends GroupType though, right? So pruneSchema can just call a helper. Not a big deal though; can we either fix the pruning for nested fields or add a note that nested schema pruning only covers the 2nd level?
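Since MessageType does extend GroupType, here is a minimal sketch of what the shared helper could look like, assuming the pruneFields helper sketched above and parquet-mr's Types builder (names are illustrative):

import java.util.List;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import io.delta.kernel.types.StructType;

// The top-level schema is just a GroupType, so it can reuse the same
// field-pruning logic as nested groups and only rebuild the MessageType wrapper.
private static MessageType pruneSchema(MessageType fileSchema, StructType deltaSchema) {
    List<Type> prunedFields = pruneFields(fileSchema, deltaSchema);
    return Types.buildMessage()
        .addFields(prunedFields.toArray(new Type[0]))
        .named(fileSchema.getName());
}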

protected int currentRowIndex;
protected boolean[] nullability;

BasePrimitiveColumnConverter(int maxBatchSize)
Collaborator

Just for my understanding, is maxBatchSize more of a "suggested batch size" for initializing arrays, here and for the complex-type converters?

Collaborator Author

Yep, it is a suggested size. I started with maxBatchSize as the maximum possible size, but that became meaningless for the complex types. Renaming it to suggestedSize.

{
private final int size;
private final DataType dataType;
private final Optional<boolean[]> nullability;
Collaborator

Makes sense to ask this here or on any of the converters; but is this needed for any of the primitive types? Does checking the value array for a null value not suffice for the primitives? (assuming they are instantiated with null values in the beginning)

  • Why do we override isNullAt for DefaultBinaryVector?

Collaborator

@allisonport-db allisonport-db Jun 22, 2023

Nvm, Java primitives can't be null. Do we need to override isNullAt for DefaultBinaryVector though? Seems like it could be the same.

Collaborator Author

We need the nullability vector because the values are stored in Java primitive-type arrays, which can't hold nulls.

For DefaultBinaryVector, we basically rely on byte[][]; the first-level array can contain nulls.
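For illustration, a minimal sketch of the two isNullAt strategies being contrasted here (simplified; only the nullability handling is shown, and the default when no nullability array is supplied is exactly what the following comments debate):

import java.util.Optional;

// Primitive-backed vector: an int[] cannot hold null, so nulls are tracked in a
// separate, optional boolean array (the "nullability" vector from this PR).
class IntVectorSketch {
    private final int[] values;
    private final Optional<boolean[]> nullability;

    IntVectorSketch(int[] values, Optional<boolean[]> nullability) {
        this.values = values;
        this.nullability = nullability;
    }

    public boolean isNullAt(int rowId) {
        // Illustrative choice: treat a missing nullability array as "no nulls".
        return nullability.map(nulls -> nulls[rowId]).orElse(false);
    }
}

// Binary-backed vector: values live in a byte[][], so a null entry in the outer
// array already encodes a null value and no separate nullability array is needed.
class BinaryVectorSketch {
    private final byte[][] values;

    BinaryVectorSketch(byte[][] values) {
        this.values = values;
    }

    public boolean isNullAt(int rowId) {
        return values[rowId] == null;
    }
}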

* @return
*/
@Override
public boolean isNullAt(int rowId)
Collaborator

Again confused about the discrepancy for DefaultBinaryVector. Seems like !nullability.isPresent() is weird default behavior, and since DefaultBinaryVector is the only type where we don't provide a nullability array I wonder if it should be optional at all?

Collaborator

To clarify, I'm mainly wondering what's the expected behavior when no nullability array is provided? The default being all values are null in that scenario seems a little weird.

If we expect any child class that omits the nullability array to override this, can we throw an error here when it isn't present?

Collaborator Author

See the previous reply.

public void end()
{
int collectorIndexAtEnd = converter.currentEntryIndex;
this.nullability[currentRowIndex] = collectorIndexAtEnd == collectorIndexAtStart;
Collaborator

I think this is wrong for empty arrays (I tried it out and they're read as a null value). Same thing for all the other group converters.

If start/end is called at all shouldn't nullability[currentRowIndex] = false?

Collaborator Author

Nice catch. This could be a problem for empty arrays. Fixing it.

Collaborator Author

Debugging the empty-array case and the null-array case: unfortunately, Parquet calls GroupConverter.start and end in both cases. Looking at the parquet-mr provided implementation of the same for Avro conversion, they seem to have the same issue: if there is no element set call, the value is considered null.

I added a test case for now. Once this PR is merged, I will create an issue to check the behavior in other engines and whether it is going to be a problem.
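For context, a minimal sketch of the entry-count bookkeeping the quoted end() relies on (simplified; names are illustrative). Because Parquet invokes start()/end() for both an empty array and a null array, a row with zero collected elements ends up marked null either way, which is the ambiguity described above:

import org.apache.parquet.io.api.GroupConverter;

// Simplified LIST converter: it records how many elements the child converter
// produced between start() and end(); zero elements is treated as a null row.
abstract class ArrayConverterSketch extends GroupConverter {
    protected final boolean[] nullability;
    protected int currentRowIndex;
    protected int currentEntryIndex; // incremented by the element converter
    private int entryIndexAtStart;

    ArrayConverterSketch(int suggestedSize) {
        this.nullability = new boolean[suggestedSize];
    }

    @Override
    public void start() {
        entryIndexAtStart = currentEntryIndex;
    }

    @Override
    public void end() {
        // No element added between start() and end() => marked null, even though
        // Parquet also calls start()/end() for an empty (but non-null) array.
        nullability[currentRowIndex] = (currentEntryIndex == entryIndexAtStart);
        currentRowIndex++;
    }
}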

@vkorukanti
Collaborator Author

(1) I'd like to understand more about the need for the nullability array? It seems to me like it's only necessary for the GroupConverters and not the primitive ones. Also, the group converters need to be fixed for empty groups vs null values.

The nullability array is needed for primitive types because the values array is of a Java primitive type, which can't hold a null value. Also, this is optional, so in the future, if we know the values are not going to contain any nulls, we don't need to pass a nullability array.

(2) We have multiple (primitive) converters for the same parquet type based on our datatypes. What's the reasoning for converting types in the converters vs the column vectors? We kind of convert between types in two places

  • Converters for types in the PrimitiveConverters
  • In the ColumnVector for BinaryColumnVector (string vs binary)

Essentially why not have converters only for the parquet types -- and convert to our data type in the column vectors as we already are doing in BinaryColumnVector?

I think there is some duplicate code here. Will clean up.

@vkorukanti
Collaborator Author

vkorukanti commented Jun 22, 2023


I think the reason we have different converters is that we have different vectors with different access methods (getShort vs getInt). These vectors need different value array types; that's the reason. Let me know if you think we can remove any specific converter.

@allisonport-db
Collaborator


I guess it's just inconsistent since BinaryType and StringType share DefaultBinaryVector? But the other types that share the same parquet base type do not.

@vkorukanti
Collaborator Author


String and Binary vector contents are the same byte[][], but that's not the case for, for example, Short and Integer vectors (it's short[] vs int[]).
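For illustration, a minimal sketch of why the backing array drives a per-type converter (parquet-mr's PrimitiveConverter delivers INT32 values through addInt; the fields are simplified from the PR):

import java.util.Arrays;
import org.apache.parquet.io.api.PrimitiveConverter;

// Parquet has no 16-bit physical type, so Delta SHORT columns arrive as INT32.
// Because the backing array is short[] rather than int[], the narrowing happens
// once here in the converter instead of on every getShort() call.
class ShortColumnConverterSketch extends PrimitiveConverter {
    private final short[] values;
    private final boolean[] nullability;
    private int currentRowIndex;

    ShortColumnConverterSketch(int suggestedSize) {
        this.values = new short[suggestedSize];
        this.nullability = new boolean[suggestedSize];
        Arrays.fill(nullability, true); // rows stay null until a value arrives
    }

    @Override
    public void addInt(int value) {
        values[currentRowIndex] = (short) value;
        nullability[currentRowIndex] = false;
        currentRowIndex++;
    }
}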

}

@Override
public boolean hasNext()
Collaborator

This isn't idempotent, but I think I'll need to update this to add the row_index anyway, so I'm fine with adding a comment to fix it and addressing it in my PR.

Collaborator Author

You are right, this should be fixed. I will push a fix.
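As an aside, a common way to make hasNext() idempotent is to prepare the next element lazily and cache it until next() consumes it; a generic sketch (not the PR's code):

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;

// Generic sketch: hasNext() can be called any number of times without
// advancing the underlying reader, because the prepared element is cached.
abstract class IdempotentIterator<T> implements Iterator<T> {
    private Optional<T> next = Optional.empty();

    /** Reads the next element from the underlying source, or empty at end of data. */
    protected abstract Optional<T> readNext();

    @Override
    public boolean hasNext() {
        if (!next.isPresent()) {
            next = readNext();
        }
        return next.isPresent();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = next.get();
        next = Optional.empty();
        return result;
    }
}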


* @return
*/
public static Object getValueAsObject(ColumnVector vector, int rowId) {
// TODO: may be it is better to just provide a `getObject` on the `ColumnVector` to
Collaborator

Subclasses can access it right?

GroupType typeFromFile)
{
this.typeFromClient = typeFromClient;
final GroupType innerElementType = (GroupType) typeFromFile.getType("list");
Collaborator

We should note this down to address. I remember there being a lot of different variations of the list layout.
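For reference, the hard-coded "list" lookup above assumes the standard three-level LIST layout from the Parquet format spec; older writers emit two-level and other legacy variants, which is presumably the set of variations to note down. A small sketch of the standard layout, using parquet-mr's schema parser:

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

// Standard three-level LIST layout: an outer field annotated LIST, a repeated
// middle group named "list", and an inner field named "element".
static MessageType standardListExample() {
    return MessageTypeParser.parseMessageType(
        "message example {\n"
      + "  optional group tags (LIST) {\n"
      + "    repeated group list {\n"
      + "      optional binary element (UTF8);\n"
      + "    }\n"
      + "  }\n"
      + "}");
}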

Comment on lines 159 to 160
protected int currentRowIndex;
protected boolean[] nullability;
Collaborator

I was thinking resetWorkingState() would be overridden to reset these, and called in implementations of getDataColumnVector. Not a big deal though; it just unifies code.

case "decimal": {
throw new UnsupportedOperationException("not yet implemented: " + name);
}
case "nested_struct": {
Collaborator

Test case for null for structs as well? (Or at least future todo?)

Collaborator Author

There are second-level structs which are null.

@allisonport-db
Collaborator


String and Binary vector contents are same byte[][], but not the same for for example: Short and Integer vectors (its short[] vs int[])

I guess my question then would be why they need different value vector types. What's the difference between doing the type conversion in, for example, getShort vs ShortColumnConverter.addInt? Checking for illegal access to a getter could be done in the same way as in DefaultBinaryVector. Mostly just asking questions for my own understanding.

@vkorukanti
Collaborator Author

vkorukanti commented Jun 22, 2023

Storage space, and an unnecessary type check on every call of getShort.
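For contrast, a sketch of the alternative being rejected here (illustrative only): keeping the raw Parquet INT32 values and converting in the getter costs a wider backing array plus a cast on every read.

// Alternative (not what the PR does): store the Parquet INT32 values as-is and
// narrow in the getter. This costs 4 bytes per value instead of 2, and the
// conversion is repeated on every getShort() call, which is the objection above.
class IntBackedShortVectorSketch {
    private final int[] values;

    IntBackedShortVectorSketch(int[] values) {
        this.values = values;
    }

    public short getShort(int rowId) {
        return (short) values[rowId];
    }
}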

@vkorukanti vkorukanti deleted the pr2-parquet branch September 14, 2023 11:55