fix: add proto roundtrips for Spark tests and fix issues it surfaces #315

Merged: 17 commits, Mar 28, 2025

@Blizzara Blizzara commented Oct 28, 2024

Adds tests for substrait-spark asserting that going from POJO (i.e. a substrait-java plan) -> Proto -> POJO results in the same POJO.

The tests surfaced a number of cases where that assertion fails, mainly because the Java POJOs contain a derived outputType that was in many cases incorrect when created from the proto.
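As a rough illustration of the roundtrip property described above, here is a minimal, self-contained sketch. `RoundtripSketch` and `roundTrips` are hypothetical names, and the string/integer pair is only a toy stand-in for the POJO and proto forms; it does not use the substrait-java API.

```java
import java.util.function.Function;

// Hypothetical helper, not part of substrait-java.
final class RoundtripSketch {
  // True when converting to the wire form and back yields an equal value.
  static <P, W> boolean roundTrips(P pojo, Function<P, W> toWire, Function<W, P> fromWire) {
    return pojo.equals(fromWire.apply(toWire.apply(pojo)));
  }

  public static void main(String[] args) {
    // "42" survives String -> Integer -> String.
    System.out.println(
        RoundtripSketch.<String, Integer>roundTrips(
            "42", s -> Integer.parseInt(s), n -> Integer.toString(n))); // prints "true"
    // "042" does not: the leading zero is not carried by the wire form.
    System.out.println(
        RoundtripSketch.<String, Integer>roundTrips(
            "042", s -> Integer.parseInt(s), n -> Integer.toString(n))); // prints "false"
  }
}
```

The second case mirrors the failure mode in this PR: the in-memory value holds something the serialized form does not carry, so the value rebuilt on read differs from the original.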

- // count only needs to be set when it is not -1
- builder.count(rel.getCount());
- }
+ var builder = Fetch.builder().input(input).offset(rel.getOffset()).count(rel.getCount());
Contributor Author

while the idea of not setting count if it's -1 is fine, this makes roundtrip tests fail if count is set in the pojo. Alternative fix is to ensure in the pojo it's never set if -1.
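The trade-off in the comment above can be sketched with a toy model. `FetchSketch` is a hypothetical stand-in, not the substrait-java `Fetch` class, and the sketch assumes the serialized form stores count as a plain long with -1 meaning "not set".

```java
import java.util.Objects;
import java.util.OptionalLong;

// Hypothetical stand-in for a Fetch POJO; not the substrait-java class.
final class FetchSketch {
  final long offset;
  final OptionalLong count;

  FetchSketch(long offset, OptionalLong count) {
    this.offset = offset;
    this.count = count;
  }

  // Assumption: the wire form stores count as a long, with -1 for "not set".
  long protoCount() {
    return count.orElse(-1L);
  }

  // Reader A: leaves count unset when the wire value is -1.
  static FetchSketch readSkippingMinusOne(long offset, long protoCount) {
    OptionalLong c = protoCount == -1L ? OptionalLong.empty() : OptionalLong.of(protoCount);
    return new FetchSketch(offset, c);
  }

  // Reader B: always sets count, including an explicit -1.
  static FetchSketch readAlwaysSetting(long offset, long protoCount) {
    return new FetchSketch(offset, OptionalLong.of(protoCount));
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof FetchSketch)) {
      return false;
    }
    FetchSketch other = (FetchSketch) o;
    return offset == other.offset && count.equals(other.count);
  }

  @Override
  public int hashCode() {
    return Objects.hash(offset, count);
  }
}
```

In this model, a POJO built with an explicit count of -1 only roundtrips through reader B, while a POJO with no count only roundtrips through reader A. That is why the alternative fix mentioned above (never setting -1 in the POJO) would also work.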

@@ -131,7 +131,7 @@ class ToSubstraitRel extends AbstractLogicalPlanVisitor with Logging {
val aggregates = collectAggregates(actualResultExprs, aggExprToOutputOrdinal)
val aggOutputMap = aggregates.zipWithIndex.map {
case (e, i) =>
- AttributeReference(s"agg_func_$i", e.dataType)() -> e
+ AttributeReference(s"agg_func_$i", e.dataType, nullable = e.nullable)() -> e
Contributor Author

@Blizzara Blizzara Oct 28, 2024

these were causing wrong nullability for the type in the created pojos. I don't think that type field is used anywhere so it didn't cause harm, but still failed roundtrip tests as the type isn't written in proto and then it got correctly evaluated from other fields on read.

@Blizzara Blizzara force-pushed the avo/stronger-testing branch 2 times, most recently from 623ee12 to e374c85 Compare November 21, 2024 17:31
@Blizzara Blizzara force-pushed the avo/stronger-testing branch from 13a3b99 to ad8c73b Compare March 5, 2025 10:07
@Blizzara Blizzara force-pushed the avo/stronger-testing branch from b6b2307 to e7be830 Compare March 5, 2025 15:23
@Blizzara Blizzara changed the title [wip] fix: add proto roundtrips for Spark tests and fix issues it surfaces fix: add proto roundtrips for Spark tests and fix issues it surfaces Mar 5, 2025
@Blizzara Blizzara force-pushed the avo/stronger-testing branch 2 times, most recently from 58ac64e to 4451193 Compare March 5, 2025 15:43
@Blizzara Blizzara marked this pull request as ready for review March 5, 2025 15:48
@Blizzara Blizzara force-pushed the avo/stronger-testing branch 2 times, most recently from 2945e97 to fb788e1 Compare March 5, 2025 16:16
Blizzara commented Mar 5, 2025

@vbarua @andrew-coleman this has been open for a while, but now finally ready for review! The testing change collides a bit with Andrew's #333, but either should be trivial to rebase once the other is in.

@@ -42,7 +46,38 @@ public static Set.SetOp fromProto(SetRel.SetOp proto) {

@Override
protected Type.Struct deriveRecordType() {
- return getInputs().get(0).getRecordType();
+ // The different inputs may have schemas that differ in nullability, but not in type.
+ // In that case we should return a schema that is nullable where any of the inputs is nullable.
Contributor

Looking at the docs for this (https://substrait.io/relations/logical_relations/#set-operation-types), the output nullability depends on which set operation is being performed.

Contributor Author

yep, I realized that as well but forgot to fix 😅 I'll try to tomorrow..

@Blizzara Blizzara force-pushed the avo/stronger-testing branch 2 times, most recently from 2dc72e8 to 1925efc Compare March 7, 2025 13:32
Member

@vbarua vbarua left a comment

Found some time to actually look at this. Have one comment about the nullability of Scalar Subqueries, and one request for tests for the Set output type derivation logic.


// As defined in https://substrait.io/relations/logical_relations/#set-operation-types
return switch (getSetOp()) {
case UNKNOWN -> first; // alternative would be to throw an exception
Member

meta: out of scope for this PR, but given that there is no default operation for when this is not specified, it might make sense to not allow UNKNOWN in the POJOs. That is, we should force the user to set the operation field.

"Scalar subquery must have exactly one field");
}
// Result can be null if the query returns no rows
return TypeCreator.asNullable(type.fields().get(0));
Member

// Result can be null if the query returns no rows

Is this actually the case? If you have a non-nullable column, but there are no values, that column is still non-nullable as far as I understand it.

In any case, the spec indicates:

// A subquery with one row and one column. This is often an aggregate
// though not required to be.

So I think it would be safe to just use the nullability of the field as is. If there is no row returned by the subquery that's a violation of the spec as written.

Contributor Author

Issue was that Spark was always reporting nullable=true, which meant the from-proto didn't match the from-spark version. However this "fix" I had was wrong indeed, I made a more correct one here: 4f7f274. Thanks!

val protoPlan = io.substrait.proto.Rel.parseFrom(bytes)
val substraitPlan2 =
new ProtoRelConverter(extensionCollector, SparkExtension.COLLECTION).from(protoPlan)
substraitPlan2.shouldEqualPlainly(substraitPlan)
Member

This is a good check to have generally ✨

@@ -550,10 +550,12 @@ private[logical] class WithLogicalSubQuery(toSubstraitRel: ToSubstraitRel)
expr match {
case s: ScalarSubquery if s.outerAttrs.isEmpty && s.joinCond.isEmpty =>
val rel = toSubstraitRel.visit(s.plan)
val t =
s.plan.schema.fields.head // Using this instead of s.dataType/s.nullable to get correct nullability
Contributor Author

There are two ScalarSubquery classes in Spark. The one we're using here is https://github.com/apache/spark/blob/9fe78e3d33499060467ecdc0c2631beae0b0316c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala#L411 which always sets nullability to true. However, the other one picks the nullability based on the plan's schema: https://github.com/apache/spark/blob/9fe78e3d33499060467ecdc0c2631beae0b0316c/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala#L68. I'm not sure why there's a difference, but picking it from the plan seems to align better with what we expect here (and makes the tests pass).

Contributor

They both have override def nullable: Boolean = true (line 69 in your second link)?

Contributor Author

Woops indeed, I can't read 🤦. Still, I think the change here is fine nonetheless.

int finalI = i;
boolean anyOtherIsRequired = rest.stream().anyMatch(t -> !t.fields().get(finalI).nullable());
fields.add(anyOtherIsRequired ? TypeCreator.asNotNullable(typeA) : typeA);
}
Contributor

Looks good. Tiny nit - both of these functions share a lot of common code (only 2 lines differ) - could they be refactored somehow? Not important though :)

Contributor Author

I had it before as a single function, but it ends up making it harder to read since while the difference is small, it's non-trivial :/
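For readers following the thread, the two nullability rules discussed in this review can be sketched on plain boolean flags. This is a simplified model, not the PR's code: `SetNullabilitySketch` and both method names are hypothetical, and each input schema is reduced to a list of per-field nullable flags. Which set operation uses which rule is defined in the Substrait docs linked earlier and is not encoded here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: each schema is a list of per-field flags (true = nullable).
final class SetNullabilitySketch {
  // Rule 1: an output field is nullable if it is nullable in any input.
  static List<Boolean> nullableIfAnyNullable(List<List<Boolean>> inputs) {
    List<Boolean> first = inputs.get(0);
    List<Boolean> out = new ArrayList<>();
    for (int i = 0; i < first.size(); i++) {
      final int idx = i;
      out.add(inputs.stream().anyMatch(schema -> schema.get(idx)));
    }
    return out;
  }

  // Rule 2: start from the first input; an output field becomes required
  // if any of the other inputs is required at that position.
  static List<Boolean> requiredIfAnyOtherRequired(List<List<Boolean>> inputs) {
    List<Boolean> first = inputs.get(0);
    List<List<Boolean>> rest = inputs.subList(1, inputs.size());
    List<Boolean> out = new ArrayList<>();
    for (int i = 0; i < first.size(); i++) {
      final int idx = i;
      boolean anyOtherIsRequired = rest.stream().anyMatch(schema -> !schema.get(idx));
      out.add(anyOtherIsRequired ? Boolean.FALSE : first.get(idx));
    }
    return out;
  }
}
```

The loop skeletons are nearly identical, which matches the refactoring nit above. The difference is that rule 1 treats all inputs symmetrically, while rule 2 gives the first input a special role.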

Member

@vbarua vbarua left a comment

Changes look good. Will merge them shortly.

@vbarua vbarua merged commit fd74922 into substrait-io:main Mar 28, 2025
12 of 13 checks passed
@Blizzara Blizzara deleted the avo/stronger-testing branch March 28, 2025 07:06