diff --git a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala index 20ec8cbb9..072a0ff8e 100644 --- a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala +++ b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala @@ -95,13 +95,20 @@ class SparkRestClient(sparkConf: SparkConf) { getLogData(attemptTarget) } } else Future.successful(None) + val futureFailedTasks = if (fetchFailedTasks) { + Future { + getStagesWithFailedTasks(attemptTarget) + } + } else { + Future.successful(Seq.empty) + } SparkRestDerivedData( applicationInfo, Await.result(futureJobDatas, DEFAULT_TIMEOUT), Await.result(futureStageDatas, DEFAULT_TIMEOUT), Await.result(futureExecutorSummaries, Duration(5, SECONDS)), - Seq.empty, + Await.result(futureFailedTasks, DEFAULT_TIMEOUT), Await.result(futureLogData, Duration(5, SECONDS)) ) @@ -211,7 +218,7 @@ class SparkRestClient(sparkConf: SparkConf) { } private def getStageDatas(attemptTarget: WebTarget): Seq[StageDataImpl] = { - val target = attemptTarget.path("stages") + val target = attemptTarget.path("stages/withSummaries") try { get(target, SparkRestObjectMapper.readValue[Seq[StageDataImpl]]) } catch { diff --git a/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala b/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala index 808a3f4b0..dfd20e43e 100644 --- a/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala +++ b/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala @@ -143,11 +143,17 @@ trait StageData{ def status: StageStatus def stageId: Int def attemptId: Int + def numTasks: Int def numActiveTasks: Int def numCompleteTasks: Int def numFailedTasks: Int def executorRunTime: Long + def executorCpuTime: Long + def submissionTime: Option[Date] + def firstTaskLaunchedTime: Option[Date] + def completionTime: Option[Date] + def failureReason: Option[String] def inputBytes: Long def inputRecords: Long @@ -166,7 +172,14 @@ trait StageData{ def accumulatorUpdates: Seq[AccumulableInfo] def tasks: Option[Map[Long, TaskDataImpl]] - def executorSummary: Option[Map[String, ExecutorStageSummary]]} + def executorSummary: Option[Map[String, ExecutorStageSummary]] + + def peakJvmUsedMemory: Option[Long] + def peakExecutionMemory: Option[Long] + def peakStorageMemory: Option[Long] + def peakUnifiedMemory: Option[Long] + def taskSummary : Option[TaskMetricDistributions] + def executorMetricsSummary : Option[ExecutorMetricDistributions]} trait TaskData{ def taskId: Long @@ -219,10 +232,15 @@ trait TaskMetricDistributions{ def quantiles: IndexedSeq[Double] def executorDeserializeTime: IndexedSeq[Double] + def executorDeserializeCpuTime: IndexedSeq[Double] def executorRunTime: IndexedSeq[Double] + def executorCpuTime: IndexedSeq[Double] def resultSize: IndexedSeq[Double] def jvmGcTime: IndexedSeq[Double] def resultSerializationTime: IndexedSeq[Double] + def gettingResultTime: IndexedSeq[Double] + def schedulerDelay: IndexedSeq[Double] + def peakExecutionMemory: IndexedSeq[Double] def memoryBytesSpilled: IndexedSeq[Double] def diskBytesSpilled: IndexedSeq[Double] @@ -246,6 +264,7 @@ trait ShuffleReadMetricDistributions{ def localBlocksFetched: IndexedSeq[Double] def fetchWaitTime: IndexedSeq[Double] def remoteBytesRead: IndexedSeq[Double] + def remoteBytesReadToDisk: IndexedSeq[Double] def totalBlocksFetched: IndexedSeq[Double]} trait ShuffleWriteMetricDistributions{ @@ -253,6 +272,25 @@ trait 
ShuffleWriteMetricDistributions{ def writeRecords: IndexedSeq[Double] def writeTime: IndexedSeq[Double]} +trait ExecutorMetricDistributions{ + def quantiles: IndexedSeq[Double] + def numTasks: IndexedSeq[Double] + def inputBytes : IndexedSeq[Double] + def inputRecords : IndexedSeq[Double] + def outputBytes : IndexedSeq[Double] + def outputRecords : IndexedSeq[Double] + def shuffleRead : IndexedSeq[Double] + def shuffleReadRecords : IndexedSeq[Double] + def shuffleWrite : IndexedSeq[Double] + def shuffleWriteRecords : IndexedSeq[Double] + def memoryBytesSpilled : IndexedSeq[Double] + def diskBytesSpilled : IndexedSeq[Double] + def peakJvmUsedMemory : IndexedSeq[Double] + def peakExecutionMemory : IndexedSeq[Double] + def peakStorageMemory : IndexedSeq[Double] + def peakUnifiedMemory : IndexedSeq[Double]} + + trait AccumulableInfo{ def id: Long def name: String @@ -353,11 +391,17 @@ class StageDataImpl( var status: StageStatus, var stageId: Int, var attemptId: Int, + var numTasks: Int, var numActiveTasks: Int, var numCompleteTasks: Int, var numFailedTasks: Int, var executorRunTime: Long, + var executorCpuTime: Long, + var submissionTime: Option[Date], + var firstTaskLaunchedTime: Option[Date], + var completionTime: Option[Date], + var failureReason: Option[String], var inputBytes: Long, var inputRecords: Long, @@ -376,7 +420,13 @@ class StageDataImpl( var accumulatorUpdates: Seq[AccumulableInfoImpl], var tasks: Option[Map[Long, TaskDataImpl]], - var executorSummary: Option[Map[String, ExecutorStageSummaryImpl]]) extends StageData + var executorSummary: Option[Map[String, ExecutorStageSummaryImpl]], + var peakJvmUsedMemory: Option[Long], + var peakExecutionMemory: Option[Long], + var peakStorageMemory: Option[Long], + var peakUnifiedMemory: Option[Long], + var taskSummary : Option[TaskMetricDistributionsImpl], + var executorMetricsSummary : Option[ExecutorMetricDistributionsImpl]) extends StageData class TaskDataImpl( var taskId: Long, @@ -427,12 +477,17 @@ class ShuffleWriteMetricsImpl( class TaskMetricDistributionsImpl( var quantiles: IndexedSeq[Double], var executorDeserializeTime: IndexedSeq[Double], + var executorDeserializeCpuTime: IndexedSeq[Double], var executorRunTime: IndexedSeq[Double], + var executorCpuTime: IndexedSeq[Double], var resultSize: IndexedSeq[Double], var jvmGcTime: IndexedSeq[Double], var resultSerializationTime: IndexedSeq[Double], + var gettingResultTime: IndexedSeq[Double], + var schedulerDelay: IndexedSeq[Double], + var peakExecutionMemory: IndexedSeq[Double], var memoryBytesSpilled: IndexedSeq[Double], var diskBytesSpilled: IndexedSeq[Double], @@ -456,6 +510,7 @@ class ShuffleReadMetricDistributionsImpl( var localBlocksFetched: IndexedSeq[Double], var fetchWaitTime: IndexedSeq[Double], var remoteBytesRead: IndexedSeq[Double], + var remoteBytesReadToDisk: IndexedSeq[Double], var totalBlocksFetched: IndexedSeq[Double]) extends ShuffleReadMetricDistributions class ShuffleWriteMetricDistributionsImpl( @@ -468,3 +523,21 @@ class AccumulableInfoImpl( var name: String, var update: Option[String], var value: String) extends AccumulableInfo + +class ExecutorMetricDistributionsImpl( + var quantiles: IndexedSeq[Double], + var numTasks: IndexedSeq[Double], + var inputBytes : IndexedSeq[Double], + var inputRecords : IndexedSeq[Double], + var outputBytes : IndexedSeq[Double], + var outputRecords : IndexedSeq[Double], + var shuffleRead : IndexedSeq[Double], + var shuffleReadRecords : IndexedSeq[Double], + var shuffleWrite : IndexedSeq[Double], + var shuffleWriteRecords : 
IndexedSeq[Double], + var memoryBytesSpilled : IndexedSeq[Double], + var diskBytesSpilled : IndexedSeq[Double], + var peakJvmUsedMemory : IndexedSeq[Double], + var peakExecutionMemory : IndexedSeq[Double], + var peakStorageMemory : IndexedSeq[Double], + var peakUnifiedMemory : IndexedSeq[Double]) extends ExecutorMetricDistributions diff --git a/app/com/linkedin/drelephant/spark/heuristics/ConfigurationUtils.scala b/app/com/linkedin/drelephant/spark/heuristics/ConfigurationUtils.scala new file mode 100644 index 000000000..75ef7493f --- /dev/null +++ b/app/com/linkedin/drelephant/spark/heuristics/ConfigurationUtils.scala @@ -0,0 +1,128 @@ +package com.linkedin.drelephant.spark.heuristics + +import com.linkedin.drelephant.analysis.SeverityThresholds + +object ConfigurationUtils { + val JVM_USED_MEMORY = "jvmUsedMemory" + + // Spark configuration parameters + val SPARK_EXECUTOR_MEMORY = "spark.executor.memory" + val SPARK_DRIVER_MEMORY = "spark.driver.memory" + val SPARK_EXECUTOR_MEMORY_OVERHEAD = "spark.yarn.executor.memoryOverhead" + val SPARK_DRIVER_MEMORY_OVERHEAD = "spark.yarn.driver.memoryOverhead" + val SPARK_EXECUTOR_CORES = "spark.executor.cores" + val SPARK_DRIVER_CORES = "spark.driver.cores" + val SPARK_EXECUTOR_INSTANCES = "spark.executor.instances" + val SPARK_SQL_SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions" + val SPARK_MEMORY_FRACTION = "spark.memory.fraction" + + // Spark default configuration values + val SPARK_EXECUTOR_MEMORY_DEFAULT = "1g" + val SPARK_DRIVER_MEMORY_DEFAULT = "1g" + val SPARK_EXECUTOR_CORES_DEFAULT = 1 + val SPARK_DRIVER_CORES_DEFAULT = 1 + val SPARK_SQL_SHUFFLE_PARTITIONS_DEFAULT = 200 + val SPARK_MEMORY_FRACTION_DEFAULT = 0.6 + + // if the overhead memory is not explicitly specified by the user, the default amount is + // max(0.1 * spark.executor.memory, 384MB) + val SPARK_MEMORY_OVERHEAD_PCT_DEFAULT = 0.1 + + // the minimum amount of overhead memory + val SPARK_MEMORY_OVERHEAD_MIN_DEFAULT = 384L << 20 // 384MB + + // the amount of Spark reserved memory (300MB) + val SPARK_RESERVED_MEMORY = 300L << 20 + + // number of milliseconds in a minute + val MILLIS_PER_MIN = 1000D * 60.0D + + // the index for the median value for executor and task metrics distributions + val DISTRIBUTION_MEDIAN_IDX = 2 + + // the index for the max value for executor and task metrics distributions + val DISTRIBUTION_MAX_IDX = 4 + + // keys for finding Dr. 
Elephant configuration parameter values + val SPARK_STAGE_EXECUTION_MEMORY_SPILL_THRESHOLD_KEY = "spark_stage_execution_memory_spill_threshold" + val SPARK_STAGE_TASK_SKEW_THRESHOLD_KEY = "spark_stage_task_skew_threshold" + val SPARK_STAGE_TASK_DURATION_THRESHOLD_KEY = "spark_stage_task_duration_threshold" + val SPARK_STAGE_MAX_DATA_PROCESSED_THRESHOLD_KEY = "spark_stage_max_data_processed_threshold" + val TASK_FAILURE_RATE_SEVERITY_THRESHOLDS_KEY = "stage_task_failure_rate_severity_threshold" + val MAX_DATA_PROCESSED_THRESHOLD_KEY = "execution_memory_spill_max_data_threshold" + val LONG_TASK_TO_STAGE_DURATION_RATIO_KEY = "task_skew_task_to_stage_duration_ratio" + val TASK_SKEW_TASK_DURATION_MIN_THRESHOLD_KEY = "task_skew_task_duration_threshold" + val MAX_RECOMMENDED_PARTITIONS_KEY = "max_recommended_partitions" + + // keys for finding specific recommendations + val EXECUTION_MEMORY_SPILL_LARGE_DATA_RECOMMENDATION_KEY = "execution_memory_spill_large_data_recommendation" + val TASK_SKEW_INPUT_DATA_RECOMMENDATION_KEY = "task_skew_input_data_recommendation" + val TASK_SKEW_GENERIC_RECOMMENDATION_KEY = "task_skew_generic_recommendation" + val LONG_TASKS_LARGE_DATA_RECOMMENDATION_KEY = "long_tasks_large_data_recommendation" + val SLOW_TASKS_RECOMMENDATION_KEY = "slow_tasks_recommendation" + val LONG_TASKS_FEW_PARTITIONS_RECOMMENDATION_KEY = "long_tasks_few_partitions" + val LONG_TASKS_FEW_INPUT_PARTITIONS_RECOMMENDATION_KEY = "long_tasks_few_input_partitions" + + // default recommendations + val DEFAULT_EXECUTION_MEMORY_SPILL_LARGE_DATA_RECOMMENDATION = "a large amount of data is being processed. " + + "Examine the application to see if this can be reduced" + val DEFAULT_TASK_SKEW_INPUT_DATA_RECOMMENDATION = "please try to modify the application to make the input partitions more even" + val DEFAULT_TASK_SKEW_GENERIC_RECOMMENDATION = "please try to modify the application to make the partitions more even" + val DEFAULT_LONG_TASKS_LARGE_DATA_RECOMMENDATION = "please try to reduce the amount of data being processed" + val DEFAULT_SLOW_TASKS_RECOMMENDATION = "please optimize the code to improve performance" + val DEFAULT_LONG_TASKS_FEW_PARTITIONS_RECOMMENDATION = "please increase the number of partitions" + val DEFAULT_LONG_TASKS_FEW_INPUT_PARTITIONS_RECOMMENDATION = "please increase the number of partitions for reading data" + + + // Severity thresholds for task duration in minutes, when checking to see if the median task + // run time is too long for a stage. + val DEFAULT_TASK_DURATION_THRESHOLDS = + SeverityThresholds(low = 2.5D * MILLIS_PER_MIN, moderate = 5.0D * MILLIS_PER_MIN, + severe = 10.0D * MILLIS_PER_MIN, critical = 15.0D * MILLIS_PER_MIN, ascending = true) + + // Severity thresholds for checking task skew, ratio of maximum to median task run times. + val DEFAULT_TASK_SKEW_THRESHOLDS = + SeverityThresholds(low = 2, moderate = 4, severe = 8, critical = 16, ascending = true) + + // Severity thresholds for checking execution memory spill, ratio of execution spill compared + // to the maximum amount of data (input, output, shuffle read, or shuffle write) processed. + val DEFAULT_EXECUTION_MEMORY_SPILL_THRESHOLDS = + SeverityThresholds(low = 0.01D, moderate = 0.1D, severe = 0.25D, critical = 0.5D, ascending = true) + + // The ascending severity thresholds for the ratio of JVM GC time and task run time, + // checking if too much time is being spent in GC.
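+ // For example, with the ascending defaults below, a median GC-time to task-run-time ratio of
+ // 0.12 falls at or above the severe threshold (0.1) but below critical (0.15), so it is rated SEVERE.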
+ val DEFAULT_GC_SEVERITY_A_THRESHOLDS = + SeverityThresholds(low = 0.08D, moderate = 0.09D, severe = 0.1D, critical = 0.15D, ascending = true) + + /** The default severity thresholds for the rate of a stage's tasks failing. */ + val DEFAULT_TASK_FAILURE_RATE_SEVERITY_THRESHOLDS = + SeverityThresholds(low = 0.05D, moderate = 0.1D, severe = 0.15D, critical = 0.2D, ascending = true) + + // The default threshold (3TB) for checking for maximum amount of data processed, for which to + // alert for execution memory spill. Tasks processing more data would be expected to have some + // amount of spill, due to the large amount of data processed. + // Estimating the size based on some reasonable values for configuration parameters (and how + // much data could be kept in unified memory given these values): + // spark.executor.memory / spark.executor.cores * spark.memory.fraction * + // (1 - spark.memory.storageFraction) * spark.sql.shuffle.partitions + // = 5GB / 2 * 0.6 * (1 - 0.5) * 4000 + val DEFAULT_MAX_DATA_PROCESSED_THRESHOLD = "3TB" + + // The default threshold for the ratio of the time for the longest running task for a stage to the + // stage duration. With Spark, some amount of task skew may be OK, since executors can process + // multiple tasks, so one executor could process multiple shorter tasks, while another executor + // processes a longer task. However, if the length of the long task is a large fraction of the + // stage duration, then it is likely contributing to the overall stage duration. + val DEFAULT_LONG_TASK_TO_STAGE_DURATION_RATIO = "0.75" + + // Some task skew is also tolerable if the tasks are short (2.5 minutes or less). + val DEFAULT_TASK_SKEW_TASK_DURATION_MIN_THRESHOLD = "150000" + + // The target task duration (2.5 minutes). This is the same as the idle executor timeout. + val DEFAULT_TARGET_TASK_DURATION = "150000" + + // The default maximum number of partitions that would be recommended. More partitions means + // less data per partition, so shorter tasks and less memory needed per task. However more + // partitions also increases the amount of overhead for shuffle. + val DEFAULT_MAX_RECOMMENDED_PARTITIONS = "4000" +} \ No newline at end of file diff --git a/app/com/linkedin/drelephant/spark/heuristics/StageAnalysisResult.scala b/app/com/linkedin/drelephant/spark/heuristics/StageAnalysisResult.scala new file mode 100644 index 000000000..3a9f3f402 --- /dev/null +++ b/app/com/linkedin/drelephant/spark/heuristics/StageAnalysisResult.scala @@ -0,0 +1,78 @@ +package com.linkedin.drelephant.spark.heuristics + +import com.linkedin.drelephant.analysis.Severity + +/** Stage analysis result. */ +private[heuristics] sealed trait StageAnalysisResult { + + // the severity for the stage and heuristic evaluated + val severity: Severity + + // the heuristics score for the stage and heuristic evaluated + val score: Int + + // information, details and advice from the analysis + val details: Seq[String] +} + +/** Simple stage analysis result, with the severity, score, and details. */ +private[heuristics] case class SimpleStageAnalysisResult( + severity: Severity, + score: Int, + details: Seq[String]) extends StageAnalysisResult + +/** + * Stage analysis result for examining the stage for task skew. + * + * @param severity task skew severity. + * @param score heuristics score for task skew. + * @param details information and recommendations from analysis for task skew.
+ * @param rawSeverity severity based only on task skew, and not considering other thresholds + * (task duration or ratio of task duration to stage duration). + */ +private[heuristics] case class TaskSkewResult( + severity: Severity, + score: Int, + details: Seq[String], + rawSeverity: Severity) extends StageAnalysisResult + +/** + * Stage analysis result for examining the stage for execution memory spill. + * + * @param severity execution memory spill severity. + * @param score heuristics score for execution memory spill. + * @param details information and recommendations from analysis for execution memory spill. + * @param rawSeverity severity based only on execution memory spill, and not considering other + * thresholds (max amount of data processed for the stage). + * @param memoryBytesSpilled the total amount of execution memory bytes spilled for the stage. + * @param maxTaskBytesSpilled the maximum number of bytes spilled by a task. + */ +private[heuristics] case class ExecutionMemorySpillResult( + severity: Severity, + score: Int, + details: Seq[String], + rawSeverity: Severity, + memoryBytesSpilled: Long, + maxTaskBytesSpilled: Long) extends StageAnalysisResult + +/** + * Stage analysis result for examining the stage for task failures. + * + * @param severity task failure severity. + * @param score heuristic score for task failures. + * @param details information and recommendations from analysis for task failures. + * @param oomSeverity severity for task failures due to OutOfMemory errors. + * @param containerKilledSeverity severity for task failures due to container killed by YARN. + * @param numFailures number of task failures for the stage. + * @param numOOM number of tasks which failed due to OutOfMemory errors. + * @param numContainerKilled number of tasks which failed due to container killed by YARN. + */ +private[heuristics] case class TaskFailureResult( + severity: Severity, + score: Int, + details: Seq[String], + oomSeverity: Severity, + containerKilledSeverity: Severity, + numFailures: Int, + numOOM: Int, + numContainerKilled: Int) extends StageAnalysisResult diff --git a/app/com/linkedin/drelephant/spark/heuristics/StagesAnalyzer.scala b/app/com/linkedin/drelephant/spark/heuristics/StagesAnalyzer.scala new file mode 100644 index 000000000..e4d4a4a7d --- /dev/null +++ b/app/com/linkedin/drelephant/spark/heuristics/StagesAnalyzer.scala @@ -0,0 +1,532 @@ +package com.linkedin.drelephant.spark.heuristics + +import scala.collection.mutable.ArrayBuffer +import com.linkedin.drelephant.analysis.{Severity, SeverityThresholds} +import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData +import com.linkedin.drelephant.spark.data.SparkApplicationData +import com.linkedin.drelephant.spark.fetchers.statusapiv1.{StageData, StageStatus, TaskDataImpl} +import com.linkedin.drelephant.util.{MemoryFormatUtils, Utils} + +/** + * Analysis results for a stage. + * + * @param stageId the stage ID. + * @param executionMemorySpillResult stage analysis result for examining the stage for execution + * memory spill. + * @param longTaskResult stage analysis result for examining the stage for long tasks. + * @param taskSkewResult stage analysis result for examining the stage for task skew. + * @param taskFailureResult stage analysis result for examining the stage for task failures. + * @param stageFailureResult stage analysis result for examining the stage for stage failure. + * @param stageGCResult stage analysis result for examining the stage for GC.
+ * @param numTasks number of tasks for the stage. + * @param medianRunTime median task run time. + * @param maxRunTime maximum task run time. + * @param stageDuration wall clock time for the stage in ms. + * @param inputBytes number of input bytes read. + * @param outputBytes number of output bytes written. + * @param shuffleReadBytes number of shuffle read bytes + * @param shuffleWriteBytes number of shuffle write bytes + */ +private[heuristics] case class StageAnalysis( + stageId: Int, + executionMemorySpillResult: ExecutionMemorySpillResult, + longTaskResult: SimpleStageAnalysisResult, + taskSkewResult: TaskSkewResult, + taskFailureResult: TaskFailureResult, + stageFailureResult: SimpleStageAnalysisResult, + stageGCResult: SimpleStageAnalysisResult, + numTasks: Int, + medianRunTime: Option[Double], + maxRunTime: Option[Double], + stageDuration: Option[Long], + inputBytes: Long, + outputBytes: Long, + shuffleReadBytes: Long, + shuffleWriteBytes: Long) { + + def getStageAnalysisResults: Seq[StageAnalysisResult] = + Seq(executionMemorySpillResult, longTaskResult, taskSkewResult, taskFailureResult, + stageFailureResult, stageGCResult) +} + +/** + * Analyzes the stage level metrics for the given application. + * + * @param heuristicConfigurationData heuristic configuration data + * @param data Spark application data + */ +private[heuristics] class StagesAnalyzer( + private val heuristicConfigurationData: HeuristicConfigurationData, + private val data: SparkApplicationData) { + + import ConfigurationUtils._ + + // severity thresholds for execution memory spill + private val executionMemorySpillThresholds: SeverityThresholds = + SeverityThresholds.parse(heuristicConfigurationData.getParamMap + .get(SPARK_STAGE_EXECUTION_MEMORY_SPILL_THRESHOLD_KEY), ascending = true) + .getOrElse(DEFAULT_EXECUTION_MEMORY_SPILL_THRESHOLDS) + + // severity thresholds for task skew + private val taskSkewThresholds: SeverityThresholds = + SeverityThresholds.parse(heuristicConfigurationData.getParamMap + .get(SPARK_STAGE_TASK_SKEW_THRESHOLD_KEY), ascending = true) + .getOrElse(DEFAULT_TASK_SKEW_THRESHOLDS) + + // severity thresholds for task duration (long running tasks) + private val taskDurationThresholds: SeverityThresholds = + SeverityThresholds.parse(heuristicConfigurationData.getParamMap + .get(SPARK_STAGE_TASK_DURATION_THRESHOLD_KEY), ascending = true) + .getOrElse(DEFAULT_TASK_DURATION_THRESHOLDS) + + // severity thresholds for task failures + private val taskFailureRateSeverityThresholds: SeverityThresholds = + SeverityThresholds.parse(heuristicConfigurationData.getParamMap + .get(TASK_FAILURE_RATE_SEVERITY_THRESHOLDS_KEY), ascending = true) + .getOrElse(DEFAULT_TASK_FAILURE_RATE_SEVERITY_THRESHOLDS) + + // execution memory spill: threshold for processed data, above which some spill is expected + private val maxDataProcessedThreshold = MemoryFormatUtils.stringToBytes( + heuristicConfigurationData.getParamMap + .getOrDefault(MAX_DATA_PROCESSED_THRESHOLD_KEY, DEFAULT_MAX_DATA_PROCESSED_THRESHOLD)) + + // threshold for ratio of max task duration to stage duration, for flagging task skew + private val longTaskToStageDurationRatio = heuristicConfigurationData.getParamMap + .getOrDefault(LONG_TASK_TO_STAGE_DURATION_RATIO_KEY, DEFAULT_LONG_TASK_TO_STAGE_DURATION_RATIO).toDouble + + // min threshold for the maximum task duration, for flagging task skew + private val taskDurationMinThreshold = heuristicConfigurationData.getParamMap + .getOrDefault(TASK_SKEW_TASK_DURATION_MIN_THRESHOLD_KEY, DEFAULT_TASK_SKEW_TASK_DURATION_MIN_THRESHOLD).toLong
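+ // For example, a deployment could override this minimum through the heuristic's
+ // task_skew_task_duration_threshold parameter (see ConfigurationUtils); a value of "120000"
+ // would lower it to 2 minutes.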
+ + // the maximum number of recommended partitions + private val maxRecommendedPartitions = heuristicConfigurationData.getParamMap + .getOrDefault(MAX_RECOMMENDED_PARTITIONS_KEY, DEFAULT_MAX_RECOMMENDED_PARTITIONS).toInt + + // recommendation to give if there is execution memory spill due to too much data being processed. + // Some amount of spill is expected in this case, but alert the users so that they are aware that spill + // is happening. + private val executionMemorySpillRecommendation = heuristicConfigurationData.getParamMap + .getOrDefault(EXECUTION_MEMORY_SPILL_LARGE_DATA_RECOMMENDATION_KEY, + DEFAULT_EXECUTION_MEMORY_SPILL_LARGE_DATA_RECOMMENDATION) + + // recommendation to give if task skew is detected, and input data is read for the stage. + private val taskSkewInputDataRecommendation = heuristicConfigurationData.getParamMap + .getOrDefault(TASK_SKEW_INPUT_DATA_RECOMMENDATION_KEY, DEFAULT_TASK_SKEW_INPUT_DATA_RECOMMENDATION) + + // recommendation to give if task skew is detected, and there is no input data. + private val taskSkewGenericRecommendation = heuristicConfigurationData.getParamMap + .getOrDefault(TASK_SKEW_GENERIC_RECOMMENDATION_KEY, DEFAULT_TASK_SKEW_GENERIC_RECOMMENDATION) + + // recommendation to give if there are long running tasks, and there is a lot of data being + // processed, and many partitions already. In this case, long running tasks may be expected, but + // alert the user, in case it is possible to filter out some data. + private val longTasksLargeDataRecommendation = heuristicConfigurationData.getParamMap + .getOrDefault(LONG_TASKS_LARGE_DATA_RECOMMENDATION_KEY, DEFAULT_LONG_TASKS_LARGE_DATA_RECOMMENDATION) + + // recommendation to give if there are long running tasks, a reasonable number of partitions, + // and not too much data processed. In this case, the tasks are slow. + private val slowTasksRecommendation = heuristicConfigurationData.getParamMap + .getOrDefault(SLOW_TASKS_RECOMMENDATION_KEY, DEFAULT_SLOW_TASKS_RECOMMENDATION) + + // recommendation to give if there are long running tasks and relatively few partitions. + private val longTasksFewPartitionsRecommendation = heuristicConfigurationData.getParamMap + .getOrDefault(LONG_TASKS_FEW_PARTITIONS_RECOMMENDATION_KEY, DEFAULT_LONG_TASKS_FEW_PARTITIONS_RECOMMENDATION) + + // recommendation to give if there are long running tasks, input data is being read (and so + // controlling the number of tasks), and relatively few partitions. + private val longTasksFewInputPartitionsRecommendation = heuristicConfigurationData.getParamMap + .getOrDefault(LONG_TASKS_FEW_INPUT_PARTITIONS_RECOMMENDATION_KEY, + DEFAULT_LONG_TASKS_FEW_INPUT_PARTITIONS_RECOMMENDATION) + + + /** @return list of analysis results for all the stages.
*/ + def getStageAnalysis(): Seq[StageAnalysis] = { + val appConfigurationProperties: Map[String, String] = data.appConfigurationProperties + val curNumPartitions = appConfigurationProperties.get(SPARK_SQL_SHUFFLE_PARTITIONS) + .map(_.toInt).getOrElse(SPARK_SQL_SHUFFLE_PARTITIONS_DEFAULT) + + data.stageDatas.map { stageData => + val medianTime = stageData.taskSummary.collect { + case distribution => distribution.executorRunTime(DISTRIBUTION_MEDIAN_IDX) + } + val maxTime = stageData.taskSummary.collect { + case distribution => distribution.executorRunTime(DISTRIBUTION_MAX_IDX) + } + val stageDuration = (stageData.submissionTime, stageData.completionTime) match { + case (Some(submissionTime), Some(completionTime)) => + Some(completionTime.getTime() - submissionTime.getTime()) + case _ => None + } + val stageId = stageData.stageId + + val executionMemorySpillResult = checkForExecutionMemorySpill(stageId, stageData) + val longTaskResult = checkForLongTasks(stageId, stageData, medianTime, curNumPartitions) + val taskSkewResult = checkForTaskSkew(stageId, stageData, medianTime, maxTime, stageDuration, + executionMemorySpillResult.severity) + val stageFailureResult = checkForStageFailure(stageId, stageData) + val taskFailureResult = checkForTaskFailure(stageId, stageData) + val gcResult = checkForGC(stageId, stageData) + + new StageAnalysis(stageData.stageId, executionMemorySpillResult, longTaskResult, + taskSkewResult, taskFailureResult, stageFailureResult, gcResult, stageData.numTasks, + medianTime, maxTime, stageDuration, stageData.inputBytes, stageData.outputBytes, + stageData.shuffleReadBytes, stageData.shuffleWriteBytes) + } + } + + /** + * Check stage for execution memory spill. + * + * @param stageId stage ID. + * @param stageData stage data. + * @return results of execution memory spill analysis for the stage. + */ + private def checkForExecutionMemorySpill( + stageId: Int, + stageData: StageData): ExecutionMemorySpillResult = { + val maxData = Seq(stageData.inputBytes, stageData.shuffleReadBytes, + stageData.shuffleWriteBytes, stageData.outputBytes).max + val rawSpillSeverity = executionMemorySpillThresholds.severityOf( + stageData.memoryBytesSpilled / maxData.toDouble) + val details = new ArrayBuffer[String] + val executionSpillSeverity = if (maxData < maxDataProcessedThreshold) { + rawSpillSeverity + } else { + // don't flag execution memory spill if there is a lot of data being processed, + // since some spill may be unavoidable in this case. + if (hasSignificantSeverity(rawSpillSeverity)) { + details += s"Stage $stageId: ${executionMemorySpillRecommendation}." + } + Severity.NONE + } + if (hasSignificantSeverity(rawSpillSeverity)) { + val memoryBytesSpilled = MemoryFormatUtils.bytesToString(stageData.memoryBytesSpilled) + details += s"Stage $stageId has $memoryBytesSpilled execution memory spill." + if (maxData > maxDataProcessedThreshold) { + // if a lot of data is being processed, the severity is suppressed, but give information + // about the spill to the user, so that they know that spill is happening, and can check + // if the application can be modified to process less data. + details += s"Stage $stageId has ${stageData.numTasks} tasks, " + + s"${MemoryFormatUtils.bytesToString(stageData.inputBytes)} input read, " + + s"${MemoryFormatUtils.bytesToString(stageData.shuffleReadBytes)} shuffle read, " + + s"${MemoryFormatUtils.bytesToString(stageData.shuffleWriteBytes)} shuffle write, " + + s"${MemoryFormatUtils.bytesToString(stageData.outputBytes)} output."
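+ // Also report the median per-task values from the task metric distributions, so the user
+ // can see how the processed data and the spill are spread across the stage's tasks.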
+ stageData.taskSummary.foreach { summary => + val memorySpill = summary.memoryBytesSpilled(DISTRIBUTION_MEDIAN_IDX).toLong + val inputBytes = summary.inputMetrics.map(_.bytesRead(DISTRIBUTION_MEDIAN_IDX)) + .getOrElse(0.0).toLong + val outputBytes = summary.outputMetrics.map(_.bytesWritten(DISTRIBUTION_MEDIAN_IDX)) + .getOrElse(0.0).toLong + val shuffleReadBytes = summary.shuffleReadMetrics.map(_.readBytes(DISTRIBUTION_MEDIAN_IDX)) + .getOrElse(0.0).toLong + val shuffleWriteBytes = summary.shuffleWriteMetrics.map(_.writeBytes(DISTRIBUTION_MEDIAN_IDX)) + .getOrElse(0.0).toLong + details += s"Stage $stageId has median task values: " + + s"${MemoryFormatUtils.bytesToString(memorySpill)} memory spill, " + + s"${MemoryFormatUtils.bytesToString(inputBytes)} input, " + + s"${MemoryFormatUtils.bytesToString(shuffleReadBytes)} shuffle read, " + + s"${MemoryFormatUtils.bytesToString(shuffleWriteBytes)} shuffle write, " + + s"${MemoryFormatUtils.bytesToString(outputBytes)} output." + } + } + } + + val maxTaskSpill = stageData.taskSummary.collect { + case distribution => distribution.memoryBytesSpilled(DISTRIBUTION_MAX_IDX) + }.map(_.toLong).getOrElse(0L) + val score = Utils.getHeuristicScore(executionSpillSeverity, stageData.numTasks) + + ExecutionMemorySpillResult(executionSpillSeverity, score, details, rawSpillSeverity, + stageData.memoryBytesSpilled, maxTaskSpill) + } + + /** + * Check stage for task skew. + * + * @param stageId stage ID. + * @param stageData stage data + * @param medianTime median task run time (ms). + * @param maxTime maximum task run time (ms). + * @param stageDuration stage duration (ms). + * @param executionSpillSeverity execution spill severity + * @return results of task skew analysis for the stage. + */ + private def checkForTaskSkew( + stageId: Int, + stageData: StageData, + medianTime: Option[Double], + maxTime: Option[Double], + stageDuration: Option[Long], + executionSpillSeverity: Severity): TaskSkewResult = { + val rawSkewSeverity = (medianTime, maxTime) match { + case (Some(median), Some(max)) => + taskSkewThresholds.severityOf(max / median) + case _ => Severity.NONE + } + val maximum = maxTime.getOrElse(0.0D) + val taskSkewSeverity = + if (maximum > taskDurationMinThreshold && + maximum > longTaskToStageDurationRatio * stageDuration.getOrElse(Long.MaxValue)) { + rawSkewSeverity + } else { + Severity.NONE + } + val details = new ArrayBuffer[String] + + if (hasSignificantSeverity(taskSkewSeverity) || hasSignificantSeverity(executionSpillSeverity)) { + // add more information about what might be causing skew if skew is being flagged + // (reported severity is significant), or there is execution memory spill, since skew + // can also cause execution memory spill. + val medianStr = Utils.getDuration(medianTime.map(_.toLong).getOrElse(0L)) + val maximumStr = Utils.getDuration(maxTime.map(_.toLong).getOrElse(0L)) + var inputSkewSeverity = Severity.NONE + if (hasSignificantSeverity(taskSkewSeverity)) { + details += + s"Stage $stageId has skew in task run time (median is $medianStr, max is $maximumStr)." 
+ } + stageData.taskSummary.foreach { summary => + checkSkewedData(stageId, summary.memoryBytesSpilled(DISTRIBUTION_MEDIAN_IDX), + summary.memoryBytesSpilled(DISTRIBUTION_MAX_IDX), "memory bytes spilled", details) + summary.inputMetrics.foreach { input => + inputSkewSeverity = checkSkewedData(stageId, input.bytesRead(DISTRIBUTION_MEDIAN_IDX), + input.bytesRead(DISTRIBUTION_MAX_IDX), "task input bytes", details) + if (hasSignificantSeverity(inputSkewSeverity)) { + // The stage is reading input data, try to adjust the amount of data to even the partitions + details += s"Stage $stageId: ${taskSkewInputDataRecommendation}." + } + } + summary.outputMetrics.foreach { output => + checkSkewedData(stageId, output.bytesWritten(DISTRIBUTION_MEDIAN_IDX), + output.bytesWritten(DISTRIBUTION_MAX_IDX), "task output bytes", details) + } + summary.shuffleReadMetrics.foreach { shuffle => + checkSkewedData(stageId, shuffle.readBytes(DISTRIBUTION_MEDIAN_IDX), + shuffle.readBytes(DISTRIBUTION_MAX_IDX), "task shuffle read bytes", details) + } + summary.shuffleWriteMetrics.foreach { shuffle => + checkSkewedData(stageId, shuffle.writeBytes(DISTRIBUTION_MEDIAN_IDX), + shuffle.writeBytes(DISTRIBUTION_MAX_IDX), "task shuffle write bytes", details) + } + } + if (hasSignificantSeverity(rawSkewSeverity) && !hasSignificantSeverity(inputSkewSeverity)) { + details += s"Stage $stageId: ${taskSkewGenericRecommendation}." + } + } + val score = Utils.getHeuristicScore(taskSkewSeverity, stageData.numTasks) + + TaskSkewResult(taskSkewSeverity, score, details, rawSkewSeverity) + } + + /** + * Check for skewed data. + * + * @param stageId stage ID + * @param median median data size for tasks. + * @param maximum maximum data size for tasks. + * @param description type of data. + * @param details information and recommendations -- any new recommendations + * from analyzing the stage for data skew will be appended. + */ + private def checkSkewedData( + stageId: Int, + median: Double, + maximum: Double, + description: String, + details: ArrayBuffer[String]): Severity = { + val severity = taskSkewThresholds.severityOf(maximum / median) + if (hasSignificantSeverity(severity)) { + details += s"Stage $stageId has skew in $description (median is " + + s"${MemoryFormatUtils.bytesToString(median.toLong)}, " + + s"max is ${MemoryFormatUtils.bytesToString(maximum.toLong)})." + } + severity + } + + /** + * Check the stage for long running tasks. + * + * @param stageId stage ID. + * @param stageData stage data. + * @param medianTime median task run time. + * @param curNumPartitions number of partitions for the Spark application + * (spark.sql.shuffle.partitions). 
+ * @return results of long running task analysis for the stage + */ + private def checkForLongTasks( + stageId: Int, + stageData: StageData, + medianTime: Option[Double], + curNumPartitions: Int): SimpleStageAnalysisResult = { + val longTaskSeverity = stageData.taskSummary.map { distributions => + taskDurationThresholds.severityOf(distributions.executorRunTime(DISTRIBUTION_MEDIAN_IDX)) + }.getOrElse(Severity.NONE) + val details = new ArrayBuffer[String] + if (hasSignificantSeverity(longTaskSeverity)) { + val runTime = Utils.getDuration(medianTime.map(_.toLong).getOrElse(0L)) + val maxData = Seq(stageData.inputBytes, stageData.shuffleReadBytes, stageData.shuffleWriteBytes, + stageData.outputBytes).max + val inputBytes = MemoryFormatUtils.bytesToString(stageData.inputBytes) + val outputBytes = MemoryFormatUtils.bytesToString(stageData.outputBytes) + val shuffleReadBytes = MemoryFormatUtils.bytesToString(stageData.shuffleReadBytes) + val shuffleWriteBytes = MemoryFormatUtils.bytesToString(stageData.shuffleWriteBytes) + details += s"Stage $stageId has a long median task run time of $runTime." + details += s"Stage $stageId has ${stageData.numTasks} tasks, $inputBytes input," + + s" $shuffleReadBytes shuffle read, $shuffleWriteBytes shuffle write, and $outputBytes output." + if (stageData.numTasks >= maxRecommendedPartitions) { + if (maxData >= maxDataProcessedThreshold) { + details += s"Stage $stageId: ${longTasksLargeDataRecommendation}." + } else { + details += s"Stage $stageId: ${slowTasksRecommendation}." + } + } + else { + if (stageData.inputBytes > 0) { + // The stage is reading input data, try to increase the number of readers + details += s"Stage $stageId: ${longTasksFewInputPartitionsRecommendation}." + } else if (stageData.numTasks != curNumPartitions) { + details += s"Stage $stageId: ${longTasksFewPartitionsRecommendation}." + } + } + } + val score = Utils.getHeuristicScore(longTaskSeverity, stageData.numTasks) + + SimpleStageAnalysisResult(longTaskSeverity, score, details) + } + + /** + * Check for stage failure. + * + * @param stageId stage ID. + * @param stageData stage data. + * @return results of stage failure analysis for the stage. + */ + private def checkForStageFailure(stageId: Int, stageData: StageData): SimpleStageAnalysisResult = { + val severity = if (stageData.status == StageStatus.FAILED) { + Severity.CRITICAL + } else { + Severity.NONE + } + val score = Utils.getHeuristicScore(severity, stageData.numTasks) + val details = stageData.failureReason.map(reason => s"Stage $stageId failed: $reason") + SimpleStageAnalysisResult(severity, score, details.toSeq) + } + + /** + * Check for failed tasks, including failures caused by OutOfMemory errors, and containers + * killed by YARN for exceeding memory limits. + * + * @param stageId stage ID. + * @param stageData stage data. + * @return result of failed tasks analysis for the stage. + */ + private def checkForTaskFailure( + stageId: Int, + stageData: StageData): TaskFailureResult = { + val failedTasksStageMap = data.stagesWithFailedTasks.flatMap { stageData => + stageData.tasks.map(tasks => (stageData.stageId, tasks.values)) + }.toMap + + val failedTasks = failedTasksStageMap.get(stageId) + + val details = new ArrayBuffer[String]() + + val taskFailureSeverity = taskFailureRateSeverityThresholds.severityOf( + stageData.numFailedTasks.toDouble / stageData.numTasks) + if (hasSignificantSeverity(taskFailureSeverity)) { + details += s"Stage $stageId has ${stageData.numFailedTasks} failed tasks." + } +
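+ // Score the failures by overall rate, then break them down by cause below
+ // (OutOfMemory errors, and containers killed by YARN).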
+ val score = Utils.getHeuristicScore(taskFailureSeverity, stageData.numFailedTasks) + + val (numTasksWithOOM, oomSeverity) = + checkForSpecificTaskError(stageId, stageData, failedTasks, + StagesWithFailedTasksHeuristic.OOM_ERROR, "of OutOfMemory exception.", + details) + + val (numTasksWithContainerKilled, containerKilledSeverity) = + checkForSpecificTaskError(stageId, stageData, failedTasks, + StagesWithFailedTasksHeuristic.OVERHEAD_MEMORY_ERROR, + "the container was killed by YARN for exceeding memory limits.", details) + + TaskFailureResult(taskFailureSeverity, score, details, oomSeverity, containerKilledSeverity, + stageData.numFailedTasks, numTasksWithOOM, numTasksWithContainerKilled) + } + + /** + * Check the stage for a high ratio of time spent in GC compared to task run time. + * + * @param stageId stage ID. + * @param stageData stage data. + * @return result of GC analysis for the stage. + */ + private def checkForGC(stageId: Int, stageData: StageData): SimpleStageAnalysisResult = { + var gcTime = 0.0D + var taskTime = 0.0D + val severity = stageData.taskSummary.map { task => + gcTime = task.jvmGcTime(DISTRIBUTION_MEDIAN_IDX) + taskTime = task.executorRunTime(DISTRIBUTION_MEDIAN_IDX) + DEFAULT_GC_SEVERITY_A_THRESHOLDS.severityOf(gcTime / taskTime) + }.getOrElse(Severity.NONE) + + val score = Utils.getHeuristicScore(severity, stageData.numTasks) + + val details = if (hasSignificantSeverity(severity)) { + Seq(s"Stage ${stageId}: tasks are spending significant time in GC (median task GC time is " + + s"${Utils.getDuration(gcTime.toLong)}, median task runtime is " + + s"${Utils.getDuration(taskTime.toLong)}).") + } else { + Seq.empty + } + + new SimpleStageAnalysisResult(severity, score, details) + } + + /** + * Check the stage for tasks that failed for a specified error. + * + * @param stageId stage ID. + * @param stageData stage data. + * @param failedTasks list of failed tasks. + * @param taskError the error to check for. + * @param errorMessage the message/explanation to print if the specified error is found. + * @param details information and recommendations -- any new recommendations + * from analyzing the stage for errors causing tasks to fail will be appended. + * @return the number of tasks that failed with the specified error, and the severity. + */ + private def checkForSpecificTaskError( + stageId: Int, + stageData: StageData, + failedTasks: Option[Iterable[TaskDataImpl]], + taskError: String, + errorMessage: String, + details: ArrayBuffer[String]): (Int, Severity) = { + val numTasksWithError = getNumTasksWithError(failedTasks, taskError) + if (numTasksWithError > 0) { + details += s"Stage $stageId has $numTasksWithError tasks that failed because " + + errorMessage + } + val severity = taskFailureRateSeverityThresholds.severityOf(numTasksWithError.toDouble / stageData.numTasks) + (numTasksWithError, severity) + } + + /** + * Get the number of tasks that failed with the specified error, using a simple string search. + * + * @param tasks list of failed tasks. + * @param error error to look for. + * @return number of failed tasks with the specified error. + */ + private def getNumTasksWithError(tasks: Option[Iterable[TaskDataImpl]], error: String): Int = { + tasks.map { failedTasks => + failedTasks.filter { task => + val hasError = task.errorMessage.map(_.contains(error)).getOrElse(false) + hasError + }.size + }.getOrElse(0) + } + + /** Given the severity, return true if the severity is not NONE or LOW. */ + private def hasSignificantSeverity(severity: Severity): Boolean = { + severity != Severity.NONE && severity != Severity.LOW + } +}
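As orientation for reviewers, here is a minimal sketch of how a heuristic in this package could drive the analyzer above. It is an illustration only; the heuristicConfigurationData and data values are assumed to come from the usual Heuristic wiring, and are not defined in this patch.

val analyzer = new StagesAnalyzer(heuristicConfigurationData, data)
// Flatten the per-stage results (spill, long task, skew, failure, GC) into one list.
val results = analyzer.getStageAnalysis().flatMap(_.getStageAnalysisResults)
// Roll them up into a single severity, a total score, and the combined details.
val severity = Severity.max(results.map(_.severity): _*)
val score = results.map(_.score).sum
val details = results.flatMap(_.details)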
diff --git a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala index 4c0d6f727..17b206c64 100644 --- a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala +++ b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala @@ -60,6 +60,18 @@ object LegacyDataConverters { override def name: String = "" override def executorSummary: Option[collection.Map[String, ExecutorStageSummary]] = None override def status = StageStatus.COMPLETE + override def completionTime: Option[java.util.Date] = None + override def executorCpuTime: Long = 0L + override def executorMetricsSummary: Option[ExecutorMetricDistributions] = None + override def failureReason: Option[String] = None + override def firstTaskLaunchedTime: Option[java.util.Date] = None + override def numTasks: Int = 0 + override def peakExecutionMemory: Option[Long] = None + override def peakJvmUsedMemory: Option[Long] = None + override def peakStorageMemory: Option[Long] = None + override def peakUnifiedMemory: Option[Long] = None + override def submissionTime: Option[java.util.Date] = None + override def taskSummary: Option[TaskMetricDistributions] = None }) } @@ -144,11 +156,18 @@ object LegacyDataConverters { extractStageStatus(stageAttemptId), stageAttemptId.stageId, stageAttemptId.attemptId, + numTasks = 0, stageInfo.numActiveTasks, stageInfo.numCompleteTasks, stageInfo.numFailedTasks, stageInfo.executorRunTime, + executorCpuTime = 0, + submissionTime = None, + firstTaskLaunchedTime = None, + completionTime = None, + failureReason = None, + stageInfo.inputBytes, inputRecords = 0, stageInfo.outputBytes, outputRecords = 0, @@ -158,12 +178,20 @@ shuffleWriteRecords = 0, stageInfo.memoryBytesSpilled, stageInfo.diskBytesSpilled, + stageInfo.name, stageInfo.description, schedulingPool = "", + accumulatorUpdates = Seq.empty, tasks = None, - executorSummary = None + executorSummary = None, + peakJvmUsedMemory = None, + peakExecutionMemory = None, + peakStorageMemory = None, + peakUnifiedMemory = None, + taskSummary = None, + executorMetricsSummary = None ) } diff --git a/app/com/linkedin/drelephant/spark/legacydata/SparkStageData.java b/app/com/linkedin/drelephant/spark/legacydata/SparkStageData.java new file mode 100644 index 000000000..406586781 --- /dev/null +++ b/app/com/linkedin/drelephant/spark/legacydata/SparkStageData.java @@ -0,0 +1,30 @@ +/* + * Copyright 2016 LinkedIn Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.linkedin.drelephant.spark.legacydata; + + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + + +/** + * This class contains Spark stage information. 
*/ +public class SparkStageData { + + +} diff --git a/app/com/linkedin/drelephant/util/Utils.java b/app/com/linkedin/drelephant/util/Utils.java index 6314483db..9867128ac 100644 --- a/app/com/linkedin/drelephant/util/Utils.java +++ b/app/com/linkedin/drelephant/util/Utils.java @@ -268,6 +268,37 @@ public static String getDurationBreakdown(long millis) { return String.format("%d:%02d:%02d", hours, minutes, seconds); } + /** + * Convert a millisecond duration to a string format, using + * milliseconds, seconds, minutes, hours, or days, for the largest + * unit whose value is at least 1.0. + * + * @param millis duration in milliseconds + * @return the formatted duration string. + */ + public static String getDuration(long millis) { + double seconds = millis / 1000.0; + if (seconds < 1) { + return millis + " ms"; + } else { + double minutes = seconds / 60.0; + if (minutes < 1) { + return String.format("%.2f sec", seconds); + } else { + double hours = minutes / 60.0; + if (hours < 1) { + return String.format("%.2f min", minutes); + } else { + double days = hours / 24.0; + if (days < 1) { + return String.format("%.2f hr", hours); + } + return String.format("%.2f days", days); + } + } + } + } + /** * Convert a value in MBSeconds to GBHours * @param MBSeconds The value to convert diff --git a/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala b/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala index c20223fb8..00581a68b 100644 --- a/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala +++ b/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala @@ -300,7 +300,7 @@ object SparkRestClientTest { @Path("applications/{appId}/{attemptId}/jobs") def getJobs(): JobsResource = new JobsResource() - @Path("applications/{appId}/{attemptId}/stages") + @Path("applications/{appId}/{attemptId}/stages/withSummaries") def getStages(): StagesResource = new StagesResource() @Path("applications/{appId}/{attemptId}/allexecutors") @@ -382,7 +382,7 @@ object SparkRestClientTest { @Path("applications/{appId}/jobs") def getJobs(): JobsResource = new JobsResource() - @Path("applications/{appId}/stages") + @Path("applications/{appId}/stages/withSummaries") def getStages(): StagesResource = new StagesResource() @Path("applications/{appId}/allexecutors") diff --git a/test/com/linkedin/drelephant/spark/heuristics/SparkTestUtilities.scala b/test/com/linkedin/drelephant/spark/heuristics/SparkTestUtilities.scala new file mode 100644 index 000000000..cf2ad79b7 --- /dev/null +++ b/test/com/linkedin/drelephant/spark/heuristics/SparkTestUtilities.scala @@ -0,0 +1,542 @@ +package com.linkedin.drelephant.spark.heuristics + +import java.util.Date + +import com.linkedin.drelephant.analysis.{ApplicationType, Severity} +import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData +import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData} +import com.linkedin.drelephant.spark.fetchers.statusapiv1._ +import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate + +import scala.collection.JavaConverters + +private[heuristics] object SparkTestUtilities { + import JavaConverters._ + import java.text.SimpleDateFormat + + val OOM_ERROR = "java.lang.OutOfMemoryError" + val OVERHEAD_MEMORY_ERROR = "killed by YARN for exceeding memory limits" + + private val sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss") + + /** Create a sample heuristics configuration data. 
*/ + def createHeuristicConfigurationData( + params: Map[String, String] = Map.empty): HeuristicConfigurationData = + new HeuristicConfigurationData("heuristic", "class", "view", new ApplicationType("type"), params.asJava) + + /** + * Builder for creating a StageAnalysis. + * + * @param stageId stage ID. + * @param numTasks total number of tasks for the stage. + */ + case class StageAnalysisBuilder(stageId: Int, numTasks: Int) { + var rawSpillSeverity = Severity.NONE + var executionSpillSeverity = Severity.NONE + var longTaskSeverity = Severity.NONE + var rawSkewSeverity = Severity.NONE + var taskSkewSeverity = Severity.NONE + var failedWithOOMSeverity = Severity.NONE + var failedWithContainerKilledSeverity = Severity.NONE + var gcSeverity = Severity.NONE + var taskFailureSeverity = Severity.NONE + var stageFailureSeverity = Severity.NONE + var spillScore = 0 + var longTaskScore = 0 + var taskSkewScore = 0 + var taskFailureScore = 0 + var stageFailureScore = 0 + var gcScore = 0 + var medianRunTime: Option[Double] = None + var maxRunTime: Option[Double] = None + var memoryBytesSpilled = 0L + var maxTaskBytesSpilled = 0L + var inputBytes: Long = 0L + var outputBytes: Long = 0L + var shuffleReadBytes: Long = 0L + var shuffleWriteBytes: Long = 0L + var numFailedTasks = 0 + var numTasksWithOOM = 0 + var numTasksWithContainerKilled = 0 + var stageDuration = Some((5 * 60 * 1000).toLong) + var spillDetails: Seq[String] = Seq() + var longTaskDetails: Seq[String] = Seq() + var taskSkewDetails: Seq[String] = Seq() + var taskFailureDetails: Seq[String] = Seq() + var stageFailureDetails: Seq[String] = Seq() + var gcDetails: Seq[String] = Seq() + + /** + * Configure execution memory spill related parameters. + * + * @param raw the raw execution memory spill severity. + * @param severity the reported execution memory spill severity. + * @param score the heuristic score for execution memory spill. + * @param maxTaskSpillMb maximum amount (MB) of execution memory spill for a task. + * @param bytesSpilledMb total amount (MB) of execution memory spill. + * @param details information and recommendations for execution memory spill. + * @return this StageAnalysisBuilder. + */ + def spill( + raw: Severity, + severity: Severity, + score: Int, + maxTaskSpillMb: Long, + bytesSpilledMb: Long, + details: Seq[String]): StageAnalysisBuilder = { + rawSpillSeverity = raw + executionSpillSeverity = severity + spillScore = score + maxTaskBytesSpilled = maxTaskSpillMb << 20 + memoryBytesSpilled = bytesSpilledMb << 20 + spillDetails = details + this + } + + /** Set the amount of input data in MB. */ + def input(inputMb: Long): StageAnalysisBuilder = { + inputBytes = inputMb << 20 + this + } + + /** Set the amount of output data in MB. */ + def output(outputMb: Long): StageAnalysisBuilder = { + outputBytes = outputMb << 20 + this + } + + /** Set the amount of shuffle read data in MB. */ + def shuffleRead(shuffleReadMb: Long): StageAnalysisBuilder = { + shuffleReadBytes = shuffleReadMb << 20 + this + } + + /** Set the amount of shuffle write data in MB. */ + def shuffleWrite(shuffleWriteMb: Long): StageAnalysisBuilder = { + shuffleWriteBytes = shuffleWriteMb << 20 + this + } + + /** Set the stage duration. 
*/ + def duration(sec: Long): StageAnalysisBuilder = { + stageDuration = Some(sec * 1000) + this + } + + /** Set the median and max task runtimes in seconds. */ + def taskRuntime(median: Double, maximum: Double): StageAnalysisBuilder = { + medianRunTime = Some(median * 1000) + maxRunTime = Some(maximum * 1000) + this + } + + /** Set the long task analysis information. */ + def longTask(severity: Severity, score: Int, details: Seq[String]): StageAnalysisBuilder = { + longTaskSeverity = severity + longTaskScore = score + longTaskDetails = details + this + } + + /** Set the raw and reported task skew severity and details. */ + def skew( + raw: Severity, + severity: Severity, + score: Int, + details: Seq[String]): StageAnalysisBuilder = { + rawSkewSeverity = raw + taskSkewSeverity = severity + taskSkewScore = score + taskSkewDetails = details + this + } + + /** + * Configure stage failure information. + * + * @param severity severity of stage failure. + * @param score score for stage failure analysis. + * @param details information and recommendations. + * @return this StageAnalysisBuilder. + */ + def stageFailure(severity: Severity, + score: Int, + details: Seq[String]): StageAnalysisBuilder = { + stageFailureSeverity = severity + stageFailureScore = score + stageFailureDetails = details + this + } + + /** + * Configure task failure information. + * + * @param taskSeverity severity of all task failures. + * @param oomSeverity severity of task failures due to OutOfMemory errors. + * @param containerKilledSeverity severity of failures due to containers killed by YARN. + * @param score score from task failure analysis. + * @param numFailures total number of task failures. + * @param numOOM total number of tasks failed with OutOfMemory errors. + * @param numContainerKilled total number of tasks failed due to container killed by YARN. + * @param details information and recommendations for task failures. + * @return this StageAnalysisBuilder. + */ + def taskFailures( + taskSeverity: Severity, + oomSeverity: Severity, + containerKilledSeverity: Severity, + score: Int, + numFailures: Int, + numOOM: Int, + numContainerKilled: Int, + details: Seq[String]): StageAnalysisBuilder = { + taskFailureSeverity = taskSeverity + failedWithOOMSeverity = oomSeverity + failedWithContainerKilledSeverity = containerKilledSeverity + taskFailureScore = score + numFailedTasks = numFailures + numTasksWithOOM = numOOM + numTasksWithContainerKilled = numContainerKilled + taskFailureDetails = details + this + } + + /** Create the StageAnalysis. */ + def create(): StageAnalysis = { + StageAnalysis( + stageId, + ExecutionMemorySpillResult(executionSpillSeverity, spillScore, spillDetails, + rawSpillSeverity, memoryBytesSpilled, maxTaskBytesSpilled), + SimpleStageAnalysisResult(longTaskSeverity, longTaskScore, longTaskDetails), + TaskSkewResult(taskSkewSeverity, taskSkewScore, taskSkewDetails, rawSkewSeverity), + TaskFailureResult(taskFailureSeverity, taskFailureScore, taskFailureDetails, + failedWithOOMSeverity, failedWithContainerKilledSeverity, numFailedTasks, + numTasksWithOOM, numTasksWithContainerKilled), + SimpleStageAnalysisResult(stageFailureSeverity, stageFailureScore, stageFailureDetails), + SimpleStageAnalysisResult(gcSeverity, gcScore, gcDetails), + numTasks, medianRunTime, maxRunTime, stageDuration, inputBytes, outputBytes, + shuffleReadBytes, shuffleWriteBytes) + } + } + + /** + * Builder for creating StageData. + * + * @param stageId stage ID. + * @param numTasks total number of tasks for the stage. 
+ */ + case class StageBuilder(stageId: Int, numTasks: Int) { + val stage = new StageDataImpl( + StageStatus.COMPLETE, + stageId, + attemptId = 0, + numTasks = numTasks, + numActiveTasks = numTasks, + numCompleteTasks = numTasks, + numFailedTasks = 0, + executorRunTime = 0, + executorCpuTime = 0, + submissionTime = Some(sdf.parse("09/09/2018 12:00:00")), + firstTaskLaunchedTime = None, + completionTime = Some(sdf.parse("09/09/2018 12:05:00")), + failureReason = None, + + inputBytes = 0, + inputRecords = 0, + outputBytes = 0, + outputRecords = 0, + shuffleReadBytes = 0, + shuffleReadRecords = 0, + shuffleWriteBytes = 0, + shuffleWriteRecords = 0, + memoryBytesSpilled = 0, + diskBytesSpilled = 0, + name = "foo", + details = "stage details", + schedulingPool = "", + accumulatorUpdates = Seq.empty, + tasks = None, + executorSummary = None, + peakJvmUsedMemory = None, + peakExecutionMemory = None, + peakStorageMemory = None, + peakUnifiedMemory = None, + taskSummary = None, + executorMetricsSummary = None + ) + + /** Create the specified number of tasks for the stage. */ + private def createTasks(numTasks: Int): Map[Long, TaskDataImpl] = { + (1 until (numTasks + 1)).map { i => + (i.toLong, new TaskDataImpl( + taskId = i.toLong, + index = 1, + attempt = 0, + launchTime = new Date(), + executorId = "1", + host = "SomeHost", + taskLocality = "ANY", + speculative = false, + accumulatorUpdates = Seq(), + errorMessage = None, + taskMetrics = None)) + }.toMap + } + + /** Set the stage status. */ + def status(stageStatus: StageStatus, reason: Option[String]): StageBuilder = { + stage.status = stageStatus + stage.failureReason = reason + this + } + + /** + * Set the run times. + * + * @param medianS median task run time in seconds. + * @param maxS maximum task runtime in seconds. + * @param totalS total runtime for all tasks. + * @return this StageBuilder. + */ + def taskRuntime(medianS: Int, maxS: Int, totalS: Int): StageBuilder = { + val taskMetricDistributions = getTaskMetricDistributions() + val medianMs = (medianS * 1000).toDouble + val maxMs = (maxS * 1000).toDouble + taskMetricDistributions.executorRunTime = + IndexedSeq(medianMs/2, medianMs, medianMs, medianMs, maxMs) + stage.executorRunTime = totalS * 1000 + this + } + + /** + * Set the input information. + * + * @param medianMB median amount of input read for a task in MB. + * @param maxMB maximum amount of input read for a task in MB. + * @param totalMB total amount of input read for the stage in MB. + * @return this StageBuilder. + */ + def input(medianMB: Long, maxMB: Long, totalMB: Long): StageBuilder = { + val taskMetricDistributions = getTaskMetricDistributions() + val medianBytes = (medianMB << 20).toDouble + val maxBytes = (maxMB << 20).toDouble + taskMetricDistributions.inputMetrics = + Some(new InputMetricDistributionsImpl( + IndexedSeq(medianBytes/2, medianBytes, medianBytes, medianBytes, maxBytes), + IndexedSeq(1000.0, 2000.0, 2000.0, 2000.0, 3000.0))) + stage.inputBytes = totalMB << 20 + this + } + + /** + * Set the output information. + * + * @param medianMB median amount of output written for a task in MB. + * @param maxMB maximum amount of output written for a task in MB. + * @param totalMB total amount of output written for the stage in MB. + * @return this StageBuilder. 
+     */
+    def output(medianMB: Long, maxMB: Long, totalMB: Long): StageBuilder = {
+      val taskMetricDistributions = getTaskMetricDistributions()
+      val medianBytes = (medianMB << 20).toDouble
+      val maxBytes = (maxMB << 20).toDouble
+      taskMetricDistributions.outputMetrics =
+        Some(new OutputMetricDistributionsImpl(
+          IndexedSeq(medianBytes/2, medianBytes, medianBytes, medianBytes, maxBytes),
+          IndexedSeq(1000.0, 2000.0, 2000.0, 2000.0, 3000.0)))
+      stage.outputBytes = totalMB << 20
+      this
+    }
+
+    /**
+     * Set the shuffle read information.
+     *
+     * @param medianMB median amount of shuffle read for a task in MB.
+     * @param maxMB maximum amount of shuffle read for a task in MB.
+     * @param totalMB total amount of shuffle read for the stage in MB.
+     * @return this StageBuilder.
+     */
+    def shuffleRead(medianMB: Long, maxMB: Long, totalMB: Long): StageBuilder = {
+      val taskMetricDistributions = getTaskMetricDistributions()
+      val medianBytes = (medianMB << 20).toDouble
+      val maxBytes = (maxMB << 20).toDouble
+      taskMetricDistributions.shuffleReadMetrics =
+        Some(new ShuffleReadMetricDistributionsImpl(
+          IndexedSeq(medianBytes/2, medianBytes, medianBytes, medianBytes, maxBytes),
+          IndexedSeq(1000.0, 2000.0, 2000.0, 2000.0, 3000.0),
+          IndexedSeq(0.0, 0.0, 0.0, 0.0, 0.0),
+          IndexedSeq(0.0, 0.0, 0.0, 0.0, 0.0),
+          IndexedSeq(0.0, 0.0, 0.0, 0.0, 0.0),
+          IndexedSeq(0.0, 0.0, 0.0, 0.0, 0.0),
+          IndexedSeq(0.0, 0.0, 0.0, 0.0, 0.0),
+          IndexedSeq(0.0, 0.0, 0.0, 0.0, 0.0)))
+      stage.shuffleReadBytes = totalMB << 20
+      this
+    }
+
+    /**
+     * Set the shuffle write information.
+     *
+     * @param medianMB median amount of shuffle write for a task in MB.
+     * @param maxMB maximum amount of shuffle write for a task in MB.
+     * @param totalMB total amount of shuffle write for the stage in MB.
+     * @return this StageBuilder.
+     */
+    def shuffleWrite(medianMB: Long, maxMB: Long, totalMB: Long): StageBuilder = {
+      val taskMetricDistributions = getTaskMetricDistributions()
+      val medianBytes = (medianMB << 20).toDouble
+      val maxBytes = (maxMB << 20).toDouble
+      taskMetricDistributions.shuffleWriteMetrics =
+        Some(new ShuffleWriteMetricDistributionsImpl(
+          IndexedSeq(medianBytes/2, medianBytes, medianBytes, medianBytes, maxBytes),
+          IndexedSeq(1000.0, 2000.0, 2000.0, 2000.0, 3000.0),
+          IndexedSeq(0.0, 0.0, 0.0, 0.0, 0.0)))
+      stage.shuffleWriteBytes = totalMB << 20
+      this
+    }
+
+    /**
+     * Set the execution memory spill information.
+     *
+     * @param medianMB median amount of execution memory spill for a task in MB.
+     * @param maxMB maximum amount of execution memory spill for a task in MB.
+     * @param totalMB total amount of execution memory spill for the stage in MB.
+     * @return this StageBuilder.
+     */
+    def spill(medianMB: Long, maxMB: Long, totalMB: Long): StageBuilder = {
+      val taskMetricDistributions = getTaskMetricDistributions()
+      val medianBytes = (medianMB << 20).toDouble
+      val maxBytes = (maxMB << 20).toDouble
+      taskMetricDistributions.memoryBytesSpilled =
+        IndexedSeq(medianBytes/2, medianBytes, medianBytes, medianBytes, maxBytes)
+      stage.memoryBytesSpilled = totalMB << 20
+      this
+    }
+
+    /**
+     * Set the failure information.
+     *
+     * @param numFailed total number of tasks failed.
+     * @param numOOM total number of tasks which failed due to OutOfMemory.
+     * @param numContainerKilled total number of tasks which failed due to container killed by YARN.
+     * @return this StageBuilder.
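+     *
+     * Example (an illustrative sketch; the counts mirror one of the test cases below):
+     * {{{
+     * // Creates tasks 1..3: task 1 fails with an OOM error, tasks 2 and 3 are
+     * // container-killed; any remaining failures would get a generic error.
+     * StageBuilder(4, 15).failures(numFailed = 3, numOOM = 1, numContainerKilled = 2).create()
+     * }}}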
+     */
+    def failures(numFailed: Int, numOOM: Int, numContainerKilled: Int): StageBuilder = {
+      stage.tasks = Some(createTasks(numFailed))
+      (1 to numOOM).foreach { i =>
+        stage.tasks.get(i.toLong).errorMessage = Some(OOM_ERROR)
+      }
+      ((numOOM + 1) to (numOOM + numContainerKilled)).foreach { i =>
+        stage.tasks.get(i.toLong).errorMessage = Some(OVERHEAD_MEMORY_ERROR)
+      }
+      ((numOOM + numContainerKilled + 1) to numFailed).foreach { i =>
+        stage.tasks.get(i.toLong).errorMessage = Some("ArrayIndexOutOfBoundsException")
+      }
+      stage.numFailedTasks = numFailed
+      this
+    }
+
+    /** Set the stage submission and completion times. */
+    def times(submissionTime: String, completionTime: String): StageBuilder = {
+      stage.submissionTime = Some(sdf.parse(submissionTime))
+      stage.completionTime = Some(sdf.parse(completionTime))
+      this
+    }
+
+    /** Create the StageDataImpl. */
+    def create(): StageDataImpl = stage
+
+    /** @return the TaskMetricDistributionsImpl for the StageData, creating it if needed. */
+    private def getTaskMetricDistributions(): TaskMetricDistributionsImpl = {
+      stage.taskSummary match {
+        case None =>
+          val taskMetricDistributions =
+            new TaskMetricDistributionsImpl(
+              quantiles = IndexedSeq(0.0, 0.25, 0.5, 0.75, 1.0),
+              executorDeserializeTime = IndexedSeq(0.0, 0.0, 0.1, 0.1, 0.2),
+              executorDeserializeCpuTime = IndexedSeq(0.0, 0.0, 0.1, 0.1, 0.2),
+              executorRunTime = IndexedSeq(1000.0, 5000.0, 6000.0, 6500.0, 7000.0),
+              executorCpuTime = IndexedSeq(1000.0, 5000.0, 6000.0, 6500.0, 7000.0),
+              resultSize = IndexedSeq(0.0, 0.0, 0.0, 0.0, 0.0),
+              jvmGcTime = IndexedSeq(0.0, 0.0, 0.0, 0.0, 0.0),
+              resultSerializationTime = IndexedSeq(0.0, 0.0, 0.0, 0.0, 0.0),
+              gettingResultTime = IndexedSeq(0.0, 0.0, 0.0, 0.0, 0.0),
+              schedulerDelay = IndexedSeq(0.0, 0.0, 0.0, 0.0, 0.0),
+              peakExecutionMemory = IndexedSeq(0.0, 0.0, 0.0, 0.0, 0.0),
+              memoryBytesSpilled = IndexedSeq(0.0, 0.0, 0.0, 0.0, 0.0),
+              diskBytesSpilled = IndexedSeq(0.0, 0.0, 0.0, 0.0, 0.0),
+              inputMetrics = None,
+              outputMetrics = None,
+              shuffleReadMetrics = None,
+              shuffleWriteMetrics = None)
+          stage.taskSummary = Some(taskMetricDistributions)
+          taskMetricDistributions
+        case Some(taskMetricDistributions) =>
+          taskMetricDistributions
+      }
+    }
+  }
+
+  /**
+   * Create an executor metrics summary.
+   *
+   * @param id executor ID.
+   * @param jvmUsedMemoryMb peak JVM used memory for the executor.
+   * @param totalGCTimeSec total time spent in GC by the executor.
+   * @param totalDurationSec total task runtime for the executor.
+   * @return executor summary.
+   */
+  private[heuristics] def createExecutorSummary(
+      id: String,
+      jvmUsedMemoryMb: Long,
+      totalGCTimeSec: Long,
+      totalDurationSec: Long): ExecutorSummaryImpl = new ExecutorSummaryImpl(
+    id,
+    hostPort = "",
+    rddBlocks = 0,
+    memoryUsed = 0,
+    diskUsed = 0,
+    activeTasks = 0,
+    failedTasks = 0,
+    completedTasks = 0,
+    totalTasks = 0,
+    maxTasks = 0,
+    totalDurationSec * 1000,
+    totalInputBytes = 0,
+    totalShuffleRead = 0,
+    totalShuffleWrite = 0,
+    maxMemory = 0,
+    totalGCTimeSec * 1000,
+    totalMemoryBytesSpilled = 0,
+    executorLogs = Map.empty,
+    peakJvmUsedMemory = Map("jvmUsedMemory" -> (jvmUsedMemoryMb << 20)),
+    peakUnifiedMemory = Map.empty
+  )
+
+  /**
+   * Create the Spark application data.
+   *
+   * @param stages list of stage data.
+   * @param executorSummaries list of executor summaries.
+   * @param properties configuration properties for the Spark application.
+   * @return Spark application data.
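+   *
+   * Example (an illustrative sketch; mirrors how the tests below call it):
+   * {{{
+   * val data = createSparkApplicationData(
+   *   stages = Seq(StageBuilder(1, 3).create()),
+   *   executorSummaries = Seq.empty,
+   *   properties = Some(Map("spark.sql.shuffle.partitions" -> "3")))
+   * }}}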
+ */ + def createSparkApplicationData + (stages: Seq[StageDataImpl], + executorSummaries: Seq[ExecutorSummaryImpl], + properties: Option[Map[String, String]]): SparkApplicationData = { + val appId = "application_1" + + val logDerivedData = properties.map { props => + SparkLogDerivedData( + SparkListenerEnvironmentUpdate(Map("Spark Properties" -> props.toSeq)) + )} + + val restDerivedData = SparkRestDerivedData( + new ApplicationInfoImpl(appId, name = "app", Seq.empty), + jobDatas = Seq.empty, + stageDatas = stages, + executorSummaries = executorSummaries, + stagesWithFailedTasks = stages + ) + SparkApplicationData(appId, restDerivedData, logDerivedData) + } +} diff --git a/test/com/linkedin/drelephant/spark/heuristics/StagesAnalyzerTest.scala b/test/com/linkedin/drelephant/spark/heuristics/StagesAnalyzerTest.scala new file mode 100644 index 000000000..6d240ebef --- /dev/null +++ b/test/com/linkedin/drelephant/spark/heuristics/StagesAnalyzerTest.scala @@ -0,0 +1,465 @@ +package com.linkedin.drelephant.spark.heuristics + +import java.util.Date + +import com.linkedin.drelephant.analysis.{ApplicationType, Severity} +import com.linkedin.drelephant.spark.fetchers.statusapiv1._ +import org.scalatest.{FunSpec, Matchers} + +import scala.collection.JavaConverters + +/** Tests for the StagesAnalyzer. */ +class StagesAnalyzerTest extends FunSpec with Matchers { + import SparkTestUtilities._ + + describe("StagesAnalyzer") { + it("has task failures severity") { + val heuristicConfigurationData = createHeuristicConfigurationData() + val stages = Seq( + StageBuilder(1, 3).create(), + StageBuilder(2, 5).failures(2, 2, 0).create(), + StageBuilder(3, 15).failures(2, 0, 1).create(), + StageBuilder(4, 15).failures(3, 1, 2).create(), + StageBuilder(5, 4).failures(2, 0, 0).status(StageStatus.FAILED, Some("array issues")).create()) + val properties = Map( "spark.sql.shuffle.partitions" -> "200") + val data = createSparkApplicationData(stages, Seq.empty, Some(properties)) + + val expectedAnalysis = Seq( + StageAnalysisBuilder(1, 3).create(), + StageAnalysisBuilder(2, 5) + .taskFailures(Severity.CRITICAL, Severity.CRITICAL, Severity.NONE, 8, 2, 2, 0, + Seq("Stage 2 has 2 failed tasks.", + "Stage 2 has 2 tasks that failed because of OutOfMemory exception.")) + .create(), + StageAnalysisBuilder(3, 15) + .taskFailures(Severity.MODERATE, Severity.NONE, Severity.LOW, 4, 2, 0, 1, + Seq("Stage 3 has 2 failed tasks.", + "Stage 3 has 1 tasks that failed because the container was killed by YARN for exeeding memory limits.")) + .create(), + StageAnalysisBuilder(4, 15) + .taskFailures(Severity.CRITICAL, Severity.LOW, Severity.MODERATE, 12, 3, 1, 2, + Seq("Stage 4 has 3 failed tasks.", + "Stage 4 has 1 tasks that failed because of OutOfMemory exception.", + "Stage 4 has 2 tasks that failed because the container was killed by YARN for exeeding memory limits.")) + .create(), + StageAnalysisBuilder(5, 4) + .taskFailures(Severity.CRITICAL, Severity.NONE, Severity.NONE, 8, 2, 0, 0, + Seq("Stage 5 has 2 failed tasks.")) + .stageFailure(Severity.CRITICAL, 16, Seq("Stage 5 failed: array issues")) + .create()) + + val stageAnalyzer = new StagesAnalyzer(heuristicConfigurationData, data) + val stageAnalysis = stageAnalyzer.getStageAnalysis() + (0 until expectedAnalysis.size).foreach { i => + compareStageAnalysis(stageAnalysis(i), expectedAnalysis(i)) + } + } + + it("has task skew severity") { + val heuristicConfigurationData = createHeuristicConfigurationData() + val stages = Seq( + StageBuilder(1, 5).taskRuntime(200, 250, 600).create(), 
+ StageBuilder(2, 5).taskRuntime(100, 250, 260).input(5, 250, 260).create(), + StageBuilder(3, 5).taskRuntime(20, 250, 53).create(), + StageBuilder(4, 5).taskRuntime(5, 250, 260).input(5, 250, 260).create(), + StageBuilder(5, 5).taskRuntime(50, 250, 350).shuffleRead(50, 250, 350).shuffleWrite(50, 250, 400).create(), + StageBuilder(6, 5).taskRuntime(50, 250, 350).shuffleRead(50, 50, 50).output(50, 50, 50).create(), + StageBuilder(7, 5).taskRuntime(20, 250, 290).shuffleWrite(250, 250, 600).output(20, 250, 290).create(), + StageBuilder(8, 3).taskRuntime(200, 250, 1000).create(), + StageBuilder(9, 3).taskRuntime(5, 250, 70).create(), + StageBuilder(10, 3).taskRuntime(20, 250, 300).input(20, 250, 300).create(), + StageBuilder(11, 3).taskRuntime(50, 250, 350).shuffleRead(50, 250, 350).create(), + StageBuilder(12, 5).taskRuntime(2, 50, 53).times("09/09/2018 12:00:00", "09/09/2018 12:01:00").create(), + StageBuilder(13, 5).taskRuntime(5, 50, 60).input(50, 500, 600).create(), + StageBuilder(14, 5).taskRuntime(5, 200, 210).output(5, 200, 210).create()) + val properties = Map( "spark.sql.shuffle.partitions" -> "5") + val data = createSparkApplicationData(stages, Seq.empty, Some(properties)) + + val expectedAnalysis = Seq( + StageAnalysisBuilder(1, 5).taskRuntime(200, 250) + .longTask(Severity.LOW, 0, Seq()).create(), + StageAnalysisBuilder(2, 5).taskRuntime(100, 250).input(260) + .skew(Severity.LOW, Severity.LOW, 0, + Seq()).create(), + StageAnalysisBuilder(3, 5).taskRuntime(20, 250) + .skew(Severity.SEVERE, Severity.SEVERE, 15, + Seq("Stage 3 has skew in task run time (median is 20.00 sec, max is 4.17 min).", + "Stage 3: please try to modify the application to make the partitions more even.")).create(), + StageAnalysisBuilder(4, 5).taskRuntime(5, 250).input(260) + .skew(Severity.CRITICAL, Severity.CRITICAL, 20, + Seq("Stage 4 has skew in task run time (median is 5.00 sec, max is 4.17 min).", + "Stage 4 has skew in task input bytes (median is 5 MB, max is 250 MB).", + "Stage 4: please try to modify the application to make the input partitions more even.")).create(), + StageAnalysisBuilder(5, 5).taskRuntime(50, 250).shuffleRead(350).shuffleWrite(400) + .skew(Severity.MODERATE, Severity.MODERATE, 10, + Seq("Stage 5 has skew in task run time (median is 50.00 sec, max is 4.17 min).", + "Stage 5 has skew in task shuffle read bytes (median is 50 MB, max is 250 MB).", + "Stage 5 has skew in task shuffle write bytes (median is 50 MB, max is 250 MB).", + "Stage 5: please try to modify the application to make the partitions more even.")).create(), + StageAnalysisBuilder(6, 5).taskRuntime(50, 250).shuffleRead(50).output(50) + .skew(Severity.MODERATE, Severity.MODERATE, 10, + Seq( "Stage 6 has skew in task run time (median is 50.00 sec, max is 4.17 min).", + "Stage 6: please try to modify the application to make the partitions more even.")).create(), + StageAnalysisBuilder(7, 5).taskRuntime(20, 250).shuffleWrite(600).output(290) + .skew(Severity.SEVERE, Severity.SEVERE, 15, + Seq("Stage 7 has skew in task run time (median is 20.00 sec, max is 4.17 min).", + "Stage 7 has skew in task output bytes (median is 20 MB, max is 250 MB).", + "Stage 7: please try to modify the application to make the partitions more even.")).create(), + StageAnalysisBuilder(8, 3).taskRuntime(200, 250) + .longTask(Severity.LOW, 0, Seq()).create(), + StageAnalysisBuilder(9, 3).taskRuntime(5, 250) + .skew(Severity.CRITICAL, Severity.CRITICAL, 12, + Seq("Stage 9 has skew in task run time (median is 5.00 sec, max is 4.17 min).", + "Stage 9: 
please try to modify the application to make the partitions more even.")).create(), + StageAnalysisBuilder(10, 3).taskRuntime(20, 250).input(300) + .skew(Severity.SEVERE, Severity.SEVERE, 9, + Seq("Stage 10 has skew in task run time (median is 20.00 sec, max is 4.17 min).", + "Stage 10 has skew in task input bytes (median is 20 MB, max is 250 MB).", + "Stage 10: please try to modify the application to make the input partitions more even.")).create(), + StageAnalysisBuilder(11, 3).taskRuntime(50, 250).shuffleRead(350) + .skew(Severity.MODERATE, Severity.MODERATE, 6, + Seq("Stage 11 has skew in task run time (median is 50.00 sec, max is 4.17 min).", + "Stage 11 has skew in task shuffle read bytes (median is 50 MB, max is 250 MB).", + "Stage 11: please try to modify the application to make the partitions more even.")).create(), + StageAnalysisBuilder(12, 5).taskRuntime(2, 50).duration(60) + .skew(Severity.CRITICAL, Severity.NONE, 0, + Seq()).create(), + StageAnalysisBuilder(13, 5).taskRuntime(5, 50).input(600) + .skew(Severity.SEVERE, Severity.NONE, 0, + Seq()).create(), + StageAnalysisBuilder(14, 5).taskRuntime(5, 200).output(210) + .skew(Severity.CRITICAL, Severity.NONE, 0, + Seq()).create()) + + val stageAnalyzer = new StagesAnalyzer(heuristicConfigurationData, data) + val stageAnalysis = stageAnalyzer.getStageAnalysis() + (0 until expectedAnalysis.size).foreach { i => + compareStageAnalysis(stageAnalysis(i), expectedAnalysis(i)) + } + } + + it("has long task severity") { + val heuristicConfigurationData = createHeuristicConfigurationData() + val stages = Seq( + StageBuilder(1, 3).taskRuntime(120, 150, 300).create(), + StageBuilder(2, 3).taskRuntime(180, 200, 400).create(), + StageBuilder(3, 3).taskRuntime(400, 500, 1000).create(), + StageBuilder(4, 3).taskRuntime(700, 900, 2000).create(), + StageBuilder(5, 3).taskRuntime(1200, 1500, 4000).create(), + StageBuilder(6, 3).taskRuntime(700, 3500, 4500).create(), + StageBuilder(7, 2).taskRuntime(700, 900, 2000).create(), + StageBuilder(8, 3).taskRuntime(3000, 3000, 9000).input(2 << 20, 3 << 20, 5 << 20).create(), + StageBuilder(9, 4003).taskRuntime(3000, 3000, 9000).shuffleRead(2 << 20, 3 << 20, 5 << 20).create(), + StageBuilder(10, 4000).taskRuntime(700, 900, 2000).create()) + val properties = Map( "spark.sql.shuffle.partitions" -> "3") + val data = createSparkApplicationData(stages, Seq.empty, Some(properties)) + + val expectedAnalysis = Seq( + StageAnalysisBuilder(1, 3).taskRuntime(120, 150).create(), + StageAnalysisBuilder(2, 3).taskRuntime(180, 200).longTask(Severity.LOW, 0, Seq()).create(), + StageAnalysisBuilder(3, 3).taskRuntime(400, 500).longTask(Severity.MODERATE, 6, + Seq("Stage 3 has a long median task run time of 6.67 min.", + "Stage 3 has 3 tasks, 0 B input, 0 B shuffle read, 0 B shuffle write, and 0 B output.")).create(), + StageAnalysisBuilder(4, 3).taskRuntime(700, 900).longTask(Severity.SEVERE, 9, + Seq("Stage 4 has a long median task run time of 11.67 min.", + "Stage 4 has 3 tasks, 0 B input, 0 B shuffle read, 0 B shuffle write, and 0 B output.")).create(), + StageAnalysisBuilder(5, 3).taskRuntime(1200, 1500).longTask(Severity.CRITICAL, 12, + Seq("Stage 5 has a long median task run time of 20.00 min.", + "Stage 5 has 3 tasks, 0 B input, 0 B shuffle read, 0 B shuffle write, and 0 B output.")).create(), + StageAnalysisBuilder(6, 3).taskRuntime(700, 3500).longTask(Severity.SEVERE, 9, + Seq("Stage 6 has a long median task run time of 11.67 min.", + "Stage 6 has 3 tasks, 0 B input, 0 B shuffle read, 0 B shuffle write, and 0 B 
output.")) + .skew(Severity.MODERATE, Severity.MODERATE, 6, + Seq("Stage 6 has skew in task run time (median is 11.67 min, max is 58.33 min).", + "Stage 6: please try to modify the application to make the partitions more even.")).create(), + StageAnalysisBuilder(7, 2).taskRuntime(700, 900).longTask(Severity.SEVERE, 6, + Seq("Stage 7 has a long median task run time of 11.67 min.", + "Stage 7 has 2 tasks, 0 B input, 0 B shuffle read, 0 B shuffle write, and 0 B output.", + "Stage 7: please increase the number of partitions.")).create(), + StageAnalysisBuilder(8, 3).taskRuntime(3000, 3000).longTask(Severity.CRITICAL, 12, + Seq("Stage 8 has a long median task run time of 50.00 min.", + "Stage 8 has 3 tasks, 5 TB input, 0 B shuffle read, 0 B shuffle write, and 0 B output.", + "Stage 8: please increase the number of partitions for reading data.")) + .input(5 << 20).create(), + StageAnalysisBuilder(9, 4003).taskRuntime(3000, 3000).longTask(Severity.CRITICAL, 16012, + Seq("Stage 9 has a long median task run time of 50.00 min.", + "Stage 9 has 4003 tasks, 0 B input, 5 TB shuffle read, 0 B shuffle write, and 0 B output.", + "Stage 9: please try to reduce the amount of data being processed.")) + .shuffleRead(5 << 20).create(), + StageAnalysisBuilder(10, 4000).taskRuntime(700, 900).longTask(Severity.SEVERE, 12000, + Seq("Stage 10 has a long median task run time of 11.67 min.", + "Stage 10 has 4000 tasks, 0 B input, 0 B shuffle read, 0 B shuffle write, and 0 B output.", + "Stage 10: please optimize the code to improve performance.")).create()) + + val stageAnalyzer = new StagesAnalyzer(heuristicConfigurationData, data) + val stageAnalysis = stageAnalyzer.getStageAnalysis() + (0 until expectedAnalysis.size).foreach { i => + compareStageAnalysis(stageAnalysis(i), expectedAnalysis(i)) + } + } + + it("has execution memory spill severity") { + val heuristicConfigurationData = createHeuristicConfigurationData() + val stages = Seq( + StageBuilder(1, 5).taskRuntime(100, 150, 400).shuffleRead(200, 300, 800) + .spill(1, 2, 5).create(), + StageBuilder(2, 5).taskRuntime(100, 150, 400).shuffleRead(200, 300, 800) + .spill(10, 15, 40).create(), + StageBuilder(3, 5).taskRuntime(100, 150, 400).input(500, 2000, 3000) + .spill(100, 150, 400).create(), + StageBuilder(4, 5).taskRuntime(300, 350, 1500).shuffleWrite(1000, 1000, 5000) + .spill(300, 350, 1500).create(), + StageBuilder(5, 5).taskRuntime(300, 2500, 3000).shuffleRead(1000, 5000, 16000) + .shuffleWrite(300, 2500, 3000).spill(300, 2500, 3000).create(), + StageBuilder(6, 3).taskRuntime(50, 250, 350).input(50, 250, 350) + .spill(250, 250, 750).create(), + StageBuilder(7, 3).taskRuntime(50, 250, 350).output(250, 1000, 1500) + .spill(250, 250, 750).create(), + StageBuilder(8, 5).taskRuntime(2, 50, 53) + .times("09/09/2018 12:00:00", "09/09/2018 12:01:00") + .shuffleRead(500, 500, 1500).spill(250, 250, 750).create(), + StageBuilder(9, 5).taskRuntime(50, 250, 350).output(50, 250, 6L << 20) + .spill(50, 250, 2L << 20).create(), + StageBuilder(10, 5).taskRuntime(50, 250, 350).input(50, 250, 6L << 20) + .spill(50, 250, 2L << 20).create(), + StageBuilder(11, 3).taskRuntime(50, 250, 350).input(50, 250, 6L << 20) + .spill(50, 250, 3L << 20).create(), + StageBuilder(12, 3).taskRuntime(50, 250, 350).output(50, 250, 6L << 20) + .spill(50, 250, 4L << 20).create()) + val properties = Map( "spark.sql.shuffle.partitions" -> "5") + val data = createSparkApplicationData(stages, Seq.empty, Some(properties)) + + val expectedAnalysis = Seq( + StageAnalysisBuilder(1, 5).taskRuntime(100, 
150).shuffleRead(800) + .spill(Severity.NONE, Severity.NONE, 0, 2, 5, Seq()).create(), + StageAnalysisBuilder(2, 5).taskRuntime(100, 150).shuffleRead(800) + .spill(Severity.LOW, Severity.LOW, 0, 15, 40, Seq()).create(), + StageAnalysisBuilder(3, 5).taskRuntime(100, 150).input(3000) + .spill(Severity.MODERATE, Severity.MODERATE, 10, 150, 400, + Seq("Stage 3 has 400 MB execution memory spill.")) + .skew(Severity.NONE, Severity.NONE, 0, + Seq("Stage 3 has skew in task input bytes (median is 500 MB, max is 1.95 GB).", + "Stage 3: please try to modify the application to make the input partitions more even.")) + .create(), + StageAnalysisBuilder(4, 5).taskRuntime(300, 350).shuffleWrite(5000) + .longTask(Severity.MODERATE, 10, + Seq("Stage 4 has a long median task run time of 5.00 min.", + "Stage 4 has 5 tasks, 0 B input, 0 B shuffle read, 4.88 GB shuffle write, and 0 B output.")) + .spill(Severity.SEVERE, Severity.SEVERE, 15, 350, 1500, + Seq("Stage 4 has 1.46 GB execution memory spill.")).create(), + StageAnalysisBuilder(5, 5).taskRuntime(300, 2500).shuffleRead(16000).shuffleWrite(3000) + .longTask(Severity.MODERATE, 10, Seq("Stage 5 has a long median task run time of 5.00 min.", + "Stage 5 has 5 tasks, 0 B input, 15.62 GB shuffle read, 2.93 GB shuffle write, and 0 B output.")) + .skew(Severity.SEVERE, Severity.SEVERE, 15, + Seq("Stage 5 has skew in task run time (median is 5.00 min, max is 41.67 min).", + "Stage 5 has skew in memory bytes spilled (median is 300 MB, max is 2.44 GB).", + "Stage 5 has skew in task shuffle read bytes (median is 1,000 MB, max is 4.88 GB).", + "Stage 5 has skew in task shuffle write bytes (median is 300 MB, max is 2.44 GB).", + "Stage 5: please try to modify the application to make the partitions more even.")) + .spill(Severity.MODERATE, Severity.MODERATE, 10, 2500, 3000 + , Seq("Stage 5 has 2.93 GB execution memory spill.")).create(), + StageAnalysisBuilder(6, 3).taskRuntime(50, 250).input(350) + .skew(Severity.MODERATE, Severity.MODERATE, 6, + Seq("Stage 6 has skew in task run time (median is 50.00 sec, max is 4.17 min).", + "Stage 6 has skew in task input bytes (median is 50 MB, max is 250 MB).", + "Stage 6: please try to modify the application to make the input partitions more even.")) + .spill(Severity.CRITICAL, Severity.CRITICAL, 12, 250, 750, + Seq("Stage 6 has 750 MB execution memory spill.")).create(), + StageAnalysisBuilder(7, 3).taskRuntime(50, 250).output(1500) + .skew(Severity.MODERATE, Severity.MODERATE, 6, + Seq("Stage 7 has skew in task run time (median is 50.00 sec, max is 4.17 min).", + "Stage 7 has skew in task output bytes (median is 250 MB, max is 1,000 MB).", + "Stage 7: please try to modify the application to make the partitions more even.")) + .spill(Severity.CRITICAL, Severity.CRITICAL, 12, 250, 750, + Seq("Stage 7 has 750 MB execution memory spill.")).create(), + StageAnalysisBuilder(8, 5).taskRuntime(2, 50).duration(60).shuffleRead(1500) + .skew(Severity.CRITICAL, Severity.NONE, 0, + Seq("Stage 8: please try to modify the application to make the partitions more even.")) + .spill(Severity.CRITICAL, Severity.CRITICAL, 20, 250, 750, + Seq("Stage 8 has 750 MB execution memory spill.")).create(), + StageAnalysisBuilder(9, 5).taskRuntime(50, 250).output(6L << 20) + .skew(Severity.MODERATE, Severity.MODERATE, 10, + Seq("Stage 9 has skew in task run time (median is 50.00 sec, max is 4.17 min).", + "Stage 9 has skew in memory bytes spilled (median is 50 MB, max is 250 MB).", + "Stage 9 has skew in task output bytes (median is 50 MB, max is 250 
MB).", + "Stage 9: please try to modify the application to make the partitions more even.") + ) + .spill(Severity.SEVERE, Severity.NONE, 0, 250, 2L << 20, + Seq("Stage 9: a large amount of data is being processesd. Examine the application to see if this can be reduced.", + "Stage 9 has 2 TB execution memory spill.", + "Stage 9 has 5 tasks, 0 B input read, 0 B shuffle read, 0 B shuffle write, 6 TB output.", + "Stage 9 has median task values: 50 MB memory spill, 0 B input, 0 B shuffle read, 0 B shuffle write, 50 MB output.")) + .create(), + StageAnalysisBuilder(10, 5).taskRuntime(50, 250).input(6 << 20) + .skew(Severity.MODERATE, Severity.MODERATE, 10, + Seq("Stage 10 has skew in task run time (median is 50.00 sec, max is 4.17 min).", + "Stage 10 has skew in memory bytes spilled (median is 50 MB, max is 250 MB).", + "Stage 10 has skew in task input bytes (median is 50 MB, max is 250 MB).", + "Stage 10: please try to modify the application to make the input partitions more even.")) + .spill(Severity.SEVERE, Severity.NONE, 0, 250, 2L << 20, + Seq("Stage 10: a large amount of data is being processesd. Examine the application to see if this can be reduced.", + "Stage 10 has 2 TB execution memory spill.", + "Stage 10 has 5 tasks, 6 TB input read, 0 B shuffle read, 0 B shuffle write, 0 B output.", + "Stage 10 has median task values: 50 MB memory spill, 50 MB input, 0 B shuffle read, 0 B shuffle write, 0 B output.")) + .create(), + StageAnalysisBuilder(11, 3).taskRuntime(50, 250).input(6 << 20) + .skew(Severity.MODERATE, Severity.MODERATE, 6, + Seq("Stage 11 has skew in task run time (median is 50.00 sec, max is 4.17 min).", + "Stage 11 has skew in memory bytes spilled (median is 50 MB, max is 250 MB).", + "Stage 11 has skew in task input bytes (median is 50 MB, max is 250 MB).", + "Stage 11: please try to modify the application to make the input partitions more even.")) + .spill(Severity.CRITICAL, Severity.NONE, 0, 250, 3L << 20, + Seq("Stage 11: a large amount of data is being processesd. Examine the application to see if this can be reduced.", + "Stage 11 has 3 TB execution memory spill.", + "Stage 11 has 3 tasks, 6 TB input read, 0 B shuffle read, 0 B shuffle write, 0 B output.", + "Stage 11 has median task values: 50 MB memory spill, 50 MB input, 0 B shuffle read, 0 B shuffle write, 0 B output.")) + .create(), + StageAnalysisBuilder(12, 3).taskRuntime(50, 250).output(6L << 20) + .skew(Severity.MODERATE, Severity.MODERATE, 6, + Seq("Stage 12 has skew in task run time (median is 50.00 sec, max is 4.17 min).", + "Stage 12 has skew in memory bytes spilled (median is 50 MB, max is 250 MB).", + "Stage 12 has skew in task output bytes (median is 50 MB, max is 250 MB).", + "Stage 12: please try to modify the application to make the partitions more even.")) + .spill(Severity.CRITICAL, Severity.NONE, 0, 250, 4L << 20, + Seq("Stage 12: a large amount of data is being processesd. 
Examine the application to see if this can be reduced.", + "Stage 12 has 4 TB execution memory spill.", + "Stage 12 has 3 tasks, 0 B input read, 0 B shuffle read, 0 B shuffle write, 6 TB output.", + "Stage 12 has median task values: 50 MB memory spill, 0 B input, 0 B shuffle read, 0 B shuffle write, 50 MB output.")) + .create()) + + val stageAnalyzer = new StagesAnalyzer(heuristicConfigurationData, data) + val stageAnalysis = stageAnalyzer.getStageAnalysis() + (0 until expectedAnalysis.size).foreach { i => + compareStageAnalysis(stageAnalysis(i), expectedAnalysis(i)) + } + } + } + + it("custom recommendations") { + val heuristicConfigurationData = createHeuristicConfigurationData( + Map("execution_memory_spill_large_data_recommendation" -> "please try to filter the data to reduce the size", + "task_skew_input_data_recommendation" -> "please set DaliSpark.SPLIT_SIZE to make partitions more even", + "task_skew_generic_recommendation" -> "please make the partitions more even", + "long_tasks_large_data_recommendation" -> "please try to filter the data to reduce the size and increase speed", + "slow_tasks_recommendation" -> "optimize the code to increase speed", + "long tasks_few_partitions" -> "increase the number of partitions to speed up the stage", + "long tasks_few_input_partitions" -> "please set DaliSpark.SPLIT_SIZE to make partitions more even")) + val stages = Seq( + StageBuilder(1, 4003).taskRuntime(3000, 3000, 9000).shuffleRead(2 << 20, 3 << 20, 5 << 20).create(), + StageBuilder(2, 4000).taskRuntime(700, 900, 2000).create(), + StageBuilder(3, 2).taskRuntime(700, 900, 2000).create(), + StageBuilder(4, 3).taskRuntime(3000, 3000, 9000).input(2 << 20, 3 << 20, 5 << 20).create(), + StageBuilder(5, 3).taskRuntime(5, 250, 70).create(), + StageBuilder(6, 3).taskRuntime(20, 250, 300).input(20, 250, 300).create(), + StageBuilder(9, 5).taskRuntime(50, 50, 350).output(250, 250, 6L << 20) + .spill(250, 250, 2L << 20).create()) + val properties = Map( "spark.sql.shuffle.partitions" -> "3") + val data = createSparkApplicationData(stages, Seq.empty, Some(properties)) + + val expectedAnalysis = Seq( + StageAnalysisBuilder(1, 4003).taskRuntime(3000, 3000).longTask(Severity.CRITICAL, 16012, + Seq("Stage 1 has a long median task run time of 50.00 min.", + "Stage 1 has 4003 tasks, 0 B input, 5 TB shuffle read, 0 B shuffle write, and 0 B output.", + "Stage 1: please try to filter the data to reduce the size and increase speed.")) + .shuffleRead(5 << 20).create(), + StageAnalysisBuilder(2, 4000).taskRuntime(700, 900).longTask(Severity.SEVERE, 12000, + Seq("Stage 2 has a long median task run time of 11.67 min.", + "Stage 2 has 4000 tasks, 0 B input, 0 B shuffle read, 0 B shuffle write, and 0 B output.", + "Stage 2: optimize the code to increase speed.")).create(), + StageAnalysisBuilder(3, 2).taskRuntime(700, 900).longTask(Severity.SEVERE, 6, + Seq("Stage 3 has a long median task run time of 11.67 min.", + "Stage 3 has 2 tasks, 0 B input, 0 B shuffle read, 0 B shuffle write, and 0 B output.", + "Stage 3: increase the number of partitions to speed up the stage.")).create(), + StageAnalysisBuilder(4, 3).taskRuntime(3000, 3000).longTask(Severity.CRITICAL, 12, + Seq("Stage 4 has a long median task run time of 50.00 min.", + "Stage 4 has 3 tasks, 5 TB input, 0 B shuffle read, 0 B shuffle write, and 0 B output.", + "Stage 4: please set DaliSpark.SPLIT_SIZE to make partitions more even.")) + .input(5 << 20).create(), + StageAnalysisBuilder(5, 3).taskRuntime(5, 250) + .skew(Severity.CRITICAL, Severity.CRITICAL, 12, 
+ Seq("Stage 5 has skew in task run time (median is 5.00 sec, max is 4.17 min).", + "Stage 5: please make the partitions more even.") + ).create(), + StageAnalysisBuilder(6, 3).taskRuntime(20, 250).input(300) + .skew(Severity.SEVERE, Severity.SEVERE, 9, + Seq("Stage 6 has skew in task run time (median is 20.00 sec, max is 4.17 min).", + "Stage 6 has skew in task input bytes (median is 20 MB, max is 250 MB).", + "Stage 6: please set DaliSpark.SPLIT_SIZE to make partitions more even.")).create(), + StageAnalysisBuilder(7, 5).taskRuntime(50, 50).output(6L << 20) + .spill(Severity.SEVERE, Severity.NONE, 0, 250, 2L << 20, + Seq("Stage 9: please try to filter the data to reduce the size.", + "Stage 9 has 2 TB execution memory spill.", + "Stage 9 has 5 tasks, 0 B input read, 0 B shuffle read, 0 B shuffle write, 6 TB output.", + "Stage 9 has median task values: 250 MB memory spill, 0 B input, 0 B shuffle read, 0 B shuffle write, 250 MB output.")) + .create()) + + val stageAnalyzer = new StagesAnalyzer(heuristicConfigurationData, data) + val stageAnalysis = stageAnalyzer.getStageAnalysis() + (0 until expectedAnalysis.size).foreach { i => + compareStageAnalysis(stageAnalysis(i), expectedAnalysis(i)) + } + } + + /** compare actual and expected StageAnalysis */ + private def compareStageAnalysis(actual: StageAnalysis, expected: StageAnalysis): Unit = { + compareExecutionMemorySpillResult(actual.executionMemorySpillResult, expected.executionMemorySpillResult) + compareSimpleStageAnalysisResult(actual.longTaskResult, expected.longTaskResult) + compareTaskSkewResult(actual.taskSkewResult, expected.taskSkewResult) + compareTaskFailureResult(actual.taskFailureResult, expected.taskFailureResult) + compareSimpleStageAnalysisResult(actual.stageFailureResult, expected.stageFailureResult) + compareSimpleStageAnalysisResult(actual.stageGCResult, expected.stageGCResult) + actual.numTasks should be (expected.numTasks) + actual.medianRunTime should be (expected.medianRunTime) + actual.maxRunTime should be (expected.maxRunTime) + actual.stageDuration should be (expected.stageDuration) + actual.inputBytes should be(expected.inputBytes) + actual.outputBytes should be(expected.outputBytes) + actual.shuffleReadBytes should be(expected.shuffleReadBytes) + actual.shuffleWriteBytes should be(expected.shuffleWriteBytes) + } + + /** compare actual and expected ExecutionMemorySpillResult */ + private def compareExecutionMemorySpillResult( + actual: ExecutionMemorySpillResult, + expected: ExecutionMemorySpillResult) = { + actual.severity should be(expected.severity) + actual.rawSeverity should be(expected.rawSeverity) + actual.score should be(expected.score) + actual.memoryBytesSpilled should be(expected.memoryBytesSpilled) + actual.maxTaskBytesSpilled should be(expected.maxTaskBytesSpilled) + actual.details should be(expected.details) + } + + /** compare actual and expected SimpleStageAnalysisResult */ + private def compareSimpleStageAnalysisResult( + actual: SimpleStageAnalysisResult, + expected: SimpleStageAnalysisResult) = { + actual.severity should be(expected.severity) + actual.score should be(expected.score) + actual.details should be(expected.details) + } + + /** compare actual and expected TaskSkewResult */ + private def compareTaskSkewResult( + actual: TaskSkewResult, + expected: TaskSkewResult) = { + actual.severity should be(expected.severity) + actual.rawSeverity should be(expected.rawSeverity) + actual.score should be(expected.score) + actual.details should be(expected.details) + } + + /** compare actual and 
expected TaskFailureResult */ + private def compareTaskFailureResult( + actual: TaskFailureResult, + expected: TaskFailureResult) = { + actual.severity should be(expected.severity) + actual.oomSeverity should be(expected.oomSeverity) + actual.containerKilledSeverity should be(expected.containerKilledSeverity) + actual.score should be(expected.score) + actual.numFailures should be(expected.numFailures) + actual.numOOM should be(expected.numOOM) + actual.numContainerKilled should be (expected.numContainerKilled) + actual.details should be(expected.details) + } +} diff --git a/test/com/linkedin/drelephant/spark/heuristics/StagesHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/StagesHeuristicTest.scala index e6aae4fe1..cd1acc3d0 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/StagesHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/StagesHeuristicTest.scala @@ -133,10 +133,16 @@ object StagesHeuristicTest { status, stageId, attemptId = 0, + numTasks = numCompleteTasks + numFailedTasks, numActiveTasks = numCompleteTasks + numFailedTasks, numCompleteTasks, numFailedTasks, executorRunTime, + executorCpuTime = 0, + submissionTime = None, + firstTaskLaunchedTime = None, + completionTime = None, + failureReason = None, inputBytes = 0, inputRecords = 0, outputBytes = 0, @@ -152,7 +158,13 @@ object StagesHeuristicTest { schedulingPool = "", accumulatorUpdates = Seq.empty, tasks = None, - executorSummary = None + executorSummary = None, + peakJvmUsedMemory = None, + peakExecutionMemory = None, + peakStorageMemory = None, + peakUnifiedMemory = None, + taskSummary = None, + executorMetricsSummary = None ) def newFakeSparkApplicationData( diff --git a/test/com/linkedin/drelephant/spark/heuristics/StagesWithFailedTasksHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/StagesWithFailedTasksHeuristicTest.scala index cdfdc11ea..7a6191395 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/StagesWithFailedTasksHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/StagesWithFailedTasksHeuristicTest.scala @@ -97,11 +97,18 @@ object StagesWithFailedTasksHeuristicTest { status, stageId, attemptId = 0, + numTasks = 0, numActiveTasks = numCompleteTasks, numCompleteTasks, numFailedTasks = 3, executorRunTime = 0, - inputBytes = 0, + executorCpuTime = 0, + submissionTime = None, + firstTaskLaunchedTime = None, + completionTime = None, + failureReason = None, + + inputBytes = 0, inputRecords = 0, outputBytes = 0, outputRecords = 0, @@ -150,7 +157,13 @@ object StagesWithFailedTasksHeuristicTest { errorMessage = None, taskMetrics = None) )), - executorSummary = None + executorSummary = None, + peakJvmUsedMemory = None, + peakExecutionMemory = None, + peakStorageMemory = None, + peakUnifiedMemory = None, + taskSummary = None, + executorMetricsSummary = None ) def newFakeSparkApplicationData diff --git a/test/com/linkedin/drelephant/util/UtilsTest.java b/test/com/linkedin/drelephant/util/UtilsTest.java index 69f5509c4..8cef6c2ea 100644 --- a/test/com/linkedin/drelephant/util/UtilsTest.java +++ b/test/com/linkedin/drelephant/util/UtilsTest.java @@ -197,6 +197,17 @@ public void testGetDurationBreakdown() { assertEquals("0:05:24", Utils.getDurationBreakdown(durations[4])); assertEquals("314483:43:34", Utils.getDurationBreakdown(durations[5])); } + + @Test + public void testGetDuration() { + long []durations = { 153, 25431, 432344, 23423562, 178123456L}; + assertEquals("153 ms", Utils.getDuration(durations[0])); + assertEquals("25.43 
sec", Utils.getDuration(durations[1])); + assertEquals("7.21 min", Utils.getDuration(durations[2])); + assertEquals("6.51 hr", Utils.getDuration(durations[3])); + assertEquals("2.06 days", Utils.getDuration(durations[4])); + } + @Test public void testGetPercentage() { long []numerators = {10,20,30,40,50};