I created a unit test and array_join is correctly marked as unsupported there. However, when I investigated the incident on a client's event log, the Spark UI shows array_join in the "details" section of the physical plan, while the SparkGraphNode does not carry the full description because it is too long (it gets truncated).
The relevant source is org.apache.spark.sql.execution.ui.ExecutionPage.scala: the physicalPlanDescription is built using org.apache.spark.sql.execution.QueryExecution.explainString.
Note that two Spark configurations control the truncation:

- SQLConf.MAX_TO_STRING_FIELDS (spark.sql.debug.maxToStringFields): maximum number of fields included when converting a plan node to a string.
- SQLConf.MAX_PLAN_STRING_LENGTH (spark.sql.maxPlanStringLength): maximum number of characters to output for a plan string.
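For reference, both limits can be raised at runtime through their SQL config keys; this is only a configuration sketch, and the values below are purely illustrative:

```scala
// Illustrative values only: raise the truncation limits so the event log /
// Spark UI retains more of the physical plan description.
// SQLConf.MAX_TO_STRING_FIELDS  -> "spark.sql.debug.maxToStringFields"
// SQLConf.MAX_PLAN_STRING_LENGTH -> "spark.sql.maxPlanStringLength"
spark.conf.set("spark.sql.debug.maxToStringFields", "1000")
spark.conf.set("spark.sql.maxPlanStringLength", (10 * 1024 * 1024).toString)
```

Raising these on the customer side only works around the symptom; the parser change below is still needed for event logs that were generated with the defaults.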
The fix is not going to be easy or quick. The implementation is to revisit the SqlPlanParser so that, each time a node is constructed, we:

1. call explainString() on the physical plan description;
2. build the graphNode from SparkPlanInfo;
3. use the node id to locate the description of each node in the output of explainString; and
4. modify the SqlPlanParser code to parse the output of the previous step instead of the graphNode description.
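The lookup in step 3 could be sketched roughly as below. This is a hypothetical `ExplainStringIndex` helper, not actual tools code; it only assumes the numbered-section layout of Spark's formatted explain output, where each operator gets a section such as `(2) Project [codegen id : 1]` followed by its full attribute list:

```scala
// Sketch only: index the formatted explainString output by node id, so the
// full (untruncated) description of each operator can be recovered even when
// the graphNode description was cut off.
object ExplainStringIndex {
  // Section headers look like "(2) Project [codegen id : 1]".
  private val SectionHeader = """\((\d+)\)\s.*""".r

  /** Map each node id to its complete section text from a formatted plan. */
  def sectionsById(formattedPlan: String): Map[Int, String] =
    formattedPlan.split("\n\n").flatMap { section =>
      section.split("\n").headOption.collect {
        case SectionHeader(id) => id.toInt -> section.trim
      }
    }.toMap // blocks without a "(n) ..." header (the tree preamble) are skipped
}
```

The parser could then feed `sectionsById(explainOutput)(graphNode.id)` to the existing expression-parsing logic in place of the truncated node description.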
```scala
test("array_join is not supported") {
  TrampolineUtil.withTempDir { outputLoc =>
    TrampolineUtil.withTempDir { eventLogDir =>
      val (eventLog, _) = ToolTestUtils.generateEventLog(eventLogDir, "arrayjoin") { spark =>
        import spark.implicits._
        val df = Seq(
          (List("a", "b", "c"), List("b", "c")),
          (List("a", "a"), List("b", "c")),
          (List("aa"), List("b", "c"))
        ).toDF("x", "y")
        df.write.parquet(s"$outputLoc/test_arrayjoin")
        val df2 = spark.read.parquet(s"$outputLoc/test_arrayjoin")
        val df3 = df2.withColumn("arr_join", array_join(col("x"), "."))
        df3.explain(true)
        df3
      }
      val app = createAppFromEventlog(eventLog)
      assert(app.sqlPlans.size == 2)
      val pluginTypeChecker = new PluginTypeChecker()
      val parsedPlans = app.sqlPlans.map { case (sqlID, plan) =>
        SQLPlanParser.parseSQLPlan(app.appId, plan, sqlID, "", pluginTypeChecker, app)
      }
      val allExecInfo = getAllExecsFromPlan(parsedPlans.toSeq)
      val projectExecs = allExecInfo.filter(_.exec == "Project")
      assertSizeAndNotSupported(1, projectExecs)
    }
  }
}
```
Describe the bug
The Qualification tool did not detect the UDF or array_join as unsupported operators.

CC: @viadea