Skip to content

feat: Add support for complex types in native shuffle #1655

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Apr 19, 2025

Conversation

andygrove
Copy link
Member

@andygrove andygrove commented Apr 17, 2025

Which issue does this PR close?

Closes #.

Rationale for this change

What changes are included in this PR?

  • Use common supportedDataType method for columnar and native shuffle
  • Add TimestampNTZType as a supported type
  • Add fuzz test for shuffle that asserts that shuffle is native when experimental native scans are enabled

How are these changes tested?

@andygrove andygrove force-pushed the shuffle-type-checks branch from 0365fb9 to eec034a Compare April 17, 2025 17:08
@@ -2889,6 +2855,31 @@ object QueryPlanSerde extends Logging with CometExprShim {
}
}

def supportedShuffleDataType(dt: DataType): Boolean = dt match {
case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
_: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: TimestampNTZType |
Copy link
Member Author

@andygrove andygrove Apr 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code was moved and is not new. I added TimestampNTZType.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a test with TimestampNTZType for shuffle?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I found that TimestampNTZType was not supported because the test was initially failing. The fuzz test generates a file with all supported types (but maps are currently explicitly disabled in this test suite).

_: DateType | _: BooleanType =>
true
case _ =>
// Native shuffle doesn't support struct/array yet
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it does! This method is removed and we now have a single supportedShuffleDataType method that is used for both native and columnar shuffle type checks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for that, I was so confused about having this supported check in at least 3 places

/**
* Determine which data types are supported as hash-partition keys in a shuffle.
*/
def supportedShufflePartitionDataType(dt: DataType): Boolean = dt match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def supportedShufflePartitionDataType(dt: DataType): Boolean = dt match {
def supportedShufflePartitionKeyDataType(dt: DataType): Boolean = dt match {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I applied this change myself since I had to update the caller sites as well.

@@ -2889,6 +2857,48 @@ object QueryPlanSerde extends Logging with CometExprShim {
}
}

/**
* Determine which data types are supported as hash-partition keys in a shuffle.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Determine which data types are supported as hash-partition keys in a shuffle.
* Determine which data types are supported as hash-partition keys in a shuffle.
Hash Partition Key determines how data should be collocated for operations like `groupByKey`, `reduceByKey` or `join`

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

@andygrove andygrove marked this pull request as ready for review April 17, 2025 19:02
@andygrove
Copy link
Member Author

I'm seeing a number of failures like this:

2025-04-17T19:12:54.7914570Z - columnar shuffle on struct including nulls *** FAILED *** (352 milliseconds)
2025-04-17T19:12:54.7916470Z   List() had length 0 instead of expected length 1 Sort [_1#4948 ASC NULLS FIRST], false, 0
2025-04-17T19:12:54.7917940Z   +- Exchange hashpartitioning(_1#4948, _2#4949, 10), REPARTITION_BY_NUM, [plan_id=13332]
2025-04-17T19:12:54.7919190Z      +- Filter (isnotnull(_1#4948) AND (_1#4948 > 1))
2025-04-17T19:12:54.7922160Z         +- FileScan parquet [_1#4948,_2#4949] Batched: true, DataFilters: [isnotnull(_1#4948), (_1#4948 > 1)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/runner/work/datafusion-comet/datafusion-comet/spark/target..., PartitionFilters: [], PushedFilters: [IsNotNull(_1), GreaterThan(_1,1)], ReadSchema: struct<_1:int,_2:struct<_1:int,_2:string>>

@codecov-commenter
Copy link

codecov-commenter commented Apr 18, 2025

Codecov Report

Attention: Patch coverage is 88.46154% with 6 lines in your changes missing coverage. Please review.

Project coverage is 58.80%. Comparing base (f09f8af) to head (b5b4d27).
Report is 149 commits behind head on main.

Files with missing lines Patch % Lines
.../scala/org/apache/comet/serde/QueryPlanSerde.scala 90.69% 0 Missing and 4 partials ⚠️
...org/apache/comet/CometSparkSessionExtensions.scala 77.77% 0 Missing and 2 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1655      +/-   ##
============================================
+ Coverage     56.12%   58.80%   +2.68%     
- Complexity      976     1082     +106     
============================================
  Files           119      125       +6     
  Lines         11743    12592     +849     
  Branches       2251     2362     +111     
============================================
+ Hits           6591     7405     +814     
- Misses         4012     4015       +3     
- Partials       1140     1172      +32     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@andygrove
Copy link
Member Author

@parthchandra @mbutrovich Ths PR is now ready for review

@mbutrovich
Copy link
Contributor

Shout out to @Kontinuation! #1511 removed a lot a of the custom logic in the shuffle writer that would have needed to be extended to support complex types. Instead we now rely on Arrow functions that already support complex types.

Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm thanks @andygrove

@@ -161,6 +162,18 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("shuffle") {
val df = spark.read.parquet(filename)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the data have complex type?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the data has arrays and structs but not maps yet

@@ -2889,6 +2855,31 @@ object QueryPlanSerde extends Logging with CometExprShim {
}
}

def supportedShuffleDataType(dt: DataType): Boolean = dt match {
case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
_: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: TimestampNTZType |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a test with TimestampNTZType for shuffle?

* Determine which data types are supported in a shuffle.
*/
def supportedShuffleDataType(dt: DataType): Boolean = dt match {
case _: BooleanType => true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit : BooleanType moved here alone because of the code style checks?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. At one point I was seeing errors related to boolean and had made functional changes here that I later reverted.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reverted the style change

Comment on lines -2923 to +2939
_: DoubleType | _: TimestampType | _: TimestampType | _: DecimalType | _: DateType =>
_: DoubleType | _: TimestampType | _: TimestampNTZType | _: DecimalType |
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unrelated to the goal of the PR but I noticed we had TimestampType twice and no TimestampNTZType

@andygrove andygrove merged commit c04784a into apache:main Apr 19, 2025
78 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants