From 5dcc95bb259dcf10b449117e23c1eaccc9f30e25 Mon Sep 17 00:00:00 2001 From: hezhengjie Date: Sat, 14 Sep 2024 15:44:39 +0800 Subject: [PATCH 01/17] Compatible with bd spark --- core/pom.xml | 16 ++++++++++++++++ .../rapids/tool/profiling/ApplicationInfo.scala | 3 ++- .../sql/rapids/tool/util/ToolsPlanGraph.scala | 4 ++-- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index e5f56a9f1..64217f4f0 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -160,6 +160,21 @@ 3.3.6 + + bd321 + + + buildver + 321 + + + + 321 + ${bdspark321.version} + ${delta20x.version} + 3.3.6 + + release322 @@ -443,6 +458,7 @@ 3.1.4-SNAPSHOT 3.2.0 3.2.1 + 3.2.1-bd1-SNAPSHOT 3.2.2 3.2.3 3.2.4 diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala index c56bbe33b..468838243 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala @@ -35,7 +35,8 @@ class SparkPlanInfoWithStage( override val children: Seq[SparkPlanInfoWithStage], metadata: scala.Predef.Map[String, String], metrics: Seq[SQLMetricInfo], - val stageId: Option[Int]) extends SparkPlanInfo(nodeName, simpleString, children, + val stageId: Option[Int], + planId: Int = 0) extends SparkPlanInfo(nodeName, simpleString, children, metadata, metrics) { import SparkPlanInfoWithStage._ diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/ToolsPlanGraph.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/ToolsPlanGraph.scala index a112da0b9..a37b6a0c3 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/ToolsPlanGraph.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/ToolsPlanGraph.scala @@ -203,7 +203,7 @@ object ToolsPlanGraph { def constructGraphNode(id: Long, name: String, desc: 
String, metrics: collection.Seq[SQLPlanMetric]): SparkPlanGraphNode = { try { - new SparkPlanGraphNode(id, name, desc, metrics) + new SparkPlanGraphNode(id, name, desc, metrics, 0) } catch { case _: java.lang.NoSuchMethodError => dbRuntimeReflection.constructNode(id, name, desc, metrics) @@ -229,7 +229,7 @@ object ToolsPlanGraph { nodes: mutable.ArrayBuffer[SparkPlanGraphNode], metrics: collection.Seq[SQLPlanMetric]): SparkPlanGraphCluster = { try { - new SparkPlanGraphCluster(id, name, desc, nodes, metrics) + new SparkPlanGraphCluster(id, name, desc, nodes, metrics, 0) } catch { case _: java.lang.NoSuchMethodError => dbRuntimeReflection.constructCluster(id, name, desc, nodes, metrics) From 1e5a8b415f5a119f87cea099b81e9cfaafce764d Mon Sep 17 00:00:00 2001 From: hezhengjie Date: Fri, 20 Oct 2023 11:31:35 +0800 Subject: [PATCH 02/17] EstimateEventRapidsUDF v1 change path add description add description add timeout 20min add return map udf udf use java map instead of scala map add udf 2 make FileSourceScanExecParser factor always 1 rename Support opScore for bd 0325 --- core/pom.xml | 35 +++ .../opScore-onprem-bytedance0325.csv | 283 ++++++++++++++++++ .../tool/udf/EstimateEventRapids2MapUDF.scala | 47 +++ .../tool/udf/EstimateEventRapidsUDF.scala | 77 +++++ 4 files changed, 442 insertions(+) create mode 100644 core/src/main/resources/opScore-onprem-bytedance0325.csv create mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapUDF.scala create mode 100644 core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala diff --git a/core/pom.xml b/core/pom.xml index 64217f4f0..ab1327cce 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -518,6 +518,7 @@ ${platform-encoding} ${platform-encoding} ${platform-encoding} + 1.2.2-bd103 @@ -598,6 +599,40 @@ ${spark.version} test + + org.apache.hive + hive-common + ${hive.version} + provided + + + org.apache.hive + hive-exec + ${hive.version} + provided + + + + + + + + + + + + + + 
+ + + + + + + + + diff --git a/core/src/main/resources/opScore-onprem-bytedance0325.csv b/core/src/main/resources/opScore-onprem-bytedance0325.csv new file mode 100644 index 000000000..e8e8a7fb9 --- /dev/null +++ b/core/src/main/resources/opScore-onprem-bytedance0325.csv @@ -0,0 +1,283 @@ +CPUOperator,Score +CoalesceExec,3.0 +CollectLimitExec,3.0 +ExpandExec,2.68 +FileSourceScanExec,0.5 +FilterExec,22.04 +GenerateExec,3.0 +GlobalLimitExec,3.0 +LocalLimitExec,3.0 +ProjectExec,1.5 +RangeExec,3.0 +SampleExec,3.0 +SortExec,6.64 +SubqueryBroadcastExec,3.0 +TakeOrderedAndProjectExec,3.0 +UnionExec,3.0 +CustomShuffleReaderExec,3.0 +HashAggregateExec,10.55 +ObjectHashAggregateExec,3.0 +SortAggregateExec,3.0 +InMemoryTableScanExec,3.0 +DataWritingCommandExec,3.0 +ExecutedCommandExec,3.0 +BatchScanExec,0.5 +BroadcastExchangeExec,30.0 +ShuffleExchangeExec,1.0 +BroadcastHashJoinExec,40.00 +BroadcastNestedLoopJoinExec,3.0 +CartesianProductExec,3.0 +ShuffledHashJoinExec,22.29 +SortMergeJoinExec,22.33 +AggregateInPandasExec,1.2 +ArrowEvalPythonExec,1.2 +FlatMapCoGroupsInPandasExec,3.0 +FlatMapGroupsInPandasExec,1.2 +MapInPandasExec,1.2 +WindowInPandasExec,1.2 +WindowExec,127.61 +HiveTableScanExec,0.5 +Abs,4 +Acos,4 +Acosh,4 +Add,4 +AggregateExpression,4 +Alias,4 +And,4 +ApproximatePercentile,4 +ArrayContains,4 +ArrayExcept,4 +ArrayExists,4 +ArrayIntersect,4 +ArrayMax,4 +ArrayMin,4 +ArrayRemove,4 +ArrayRepeat,4 +ArrayTransform,4 +ArrayUnion,4 +ArraysOverlap,4 +ArraysZip,4 +Asin,4 +Asinh,4 +AtLeastNNonNulls,4 +Atan,4 +Atanh,4 +AttributeReference,4 +Average,4 +BRound,4 +BitLength,4 +BitwiseAnd,4 +BitwiseNot,4 +BitwiseOr,4 +BitwiseXor,4 +CaseWhen,4 +Cbrt,4 +Ceil,4 +CheckOverflow,4 +Coalesce,4 +CollectList,4 +CollectSet,4 +Concat,4 +ConcatWs,4 +Contains,4 +Conv,4 +Cos,4 +Cosh,4 +Cot,4 +Count,4 +CreateArray,4 +CreateMap,4 +CreateNamedStruct,4 +CurrentRow$,4 +DateAdd,4 +DateAddInterval,4 +DateDiff,4 +DateFormatClass,4 +DateSub,4 +DayOfMonth,4 +DayOfWeek,4 +DayOfYear,4 +DenseRank,4 
+Divide,4 +DynamicPruningExpression,4 +ElementAt,4 +EndsWith,4 +EqualNullSafe,4 +EqualTo,4 +Exp,4 +Explode,4 +Expm1,4 +First,4 +Flatten,4 +Floor,4 +FormatNumber,4 +FromUTCTimestamp,4 +FromUnixTime,4 +GetArrayItem,4 +GetArrayStructFields,4 +GetJsonObject,4 +GetMapValue,4 +GetStructField,4 +GetTimestamp,4 +GreaterThan,4 +GreaterThanOrEqual,4 +Greatest,4 +HiveGenericUDF,4 +HiveSimpleUDF,4 +Hour,4 +Hypot,4 +If,4 +In,4 +InSet,4 +InitCap,4 +InputFileBlockLength,4 +InputFileBlockStart,4 +InputFileName,4 +IntegralDivide,4 +IsNaN,4 +IsNotNull,4 +IsNull,4 +JsonToStructs,4 +JsonTuple,4 +KnownFloatingPointNormalized,4 +KnownNotNull,4 +Lag,4 +LambdaFunction,4 +Last,4 +LastDay,4 +Lead,4 +Least,4 +Length,4 +LessThan,4 +LessThanOrEqual,4 +Like,4 +Literal,4 +Log,4 +Log10,4 +Log1p,4 +Log2,4 +Logarithm,4 +Lower,4 +MakeDecimal,4 +MapConcat,4 +MapEntries,4 +MapFilter,4 +MapKeys,4 +MapValues,4 +Max,4 +Md5,4 +MicrosToTimestamp,4 +MillisToTimestamp,4 +Min,4 +Minute,4 +MonotonicallyIncreasingID,4 +Month,4 +Multiply,4 +Murmur3Hash,4 +NaNvl,4 +NamedLambdaVariable,4 +NormalizeNaNAndZero,4 +Not,4 +NthValue,4 +OctetLength,4 +Or,4 +ParseUrl,4 +Percentile,4 +PercentRank,4 +PivotFirst,4 +Pmod,4 +PosExplode,4 +Pow,4 +PreciseTimestampConversion,4 +PromotePrecision,4 +PythonUDF,4 +Quarter,4 +RLike,4 +RaiseError,4 +Rand,4 +Rank,4 +RegExpExtract,4 +RegExpExtractAll,4 +RegExpReplace,4 +Remainder,4 +ReplicateRows,4 +Reverse,4 +Rint,4 +Round,4 +RowNumber,4 +ScalaUDF,4 +ScalarSubquery,4 +Second,4 +SecondsToTimestamp,4 +Sequence,4 +ShiftLeft,4 +ShiftRight,4 +ShiftRightUnsigned,4 +Signum,4 +Sin,4 +Sinh,4 +Size,4 +SortArray,4 +SortOrder,4 +SparkPartitionID,4 +SpecifiedWindowFrame,100 +Sqrt,4 +Stack,4 +StartsWith,4 +StddevPop,4 +StddevSamp,4 +StringInstr,4 +StringLPad,4 +StringLocate,4 +StringRPad,4 +StringRepeat,4 +StringReplace,4 +StringSplit,4 +StringToMap,4 +StringTranslate,4 +StringTrim,4 +StringTrimLeft,4 +StringTrimRight,4 +StructsToJson,4 +Substring,4 +SubstringIndex,4 +Subtract,4 +Sum,4 +Tan,4 +Tanh,4 
+TimeAdd,4 +ToDegrees,4 +ToRadians,4 +ToUnixTimestamp,4 +TransformKeys,4 +TransformValues,4 +UnaryMinus,4 +UnaryPositive,4 +UnboundedFollowing$,4 +UnboundedPreceding$,4 +UnixTimestamp,4 +UnscaledValue,4 +Upper,4 +VariancePop,4 +VarianceSamp,4 +WeekDay,4 +WindowExpression,100 +WindowSpecDefinition,100 +XxHash64,4 +Year,4 +WriteFilesExec,4 +Empty2Null,4 +Ascii,4 +ToUTCTimestamp,4 +KMeans-pyspark,8.86 +KMeans-scala,1 +PCA-pyspark,2.24 +PCA-scala,2.69 +LinearRegression-pyspark,2 +LinearRegression-scala,1 +RandomForestClassifier-pyspark,6.31 +RandomForestClassifier-scala,1 +RandomForestRegressor-pyspark,3.66 +RandomForestRegressor-scala,1 +XGBoost-pyspark,1 +XGBoost-scala,3.31 diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapUDF.scala new file mode 100644 index 000000000..4986cb5a0 --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapUDF.scala @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids.tool.udf + +import java.util + +import org.apache.hadoop.hive.ql.exec.{Description, UDF} + +import org.apache.spark.internal.Logging + +@Description(name = "estimate_event_rapids_2_map", + value = "_FUNC_(output_path, event_path) ", + extended = "Example:\n > SELECT estimate_event_rapids(hdfs://..., app_1, hdfs://...)" + + " - Returns QualificationSummaryInfo map.") +class EstimateEventRapids2MapUDF extends UDF with Logging { + def evaluate(outputDir: String, applicationId: String, eventDir: String, + scoreFile: String = "onprem"): java.util.Map[String, String] = { + val estimateOutput = + EstimateEventRapidsUDF.estimateLog(outputDir, applicationId, eventDir, scoreFile) + val firstQualificationSummaryInfo = estimateOutput._2.headOption + firstQualificationSummaryInfo.map { q => + val javaMap: util.HashMap[String, String] = new util.HashMap[String, String]() + javaMap.put("appName", q.appName) + javaMap.put("appId", q.appId) + javaMap.put("Recommendation", q.estimatedInfo.recommendation) + javaMap.put("Speedup", q.taskSpeedupFactor.toString) + javaMap.put("LongestSQLDuration", q.longestSqlDuration.toString) + javaMap.put("UnsupportedExecs", q.unSupportedExecs) + javaMap.put("UnsupportedExpressions", q.unSupportedExprs) + javaMap + }.getOrElse(new util.HashMap[String, String]()) + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala new file mode 100644 index 000000000..85979f226 --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.udf + +import scala.util.control.NonFatal + +import com.nvidia.spark.rapids.tool.EventLogPathProcessor +import com.nvidia.spark.rapids.tool.qualification.{PluginTypeChecker, Qualification} +import org.apache.hadoop.hive.ql.exec.{Description, UDF} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.rapids.tool.qualification.QualificationSummaryInfo +import org.apache.spark.sql.rapids.tool.util.RapidsToolsConfUtil + +@Description(name = "estimate_event_rapids", + value = "_FUNC_(output_path, event_path) ", + extended = "Example:\n > SELECT estimate_event_rapids(hdfs://..., app_1, hdfs://...)") +class EstimateEventRapidsUDF extends UDF with Logging { + def evaluate(outputDir: String, applicationId: String, eventDir: String, + scoreFile: String = "onprem"): Int = { + val (exitCode, _) = + EstimateEventRapidsUDF.estimateLog(outputDir, applicationId, eventDir, scoreFile) + exitCode + } +} + +object EstimateEventRapidsUDF extends Logging { + def estimateLog(outputDir: String, applicationId: String, eventDir: String, + scoreFile: String = "onprem"): (Int, Seq[QualificationSummaryInfo]) = { + val eventPath = eventDir + "/" + applicationId + "_1" + val outputDirectory = outputDir + "/" + applicationId + val numOutputRows = 1000 + val hadoopConf = RapidsToolsConfUtil.newHadoopConf() + // timeout: 20min + val timeout = Option(1200L) + val nThreads = 1 + val order = "desc" + val pluginTypeChecker = try { + new PluginTypeChecker("onprem", None) + } catch { + case ie: IllegalStateException => + 
logError("Error creating the plugin type checker!", ie) + return (1, Seq[QualificationSummaryInfo]()) + } + + val (eventLogFsFiltered, allEventLogs) = EventLogPathProcessor.processAllPaths( + None, None, List(eventPath), hadoopConf) + val filteredLogs = eventLogFsFiltered + + val qual = new Qualification(outputDirectory, numOutputRows, hadoopConf, + timeout, nThreads, order, pluginTypeChecker, false, false, false, + true, false, 100, false, true) + try { + val res = qual.qualifyApps(filteredLogs) + (0, res) + } catch { + case NonFatal(e) => + logError(s"Error when analyze ${applicationId}, path is ${eventPath}.") + e.printStackTrace() + (1, Seq[QualificationSummaryInfo]()) + } + } +} From 7dfe11c8170aebb8fcd337a1ed9a42110f52efa6 Mon Sep 17 00:00:00 2001 From: hezhengjie Date: Sat, 14 Sep 2024 17:16:49 +0800 Subject: [PATCH 03/17] wip for new Qualification --- .../tool/udf/EstimateEventRapidsUDF.scala | 29 ++++++++++++++++--- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala index 85979f226..d4f30ad72 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala @@ -18,8 +18,10 @@ package com.nvidia.spark.rapids.tool.udf import scala.util.control.NonFatal -import com.nvidia.spark.rapids.tool.EventLogPathProcessor +import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformFactory} +import com.nvidia.spark.rapids.tool.profiling.AutoTuner.loadClusterProps import com.nvidia.spark.rapids.tool.qualification.{PluginTypeChecker, Qualification} +import com.nvidia.spark.rapids.tool.qualification.QualificationMain.logError import org.apache.hadoop.hive.ql.exec.{Description, UDF} import org.apache.spark.internal.Logging @@ -49,8 +51,26 @@ object EstimateEventRapidsUDF 
extends Logging { val timeout = Option(1200L) val nThreads = 1 val order = "desc" + // val pluginTypeChecker = try { + // new PluginTypeChecker("onprem", None) + // } catch { + // case ie: IllegalStateException => + // logError("Error creating the plugin type checker!", ie) + // return (1, Seq[QualificationSummaryInfo]()) + // } + + val platform = try { + val clusterPropsOpt = loadClusterProps("") + PlatformFactory.createInstance("", clusterPropsOpt) + } catch { + case NonFatal(e) => + logError("Error creating the platform", e) + return (1, Seq[QualificationSummaryInfo]()) + } val pluginTypeChecker = try { - new PluginTypeChecker("onprem", None) + new PluginTypeChecker( + platform, + None) } catch { case ie: IllegalStateException => logError("Error creating the plugin type checker!", ie) @@ -61,9 +81,10 @@ object EstimateEventRapidsUDF extends Logging { None, None, List(eventPath), hadoopConf) val filteredLogs = eventLogFsFiltered + // uiEnabled = false val qual = new Qualification(outputDirectory, numOutputRows, hadoopConf, - timeout, nThreads, order, pluginTypeChecker, false, false, false, - true, false, 100, false, true) + timeout, nThreads, order, pluginTypeChecker = pluginTypeChecker, reportReadSchema = false, + printStdout = false, enablePB = true, reportSqlLevel = false, maxSQLDescLength = 100, mlOpsEnabled = false) try { val res = qual.qualifyApps(filteredLogs) (0, res) From 1e12968b65d639cb6ba4ef460063a62d023d1758 Mon Sep 17 00:00:00 2001 From: hezhengjie Date: Sat, 14 Sep 2024 19:36:28 +0800 Subject: [PATCH 04/17] new version for rapids --- .../rapids/tool/ToolTextFileWriter.scala | 1 + .../tool/udf/EstimateEventRapids2MapUDF.scala | 2 + .../tool/udf/EstimateEventRapidsUDF.scala | 85 ++++++++++++++----- .../tool/profiling/ApplicationInfo.scala | 2 +- 4 files changed, 69 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala 
b/core/src/main/scala/com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala index 87701419a..30beaa9b2 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala @@ -44,6 +44,7 @@ class ToolTextFileWriter( // this overwrites existing path private var utf8Writer: Option[BufferedWriter] = { try { + // pass hadoopConf to None Some(FSUtils.getUTF8BufferedWriter(textOutputLoc, hadoopConf)) } catch { case NonFatal(e) => diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapUDF.scala index 4986cb5a0..b7fd0cdc3 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapUDF.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapUDF.scala @@ -32,6 +32,7 @@ class EstimateEventRapids2MapUDF extends UDF with Logging { val estimateOutput = EstimateEventRapidsUDF.estimateLog(outputDir, applicationId, eventDir, scoreFile) val firstQualificationSummaryInfo = estimateOutput._2.headOption + val predictScore = estimateOutput._3 firstQualificationSummaryInfo.map { q => val javaMap: util.HashMap[String, String] = new util.HashMap[String, String]() javaMap.put("appName", q.appName) @@ -41,6 +42,7 @@ class EstimateEventRapids2MapUDF extends UDF with Logging { javaMap.put("LongestSQLDuration", q.longestSqlDuration.toString) javaMap.put("UnsupportedExecs", q.unSupportedExecs) javaMap.put("UnsupportedExpressions", q.unSupportedExprs) + javaMap.put("PredictSpeedup", predictScore) javaMap }.getOrElse(new util.HashMap[String, String]()) } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala index d4f30ad72..bbb880489 100644 --- 
a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala @@ -16,17 +16,19 @@ package com.nvidia.spark.rapids.tool.udf +import java.io.{BufferedReader, InputStreamReader} + import scala.util.control.NonFatal import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformFactory} import com.nvidia.spark.rapids.tool.profiling.AutoTuner.loadClusterProps import com.nvidia.spark.rapids.tool.qualification.{PluginTypeChecker, Qualification} -import com.nvidia.spark.rapids.tool.qualification.QualificationMain.logError + import org.apache.hadoop.hive.ql.exec.{Description, UDF} import org.apache.spark.internal.Logging import org.apache.spark.sql.rapids.tool.qualification.QualificationSummaryInfo -import org.apache.spark.sql.rapids.tool.util.RapidsToolsConfUtil +import org.apache.spark.sql.rapids.tool.util.{RapidsToolsConfUtil, UTF8Source} @Description(name = "estimate_event_rapids", value = "_FUNC_(output_path, event_path) ", @@ -34,7 +36,7 @@ import org.apache.spark.sql.rapids.tool.util.RapidsToolsConfUtil class EstimateEventRapidsUDF extends UDF with Logging { def evaluate(outputDir: String, applicationId: String, eventDir: String, scoreFile: String = "onprem"): Int = { - val (exitCode, _) = + val (exitCode, _, _) = EstimateEventRapidsUDF.estimateLog(outputDir, applicationId, eventDir, scoreFile) exitCode } @@ -42,7 +44,8 @@ class EstimateEventRapidsUDF extends UDF with Logging { object EstimateEventRapidsUDF extends Logging { def estimateLog(outputDir: String, applicationId: String, eventDir: String, - scoreFile: String = "onprem"): (Int, Seq[QualificationSummaryInfo]) = { + scoreFile: String = "onprem", + enabledML: Boolean = false): (Int, Seq[QualificationSummaryInfo], String) = { val eventPath = eventDir + "/" + applicationId + "_1" val outputDirectory = outputDir + "/" + applicationId val numOutputRows = 1000 @@ -51,13 +54,6 @@ object 
EstimateEventRapidsUDF extends Logging { val timeout = Option(1200L) val nThreads = 1 val order = "desc" - // val pluginTypeChecker = try { - // new PluginTypeChecker("onprem", None) - // } catch { - // case ie: IllegalStateException => - // logError("Error creating the plugin type checker!", ie) - // return (1, Seq[QualificationSummaryInfo]()) - // } val platform = try { val clusterPropsOpt = loadClusterProps("") @@ -65,7 +61,7 @@ object EstimateEventRapidsUDF extends Logging { } catch { case NonFatal(e) => logError("Error creating the platform", e) - return (1, Seq[QualificationSummaryInfo]()) + return (1, Seq[QualificationSummaryInfo](), "") } val pluginTypeChecker = try { new PluginTypeChecker( @@ -74,25 +70,74 @@ object EstimateEventRapidsUDF extends Logging { } catch { case ie: IllegalStateException => logError("Error creating the plugin type checker!", ie) - return (1, Seq[QualificationSummaryInfo]()) + return (1, Seq[QualificationSummaryInfo](), "") } - val (eventLogFsFiltered, allEventLogs) = EventLogPathProcessor.processAllPaths( + val (eventLogFsFiltered, _) = EventLogPathProcessor.processAllPaths( None, None, List(eventPath), hadoopConf) val filteredLogs = eventLogFsFiltered // uiEnabled = false - val qual = new Qualification(outputDirectory, numOutputRows, hadoopConf, - timeout, nThreads, order, pluginTypeChecker = pluginTypeChecker, reportReadSchema = false, - printStdout = false, enablePB = true, reportSqlLevel = false, maxSQLDescLength = 100, mlOpsEnabled = false) + val qual = new Qualification( + outputPath=outputDirectory, + numRows=numOutputRows, + hadoopConf=hadoopConf, + timeout= timeout, + nThreads=nThreads, + order=order, + pluginTypeChecker=pluginTypeChecker, + reportReadSchema=false, + printStdout=false, + enablePB=true, + reportSqlLevel=false, + maxSQLDescLength=100, + mlOpsEnabled=false, + penalizeTransitions=true, + tunerContext=None, + clusterReport = false + ) try { - val res = qual.qualifyApps(filteredLogs) - (0, res) + val res: 
Seq[QualificationSummaryInfo] = qual.qualifyApps(filteredLogs) + val predictScore = if (enabledML && res.nonEmpty) { + // 传python + // outputDirectory + execMLPredict(outputDirectory) + } else "" + (0, res, predictScore) } catch { case NonFatal(e) => logError(s"Error when analyze ${applicationId}, path is ${eventPath}.") e.printStackTrace() - (1, Seq[QualificationSummaryInfo]()) + (1, Seq[QualificationSummaryInfo](), "") + } + } + + private def execMLPredict(outputDirectory: String): String = { + var proc: Process = null + val mlOutput = "ml_qualx_output" + val command = "spark_rapids prediction --platform onprem " + + s"--qual_output $outputDirectory" + + "--output_folder " + + s"./$mlOutput" + proc = Runtime.getRuntime.exec(command) + val in = new BufferedReader(new InputStreamReader(proc.getInputStream)) + Iterator.continually(in.readLine()).takeWhile(_ != null).foreach(println) + in.close() + proc.waitFor + + // read ml output + var predictScore = "0.0" + val filePath = s"./$mlOutput" + val source = UTF8Source.fromFile(filePath) + try { + for (line <- source.getLines()) { + if (line.startsWith("Overall estimated speedup")) { + predictScore = line.split("\\s+").last + } + } + } finally { + source.close() } + predictScore } } diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala index 468838243..bf268bfab 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala @@ -37,7 +37,7 @@ class SparkPlanInfoWithStage( metrics: Seq[SQLMetricInfo], val stageId: Option[Int], planId: Int = 0) extends SparkPlanInfo(nodeName, simpleString, children, - metadata, metrics) { + metadata, metrics, planId) { import SparkPlanInfoWithStage._ From cee6004ca6f11a051ad60340a2bbe08f20135d44 Mon Sep 17 00:00:00 2001 From: hezhengjie 
Date: Sat, 14 Sep 2024 19:50:06 +0800 Subject: [PATCH 05/17] use local output for ml --- .../scala/com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala index 30beaa9b2..4710402a0 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala @@ -45,7 +45,7 @@ class ToolTextFileWriter( private var utf8Writer: Option[BufferedWriter] = { try { // pass hadoopConf to None - Some(FSUtils.getUTF8BufferedWriter(textOutputLoc, hadoopConf)) + Some(FSUtils.getUTF8BufferedWriter(textOutputLoc, None)) } catch { case NonFatal(e) => logError(s"Failed to open output path [$textOutputLoc] for writing", e) From 096f5f63abbafe0e00e2d3ca627fc91f742790d5 Mon Sep 17 00:00:00 2001 From: hezhengjie Date: Sat, 14 Sep 2024 19:53:52 +0800 Subject: [PATCH 06/17] use local output for ml --- .../spark/rapids/tool/udf/EstimateEventRapids2MapUDF.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapUDF.scala index b7fd0cdc3..88c0d5715 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapUDF.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapUDF.scala @@ -23,14 +23,14 @@ import org.apache.hadoop.hive.ql.exec.{Description, UDF} import org.apache.spark.internal.Logging @Description(name = "estimate_event_rapids_2_map", - value = "_FUNC_(output_path, event_path) ", + value = "_FUNC_(output_path, event_path, score_file, enabledML) ", extended = "Example:\n > SELECT estimate_event_rapids(hdfs://..., app_1, hdfs://...)" + " - 
Returns QualificationSummaryInfo map.") class EstimateEventRapids2MapUDF extends UDF with Logging { def evaluate(outputDir: String, applicationId: String, eventDir: String, - scoreFile: String = "onprem"): java.util.Map[String, String] = { + scoreFile: String = "onprem", enabledML: Boolean = false): java.util.Map[String, String] = { val estimateOutput = - EstimateEventRapidsUDF.estimateLog(outputDir, applicationId, eventDir, scoreFile) + EstimateEventRapidsUDF.estimateLog(outputDir, applicationId, eventDir, scoreFile, enabledML) val firstQualificationSummaryInfo = estimateOutput._2.headOption val predictScore = estimateOutput._3 firstQualificationSummaryInfo.map { q => From 83341b2351bad181975fd65757937c3f6dba860a Mon Sep 17 00:00:00 2001 From: hezhengjie Date: Sat, 14 Sep 2024 20:12:02 +0800 Subject: [PATCH 07/17] try to fix loadClusterProps --- .../nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala index bbb880489..3a6c788d7 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala @@ -56,7 +56,7 @@ object EstimateEventRapidsUDF extends Logging { val order = "desc" val platform = try { - val clusterPropsOpt = loadClusterProps("") + val clusterPropsOpt = loadClusterProps("./worker_info.yaml") PlatformFactory.createInstance("", clusterPropsOpt) } catch { case NonFatal(e) => From a52f14d2a7f0b99769527874942f6bfb2f8f5243 Mon Sep 17 00:00:00 2001 From: hezhengjie Date: Sat, 14 Sep 2024 20:51:12 +0800 Subject: [PATCH 08/17] find prediction file --- .../spark/rapids/tool/udf/EstimateEventRapidsUDF.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git 
a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala index 3a6c788d7..5ef2714bd 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala @@ -16,14 +16,13 @@ package com.nvidia.spark.rapids.tool.udf -import java.io.{BufferedReader, InputStreamReader} +import java.io.{BufferedReader, File, InputStreamReader} import scala.util.control.NonFatal import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, PlatformFactory} import com.nvidia.spark.rapids.tool.profiling.AutoTuner.loadClusterProps import com.nvidia.spark.rapids.tool.qualification.{PluginTypeChecker, Qualification} - import org.apache.hadoop.hive.ql.exec.{Description, UDF} import org.apache.spark.internal.Logging @@ -125,9 +124,14 @@ object EstimateEventRapidsUDF extends Logging { in.close() proc.waitFor + val dir = new File(".") + // 过滤出以"predict"开头的文件 + val predictFiles = dir.listFiles.find(_.getName.startsWith("prediction")) + assert(predictFiles.nonEmpty, "can not find prediction file") + // read ml output var predictScore = "0.0" - val filePath = s"./$mlOutput" + val filePath = s"./$mlOutput/${predictFiles.head}" val source = UTF8Source.fromFile(filePath) try { for (line <- source.getLines()) { From 49fcc56ce66d1993f6a5cc20fa2ace551674dad9 Mon Sep 17 00:00:00 2001 From: hezhengjie Date: Sat, 14 Sep 2024 21:14:09 +0800 Subject: [PATCH 09/17] fix local dir --- .../spark/rapids/tool/udf/EstimateEventRapidsUDF.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala index 5ef2714bd..600bf661a 100644 --- 
a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala @@ -46,7 +46,9 @@ object EstimateEventRapidsUDF extends Logging { scoreFile: String = "onprem", enabledML: Boolean = false): (Int, Seq[QualificationSummaryInfo], String) = { val eventPath = eventDir + "/" + applicationId + "_1" - val outputDirectory = outputDir + "/" + applicationId + val outputDirectory = if (enabledML) { + s"tmp_$applicationId" + } else outputDir + "/" + applicationId val numOutputRows = 1000 val hadoopConf = RapidsToolsConfUtil.newHadoopConf() // timeout: 20min @@ -100,7 +102,7 @@ object EstimateEventRapidsUDF extends Logging { val predictScore = if (enabledML && res.nonEmpty) { // 传python // outputDirectory - execMLPredict(outputDirectory) + execMLPredict(applicationId, outputDirectory) } else "" (0, res, predictScore) } catch { @@ -111,9 +113,9 @@ object EstimateEventRapidsUDF extends Logging { } } - private def execMLPredict(outputDirectory: String): String = { + private def execMLPredict(applicationId: String, outputDirectory: String): String = { var proc: Process = null - val mlOutput = "ml_qualx_output" + val mlOutput = s"${applicationId}_pre/ml_qualx_output" val command = "spark_rapids prediction --platform onprem " + s"--qual_output $outputDirectory" + "--output_folder " + From 3347da9ae2ceff3d4d3471b973b39df93e4a495b Mon Sep 17 00:00:00 2001 From: hezhengjie Date: Wed, 18 Sep 2024 12:32:38 +0800 Subject: [PATCH 10/17] outputDirectory for tools should be hdfs --- .../com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala | 3 +-- .../spark/rapids/tool/udf/EstimateEventRapidsUDF.scala | 7 ++----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala index 4710402a0..87701419a 100644 --- 
a/core/src/main/scala/com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/ToolTextFileWriter.scala @@ -44,8 +44,7 @@ class ToolTextFileWriter( // this overwrites existing path private var utf8Writer: Option[BufferedWriter] = { try { - // pass hadoopConf to None - Some(FSUtils.getUTF8BufferedWriter(textOutputLoc, None)) + Some(FSUtils.getUTF8BufferedWriter(textOutputLoc, hadoopConf)) } catch { case NonFatal(e) => logError(s"Failed to open output path [$textOutputLoc] for writing", e) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala index 600bf661a..6517a7de8 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala @@ -46,9 +46,7 @@ object EstimateEventRapidsUDF extends Logging { scoreFile: String = "onprem", enabledML: Boolean = false): (Int, Seq[QualificationSummaryInfo], String) = { val eventPath = eventDir + "/" + applicationId + "_1" - val outputDirectory = if (enabledML) { - s"tmp_$applicationId" - } else outputDir + "/" + applicationId + val outputDirectory = outputDir + "/" + applicationId val numOutputRows = 1000 val hadoopConf = RapidsToolsConfUtil.newHadoopConf() // timeout: 20min @@ -107,8 +105,7 @@ object EstimateEventRapidsUDF extends Logging { (0, res, predictScore) } catch { case NonFatal(e) => - logError(s"Error when analyze ${applicationId}, path is ${eventPath}.") - e.printStackTrace() + logError(s"Error when analyze ${applicationId}, path is ${eventPath}.", e) (1, Seq[QualificationSummaryInfo](), "") } } From 5cc3ce231993fe2cf67fb11b4f800198c531c35b Mon Sep 17 00:00:00 2001 From: hezhengjie Date: Wed, 18 Sep 2024 12:34:50 +0800 Subject: [PATCH 11/17] outputDirectory for tools should be hdfs --- 
.../spark/rapids/tool/udf/EstimateEventRapidsUDF.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala index 6517a7de8..d2da698c7 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala @@ -112,7 +112,8 @@ object EstimateEventRapidsUDF extends Logging { private def execMLPredict(applicationId: String, outputDirectory: String): String = { var proc: Process = null - val mlOutput = s"${applicationId}_pre/ml_qualx_output" + val mlOutputPrefix = s"${applicationId}_pre" + val mlOutput = s"$mlOutputPrefix/ml_qualx_output" val command = "spark_rapids prediction --platform onprem " + s"--qual_output $outputDirectory" + "--output_folder " + @@ -123,7 +124,7 @@ object EstimateEventRapidsUDF extends Logging { in.close() proc.waitFor - val dir = new File(".") + val dir = new File(s"./$mlOutputPrefix") // 过滤出以"predict"开头的文件 val predictFiles = dir.listFiles.find(_.getName.startsWith("prediction")) assert(predictFiles.nonEmpty, "can not find prediction file") From 22152dc266db83c06dcea0643b4c20c8c8c974d1 Mon Sep 17 00:00:00 2001 From: hezhengjie Date: Wed, 18 Sep 2024 14:09:33 +0800 Subject: [PATCH 12/17] outputDirectory for tools should be hdfs --- .../nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala index d2da698c7..12af9a7cd 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala @@ -124,7 +124,7 @@ object 
EstimateEventRapidsUDF extends Logging { in.close() proc.waitFor - val dir = new File(s"./$mlOutputPrefix") + val dir = new File(s"./$mlOutput") // 过滤出以"predict"开头的文件 val predictFiles = dir.listFiles.find(_.getName.startsWith("prediction")) assert(predictFiles.nonEmpty, "can not find prediction file") From 9ab61afe5da8d1c1dae2511d8f00c990a23217e9 Mon Sep 17 00:00:00 2001 From: hezhengjie Date: Wed, 18 Sep 2024 15:14:58 +0800 Subject: [PATCH 13/17] refactor output dir --- .../nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala index 12af9a7cd..85dffe481 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapidsUDF.scala @@ -113,7 +113,7 @@ object EstimateEventRapidsUDF extends Logging { private def execMLPredict(applicationId: String, outputDirectory: String): String = { var proc: Process = null val mlOutputPrefix = s"${applicationId}_pre" - val mlOutput = s"$mlOutputPrefix/ml_qualx_output" + val mlOutput = s"ml_qualx_output/$mlOutputPrefix" val command = "spark_rapids prediction --platform onprem " + s"--qual_output $outputDirectory" + "--output_folder " + @@ -137,6 +137,7 @@ object EstimateEventRapidsUDF extends Logging { for (line <- source.getLines()) { if (line.startsWith("Overall estimated speedup")) { predictScore = line.split("\\s+").last + logInfo(s"predict score for $applicationId is $predictScore") } } } finally { From b1131daecff2d8c389a12981957ac0314c5803c8 Mon Sep 17 00:00:00 2001 From: hezhengjie Date: Fri, 20 Sep 2024 19:05:11 +0800 Subject: [PATCH 14/17] Support ml --- .../udf/EstimateEventRapids2MapMLUDF.scala | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644
core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala new file mode 100644 index 000000000..450aa2abc --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids.tool.udf + +import java.io.{BufferedReader, File, InputStreamReader} + +import com.nvidia.spark.rapids.tool.udf.EstimateEventRapidsUDF.logInfo +import java.util +import org.apache.hadoop.hive.ql.exec.UDF + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.rapids.tool.util.UTF8Source + +class EstimateEventRapids2MapMLUDF extends UDF with Logging { + + private def runProc(command: String): Unit = { + val proc = Runtime.getRuntime.exec(command) + val in = new BufferedReader(new InputStreamReader(proc.getInputStream)) + Iterator.continually(in.readLine()).takeWhile(_ != null).foreach(println) + in.close() + proc.waitFor + } + + private def readMlOutput(applicationId: String, mlOutputFolder: String): String = { + val dir = new File(s"$mlOutputFolder") + val predictFiles = dir.listFiles.find(_.getName.startsWith("prediction")) + var predictScore = "0.0" + if (predictFiles.nonEmpty) { + val filePath = s"$mlOutputFolder/${predictFiles.head}" + val source = UTF8Source.fromFile(filePath) + try { + for (line <- source.getLines()) { + if (line.startsWith("Overall estimated speedup")) { + predictScore = line.split("\\s+").last + logInfo(s"predict score for $applicationId is $predictScore") + } + } + } finally { + source.close() + } + } else { + logError(s"Can not find prediction file in $mlOutputFolder") + } + predictScore + } + + def evaluate(applicationId: String, eventDir: String): java.util.Map[String, String] = { + val eventPath = eventDir + "/" + applicationId + "_1" + val outputPrefix = "/opt/tiger/yodel/container/rapids_tools" + val outputFolder = s"file://$outputPrefix/$applicationId" + val command1 = s"spark_rapids qualification --platform onprem --eventlogs $eventPath " + + s"--output_folder $outputFolder" + logInfo(s"app $applicationId: $command1") + runProc(command1) + + val mlOutputFolder = s"$outputPrefix/${applicationId}_ml" + val command2 = "spark_rapids prediction --platform onprem " + + 
s"--qual_output ${outputFolder.substring(7)}" + + s"--output_folder $mlOutputFolder" + runProc(command2) + + val score = readMlOutput(applicationId, mlOutputFolder) + val javaMap: util.HashMap[String, String] = new util.HashMap[String, String]() + logInfo(s"applicationId ml predict score is $score") + javaMap.put("rapids_ml_score", score) + javaMap + } +} From 6349290f820cbbfb47a9827a6ff3a78a5155ba47 Mon Sep 17 00:00:00 2001 From: hezhengjie Date: Fri, 20 Sep 2024 19:06:22 +0800 Subject: [PATCH 15/17] Support ml --- .../spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala index 450aa2abc..261a56119 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala @@ -17,9 +17,8 @@ package com.nvidia.spark.rapids.tool.udf import java.io.{BufferedReader, File, InputStreamReader} - -import com.nvidia.spark.rapids.tool.udf.EstimateEventRapidsUDF.logInfo import java.util + import org.apache.hadoop.hive.ql.exec.UDF import org.apache.spark.internal.Logging From 83d3273fd11895abdd177313683e261d5eea67fd Mon Sep 17 00:00:00 2001 From: hezhengjie Date: Fri, 20 Sep 2024 19:08:38 +0800 Subject: [PATCH 16/17] Support ml --- .../rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala index 261a56119..1b17b279b 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala +++ 
b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala @@ -19,11 +19,15 @@ package com.nvidia.spark.rapids.tool.udf import java.io.{BufferedReader, File, InputStreamReader} import java.util -import org.apache.hadoop.hive.ql.exec.UDF +import org.apache.hadoop.hive.ql.exec.{Description, UDF} import org.apache.spark.internal.Logging import org.apache.spark.sql.rapids.tool.util.UTF8Source +@Description(name = "estimate_event_rapids_ml", + value = "_FUNC_(applicationId, event_path) ", + extended = "Example:\n > SELECT estimate_event_rapids(app_1, hdfs://...)" + + " - Returns rapids-tool ml score.") class EstimateEventRapids2MapMLUDF extends UDF with Logging { private def runProc(command: String): Unit = { From 24126311e62b30137005fcf14d804a0b8cad0a10 Mon Sep 17 00:00:00 2001 From: hezhengjie Date: Tue, 24 Sep 2024 21:44:08 +0800 Subject: [PATCH 17/17] print log --- .../rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala index 1b17b279b..cfb8298d5 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/udf/EstimateEventRapids2MapMLUDF.scala @@ -30,7 +30,8 @@ import org.apache.spark.sql.rapids.tool.util.UTF8Source " - Returns rapids-tool ml score.") class EstimateEventRapids2MapMLUDF extends UDF with Logging { - private def runProc(command: String): Unit = { + private def runProc(applicationId: String, command: String): Unit = { + logInfo(s"Running rapids-tools for $applicationId: $command") val proc = Runtime.getRuntime.exec(command) val in = new BufferedReader(new InputStreamReader(proc.getInputStream)) Iterator.continually(in.readLine()).takeWhile(_ != null).foreach(println) @@ -67,14 
+68,13 @@ class EstimateEventRapids2MapMLUDF extends UDF with Logging { val outputFolder = s"file://$outputPrefix/$applicationId" val command1 = s"spark_rapids qualification --platform onprem --eventlogs $eventPath " + s"--output_folder $outputFolder" - logInfo(s"app $applicationId: $command1") - runProc(command1) + runProc(applicationId, command1) val mlOutputFolder = s"$outputPrefix/${applicationId}_ml" val command2 = "spark_rapids prediction --platform onprem " + s"--qual_output ${outputFolder.substring(7)}" + s"--output_folder $mlOutputFolder" - runProc(command2) + runProc(applicationId, command2) val score = readMlOutput(applicationId, mlOutputFolder) val javaMap: util.HashMap[String, String] = new util.HashMap[String, String]()