
Commit

code review comments
edwinalu committed Oct 24, 2018
1 parent 7966c61 commit 135bb05
Showing 3 changed files with 215 additions and 73 deletions.
@@ -54,8 +54,27 @@ object ConfigurationUtils {
val TASK_SKEW_TASK_DURATION_MIN_THRESHOLD_KEY = "task_skew_task_duration_threshold"
val MAX_RECOMMENDED_PARTITIONS_KEY = "max_recommended_partitions"


- // Severity hresholds for task duration in minutes, when checking to see if the median task
+ // Keys for finding specific recommendations
+ val EXECUTION_MEMORY_SPILL_LARGE_DATA_RECOMMENDATION_KEY = "execution_memory_spill_large_data_recommendation"
+ val TASK_SKEW_INPUT_DATA_RECOMMENDATION_KEY = "task_skew_input_data_recommendation"
+ val TASK_SKEW_GENERIC_RECOMMENDATION_KEY = "task_skew_generic_recommendation"
+ val LONG_TASKS_LARGE_DATA_RECOMMENDATION_KEY = "long_tasks_large_data_recommendation"
+ val SLOW_TASKS_RECOMMENDATION_KEY = "slow_tasks_recommendation"
+ val LONG_TASKS_FEW_PARTITIONS_RECOMMENDATION_KEY = "long_tasks_few_partitions"
+ val LONG_TASKS_FEW_INPUT_PARTITIONS_RECOMMENDATION_KEY = "long_tasks_few_input_partitions"
+
+ // Default recommendations
+ val DEFAULT_EXECUTION_MEMORY_SPILL_LARGE_DATA_RECOMMENDATION = "a large amount of data is being processed. " +
+   "Examine the application to see if this can be reduced"
+ val DEFAULT_TASK_SKEW_INPUT_DATA_RECOMMENDATION = "please try to modify the application to make the input partitions more even"
+ val DEFAULT_TASK_SKEW_GENERIC_RECOMMENDATION = "please try to modify the application to make the partitions more even"
+ val DEFAULT_LONG_TASKS_LARGE_DATA_RECOMMENDATION = "please try to reduce the amount of data being processed"
+ val DEFAULT_SLOW_TASKS_RECOMMENDATION = "please optimize the code to improve performance"
+ val DEFAULT_LONG_TASKS_FEW_PARTITIONS_RECOMMENDATION = "please increase the number of partitions"
+ val DEFAULT_LONG_TASKS_FEW_INPUT_PARTITIONS_RECOMMENDATION = "please increase the number of partitions for reading data"
+
+ // Severity thresholds for task duration in minutes, when checking to see if the median task
// run time is too long for a stage.
val DEFAULT_TASK_DURATION_THRESHOLDS =
SeverityThresholds(low = 2.5D * MILLIS_PER_MIN, moderate = 5.0D * MILLIS_PER_MIN,
@@ -82,6 +101,11 @@ object ConfigurationUtils {
// The default threshold (3TB) for checking for maximum amount of data processed, for which to
// alert for execution memory spill. Tasks processing more data would be expected to have some
// amount of spill, due to the large amount of data processed.
+ // Estimating the size based on some reasonable values for configuration parameters (and how
+ // much data could be kept in unified memory given these values):
+ //   spark.executor.memory / spark.executor.cores * spark.memory.fraction *
+ //     (1 - spark.memory.storageFraction) * spark.sql.shuffle.partitions
+ //   = 5GB / 2 * 0.6 * (1 - 0.5) * 4000
val DEFAULT_MAX_DATA_PROCESSED_THRESHOLD = "3TB"

// The default threshold for the ratio of the time for longest running task for a stage to the
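As a sanity check on that 3TB default, the arithmetic in the comment can be reproduced directly. This is a standalone sketch: the values are the assumed configuration defaults quoted in the comment above, not settings read from any real cluster.

```scala
// Recomputes the estimate from the comment above.
object MaxDataEstimate extends App {
  val executorMemoryGB = 5.0    // spark.executor.memory
  val executorCores = 2         // spark.executor.cores
  val memoryFraction = 0.6      // spark.memory.fraction
  val storageFraction = 0.5     // spark.memory.storageFraction
  val shufflePartitions = 4000  // spark.sql.shuffle.partitions

  // Unified execution memory available to a single task, times the partition count:
  val perTaskGB = executorMemoryGB / executorCores * memoryFraction * (1 - storageFraction)
  println(s"${perTaskGB * shufflePartitions} GB")  // 3000.0 GB, i.e. roughly 3TB
}
```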
70 changes: 52 additions & 18 deletions app/com/linkedin/drelephant/spark/heuristics/StagesAnalyzer.scala
@@ -102,6 +102,43 @@ private[heuristics] class StagesAnalyzer(
private val maxRecommendedPartitions = heuristicConfigurationData.getParamMap
.getOrDefault(MAX_RECOMMENDED_PARTITIONS_KEY, DEFAULT_MAX_RECOMMENDED_PARTITIONS).toInt

+ // Recommendation to give if there is execution memory spill due to too much data being processed.
+ // Some amount of spill is expected in this case, but alert the users so that they are aware that
+ // spill is happening.
+ private val executionMemorySpillRecommendation = heuristicConfigurationData.getParamMap
+   .getOrDefault(EXECUTION_MEMORY_SPILL_LARGE_DATA_RECOMMENDATION_KEY,
+     DEFAULT_EXECUTION_MEMORY_SPILL_LARGE_DATA_RECOMMENDATION)
+
+ // Recommendation to give if task skew is detected, and input data is read for the stage.
+ private val taskSkewInputDataRecommendation = heuristicConfigurationData.getParamMap
+   .getOrDefault(TASK_SKEW_INPUT_DATA_RECOMMENDATION_KEY, DEFAULT_TASK_SKEW_INPUT_DATA_RECOMMENDATION)
+
+ // Recommendation to give if task skew is detected, and there is no input data.
+ private val taskSkewGenericRecommendation = heuristicConfigurationData.getParamMap
+   .getOrDefault(TASK_SKEW_GENERIC_RECOMMENDATION_KEY, DEFAULT_TASK_SKEW_GENERIC_RECOMMENDATION)
+
+ // Recommendation to give if there are long running tasks, a lot of data being processed, and
+ // many partitions already. In this case, long running tasks may be expected, but alert the
+ // user in case it is possible to filter out some data.
+ private val longTasksLargeDataRecommendation = heuristicConfigurationData.getParamMap
+   .getOrDefault(LONG_TASKS_LARGE_DATA_RECOMMENDATION_KEY, DEFAULT_LONG_TASKS_LARGE_DATA_RECOMMENDATION)
+
+ // Recommendation to give if there are long running tasks, a reasonable number of partitions,
+ // and not too much data processed. In this case, the tasks themselves are slow.
+ private val slowTasksRecommendation = heuristicConfigurationData.getParamMap
+   .getOrDefault(SLOW_TASKS_RECOMMENDATION_KEY, DEFAULT_SLOW_TASKS_RECOMMENDATION)
+
+ // Recommendation to give if there are long running tasks and relatively few partitions.
+ private val longTasksFewPartitionsRecommendation = heuristicConfigurationData.getParamMap
+   .getOrDefault(LONG_TASKS_FEW_PARTITIONS_RECOMMENDATION_KEY, DEFAULT_LONG_TASKS_FEW_PARTITIONS_RECOMMENDATION)
+
+ // Recommendation to give if there are long running tasks, input data is being read (which
+ // determines the number of tasks), and relatively few partitions.
+ private val longTasksFewInputPartitionsRecommendation = heuristicConfigurationData.getParamMap
+   .getOrDefault(LONG_TASKS_FEW_INPUT_PARTITIONS_RECOMMENDATION_KEY,
+     DEFAULT_LONG_TASKS_FEW_INPUT_PARTITIONS_RECOMMENDATION)
+
/** @return list of analysis results for all the stages. */
def getStageAnalysis(): Seq[StageAnalysis] = {
val appConfigurationProperties: Map[String, String] = data.appConfigurationProperties
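Each of these new fields follows the same lookup pattern: use the operator-supplied override from the heuristic's parameter map when present, otherwise fall back to the compiled-in default. A minimal self-contained sketch of that behavior (the map contents below are illustrative, not Dr. Elephant's shipped configuration):

```scala
import java.util.{HashMap => JHashMap}

object RecommendationLookup extends App {
  // Stand-in for heuristicConfigurationData.getParamMap, a java.util.Map[String, String]
  // populated from the heuristic's XML configuration.
  val paramMap = new JHashMap[String, String]()
  paramMap.put("task_skew_generic_recommendation",
    "try repartitioning on a higher-cardinality key to even out the partitions")

  // Same shape as the fields above: override if configured, else the default.
  val taskSkewGenericRecommendation = paramMap.getOrDefault(
    "task_skew_generic_recommendation",
    "please try to modify the application to make the partitions more even")

  println(taskSkewGenericRecommendation)  // prints the configured override
}
```

This is what makes the detail messages below configurable per deployment, which is the point of this commit: hard-coded, LinkedIn-specific advice in the analyzer is replaced by these parameterized strings.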
@@ -158,8 +195,7 @@ private[heuristics] class StagesAnalyzer(
// don't flag execution memory spill if there is a lot of data being processed,
// since some spill may be unavoidable in this case.
if (hasSignificantSeverity(rawSpillSeverity)) {
details += s"Stage $stageId is processing a lot of data; examine the application to see " +
s"if this can be reduced."
details += s"Stage $stageId: ${executionMemorySpillRecommendation}."
}
Severity.NONE
}
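Condensed, the logic above stops penalizing spill once a stage processes more data than the configured threshold: the spill is still surfaced in the details, but the severity is capped at NONE. A self-contained restatement, with Severity simplified to an Int for the sketch (the real code uses Dr. Elephant's Severity type):

```scala
import scala.collection.mutable.ArrayBuffer

object SpillSeveritySketch extends App {
  val NONE = 0
  def hasSignificantSeverity(severity: Int): Boolean = severity > NONE

  // If the stage processes more data than the threshold, some spill is expected:
  // add an informational detail but report NONE instead of the raw spill severity.
  def adjustedSpillSeverity(stageId: Int, maxData: Long, threshold: Long,
      rawSpillSeverity: Int, recommendation: String,
      details: ArrayBuffer[String]): Int =
    if (maxData < threshold) {
      rawSpillSeverity
    } else {
      if (hasSignificantSeverity(rawSpillSeverity)) {
        details += s"Stage $stageId: $recommendation."
      }
      NONE
    }

  val details = ArrayBuffer.empty[String]
  val severity = adjustedSpillSeverity(stageId = 3, maxData = 4L << 40,
    threshold = 3L << 40, rawSpillSeverity = 2,
    recommendation = "a large amount of data is being processed. " +
      "Examine the application to see if this can be reduced", details = details)
  println(s"severity=$severity details=$details")  // severity=0, with one detail
}
```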
@@ -246,7 +282,7 @@
var inputSkewSeverity = Severity.NONE
if (hasSignificantSeverity(taskSkewSeverity)) {
details +=
s"Stage $stageId has skew in task run time (median is $medianStr, max is $maximumStr)"
s"Stage $stageId has skew in task run time (median is $medianStr, max is $maximumStr)."
}
stageData.taskSummary.foreach { summary =>
checkSkewedData(stageId, summary.memoryBytesSpilled(DISTRIBUTION_MEDIAN_IDX),
@@ -256,8 +292,7 @@
input.bytesRead(DISTRIBUTION_MAX_IDX), "task input bytes", details)
if (hasSignificantSeverity(inputSkewSeverity)) {
// The stage is reading input data, try to adjust the amount of data to even the partitions
details += s"Stage $stageId: please set DaliSpark.SPLIT_SIZE to make " +
"partitions more even."
details += s"Stage $stageId: ${taskSkewInputDataRecommendation}."
}
}
summary.outputMetrics.foreach { output =>
@@ -274,8 +309,7 @@
}
}
if (hasSignificantSeverity(rawSkewSeverity) && !hasSignificantSeverity(inputSkewSeverity)) {
details += s"Stage $stageId: please try to modify the application to make " +
"the partitions more even."
details += s"Stage $stageId: ${taskSkewGenericRecommendation}."
}
}
val score = Utils.getHeuristicScore(taskSkewSeverity, stageData.numTasks)
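The skew checks above compare the median and maximum of each task metric's distribution, and the score line appears to weight the resulting severity by the stage's task count. The exact severity mapping is not visible in this hunk, so the thresholds in this sketch are invented purely to show the median-vs-max shape of such a check:

```scala
object SkewSketch extends App {
  // Illustrative only: these ratio thresholds are made up for the sketch,
  // not the values Dr. Elephant actually uses.
  def skewSeverity(median: Double, max: Double): Int = {
    val ratio = if (median > 0) max / median else 0.0
    if (ratio >= 8) 3       // critical
    else if (ratio >= 4) 2  // severe
    else if (ratio >= 2) 1  // moderate
    else 0                  // none
  }

  // Example: median task time 2 min, slowest task 12 min => ratio 6, severity 2.
  println(skewSeverity(median = 2.0, max = 12.0))
}
```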
Expand Down Expand Up @@ -331,26 +365,26 @@ private[heuristics] class StagesAnalyzer(
val runTime = Utils.getDuration(medianTime.map(_.toLong).getOrElse(0L))
val maxData = Seq(stageData.inputBytes, stageData.shuffleReadBytes, stageData.shuffleWriteBytes,
stageData.outputBytes).max
details += s"Stage $stageId: median task run time is $runTime."
val inputBytes = MemoryFormatUtils.bytesToString(stageData.inputBytes)
val outputBytes = MemoryFormatUtils.bytesToString(stageData.outputBytes)
val shuffleReadBytes = MemoryFormatUtils.bytesToString(stageData.shuffleReadBytes)
val shuffleWriteBytes = MemoryFormatUtils.bytesToString(stageData.shuffleWriteBytes)
details += s"Stage $stageId has a long median task run time of $runTime."
details += s"Stage $stageId has ${stageData.numTasks} tasks, $inputBytes input," +
s" $shuffleReadBytes shuffle read, $shuffleWriteBytes shuffle write, and $outputBytes output."
if (stageData.numTasks >= maxRecommendedPartitions) {
if (maxData >= maxDataProcessedThreshold) {
- val inputBytes = MemoryFormatUtils.bytesToString(stageData.inputBytes)
- val shuffleReadBytes = MemoryFormatUtils.bytesToString(stageData.shuffleReadBytes)
- val shuffleWriteBytes = MemoryFormatUtils.bytesToString(stageData.shuffleWriteBytes)
- details += s"Stage $stageId has $inputBytes input, $shuffleReadBytes shuffle read, " +
-   "$shuffleWriteBytes shuffle write. Please try to reduce the amount of data being processed."
+ details += s"Stage $stageId: ${longTasksLargeDataRecommendation}."
} else {
details += s"Stage $stageId: please optimize the code to improve performance."
details += s"Stage $stageId: ${slowTasksRecommendation}."
}
}
else {
if (stageData.inputBytes > 0) {
// The stage is reading input data, try to increase the number of readers
details += s"Stage $stageId: please set DaliSpark.SPLIT_SIZE to a smaller " +
"value to increase the number of tasks reading input data for this stage."
details += s"Stage $stageId: ${longTasksFewInputPartitionsRecommendation}."
} else if (stageData.numTasks != curNumPartitions) {
details += s"Stage $stageId: please increase the number of partitions, which " +
s"is currently set to ${stageData.numTasks}."
details += s"Stage $stageId: ${longTasksFewPartitionsRecommendation}."
}
}
}
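Taken together, the branches above form a small decision tree over partition count and data volume. This sketch restates it on its own; the threshold defaults are assumptions (4000 appears in the diff only as an example shuffle-partition count, 3TB as the data threshold), and the strings stand in for the configured recommendation messages:

```scala
object LongTaskRecommendationSketch extends App {
  // Mirrors the branching above; thresholds and strings are stand-ins.
  def longTaskRecommendation(numTasks: Int, maxData: Long, inputBytes: Long,
      curNumPartitions: Int, maxRecommendedPartitions: Int = 4000,
      maxDataProcessedThreshold: Long = 3L << 40): Option[String] =
    if (numTasks >= maxRecommendedPartitions) {
      if (maxData >= maxDataProcessedThreshold)
        Some("please try to reduce the amount of data being processed")  // many partitions, lots of data
      else
        Some("please optimize the code to improve performance")          // many partitions, modest data: slow tasks
    } else if (inputBytes > 0) {
      Some("please increase the number of partitions for reading data")  // input stage controls task count
    } else if (numTasks != curNumPartitions) {
      Some("please increase the number of partitions")                   // partitions not raised for this stage
    } else None

  // Example: a 500-task stage reading 1TB of input.
  println(longTaskRecommendation(numTasks = 500, maxData = 1L << 40,
    inputBytes = 1L << 40, curNumPartitions = 500))
}
```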
