fix(spark): incorrect deriveRecordType() for Expand #316
base: main
Conversation
Incorporates the test code changes in draft PR #314
-  public Type getType() {
-    return getDuplicates().get(0).getType();
+  public Stream<Type> getTypes() {
+    return getDuplicates().stream().map(Expression::getType);
maybe I'm misunderstanding something here - but are you sure this is right? The Substrait spec comments say
All duplicates must return the same type class but may differ in nullability.
so it doesn't seem right to take a list of the duplicates' types.
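For illustration, here is a minimal sketch of the single-type derivation that reading of the spec suggests. This is not code from the PR: it assumes the surrounding SwitchingField class, that substrait-java's Type exposes nullable(), and makeNullable() is a hypothetical helper rather than an actual API.

  // Minimal sketch (not code from this PR), following the quoted spec rule:
  // all duplicates share one type class, so only nullability needs merging.
  public Type getType() {
    List<Expression> duplicates = getDuplicates();
    Type first = duplicates.get(0).getType();
    // The field is nullable if any duplicate may produce a null value.
    boolean anyNullable = duplicates.stream().anyMatch(d -> d.getType().nullable());
    return anyNullable ? makeNullable(first) : first; // makeNullable() is a hypothetical helper
  }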
  var fields =
      getFields().isEmpty()
          ? initial.fields().stream()
          : Stream.concat(initial.fields().stream(), getFields().get(0).getTypes());
this also doesn't seem right (but possible I'm not reading this right!) - the spec says
In addition to a field being emitted per input field an extra int64 field is emitted which
contains a zero-indexed ordinal corresponding to the duplicate definition.
so I'd expect the record type to be something like initial.fields().map([for each field handle nullability based on duplicates]) + Type.Int64
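As a rough illustration of that expectation (a sketch only, not code from the PR): mergeNullability() and structOf() are placeholder helpers, and the exact substrait-java factory calls are assumptions.

  // One output field per input field, nullability adjusted based on how the
  // duplicates reference that field, plus a trailing int64 duplicate ordinal.
  Type.Struct deriveRecordType() {
    List<Type> fields = new ArrayList<>();
    List<Type> inputFields = getInput().getRecordType().fields();
    for (int i = 0; i < inputFields.size(); i++) {
      fields.add(mergeNullability(inputFields.get(i), i)); // placeholder helper
    }
    fields.add(TypeCreator.REQUIRED.I64); // the zero-indexed ordinal ("gid")
    return structOf(fields); // placeholder helper for building the struct type
  }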
The record type is defined in the projections (fields) matrix, which includes the int64 index field
do you mean there's an explicit SwitchingField with duplicates [Lit(1), Lit(2), ..], coming from Spark?
If I understood that right, I feel like it's not the intention of the Substrait spec for the index field to be included there; rather, it should be handled separately. (Maybe a bit unfortunate for the Spark case, as that means removing that field when converting to Substrait and adding it back when converting to Spark, but if that's what the spec means then that's what we should do to be compatible :)
In addition to a field being emitted per input field an extra int64 field is emitted which
contains a zero-indexed ordinal corresponding to the duplicate definition.
I think there's a broader problem with this description in the context of Spark. In the case of this test query:
select l_partkey, sum(l_tax), sum(distinct l_tax),
avg(l_discount), avg(distinct l_discount) from lineitem group by l_partkey
The logical tree gets rewritten by the optimizer to include an Expand relation with three projection lists. But each entry in the projections array contains six output expressions: one for each of the five items in the select clause, plus the index field (which is not even at the end of the list). So the record type is more than just the input fields plus an int64.
Full spark logical plan for this is:
Aggregate [l_partkey#10L], [l_partkey#10L, first(sum(spark_catalog.default.lineitem.l_tax)#1980, true) FILTER (WHERE (gid#1975 = 0)) AS sum(l_tax)#1966, sum(spark_catalog.default.lineitem.l_tax#1976) FILTER (WHERE (gid#1975 = 1)) AS sum(DISTINCT l_tax)#1967, cast((first(avg(unscaledvalue(spark_catalog.default.lineitem.l_discount))#1982, true) FILTER (WHERE (gid#1975 = 0)) / 1.0) as decimal(14,4)) AS avg(l_discount)#1968, cast((avg(unscaledvalue(spark_catalog.default.lineitem.l_discount)#1977L) FILTER (WHERE (gid#1975 = 2)) / 1.0) as decimal(14,4)) AS avg(DISTINCT l_discount)#1969]
+- Aggregate [l_partkey#10L, spark_catalog.default.lineitem.l_tax#1976, unscaledvalue(spark_catalog.default.lineitem.l_discount)#1977L, gid#1975], [l_partkey#10L, spark_catalog.default.lineitem.l_tax#1976, unscaledvalue(spark_catalog.default.lineitem.l_discount)#1977L, gid#1975, sum(spark_catalog.default.lineitem.l_tax#1978) AS sum(spark_catalog.default.lineitem.l_tax)#1980, avg(unscaledvalue(spark_catalog.default.lineitem.l_discount)#1979L) AS avg(unscaledvalue(spark_catalog.default.lineitem.l_discount))#1982]
+- Expand [[l_partkey#10L, null, null, 0, l_tax#16, UnscaledValue(l_discount#15)], [l_partkey#10L, l_tax#16, null, 1, null, null], [l_partkey#10L, null, UnscaledValue(l_discount#15), 2, null, null]], [l_partkey#10L, spark_catalog.default.lineitem.l_tax#1976, unscaledvalue(spark_catalog.default.lineitem.l_discount)#1977L, gid#1975, spark_catalog.default.lineitem.l_tax#1978, unscaledvalue(spark_catalog.default.lineitem.l_discount)#1979L]
+- Project [l_partkey#10L, l_discount#15, l_tax#16]
+- Relation spark_catalog.default.lineitem[l_orderkey#9L,l_partkey#10L,l_suppkey#11L,l_linenumber#12,l_quantity#13,l_extendedprice#14,l_discount#15,l_tax#16,l_returnflag#17,l_linestatus#18,l_shipdate#19,l_commitdate#20,l_receiptdate#21,l_shipinstruct#22,l_shipmode#23,l_comment#24] parquet
In the Expand relation, the projection expressions are stored in a two-dimensional array. The Spark matrix needs to be transposed in order to map the expressions into Substrait, and vice-versa. I hadn't noticed this earlier. Also, the remap field should not be used because the outputs are defined directly in the projections array.
Signed-off-by: Andrew Coleman <[email protected]>
Force-pushed from 67ff12c to 0de3a59
Yes, I was on the wrong track entirely with this. Turns out I needed to transpose the projections matrix that's defined in Spark before conversion to Substrait (and vice-versa).
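As a sketch of what that transposition looks like (illustrative only, not the PR's actual code): Spark's Expand stores one row per duplicate set with one column per output field, while the Substrait side wants one switching field per output column, each holding that column's expression from every duplicate set. For the test query above, the 3 x 6 matrix in the plan would become 6 fields of 3 duplicates each. The class and method names below are hypothetical.

  import java.util.ArrayList;
  import java.util.List;

  class ProjectionsTranspose {
    // Transpose a row-major matrix: rows = Spark duplicate sets,
    // columns = output fields. The result has one list per output field,
    // containing that field's expression from each duplicate set.
    static <T> List<List<T>> transpose(List<List<T>> rows) {
      List<List<T>> columns = new ArrayList<>();
      if (rows.isEmpty()) {
        return columns;
      }
      int width = rows.get(0).size(); // every projection list has the same length
      for (int c = 0; c < width; c++) {
        List<T> column = new ArrayList<>();
        for (List<T> row : rows) {
          column.add(row.get(c));
        }
        columns.add(column);
      }
      return columns;
    }
  }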
I think we need to tweak the spec as well, since the protobuf and the written spec don't fully agree on the output type: substrait-io/substrait#714
In the Expand relation, the record type was being calculated incorrectly, leading to errors when round-tripping to protobuf and back.