diff --git a/core/pom.xml b/core/pom.xml index e5f56a9f1..ab1327cce 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -160,6 +160,21 @@ 3.3.6 + + bd321 + + + buildver + 321 + + + + 321 + ${bdspark321.version} + ${delta20x.version} + 3.3.6 + + release322 @@ -443,6 +458,7 @@ 3.1.4-SNAPSHOT 3.2.0 3.2.1 + 3.2.1-bd1-SNAPSHOT 3.2.2 3.2.3 3.2.4 @@ -502,6 +518,7 @@ ${platform-encoding} ${platform-encoding} ${platform-encoding} + 1.2.2-bd103 @@ -582,6 +599,40 @@ ${spark.version} test + + org.apache.hive + hive-common + ${hive.version} + provided + + + org.apache.hive + hive-exec + ${hive.version} + provided + + + + + + + + + + + + + + + + + + + + + + + diff --git a/core/src/main/resources/opScore-onprem-bytedance0325.csv b/core/src/main/resources/opScore-onprem-bytedance0325.csv new file mode 100644 index 000000000..e8e8a7fb9 --- /dev/null +++ b/core/src/main/resources/opScore-onprem-bytedance0325.csv @@ -0,0 +1,283 @@ +CPUOperator,Score +CoalesceExec,3.0 +CollectLimitExec,3.0 +ExpandExec,2.68 +FileSourceScanExec,0.5 +FilterExec,22.04 +GenerateExec,3.0 +GlobalLimitExec,3.0 +LocalLimitExec,3.0 +ProjectExec,1.5 +RangeExec,3.0 +SampleExec,3.0 +SortExec,6.64 +SubqueryBroadcastExec,3.0 +TakeOrderedAndProjectExec,3.0 +UnionExec,3.0 +CustomShuffleReaderExec,3.0 +HashAggregateExec,10.55 +ObjectHashAggregateExec,3.0 +SortAggregateExec,3.0 +InMemoryTableScanExec,3.0 +DataWritingCommandExec,3.0 +ExecutedCommandExec,3.0 +BatchScanExec,0.5 +BroadcastExchangeExec,30.0 +ShuffleExchangeExec,1.0 +BroadcastHashJoinExec,40.00 +BroadcastNestedLoopJoinExec,3.0 +CartesianProductExec,3.0 +ShuffledHashJoinExec,22.29 +SortMergeJoinExec,22.33 +AggregateInPandasExec,1.2 +ArrowEvalPythonExec,1.2 +FlatMapCoGroupsInPandasExec,3.0 +FlatMapGroupsInPandasExec,1.2 +MapInPandasExec,1.2 +WindowInPandasExec,1.2 +WindowExec,127.61 +HiveTableScanExec,0.5 +Abs,4 +Acos,4 +Acosh,4 +Add,4 +AggregateExpression,4 +Alias,4 +And,4 +ApproximatePercentile,4 +ArrayContains,4 +ArrayExcept,4 +ArrayExists,4 +ArrayIntersect,4 
+ArrayMax,4 +ArrayMin,4 +ArrayRemove,4 +ArrayRepeat,4 +ArrayTransform,4 +ArrayUnion,4 +ArraysOverlap,4 +ArraysZip,4 +Asin,4 +Asinh,4 +AtLeastNNonNulls,4 +Atan,4 +Atanh,4 +AttributeReference,4 +Average,4 +BRound,4 +BitLength,4 +BitwiseAnd,4 +BitwiseNot,4 +BitwiseOr,4 +BitwiseXor,4 +CaseWhen,4 +Cbrt,4 +Ceil,4 +CheckOverflow,4 +Coalesce,4 +CollectList,4 +CollectSet,4 +Concat,4 +ConcatWs,4 +Contains,4 +Conv,4 +Cos,4 +Cosh,4 +Cot,4 +Count,4 +CreateArray,4 +CreateMap,4 +CreateNamedStruct,4 +CurrentRow$,4 +DateAdd,4 +DateAddInterval,4 +DateDiff,4 +DateFormatClass,4 +DateSub,4 +DayOfMonth,4 +DayOfWeek,4 +DayOfYear,4 +DenseRank,4 +Divide,4 +DynamicPruningExpression,4 +ElementAt,4 +EndsWith,4 +EqualNullSafe,4 +EqualTo,4 +Exp,4 +Explode,4 +Expm1,4 +First,4 +Flatten,4 +Floor,4 +FormatNumber,4 +FromUTCTimestamp,4 +FromUnixTime,4 +GetArrayItem,4 +GetArrayStructFields,4 +GetJsonObject,4 +GetMapValue,4 +GetStructField,4 +GetTimestamp,4 +GreaterThan,4 +GreaterThanOrEqual,4 +Greatest,4 +HiveGenericUDF,4 +HiveSimpleUDF,4 +Hour,4 +Hypot,4 +If,4 +In,4 +InSet,4 +InitCap,4 +InputFileBlockLength,4 +InputFileBlockStart,4 +InputFileName,4 +IntegralDivide,4 +IsNaN,4 +IsNotNull,4 +IsNull,4 +JsonToStructs,4 +JsonTuple,4 +KnownFloatingPointNormalized,4 +KnownNotNull,4 +Lag,4 +LambdaFunction,4 +Last,4 +LastDay,4 +Lead,4 +Least,4 +Length,4 +LessThan,4 +LessThanOrEqual,4 +Like,4 +Literal,4 +Log,4 +Log10,4 +Log1p,4 +Log2,4 +Logarithm,4 +Lower,4 +MakeDecimal,4 +MapConcat,4 +MapEntries,4 +MapFilter,4 +MapKeys,4 +MapValues,4 +Max,4 +Md5,4 +MicrosToTimestamp,4 +MillisToTimestamp,4 +Min,4 +Minute,4 +MonotonicallyIncreasingID,4 +Month,4 +Multiply,4 +Murmur3Hash,4 +NaNvl,4 +NamedLambdaVariable,4 +NormalizeNaNAndZero,4 +Not,4 +NthValue,4 +OctetLength,4 +Or,4 +ParseUrl,4 +Percentile,4 +PercentRank,4 +PivotFirst,4 +Pmod,4 +PosExplode,4 +Pow,4 +PreciseTimestampConversion,4 +PromotePrecision,4 +PythonUDF,4 +Quarter,4 +RLike,4 +RaiseError,4 +Rand,4 +Rank,4 +RegExpExtract,4 +RegExpExtractAll,4 +RegExpReplace,4 
+Remainder,4 +ReplicateRows,4 +Reverse,4 +Rint,4 +Round,4 +RowNumber,4 +ScalaUDF,4 +ScalarSubquery,4 +Second,4 +SecondsToTimestamp,4 +Sequence,4 +ShiftLeft,4 +ShiftRight,4 +ShiftRightUnsigned,4 +Signum,4 +Sin,4 +Sinh,4 +Size,4 +SortArray,4 +SortOrder,4 +SparkPartitionID,4 +SpecifiedWindowFrame,100 +Sqrt,4 +Stack,4 +StartsWith,4 +StddevPop,4 +StddevSamp,4 +StringInstr,4 +StringLPad,4 +StringLocate,4 +StringRPad,4 +StringRepeat,4 +StringReplace,4 +StringSplit,4 +StringToMap,4 +StringTranslate,4 +StringTrim,4 +StringTrimLeft,4 +StringTrimRight,4 +StructsToJson,4 +Substring,4 +SubstringIndex,4 +Subtract,4 +Sum,4 +Tan,4 +Tanh,4 +TimeAdd,4 +ToDegrees,4 +ToRadians,4 +ToUnixTimestamp,4 +TransformKeys,4 +TransformValues,4 +UnaryMinus,4 +UnaryPositive,4 +UnboundedFollowing$,4 +UnboundedPreceding$,4 +UnixTimestamp,4 +UnscaledValue,4 +Upper,4 +VariancePop,4 +VarianceSamp,4 +WeekDay,4 +WindowExpression,100 +WindowSpecDefinition,100 +XxHash64,4 +Year,4 +WriteFilesExec,4 +Empty2Null,4 +Ascii,4 +ToUTCTimestamp,4 +KMeans-pyspark,8.86 +KMeans-scala,1 +PCA-pyspark,2.24 +PCA-scala,2.69 +LinearRegression-pyspark,2 +LinearRegression-scala,1 +RandomForestClassifier-pyspark,6.31 +RandomForestClassifier-scala,1 +RandomForestRegressor-pyspark,3.66 +RandomForestRegressor-scala,1 +XGBoost-pyspark,1 +XGBoost-scala,3.31 diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala new file mode 100644 index 000000000..cfb8298d5 --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
/*
 * Copyright (c) 2021-2023, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.nvidia.spark.rapids.tool.udf

import java.io.{BufferedReader, File, InputStreamReader}
import java.util

import org.apache.hadoop.hive.ql.exec.{Description, UDF}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.util.UTF8Source

/**
 * Hive UDF that shells out to the `spark_rapids` CLI to run qualification and
 * ML prediction for a single Spark application's event log, then returns the
 * predicted speedup as a one-entry map keyed by "rapids_ml_score".
 */
