Initial changes for adding Spark stage analysis #438

Open · wants to merge 8 commits into base: customSHSWork

Changes from all commits
11 changes: 9 additions & 2 deletions app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
@@ -95,13 +95,20 @@ class SparkRestClient(sparkConf: SparkConf) {
getLogData(attemptTarget)
}
} else Future.successful(None)
val futureFailedTasks = if (fetchFailedTasks) {
Future {
getStagesWithFailedTasks(attemptTarget)
}
} else {
Future.successful(Seq.empty)
}
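The flag-gated fetch added above follows a small pattern that can be sketched in isolation: when the flag is off, an already-completed `Future` is returned, so the later `Await.result` does not block on a disabled fetch. The names below are illustrative, not the PR's code.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Sketch of the pattern: run the body asynchronously only when enabled,
// otherwise hand back a completed Future with an empty result.
def optionalFetch[T](enabled: Boolean, empty: T)(body: => T): Future[T] =
  if (enabled) Future(body) else Future.successful(empty)

val skipped = Await.result(optionalFetch(false, Seq.empty[Int])(Seq(1, 2)), 5.seconds)
val fetched = Await.result(optionalFetch(true, Seq.empty[Int])(Seq(1, 2)), 5.seconds)
```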

SparkRestDerivedData(
applicationInfo,
Await.result(futureJobDatas, DEFAULT_TIMEOUT),
Await.result(futureStageDatas, DEFAULT_TIMEOUT),
Await.result(futureExecutorSummaries, Duration(5, SECONDS)),
Seq.empty,
Await.result(futureFailedTasks, DEFAULT_TIMEOUT),
Await.result(futureLogData, Duration(5, SECONDS))
)

@@ -211,7 +218,7 @@ class SparkRestClient(sparkConf: SparkConf) {
}

private def getStageDatas(attemptTarget: WebTarget): Seq[StageDataImpl] = {
val target = attemptTarget.path("stages")
val target = attemptTarget.path("stages/withSummaries")
Contributor: Couldn't find this API in the Spark documentation. Is it a LinkedIn-specific enhancement?

Author: It is a LinkedIn-specific enhancement. Should there be a flag/configuration parameter for adding "withSummaries"? It's needed for some of the heuristics (task skew and long tasks).
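The flag-gated path selection suggested in the exchange above might look like this sketch. The flag name `fetchStageSummaries` is an assumption, not something defined in the PR.

```scala
// Hypothetical sketch of gating the LinkedIn-specific "withSummaries" endpoint
// behind a configuration flag (flag name is an assumption).
def stagesPath(fetchStageSummaries: Boolean): String =
  if (fetchStageSummaries) "stages/withSummaries" else "stages"

val defaultPath = stagesPath(fetchStageSummaries = false)
val summariesPath = stagesPath(fetchStageSummaries = true)
```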

try {
get(target, SparkRestObjectMapper.readValue[Seq[StageDataImpl]])
} catch {
@@ -143,11 +143,17 @@ trait StageData{
def status: StageStatus
def stageId: Int
def attemptId: Int
def numTasks: Int
def numActiveTasks: Int
def numCompleteTasks: Int
def numFailedTasks: Int

def executorRunTime: Long
def executorCpuTime: Long
def submissionTime: Option[Date]
def firstTaskLaunchedTime: Option[Date]
def completionTime: Option[Date]
def failureReason: Option[String]

def inputBytes: Long
def inputRecords: Long
@@ -166,7 +172,14 @@

def accumulatorUpdates: Seq[AccumulableInfo]
def tasks: Option[Map[Long, TaskDataImpl]]
def executorSummary: Option[Map[String, ExecutorStageSummary]]}
def executorSummary: Option[Map[String, ExecutorStageSummary]]

def peakJvmUsedMemory: Option[Long]
def peakExecutionMemory: Option[Long]
def peakStorageMemory: Option[Long]
def peakUnifiedMemory: Option[Long]
def taskSummary : Option[TaskMetricDistributions]
def executorMetricsSummary : Option[ExecutorMetricDistributions]}

trait TaskData{
def taskId: Long
@@ -219,10 +232,15 @@ trait TaskMetricDistributions{
def quantiles: IndexedSeq[Double]

def executorDeserializeTime: IndexedSeq[Double]
def executorDeserializeCpuTime: IndexedSeq[Double]
def executorRunTime: IndexedSeq[Double]
def executorCpuTime: IndexedSeq[Double]
def resultSize: IndexedSeq[Double]
def jvmGcTime: IndexedSeq[Double]
def resultSerializationTime: IndexedSeq[Double]
def gettingResultTime: IndexedSeq[Double]
def schedulerDelay: IndexedSeq[Double]
def peakExecutionMemory: IndexedSeq[Double]
def memoryBytesSpilled: IndexedSeq[Double]
def diskBytesSpilled: IndexedSeq[Double]

@@ -246,13 +264,33 @@ trait ShuffleReadMetricDistributions{
def localBlocksFetched: IndexedSeq[Double]
def fetchWaitTime: IndexedSeq[Double]
def remoteBytesRead: IndexedSeq[Double]
def remoteBytesReadToDisk: IndexedSeq[Double]
def totalBlocksFetched: IndexedSeq[Double]}

trait ShuffleWriteMetricDistributions{
def writeBytes: IndexedSeq[Double]
def writeRecords: IndexedSeq[Double]
def writeTime: IndexedSeq[Double]}

trait ExecutorMetricDistributions{
def quantiles: IndexedSeq[Double]
def numTasks: IndexedSeq[Double]
def inputBytes : IndexedSeq[Double]
def inputRecords : IndexedSeq[Double]
def outputBytes : IndexedSeq[Double]
def outputRecords : IndexedSeq[Double]
def shuffleRead : IndexedSeq[Double]
def shuffleReadRecords : IndexedSeq[Double]
def shuffleWrite : IndexedSeq[Double]
def shuffleWriteRecords : IndexedSeq[Double]
def memoryBytesSpilled : IndexedSeq[Double]
def diskBytesSpilled : IndexedSeq[Double]
def peakJvmUsedMemory : IndexedSeq[Double]
def peakExecutionMemory : IndexedSeq[Double]
def peakStorageMemory : IndexedSeq[Double]
def peakUnifiedMemory : IndexedSeq[Double]}


trait AccumulableInfo{
def id: Long
def name: String
@@ -353,11 +391,17 @@ class StageDataImpl(
var status: StageStatus,
var stageId: Int,
var attemptId: Int,
var numTasks: Int,
var numActiveTasks: Int ,
var numCompleteTasks: Int,
var numFailedTasks: Int,

var executorRunTime: Long,
var executorCpuTime: Long,
var submissionTime: Option[Date],
var firstTaskLaunchedTime: Option[Date],
var completionTime: Option[Date],
var failureReason: Option[String],

var inputBytes: Long,
var inputRecords: Long,
@@ -376,7 +420,13 @@

var accumulatorUpdates: Seq[AccumulableInfoImpl],
var tasks: Option[Map[Long, TaskDataImpl]],
var executorSummary: Option[Map[String, ExecutorStageSummaryImpl]]) extends StageData
var executorSummary: Option[Map[String, ExecutorStageSummaryImpl]],
var peakJvmUsedMemory: Option[Long],
var peakExecutionMemory: Option[Long],
var peakStorageMemory: Option[Long],
var peakUnifiedMemory: Option[Long],
var taskSummary : Option[TaskMetricDistributionsImpl],
var executorMetricsSummary : Option[ExecutorMetricDistributionsImpl]) extends StageData

class TaskDataImpl(
var taskId: Long,
@@ -427,12 +477,16 @@ class ShuffleWriteMetricsImpl(

class TaskMetricDistributionsImpl(
var quantiles: IndexedSeq[Double],

var executorDeserializeTime: IndexedSeq[Double],
var executorDeserializeCpuTime: IndexedSeq[Double],
var executorRunTime: IndexedSeq[Double],
var executorCpuTime: IndexedSeq[Double],
var resultSize: IndexedSeq[Double],
var jvmGcTime: IndexedSeq[Double],
var resultSerializationTime: IndexedSeq[Double],
var gettingResultTime: IndexedSeq[Double],
var schedulerDelay: IndexedSeq[Double],
var peakExecutionMemory: IndexedSeq[Double],
var memoryBytesSpilled: IndexedSeq[Double],
var diskBytesSpilled: IndexedSeq[Double],

@@ -456,6 +510,7 @@ class ShuffleReadMetricDistributionsImpl(
var localBlocksFetched: IndexedSeq[Double],
var fetchWaitTime: IndexedSeq[Double],
var remoteBytesRead: IndexedSeq[Double],
var remoteBytesReadToDisk: IndexedSeq[Double],
var totalBlocksFetched: IndexedSeq[Double]) extends ShuffleReadMetricDistributions

class ShuffleWriteMetricDistributionsImpl(
@@ -468,3 +523,21 @@ class AccumulableInfoImpl(
var name: String,
var update: Option[String],
var value: String) extends AccumulableInfo

class ExecutorMetricDistributionsImpl(
var quantiles: IndexedSeq[Double],
var numTasks: IndexedSeq[Double],
var inputBytes : IndexedSeq[Double],
var inputRecords : IndexedSeq[Double],
var outputBytes : IndexedSeq[Double],
var outputRecords : IndexedSeq[Double],
var shuffleRead : IndexedSeq[Double],
var shuffleReadRecords : IndexedSeq[Double],
var shuffleWrite : IndexedSeq[Double],
var shuffleWriteRecords : IndexedSeq[Double],
var memoryBytesSpilled : IndexedSeq[Double],
var diskBytesSpilled : IndexedSeq[Double],
var peakJvmUsedMemory : IndexedSeq[Double],
var peakExecutionMemory : IndexedSeq[Double],
var peakStorageMemory : IndexedSeq[Double],
var peakUnifiedMemory : IndexedSeq[Double]) extends ExecutorMetricDistributions
128 changes: 128 additions & 0 deletions app/com/linkedin/drelephant/spark/heuristics/ConfigurationUtils.scala
@@ -0,0 +1,128 @@
package com.linkedin.drelephant.spark.heuristics

import com.linkedin.drelephant.analysis.SeverityThresholds

object ConfigurationUtils {
val JVM_USED_MEMORY = "jvmUsedMemory"

// Spark configuration parameters
val SPARK_EXECUTOR_MEMORY = "spark.executor.memory"
val SPARK_DRIVER_MEMORY = "spark.driver.memory"
val SPARK_EXECUTOR_MEMORY_OVERHEAD = "spark.yarn.executor.memoryOverhead"
val SPARK_DRIVER_MEMORY_OVERHEAD = "spark.yarn.driver.memoryOverhead"
val SPARK_EXECUTOR_CORES = "spark.executor.cores"
val SPARK_DRIVER_CORES = "spark.driver.cores"
val SPARK_EXECUTOR_INSTANCES = "spark.executor.instances"
val SPARK_SQL_SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
val SPARK_MEMORY_FRACTION = "spark.memory.fraction"

// Spark default configuration values
Contributor: These default parameters should be picked from the Spark default configuration.

Author (@edwinalu, Oct 15, 2018): If a value is not explicitly specified by the user or in spark-defaults.conf, it may not be listed in the Spark environment REST API response. spark.sql.shuffle.partitions isn't specified in spark-defaults.conf, and many users don't set it explicitly. Just double-checked, and these are set to the default Spark values.

val SPARK_EXECUTOR_MEMORY_DEFAULT = "1g"
val SPARK_DRIVER_MEMORY_DEFAULT = "1g"
val SPARK_EXECUTOR_CORES_DEFAULT = 1
val SPARK_DRIVER_CORES_DEFAULT = 1
val SPARK_SQL_SHUFFLE_PARTITIONS_DEFAULT = 200
val SPARK_MEMORY_FRACTION_DEFAULT = 0.6

// if the overhead memory is not explicitly specified by the user, the default amount is
// max(0.1 * spark.executor.memory, 384MB)
val SPARK_MEMORY_OVERHEAD_PCT_DEFAULT = 0.1

// the minimum amount of overhead memory
val SPARK_MEMORY_OVERHEAD_MIN_DEFAULT = 384L << 20 // 384MB
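The overhead default described in the comments above can be sketched as a small helper; this is an illustration of the rule `max(0.1 * spark.executor.memory, 384MB)`, not code from the PR.

```scala
// Sketch of the default overhead rule, in bytes:
// max(SPARK_MEMORY_OVERHEAD_PCT_DEFAULT * executor memory, 384MB floor).
def defaultOverheadBytes(executorMemoryBytes: Long): Long =
  math.max((executorMemoryBytes * 0.1).toLong, 384L << 20)

val small = defaultOverheadBytes(1L << 30) // 1g executor: the 384MB floor applies
val large = defaultOverheadBytes(8L << 30) // 8g executor: 10% (~819MB) wins
```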

// the amount of Spark reserved memory (300MB)
val SPARK_RESERVED_MEMORY = 300L << 20

// number of milliseconds in a minute
val MILLIS_PER_MIN = 1000D * 60.0D

// the index for the median value for executor and task metrics distributions
val DISTRIBUTION_MEDIAN_IDX = 2

// the index for the max value for executor and task metrics distributions
val DISTRIBUTION_MAX_IDX = 4

// keys for finding Dr. Elephant configuration parameter values
val SPARK_STAGE_EXECUTION_MEMORY_SPILL_THRESHOLD_KEY = "spark_stage_execution_memory_spill_threshold"
val SPARK_STAGE_TASK_SKEW_THRESHOLD_KEY = "spark_stage_task_skew_threshold"
val SPARK_STAGE_TASK_DURATION_THRESHOLD_KEY = "spark_stage_task_duration_threshold"
val SPARK_STAGE_MAX_DATA_PROCESSED_THRESHOLD_KEY = "spark_stage_max_data_processed_threshold"
val TASK_FAILURE_RATE_SEVERITY_THRESHOLDS_KEY = "stage_task_failure_rate_severity_threshold"
val MAX_DATA_PROCESSED_THRESHOLD_KEY = "execution_memory_spill_max_data_threshold"
val LONG_TASK_TO_STAGE_DURATION_RATIO_KEY = "task_skew_task_to_stage_duration_ratio"
val TASK_SKEW_TASK_DURATION_MIN_THRESHOLD_KEY = "task_skew_task_duration_threshold"
val MAX_RECOMMENDED_PARTITIONS_KEY = "max_recommended_partitions"

// keys for finding specific recommendations
val EXECUTION_MEMORY_SPILL_LARGE_DATA_RECOMMENDATION_KEY = "execution_memory_spill_large_data_recommendation"
val TASK_SKEW_INPUT_DATA_RECOMMENDATION_KEY = "task_skew_input_data_recommendation"
val TASK_SKEW_GENERIC_RECOMMENDATION_KEY = "task_skew_generic_recommendation"
val LONG_TASKS_LARGE_DATA_RECOMMENDATION_KEY = "long_tasks_large_data_recommendation"
val SLOW_TASKS_RECOMMENDATION_KEY = "slow_tasks_recommendation"
val LONG_TASKS_FEW_PARTITIONS_RECOMMENDATION_KEY = "long_tasks_few_partitions"
val LONG_TASKS_FEW_INPUT_PARTITIONS_RECOMMENDATION_KEY = "long_tasks_few_input_partitions"

// default recommendations
val DEFAULT_EXECUTION_MEMORY_SPILL_LARGE_DATA_RECOMMENDATION = "a large amount of data is being processed. " +
"Examine the application to see if this can be reduced"
val DEFAULT_TASK_SKEW_INPUT_DATA_RECOMMENDATION = "please try to modify the application to make the input partitions more even"
val DEFAULT_TASK_SKEW_GENERIC_RECOMMENDATION = "please try to modify the application to make the partitions more even"
val DEFAULT_LONG_TASKS_LARGE_DATA_RECOMMENDATION = "please try to reduce the amount of data being processed"
val DEFAULT_SLOW_TASKS_RECOMMENDATION = "please optimize the code to improve performance"
val DEFAULT_LONG_TASKS_FEW_PARTITIONS_RECOMMENDATION = "please increase the number of partitions"
val DEFAULT_LONG_TASKS_FEW_INPUT_PARTITIONS_RECOMMENDATION = "please increase the number of partitions for reading data"


// Severity thresholds for task duration in minutes, when checking to see if the median task
// run time is too long for a stage.
val DEFAULT_TASK_DURATION_THRESHOLDS =
SeverityThresholds(low = 2.5D * MILLIS_PER_MIN, moderate = 5.0D * MILLIS_PER_MIN,
severe = 10.0D * MILLIS_PER_MIN, critical = 15.0D * MILLIS_PER_MIN, ascending = true)

// Severity thresholds for checking task skew, ratio of maximum to median task run times.
val DEFAULT_TASK_SKEW_THRESHOLDS =
SeverityThresholds(low = 2, moderate = 4, severe = 8, critical = 16, ascending = true)

// Severity thresholds for checking execution memory spill, ratio of execution spill compared
// to the maximum amount of data (input, output, shuffle read, or shuffle write) processed.
val DEFAULT_EXECUTION_MEMORY_SPILL_THRESHOLDS =
SeverityThresholds(low = 0.01D, moderate = 0.1D, severe = 0.25D, critical = 0.5D, ascending = true)
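As a standalone illustration of how an ascending threshold set maps a metric value, such as the spill ratio above, to a severity level (this is a sketch, not the project's `SeverityThresholds` class):

```scala
// Ascending thresholds: higher values are worse, so match from the top down.
def severityOf(value: Double, low: Double, moderate: Double,
               severe: Double, critical: Double): String =
  if (value >= critical) "CRITICAL"
  else if (value >= severe) "SEVERE"
  else if (value >= moderate) "MODERATE"
  else if (value >= low) "LOW"
  else "NONE"

// A spill ratio of 0.3 falls between the severe (0.25) and critical (0.5) cutoffs.
val spillSeverity = severityOf(0.3, low = 0.01, moderate = 0.1, severe = 0.25, critical = 0.5)
```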

// The ascending severity thresholds for the ratio of JVM GC time and task run time,
// checking if too much time is being spent in GC.
val DEFAULT_GC_SEVERITY_A_THRESHOLDS =
SeverityThresholds(low = 0.08D, moderate = 0.09D, severe = 0.1D, critical = 0.15D, ascending = true)
Contributor: The GC threshold should be more lenient, since per the feedback, increasing memory for it causes other heuristics to fail. Moreover, UDFs can also cause GC.

Author (@edwinalu, Oct 15, 2018): For a while, the GC stats were being double counted (LIHADOOP-40386), and GC was also being flagged for very short runtimes (LIHADOOP-38532). Let's wait on changing the threshold values and check the results for more recent runs (if there are still a lot of conflicting GC warnings). We do still want to flag GC issues and give recommendations if the issue is due to UDFs (users can try to fix/optimize the UDFs if they are aware of the problem), although it's not something that can be fixed by changing configuration parameters. We can also recommend changing to ParallelGC or G1GC.

Contributor: Let's see the behaviour after LIHADOOP-40386 and LIHADOOP-38532, and check for conflicting GC warnings.

/** The default severity thresholds for the rate of a stage's tasks failing. */
val DEFAULT_TASK_FAILURE_RATE_SEVERITY_THRESHOLDS =
SeverityThresholds(low = 0.05D, moderate = 0.1D, severe = 0.15D, critical = 0.2D, ascending = true)

// The default threshold (3TB) for checking for maximum amount of data processed, for which to
// alert for execution memory spill. Tasks processing more data would be expected to have some
// amount of spill, due to the large amount of data processed.
// Estimating the size based on some reasonable values for configuration parameters (and how
// much data could be kept in unified memory given these values):
// spark.executor.memory / spark.executor.cores * spark.memory.fraction *
// (1 - spark.memory.storageFraction) * spark.sql.shuffle.partitions
// = 5GB / 2 * 0.6 * (1 - 0.5) * 4000
val DEFAULT_MAX_DATA_PROCESSED_THRESHOLD = "3TB"
Contributor: What's the rationale behind 3TB as the threshold?

Author (@edwinalu, Oct 15, 2018): If there is a lot of data being processed, it may not be possible (or at least desirable) to avoid execution memory spill. We don't want to recommend increasing executor memory too high (resource consumption), or too many partitions (shuffle overhead), or reducing cores too low (decreased parallelism). For the 3TB number, using some reasonable values for the configuration parameters, I'm estimating based on:

spark.executor.memory / spark.executor.cores * spark.memory.fraction * spark.memory.storageFraction * spark.sql.shuffle.partitions

= 5GB / 2 * 0.6 * 0.5 * 4000

Looking at this again though, it would make more sense to calculate the estimate from some of the other constants and thresholds, so that this number would be adjusted automatically with the other values. @pralabhkumar, let me know if this sounds good. Another option is to add more comments about calculating a value.

Contributor: I think for now just adding more comments about calculating the value should be good, and we can monitor the behaviour.

Author: Added some comments.

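The estimate quoted in the thread above can be checked with back-of-envelope arithmetic, using the configuration values from the comment (per-task unified memory times the number of shuffle partitions):

```scala
// Values from the review discussion; all names refer to the Spark settings noted.
val executorMemoryGb  = 5.0   // spark.executor.memory
val executorCores     = 2     // spark.executor.cores
val memoryFraction    = 0.6   // spark.memory.fraction
val storageFraction   = 0.5   // spark.memory.storageFraction
val shufflePartitions = 4000  // spark.sql.shuffle.partitions

// Unified execution memory available per task, then scaled across all partitions.
val perTaskUnifiedGb = executorMemoryGb / executorCores * memoryFraction * (1 - storageFraction)
val totalGb = perTaskUnifiedGb * shufflePartitions // 3000 GB, i.e. roughly 3TB
```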
// The default threshold for the ratio of the time for longest running task for a stage to the
// stage duration. With Spark, some amount of task skew may be OK, since executors can process
// multiple tasks, so one executor could process multiple shorter tasks, while another executor
// processes a longer task. However, if the length of the long task is a large fraction of the
// stage duration, then it is likely contributing to the overall stage duration.
val DEFAULT_LONG_TASK_TO_STAGE_DURATION_RATIO = "0.75"

// Some task skew is also tolerable if the tasks are short (2.5 minutes or less).
val DEFAULT_TASK_SKEW_TASK_DURATION_MIN_THRESHOLD = "150000"

// The target task duration (2.5 minutes). This is the same as the idle executor timeout.
val DEFAULT_TARGET_TASK_DURATION = "150000"

// The default maximum number of partitions that would be recommended. More partitions means
// less data per partition, so shorter tasks and less memory needed per task. However more
// partitions also increases the amount of overhead for shuffle.
val DEFAULT_MAX_RECOMMENDED_PARTITIONS = "4000"
}