feat: Add support for complex types in native shuffle #1655
@@ -2889,6 +2855,31 @@ object QueryPlanSerde extends Logging with CometExprShim {
    }
  }

  def supportedShuffleDataType(dt: DataType): Boolean = dt match {
    case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
        _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: TimestampNTZType |
This code was moved and is not new. I added `TimestampNTZType`.
Do we have a test with TimestampNTZType for shuffle?
Yes, I found that TimestampNTZType was not supported because the test was initially failing. The fuzz test generates a file with all supported types (but maps are currently explicitly disabled in this test suite).
        _: DateType | _: BooleanType =>
      true
    case _ =>
      // Native shuffle doesn't support struct/array yet
Yes, it does! This method is removed, and we now have a single `supportedShuffleDataType` method that is used for both native and columnar shuffle type checks.
Thanks for that, I was so confused about having this supported check in at least 3 places
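For readers following this thread, a single shared type check that also accepts complex types could look roughly like the sketch below. This is not the code from this PR: the object name `ShuffleTypeSketch` is hypothetical, the recursion into `StructType` and `ArrayType` is an assumption about the approach, and `MapType` is rejected here only because the discussion notes that maps are not yet covered by the fuzz test. It also assumes a Spark version where `TimestampNTZType` is public.

```scala
import org.apache.spark.sql.types._

// Hypothetical sketch, not the implementation from this PR.
object ShuffleTypeSketch {

  /** Returns true if `dt`, including any nested element or field type, can be shuffled. */
  def supportedShuffleDataType(dt: DataType): Boolean = dt match {
    // Primitive types listed in the diff under review.
    case _: BooleanType | _: ByteType | _: ShortType | _: IntegerType | _: LongType |
        _: FloatType | _: DoubleType | _: StringType | _: BinaryType | _: TimestampType |
        _: TimestampNTZType | _: DecimalType | _: DateType =>
      true
    // Assumption: a struct is supported when every field type is supported.
    case s: StructType =>
      s.fields.forall(f => supportedShuffleDataType(f.dataType))
    // Assumption: an array is supported when its element type is supported.
    case a: ArrayType =>
      supportedShuffleDataType(a.elementType)
    // Everything else, including MapType, is rejected in this sketch.
    case _ =>
      false
  }
}
```

Because the check is recursive, one method can serve both native and columnar shuffle call sites, and a nested unsupported type (for example a map inside an array) is rejected without any extra logic at the caller.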
  /**
   * Determine which data types are supported as hash-partition keys in a shuffle.
   */
  def supportedShufflePartitionDataType(dt: DataType): Boolean = dt match {
Suggested change:
-  def supportedShufflePartitionDataType(dt: DataType): Boolean = dt match {
+  def supportedShufflePartitionKeyDataType(dt: DataType): Boolean = dt match {
I applied this change myself since I had to update the caller sites as well.
@@ -2889,6 +2857,48 @@ object QueryPlanSerde extends Logging with CometExprShim {
    }
  }

  /**
   * Determine which data types are supported as hash-partition keys in a shuffle.
Suggested change:
-   * Determine which data types are supported as hash-partition keys in a shuffle.
+   * Determine which data types are supported as hash-partition keys in a shuffle.
+   Hash Partition Key determines how data should be collocated for operations like `groupByKey`, `reduceByKey` or `join`
Updated
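To make the point about hash-partition keys concrete, here is a minimal, Spark-free sketch of how hashing a key decides which partition a row lands in, so that rows with equal keys end up together. The names `HashPartitionSketch`, `partitionFor`, and `shuffle` are hypothetical, and the use of `hashCode` is a stand-in: Spark SQL hashes the partition expressions with Murmur3 rather than `hashCode`.

```scala
// Illustrative sketch only; all names here are hypothetical.
object HashPartitionSketch {

  /** Map a key to a partition index in [0, numPartitions). */
  def partitionFor(key: Any, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    val raw = key.hashCode % numPartitions
    // The remainder can be negative for negative hash codes, so shift it into range.
    if (raw < 0) raw + numPartitions else raw
  }

  /** Group rows by the partition their key hashes to. */
  def shuffle[K, V](rows: Seq[(K, V)], numPartitions: Int): Map[Int, Seq[(K, V)]] =
    rows.groupBy { case (k, _) => partitionFor(k, numPartitions) }
}
```

Since the partition index depends only on the key, every row with the same key is sent to the same partition, which is what operations such as `groupByKey`, `reduceByKey`, or `join` rely on. It is also why the set of types allowed as partition keys can be narrower than the set of types that can merely be carried through a shuffle.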
I'm seeing a number of failures like this:
|
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1655      +/-   ##
============================================
+ Coverage     56.12%   58.80%   +2.68%
- Complexity      976     1082     +106
============================================
  Files           119      125       +6
  Lines         11743    12592     +849
  Branches       2251     2362     +111
============================================
+ Hits           6591     7405     +814
- Misses         4012     4015       +3
- Partials       1140     1172      +32
View full report in Codecov by Sentry.
@parthchandra @mbutrovich This PR is now ready for review.
Shout out to @Kontinuation! #1511 removed a lot of the custom logic in the shuffle writer that would have needed to be extended to support complex types. Instead, we now rely on Arrow functions that already support complex types.
lgtm thanks @andygrove
@@ -161,6 +162,18 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
    }
  }

  test("shuffle") {
    val df = spark.read.parquet(filename)
Does the data have complex type?
Yes, the data has arrays and structs but not maps yet
   * Determine which data types are supported in a shuffle.
   */
  def supportedShuffleDataType(dt: DataType): Boolean = dt match {
    case _: BooleanType => true
nit: Was `BooleanType` moved here alone because of the code style checks?
No. At one point I was seeing errors related to boolean and had made functional changes here that I later reverted.
I reverted the style change
-        _: DoubleType | _: TimestampType | _: TimestampType | _: DecimalType | _: DateType =>
+        _: DoubleType | _: TimestampType | _: TimestampNTZType | _: DecimalType |
This is unrelated to the goal of the PR, but I noticed we had `TimestampType` twice and no `TimestampNTZType`.
Which issue does this PR close?
Closes #.
Rationale for this change
What changes are included in this PR?
- `supportedDataType` method for columnar and native shuffle
- `TimestampNTZType` as a supported type
How are these changes tested?