@Description(name = "estimate_event_rapids_ml",
  value = "_FUNC_(applicationId, event_path) ",
  extended = "Example:\n > SELECT estimate_event_rapids(app_1, hdfs://...)" +
    " - Returns rapids-tool ml score.")
class EstimateEventRapids2MapMLUDF extends UDF with Logging {

  /**
   * Runs `command` as an external process, echoing its stdout line by line,
   * and blocks until the process exits.
   */
  private def runProc(applicationId: String, command: String): Unit = {
    logInfo(s"Running rapids-tools for $applicationId: $command")
    val proc = Runtime.getRuntime.exec(command)
    val in = new BufferedReader(new InputStreamReader(proc.getInputStream))
    try {
      Iterator.continually(in.readLine()).takeWhile(_ != null).foreach(println)
    } finally {
      // Close the reader even if draining stdout throws.
      in.close()
    }
    proc.waitFor
  }

  /**
   * Scans the prediction report under `mlOutputFolder` for the line starting
   * with "Overall estimated speedup" and returns its last token.
   * Returns "0.0" when no prediction file or matching line is found.
   */
  private def readMlOutput(applicationId: String, mlOutputFolder: String): String = {
    val dir = new File(mlOutputFolder)
    // File.listFiles returns null when the directory is missing/unreadable;
    // guard with Option to avoid an NPE when the prediction step failed.
    val predictFile = Option(dir.listFiles).getOrElse(Array.empty[File])
      .find(_.getName.startsWith("prediction"))
    var predictScore = "0.0"
    predictFile match {
      case Some(f) =>
        // Fix: interpolating the File itself embedded its absolute path,
        // yielding a broken "$mlOutputFolder//abs/path" string; use getName.
        val filePath = s"$mlOutputFolder/${f.getName}"
        val source = UTF8Source.fromFile(filePath)
        try {
          for (line <- source.getLines()) {
            if (line.startsWith("Overall estimated speedup")) {
              predictScore = line.split("\\s+").last
              logInfo(s"predict score for $applicationId is $predictScore")
            }
          }
        } finally {
          source.close()
        }
      case None =>
        logError(s"Can not find prediction file in $mlOutputFolder")
    }
    predictScore
  }

