fix(spark): incorrect deriveRecordType() for Expand #316

Open · wants to merge 1 commit into main

Conversation

andrew-coleman
Contributor

In the Expand relation, the record type was being calculated incorrectly, leading to errors when round-tripping to protobuf and back.

@andrew-coleman
Contributor Author

Incorporates the test code changes in draft PR #314

public Type getType() {
  return getDuplicates().get(0).getType();
}

public Stream<Type> getTypes() {
  return getDuplicates().stream().map(Expression::getType);
}
Contributor

maybe I'm misunderstanding something here - but are you sure this is right? The Substrait spec comments say

All duplicates must return the same type class but may differ in nullability.

so it doesn't seem right to take a list of the duplicates' types.
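
For illustration, a minimal sketch of that reading: keep a single getType() per switching field, take the first duplicate's type class, and widen nullability if any duplicate is nullable. This is not the PR's code, and asNullable is a hypothetical helper standing in for "same type class, but nullable":

    // Sketch only: one Type per switching field, per the spec rule that duplicates
    // share a type class but may differ in nullability.
    public Type getType() {
      Type first = getDuplicates().get(0).getType();
      boolean anyNullable = getDuplicates().stream().anyMatch(e -> e.getType().nullable());
      // asNullable(...) is a hypothetical helper returning the same type class with nullability set.
      return anyNullable ? asNullable(first) : first;
    }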

var fields =
    getFields().isEmpty()
        ? initial.fields().stream()
        : Stream.concat(initial.fields().stream(), getFields().get(0).getTypes());
Contributor

this also doesn't seem right (but it's possible I'm not reading this right!) - the spec says

In addition to a field being emitted per input field an extra int64 field is emitted which
contains a zero-indexed ordinal corresponding to the duplicate definition.

so I'd expect the record type to be something like initial.fields().map([for each field handle nullability based on duplicates]) + Type.Int64
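
Roughly, as a sketch of that expectation (illustrative only; the struct/stream helpers and field accessors are assumed rather than taken from the actual implementation):

    // Sketch only: record type = one field per input field, with nullability merged
    // across the duplicates, plus a trailing non-nullable i64 ordinal column.
    @Override
    protected Type.Struct deriveRecordType() {
      return TypeCreator.REQUIRED.struct(
          Stream.concat(
              getFields().stream().map(ExpandField::getType), // merged type per output field
              Stream.of(TypeCreator.REQUIRED.I64)));          // zero-indexed ordinal
    }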

Contributor Author

The record type is defined in the projections (fields) matrix, which includes the int64 index field.

Contributor

do you mean there's an explicit SwitchingField with duplicates [Lit(1), Lit(2), ..], coming from Spark?

If I understood that right, I feel like it's not the intention of the Substrait spec that the index field would be included there, but rather that it should be handled separately. (Maybe a bit unfortunate for the Spark case, as that means removing that field when converting to Substrait and adding it back when converting to Spark, but if that's what the spec means then that's what we should do to be compatible :)

Contributor Author

In addition to a field being emitted per input field an extra int64 field is emitted which
contains a zero-indexed ordinal corresponding to the duplicate definition.

I think there's a broader problem with this description in the context of Spark. In the case of this test query:

select l_partkey, sum(l_tax), sum(distinct l_tax),
    avg(l_discount), avg(distinct l_discount) from lineitem group by l_partkey

The logical tree gets rewritten by the optimizer to have an Expand relation whose projections array has three entries. But each entry contains six output expressions (one for each of the five items in the select clause, plus the index field, which is not even at the end of the list). So the record type is more than just the input fields plus an int64.

Full spark logical plan for this is:

Aggregate [l_partkey#10L], [l_partkey#10L, first(sum(spark_catalog.default.lineitem.l_tax)#1980, true) FILTER (WHERE (gid#1975 = 0)) AS sum(l_tax)#1966, sum(spark_catalog.default.lineitem.l_tax#1976) FILTER (WHERE (gid#1975 = 1)) AS sum(DISTINCT l_tax)#1967, cast((first(avg(unscaledvalue(spark_catalog.default.lineitem.l_discount))#1982, true) FILTER (WHERE (gid#1975 = 0)) / 1.0) as decimal(14,4)) AS avg(l_discount)#1968, cast((avg(unscaledvalue(spark_catalog.default.lineitem.l_discount)#1977L) FILTER (WHERE (gid#1975 = 2)) / 1.0) as decimal(14,4)) AS avg(DISTINCT l_discount)#1969]
+- Aggregate [l_partkey#10L, spark_catalog.default.lineitem.l_tax#1976, unscaledvalue(spark_catalog.default.lineitem.l_discount)#1977L, gid#1975], [l_partkey#10L, spark_catalog.default.lineitem.l_tax#1976, unscaledvalue(spark_catalog.default.lineitem.l_discount)#1977L, gid#1975, sum(spark_catalog.default.lineitem.l_tax#1978) AS sum(spark_catalog.default.lineitem.l_tax)#1980, avg(unscaledvalue(spark_catalog.default.lineitem.l_discount)#1979L) AS avg(unscaledvalue(spark_catalog.default.lineitem.l_discount))#1982]
   +- Expand [[l_partkey#10L, null, null, 0, l_tax#16, UnscaledValue(l_discount#15)], [l_partkey#10L, l_tax#16, null, 1, null, null], [l_partkey#10L, null, UnscaledValue(l_discount#15), 2, null, null]], [l_partkey#10L, spark_catalog.default.lineitem.l_tax#1976, unscaledvalue(spark_catalog.default.lineitem.l_discount)#1977L, gid#1975, spark_catalog.default.lineitem.l_tax#1978, unscaledvalue(spark_catalog.default.lineitem.l_discount)#1979L]
      +- Project [l_partkey#10L, l_discount#15, l_tax#16]
         +- Relation spark_catalog.default.lineitem[l_orderkey#9L,l_partkey#10L,l_suppkey#11L,l_linenumber#12,l_quantity#13,l_extendedprice#14,l_discount#15,l_tax#16,l_returnflag#17,l_linestatus#18,l_shipdate#19,l_commitdate#20,l_receiptdate#21,l_shipinstruct#22,l_shipmode#23,l_comment#24] parquet

In the Expand relation, the projection expressions are stored in a
two-dimensional array. The Spark matrix needs to be transposed
in order to map the expressions into Substrait, and vice versa. I hadn't noticed this earlier.

Also, the remap field should not be used, because the outputs
are defined directly in the projections array.

Signed-off-by: Andrew Coleman <[email protected]>
@andrew-coleman
Contributor Author

Yes, I was on the wrong track entirely with this. It turns out I needed to transpose the projections matrix that's defined in Spark before conversion to Substrait (and vice versa).
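
For intuition, the reshaping is a plain matrix transpose: Spark stores one expression list per duplicate definition (row-wise), while Substrait's Expand holds one switching field per output column (column-wise). A standalone sketch, not the converter code itself:

    import java.util.ArrayList;
    import java.util.List;

    final class ProjectionsTranspose {
      // Turns Spark's row-wise projections (one list per duplicate definition) into a
      // column-wise layout (one list per output field). Transposing twice restores the
      // original matrix, so the same helper covers both directions.
      static <T> List<List<T>> transpose(List<List<T>> rows) {
        List<List<T>> columns = new ArrayList<>();
        for (int c = 0; c < rows.get(0).size(); c++) {
          List<T> column = new ArrayList<>();
          for (List<T> row : rows) {
            column.add(row.get(c));
          }
          columns.add(column);
        }
        return columns;
      }
    }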

@vbarua
Member

vbarua commented Nov 6, 2024

I think we need to tweak the spec as well, as the protobuf and the written spec don't fully agree on the output type: substrait-io/substrait#714
