Initial changes for adding Spark stage analysis #438

Open

wants to merge 8 commits into customSHSWork

Conversation

edwinalu

  • Add StageAnalyzer for analyzing a Spark application's stages for execution memory spill,
    long tasks, task skew, and failures.
  • Call the REST API to get failed tasks.
  • Modify the call to the stages REST API to get task and executor summaries.
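For context, a hypothetical sketch of the kind of per-stage result such a StageAnalyzer could produce; the field names below are illustrative only, not the PR's actual model:

```scala
import com.linkedin.drelephant.analysis.Severity

// Illustrative shape only -- the real StageAnalysis in the PR may differ.
case class StageAnalysis(
    stageId: Int,
    executionMemorySpillSeverity: Severity,
    longTaskSeverity: Severity,
    taskSkewSeverity: Severity,
    taskFailureSeverity: Severity,
    details: Seq[String])
```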

(stageData.stageId, stageData.tasks.map(tasks => tasks.values))
}.toMap

val failedTasksStageMap = data.stagesWithFailedTasks.map { stageData =>
Contributor

You can use flatMap here instead of map followed by flatten.

Author

Thanks, changed to flatMap.
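For illustration, a minimal sketch of the change, using hypothetical simplified types in place of the project's REST API models:

```scala
// Hypothetical stand-ins for the Spark REST API models, for illustration only.
case class TaskData(taskId: Long, errorMessage: Option[String])
case class StageData(stageId: Int, tasks: Option[Map[Long, TaskData]])

def failedTasks(stages: Seq[StageData]): Seq[TaskData] = {
  // map + flatten builds an intermediate Seq of collections:
  //   stages.map(s => s.tasks.map(_.values).getOrElse(Iterable.empty)).flatten
  // flatMap does both steps in one pass, as suggested above:
  stages.flatMap(s => s.tasks.map(_.values).getOrElse(Iterable.empty))
}
```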

stageData.shuffleWriteBytes, stageData.outputBytes).max
val rawSpillSeverity = executionMemorySpillThresholds.severityOf(
stageData.memoryBytesSpilled / maxData.toDouble)
val tmp = DEFAULT_MAX_DATA_PROCESSED_THRESHOLD
Contributor

The variable tmp is not used anywhere.

taskSkewThresholds.severityOf(max / median)
case _ => Severity.NONE
}
val median = medianTime.getOrElse(0.0D)
Contributor

The median value declared here is not used below.

Author

Removed.

// add more information about what might be causing skew if skew is being flagged
// (reported severity is significant), or there is execution memory spill, since skew
// can also cause execution memory spill.
val median = Utils.getDuration(medianTime.map(_.toLong).getOrElse(0L))
Contributor

Can you use the variables declared above?

Author

The naming was confusing -- these are the string representations of median and max -- I've renamed them to medianStr and maxStr.

*/
def getStageAnalysis(curNumPartitions: Int): Seq[StageAnalysis] = {
data.stagesWithFailedTasks.map { stageData =>
(stageData.stageId, stageData.tasks.map(tasks => tasks.values))
Contributor

I think this block has no usage below.

Author

As discussed on Slack, leaving this in -- it will be called, and the results saved in SparkApplicationData, for the different heuristics (long task, task skew, execution memory spill, and configuration parameter recommendations). The code for configuration parameter recommendations isn't quite ready yet, so I will merge that separately.

* (value of spark.sql.shuffle.partitions).
* @return list of analysis results of stages.
*/
def getStageAnalysis(curNumPartitions: Int): Seq[StageAnalysis] = {
Contributor

Can we calculate curNumPartitions in the method itself?

Author

Yes, it can parse out the value for "spark.sql.shuffle.partitions" -- I'll change it.
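A minimal sketch of what that parsing could look like, assuming the application's configuration properties are available as a Map[String, String] from the environment REST API (the names here are illustrative):

```scala
val SPARK_SQL_SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
val SPARK_SQL_SHUFFLE_PARTITIONS_DEFAULT = 200  // Spark's documented default

// Fall back to the Spark default when the property isn't in the app's environment.
def curNumPartitions(appConfigurationProperties: Map[String, String]): Int =
  appConfigurationProperties.get(SPARK_SQL_SHUFFLE_PARTITIONS)
    .map(_.toInt)
    .getOrElse(SPARK_SQL_SHUFFLE_PARTITIONS_DEFAULT)
```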

val MAX_RECOMMENDED_PARTITIONS_KEY = "max_recommended_partitions"


// Severity hresholds for task duration in minutes, when checking to see if the median task
Contributor

Nit: hresholds => thresholds

Author

Done.

// The ascending severity thresholds for the ratio of JVM GC time and task run time,
// checking if too much time is being spent in GC.
val DEFAULT_GC_SEVERITY_A_THRESHOLDS =
SeverityThresholds(low = 0.08D, moderate = 0.09D, severe = 0.1D, critical = 0.15D, ascending = true)
Contributor

The GC threshold should be more lenient, since, per the feedback, increasing memory for it causes other heuristics to fail. Moreover, UDFs can also cause GC.

Author (@edwinalu, Oct 15, 2018)

For a while, the GC stats were being double counted (LIHADOOP-40386), and GC was also being flagged for very short runtimes (LIHADOOP-38532) -- let's wait on changing the threshold values, and check the results for more recent runs (to see if there are still a lot of conflicting GC warnings). We do still want to flag the issue and give recommendations if it is due to UDFs (users can try to fix/optimize the UDFs if they are aware of the problem), although it's not something that can be fixed by changing configuration parameters. We can also recommend changing to ParallelGC or G1GC.
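For reference, a recommendation to switch to G1GC would typically amount to a setting like the following (illustration only, not code from this PR):

```scala
import org.apache.spark.SparkConf

// Ask the executors to use the G1 garbage collector.
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
```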

Contributor

Let's see the behaviour after LIHADOOP-40386 and LIHADOOP-38532, and check for conflicting GC warnings.

// The default threshold (3TB) for checking for maximum amount of data processed, for which to
// alert for execution memory spill. Tasks processing more data would be expected to have some
// amount of spill, due to the large amount of data processed.
val DEFAULT_MAX_DATA_PROCESSED_THRESHOLD = "3TB"
Contributor

What's the rationale behind 3TB as the threshold?

Author (@edwinalu, Oct 15, 2018)

If there is a lot of data being processed, it may not be possible (or at least desirable) to avoid execution memory spill. We don't want to recommend increasing executor memory too much (resource consumption), adding too many partitions (shuffle overhead), or reducing cores too far (decreased parallelism). For the 3TB number, using some reasonable values for the configuration parameters, I'm estimating based on:

spark.executor.memory / spark.executor.cores * spark.memory.fraction * spark.memory.storageFraction * spark.sql.shuffle.partitions

5GB / 2 * 0.6 * 0.5 * 4000

Looking at this again, though, it would make more sense to calculate the estimate from some of the other constants and thresholds as well, so that this number would be adjusted automatically with the other values. @pralabhkumar, let me know if this sounds good. Another option is to add more comments about how the value is calculated.
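For reference, the arithmetic behind the 3TB figure, spelled out with the illustrative parameter values from the comment above (these are not values read from an application):

```scala
val executorMemoryGB  = 5.0    // spark.executor.memory (illustrative)
val executorCores     = 2      // spark.executor.cores (illustrative)
val memoryFraction    = 0.6    // spark.memory.fraction
val storageFraction   = 0.5    // spark.memory.storageFraction
val shufflePartitions = 4000   // spark.sql.shuffle.partitions (illustrative)

// Rough execution memory available to one task, scaled by the number of partitions:
val maxDataGB =
  executorMemoryGB / executorCores * memoryFraction * storageFraction * shufflePartitions
// 5.0 / 2 * 0.6 * 0.5 * 4000 = 3000 GB, i.e. roughly 3TB
```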

Contributor

I think for now just adding more comments about how the value is calculated should be good, and we can monitor the behaviour.

Author

Added some comments.

details += s"Stage $stageId has $inputBytes input, $shuffleReadBytes shuffle read, " +
"$shuffleWriteBytes shuffle write. Please try to reduce the amount of data being processed."
} else {
details += s"Stage $stageId: please optimize the code to improve performance."
Contributor

Where are we planning to use these details? How are we planning to show them to the user? Are they going to be used in Heuristics?

Author

Yes, the current plan is that the details would be shown in Heuristics. This could be pretty messy if there are a lot of lines, however. Is there a better way to present the information?

Contributor

Yes, showing the details on heuristics would make the UI pretty messy. I think we can add a detail button to each heuristic; clicking it would show the details (again, I am not sure of other ways to represent the details).

Author

It would be a bit more hidden, but a button that would show the details would be cleaner.

else {
if (stageData.inputBytes > 0) {
// The stage is reading input data, try to increase the number of readers
details += s"Stage $stageId: please set DaliSpark.SPLIT_SIZE to a smaller " +
Contributor

I think this is a Dali-specific suggestion. What if the user is not using Dali, or for open source -- should we give a more general suggestion regarding split size?

Author

Yes it is Dali and LinkedIn specific, so not appropriate for open source. I can modify the suggestion to make it more general, but it would be good to have the more specific suggestion for our users. What would be the best way to customize? Perhaps there could be a heuristic configuration, with a map of issue type to recommendation?

Contributor

I think we can enhance the details to be specific to LinkedIn users and outside users. Something like "please set DaliSpark.SPLIT_SIZE to a smaller value" for LinkedIn users, and "please set mapreduce.input.fileinputformat.split.maxsize" for outside users.

I think it makes sense to have a map of issue type to recommendation.
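A minimal sketch of that map-of-issue-type-to-recommendation idea; the issue keys, messages, and override mechanism below are illustrative, not part of the PR:

```scala
object IssueType extends Enumeration {
  val LongTasksWithInput, LongTasksWithoutInput = Value
}

// Default (open-source) recommendations, keyed by issue type.
val defaultRecommendations: Map[IssueType.Value, String] = Map(
  IssueType.LongTasksWithInput ->
    "please set mapreduce.input.fileinputformat.split.maxsize to a smaller value",
  IssueType.LongTasksWithoutInput ->
    "please increase the number of partitions"
)

// A deployment-specific configuration (e.g. LinkedIn-internal) could override entries.
val linkedInRecommendations = defaultRecommendations +
  (IssueType.LongTasksWithInput -> "please set DaliSpark.SPLIT_SIZE to a smaller value")
```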

* from analyzing the stage for errors causing tasks to fail will be appended.
* @return
*/
private def checkForSpecificTaskError(
Contributor

What's the use of this check? If the task failed for some reason other than OOM (which is handled by checkForTaskFailure) and the job passes (because of task retries), why should the user bother about the reason the task failed?

Author

It's also used to identify containers killed by YARN for exceeding memory limits.
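A minimal sketch of that kind of check, assuming failed tasks expose an error message; the type and helper names are illustrative, not the PR's exact code, though the quoted text is the standard YARN message:

```scala
import scala.collection.mutable.ArrayBuffer

case class FailedTask(taskId: Long, errorMessage: Option[String])

def checkForContainerKilledByYarn(stageId: Int,
                                  failedTasks: Seq[FailedTask],
                                  details: ArrayBuffer[String]): Unit = {
  val numKilled = failedTasks.count(_.errorMessage.exists(
    _.contains("Container killed by YARN for exceeding memory limits")))
  if (numKilled > 0) {
    details += s"Stage $stageId had $numKilled tasks killed by YARN for exceeding memory " +
      "limits; consider increasing the memory overhead setting."
  }
}
```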

@@ -211,7 +214,7 @@ class SparkRestClient(sparkConf: SparkConf) {
}

private def getStageDatas(attemptTarget: WebTarget): Seq[StageDataImpl] = {
val target = attemptTarget.path("stages")
val target = attemptTarget.path("stages/withSummaries")
Contributor

Couldn't find this API in the Spark documentation. Is it a LinkedIn-specific enhancement?

Author

It is a LinkedIn-specific enhancement. Should there be a flag/configuration parameter for adding "withSummaries"? It's needed for some of the heuristics (task skew and long tasks).
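One possible shape for such a flag, assuming a boolean read from the existing SparkConf; the property name drelephant.spark.fetchStageSummaries is illustrative, not an existing configuration key:

```scala
import javax.ws.rs.client.WebTarget
import org.apache.spark.SparkConf

class StagesTargetBuilder(sparkConf: SparkConf) {
  // Hypothetical flag; defaults to the open-source behaviour.
  private val fetchTaskSummaries =
    sparkConf.getBoolean("drelephant.spark.fetchStageSummaries", false)

  // Use the LinkedIn-specific endpoint only when the flag is enabled.
  def stagesTarget(attemptTarget: WebTarget): WebTarget =
    attemptTarget.path(if (fetchTaskSummaries) "stages/withSummaries" else "stages")
}
```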

val SPARK_SQL_SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
val SPARK_MEMORY_FRACTION = "spark.memory.fraction"

// Spark default configuration values
Contributor

These default parameters should be picked up from the Spark default configuration.

Author (@edwinalu, Oct 15, 2018)

If it's not explicitly specified by the user or in spark-defaults.conf, then it may not be listed in the Spark environment REST API return value. spark.sql.shuffle.partitions isn't specified in spark-defaults.conf, and many users don't specify the value explicitly. I just double-checked, and it is set to the default Spark values.

details += s"Stage $stageId has $inputBytes input, $shuffleReadBytes shuffle read, " +
"$shuffleWriteBytes shuffle write. Please try to reduce the amount of data being processed."
} else {
details += s"Stage $stageId: please optimize the code to improve performance."
Contributor

Just curious -- how would the user come to know which code to optimize? From the Spark history UI it is very difficult to work out the stage-to-code mapping.

Author

It is really difficult, unfortunately. This is mostly alerting them that there is some issue. They can view the DAG for the stage, which may give some idea about the operations and help narrow down the code. Mapping stages to code is something it would be good for us to support better. Adding more information about the datasets read/processed could help. Adding support for getting the DAG from the REST API would make it possible to list the operations, and possibly add more information to Spark.

Author (@edwinalu)

Some of the thresholds are meant to be set with heuristic configuration parameters. The stage analysis can be used for multiple heuristics (long task, task skew, execution memory spill, configuration parameter recommendations). Does it make sense to set these thresholds for each heuristic (and call StageAnalysis each time), or would it be better to consolidate? With the independent configuration parameters, users can decide which ones to use/include. However, keeping the values in sync across multiple heuristics seems awkward.

Perhaps this could be multi-level, with a general Spark (or Pig) configuration parameter list, which would kick in if there isn't a heuristic-level setting. This could still be confusing if misconfigured though.

@varunsaxena force-pushed the customSHSWork branch 2 times, most recently from fc7de94 to ffe4d17 on October 16, 2018
- Add StageAnalyzer for analyzing a Spark application's stages for execution memory spill,
  long tasks, task skew, and failures.
- Call the REST API to get failed tasks.
- Modify the call to the stages REST API to get task and executor summaries.
@fusonghe

[image]

The Dr-elephant Compare page does not display information. How do I configure it? @edwinalu @chriseppstein
