Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support for PythonUDAF expression #937

Draft
wants to merge 9 commits into
base: dev
Choose a base branch
from

Conversation

cindyyuanjiang
Copy link
Collaborator

@cindyyuanjiang cindyyuanjiang commented Apr 13, 2024

Fixes #936

Changes

Added support for PythonUDAF expression.

Testing
Tested using spark_rapids commands with event log generated locally using pyspark shell. Confirmed behavior is same as PythonUDF support in tools.

@cindyyuanjiang cindyyuanjiang self-assigned this Apr 13, 2024
@cindyyuanjiang cindyyuanjiang added bug Something isn't working core_tools Scope the core module (scala) labels Apr 13, 2024
Copy link
Collaborator

@amahussein amahussein left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @cindyyuanjiang
I don't believe the PR covers the support for PythonUDAF correctly.

There is a bug in handling PythonUDF. So, it is not a good model to follow. Instead, the PR should fix both of them.
If you have eventlogs that has the PythonUDF/PythonUDAF then this is a good start to look more closely at how the SQLPlanParser handles/detects it.
I had intuition that "PythonUDF/PythonUDAF" won't actually appear in the eventlogs, do they?

@@ -357,7 +357,7 @@ object RDDCheckHelper {

object ExecHelper {
private val UDFRegExLookup = Set(
".*UDF.*".r
".*UDF.*".r, ".*UDAF.*".r
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is correct to add the *UDAF* to account for other UDAFs such as Hive and Scala.
However, the problem that the Q tool uses UDFRegExLookup to decide whether something is uDF or not.
That being said, any node description containing *UDAF* is going to be flagged as not-supported because it is a UDF.
Note that the same problem exists for PythonUDF which is supposed to be supported.

@cindyyuanjiang
Copy link
Collaborator Author

cindyyuanjiang commented Apr 15, 2024

There is a bug in handling PythonUDF. So, it is not a good model to follow. Instead, the PR should fix both of them. If you have eventlogs that has the PythonUDF/PythonUDAF then this is a good start to look more closely at how the SQLPlanParser handles/detects it. I had intuition that "PythonUDF/PythonUDAF" won't actually appear in the eventlogs, do they?

Thanks @amahussein! PythonUDF does show up in eventlogs, for example:
"BatchEvalPython [<lambda>(name#1)#6, to_upper(name#1)#8, add_one(age#2L)#9], [pythonUDF0#24, pythonUDF1#25, pythonUDF2#26]"

PythonUDF is not labeled as unsupported in the output, example of rapids_4_spark_qualification_output_unsupportedOperators.csv is as follows:

App ID,SQL ID,Stage ID,ExecId,Unsupported Type,Unsupported Operator,Details,Stage Duration,App Duration,Action
"local-1713205378082",0,3,"0","Exec","BatchEvalPython","Contains UDF",304,138960,"IgnorePerf"
"local-1713205378082",0,3,"1","ReadRDD","Scan ExistingRDD","Is Dataset or RDD",304,138960,"IgnorePerf"
"local-1713205378082",0,0,"2","Exec","BatchEvalPython","Contains UDF",848,138960,"IgnorePerf"
"local-1713205378082",0,1,"3","Exec","BatchEvalPython","Contains UDF",107,138960,"IgnorePerf"
"local-1713205378082",0,2,"4","Exec","BatchEvalPython","Contains UDF",201,138960,"IgnorePerf"
"local-1713205378082",0,-1,"5","Exec","CollectLimit","This is disabled by default because Collect Limit replacement can be slower on the GPU; if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU",0,138960,"IgnorePerf"
"local-1713205378082",0,-1,"6","Exec","Project","Contains UDF",0,138960,"IgnorePerf"
"local-1713205378082",0,-1,"6","Expr","slen","Is UDF",0,138960,"IgnorePerf"
"local-1713205378082",0,-1,"6","Expr","to_upper","Is UDF",0,138960,"IgnorePerf"
"local-1713205378082",0,-1,"6","Expr","add_one","Is UDF",0,138960,"IgnorePerf"

@amahussein
Copy link
Collaborator

PythonUDF is not labeled as unsupported in the output

It is not explicitl mentioned but you can see that BatchEvalPython is marked as unsupported because it thought that PythobnUDF is a UDF expr.

"BatchEvalPython","Contains UDF"

That's the part I was mentioning here that the tools was considering PythonUDF as non-supported.

  • For BatchEvalPython, we should decide if this should be marked as non-/supported. For example, if all PythonUDF expressions appear within BatchEvalPython then we should mark it as supported if it contains PythonUDF.

  • Q: What did you name your method? If you did not name it, then maybe what we see here is just the name assigned by default from Spark registrar. Are PythonUDF0, PythonUDF1 and PythonUDF2 representing the PythonUDF arguments?

Signed-off-by: cindyyuanjiang <[email protected]>
@cindyyuanjiang
Copy link
Collaborator Author

It is not explicitl mentioned but you can see that BatchEvalPython is marked as unsupported because it thought that PythobnUDF is a UDF expr.

Thanks @amahussein!
Made some changes to the PR. To mark pythonUDF as supported:

  • Distinguish the UDF regex expressions for Scala and Python
  • For python UDF Execs, mark them as supported and set its unsupported expressions to empty

Now we have python UDF as supported, should we modify skipUDFCheckExecs?
This is an initial approach, opening for discussion. Thanks.

Signed-off-by: cindyyuanjiang <[email protected]>
@@ -313,7 +315,7 @@ object ExecInfo {
duration,
nodeId,
opType,
isSupported,
finalSupported,
children,
stages,
shouldRemove,
Copy link
Collaborator

@parthosa parthosa Apr 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to make changes to the usage of variable containsUDF when passing to createExecNoNode()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. Now containsUDF denotes that the exec contains Scala UDFs after we changed the matching regex, so for python UDFs, this value would remain false as intended.

Signed-off-by: cindyyuanjiang <[email protected]>
parthosa
parthosa previously approved these changes Apr 16, 2024
Copy link
Collaborator

@parthosa parthosa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @cindyyuanjiang for this change.

Copy link
Collaborator

@amahussein amahussein left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @cindyyuanjiang

  • I cannot see how the changes will treat pythonUDAF as supported. All the changes are related to pythonUDF
  • I don't see how the Q tool shows the batchEvalPython as supported. That means that the changes done in ExecHelper did change the behavior of the flow. Note that pythonUDF/PythonUDAF are expressions and we should not expect to see the Q tool changes the decision about an exec (batchEvalPython) without main changes to the Exec parser. If you confirm that batchEvalPython is always linked to pythonUDF/PythonUDAF, then we should simply add code to catch that exec and mark it as supported without need to parse the content.

CC: @nartal1 any thoughts?

@@ -331,7 +331,7 @@ abstract class AppBase(
}
}

private val UDFRegex = ".*UDF.*"
private val UDFRegex = ".*(?<!python)UDF.*"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this won't match pythonUDAF

".*(?<!python)UDF.*".r
)

private val pythonUDFRegExLookUp = Set(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here as my previous comment. There are three regular expressions that look overlapping.

)

private val pythonUDFRegExLookUp = Set(
".*pythonUDF.*".r
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not take pythonUDAF into considerations.

val projectExec = execInfo.filter(_.exec.contains("Project"))
assertSizeAndSupported(2, projectExec)
val batchEvalPythonExec = execInfo.filter(_.exec.contains("BatchEvalPython"))
assertSizeAndSupported(1, batchEvalPythonExec)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is something I missing here. I cannot find anywhere in the code that we handle BatchEvalPython exec.
How come that the result shows it as supported?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BatchEvalPython should not be supported. I will fix this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we confirmed that for a fact?
I mean did we verify that it is most likely that BatchEvalPython is associated with PythonUDF and PythonUDAF? if that's the case then all we need to do is considering that the entire exec is supported with being conservative on the speedup factor.
As far as I can see in the plugin code, the RAPIDS has very limited capability accelerating those expressions anyway. So, the speedup should be near 1.


// if the expression is RDD because of the node name, then we do not want to add the
// unsupportedExpressions because it becomes bogus.
val finalUnsupportedExpr = if (rddCheckRes.nodeDescRDD) {
val finalUnsupportedExpr = if (rddCheckRes.nodeDescRDD || containsPythonUDF) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is correct.
The code comment just above that change is explaining that we avoid parsing the node content when it is rddCheckRes.nodeDescRDD . But the new code tries to check if it is pythonUDF anyway which could lead to bogus results.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the confusion. I will address this in the next iteration.

@nartal1
Copy link
Collaborator

nartal1 commented Apr 16, 2024

I don't see how the Q tool shows the batchEvalPython as supported. That means that the changes done in ExecHelper did change the behavior of the flow. Note that pythonUDF/PythonUDAF are expressions and we should not expect to see the Q tool changes the decision about an exec (batchEvalPython) without main changes to the Exec parser. If you confirm that batchEvalPython is always linked to pythonUDF/PythonUDAF, then we should simply add code to catch that exec and mark it as supported without need to parse the content.

I am confused how batchEvalPython is marked as supported and is passing the test.
@cindyyuanjiang - Could you please confirm if batchEvalPython is linked with either pythonUDF/PythonUDAF as per @amahussein suggestion? I think adding a new parser for batchEvalPython makes sense in that case.

Copy link
Collaborator

@amahussein amahussein left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's be more conservative with the speedup value we assign for PythonUDF and PythonUDAF.
I would suggest a speedup-factor closer to 1.0 actually.

@cindyyuanjiang
Copy link
Collaborator Author

cindyyuanjiang commented Apr 17, 2024

Thanks @amahussein @nartal1 for the feedback!
I made some updates to the logic for handling pythonUDFs and updated the speedup factors.

pythonUDAF
I made multiple attempts to generate event logs for pythonUDAFs but this operator name never show up. The ones I generated included AggregateInPandas which is already handled by existing implementation, and will be marked as supported. We can have further discussion on how to improve this.

pythonUDF
I separated the regex matching patterns for Scala and python so that we can use function isUDF for Scala UDF and isPythonUDF for python UDF.
For Exec that contains pythonUDF in its description, this PR marks it as supported if the Exec is supported in the CSV even if it might have unsupported expressions which can be UDFs. This is an estimation and optimistic. This approach requires us to add this logic for individual exec parsers, this PR currently only handles ProjectExec, open this for discussion.

Note:
BatchEvalPython is a physical plan node specifically designed to handle the batch execution of Python UDFs. This is not supported by the plugin (! <BatchEvalPythonExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.python.BatchEvalPythonExec).

Followup: had an offline discussion. I will add a parser for BatchEvalPython.

@parthosa
Copy link
Collaborator

parthosa commented Apr 17, 2024

Yes, It seems that all PythonUDAF expressions gets converted to AggregateInPandasExec

https://github.com/apache/spark/blob/4957a40d6e6bf68226c8047687e8f30c93adb8ce/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L640-L646

@cindyyuanjiang cindyyuanjiang marked this pull request as draft April 17, 2024 22:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working core_tools Scope the core module (scala)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add Support for PythonUDAF expression
4 participants