diff --git a/core/pom.xml b/core/pom.xml
index e5f56a9f1..ab1327cce 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -160,6 +160,21 @@
         <hadoop.version>3.3.6</hadoop.version>
       </properties>
     </profile>
+    <profile>
+      <id>bd321</id>
+      <activation>
+        <property>
+          <name>buildver</name>
+          <value>321</value>
+        </property>
+      </activation>
+      <properties>
+        <buildver>321</buildver>
+        <spark.version>${bdspark321.version}</spark.version>
+        <delta.core.version>${delta20x.version}</delta.core.version>
+        <hadoop.version>3.3.6</hadoop.version>
+      </properties>
+    </profile>
     <profile>
       <id>release322</id>
@@ -443,6 +458,7 @@
     <spark314.version>3.1.4-SNAPSHOT</spark314.version>
     <spark320.version>3.2.0</spark320.version>
     <spark321.version>3.2.1</spark321.version>
+    <bdspark321.version>3.2.1-bd1-SNAPSHOT</bdspark321.version>
     <spark322.version>3.2.2</spark322.version>
     <spark323.version>3.2.3</spark323.version>
     <spark324.version>3.2.4</spark324.version>
@@ -502,6 +518,7 @@
     <project.build.sourceEncoding>${platform-encoding}</project.build.sourceEncoding>
     <project.reporting.outputEncoding>${platform-encoding}</project.reporting.outputEncoding>
     <maven.compiler.encoding>${platform-encoding}</maven.compiler.encoding>
+    <hive.version>1.2.2-bd103</hive.version>
@@ -582,6 +599,40 @@
       <version>${spark.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-common</artifactId>
+      <version>${hive.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${hive.version}</version>
+      <scope>provided</scope>
+    </dependency>
diff --git a/core/src/main/resources/opScore-onprem-bytedance0325.csv b/core/src/main/resources/opScore-onprem-bytedance0325.csv
new file mode 100644
index 000000000..e8e8a7fb9
--- /dev/null
+++ b/core/src/main/resources/opScore-onprem-bytedance0325.csv
@@ -0,0 +1,283 @@
+CPUOperator,Score
+CoalesceExec,3.0
+CollectLimitExec,3.0
+ExpandExec,2.68
+FileSourceScanExec,0.5
+FilterExec,22.04
+GenerateExec,3.0
+GlobalLimitExec,3.0
+LocalLimitExec,3.0
+ProjectExec,1.5
+RangeExec,3.0
+SampleExec,3.0
+SortExec,6.64
+SubqueryBroadcastExec,3.0
+TakeOrderedAndProjectExec,3.0
+UnionExec,3.0
+CustomShuffleReaderExec,3.0
+HashAggregateExec,10.55
+ObjectHashAggregateExec,3.0
+SortAggregateExec,3.0
+InMemoryTableScanExec,3.0
+DataWritingCommandExec,3.0
+ExecutedCommandExec,3.0
+BatchScanExec,0.5
+BroadcastExchangeExec,30.0
+ShuffleExchangeExec,1.0
+BroadcastHashJoinExec,40.00
+BroadcastNestedLoopJoinExec,3.0
+CartesianProductExec,3.0
+ShuffledHashJoinExec,22.29
+SortMergeJoinExec,22.33
+AggregateInPandasExec,1.2
+ArrowEvalPythonExec,1.2
+FlatMapCoGroupsInPandasExec,3.0
+FlatMapGroupsInPandasExec,1.2
+MapInPandasExec,1.2
+WindowInPandasExec,1.2
+WindowExec,127.61
+HiveTableScanExec,0.5
+Abs,4
+Acos,4
+Acosh,4
+Add,4
+AggregateExpression,4
+Alias,4
+And,4
+ApproximatePercentile,4
+ArrayContains,4
+ArrayExcept,4
+ArrayExists,4
+ArrayIntersect,4
+ArrayMax,4
+ArrayMin,4
+ArrayRemove,4
+ArrayRepeat,4
+ArrayTransform,4
+ArrayUnion,4
+ArraysOverlap,4
+ArraysZip,4
+Asin,4
+Asinh,4
+AtLeastNNonNulls,4
+Atan,4
+Atanh,4
+AttributeReference,4
+Average,4
+BRound,4
+BitLength,4
+BitwiseAnd,4
+BitwiseNot,4
+BitwiseOr,4
+BitwiseXor,4
+CaseWhen,4
+Cbrt,4
+Ceil,4
+CheckOverflow,4
+Coalesce,4
+CollectList,4
+CollectSet,4
+Concat,4
+ConcatWs,4
+Contains,4
+Conv,4
+Cos,4
+Cosh,4
+Cot,4
+Count,4
+CreateArray,4
+CreateMap,4
+CreateNamedStruct,4
+CurrentRow$,4
+DateAdd,4
+DateAddInterval,4
+DateDiff,4
+DateFormatClass,4
+DateSub,4
+DayOfMonth,4
+DayOfWeek,4
+DayOfYear,4
+DenseRank,4
+Divide,4
+DynamicPruningExpression,4
+ElementAt,4
+EndsWith,4
+EqualNullSafe,4
+EqualTo,4
+Exp,4
+Explode,4
+Expm1,4
+First,4
+Flatten,4
+Floor,4
+FormatNumber,4
+FromUTCTimestamp,4
+FromUnixTime,4
+GetArrayItem,4
+GetArrayStructFields,4
+GetJsonObject,4
+GetMapValue,4
+GetStructField,4
+GetTimestamp,4
+GreaterThan,4
+GreaterThanOrEqual,4
+Greatest,4
+HiveGenericUDF,4
+HiveSimpleUDF,4
+Hour,4
+Hypot,4
+If,4
+In,4
+InSet,4
+InitCap,4
+InputFileBlockLength,4
+InputFileBlockStart,4
+InputFileName,4
+IntegralDivide,4
+IsNaN,4
+IsNotNull,4
+IsNull,4
+JsonToStructs,4
+JsonTuple,4
+KnownFloatingPointNormalized,4
+KnownNotNull,4
+Lag,4
+LambdaFunction,4
+Last,4
+LastDay,4
+Lead,4
+Least,4
+Length,4
+LessThan,4
+LessThanOrEqual,4
+Like,4
+Literal,4
+Log,4
+Log10,4
+Log1p,4
+Log2,4
+Logarithm,4
+Lower,4
+MakeDecimal,4
+MapConcat,4
+MapEntries,4
+MapFilter,4
+MapKeys,4
+MapValues,4
+Max,4
+Md5,4
+MicrosToTimestamp,4
+MillisToTimestamp,4
+Min,4
+Minute,4
+MonotonicallyIncreasingID,4
+Month,4
+Multiply,4
+Murmur3Hash,4
+NaNvl,4
+NamedLambdaVariable,4
+NormalizeNaNAndZero,4
+Not,4
+NthValue,4
+OctetLength,4
+Or,4
+ParseUrl,4
+Percentile,4
+PercentRank,4
+PivotFirst,4
+Pmod,4
+PosExplode,4
+Pow,4
+PreciseTimestampConversion,4
+PromotePrecision,4
+PythonUDF,4
+Quarter,4
+RLike,4
+RaiseError,4
+Rand,4
+Rank,4
+RegExpExtract,4
+RegExpExtractAll,4
+RegExpReplace,4
+Remainder,4
+ReplicateRows,4
+Reverse,4
+Rint,4
+Round,4
+RowNumber,4
+ScalaUDF,4
+ScalarSubquery,4
+Second,4
+SecondsToTimestamp,4
+Sequence,4
+ShiftLeft,4
+ShiftRight,4
+ShiftRightUnsigned,4
+Signum,4
+Sin,4
+Sinh,4
+Size,4
+SortArray,4
+SortOrder,4
+SparkPartitionID,4
+SpecifiedWindowFrame,100
+Sqrt,4
+Stack,4
+StartsWith,4
+StddevPop,4
+StddevSamp,4
+StringInstr,4
+StringLPad,4
+StringLocate,4
+StringRPad,4
+StringRepeat,4
+StringReplace,4
+StringSplit,4
+StringToMap,4
+StringTranslate,4
+StringTrim,4
+StringTrimLeft,4
+StringTrimRight,4
+StructsToJson,4
+Substring,4
+SubstringIndex,4
+Subtract,4
+Sum,4
+Tan,4
+Tanh,4
+TimeAdd,4
+ToDegrees,4
+ToRadians,4
+ToUnixTimestamp,4
+TransformKeys,4
+TransformValues,4
+UnaryMinus,4
+UnaryPositive,4
+UnboundedFollowing$,4
+UnboundedPreceding$,4
+UnixTimestamp,4
+UnscaledValue,4
+Upper,4
+VariancePop,4
+VarianceSamp,4
+WeekDay,4
+WindowExpression,100
+WindowSpecDefinition,100
+XxHash64,4
+Year,4
+WriteFilesExec,4
+Empty2Null,4
+Ascii,4
+ToUTCTimestamp,4
+KMeans-pyspark,8.86
+KMeans-scala,1
+PCA-pyspark,2.24
+PCA-scala,2.69
+LinearRegression-pyspark,2
+LinearRegression-scala,1
+RandomForestClassifier-pyspark,6.31
+RandomForestClassifier-scala,1
+RandomForestRegressor-pyspark,3.66
+RandomForestRegressor-scala,1
+XGBoost-pyspark,1
+XGBoost-scala,3.31
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala
new file mode 100644
index 000000000..cfb8298d5
--- /dev/null
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2021-2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.tool.udf
+
+import java.io.{BufferedReader, File, InputStreamReader}
+import java.util
+
+import org.apache.hadoop.hive.ql.exec.{Description, UDF}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.rapids.tool.util.UTF8Source
+
+@Description(name = "estimate_event_rapids_ml",
+  value = "_FUNC_(applicationId, event_path) ",
+  extended = "Example:\n > SELECT estimate_event_rapids_ml(app_1, hdfs://...)" +
+    " - Returns the rapids-tool ML score.")
+class EstimateEventRapids2MapMLUDF extends UDF with Logging {
+
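+  // Launches the given spark_rapids CLI command, echoing its stdout line by
+  // line so the child process cannot block on a full pipe, then waits for exit.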
+ private def runProc(applicationId: String, command: String): Unit = {
+ logInfo(s"Running rapids-tools for $applicationId: $command")
+ val proc = Runtime.getRuntime.exec(command)
+ val in = new BufferedReader(new InputStreamReader(proc.getInputStream))
+ Iterator.continually(in.readLine()).takeWhile(_ != null).foreach(println)
+ in.close()
+ proc.waitFor
+ }
+
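+  // Looks for a file named "prediction*" under mlOutputFolder and extracts the
+  // last token of its "Overall estimated speedup" line; returns "0.0" when no
+  // prediction file or matching line is found.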
+ private def readMlOutput(applicationId: String, mlOutputFolder: String): String = {
+    val dir = new File(mlOutputFolder)
+    val predictFiles =
+      Option(dir.listFiles).getOrElse(Array.empty[File]).find(_.getName.startsWith("prediction"))
+ var predictScore = "0.0"
+ if (predictFiles.nonEmpty) {
+      val filePath = s"$mlOutputFolder/${predictFiles.get.getName}"
+ val source = UTF8Source.fromFile(filePath)
+ try {
+ for (line <- source.getLines()) {
+ if (line.startsWith("Overall estimated speedup")) {
+ predictScore = line.split("\\s+").last
+ logInfo(s"predict score for $applicationId is $predictScore")
+ }
+ }
+ } finally {
+ source.close()
+ }
+ } else {
+      logError(s"Cannot find a prediction file in $mlOutputFolder")
+ }
+ predictScore
+ }
+
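+  // Runs the spark_rapids qualification step and then the prediction step for
+  // one application, returning the ML score under the key "rapids_ml_score".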
+ def evaluate(applicationId: String, eventDir: String): java.util.Map[String, String] = {
+ val eventPath = eventDir + "/" + applicationId + "_1"
+ val outputPrefix = "/opt/tiger/yodel/container/rapids_tools"
+ val outputFolder = s"file://$outputPrefix/$applicationId"
+ val command1 = s"spark_rapids qualification --platform onprem --eventlogs $eventPath " +
+ s"--output_folder $outputFolder"
+ runProc(applicationId, command1)
+
+ val mlOutputFolder = s"$outputPrefix/${applicationId}_ml"
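+    // outputFolder.substring(7) strips the leading "file://" scheme for the CLI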
+    val command2 = "spark_rapids prediction --platform onprem " +
+      s"--qual_output ${outputFolder.substring(7)} " +
+      s"--output_folder $mlOutputFolder"
+ runProc(applicationId, command2)
+
+ val score = readMlOutput(applicationId, mlOutputFolder)
+ val javaMap: util.HashMap[String, String] = new util.HashMap[String, String]()
+    logInfo(s"$applicationId ML predict score is $score")
+ javaMap.put("rapids_ml_score", score)
+ javaMap
+ }
+}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapUDF.scala
new file mode 100644
index 000000000..88c0d5715
--- /dev/null
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapUDF.scala
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2021-2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.tool.udf
+
+import java.util
+
+import org.apache.hadoop.hive.ql.exec.{Description, UDF}
+
+import org.apache.spark.internal.Logging
+
+@Description(name = "estimate_event_rapids_2_map",
+  value = "_FUNC_(output_path, application_id, event_path, score_file, enabledML) ",
+  extended = "Example:\n > SELECT estimate_event_rapids_2_map(hdfs://..., app_1, hdfs://...)" +
+    " - Returns a QualificationSummaryInfo map.")
+class EstimateEventRapids2MapUDF extends UDF with Logging {
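+  // Flattens the first QualificationSummaryInfo produced by the qualification
+  // run into a String-to-String map (appName, appId, Recommendation, Speedup,
+  // ...); returns an empty map when no summary is available.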
+ def evaluate(outputDir: String, applicationId: String, eventDir: String,
+ scoreFile: String = "onprem", enabledML: Boolean = false): java.util.Map[String, String] = {
+ val estimateOutput =
+ EstimateEventRapidsUDF.estimateLog(outputDir, applicationId, eventDir, scoreFile, enabledML)
+ val firstQualificationSummaryInfo = estimateOutput._2.headOption
+ val predictScore = estimateOutput._3
+ firstQualificationSummaryInfo.map { q =>
+ val javaMap: util.HashMap[String, String] = new util.HashMap[String, String]()
+ javaMap.put("appName", q.appName)
+ javaMap.put("appId", q.appId)
+ javaMap.put("Recommendation", q.estimatedInfo.recommendation)
+ javaMap.put("Speedup", q.taskSpeedupFactor.toString)
+ javaMap.put("LongestSQLDuration", q.longestSqlDuration.toString)
+ javaMap.put("UnsupportedExecs", q.unSupportedExecs)
+ javaMap.put("UnsupportedExpressions", q.unSupportedExprs)
+ javaMap.put("PredictSpeedup", predictScore)
+ javaMap
+ }.getOrElse(new util.HashMap[String, String]())
+ }
+}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala
new file mode 100644
index 000000000..85dffe481
--- /dev/null
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala
@@ -0,0 +1,148 @@
+/*
+ * Copyright (c) 2021-2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.tool.udf
+
+import java.io.{BufferedReader, File, InputStreamReader}
+
+import scala.util.control.NonFatal
+
+import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformFactory}
+import com.nvidia.spark.rapids.tool.profiling.AutoTuner.loadClusterProps
+import com.nvidia.spark.rapids.tool.qualification.{PluginTypeChecker, Qualification}
+import org.apache.hadoop.hive.ql.exec.{Description, UDF}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.rapids.tool.qualification.QualificationSummaryInfo
+import org.apache.spark.sql.rapids.tool.util.{RapidsToolsConfUtil, UTF8Source}
+
+@Description(name = "estimate_event_rapids",
+  value = "_FUNC_(output_path, application_id, event_path) ",
+ extended = "Example:\n > SELECT estimate_event_rapids(hdfs://..., app_1, hdfs://...)")
+class EstimateEventRapidsUDF extends UDF with Logging {
+ def evaluate(outputDir: String, applicationId: String, eventDir: String,
+ scoreFile: String = "onprem"): Int = {
+ val (exitCode, _, _) =
+ EstimateEventRapidsUDF.estimateLog(outputDir, applicationId, eventDir, scoreFile)
+ exitCode
+ }
+}
+
+object EstimateEventRapidsUDF extends Logging {
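+  // Qualifies the event log at <eventDir>/<applicationId>_1 and returns
+  // (exit code, qualification summaries, ML predict score); the score is only
+  // computed when enabledML is set and qualification produced a summary.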
+ def estimateLog(outputDir: String, applicationId: String, eventDir: String,
+ scoreFile: String = "onprem",
+ enabledML: Boolean = false): (Int, Seq[QualificationSummaryInfo], String) = {
+ val eventPath = eventDir + "/" + applicationId + "_1"
+ val outputDirectory = outputDir + "/" + applicationId
+ val numOutputRows = 1000
+ val hadoopConf = RapidsToolsConfUtil.newHadoopConf()
+    // processing timeout: 20 minutes (value in seconds)
+    val timeout = Option(1200L)
+ val nThreads = 1
+ val order = "desc"
+
+ val platform = try {
+ val clusterPropsOpt = loadClusterProps("./worker_info.yaml")
+ PlatformFactory.createInstance("", clusterPropsOpt)
+ } catch {
+ case NonFatal(e) =>
+ logError("Error creating the platform", e)
+ return (1, Seq[QualificationSummaryInfo](), "")
+ }
+ val pluginTypeChecker = try {
+ new PluginTypeChecker(
+ platform,
+ None)
+ } catch {
+ case ie: IllegalStateException =>
+ logError("Error creating the plugin type checker!", ie)
+ return (1, Seq[QualificationSummaryInfo](), "")
+ }
+
+ val (eventLogFsFiltered, _) = EventLogPathProcessor.processAllPaths(
+ None, None, List(eventPath), hadoopConf)
+ val filteredLogs = eventLogFsFiltered
+
+ // uiEnabled = false
+    val qual = new Qualification(
+      outputPath = outputDirectory,
+      numRows = numOutputRows,
+      hadoopConf = hadoopConf,
+      timeout = timeout,
+      nThreads = nThreads,
+      order = order,
+      pluginTypeChecker = pluginTypeChecker,
+      reportReadSchema = false,
+      printStdout = false,
+      enablePB = true,
+      reportSqlLevel = false,
+      maxSQLDescLength = 100,
+      mlOpsEnabled = false,
+      penalizeTransitions = true,
+      tunerContext = None,
+      clusterReport = false)
+ try {
+ val res: Seq[QualificationSummaryInfo] = qual.qualifyApps(filteredLogs)
+ val predictScore = if (enabledML && res.nonEmpty) {
+        // hand the qualification output directory to the Python predictor
+ execMLPredict(applicationId, outputDirectory)
+ } else ""
+ (0, res, predictScore)
+ } catch {
+ case NonFatal(e) =>
+        logError(s"Error when analyzing $applicationId, path is $eventPath.", e)
+ (1, Seq[QualificationSummaryInfo](), "")
+ }
+ }
+
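+  // Shells out to the spark_rapids prediction CLI and parses the
+  // "Overall estimated speedup" line from the resulting prediction file.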
+ private def execMLPredict(applicationId: String, outputDirectory: String): String = {
+    val mlOutputPrefix = s"${applicationId}_pre"
+    val mlOutput = s"ml_qualx_output/$mlOutputPrefix"
+    val command = "spark_rapids prediction --platform onprem " +
+      s"--qual_output $outputDirectory " +
+      s"--output_folder ./$mlOutput"
+    val proc = Runtime.getRuntime.exec(command)
+ val in = new BufferedReader(new InputStreamReader(proc.getInputStream))
+ Iterator.continually(in.readLine()).takeWhile(_ != null).foreach(println)
+ in.close()
+ proc.waitFor
+
+ val dir = new File(s"./$mlOutput")
+    // find the output file whose name starts with "prediction"
+    val predictFiles =
+      Option(dir.listFiles).getOrElse(Array.empty[File]).find(_.getName.startsWith("prediction"))
+    assert(predictFiles.nonEmpty, "cannot find prediction file")
+
+ // read ml output
+ var predictScore = "0.0"
+    val filePath = s"./$mlOutput/${predictFiles.get.getName}"
+ val source = UTF8Source.fromFile(filePath)
+ try {
+ for (line <- source.getLines()) {
+ if (line.startsWith("Overall estimated speedup")) {
+ predictScore = line.split("\\s+").last
+ logInfo(s"predict score for $applicationId is $predictScore")
+ }
+ }
+ } finally {
+ source.close()
+ }
+ predictScore
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala
index c56bbe33b..bf268bfab 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala
@@ -35,8 +35,9 @@ class SparkPlanInfoWithStage(
override val children: Seq[SparkPlanInfoWithStage],
metadata: scala.Predef.Map[String, String],
metrics: Seq[SQLMetricInfo],
- val stageId: Option[Int]) extends SparkPlanInfo(nodeName, simpleString, children,
- metadata, metrics) {
+ val stageId: Option[Int],
+ planId: Int = 0) extends SparkPlanInfo(nodeName, simpleString, children,
+ metadata, metrics, planId) {
import SparkPlanInfoWithStage._
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/ToolsPlanGraph.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/ToolsPlanGraph.scala
index a112da0b9..a37b6a0c3 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/ToolsPlanGraph.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/ToolsPlanGraph.scala
@@ -203,7 +203,7 @@ object ToolsPlanGraph {
def constructGraphNode(id: Long, name: String, desc: String,
metrics: collection.Seq[SQLPlanMetric]): SparkPlanGraphNode = {
try {
- new SparkPlanGraphNode(id, name, desc, metrics)
+ new SparkPlanGraphNode(id, name, desc, metrics, 0)
} catch {
case _: java.lang.NoSuchMethodError =>
dbRuntimeReflection.constructNode(id, name, desc, metrics)
@@ -229,7 +229,7 @@ object ToolsPlanGraph {
nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
metrics: collection.Seq[SQLPlanMetric]): SparkPlanGraphCluster = {
try {
- new SparkPlanGraphCluster(id, name, desc, nodes, metrics)
+ new SparkPlanGraphCluster(id, name, desc, nodes, metrics, 0)
} catch {
case _: java.lang.NoSuchMethodError =>
dbRuntimeReflection.constructCluster(id, name, desc, nodes, metrics)