fix: add proto roundtrips for Spark tests and fix issues it surfaces #315
Conversation
-      // count only needs to be set when it is not -1
-      builder.count(rel.getCount());
-    }
+    var builder = Fetch.builder().input(input).offset(rel.getOffset()).count(rel.getCount());
While the idea of not setting count when it's -1 is fine, it makes the roundtrip tests fail whenever count is set in the POJO. An alternative fix is to ensure the POJO never sets count when the proto value is -1.
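A minimal sketch of that alternative, assuming -1 is the proto's "count not set" sentinel (the `fromProto` wrapper and imports are hypothetical; `Fetch.builder()` follows the snippet above):

```java
// Sketch only: set count on the POJO builder solely when the proto carries a
// real value, so "-1 in the proto" and "unset in the POJO" stay interchangeable.
Fetch fromProto(Rel input, io.substrait.proto.FetchRel rel) {
  var builder = Fetch.builder().input(input).offset(rel.getOffset());
  if (rel.getCount() != -1) { // -1 means "no count" on the proto side
    builder.count(rel.getCount());
  }
  return builder.build();
}
```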
@@ -131,7 +131,7 @@ class ToSubstraitRel extends AbstractLogicalPlanVisitor with Logging {
     val aggregates = collectAggregates(actualResultExprs, aggExprToOutputOrdinal)
     val aggOutputMap = aggregates.zipWithIndex.map {
       case (e, i) =>
-        AttributeReference(s"agg_func_$i", e.dataType)() -> e
+        AttributeReference(s"agg_func_$i", e.dataType, nullable = e.nullable)() -> e
These were causing wrong nullability for the type in the created POJOs. I don't think that type field is used anywhere, so it didn't cause harm, but it still failed the roundtrip tests: the type isn't written to the proto, so on read it gets correctly re-derived from the other fields and no longer matches.
@vbarua @andrew-coleman this has been open for a while, but now it's finally ready for review! The testing change collides a bit with Andrew's #333, but either should be trivial to rebase once the other is in.
@@ -42,7 +46,38 @@ public static Set.SetOp fromProto(SetRel.SetOp proto) {

   @Override
   protected Type.Struct deriveRecordType() {
-    return getInputs().get(0).getRecordType();
+    // The different inputs may have schemas that differ in nullability, but not in type.
+    // In that case we should return a schema that is nullable where any of the inputs is nullable.
Looking at the docs for this (https://substrait.io/relations/logical_relations/#set-operation-types), the output nullability depends on which set operation is being performed.
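For reference, a self-contained sketch of what per-operation derivation can look like; the enum and helper names are illustrative (not this library's API), with union = nullable-if-any-input-is, intersection = nullable-only-if-all-are, and minus keeping the primary input's nullability, following the spec table linked above:

```java
import java.util.List;

// Illustrative helper: combine the nullability flags for one field position
// across all set inputs, depending on the set operation being performed.
class SetNullability {
  enum Op { UNION_ALL, INTERSECTION_PRIMARY, MINUS_PRIMARY }

  static boolean outputNullable(Op op, List<Boolean> inputNullabilities) {
    return switch (op) {
      // A union can emit a null if any input can.
      case UNION_ALL -> inputNullabilities.stream().anyMatch(n -> n);
      // An intersection can emit a null only if every input can.
      case INTERSECTION_PRIMARY -> inputNullabilities.stream().allMatch(n -> n);
      // Minus output rows all come from the primary (first) input.
      case MINUS_PRIMARY -> inputNullabilities.get(0);
    };
  }
}
```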
yep, I realized that as well but forgot to fix it 😅 I'll try to get to it tomorrow.
Found some time to actually look at this. I have one comment about the nullability of scalar subqueries, and one request for tests for the Set output type derivation logic.
    // As defined in https://substrait.io/relations/logical_relations/#set-operation-types
    return switch (getSetOp()) {
      case UNKNOWN -> first; // alternative would be to throw an exception
meta: out of scope for this PR, but given that there is no default operation for when this is not specified, it might make sense to not allow UNKNOWN in the POJOs. That is, we should force the user to set the operation field.
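Something like this hypothetical guard (class and method names assumed, not the actual POJO validation hook):

```java
// Hypothetical fail-fast check: reject UNKNOWN before it reaches type
// derivation, where it has no defined semantics.
final class SetOpGuard {
  enum SetOp { UNKNOWN, UNION_ALL, MINUS_PRIMARY, INTERSECTION_PRIMARY }

  static SetOp requireSpecified(SetOp op) {
    if (op == SetOp.UNKNOWN) {
      throw new IllegalArgumentException("Set operation must be set explicitly");
    }
    return op;
  }
}
```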
"Scalar subquery must have exactly one field"); | ||
} | ||
// Result can be null if the query returns no rows | ||
return TypeCreator.asNullable(type.fields().get(0)); |
// Result can be null if the query returns no rows
Is this actually the case? If you have a non-nullable column but there are no values, that column is still non-nullable as far as I understand it.
In any case, the spec indicates:
// A subquery with one row and one column. This is often an aggregate
// though not required to be.
So I think it would be safe to just use the nullability of the field as is. If no row is returned by the subquery, that's a violation of the spec as written.
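A sketch of that suggestion, reusing the `Type` and exception-message shapes from the snippet above (the method name is illustrative):

```java
// Sketch: a scalar subquery's type is exactly its single field's type,
// nullability included; no asNullable() wrapping.
Type deriveScalarSubqueryType(Type.Struct type) {
  if (type.fields().size() != 1) {
    throw new IllegalArgumentException("Scalar subquery must have exactly one field");
  }
  return type.fields().get(0); // keep the field's own nullability
}
```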
The issue was that Spark was always reporting nullable=true, which meant the from-proto version didn't match the from-Spark one. However, this "fix" of mine was indeed wrong; I made a more correct one here: 4f7f274. Thanks!
    val protoPlan = io.substrait.proto.Rel.parseFrom(bytes)
    val substraitPlan2 =
      new ProtoRelConverter(extensionCollector, SparkExtension.COLLECTION).from(protoPlan)
    substraitPlan2.shouldEqualPlainly(substraitPlan)
This is a good check to have generally ✨
@@ -550,10 +550,12 @@ private[logical] class WithLogicalSubQuery(toSubstraitRel: ToSubstraitRel)
     expr match {
       case s: ScalarSubquery if s.outerAttrs.isEmpty && s.joinCond.isEmpty =>
         val rel = toSubstraitRel.visit(s.plan)
+        val t =
+          s.plan.schema.fields.head // Using this instead of s.dataType/s.nullable to get correct nullability
There are two ScalarSubquery classes in Spark. The one we're using here is https://github.com/apache/spark/blob/9fe78e3d33499060467ecdc0c2631beae0b0316c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala#L411, which always sets nullability to true. However, the other one picks the nullability from the plan's schema: https://github.com/apache/spark/blob/9fe78e3d33499060467ecdc0c2631beae0b0316c/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala#L68. I'm not sure why there's a difference, but picking it from the plan seems to align better with what we expect here (and makes the tests pass).
They both have override def nullable: Boolean = true
(line 69 in your second link)?
Whoops, indeed, I can't read 🤦. Still, I think the change here is fine nonetheless.
      int finalI = i;
      boolean anyOtherIsRequired = rest.stream().anyMatch(t -> !t.fields().get(finalI).nullable());
      fields.add(anyOtherIsRequired ? TypeCreator.asNotNullable(typeA) : typeA);
    }
Looks good. Tiny nit - both of these functions share a lot of common code (only 2 lines differ) - could they be refactored somehow? Not important though :)
I had it as a single function before, but that ends up being harder to read: the difference between the two is small but non-trivial :/
Changes look good. Will merge them shortly.
Adds testing for substrait-spark that going from POJO (i.e. a substrait-java plan) -> proto -> POJO results in the same POJO.
The tests surfaced a bunch of cases where that assertion fails, mainly because the Java POJOs contain a derived outputType that was in many cases incorrect when created from the proto.
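In Java terms, the added assertion amounts to something like the following sketch (the converter names mirror the Scala test snippet above, but the exact constructor and method shapes here are assumptions, not the project's test code):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

// Hedged sketch of the POJO -> proto -> POJO roundtrip check.
void assertProtoRoundtrip(io.substrait.relation.Rel pojo, ExtensionCollector extensions) {
  io.substrait.proto.Rel proto = new RelProtoConverter(extensions).toProto(pojo);
  io.substrait.relation.Rel back = new ProtoRelConverter(extensions).from(proto);
  // Equality must hold field-by-field, including derived fields such as the
  // output type, which is what surfaced the bugs fixed in this PR.
  assertEquals(pojo, back);
}
```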