Lihadoop 39635.2 #457

Open: wants to merge 9 commits into base branch customSHSWork.
11 changes: 9 additions & 2 deletions app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
@@ -95,13 +95,20 @@ class SparkRestClient(sparkConf: SparkConf) {
          getLogData(attemptTarget)
        }
      } else Future.successful(None)
      val futureFailedTasks = if (fetchFailedTasks) {
        Future {
          getStagesWithFailedTasks(attemptTarget)
        }
      } else {
        Future.successful(Seq.empty)
      }

      SparkRestDerivedData(
        applicationInfo,
        Await.result(futureJobDatas, DEFAULT_TIMEOUT),
        Await.result(futureStageDatas, DEFAULT_TIMEOUT),
        Await.result(futureExecutorSummaries, Duration(5, SECONDS)),
        Await.result(futureFailedTasks, DEFAULT_TIMEOUT),
        Await.result(futureLogData, Duration(5, SECONDS))
      )

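The guard above means the extra REST round-trip for failed tasks is made only when fetchFailedTasks is enabled; otherwise the Await returns immediately on an already-completed future. A minimal, self-contained sketch of the same pattern (only the names fetchFailedTasks and getStagesWithFailedTasks are taken from the diff; the timeout, return type, and values are stand-ins):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    object GuardedFetchSketch extends App {
      val fetchFailedTasks = true

      // Stand-in for the REST call made by SparkRestClient.
      def getStagesWithFailedTasks(): Seq[String] = Seq("stage 3, attempt 0")

      // Start the expensive call only when the flag is on; otherwise complete
      // immediately so the Await below never blocks.
      val futureFailedTasks: Future[Seq[String]] =
        if (fetchFailedTasks) Future { getStagesWithFailedTasks() }
        else Future.successful(Seq.empty)

      println(Await.result(futureFailedTasks, 5.seconds))
    }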
@@ -211,7 +218,7 @@ class SparkRestClient(sparkConf: SparkConf) {
}

  private def getStageDatas(attemptTarget: WebTarget): Seq[StageDataImpl] = {
    val target = attemptTarget.path("stages/withSummaries")
    try {
      get(target, SparkRestObjectMapper.readValue[Seq[StageDataImpl]])
    } catch {
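Note: stages/withSummaries is not an endpoint in stock Spark's REST API (stock Spark exposes applications/[app-id]/stages); it presumably comes from the custom Spark History Server changes carried by this PR's base branch, customSHSWork.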
@@ -143,11 +143,17 @@ trait StageData{
def status: StageStatus
def stageId: Int
def attemptId: Int
def numTasks: Int
def numActiveTasks: Int
def numCompleteTasks: Int
def numFailedTasks: Int

def executorRunTime: Long
def executorCpuTime: Long
def submissionTime: Option[Date]
def firstTaskLaunchedTime: Option[Date]
def completionTime: Option[Date]
def failureReason: Option[String]

def inputBytes: Long
def inputRecords: Long
@@ -166,7 +172,14 @@

def accumulatorUpdates: Seq[AccumulableInfo]
def tasks: Option[Map[Long, TaskDataImpl]]
def executorSummary: Option[Map[String, ExecutorStageSummary]]

def peakJvmUsedMemory: Option[Long]
def peakExecutionMemory: Option[Long]
def peakStorageMemory: Option[Long]
def peakUnifiedMemory: Option[Long]
def taskSummary : Option[TaskMetricDistributions]
def executorMetricsSummary : Option[ExecutorMetricDistributions]}
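To show what the new per-stage peak memory fields enable, a hedged sketch of a downstream check (the field names come from the trait above; the one-quarter threshold is invented for illustration and is not a Dr. Elephant default):

    // Flag stages whose peak unified memory stayed well below the executor
    // allocation; such stages may be over-provisioned.
    def overProvisionedStages(stages: Seq[StageData], executorMemBytes: Long): Seq[Int] =
      stages.collect {
        case s if s.peakUnifiedMemory.exists(_ < executorMemBytes / 4) => s.stageId
      }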

trait TaskData{
def taskId: Long
@@ -219,10 +232,15 @@ trait TaskMetricDistributions{
def quantiles: IndexedSeq[Double]

def executorDeserializeTime: IndexedSeq[Double]
def executorDeserializeCpuTime: IndexedSeq[Double]
def executorRunTime: IndexedSeq[Double]
def executorCpuTime: IndexedSeq[Double]
def resultSize: IndexedSeq[Double]
def jvmGcTime: IndexedSeq[Double]
def resultSerializationTime: IndexedSeq[Double]
def gettingResultTime: IndexedSeq[Double]
def schedulerDelay: IndexedSeq[Double]
def peakExecutionMemory: IndexedSeq[Double]
def memoryBytesSpilled: IndexedSeq[Double]
def diskBytesSpilled: IndexedSeq[Double]

@@ -246,13 +264,33 @@ trait ShuffleReadMetricDistributions{
def localBlocksFetched: IndexedSeq[Double]
def fetchWaitTime: IndexedSeq[Double]
def remoteBytesRead: IndexedSeq[Double]
def remoteBytesReadToDisk: IndexedSeq[Double]
def totalBlocksFetched: IndexedSeq[Double]}

trait ShuffleWriteMetricDistributions{
def writeBytes: IndexedSeq[Double]
def writeRecords: IndexedSeq[Double]
def writeTime: IndexedSeq[Double]}

trait ExecutorMetricDistributions{
def quantiles: IndexedSeq[Double]
def numTasks: IndexedSeq[Double]
def inputBytes : IndexedSeq[Double]
def inputRecords : IndexedSeq[Double]
def outputBytes : IndexedSeq[Double]
def outputRecords : IndexedSeq[Double]
def shuffleRead : IndexedSeq[Double]
def shuffleReadRecords : IndexedSeq[Double]
def shuffleWrite : IndexedSeq[Double]
def shuffleWriteRecords : IndexedSeq[Double]
def memoryBytesSpilled : IndexedSeq[Double]
def diskBytesSpilled : IndexedSeq[Double]
def peakJvmUsedMemory : IndexedSeq[Double]
def peakExecutionMemory : IndexedSeq[Double]
def peakStorageMemory : IndexedSeq[Double]
def peakUnifiedMemory : IndexedSeq[Double]}
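As with Spark's own TaskMetricDistributions, each metric sequence in these traits is index-aligned with quantiles. A small illustrative helper (not part of the PR) that pairs them up:

    // Render one metric distribution, pairing each quantile with its value.
    def describeDistribution(quantiles: IndexedSeq[Double],
                             values: IndexedSeq[Double]): String =
      quantiles.zip(values)
        .map { case (q, v) => f"p${(q * 100).toInt}%d -> $v%.0f" }
        .mkString("\n")

    // describeDistribution(Vector(0.05, 0.25, 0.5, 0.75, 0.95),
    //                      Vector(1e8, 2e8, 3e8, 4e8, 5e8))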


trait AccumulableInfo{
def id: Long
def name: String
@@ -353,11 +391,17 @@ class StageDataImpl(
var status: StageStatus,
var stageId: Int,
var attemptId: Int,
var numTasks: Int,
var numActiveTasks: Int ,
var numCompleteTasks: Int,
var numFailedTasks: Int,

var executorRunTime: Long,
var executorCpuTime: Long,
var submissionTime: Option[Date],
var firstTaskLaunchedTime: Option[Date],
var completionTime: Option[Date],
var failureReason: Option[String],

var inputBytes: Long,
var inputRecords: Long,
@@ -376,7 +420,13 @@

var accumulatorUpdates: Seq[AccumulableInfoImpl],
var tasks: Option[Map[Long, TaskDataImpl]],
var executorSummary: Option[Map[String, ExecutorStageSummaryImpl]],
var peakJvmUsedMemory: Option[Long],
var peakExecutionMemory: Option[Long],
var peakStorageMemory: Option[Long],
var peakUnifiedMemory: Option[Long],
var taskSummary : Option[TaskMetricDistributionsImpl],
var executorMetricsSummary : Option[ExecutorMetricDistributionsImpl]) extends StageData

class TaskDataImpl(
var taskId: Long,
@@ -427,12 +477,16 @@ class ShuffleWriteMetricsImpl(

class TaskMetricDistributionsImpl(
var quantiles: IndexedSeq[Double],

var executorDeserializeTime: IndexedSeq[Double],
var executorDeserializeCpuTime: IndexedSeq[Double],
var executorRunTime: IndexedSeq[Double],
var executorCpuTime: IndexedSeq[Double],
var resultSize: IndexedSeq[Double],
var jvmGcTime: IndexedSeq[Double],
var resultSerializationTime: IndexedSeq[Double],
var gettingResultTime: IndexedSeq[Double],
var schedulerDelay: IndexedSeq[Double],
var peakExecutionMemory: IndexedSeq[Double],
var memoryBytesSpilled: IndexedSeq[Double],
var diskBytesSpilled: IndexedSeq[Double],

@@ -456,6 +510,7 @@ class ShuffleReadMetricDistributionsImpl(
var localBlocksFetched: IndexedSeq[Double],
var fetchWaitTime: IndexedSeq[Double],
var remoteBytesRead: IndexedSeq[Double],
var remoteBytesReadToDisk: IndexedSeq[Double],
var totalBlocksFetched: IndexedSeq[Double]) extends ShuffleReadMetricDistributions

class ShuffleWriteMetricDistributionsImpl(
@@ -468,3 +523,21 @@ class AccumulableInfoImpl(
var name: String,
var update: Option[String],
var value: String) extends AccumulableInfo

class ExecutorMetricDistributionsImpl(
var quantiles: IndexedSeq[Double],
var numTasks: IndexedSeq[Double],
var inputBytes : IndexedSeq[Double],
var inputRecords : IndexedSeq[Double],
var outputBytes : IndexedSeq[Double],
var outputRecords : IndexedSeq[Double],
var shuffleRead : IndexedSeq[Double],
var shuffleReadRecords : IndexedSeq[Double],
var shuffleWrite : IndexedSeq[Double],
var shuffleWriteRecords : IndexedSeq[Double],
var memoryBytesSpilled : IndexedSeq[Double],
var diskBytesSpilled : IndexedSeq[Double],
var peakJvmUsedMemory : IndexedSeq[Double],
var peakExecutionMemory : IndexedSeq[Double],
var peakStorageMemory : IndexedSeq[Double],
var peakUnifiedMemory : IndexedSeq[Double]) extends ExecutorMetricDistributions
@@ -0,0 +1,67 @@
package com.linkedin.drelephant.spark.heuristics

/**
* Adjustments to configuration parameters for fixing flagged issues.
*/
private[heuristics] sealed trait ConfigurationParameterAdjustment[T] {

/**
* Determine if the value should be adjusted.
*
* @param value the value to adjust.
* @return true if the value should be adjusted, false otherwise.
*/
def canAdjust(value: T): Boolean

/** Adjust the value.
*
* @param value the value to adjust.
* @return the adjusted recommended value.
*/
def adjust(value: T): T
}

/** If the number of cores is greater than the threshold, then divide by divisor. */
private[heuristics] case class CoreDivisorAdjustment(
threshold: Int,
divisor: Double) extends ConfigurationParameterAdjustment[Int] {
override def canAdjust(numCores: Int): Boolean = (numCores > threshold)
override def adjust(numCores: Int): Int = Math.ceil(numCores / divisor).toInt
}

/** Set the number of cores to threshold, if the number of cores is greater. */
private[heuristics] case class CoreSetAdjustment(
threshold: Int) extends ConfigurationParameterAdjustment[Int] {
override def canAdjust(numCores: Int): Boolean = (numCores > threshold)
override def adjust(numCores: Int): Int = threshold
}

/** If the memory is less than the threshold, then multiply by multiplier. */
private[heuristics] case class MemoryMultiplierAdjustment(
threshold: Long,
multiplier: Double) extends ConfigurationParameterAdjustment[Long] {
override def canAdjust(memBytes: Long): Boolean = (memBytes < threshold)
override def adjust(memBytes: Long): Long = (memBytes * multiplier).toLong
}

/** If the memory is less than the threshold, then set to the threshold. */
private[heuristics] case class MemorySetAdjustment(
threshold: Long) extends ConfigurationParameterAdjustment[Long] {
override def canAdjust(memBytes: Long): Boolean = (memBytes < threshold)
override def adjust(memBytes: Long): Long = threshold
}

/** If the number of partitions is less than the threshold, then multiply by multiplier. */
private[heuristics] case class PartitionMultiplierAdjustment(
threshold: Int,
multiplier: Double) extends ConfigurationParameterAdjustment[Int] {
override def canAdjust(numPartitions: Int): Boolean = (numPartitions < threshold)
override def adjust(numPartitions: Int): Int = (numPartitions * multiplier).toInt
}

/** If the number of partitions is less than the threshold, then set to threshold. */
private[heuristics] case class PartitionSetAdjustment(
threshold: Int) extends ConfigurationParameterAdjustment[Int] {
override def canAdjust(numPartitions: Int): Boolean = (numPartitions < threshold)
override def adjust(numPartitions: Int): Int = threshold
}