  /**
   * Entry point invoked by Hive: runs qualification then prediction for
   * `applicationId` and returns a map containing the "rapids_ml_score" entry.
   */
  def evaluate(applicationId: String, eventDir: String): java.util.Map[String, String] = {
    // Convention: event logs are stored as "<appId>_1" under eventDir.
    val eventPath = eventDir + "/" + applicationId + "_1"
    val outputPrefix = "/opt/tiger/yodel/container/rapids_tools"
    val outputFolder = s"file://$outputPrefix/$applicationId"
    val command1 = s"spark_rapids qualification --platform onprem --eventlogs $eventPath " +
      s"--output_folder $outputFolder"
    runProc(applicationId, command1)

    val mlOutputFolder = s"$outputPrefix/${applicationId}_ml"
    // substring(7) strips the "file://" scheme prefix from outputFolder.
    // Fix: the original concatenation lacked the space before --output_folder,
    // producing a fused "--qual_output <path>--output_folder ..." argument.
    val command2 = "spark_rapids prediction --platform onprem " +
      s"--qual_output ${outputFolder.substring(7)} " +
      s"--output_folder $mlOutputFolder"
    runProc(applicationId, command2)

    val score = readMlOutput(applicationId, mlOutputFolder)
    val javaMap: util.HashMap[String, String] = new util.HashMap[String, String]()
    logInfo(s"applicationId ml predict score is $score")
    javaMap.put("rapids_ml_score", score)
    javaMap
  }
}
+ */ + +package com.nvidia.spark.rapids.tool.udf + +import java.util + +import org.apache.hadoop.hive.ql.exec.{Description, UDF} + +import org.apache.spark.internal.Logging + +@Description(name = "estimate_event_rapids_2_map", + value = "_FUNC_(output_path, event_path, score_file, enabledML) ", + extended = "Example:\n > SELECT estimate_event_rapids(hdfs://..., app_1, hdfs://...)" + + " - Returns QualificationSummaryInfo map.") +class EstimateEventRapids2MapUDF extends UDF with Logging { + def evaluate(outputDir: String, applicationId: String, eventDir: String, + scoreFile: String = "onprem", enabledML: Boolean = false): java.util.Map[String, String] = { + val estimateOutput = + EstimateEventRapidsUDF.estimateLog(outputDir, applicationId, eventDir, scoreFile, enabledML) + val firstQualificationSummaryInfo = estimateOutput._2.headOption + val predictScore = estimateOutput._3 + firstQualificationSummaryInfo.map { q => + val javaMap: util.HashMap[String, String] = new util.HashMap[String, String]() + javaMap.put("appName", q.appName) + javaMap.put("appId", q.appId) + javaMap.put("Recommendation", q.estimatedInfo.recommendation) + javaMap.put("Speedup", q.taskSpeedupFactor.toString) + javaMap.put("LongestSQLDuration", q.longestSqlDuration.toString) + javaMap.put("UnsupportedExecs", q.unSupportedExecs) + javaMap.put("UnsupportedExpressions", q.unSupportedExprs) + javaMap.put("PredictSpeedup", predictScore) + javaMap + }.getOrElse(new util.HashMap[String, String]()) + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala new file mode 100644 index 000000000..85dffe481 --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2021-2023, NVIDIA CORPORATION. 
/*
 * Copyright (c) 2021-2023, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.nvidia.spark.rapids.tool.udf

import java.io.{BufferedReader, File, InputStreamReader}

import scala.util.control.NonFatal

import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformFactory}
import com.nvidia.spark.rapids.tool.profiling.AutoTuner.loadClusterProps
import com.nvidia.spark.rapids.tool.qualification.{PluginTypeChecker, Qualification}
import org.apache.hadoop.hive.ql.exec.{Description, UDF}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.qualification.QualificationSummaryInfo
import org.apache.spark.sql.rapids.tool.util.{RapidsToolsConfUtil, UTF8Source}

/**
 * Hive UDF that runs the spark-rapids qualification tool in-process for a
 * single application event log. Returns 0 on success, 1 on any failure.
 */
@Description(name = "estimate_event_rapids",
  value = "_FUNC_(output_path, event_path) ",
  extended = "Example:\n > SELECT estimate_event_rapids(hdfs://..., app_1, hdfs://...)")
class EstimateEventRapidsUDF extends UDF with Logging {
  def evaluate(outputDir: String, applicationId: String, eventDir: String,
      scoreFile: String = "onprem"): Int = {
    val (exitCode, _, _) =
      EstimateEventRapidsUDF.estimateLog(outputDir, applicationId, eventDir, scoreFile)
    exitCode
  }
}

object EstimateEventRapidsUDF extends Logging {
  /**
   * Runs qualification (and optionally ML prediction) for `applicationId`.
   *
   * @param outputDir     base output directory; results land in outputDir/appId
   * @param applicationId Spark application id; the event log is expected at
   *                      eventDir/appId_1
   * @param eventDir      directory containing event logs
   * @param scoreFile     score file / platform name (currently unused here;
   *                      kept for interface compatibility — TODO confirm)
   * @param enabledML     when true, also runs the external ML prediction CLI
   * @return (exitCode, qualification summaries, ML predict score or "")
   */
  def estimateLog(outputDir: String, applicationId: String, eventDir: String,
      scoreFile: String = "onprem",
      enabledML: Boolean = false): (Int, Seq[QualificationSummaryInfo], String) = {
    val eventPath = eventDir + "/" + applicationId + "_1"
    val outputDirectory = outputDir + "/" + applicationId
    val numOutputRows = 1000
    val hadoopConf = RapidsToolsConfUtil.newHadoopConf()
    // timeout: 20min
    val timeout = Option(1200L)
    val nThreads = 1
    val order = "desc"

    val platform = try {
      val clusterPropsOpt = loadClusterProps("./worker_info.yaml")
      PlatformFactory.createInstance("", clusterPropsOpt)
    } catch {
      case NonFatal(e) =>
        logError("Error creating the platform", e)
        return (1, Seq.empty[QualificationSummaryInfo], "")
    }
    val pluginTypeChecker = try {
      new PluginTypeChecker(platform, None)
    } catch {
      case ie: IllegalStateException =>
        logError("Error creating the plugin type checker!", ie)
        return (1, Seq.empty[QualificationSummaryInfo], "")
    }

    val (eventLogFsFiltered, _) = EventLogPathProcessor.processAllPaths(
      None, None, List(eventPath), hadoopConf)
    val filteredLogs = eventLogFsFiltered

    // uiEnabled = false
    val qual = new Qualification(
      outputPath = outputDirectory,
      numRows = numOutputRows,
      hadoopConf = hadoopConf,
      timeout = timeout,
      nThreads = nThreads,
      order = order,
      pluginTypeChecker = pluginTypeChecker,
      reportReadSchema = false,
      printStdout = false,
      enablePB = true,
      reportSqlLevel = false,
      maxSQLDescLength = 100,
      mlOpsEnabled = false,
      penalizeTransitions = true,
      tunerContext = None,
      clusterReport = false
    )
    try {
      val res: Seq[QualificationSummaryInfo] = qual.qualifyApps(filteredLogs)
      val predictScore = if (enabledML && res.nonEmpty) {
        // Hand the qualification output over to the python prediction CLI.
        execMLPredict(applicationId, outputDirectory)
      } else ""
      (0, res, predictScore)
    } catch {
      case NonFatal(e) =>
        logError(s"Error when analyze ${applicationId}, path is ${eventPath}.", e)
        (1, Seq.empty[QualificationSummaryInfo], "")
    }
  }

  /**
   * Runs the external `spark_rapids prediction` CLI against the qualification
   * output and parses the "Overall estimated speedup" value out of the report.
   */
  private def execMLPredict(applicationId: String, outputDirectory: String): String = {
    val mlOutputPrefix = s"${applicationId}_pre"
    val mlOutput = s"ml_qualx_output/$mlOutputPrefix"
    // Fix: the original concatenation lacked spaces around --output_folder,
    // producing a fused "--qual_output <path>--output_folder./..." argument.
    val command = "spark_rapids prediction --platform onprem " +
      s"--qual_output $outputDirectory " +
      s"--output_folder ./$mlOutput"
    val proc = Runtime.getRuntime.exec(command)
    val in = new BufferedReader(new InputStreamReader(proc.getInputStream))
    try {
      Iterator.continually(in.readLine()).takeWhile(_ != null).foreach(println)
    } finally {
      in.close()
    }
    proc.waitFor

    val dir = new File(s"./$mlOutput")
    // Filter for the report file whose name starts with "prediction".
    // listFiles returns null when the directory is missing; guard with Option.
    val predictFile = Option(dir.listFiles).getOrElse(Array.empty[File])
      .find(_.getName.startsWith("prediction"))
    assert(predictFile.nonEmpty, "can not find prediction file")

    // Read the overall speedup estimate from the report.
    var predictScore = "0.0"
    // Fix: interpolating the File itself embedded its absolute path,
    // yielding a broken "./<mlOutput>//abs/path" string; use getName.
    val filePath = s"./$mlOutput/${predictFile.get.getName}"
    val source = UTF8Source.fromFile(filePath)
    try {
      for (line <- source.getLines()) {
        if (line.startsWith("Overall estimated speedup")) {
          predictScore = line.split("\\s+").last
          logInfo(s"predict score for $applicationId is $predictScore")
        }
      }
    } finally {
      source.close()
    }
    predictScore
  }
}
100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/ToolsPlanGraph.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/ToolsPlanGraph.scala @@ -203,7 +203,7 @@ object ToolsPlanGraph { def constructGraphNode(id: Long, name: String, desc: String, metrics: collection.Seq[SQLPlanMetric]): SparkPlanGraphNode = { try { - new SparkPlanGraphNode(id, name, desc, metrics) + new SparkPlanGraphNode(id, name, desc, metrics, 0) } catch { case _: java.lang.NoSuchMethodError => dbRuntimeReflection.constructNode(id, name, desc, metrics) @@ -229,7 +229,7 @@ object ToolsPlanGraph { nodes: mutable.ArrayBuffer[SparkPlanGraphNode], metrics: collection.Seq[SQLPlanMetric]): SparkPlanGraphCluster = { try { - new SparkPlanGraphCluster(id, name, desc, nodes, metrics) + new SparkPlanGraphCluster(id, name, desc, nodes, metrics, 0) } catch { case _: java.lang.NoSuchMethodError => dbRuntimeReflection.constructCluster(id, name, desc, nodes, metrics)